blast-models/models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql
drethereum 13ba135a6c
AN-5335-v2/blast-standardization (#79)
* new columns ymls and docs

* new column in silver logs

* refs to core fact tables

* ymls and new columns

* minor updates

* columns

* docs

* updates for native transfer

* updates

* deprecation notices

* native transfers table

* core column changes and docs

* full_decoded_log changes

* merge

* misc updates for deprecating columns

* tx and trace status

* misc updates

* remove comments

* remove trace_address from native_transfers

* timestamps for native

* docs

* token transfers

* missing_decoding macro

* l1 fee precise

* removed trace address column from silver native transfers

* deprecation date

* ez decoded_event_logs

* standard pred
2025-02-12 13:59:21 -07:00

84 lines
2.2 KiB
SQL

{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{this.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','2000000')}}, 'producer_batch_size', {{var('producer_batch_size','400000')}}, 'worker_batch_size', {{var('worker_batch_size','200000')}}))",
target = "{{this.schema}}.{{this.identifier}}"
),
"call system$wait(" ~ var("WAIT", 400) ~ ")" ],
tags = ['streamline_decoded_logs_realtime']
) }}
WITH target_blocks AS (
SELECT
block_number
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_number >= (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
)
),
existing_logs_to_exclude AS (
SELECT
_log_id
FROM
{{ ref('streamline__complete_decode_logs') }}
l
INNER JOIN target_blocks b USING (block_number)
WHERE
l._inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE())
),
candidate_logs AS (
SELECT
l.block_number,
l.tx_hash,
l.event_index,
l.contract_address,
l.topics,
l.data,
CONCAT(
l.tx_hash :: STRING,
'-',
l.event_index :: STRING
) AS _log_id
FROM
target_blocks b
INNER JOIN {{ ref('core__fact_event_logs') }}
l USING (block_number)
WHERE
l.tx_succeeded
AND l.inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE())
)
SELECT
l.block_number,
l._log_id,
A.abi AS abi,
OBJECT_CONSTRUCT(
'topics',
l.topics,
'data',
l.data,
'address',
l.contract_address
) AS DATA
FROM
candidate_logs l
INNER JOIN {{ ref('silver__complete_event_abis') }} A
ON A.parent_contract_address = l.contract_address
AND A.event_signature = l.topics [0] :: STRING
AND l.block_number BETWEEN A.start_block
AND A.end_block
WHERE
NOT EXISTS (
SELECT
1
FROM
existing_logs_to_exclude e
WHERE
e._log_id = l._log_id
)
limit 7500000