a slew of core fixes

This commit is contained in:
Mike Stepanovic 2025-06-05 13:33:07 -06:00
parent 8b01ccf088
commit 59dd7b736e
8 changed files with 77 additions and 48 deletions

View File

@ -2,7 +2,7 @@
{% set vars = return_vars() %}
{# Set fact_blocks specific variables #}
{% set rpc_vars = set_dynamic_fields('fact_blocks') %}
{#{% set rpc_vars = set_dynamic_fields('fact_blocks') %}#}
{{ config(
materialized = 'incremental',

View File

@ -1,29 +1,62 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Set fact_transactions specific variables #}
{% set rpc_vars = set_dynamic_fields('fact_transactions') %}
-- depends_on: {{ ref('silver__msgs') }}
{{ config(
{{ config (
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'msg_attributes_id',
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id, tx_id)",
cluster_by = ['modified_timestamp::DATE'],
incremental_predicates = [fsc_ibc.standard_predicate()],
tags = ['gold', 'core', 'phase_2']
tags = ['silver', 'core', 'phase_2']
) }}
WITH silver_msgs AS (
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
msg_group,
msg_sub_group,
msg_index,
msg_type,
f.index AS attribute_index,
CASE
WHEN TRY_BASE64_DECODE_STRING(f.value :key) IS NULL
THEN f.value :key
ELSE TRY_BASE64_DECODE_STRING(f.value :key)
END AS attribute_key,
CASE
WHEN TRY_BASE64_DECODE_STRING(f.value :key) IS NULL
THEN f.value :value
ELSE TRY_BASE64_DECODE_STRING(f.value :value)
END AS attribute_value,
msgs._inserted_timestamp
FROM
{{ ref('silver__msgs') }} AS msgs,
LATERAL FLATTEN(
input => msgs.msg,
path => 'attributes'
) AS f
{% if is_incremental() %}
WHERE
_inserted_timestamp :: DATE >= (
SELECT
MAX(_inserted_timestamp) :: DATE - 2
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
CONCAT(
msg_group,
':',
msg_sub_group
) AS msg_group,
msg_group,
msg_sub_group,
msg_index,
msg_type,
attribute_index,
@ -31,18 +64,10 @@ SELECT
attribute_value,
{{ dbt_utils.generate_surrogate_key(
['tx_id','msg_index','attribute_index']
) }} AS fact_msg_attributes_id,
) }} AS msg_attributes_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('silver__msg_attributes') }}
{% if is_incremental() %}
WHERE
modified_timestamp :: DATE >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
silver_msgs

View File

@ -2,7 +2,7 @@
{% set vars = return_vars() %}
{# Set fact_transactions specific variables #}
{% set rpc_vars = set_dynamic_fields('fact_transactions') %}
{#{% set rpc_vars = set_dynamic_fields('fact_transactions') %}#}
{{ config(
materialized = 'incremental',

View File

@ -2,7 +2,7 @@
{% set vars = return_vars() %}
{# Set fact_transactions specific variables #}
{% set rpc_vars = set_dynamic_fields('fact_transactions') %}
{#{% set rpc_vars = set_dynamic_fields('fact_transactions') %}#}
{{ config(
materialized = 'incremental',

View File

@ -2,7 +2,7 @@
{% set vars = return_vars() %}
{# Set fact_transactions_logs specific variables #}
{% set rpc_vars = set_dynamic_fields('fact_transactions_logs') %}
{#{% set rpc_vars = set_dynamic_fields('fact_transactions_logs') %}#}
{{ config(
materialized = 'incremental',

View File

@ -14,7 +14,7 @@ WITH bronze_blocks AS (
SELECT
'{{ vars.GLOBAL_PROJECT_NAME }}' AS blockchain,
block_id,
VALUE :BLOCK_ID :: INT AS block_id,
COALESCE(
DATA :result :block :header :time :: TIMESTAMP,
DATA :block :header :time :: TIMESTAMP,
@ -45,7 +45,7 @@ WITH bronze_blocks AS (
DATA :result :block :header,
DATA :block :header
) AS header,
_inserted_timestamp
inserted_timestamp
FROM
{% if is_incremental() %}
@ -58,15 +58,16 @@ WHERE
AND DATA :error IS NULL
{% if is_incremental() %}
AND _inserted_timestamp >= (
AND inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
blockchain,
block_id,
block_timestamp,
chain_id,
@ -74,15 +75,15 @@ SELECT
proposer_address,
validator_hash,
header,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['chain_id', 'block_id']) }} AS blocks_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
bronze_blocks qualify ROW_NUMBER() over (
bronze_blocks
QUALIFY ROW_NUMBER() OVER (
PARTITION BY chain_id,
block_id
ORDER BY
_inserted_timestamp DESC
inserted_timestamp DESC
) = 1

View File

@ -7,7 +7,7 @@
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'msgs_id',
cluster_by = ['modified_timestamp::DATE', 'partition_key'],
cluster_by = ['modified_timestamp::DATE'],
incremental_predicates = [fsc_ibc.standard_predicate()],
tags = ['silver', 'core', 'phase_2']
) }}
@ -59,7 +59,7 @@ WITH bronze_msgs AS (
THEN msg :attributes [0] :key
ELSE TRY_BASE64_DECODE_STRING(msg :attributes [0] :value)
END AS attribute_value,
t._inserted_timestamp
transactions._inserted_timestamp
FROM
{{ ref('silver__transactions') }} transactions
JOIN LATERAL FLATTEN(input => transactions.msgs) f
@ -124,10 +124,7 @@ msgs AS (
bronze_msgs.msg_index,
msg_type,
msg,
concat_ws(
'-',
bronze_msgs.tx_id,
_inserted_timestamp
bronze_msgs._inserted_timestamp
FROM
bronze_msgs
LEFT JOIN GROUPING b

View File

@ -14,9 +14,10 @@
WITH bronze_transactions AS (
SELECT
block_id,
TO_TIMESTAMP(
DATA :BLOCK_TIMESTAMP 'YYYY_MM_DD_HH_MI_SS_FF3'
VALUE :BLOCK_ID :: INT AS block_id,
TO_TIMESTAMP_NTZ(
DATA:BLOCK_TIMESTAMP::STRING,
'YYYY_MM_DD_HH_MI_SS_FF3'
) AS block_timestamp,
DATA :hash :: STRING AS tx_id,
DATA :index AS tx_index,
@ -34,6 +35,7 @@ WITH bronze_transactions AS (
),
DATA :tx_result :log
) AS tx_log,
DATA :tx_result :events AS msgs,
DATA,
partition_key,
DATA :BLOCK_ID_REQUESTED AS block_id_requested,
@ -51,9 +53,8 @@ WITH bronze_transactions AS (
{% else %}
{{ ref('bronze__transactions_fr') }}
{% endif %}
WHERE
{% if is_incremental() %}
WHERE
inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
@ -61,7 +62,12 @@ inserted_timestamp >= (
{{ this }}
)
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_id_requested, tx_id
ORDER BY
_inserted_timestamp DESC)) = 1
)
SELECT
*
FROM
bronze_transactions
QUALIFY ROW_NUMBER() OVER (
PARTITION BY block_id_requested, tx_id
ORDER BY _inserted_timestamp DESC
) = 1