From 59dd7b736e79ea9467a04df2807c86b8547669b5 Mon Sep 17 00:00:00 2001 From: Mike Stepanovic Date: Thu, 5 Jun 2025 13:33:07 -0600 Subject: [PATCH] a slew of core fixes --- .../core/gold/core/core__fact_blocks.sql | 2 +- .../gold/core/core__fact_msg_attributes.sql | 69 +++++++++++++------ .../core/gold/core/core__fact_msgs.sql | 2 +- .../gold/core/core__fact_transactions.sql | 2 +- .../core/core__fact_transactions_logs.sql | 2 +- .../core/silver/core/silver__blocks.sql | 15 ++-- .../core/silver/core/silver__msgs.sql | 9 +-- .../core/silver/core/silver__transactions.sql | 24 ++++--- 8 files changed, 77 insertions(+), 48 deletions(-) diff --git a/models/main_package/core/gold/core/core__fact_blocks.sql b/models/main_package/core/gold/core/core__fact_blocks.sql index 5392ff5..e70c9de 100644 --- a/models/main_package/core/gold/core/core__fact_blocks.sql +++ b/models/main_package/core/gold/core/core__fact_blocks.sql @@ -2,7 +2,7 @@ {% set vars = return_vars() %} {# Set fact_blocks specific variables #} -{% set rpc_vars = set_dynamic_fields('fact_blocks') %} +{#{% set rpc_vars = set_dynamic_fields('fact_blocks') %}#} {{ config( materialized = 'incremental', diff --git a/models/main_package/core/gold/core/core__fact_msg_attributes.sql b/models/main_package/core/gold/core/core__fact_msg_attributes.sql index a24885d..b110d4b 100644 --- a/models/main_package/core/gold/core/core__fact_msg_attributes.sql +++ b/models/main_package/core/gold/core/core__fact_msg_attributes.sql @@ -1,29 +1,62 @@ {# Get variables #} {% set vars = return_vars() %} -{# Set fact_transactions specific variables #} -{% set rpc_vars = set_dynamic_fields('fact_transactions') %} +-- depends_on: {{ ref('silver__msgs') }} -{{ config( +{{ config ( materialized = 'incremental', incremental_strategy = 'merge', unique_key = 'msg_attributes_id', - cluster_by = ['block_timestamp::DATE'], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id, tx_id)", + cluster_by = ['modified_timestamp::DATE'], incremental_predicates = [fsc_ibc.standard_predicate()], - tags = ['gold', 'core', 'phase_2'] + tags = ['silver', 'core', 'phase_2'] ) }} +WITH silver_msgs AS ( + SELECT + block_id, + block_timestamp, + tx_id, + tx_succeeded, + msg_group, + msg_sub_group, + msg_index, + msg_type, + f.index AS attribute_index, + CASE + WHEN TRY_BASE64_DECODE_STRING(f.value :key) IS NULL + THEN f.value :key + ELSE TRY_BASE64_DECODE_STRING(f.value :key) + END AS attribute_key, + CASE + WHEN TRY_BASE64_DECODE_STRING(f.value :key) IS NULL + THEN f.value :value + ELSE TRY_BASE64_DECODE_STRING(f.value :value) + END AS attribute_value, + msgs._inserted_timestamp + FROM + {{ ref('silver__msgs') }} AS msgs, + LATERAL FLATTEN( + input => msgs.msg, + path => 'attributes' + ) AS f + {% if is_incremental() %} + WHERE + _inserted_timestamp :: DATE >= ( + SELECT + MAX(_inserted_timestamp) :: DATE - 2 + FROM + {{ this }} + ) + {% endif %} +) SELECT block_id, block_timestamp, tx_id, tx_succeeded, - CONCAT( - msg_group, - ':', - msg_sub_group - ) AS msg_group, + msg_group, + msg_sub_group, msg_index, msg_type, attribute_index, @@ -31,18 +64,10 @@ SELECT attribute_value, {{ dbt_utils.generate_surrogate_key( ['tx_id','msg_index','attribute_index'] - ) }} AS fact_msg_attributes_id, + ) }} AS msg_attributes_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, + _inserted_timestamp, '{{ invocation_id }}' AS _invocation_id FROM - {{ ref('silver__msg_attributes') }} -{% if is_incremental() %} -WHERE - modified_timestamp :: DATE >= ( - SELECT - MAX(modified_timestamp) - FROM - {{ this }} - ) -{% endif %} \ No newline at end of file + silver_msgs diff --git a/models/main_package/core/gold/core/core__fact_msgs.sql b/models/main_package/core/gold/core/core__fact_msgs.sql index 77292c8..485b6d1 100644 --- a/models/main_package/core/gold/core/core__fact_msgs.sql +++ b/models/main_package/core/gold/core/core__fact_msgs.sql @@ -2,7 +2,7 @@ {% set vars = return_vars() %} {# Set fact_transactions specific variables #} -{% set rpc_vars = set_dynamic_fields('fact_transactions') %} +{#{% set rpc_vars = set_dynamic_fields('fact_transactions') %}#} {{ config( materialized = 'incremental', diff --git a/models/main_package/core/gold/core/core__fact_transactions.sql b/models/main_package/core/gold/core/core__fact_transactions.sql index 7adc179..23331e8 100644 --- a/models/main_package/core/gold/core/core__fact_transactions.sql +++ b/models/main_package/core/gold/core/core__fact_transactions.sql @@ -2,7 +2,7 @@ {% set vars = return_vars() %} {# Set fact_transactions specific variables #} -{% set rpc_vars = set_dynamic_fields('fact_transactions') %} +{#{% set rpc_vars = set_dynamic_fields('fact_transactions') %}#} {{ config( materialized = 'incremental', diff --git a/models/main_package/core/gold/core/core__fact_transactions_logs.sql b/models/main_package/core/gold/core/core__fact_transactions_logs.sql index dc75db2..87108a3 100644 --- a/models/main_package/core/gold/core/core__fact_transactions_logs.sql +++ b/models/main_package/core/gold/core/core__fact_transactions_logs.sql @@ -2,7 +2,7 @@ {% set vars = return_vars() %} {# Set fact_transactions_logs specific variables #} -{% set rpc_vars = set_dynamic_fields('fact_transactions_logs') %} +{#{% set rpc_vars = set_dynamic_fields('fact_transactions_logs') %}#} {{ config( materialized = 'incremental', diff --git a/models/main_package/core/silver/core/silver__blocks.sql b/models/main_package/core/silver/core/silver__blocks.sql index db39bac..1c3e174 100644 --- a/models/main_package/core/silver/core/silver__blocks.sql +++ b/models/main_package/core/silver/core/silver__blocks.sql @@ -14,7 +14,7 @@ WITH bronze_blocks AS ( SELECT '{{ vars.GLOBAL_PROJECT_NAME }}' AS blockchain, - block_id, + VALUE :BLOCK_ID :: INT AS block_id, COALESCE( DATA :result :block :header :time :: TIMESTAMP, DATA :block :header :time :: TIMESTAMP, @@ -45,7 +45,7 @@ WITH bronze_blocks AS ( DATA :result :block :header, DATA :block :header ) AS header, - _inserted_timestamp + inserted_timestamp FROM {% if is_incremental() %} @@ -58,15 +58,16 @@ WHERE AND DATA :error IS NULL {% if is_incremental() %} -AND _inserted_timestamp >= ( +AND inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(inserted_timestamp) FROM {{ this }} ) {% endif %} ) SELECT + blockchain, block_id, block_timestamp, chain_id, @@ -74,15 +75,15 @@ SELECT proposer_address, validator_hash, header, - _inserted_timestamp, {{ dbt_utils.generate_surrogate_key(['chain_id', 'block_id']) }} AS blocks_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id FROM - bronze_blocks qualify ROW_NUMBER() over ( + bronze_blocks +QUALIFY ROW_NUMBER() OVER ( PARTITION BY chain_id, block_id ORDER BY - _inserted_timestamp DESC + inserted_timestamp DESC ) = 1 diff --git a/models/main_package/core/silver/core/silver__msgs.sql b/models/main_package/core/silver/core/silver__msgs.sql index 155eaa0..8c45a38 100644 --- a/models/main_package/core/silver/core/silver__msgs.sql +++ b/models/main_package/core/silver/core/silver__msgs.sql @@ -7,7 +7,7 @@ materialized = 'incremental', incremental_strategy = 'merge', unique_key = 'msgs_id', - cluster_by = ['modified_timestamp::DATE', 'partition_key'], + cluster_by = ['modified_timestamp::DATE'], incremental_predicates = [fsc_ibc.standard_predicate()], tags = ['silver', 'core', 'phase_2'] ) }} @@ -59,7 +59,7 @@ WITH bronze_msgs AS ( THEN msg :attributes [0] :key ELSE TRY_BASE64_DECODE_STRING(msg :attributes [0] :value) END AS attribute_value, - t._inserted_timestamp + transactions._inserted_timestamp FROM {{ ref('silver__transactions') }} transactions JOIN LATERAL FLATTEN(input => transactions.msgs) f @@ -124,10 +124,7 @@ msgs AS ( bronze_msgs.msg_index, msg_type, msg, - concat_ws( - '-', - bronze_msgs.tx_id, - _inserted_timestamp + bronze_msgs._inserted_timestamp FROM bronze_msgs LEFT JOIN GROUPING b diff --git a/models/main_package/core/silver/core/silver__transactions.sql b/models/main_package/core/silver/core/silver__transactions.sql index 99fd524..9c6d31c 100644 --- a/models/main_package/core/silver/core/silver__transactions.sql +++ b/models/main_package/core/silver/core/silver__transactions.sql @@ -14,9 +14,10 @@ WITH bronze_transactions AS ( SELECT - block_id, - TO_TIMESTAMP( - DATA :BLOCK_TIMESTAMP 'YYYY_MM_DD_HH_MI_SS_FF3' + VALUE :BLOCK_ID :: INT AS block_id, + TO_TIMESTAMP_NTZ( + DATA:BLOCK_TIMESTAMP::STRING, + 'YYYY_MM_DD_HH_MI_SS_FF3' ) AS block_timestamp, DATA :hash :: STRING AS tx_id, DATA :index AS tx_index, @@ -34,6 +35,7 @@ WITH bronze_transactions AS ( ), DATA :tx_result :log ) AS tx_log, + DATA :tx_result :events AS msgs, DATA, partition_key, DATA :BLOCK_ID_REQUESTED AS block_id_requested, @@ -51,9 +53,8 @@ WITH bronze_transactions AS ( {% else %} {{ ref('bronze__transactions_fr') }} {% endif %} -WHERE - {% if is_incremental() %} +WHERE inserted_timestamp >= ( SELECT MAX(_inserted_timestamp) @@ -61,7 +62,12 @@ inserted_timestamp >= ( {{ this }} ) {% endif %} - -qualify(ROW_NUMBER() over (PARTITION BY block_id_requested, tx_id -ORDER BY - _inserted_timestamp DESC)) = 1 +) +SELECT + * +FROM + bronze_transactions +QUALIFY ROW_NUMBER() OVER ( + PARTITION BY block_id_requested, tx_id + ORDER BY _inserted_timestamp DESC +) = 1