pushing logs models

This commit is contained in:
Mike Stepanovic 2025-05-19 13:13:18 -06:00
parent a352af34f0
commit 60569409d1
2 changed files with 167 additions and 0 deletions

View File

@ -0,0 +1,39 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Set fact_transactions_logs specific variables #}
{% set rpc_vars = set_dynamic_fields('fact_transactions_logs') %}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'transactions_logs_id',
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id, tx_id)",
incremental_predicates = [fsc_ibc.standard_predicate()],
tags = ['gold', 'core', 'phase_2']
) }}
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
tx_code,
codespace,
tx_log,
transactions_logs_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('silver__transactions_logs') }}
{% if is_incremental() %}
WHERE
modified_timestamp :: DATE >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,128 @@
{{ config(
materialized = 'incremental',
unique_key = ['tx_id'],
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE'],
tags = ['silver', 'core', 'phase_2']
) }}
WITH event_attributes AS (
SELECT
t.block_id,
t.block_timestamp,
t.tx_id,
t.tx_succeeded,
t.tx_code,
t.codespace,
COALESCE(l.value:msg_index::INTEGER, 0) as msg_index,
e.value:type::STRING as event_type,
ARRAY_AGG(
OBJECT_CONSTRUCT(
'key', a.value:key::STRING,
'value', COALESCE(a.value:value::STRING, '')
)
) as attributes,
t._inserted_timestamp
FROM {{ ref('silver__transactions') }} t,
LATERAL FLATTEN(input => PARSE_JSON(t.tx_log)) l,
LATERAL FLATTEN(input => l.value:events) e,
LATERAL FLATTEN(input => e.value:attributes) a
WHERE t.tx_log IS NOT NULL
AND t.tx_succeeded = TRUE
{% if is_incremental() %}
AND t._inserted_timestamp >= (
SELECT MAX(_inserted_timestamp)
FROM {{ this }}
)
{% endif %}
GROUP BY
t.block_id,
t.block_timestamp,
t.tx_id,
t.tx_succeeded,
t.tx_code,
t.codespace,
l.value:msg_index,
e.value:type,
t._inserted_timestamp
),
parsed_logs AS (
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
tx_code,
codespace,
ARRAY_AGG(
OBJECT_CONSTRUCT(
'msg_index', msg_index,
'event_type', event_type,
'attributes', attributes
)
) WITHIN GROUP (ORDER BY msg_index, event_type) as tx_log,
_inserted_timestamp
FROM event_attributes
GROUP BY
block_id,
block_timestamp,
tx_id,
tx_succeeded,
tx_code,
codespace,
_inserted_timestamp
),
failed_txs AS (
SELECT
t.block_id,
t.block_timestamp,
t.tx_id,
t.tx_succeeded,
t.tx_code,
t.codespace,
ARRAY_CONSTRUCT(
OBJECT_CONSTRUCT(
'msg_index', 0,
'event_type', 'error',
'attributes', ARRAY_CONSTRUCT(
OBJECT_CONSTRUCT(
'key', 'error_message',
'value', t.tx_log::STRING
)
)
)
) as tx_log,
t._inserted_timestamp
FROM {{ ref('silver__transactions') }} t
WHERE t.tx_succeeded = FALSE
AND t.tx_log IS NOT NULL
{% if is_incremental() %}
AND t._inserted_timestamp >= (
SELECT MAX(_inserted_timestamp)
FROM {{ this }}
)
{% endif %}
)
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
tx_code,
codespace,
tx_log,
{{ dbt_utils.generate_surrogate_key(['tx_id']) }} as transactions_logs_id,
_inserted_timestamp,
CURRENT_TIMESTAMP() as inserted_timestamp,
CURRENT_TIMESTAMP() as modified_timestamp,
'{{ invocation_id }}' as _invocation_id
FROM (
SELECT * FROM parsed_logs
UNION ALL
SELECT * FROM failed_txs
) t
QUALIFY ROW_NUMBER() OVER (PARTITION BY tx_id ORDER BY _inserted_timestamp DESC) = 1
ORDER BY block_timestamp, tx_id