arbitrum-models/models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql
Sam 9fb6bb11fd
An 5332/arb standardization v2 (#301)
* decoded logs

* core changes

* update silver core

* add bridge

* add dex

* add lending

* add nft

* add nft remaining

* fix nft

* add olas

* docs

* add docs

* remove dupe

* remove origin from silver nft transfers

* remove changes to fact traces col positions

* misc

* revert native transfer changes

* revert native transfers

* docs

* remove identifier, clean up

* revert

* tests

* base feee

* update deprecation date to March 10
2025-02-10 21:21:28 +07:00

80 lines
2.5 KiB
SQL

{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{this.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{this.identifier}}','producer_batch_size', 20000000,'producer_limit_size', 20000000))", target = "{{this.schema}}.{{this.identifier}}" ),"call system$wait(" ~ var("WAIT", 400) ~ ")" ],
tags = ['streamline_decoded_logs_realtime']
) }}
WITH target_blocks AS (
SELECT
block_number
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_number >= (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
)
),
existing_logs_to_exclude AS (
SELECT
_log_id
FROM
{{ ref('streamline__complete_decode_logs') }}
l
INNER JOIN target_blocks b USING (block_number)
WHERE
l._inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE())),
candidate_logs AS (
SELECT
l.block_number,
l.tx_hash,
l.event_index,
l.contract_address,
l.topics,
l.data,
CONCAT(
l.tx_hash :: STRING,
'-',
l.event_index :: STRING
) AS _log_id
FROM
target_blocks b
INNER JOIN {{ ref('core__fact_event_logs') }}
l USING (block_number)
WHERE
l.tx_succeeded
AND l.inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE()))
SELECT
l.block_number,
l._log_id,
A.abi AS abi,
OBJECT_CONSTRUCT(
'topics',
l.topics,
'data',
l.data,
'address',
l.contract_address
) AS DATA
FROM
candidate_logs l
INNER JOIN {{ ref('silver__complete_event_abis') }} A
ON A.parent_contract_address = l.contract_address
AND A.event_signature = l.topics [0] :: STRING
AND l.block_number BETWEEN A.start_block
AND A.end_block
WHERE
NOT EXISTS (
SELECT
1
FROM
existing_logs_to_exclude e
WHERE
e._log_id = l._log_id
)
LIMIT
7500000