optimism-models/models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql
drethereum 2c695fc815
An-5333-v2/op-standardization (#312)
* new columns, deprecation notices, docs

* docs and ymls

* new column in silver logs

* spacing

* spacing

* new decoded logs columns, updates for new core columns

* updates for column changes

* nft updates

* minor updates

* comments

* docs

* updates for native transfers

* update for identifier

* updates for identifier

* deprecation notices updated

* table notices

* native transfers

* core column changes and docs

* misc updates

* typo

* ref

* removed trace_address from native_transfers

* misc updates

* timestamps for native

* missing changes

* misc updates

* nft updates

* missing_decoding macro

* l1 fee precise docs

* removed trace address column from silver native transfers

* deprecation date

* ez_decoded_event_logs

* standard pred

* dbt_mega
2025-02-12 13:30:50 -07:00

80 lines
2.0 KiB
SQL

{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{this.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{this.identifier}}','producer_batch_size', 20000000,'producer_limit_size', 20000000))", target = "{{this.schema}}.{{this.identifier}}" ),"call system$wait(" ~ var("WAIT", 400) ~ ")" ],
tags = ['streamline_decoded_logs_realtime']
) }}
WITH target_blocks AS (
SELECT
block_number
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_number >= (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
)
),
existing_logs_to_exclude AS (
SELECT
_log_id
FROM
{{ ref('streamline__complete_decode_logs') }}
l
INNER JOIN target_blocks b USING (block_number)
WHERE
l._inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE())
),
candidate_logs AS (
SELECT
l.block_number,
l.tx_hash,
l.event_index,
l.contract_address,
l.topics,
l.data,
CONCAT(
l.tx_hash :: STRING,
'-',
l.event_index :: STRING
) AS _log_id
FROM
target_blocks b
INNER JOIN {{ ref('core__fact_event_logs') }}
l USING (block_number)
WHERE
l.tx_succeeded
AND l.inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE())
)
SELECT
l.block_number,
l._log_id,
A.abi AS abi,
OBJECT_CONSTRUCT(
'topics',
l.topics,
'data',
l.data,
'address',
l.contract_address
) AS DATA
FROM
candidate_logs l
INNER JOIN {{ ref('silver__complete_event_abis') }} A
ON A.parent_contract_address = l.contract_address
AND A.event_signature = l.topics [0] :: STRING
AND l.block_number BETWEEN A.start_block
AND A.end_block
WHERE
NOT EXISTS (
SELECT
1
FROM
existing_logs_to_exclude e
WHERE
e._log_id = l._log_id
)
limit 7500000