osmosis-models/models/silver/core/silver__msgs.sql
Eric Laurello 8d60d2c572
Some checks failed
docs_update / run_dbt_jobs (push) Has been cancelled
docs_update / notify-failure (push) Has been cancelled
dbt_run_daily / run_dbt_jobs (push) Has been cancelled
dbt_run_dev_refresh / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_balances / run_dbt_jobs (push) Has been cancelled
dbt_run_daily / notify-failure (push) Has been cancelled
dbt_run_streamline_balances / notify-failure (push) Has been cancelled
dbt_test_scheduled / run_dbt_jobs (push) Has been cancelled
dbt_test_scheduled / notify-failure (push) Has been cancelled
dbt_run_incremental_core / run_dbt_jobs (push) Has been cancelled
dbt_run_incremental_non_core / run_dbt_jobs (push) Has been cancelled
dbt_run_udf_blockchain / run_dbt_jobs (push) Has been cancelled
dbt_run_incremental_core / notify-failure (push) Has been cancelled
dbt_run_incremental_non_core / notify-failure (push) Has been cancelled
dbt_run_udf_blockchain / notify-failure (push) Has been cancelled
dbt_run_observability / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_transactions_realtime / run_dbt_jobs (push) Has been cancelled
dbt_run_observability / notify-failure (push) Has been cancelled
dbt_run_streamline_transactions_realtime / notify-failure (push) Has been cancelled
dbt_run_streamline_blocks_txcount_realtime / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_blocks_txcount_realtime / notify-failure (push) Has been cancelled
remove so
2025-08-01 09:33:21 -04:00

185 lines
3.7 KiB
SQL

{# Incremental model merged on _unique_key; inserted_timestamp is left out of the merge's update set. #}
{# NOTE(review): the incremental_predicates pair looks like input to a project-level macro - confirm. #}
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    unique_key = '_unique_key',
    merge_exclude_columns = ['inserted_timestamp'],
    incremental_predicates = ['dynamic_range_predicate', 'block_timestamp::date'],
    cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
    tags = ['core']
) }}
-- b: one row per message. Flattens the msgs array of each transaction and
-- extracts the first attribute's key/value of every message.
-- NOTE(review): block 12833808 looks like the height at which attribute
-- keys/values stopped being base64-encoded - confirm against chain history.
WITH b AS (
    SELECT
        block_id,
        block_timestamp,
        tx_id,
        tx_succeeded,
        INDEX AS msg_index, -- position of the message inside msgs (FLATTEN output)
        VALUE :type :: STRING AS msg_type,
        VALUE AS msg,
        -- TRUE when the first attribute key is 'action'; IFF turns a NULL
        -- comparison into FALSE.
        -- NOTE(review): unlike is_module / attribute_key below, this always
        -- tries the base64 decode first and falls back to the raw key only
        -- from block 12833808 onwards - confirm the asymmetry is intentional.
        IFF(
            COALESCE(
                TRY_BASE64_DECODE_STRING(msg :attributes [0] :key),
                CASE
                    WHEN block_id >= 12833808 THEN msg :attributes [0] :key
                END
            ) :: STRING = 'action',
            TRUE,
            FALSE
        ) AS is_action,
        -- TRUE when the first attribute key is 'module' (same expression as
        -- attribute_key, compared to the literal).
        IFF(
            COALESCE(
                CASE
                    WHEN block_id < 12833808 THEN TRY_BASE64_DECODE_STRING(msg :attributes [0] :key)
                END,
                msg :attributes [0] :key
            ) :: STRING = 'module',
            TRUE,
            FALSE
        ) AS is_module,
        -- Decoded key before block 12833808 (raw key if decoding fails),
        -- raw key from that block on.
        COALESCE(
            CASE
                WHEN block_id < 12833808 THEN TRY_BASE64_DECODE_STRING(msg :attributes [0] :key)
            END,
            msg :attributes [0] :key
        ) :: STRING AS attribute_key,
        -- Same decoding rule applied to the first attribute's value.
        COALESCE(
            CASE
                WHEN block_id < 12833808 THEN TRY_BASE64_DECODE_STRING(msg :attributes [0] :value)
            END,
            msg :attributes [0] :value
        ) :: STRING AS attribute_value,
        _inserted_timestamp
    FROM
        {{ ref('silver__transactions') }} A,
        LATERAL FLATTEN(input => A.msgs)

{% if is_incremental() %}
    WHERE
        -- Reprocess everything inserted within 30 minutes of the newest row
        -- already loaded. COALESCE covers an existing-but-empty target:
        -- MAX() would be NULL there and the comparison would drop every row.
        A._inserted_timestamp >= DATEADD(
            'minute',
            -30,
            (
                SELECT
                    COALESCE(
                        MAX(_inserted_timestamp),
                        '1900-01-01' :: TIMESTAMP_NTZ
                    )
                FROM
                    {{ this }}
            )
        )
{% endif %}
),
-- prefinal: adds per-transaction grouping columns to every message.
prefinal AS (
    SELECT
        block_id,
        block_timestamp,
        tx_id,
        tx_succeeded,
        -- CONDITIONAL_TRUE_EVENT counts, in msg_index order, how many rows with
        -- is_action = TRUE have been seen so far. Subtracting 1 makes the group
        -- zero-based; NULLIF(.., -1) leaves messages that precede the first
        -- action message without a group.
        NULLIF(
            CONDITIONAL_TRUE_EVENT(is_action) OVER (
                PARTITION BY tx_id
                ORDER BY msg_index
            ) - 1,
            -1
        ) AS msg_group,
        msg_index,
        msg_type,
        -- msg_type of the transaction's final message. The frame is spelled out
        -- because Snowflake's default frame for LAST_VALUE is the whole
        -- partition, unlike the ANSI default that stops at the current row.
        LAST_VALUE(msg_type) OVER (
            PARTITION BY tx_id
            ORDER BY msg_index
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) AS last_msg_type,
        msg,
        is_module,
        attribute_key,
        attribute_value,
        _inserted_timestamp
    FROM
        b
),
-- exec_actions: distinct (tx_id, msg_group) pairs that contain a 'message'
-- row whose action value mentions "exec" (case-insensitive) or is one of the
-- two listed IBC message types.
exec_actions AS (
    SELECT
        tx_id,
        msg_group
    FROM
        prefinal
    WHERE
        attribute_key = 'action'
        AND msg_type = 'message'
        AND (
            attribute_value IN (
                '/ibc.core.channel.v1.MsgRecvPacket',
                '/ibc.applications.transfer.v1.MsgTransfer'
            )
            OR LOWER(attribute_value) LIKE '%exec%'
        )
    GROUP BY
        tx_id,
        msg_group
),
-- grp: for the groups selected by exec_actions, numbers each 'message' row
-- flagged is_module from 0 upwards in msg_index order within (tx_id, msg_group).
grp AS (
    SELECT
        p.tx_id,
        p.msg_index,
        p.is_module,
        RANK() OVER (
            PARTITION BY p.tx_id, p.msg_group
            ORDER BY p.msg_index
        ) - 1 AS msg_sub_group
    FROM
        prefinal AS p
        INNER JOIN exec_actions AS ea
            ON ea.tx_id = p.tx_id
            AND ea.msg_group = p.msg_group
    WHERE
        p.msg_type = 'message'
        AND p.is_module
)
-- Final projection: every message from prefinal, enriched with msg_sub_group
-- and the model's key / audit columns.
SELECT
    p.block_id,
    p.block_timestamp,
    p.tx_id,
    p.tx_succeeded,
    p.msg_group,
    -- Within (tx_id, msg_group) the frame runs from the highest msg_index down
    -- to the current row, so LAST_VALUE .. IGNORE NULLS picks the sub-group of
    -- the nearest grp row at the same or a later msg_index. When none exists,
    -- grouped messages default to 0 if the transaction's last msg_type is
    -- something other than 'tx'. Messages without a msg_group stay NULL.
    COALESCE(
        CASE
            WHEN p.msg_group IS NOT NULL THEN LAST_VALUE(g.msg_sub_group IGNORE NULLS) OVER (
                PARTITION BY p.tx_id, p.msg_group
                ORDER BY p.msg_index DESC
                ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
            )
        END,
        CASE
            WHEN p.msg_group IS NOT NULL
            AND p.last_msg_type <> 'tx' THEN 0
        END
    ) AS msg_sub_group,
    p.msg_index,
    p.msg_type,
    p.msg,
    p._inserted_timestamp,
    -- Merge key: "<tx_id>-<msg_index>".
    concat_ws(
        '-',
        p.tx_id,
        p.msg_index
    ) AS _unique_key,
    -- Surrogate key built from the _unique_key alias defined just above.
    {{ dbt_utils.generate_surrogate_key(
        ['_unique_key']
    ) }} AS msgs_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    prefinal AS p
    LEFT JOIN grp AS g
        ON g.tx_id = p.tx_id
        AND g.msg_index = p.msg_index