flow-models/models/silver/core/silver__streamline_events.sql
Jack Forgash 45a68debed
Some checks failed
docs_update / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_evm_daily_realtime / run_dbt_jobs (push) Has been cancelled
dbt_run_moments_metadata / run_dbt_jobs (push) Has been cancelled
dbt_observability_models / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_evm_daily_silver / run_dbt_jobs (push) Has been cancelled
dbt_run_evm_decoded_logs / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_transactions / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_transaction_results / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_external_realtime / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_external_points_balances_realtime / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_evm_realtime / run_dbt_jobs (push) Has been cancelled
dbt_run_scheduled / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_blocks / run_dbt_jobs (push) Has been cancelled
dbt_run_evm / run_dbt_jobs (push) Has been cancelled
dbt_run_scheduled_non_core / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_collections / run_dbt_jobs (push) Has been cancelled
docs_update / notify-failure (push) Has been cancelled
dbt_run_streamline_evm_daily_realtime / notify-failure (push) Has been cancelled
dbt_run_moments_metadata / notify-failure (push) Has been cancelled
dbt_observability_models / notify-failure (push) Has been cancelled
dbt_run_streamline_evm_daily_silver / notify-failure (push) Has been cancelled
dbt_run_evm_decoded_logs / notify-failure (push) Has been cancelled
dbt_run_streamline_transactions / notify-failure (push) Has been cancelled
dbt_run_streamline_transaction_results / notify-failure (push) Has been cancelled
dbt_run_streamline_external_realtime / notify-failure (push) Has been cancelled
dbt_run_streamline_external_points_balances_realtime / notify-failure (push) Has been cancelled
dbt_run_streamline_evm_realtime / notify-failure (push) Has been cancelled
dbt_run_scheduled / notify-failure (push) Has been cancelled
dbt_run_streamline_blocks / notify-failure (push) Has been cancelled
dbt_run_evm / notify-failure (push) Has been cancelled
dbt_run_scheduled_non_core / notify-failure (push) Has been cancelled
dbt_run_streamline_collections / notify-failure (push) Has been cancelled
dbt_run_streamline_decoded_logs_history / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_decoded_logs_history / notify-failure (push) Has been cancelled
AN-6231/modernize incremental predicates (#442)
* upd incr predicate on silver core models

* upd rest of the models

* rm predicate from d+i model
2025-08-06 12:40:17 -04:00

144 lines
4.4 KiB
SQL

{{ config(
materialized = 'incremental',
unique_key = 'event_id',
incremental_strategy = 'merge',
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = "block_timestamp::date",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_id,event_id,event_contract,event_type);",
tags = ['core', 'streamline_scheduled', 'scheduled', 'scheduled_core']
) }}
WITH transactions AS (
SELECT
*
FROM
{{ ref('silver__streamline_transactions_final') }}
WHERE
NOT pending_result_response
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
flatten_events AS (
SELECT
block_height,
block_timestamp,
tx_id,
tx_succeeded,
events_count,
VALUE :: variant AS event_data_full,
VALUE :event_index :: INT AS event_index,
concat_ws(
'-',
tx_id,
event_index
) AS event_id,
VALUE :payload :: STRING AS payload,
TRY_PARSE_JSON(utils.udf_hex_to_string(payload)) AS decoded_payload,
VALUE :type :: STRING AS event_type_id,
VALUE :values :: variant AS event_values,
COALESCE(
SUBSTR(
VALUE :type :: STRING,
0,
LENGTH(
VALUE :type :: STRING
) - LENGTH(SPLIT(VALUE :type :: STRING, '.') [3]) - 1
),
-- if null, then flow.<event_type>
SPLIT(
VALUE :type :: STRING,
'.'
) [0]
) AS event_contract,
COALESCE(
SPLIT(
VALUE :type :: STRING,
'.'
) [3],
-- if null, then flow.<event_type>
SPLIT(
VALUE :type :: STRING,
'.'
) [1]
) :: STRING AS event_type,
_inserted_timestamp,
_partition_by_block_id
FROM
transactions t,
LATERAL FLATTEN(
input => events
) e
QUALIFY ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY _inserted_timestamp DESC) = 1
),
attributes AS (
SELECT
event_id,
OBJECT_AGG(
data_key,
IFF(IS_ARRAY(TRY_PARSE_JSON(data_value)) OR IS_OBJECT(TRY_PARSE_JSON(data_value)), PARSE_JSON(data_value)::VARIANT, data_value::VARIANT)
) AS event_data
FROM
(
SELECT
event_id,
VALUE :name :: variant AS data_key,
COALESCE(
VALUE :value :value :fields,
VALUE :value :value :staticType,
VALUE :value :value :value :value :: STRING,
VALUE :value :value :value :: STRING,
VALUE :value :value :: STRING,
'null'
) AS data_value
FROM
flatten_events,
LATERAL FLATTEN (
COALESCE(
decoded_payload :value :fields :: variant,
event_values :value :fields :: variant
)
)
)
GROUP BY
1
),
FINAL AS (
SELECT
e.tx_id,
e.block_height,
e.block_timestamp,
e.event_id,
e.event_index,
e.events_count,
e.payload,
e.event_contract,
e.event_type,
A.event_data,
e.tx_succeeded,
e._inserted_timestamp,
e._partition_by_block_id,
{{ dbt_utils.generate_surrogate_key(
['event_id']
) }} AS streamline_event_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
flatten_events e
LEFT JOIN attributes A USING (event_id)
)
SELECT
*
FROM
FINAL