Compare commits


22 Commits
v0.0.4 ... main

Author SHA1 Message Date
Mike Stepanovic
6144152245
Merge pull request #4 from FlipsideCrypto/feature/review-comments
Addressed v1.0.0 comments
2025-06-13 10:40:07 -06:00
Mike Stepanovic
17ced5f779 addressed review comments 2025-06-12 12:45:36 -06:00
Mike Stepanovic
8f605f16de fixed typo with flatten in msg_attributes 2025-06-05 13:56:13 -06:00
Mike Stepanovic
ea78030cec fixed typos in realtime configs 2025-06-05 13:48:22 -06:00
Mike Stepanovic
59dd7b736e a slew of core fixes 2025-06-05 13:33:07 -06:00
Mike Stepanovic
8b01ccf088 cleaned up table names and fixed jinja issues 2025-06-05 11:10:15 -06:00
Mike Stepanovic
95e3d72354 bumped fsc-utils version 2025-06-03 16:16:53 -06:00
Mike Stepanovic
7c7de27891 adding column and table descriptions 2025-06-03 16:16:08 -06:00
Mike Stepanovic
ead0855985
Merge pull request #3 from FlipsideCrypto/add-neutron-vars
add neutron config
2025-06-03 15:13:30 -06:00
Mike Stepanovic
87be871708 add neutron config 2025-06-03 15:12:56 -06:00
Mike Stepanovic
757336a518
Merge pull request #2 from FlipsideCrypto/rm-tx-counts
use blocks for SL transactions
2025-05-23 07:38:24 -06:00
Eric Laurello
f0601c91bc use blocks for SL transactions 2025-05-22 16:00:59 -04:00
Mike Stepanovic
8e0b61cb38 remove 2 day lookback from blocks 2025-05-22 11:59:07 -06:00
Mike Stepanovic
a646e707a3 switch incremental strategy to merge 2025-05-22 11:51:05 -06:00
Mike Stepanovic
fbc56fd423 updated sources with freshness tests, added minimum S/G tests and docs, removed transfers (TODO), modified msgs schema 2025-05-20 10:51:08 -06:00
Mike Stepanovic
70e395f113 push changes to price models 2025-05-19 14:16:29 -06:00
Mike Stepanovic
c0ee37fd1a push observ 2025-05-19 14:07:15 -06:00
Mike Stepanovic
7bbf6deb7d changed mat type 2025-05-19 13:51:18 -06:00
Mike Stepanovic
2da685386a fixed stats, added todos 2025-05-19 13:50:49 -06:00
Mike Stepanovic
60569409d1 pushing logs models 2025-05-19 13:13:18 -06:00
Mike Stepanovic
a352af34f0 fixed ref 2025-05-15 14:34:04 -06:00
Mike Stepanovic
0666ba33fb removed admin models, updated sources, revised SL blocks 2025-05-15 14:33:01 -06:00
54 changed files with 1198 additions and 505 deletions

View File

@ -3,7 +3,7 @@
'GLOBAL_PROJECT_NAME': 'cosmos',
'GLOBAL_NODE_PROVIDER': 'quicknode',
'GLOBAL_NODE_VAULT_PATH': 'vault/prod/cosmos/quicknode/mainnet',
'GLOBAL_NODE_URL': '{service}/{Authentication}',
'GLOBAL_NODE_URL': '{Service}/{Authentication}',
'GLOBAL_WRAPPED_NATIVE_ASSET_ADDRESS': '',
'MAIN_SL_BLOCKS_PER_HOUR': 3600
} %}

View File

@ -0,0 +1,12 @@
{% macro neutron_vars() %}
{% set vars = {
'GLOBAL_PROJECT_NAME': 'neutron',
'GLOBAL_NODE_PROVIDER': 'publicnode',
'GLOBAL_NODE_VAULT_PATH': 'vault/prod/neutron/mainnet',
'GLOBAL_NODE_URL': '{Service}/{Authentication}',
'GLOBAL_WRAPPED_NATIVE_ASSET_ADDRESS': '',
'MAIN_SL_BLOCKS_PER_HOUR': 3600
} %}
{{ return(vars) }}
{% endmacro %}
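Elsewhere in this diff, models call return_vars(), which presumably dispatches to per-chain macros like neutron_vars() above. A hedged sketch of consuming the returned dict directly (model name and column aliases are illustrative, not part of this change):

{# models/example__chain_metadata.sql -- hypothetical consumer #}
{% set vars = neutron_vars() %}
SELECT
    '{{ vars["GLOBAL_PROJECT_NAME"] }}' AS project_name,      -- 'neutron'
    '{{ vars["GLOBAL_NODE_PROVIDER"] }}' AS node_provider,    -- 'publicnode'
    {{ vars["MAIN_SL_BLOCKS_PER_HOUR"] }} AS blocks_per_hour  -- 3600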

View File

@ -28,10 +28,9 @@
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
{% endmacro %}
{% macro streamline_external_table_FR_query_v2(
{% macro streamline_external_table_query_fr(
source_name,
partition_function="CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) %}
@ -62,5 +61,4 @@ FROM
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
{% endmacro %}
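The default partition_function above parses the partition key out of the staged file path. A minimal sketch with a hypothetical file_name layout (the real bucket layout is not shown in this diff):

-- hypothetical file_name: 'streamline/neutron/blocks/12340000_0001.json'
SELECT CAST(
    SPLIT_PART(SPLIT_PART('streamline/neutron/blocks/12340000_0001.json', '/', 4), '_', 1)
    AS INTEGER
) AS partition_key;  -- 12340000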

View File

@ -1,26 +0,0 @@
{# Log configuration details #}
{{ log_model_details() }}
{{ config(
materialized = 'view',
tags = ['silver','admin','variables','phase_1']
) }}
SELECT
PACKAGE,
category,
variable_key AS key,
default_value,
default_type,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['variable_key']
) }} AS dim_variables_id
FROM
{{ source(
'fsc_evm_admin',
'_master_keys'
) }}
qualify(ROW_NUMBER() over (PARTITION BY variable_key
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -1,20 +0,0 @@
version: 2
models:
- name: admin__dim_variables
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- DIM_VARIABLES_ID
columns:
- name: KEY
tests:
- not_null
- name: DEFAULT_VALUE
tests:
- not_null
- name: DEFAULT_TYPE
tests:
- not_null
- name: DIM_VARIABLES_ID
tests:
- not_null

View File

@ -1,27 +0,0 @@
{# Log configuration details #}
{{ log_model_details() }}
{{ config(
materialized = 'view',
tags = ['silver','admin','variables','phase_1']
) }}
SELECT
f.project,
d.PACKAGE,
d.CATEGORY,
f.key,
f.value,
f.parent_key,
d.default_value,
d.default_type,
{{ dbt_utils.generate_surrogate_key(
['f.project', 'f.key', 'f.parent_key']
) }} AS ez_variables_id
FROM
{{ ref('admin__fact_variables') }}
f
LEFT JOIN {{ ref('admin__dim_variables') }}
d
ON f.key = d.key
OR f.parent_key = d.key

View File

@ -1,26 +0,0 @@
version: 2
models:
- name: admin__ez_variables
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- EZ_VARIABLES_ID
columns:
- name: PROJECT
tests:
- not_null
- name: KEY
tests:
- not_null
- name: VALUE
tests:
- not_null
- name: DEFAULT_VALUE
tests:
- not_null
- name: DEFAULT_TYPE
tests:
- not_null
- name: EZ_VARIABLES_ID
tests:
- not_null

View File

@ -1,34 +0,0 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Log configuration details #}
{{ log_model_details() }}
{{ config(
materialized = 'table',
tags = ['silver','admin','rpc_settings','phase_1']
) }}
SELECT
blockchain,
receipts_by_block,
blocks_per_hour,
blocks_fields,
transactions_fields,
receipts_fields,
traces_fields,
inserted_at AS rpc_sampled_at
FROM
{{ source(
"fsc_evm_admin",
"rpc_node_logs"
) }}
WHERE
RESULT :error :: STRING IS NULL
AND LOWER(blockchain) = LOWER('{{ vars.GLOBAL_PROJECT_NAME }}')
AND LOWER(network) = LOWER('{{ vars.GLOBAL_NETWORK }}') qualify ROW_NUMBER() over (
PARTITION BY blockchain,
network
ORDER BY
rpc_sampled_at DESC
) = 1

View File

@ -1,68 +0,0 @@
{# Log configuration details #}
{{ log_model_details() }}
{{ config(
materialized = 'view',
tags = ['silver','admin','variables','phase_1']
) }}
{%- set vars_data = vars_config(all_projects=true) -%}
{%- set project = target.database.lower() | replace('_dev', '') -%}
WITH flattened_data AS (
{% for project, project_config in vars_data.items() %}
{% for key, value in project_config.items() %}
{% if value is mapping %}
{% for nested_key, nested_value in value.items() %}
SELECT
'{{ project }}' AS project,
'{{ nested_key }}' AS key,
{% if nested_value is string %}
'{{ nested_value }}' AS VALUE,
{% elif nested_value is iterable and nested_value is not string %}
'{{ nested_value | tojson }}' AS VALUE,
{% else %}
'{{ nested_value }}' AS VALUE,
{% endif %}
'{{ key }}' AS parent_key
{% if not loop.last %}UNION ALL{% endif %}
{% endfor %}
{% if not loop.last %}UNION ALL{% endif %}
{% else %}
SELECT
'{{ project }}' AS project,
'{{ key }}' AS key,
{% if value is string %}
'{{ value }}' AS VALUE,
{% elif value is iterable and value is not string %}
'{{ value | tojson }}' AS VALUE,
{% else %}
'{{ value }}' AS VALUE,
{% endif %}
NULL AS parent_key
{% if not loop.last %}UNION ALL{% endif %}
{% endif %}
{% endfor %}
{% if not loop.last %}UNION ALL{% endif %}
{% endfor %}
)
SELECT
project,
key,
VALUE,
parent_key,
{{ dbt_utils.generate_surrogate_key(
['project', 'key', 'parent_key']
) }} AS fact_variables_id
FROM
flattened_data
{% if project != 'fsc_evm' %}
WHERE
project = '{{ project }}'
{% endif %}

View File

@ -1,20 +0,0 @@
version: 2
models:
- name: admin__fact_variables
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- FACT_VARIABLES_ID
columns:
- name: PROJECT
tests:
- not_null
- name: KEY
tests:
- not_null
- name: VALUE
tests:
- not_null
- name: FACT_VARIABLES_ID
tests:
- not_null

View File

@ -1,25 +0,0 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Log configuration details #}
{{ log_model_details() }}
{{ config(
materialized = 'incremental',
cluster_by = 'round(_id,-3)',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(_id)",
full_refresh = false,
tags = ['silver','admin','phase_1']
) }}
SELECT
ROW_NUMBER() over (
ORDER BY
SEQ4()
) - 1 :: INT AS _id
FROM
TABLE(GENERATOR(rowcount => {{ vars.GLOBAL_MAX_SEQUENCE_NUMBER }}))
WHERE 1=1
{% if is_incremental() %}
AND 1=0
{% endif %}

View File

@ -1,14 +0,0 @@
version: 2
models:
- name: admin__number_sequence
description: |
This model generates a sequence of numbers for a given range.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _ID
columns:
- name: _ID
tests:
- not_null
description: Primary key for the table

View File

@ -4,5 +4,5 @@
) }}
{{ streamline_external_table_query(
source_name = 'tx_counts'
source_name = 'tx_count'
) }}

View File

@ -4,5 +4,5 @@
) }}
{{ streamline_external_table_query_fr(
source_name = 'tx_counts'
source_name = 'tx_count'
) }}

View File

@ -0,0 +1,172 @@
version: 2
models:
- name: core__fact_blocks
description: '{{ doc("core__fact_blocks") }}'
data_tests:
- dbt_utils.recency:
datepart: hour
field: modified_timestamp
interval: 3
severity: error
tags: ['test_recency']
columns:
- name: blockchain
description: '{{ doc("blockchain") }}'
- name: block_id
description: '{{ doc("block_id") }}'
- name: block_timestamp
description: '{{ doc("block_timestamp") }}'
- name: chain_id
description: '{{ doc("chain_id") }}'
- name: tx_count
description: '{{ doc("tx_count") }}'
- name: proposer_address
description: '{{ doc("proposer_address") }}'
- name: validator_hash
description: '{{ doc("validator_hash") }}'
- name: fact_blocks_id
description: '{{ doc("pk") }}'
- name: inserted_timestamp
description: '{{ doc("inserted_timestamp") }}'
- name: modified_timestamp
description: '{{ doc("modified_timestamp") }}'
- name: core__fact_msg_attributes
description: '{{ doc("core__fact_msg_attributes") }}'
data_tests:
- dbt_utils.recency:
datepart: hour
field: modified_timestamp
interval: 3
severity: error
tags: ['test_recency']
columns:
- name: block_id
description: '{{ doc("block_id") }}'
- name: block_timestamp
description: '{{ doc("block_timestamp") }}'
- name: tx_id
description: '{{ doc("tx_id") }}'
- name: tx_succeeded
description: '{{ doc("tx_succeeded") }}'
- name: msg_group
description: '{{ doc("msg_group") }}'
- name: msg_index
description: '{{ doc("msg_index") }}'
- name: msg_type
description: '{{ doc("msg_type") }}'
- name: attribute_index
description: '{{ doc("attribute_index") }}'
- name: attribute_key
description: '{{ doc("attribute_key") }}'
- name: attribute_value
description: '{{ doc("attribute_value") }}'
- name: fact_msg_attributes_id
description: '{{ doc("pk") }}'
- name: inserted_timestamp
description: '{{ doc("inserted_timestamp") }}'
- name: modified_timestamp
description: '{{ doc("modified_timestamp") }}'
- name: core__fact_msgs
description: '{{ doc("core__fact_msgs") }}'
data_tests:
- dbt_utils.recency:
datepart: hour
field: modified_timestamp
interval: 3
severity: error
tags: ['test_recency']
columns:
- name: block_id
description: '{{ doc("block_id") }}'
- name: block_timestamp
description: '{{ doc("block_timestamp") }}'
- name: tx_id
description: '{{ doc("tx_id") }}'
- name: tx_succeeded
description: '{{ doc("tx_succeeded") }}'
- name: msg_group
description: '{{ doc("msg_group") }}'
- name: msg_type
description: '{{ doc("msg_type") }}'
- name: msg_index
description: '{{ doc("msg_index") }}'
- name: msg
description: '{{ doc("msg") }}'
- name: fact_msgs_id
description: '{{ doc("pk") }}'
- name: inserted_timestamp
description: '{{ doc("inserted_timestamp") }}'
- name: modified_timestamp
description: '{{ doc("modified_timestamp") }}'
- name: core__fact_transactions
description: '{{ doc("core__fact_transactions") }}'
data_tests:
- dbt_utils.recency:
datepart: hour
field: modified_timestamp
interval: 3
severity: error
tags: ['test_recency']
columns:
- name: block_id
description: '{{ doc("block_id") }}'
- name: block_timestamp
description: '{{ doc("block_timestamp") }}'
- name: codespace
description: '{{ doc("codespace") }}'
- name: tx_id
description: '{{ doc("tx_id") }}'
- name: tx_succeeded
description: '{{ doc("tx_succeeded") }}'
- name: tx_code
description: '{{ doc("tx_code") }}'
- name: tx_log
description: '{{ doc("tx_log") }}'
- name: gas_used
description: '{{ doc("gas_used") }}'
- name: gas_wanted
description: '{{ doc("gas_wanted") }}'
- name: fact_transactions_id
description: '{{ doc("pk") }}'
- name: inserted_timestamp
description: '{{ doc("inserted_timestamp") }}'
- name: modified_timestamp
description: '{{ doc("modified_timestamp") }}'
- name: core__fact_transactions_logs
description: '{{ doc("core__fact_transactions_logs") }}'
data_tests:
- dbt_utils.recency:
datepart: hour
field: modified_timestamp
interval: 3
severity: error
tags: ['test_recency']
columns:
- name: block_id
description: '{{ doc("block_id") }}'
- name: block_timestamp
description: '{{ doc("block_timestamp") }}'
- name: tx_id
description: '{{ doc("tx_id") }}'
- name: tx_succeeded
description: '{{ doc("tx_succeeded") }}'
- name: tx_code
description: '{{ doc("tx_code") }}'
- name: codespace
description: '{{ doc("codespace") }}'
- name: tx_log
description: '{{ doc("tx_log") }}'
- name: transactions_logs_id
description: '{{ doc("pk") }}'
- name: inserted_timestamp
description: '{{ doc("inserted_timestamp") }}'
- name: modified_timestamp
description: '{{ doc("modified_timestamp") }}'

View File

@ -2,11 +2,11 @@
{% set vars = return_vars() %}
{# Set fact_blocks specific variables #}
{% set rpc_vars = set_dynamic_fields('fact_blocks') %}
{#{% set rpc_vars = set_dynamic_fields('fact_blocks') %}#}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
incremental_strategy = 'merge',
unique_key = 'blocks_id',
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id)",

View File

@ -1,29 +1,62 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Set fact_transactions specific variables #}
{% set rpc_vars = set_dynamic_fields('fact_transactions') %}
-- depends_on: {{ ref('silver__msgs') }}
{{ config(
{{ config (
materialized = 'incremental',
incremental_strategy = 'delete+insert',
incremental_strategy = 'merge',
unique_key = 'msg_attributes_id',
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id, tx_id)",
cluster_by = ['modified_timestamp::DATE'],
incremental_predicates = [fsc_ibc.standard_predicate()],
tags = ['gold', 'core', 'phase_2']
tags = ['silver', 'core', 'phase_2']
) }}
WITH silver_msgs AS (
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
msg_group,
msg_sub_group,
msg_index,
msg_type,
f.index AS attribute_index,
CASE
WHEN TRY_BASE64_DECODE_STRING(f.value :key) IS NULL
THEN f.value :key
ELSE TRY_BASE64_DECODE_STRING(f.value :key)
END AS attribute_key,
CASE
WHEN TRY_BASE64_DECODE_STRING(f.value :key) IS NULL
THEN f.value :value
ELSE TRY_BASE64_DECODE_STRING(f.value :value)
END AS attribute_value,
msgs._inserted_timestamp
FROM
{{ ref('silver__msgs') }} AS msgs,
LATERAL FLATTEN(
input => msgs.msg,
path => 'attributes'
) AS f
{% if is_incremental() %}
WHERE
_inserted_timestamp :: DATE >= (
SELECT
MAX(_inserted_timestamp) :: DATE - 2
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
CONCAT(
msg_group,
':',
msg_sub_group
) AS msg_group,
msg_group,
msg_sub_group,
msg_index,
msg_type,
attribute_index,
@ -31,18 +64,10 @@ SELECT
attribute_value,
{{ dbt_utils.generate_surrogate_key(
['tx_id','msg_index','attribute_index']
) }} AS fact_msg_attributes_id,
) }} AS msg_attributes_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('silver__msg_attributes') }}
{% if is_incremental() %}
WHERE
modified_timestamp :: DATE >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
silver_msgs
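The two CASE expressions above fall back to the raw attribute when the key is not valid base64; for the key itself the pattern is equivalent to a COALESCE:

-- 'c3BlbmRlcg==' is valid base64 ('spender'); 'already_plain' is not
SELECT
    COALESCE(TRY_BASE64_DECODE_STRING('c3BlbmRlcg=='), 'c3BlbmRlcg==') AS decoded,       -- 'spender'
    COALESCE(TRY_BASE64_DECODE_STRING('already_plain'), 'already_plain') AS passthrough; -- 'already_plain'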

View File

@ -2,11 +2,11 @@
{% set vars = return_vars() %}
{# Set fact_transactions specific variables #}
{% set rpc_vars = set_dynamic_fields('fact_transactions') %}
{#{% set rpc_vars = set_dynamic_fields('fact_transactions') %}#}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
incremental_strategy = 'merge',
unique_key = 'transactions_id',
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id, tx_id)",

View File

@ -2,11 +2,11 @@
{% set vars = return_vars() %}
{# Set fact_transactions specific variables #}
{% set rpc_vars = set_dynamic_fields('fact_transactions') %}
{#{% set rpc_vars = set_dynamic_fields('fact_transactions') %}#}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
incremental_strategy = 'merge',
unique_key = 'transactions_id',
cluster_by = ['block_timestamp::DATE'],
incremental_predicates = [fsc_ibc.standard_predicate()],

View File

@ -0,0 +1,40 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Set fact_transactions_logs specific variables #}
{#{% set rpc_vars = set_dynamic_fields('fact_transactions_logs') %}#}
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'transactions_logs_id',
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id, tx_id)",
incremental_predicates = [fsc_ibc.standard_predicate()],
tags = ['gold', 'core', 'phase_2'],
enabled = false
) }}
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
tx_code,
codespace,
tx_log,
transactions_logs_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('silver__transactions_logs') }}
{% if is_incremental() %}
WHERE
modified_timestamp :: DATE >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,74 @@
{# Get variables #}
{% set vars = return_vars() %}
{{ log_model_details() }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = "block_timestamp_hour",
cluster_by = ['block_timestamp_hour::DATE'],
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'STATS, METRICS, CORE, HOURLY' } } },
tags = ['gold','stats','curated','phase_4']
) }}
WITH txs AS (
SELECT
block_timestamp_hour,
block_id_min,
block_id_max,
block_count,
transaction_count,
transaction_count_success,
transaction_count_failed,
total_gas_used,
-- unique_from_count, -- TODO: Add to silver_stats__core_metrics_hourly when available
-- total_fees, -- TODO: Add to silver_stats__core_metrics_hourly when available
LAST_VALUE(
p.close ignore nulls
) over (
ORDER BY
block_timestamp_hour rows unbounded preceding
) AS imputed_close,
s.inserted_timestamp,
s.modified_timestamp
FROM
{{ ref('silver_stats__core_metrics_hourly') }} s
LEFT JOIN {{ ref('silver__hourly_prices_coingecko') }} p
ON s.block_timestamp_hour = p.recorded_hour
AND p.id = '{{ vars.GLOBAL_PROJECT_NAME }}'
)
SELECT
block_timestamp_hour,
block_id_min AS block_number_min,
block_id_max AS block_number_max,
block_count,
transaction_count,
transaction_count_success,
transaction_count_failed,
-- unique_from_count, -- TODO: Add to silver_stats__core_metrics_hourly when available
total_gas_used AS total_fees_native, -- TODO: Replace with total_fees when available
ROUND(
total_gas_used * imputed_close,
2
) AS total_fees_usd,
{{ dbt_utils.generate_surrogate_key(['block_timestamp_hour']) }} AS ez_core_metrics_hourly_id,
inserted_timestamp,
modified_timestamp
FROM
txs
WHERE
block_timestamp_hour < DATE_TRUNC('hour', CURRENT_TIMESTAMP)
{% if is_incremental() %}
AND
block_timestamp_hour >= COALESCE(
DATEADD('hour', -4, (
SELECT DATE_TRUNC('hour', MIN(block_timestamp_hour))
FROM {{ ref('silver_stats__core_metrics_hourly') }}
WHERE modified_timestamp >= (
SELECT MAX(modified_timestamp) FROM {{ this }}
)
)),
'2025-01-01 00:00:00'
)
{% endif %}
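The imputed_close column above forward-fills hours with no recorded price. A self-contained sketch of the same IGNORE NULLS window pattern, on hypothetical data:

WITH prices AS (
    SELECT * FROM (VALUES
        ('2025-05-19 00:00'::TIMESTAMP_NTZ, 10.0),
        ('2025-05-19 01:00'::TIMESTAMP_NTZ, NULL),
        ('2025-05-19 02:00'::TIMESTAMP_NTZ, 12.0)
    ) AS p (block_timestamp_hour, close)
)
SELECT
    block_timestamp_hour,
    LAST_VALUE(close IGNORE NULLS) OVER (
        ORDER BY block_timestamp_hour ROWS UNBOUNDED PRECEDING
    ) AS imputed_close  -- 10.0, 10.0, 12.0
FROM prices;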

View File

@ -0,0 +1,200 @@
version: 2
models:
- name: silver__blocks
config:
contract:
enforced: false
data_tests:
- dbt_utils.sequential_values:
column_name: block_id
interval: 1
config:
error_if: ">100"
tags: ['test_recency']
columns:
- name: block_id
data_type: NUMBER
data_tests:
- not_null:
tags: ['test_quality']
- name: block_timestamp
data_type: TIMESTAMP_NTZ
data_tests:
- not_null:
tags: ['test_quality']
- name: chain_id
data_type: VARCHAR
- name: tx_count
data_type: NUMBER
data_tests:
- dbt_utils.expression_is_true:
expression: ">=0"
tags: ['test_quality']
- name: proposer_address
data_type: VARCHAR
data_tests:
- not_null:
tags: ['test_quality']
- name: validator_hash
data_type: VARCHAR
data_tests:
- not_null:
tags: ['test_quality']
- name: header
data_type: VARIANT
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: blocks_id
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__msg_attributes
columns:
- name: block_id
data_type: NUMBER
- name: block_timestamp
data_type: TIMESTAMP_NTZ
- name: tx_id
data_type: VARCHAR
- name: tx_succeeded
data_type: BOOLEAN
- name: msg_group
data_type: NUMBER
- name: msg_sub_group
data_type: NUMBER
- name: msg_index
data_type: NUMBER
- name: msg_type
data_type: VARCHAR
- name: attribute_index
data_type: NUMBER
- name: attribute_key
data_type: VARCHAR
- name: attribute_value
data_type: VARCHAR
- name: msg_attributes_id
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__msgs
columns:
- name: block_id
data_type: NUMBER
- name: block_timestamp
data_type: TIMESTAMP_NTZ
- name: tx_id
data_type: VARCHAR
- name: tx_succeeded
data_type: BOOLEAN
- name: msg_group
data_type: NUMBER
- name: msg_sub_group
data_type: NUMBER
- name: msg_index
data_type: NUMBER
- name: msg_type
data_type: VARCHAR
- name: msg
data_type: VARIANT
- name: msgs_id
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__transactions_logs
columns:
- name: block_id
data_type: NUMBER
- name: block_timestamp
data_type: TIMESTAMP_NTZ
- name: tx_id
data_type: VARCHAR
- name: tx_succeeded
data_type: BOOLEAN
- name: tx_code
data_type: NUMBER
- name: codespace
data_type: VARIANT
- name: tx_log
data_type: VARIANT
- name: transactions_logs_id
data_type: VARCHAR
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__transactions
config:
contract:
enforced: false
columns:
- name: block_id
data_type: NUMBER
data_tests:
- not_null:
tags: ['test_quality']
- name: block_timestamp
data_type: TIMESTAMP_NTZ
data_tests:
- not_null:
tags: ['test_quality']
- name: codespace
data_type: VARIANT
- name: tx_id
data_type: VARCHAR
data_tests:
- not_null:
tags: ['test_quality']
- name: tx_index
data_type: NUMBER
- name: tx_log
data_type: VARCHAR
- name: tx_succeeded
data_type: BOOLEAN
- name: gas_used
data_type: NUMBER
- name: gas_wanted
data_type: NUMBER
- name: tx_code
data_type: NUMBER
- name: DATA
data_type: VARIANT
- name: partition_key
data_type: VARCHAR
- name: block_id_requested
data_type: NUMBER
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: transactions_id
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__transfers

View File

@ -1,11 +1,9 @@
{# Get variables #}
{% set vars = return_vars() %}
-- depends_on: {{ ref('bronze__blocks') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
incremental_strategy = 'merge',
unique_key = 'blocks_id',
cluster_by = ['modified_timestamp::DATE'],
incremental_predicates = [fsc_ibc.standard_predicate()],
@ -13,9 +11,10 @@
) }}
WITH bronze_blocks AS (
SELECT
'{{ vars.GLOBAL_PROJECT_NAME }}' AS blockchain,
block_id,
VALUE :BLOCK_ID :: INT AS block_id,
COALESCE(
DATA :result :block :header :time :: TIMESTAMP,
DATA :block :header :time :: TIMESTAMP,
@ -27,8 +26,12 @@ WITH bronze_blocks AS (
DATA :block :header :chain_id :: STRING
) AS chain_id,
COALESCE(
ARRAY_SIZE(DATA :result :block :data :txs) :: NUMBER,
ARRAY_SIZE(DATA :block :data :txs) :: NUMBER
ARRAY_SIZE(
DATA :result :block :data :txs
) :: NUMBER,
ARRAY_SIZE(
DATA :block :data :txs
) :: NUMBER
) AS tx_count,
COALESCE(
DATA :result :block :header :proposer_address :: STRING,
@ -42,23 +45,29 @@ WITH bronze_blocks AS (
DATA :result :block :header,
DATA :block :header
) AS header,
_inserted_timestamp
inserted_timestamp
FROM
{{ ref('bronze__blocks') }}
WHERE
VALUE :data :error IS NULL
AND DATA :error IS NULL
AND DATA :result :begin_block_events IS NULL
{% if is_incremental() %}
AND _inserted_timestamp :: DATE >= (
SELECT
MAX(_inserted_timestamp) :: DATE - 2
FROM
{{ this }}
)
{% endif %}
{% if is_incremental() %}
{{ ref('bronze__blocks') }}
{% else %}
{{ ref('bronze__blocks_fr') }}
{% endif %}
WHERE
VALUE :data :error IS NULL
AND DATA :error IS NULL
{% if is_incremental() %}
AND inserted_timestamp >= (
SELECT
MAX(inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
blockchain,
block_id,
block_timestamp,
chain_id,
@ -66,14 +75,17 @@ SELECT
proposer_address,
validator_hash,
header,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['chain_id', 'block_id']) }} AS blocks_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
bronze_blocks
QUALIFY ROW_NUMBER() over (
PARTITION BY chain_id, block_id
ORDER BY _inserted_timestamp DESC
) = 1
bronze_blocks
WHERE
block_id is not null
QUALIFY ROW_NUMBER() OVER (
PARTITION BY chain_id,
block_id
ORDER BY
inserted_timestamp DESC
) = 1

View File

@ -5,7 +5,7 @@
{{ config (
materialized = 'incremental',
incremental_strategy = 'delete+insert',
incremental_strategy = 'merge',
unique_key = 'msg_attributes_id',
cluster_by = ['modified_timestamp::DATE'],
incremental_predicates = [fsc_ibc.standard_predicate()],
@ -35,11 +35,11 @@ WITH silver_msgs AS (
END AS attribute_value,
msgs._inserted_timestamp
FROM
{{ ref('silver__msgs') }} AS msgs
LATERAL FLATTEN(
input => msgs.msg,
path => 'attributes'
) AS f
{{ ref('silver__msgs') }} AS msgs,
LATERAL FLATTEN(
input => msgs.msg,
path => 'attributes'
) AS f
{% if is_incremental() %}
WHERE
_inserted_timestamp :: DATE >= (

View File

@ -5,9 +5,9 @@
{{ config (
materialized = 'incremental',
incremental_strategy = 'delete+insert',
incremental_strategy = 'merge',
unique_key = 'msgs_id',
cluster_by = ['modified_timestamp::DATE', 'partition_key'],
cluster_by = ['modified_timestamp::DATE'],
incremental_predicates = [fsc_ibc.standard_predicate()],
tags = ['silver', 'core', 'phase_2']
) }}
@ -59,7 +59,7 @@ WITH bronze_msgs AS (
THEN msg :attributes [0] :key
ELSE TRY_BASE64_DECODE_STRING(msg :attributes [0] :value)
END AS attribute_value,
t._inserted_timestamp
transactions._inserted_timestamp
FROM
{{ ref('silver__transactions') }} transactions
JOIN LATERAL FLATTEN(input => transactions.msgs) f
@ -124,12 +124,7 @@ msgs AS (
bronze_msgs.msg_index,
msg_type,
msg,
concat_ws(
'-',
bronze_msgs.tx_id,
bronze_msgs.msg_index
) AS unique_key,
_inserted_timestamp
bronze_msgs._inserted_timestamp
FROM
bronze_msgs
LEFT JOIN GROUPING b
@ -146,7 +141,6 @@ SELECT
msg_index,
msg_type,
msg :: OBJECT AS msg,
unique_key,
{{ dbt_utils.generate_surrogate_key(
['tx_id','msg_index']
) }} AS msgs_id,

View File

@ -0,0 +1,76 @@
{# Get variables #}
{% set vars = return_vars() %}
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
unique_key = 'transactions_id',
cluster_by = ['modified_timestamp::DATE','partition_key'],
incremental_predicates = [fsc_ibc.standard_predicate()],
tags = ['silver', 'core', 'phase_2']
) }}
-- depends_on: {{ ref('bronze__transactions') }}
-- depends_on: {{ ref('bronze__transactions_fr') }}
WITH bronze_transactions AS (
SELECT
VALUE :BLOCK_ID_REQUESTED AS block_id,
TO_TIMESTAMP_NTZ(
VALUE:BLOCK_TIMESTAMP::STRING,
'YYYY_MM_DD_HH_MI_SS_FF3'
) AS block_timestamp,
DATA :hash :: STRING AS tx_id,
DATA :index AS tx_index,
DATA :tx_result :codespace :: STRING AS codespace,
DATA :tx_result :gas_used :: NUMBER AS gas_used,
DATA :tx_result :gas_wanted :: NUMBER AS gas_wanted,
CASE
WHEN DATA :tx_result :code :: NUMBER = 0 THEN TRUE
ELSE FALSE
END AS tx_succeeded,
DATA :tx_result :code :: INT AS tx_code,
COALESCE(
TRY_PARSE_JSON(
DATA :tx_result :log
),
DATA :tx_result :log
) AS tx_log,
DATA :tx_result :events AS msgs,
DATA,
partition_key,
inserted_timestamp AS _inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_id', 'tx_id']
) }} AS transactions_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__transactions') }}
{% else %}
{{ ref('bronze__transactions_fr') }}
{% endif %}
WHERE
DATA <> PARSE_JSON('[]')
{% if is_incremental() %}
AND
inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
*
FROM
bronze_transactions
QUALIFY ROW_NUMBER() OVER (
PARTITION BY block_id, tx_id
ORDER BY _inserted_timestamp DESC
) = 1
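The block_timestamp parse above (TO_TIMESTAMP_NTZ with 'YYYY_MM_DD_HH_MI_SS_FF3') reverses the TO_CHAR encoding that the realtime transactions model writes into the request payload later in this diff. A round-trip sketch:

SELECT
    TO_CHAR('2025-05-22 11:59:07.123'::TIMESTAMP_NTZ,
            'YYYY_MM_DD_HH_MI_SS_FF3') AS encoded,   -- '2025_05_22_11_59_07_123'
    TO_TIMESTAMP_NTZ('2025_05_22_11_59_07_123',
            'YYYY_MM_DD_HH_MI_SS_FF3') AS decoded;   -- 2025-05-22 11:59:07.123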

View File

@ -0,0 +1,129 @@
{{ config(
materialized = 'incremental',
unique_key = ['tx_id'],
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE'],
tags = ['silver', 'core', 'phase_2']
) }}
WITH event_attributes AS (
SELECT
t.block_id,
t.block_timestamp,
t.tx_id,
t.tx_succeeded,
t.tx_code,
t.codespace,
COALESCE(l.value:msg_index::INTEGER, 0) as msg_index,
e.value:type::STRING as event_type,
ARRAY_AGG(
OBJECT_CONSTRUCT(
'key', a.value:key::STRING,
'value', COALESCE(a.value:value::STRING, '')
)
) as attributes,
t._inserted_timestamp
FROM {{ ref('silver__transactions') }} t,
LATERAL FLATTEN(input => PARSE_JSON(t.tx_log)) l,
LATERAL FLATTEN(input => l.value:events) e,
LATERAL FLATTEN(input => e.value:attributes) a
WHERE t.tx_log IS NOT NULL
AND t.tx_succeeded = TRUE
AND IS_OBJECT(PARSE_JSON(t.tx_log))
{% if is_incremental() %}
AND t._inserted_timestamp >= (
SELECT MAX(_inserted_timestamp)
FROM {{ this }}
)
{% endif %}
GROUP BY
t.block_id,
t.block_timestamp,
t.tx_id,
t.tx_succeeded,
t.tx_code,
t.codespace,
l.value:msg_index,
e.value:type,
t._inserted_timestamp
),
parsed_logs AS (
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
tx_code,
codespace,
ARRAY_AGG(
OBJECT_CONSTRUCT(
'msg_index', msg_index,
'event_type', event_type,
'attributes', attributes
)
) WITHIN GROUP (ORDER BY msg_index, event_type) as tx_log,
_inserted_timestamp
FROM event_attributes
GROUP BY
block_id,
block_timestamp,
tx_id,
tx_succeeded,
tx_code,
codespace,
_inserted_timestamp
),
failed_txs AS (
SELECT
t.block_id,
t.block_timestamp,
t.tx_id,
t.tx_succeeded,
t.tx_code,
t.codespace,
ARRAY_CONSTRUCT(
OBJECT_CONSTRUCT(
'msg_index', 0,
'event_type', 'error',
'attributes', ARRAY_CONSTRUCT(
OBJECT_CONSTRUCT(
'key', 'error_message',
'value', t.tx_log::STRING
)
)
)
) as tx_log,
t._inserted_timestamp
FROM {{ ref('silver__transactions') }} t
WHERE t.tx_succeeded = FALSE
AND t.tx_log IS NOT NULL
{% if is_incremental() %}
AND t._inserted_timestamp >= (
SELECT MAX(_inserted_timestamp)
FROM {{ this }}
)
{% endif %}
)
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
tx_code,
codespace,
tx_log,
{{ dbt_utils.generate_surrogate_key(['tx_id']) }} as transactions_logs_id,
_inserted_timestamp,
CURRENT_TIMESTAMP() as inserted_timestamp,
CURRENT_TIMESTAMP() as modified_timestamp,
'{{ invocation_id }}' as _invocation_id
FROM (
SELECT * FROM parsed_logs
UNION ALL
SELECT * FROM failed_txs
) t
QUALIFY ROW_NUMBER() OVER (PARTITION BY tx_id ORDER BY _inserted_timestamp DESC) = 1
ORDER BY block_timestamp, tx_id

View File

@ -0,0 +1,91 @@
{% set vars = return_vars() %}
{{ log_model_details() }}
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = vars.GLOBAL_SILVER_FR_ENABLED,
tags = ['silver','observability','phase_3']
) }}
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT MIN(block_number) FROM (
SELECT MIN(block_number) AS block_number
FROM {{ ref('core__fact_blocks') }}
WHERE block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT MIN(VALUE) - 1 AS block_number
FROM (
SELECT blocks_impacted_array
FROM {{ this }}
QUALIFY ROW_NUMBER() OVER (ORDER BY test_timestamp DESC) = 1
),
LATERAL FLATTEN(input => blocks_impacted_array)
)
)
{% if vars.MAIN_OBSERV_FULL_TEST_ENABLED %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
block_range AS (
SELECT _id AS block_number
FROM {{ source('crosschain_silver', 'number_sequence') }}
WHERE _id BETWEEN (
SELECT min_block FROM summary_stats
) AND (
SELECT max_block FROM summary_stats
)
),
blocks AS (
SELECT
l.block_number,
block_timestamp,
LAG(l.block_number, 1) OVER (ORDER BY l.block_number ASC) AS prev_block_number
FROM {{ ref('core__fact_blocks') }} l
INNER JOIN block_range b ON l.block_number = b.block_number
AND l.block_number >= (
SELECT MIN(block_number) FROM block_range
)
),
block_gen AS (
SELECT _id AS block_number
FROM {{ source('crosschain_silver', 'number_sequence') }}
WHERE _id BETWEEN (
SELECT MIN(block_number) FROM blocks
) AND (
SELECT MAX(block_number) FROM blocks
)
)
SELECT
'blocks' AS test_name,
MIN(b.block_number) AS min_block,
MAX(b.block_number) AS max_block,
MIN(b.block_timestamp) AS min_block_timestamp,
MAX(b.block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested,
COUNT(CASE WHEN C.block_number IS NOT NULL THEN A.block_number END) AS blocks_impacted_count,
ARRAY_AGG(CASE WHEN C.block_number IS NOT NULL THEN A.block_number END) WITHIN GROUP (ORDER BY A.block_number) AS blocks_impacted_array,
CURRENT_TIMESTAMP() AS test_timestamp,
CURRENT_TIMESTAMP() AS modified_timestamp
FROM block_gen A
LEFT JOIN blocks b ON A.block_number = b.block_number
LEFT JOIN blocks C ON A.block_number > C.prev_block_number
AND A.block_number < C.block_number
AND C.block_number - C.prev_block_number <> 1
WHERE COALESCE(b.block_number, C.block_number) IS NOT NULL
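The final joins above flag any generated height that falls strictly between two observed blocks whose heights are not consecutive. A worked sketch with hypothetical heights 100, 101, 103:

WITH observed AS (
    SELECT column1 AS block_number FROM VALUES (100), (101), (103)
),
w AS (
    SELECT block_number,
           LAG(block_number) OVER (ORDER BY block_number) AS prev_block_number
    FROM observed
),
gen AS (
    SELECT SEQ4() + 100 AS block_number FROM TABLE(GENERATOR(rowcount => 4))  -- 100..103
)
SELECT g.block_number AS missing_block
FROM gen g
JOIN w
    ON g.block_number > w.prev_block_number
    AND g.block_number < w.block_number
    AND w.block_number - w.prev_block_number <> 1;  -- returns 102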

View File

@ -0,0 +1,98 @@
{% set vars = return_vars() %}
{{ log_model_details() }}
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = vars.GLOBAL_SILVER_FR_ENABLED,
tags = ['silver','observability','phase_3']
) }}
WITH summary_stats AS (
SELECT
MIN(block_id) AS min_block_id,
MAX(block_id) AS max_block_id,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_id >= (
SELECT MIN(block_id) FROM (
SELECT MIN(block_id) AS block_id
FROM {{ ref('core__fact_blocks') }}
WHERE block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT MIN(VALUE) - 1 AS block_id
FROM (
SELECT blocks_impacted_array
FROM {{ this }}
QUALIFY ROW_NUMBER() OVER (ORDER BY test_timestamp DESC) = 1
),
LATERAL FLATTEN(input => blocks_impacted_array)
)
)
{% if vars.MAIN_OBSERV_FULL_TEST_ENABLED %}
OR block_id >= 0
{% endif %}
)
{% endif %}
),
base_blocks AS (
SELECT
block_id,
block_timestamp,
tx_count AS transaction_count
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp <= (
SELECT max_block_timestamp FROM summary_stats
)
),
actual_tx_counts AS (
SELECT
block_id,
COUNT(1) AS transaction_count
FROM
{{ ref('core__fact_transactions') }}
WHERE
block_id IS NOT NULL
GROUP BY block_id
),
potential_missing_txs AS (
SELECT
e.block_id
FROM
base_blocks e
LEFT OUTER JOIN actual_tx_counts a
ON e.block_id = a.block_id
WHERE
COALESCE(a.transaction_count, 0) <> e.transaction_count
),
impacted_blocks AS (
SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_id) WITHIN GROUP (ORDER BY block_id) AS blocks_impacted_array
FROM potential_missing_txs
)
SELECT
'transactions' AS test_name,
min_block_id AS min_block,
max_block_id AS max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
SYSDATE() AS test_timestamp,
SYSDATE() AS modified_timestamp
FROM
summary_stats
JOIN impacted_blocks
ON 1 = 1

View File

@ -0,0 +1,43 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Log configuration details #}
{{ log_model_details() }}
{{ config(
materialized = 'incremental',
unique_key = ['id','recorded_hour'],
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = 'recorded_hour::DATE',
tags = ['noncore']
) }}
SELECT
id,
recorded_hour,
OPEN,
high,
low,
CLOSE,
_INSERTED_TIMESTAMP,
hourly_prices_coin_gecko_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ source(
'crosschain_silver',
'hourly_prices_coin_gecko'
) }}
WHERE
id = '{{ vars.GLOBAL_PROJECT_NAME }}'
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -1,124 +0,0 @@
{# Get variables #}
{% set vars = return_vars() %}
-- depends_on: {{ ref('bronze__transactions') }}
-- depends_on: {{ ref('bronze__transactions_fr') }}
{{ config (
materialized = "incremental",
incremental_strategy = 'delete+insert',
unique_key = 'transactions_id',
cluster_by = ['modified_timestamp::DATE','partition_key'],
incremental_predicates = [fsc_ibc.standard_predicate()],
tags = ['silver', 'core', 'phase_2']
) }}
WITH bronze_transactions AS (
SELECT
block_id,
COALESCE(
DATA :hash,
f.value :hash
) :: STRING AS tx_id,
COALESCE(
DATA :index,
f.index
) AS tx_index,
COALESCE(
DATA :tx_result :codespace,
f.value :tx_result :codespace
) :: STRING AS codespace,
COALESCE(
DATA :tx_result :gas_used,
f.value :tx_result :gas_used
) :: NUMBER AS gas_used,
COALESCE(
DATA :tx_result :gas_wanted,
f.value :tx_result :gas_wanted
) :: NUMBER AS gas_wanted,
CASE
WHEN f.value :tx_result :code :: NUMBER = 0 THEN TRUE
ELSE FALSE
END AS tx_succeeded,
COALESCE(
DATA :tx_result :code,
f.value :tx_result :code
) :: INT AS tx_code,
COALESCE(
TRY_PARSE_JSON(
COALESCE(
DATA :tx_result :log,
f.value :tx_result :log
)
),
COALESCE(
DATA :tx_result :log,
f.value :tx_result :log
)
) AS tx_log,
CASE
WHEN f.value IS NOT NULL THEN f.value
ELSE DATA
END AS DATA,
partition_key,
COALESCE(
transactions.value :block_id_REQUESTED,
REPLACE(
metadata :request :params [0],
'tx.height='
)
) AS block_id_requested,
inserted_timestamp AS _inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__transactions') }}
{% else %}
{{ ref('bronze__transactions_fr') }}
{% endif %}
AS transactions
JOIN LATERAL FLATTEN(
DATA :result :txs,
outer => TRUE
) AS f
WHERE
{% if is_incremental() %}
inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_id,
b.block_timestamp,
codespace,
tx_id,
tx_index,
tx_log,
tx_succeeded,
{# tx_from, #}
{# fee, #}
{# fee_denom, #}
gas_used,
gas_wanted,
tx_code,
DATA,
partition_key,
block_id_requested,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_id_requested', 'tx_id']
) }} AS transactions_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
bronze_transactions
JOIN {{ ref('silver__blocks') }} b
ON t.block_id = b.block_id
QUALIFY(ROW_NUMBER() over (
PARTITION BY block_id_requested, tx_id
ORDER BY _inserted_timestamp DESC)
) = 1

View File

@ -0,0 +1,59 @@
{# Log configuration details #}
{{ log_model_details() }}
{# Set up dbt configuration #}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = "block_timestamp_hour",
cluster_by = ['block_timestamp_hour::DATE'],
tags = ['silver_stats','curated','stats','daily_test','phase_4']
) }}
{# Run the incremental timestamp query first, then use the result as a static value #}
{% if execute %}
{% if is_incremental() %}
{% set query %}
SELECT
MIN(DATE_TRUNC('hour', block_timestamp)) AS block_timestamp_hour
FROM
{{ ref('core__fact_transactions') }}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endset %}
{% set min_block_timestamp_hour = run_query(query).columns [0].values() [0] %}
{% endif %}
{% endif %}
{# TODO: unique_from_count and total_fees are omitted because tx_from and fee are not present in core__fact_transactions. #}
{# Main query starts here #}
SELECT
DATE_TRUNC('hour', block_timestamp) AS block_timestamp_hour,
MIN(block_id) AS block_id_min,
MAX(block_id) AS block_id_max,
COUNT(DISTINCT block_id) AS block_count,
COUNT(DISTINCT tx_id) AS transaction_count,
COUNT(DISTINCT CASE WHEN tx_succeeded THEN tx_id END) AS transaction_count_success,
COUNT(DISTINCT CASE WHEN NOT tx_succeeded THEN tx_id END) AS transaction_count_failed,
SUM(gas_used) AS total_gas_used,
MAX(modified_timestamp) AS _inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['block_timestamp_hour']) }} AS core_metrics_hourly_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('core__fact_transactions') }}
WHERE
DATE_TRUNC('hour', block_timestamp) < DATE_TRUNC('hour', CURRENT_TIMESTAMP)
{% if is_incremental() %}
AND DATE_TRUNC('hour', block_timestamp) >= '{{ min_block_timestamp_hour }}'
{% endif %}
GROUP BY 1
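The {% if execute %} guard above exists because run_query only returns results at run time; during parsing it yields nothing, so reading .columns would fail. A minimal sketch of this compile-time watermark pattern:

{% if execute and is_incremental() %}
    {% set result = run_query("SELECT MAX(modified_timestamp) FROM " ~ this) %}
    {% set watermark = result.columns[0].values()[0] %}
    {# watermark is now a plain literal, usable in the model body as '{{ watermark }}' #}
{% endif %}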

View File

@ -1,6 +1,5 @@
{# Get variables #}
{% set vars = return_vars() %}
-- depends_on: {{ ref('bronze__blocks') }}
{{ config (
materialized = "incremental",
@ -14,7 +13,11 @@
SELECT
DATA :result :block :header :height :: INT AS block_id,
{{ dbt_utils.generate_surrogate_key(
DATA :result :block :header :time :: TIMESTAMP AS block_timestamp,
ARRAY_SIZE(
DATA :result :block :data :txs
) tx_count,
{{ dbt_utils.generate_surrogate_key(
['block_id']
) }} AS complete_blocks_id,
SYSDATE() AS inserted_timestamp,

View File

@ -1,8 +1,6 @@
{# Get variables #}
{% set vars = return_vars() %}
-- depends_on: {{ ref('bronze__transactions') }}
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
@ -14,15 +12,8 @@
) }}
SELECT
COALESCE(
VALUE :block_id_REQUESTED,
DATA :height,
VALUE :data :result :txs [0] :height
) :: INT AS block_id,
COALESCE(
VALUE :PAGE_NUMBER,
metadata :request :params [2]
) :: INT AS page_number,
VALUE :BLOCK_ID_REQUESTED :: INT AS block_id,
VALUE :PAGE_NUMBER :: INT AS page_number,
{{ dbt_utils.generate_surrogate_key(
['block_id','page_number']
) }} AS complete_transactions_id,

View File

@ -1,7 +1,7 @@
{# Get variables #}
{% set vars = return_vars() %}
-- depends_on: {{ ref('bronze__tx_counts') }}
-- depends_on: {{ ref('bronze__tx_count') }}
{{ config (
materialized = "incremental",
@ -26,7 +26,7 @@ SELECT
FROM
{% if is_incremental() %}
{{ ref('bronze__tx_counts') }}
{{ ref('bronze__tx_count') }}
WHERE
inserted_timestamp >= (
SELECT
@ -34,9 +34,8 @@ WHERE
FROM
{{ this }}
)
AND block_id NOT IN (21208991)
{% else %}
{{ ref('bronze__tx_counts_fr') }}
{{ ref('bronze__tx_count_fr') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_id

View File

@ -24,12 +24,12 @@ WITH blocks AS (
)
SELECT
ROUND(block_id, -4) :: INT AS partition_key,
block_id,
{{ target.database }}.live.udf_api(
'POST',
'{{ vars.GLOBAL_NODE_URL }}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', 'streamline'
'Content-Type', 'application/json'
),
OBJECT_CONSTRUCT(
'id', block_id,
@ -40,9 +40,7 @@ SELECT
'{{ vars.GLOBAL_NODE_VAULT_PATH }}'
) AS request
FROM
blocks
ORDER BY
block_id
blocks
LIMIT {{ vars.MAIN_SL_BLOCKS_REALTIME_SQL_LIMIT }}
@ -54,13 +52,14 @@ LIMIT {{ vars.MAIN_SL_BLOCKS_REALTIME_SQL_LIMIT }}
'producer_batch_size': vars.MAIN_SL_BLOCKS_REALTIME_PRODUCER_BATCH_SIZE,
'worker_batch_size': vars.MAIN_SL_BLOCKS_REALTIME_WORKER_BATCH_SIZE,
'async_concurrent_requests': vars.MAIN_SL_BLOCKS_REALTIME_ASYNC_CONCURRENT_REQUESTS,
'sql_source': "{{this.identifier}}"
'sql_source' : this.identifier,
"order_by_column": "block_id"
} %}
{% set function_call_sql %}
{{ fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = '{{this.schema}}.{{this.identifier}}',
target = this.schema ~ '.' ~ this.identifier,
params = params
) }}
{% endset %}
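The sql_source and target changes in this hunk share one cause: inside a Jinja {% set %} expression, a quoted string such as "{{this.identifier}}" is a literal whose braces are never re-rendered, so downstream code would receive the raw text instead of the relation name. A minimal illustration:

{% set broken = { 'sql_source': "{{ this.identifier }}" } %}
{# broken['sql_source'] is the literal text '{{ this.identifier }}' #}
{% set fixed = { 'sql_source': this.identifier } %}
{# fixed['sql_source'] resolves to e.g. 'blocks_realtime' #}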

View File

@ -14,9 +14,10 @@ WITH blocks AS (
SELECT
A.block_id,
A.block_timestamp,
tx_count
FROM
{{ ref('streamline__tx_counts_complete') }} A
{{ ref('streamline__blocks_complete') }} A
WHERE
tx_count > 0
),
@ -39,6 +40,7 @@ numbers AS (
blocks_with_page_numbers AS (
SELECT
tt.block_id :: INT AS block_id,
tt.block_timestamp,
n.n AS page_number
FROM
blocks tt
@ -52,6 +54,7 @@ numbers AS (
EXCEPT
SELECT
block_id,
null as block_timestamp, -- placeholder for now...
page_number
FROM
{{ ref('streamline__transactions_complete') }}
@ -65,8 +68,7 @@ numbers AS (
'POST',
'{{ vars.GLOBAL_NODE_URL }}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', 'streamline'
'Content-Type', 'application/json'
),
OBJECT_CONSTRUCT(
'id', block_id,
@ -83,11 +85,10 @@ numbers AS (
'{{ vars.GLOBAL_NODE_VAULT_PATH }}'
) AS request,
page_number,
block_id AS block_id_requested
block_id AS block_id_requested,
to_char(block_timestamp,'YYYY_MM_DD_HH_MI_SS_FF3') AS block_timestamp
FROM
blocks_with_page_numbers
ORDER BY
block_id
LIMIT {{ vars.MAIN_SL_TRANSACTIONS_REALTIME_SQL_LIMIT }}
@ -99,14 +100,15 @@ LIMIT {{ vars.MAIN_SL_TRANSACTIONS_REALTIME_SQL_LIMIT }}
'producer_batch_size': vars.MAIN_SL_TRANSACTIONS_REALTIME_PRODUCER_BATCH_SIZE,
'worker_batch_size': vars.MAIN_SL_TRANSACTIONS_REALTIME_WORKER_BATCH_SIZE,
'async_concurrent_requests': vars.MAIN_SL_TRANSACTIONS_REALTIME_ASYNC_CONCURRENT_REQUESTS,
'sql_source': "{{this.identifier}}",
'exploded_key': '["result.txs"]'
'sql_source' : this.identifier,
'exploded_key': '["result.txs"]',
"order_by_column": "block_id_requested"
} %}
{% set function_call_sql %}
{{ fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = '{{this.schema}}.{{this.identifier}}',
target = this.schema ~ '.' ~ this.identifier,
params = params
) }}
{% endset %}

View File

@ -20,7 +20,7 @@ WITH blocks AS (
SELECT
block_id
FROM
{{ ref('streamline__tx_counts_complete') }}
{{ ref('streamline__tx_count_complete') }}
),
{# retry AS (
SELECT
@ -50,8 +50,7 @@ SELECT
'POST',
'{{ vars.GLOBAL_NODE_URL }}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', 'streamline'
'Content-Type', 'application/json'
),
OBJECT_CONSTRUCT(
'id', block_id,
@ -80,13 +79,13 @@ ORDER BY
'producer_batch_size': vars.MAIN_SL_TX_COUNTS_REALTIME_PRODUCER_BATCH_SIZE,
'worker_batch_size': vars.MAIN_SL_TX_COUNTS_REALTIME_WORKER_BATCH_SIZE,
'async_concurrent_requests': vars.MAIN_SL_TX_COUNTS_REALTIME_ASYNC_CONCURRENT_REQUESTS,
'sql_source' :"{{this.identifier}}"
'sql_source' : this.identifier
} %}
{% set function_call_sql %}
{{ fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = '{{this.schema}}.{{this.identifier}}',
target = this.schema ~ '.' ~ this.identifier,
params = params
) }}
{% endset %}

View File

@ -19,10 +19,7 @@ FROM
WHERE
_id <= (
SELECT
COALESCE(
block_id,
0
)
MAX(block_id)
FROM
{{ ref('streamline__chainhead') }}
)

View File

@ -0,0 +1,5 @@
{% docs attribute_index %}
The position at which the attribute occurs within a message.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs attribute_key %}
The key of the message attribute's key-value pair.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs attribute_value %}
The value of the message attribute's key-value pair.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs msg %}
The flattened underlying JSON of the messages or events within the transaction.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs core__fact_blocks %}
Records of all blocks that have occurred onchain, dating back to the genesis block.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs core__fact_msg_attributes %}
Records of all message attributes associated with messages that have occurred onchain.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs core__fact_msgs %}
Records of all messages that have occurred onchain.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs core__fact_transactions %}
Records of all transactions that have occurred onchain, dating back to the genesis block.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs core__fact_transactions_logs %}
Records of all transaction logs that have occurred onchain, dating back to the genesis block.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs core__fact_transfers %}
Records of all transfers that have occurred onchain, dating back to the genesis block.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs tx_succeeded %}
A boolean indicator for whether the transaction was successful.
{% enddocs %}

View File

@ -11,8 +11,17 @@ sources:
schema: streamline
tables:
- name: blocks
freshness:
warn_after: {count: 2, period: hour}
error_after: {count: 4, period: hour}
- name: tx_counts
- name: txs
freshness:
warn_after: {count: 2, period: hour}
error_after: {count: 4, period: hour}
- name: transactions
freshness:
warn_after: {count: 2, period: hour}
error_after: {count: 4, period: hour}
- name: crosschain_silver
database: "{{ 'CROSSCHAIN_DEV' if '_DEV' in target.database.upper() else 'CROSSCHAIN' }}"
schema: silver

View File

@ -1,6 +1,6 @@
packages:
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: v1.32.0
revision: v1.36.1
- package: get-select/dbt_snowflake_query_tags
version: [">=2.0.0", "<3.0.0"]
- package: dbt-labs/dbt_external_tables