mirror of https://github.com/FlipsideCrypto/berachain-models.git (synced 2026-02-06 11:16:47 +00:00)

commit ab0437333b ("updates"), parent abb91958f5
@@ -1,12 +1,11 @@
-name: dbt_run_streamline_decoder_history
-run-name: dbt_run_streamline_decoder_history
+name: dbt_run_scheduled_decoded_logs_history_user_abis
+run-name: dbt_run_scheduled_decoded_logs_history_user_abis

 on:
   workflow_dispatch:
-    branches:
-      - "main"
+  schedule:
+    # Runs “At minute 32 past every 12th hour.” (see https://crontab.guru)
+    - cron: '32 */12 * * *'

 env:
   DBT_PROFILES_DIR: ./

@@ -22,12 +21,10 @@ env:
 concurrency:
   group: ${{ github.workflow }}

-
-
 jobs:
   run_dbt_jobs:
     runs-on: ubuntu-latest
     environment:
       name: workflow_prod

     steps:

@@ -42,6 +39,7 @@ jobs:
       run: |
         pip install -r requirements.txt
         dbt deps
-      - name: Run DBT Jobs
+
+      - name: Kick off decoded logs history, if there are new ABIs from users
         run: |
-          dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":240}' -m "berachain_models,tag:streamline_decoded_logs_history" "berachain_models,tag:streamline_decoded_logs_complete"
+          dbt run-operation run_decoded_logs_history
.github/workflows/dbt_run_streamline_decoded_logs_history.yml (new file, 49 lines, vendored)
@@ -0,0 +1,49 @@
name: dbt_run_streamline_decoded_logs_history
run-name: dbt_run_streamline_decoded_logs_history

on:
  workflow_dispatch:
    branches:
      - "main"

env:
  DBT_PROFILES_DIR: ./

  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"

concurrency:
  group: ${{ github.workflow }}

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod

    steps:
      - uses: actions/checkout@v3

      - uses: actions/setup-python@v4
        with:
          python-version: "3.10"
          cache: "pip"

      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps

      - name: Update complete table
        run: |
          dbt run -m "berachain_models,tag:streamline_decoded_logs_complete"

      - name: Decode historical logs
        run: |
          dbt run-operation decoded_logs_history --vars '{"STREAMLINE_INVOKE_STREAMS":True}'
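This history workflow is dispatch-only: the run_decoded_logs_history macro added below triggers it through the github_actions.workflow_dispatches UDF, and it can also be kicked off by hand. A minimal sketch, assuming an authenticated GitHub CLI (not part of this commit):

    # Manually dispatch the decoded-logs history workflow.
    gh workflow run dbt_run_streamline_decoded_logs_history.yml \
      --repo FlipsideCrypto/berachain-models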
@@ -2,4 +2,6 @@ workflow_name,workflow_schedule
 dbt_run_streamline_chainhead,"3,33 * * * *"
 dbt_run_scheduled_non_realtime,"19,49 * * * *"
 dbt_test_tasks,"8 * * * *"
 dbt_run_streamline_decoder,"27,57 * * * *"
+dbt_run_streamline_decoded_logs_history,"5 1 * * 6"
+dbt_run_scheduled_decoded_logs_history_user_abis,"21 23 * * *"
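For reference, the two added entries read as standard five-field cron expressions (minute, hour, day-of-month, month, day-of-week):

    # "5 1 * * 6"   -> 01:05 on Saturdays: weekly full history pass
    # "21 23 * * *" -> 23:21 every day: user-ABI gate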
macros/decoder/decoded_logs_history.sql (new file, 129 lines)
@@ -0,0 +1,129 @@
{% macro decoded_logs_history(backfill_mode=false) %}

{%- set params = {
    "sql_limit": var("DECODED_LOGS_HISTORY_SQL_LIMIT", 7500000),
    "producer_batch_size": var("DECODED_LOGS_HISTORY_PRODUCER_BATCH_SIZE", 400000),
    "worker_batch_size": var("DECODED_LOGS_HISTORY_WORKER_BATCH_SIZE", 100000)
} -%}

{% set wait_time = var("DECODED_LOGS_HISTORY_WAIT_TIME", 60) %}

{% set find_months_query %}
SELECT
    DISTINCT date_trunc('month', block_timestamp)::date as month
FROM {{ ref('testnet__fact_blocks') }}
ORDER BY month ASC
{% endset %}

{% set results = run_query(find_months_query) %}

{% if execute %}
    {% set months = results.columns[0].values() %}

    {% for month in months %}
        {% set view_name = 'decoded_logs_history_' ~ month.strftime('%Y_%m') %}

        {% set create_view_query %}
        create or replace view streamline.{{view_name}} as (
            WITH target_blocks AS (
                SELECT
                    block_number
                FROM {{ ref('testnet__fact_blocks') }}
                WHERE date_trunc('month', block_timestamp) = '{{month}}'::timestamp
            ),
            new_abis AS (
                SELECT
                    abi,
                    parent_contract_address,
                    event_signature,
                    start_block,
                    end_block
                FROM {{ ref('silver_testnet__complete_event_abis') }}
                {% if not backfill_mode %}
                WHERE inserted_timestamp > dateadd('day', -30, sysdate())
                {% endif %}
            ),
            existing_logs_to_exclude AS (
                SELECT _log_id
                FROM {{ ref('streamline__testnet_complete_decode_logs') }} l
                INNER JOIN target_blocks b using (block_number)
            ),
            candidate_logs AS (
                SELECT
                    l.block_number,
                    l.tx_hash,
                    l.event_index,
                    l.contract_address,
                    l.topics,
                    l.data,
                    concat(l.tx_hash::string, '-', l.event_index::string) as _log_id
                FROM target_blocks b
                INNER JOIN {{ ref('testnet__fact_event_logs') }} l using (block_number)
                WHERE l.tx_succeeded and date_trunc('month', l.block_timestamp) = '{{month}}'::timestamp
            )
            SELECT
                l.block_number,
                l._log_id,
                A.abi,
                OBJECT_CONSTRUCT(
                    'topics', l.topics,
                    'data', l.data,
                    'address', l.contract_address
                ) AS data
            FROM candidate_logs l
            INNER JOIN new_abis A
                ON A.parent_contract_address = l.contract_address
                AND A.event_signature = l.topics[0]::STRING
                AND l.block_number BETWEEN A.start_block AND A.end_block
            WHERE NOT EXISTS (
                SELECT 1
                FROM existing_logs_to_exclude e
                WHERE e._log_id = l._log_id
            )
            LIMIT {{ params.sql_limit }}
        )
        {% endset %}

        {# Create the view #}
        {% do run_query(create_view_query) %}
        {{ log("Created view for month " ~ month.strftime('%Y-%m'), info=True) }}

        {% if var("STREAMLINE_INVOKE_STREAMS", false) %}
            {# Check if rows exist first #}
            {% set check_rows_query %}
            SELECT EXISTS(SELECT 1 FROM streamline.{{view_name}} LIMIT 1)
            {% endset %}

            {% set results = run_query(check_rows_query) %}
            {% set has_rows = results.columns[0].values()[0] %}

            {% if has_rows %}
                {# Invoke streamline since rows exist to decode #}
                {% set decode_query %}
                SELECT
                    streamline.udf_bulk_decode_logs_v2(
                        PARSE_JSON(
                            $${ "external_table": "testnet_decoded_logs",
                                "producer_batch_size": {{ params.producer_batch_size }},
                                "sql_limit": {{ params.sql_limit }},
                                "sql_source": "{{view_name}}",
                                "worker_batch_size": {{ params.worker_batch_size }} }$$
                        )
                    );
                {% endset %}

                {% do run_query(decode_query) %}
                {{ log("Triggered decoding for month " ~ month.strftime('%Y-%m'), info=True) }}

                {# Call wait since we actually did some decoding #}
                {% do run_query("call system$wait(" ~ wait_time ~ ")") %}
                {{ log("Completed wait after decoding for month " ~ month.strftime('%Y-%m'), info=True) }}
            {% else %}
                {{ log("No rows to decode for month " ~ month.strftime('%Y-%m'), info=True) }}
            {% endif %}
        {% endif %}

    {% endfor %}
{% endif %}

{% endmacro %}
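The "Decode historical logs" step in the new workflow above is the entry point for this macro. As a sketch of its two modes (the --args invocation of backfill_mode illustrates the macro's keyword argument and is not a command appearing in this commit):

    # As run in CI: build per-month views limited to ABIs inserted in the
    # last 30 days, then invoke streamline.udf_bulk_decode_logs_v2 per view.
    dbt run-operation decoded_logs_history --vars '{"STREAMLINE_INVOKE_STREAMS":True}'

    # Hypothetical full backfill: drop the 30-day ABI filter.
    dbt run-operation decoded_logs_history --args '{"backfill_mode": true}' --vars '{"STREAMLINE_INVOKE_STREAMS":True}'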
macros/decoder/run_decoded_logs_history.sql (new file, 29 lines)
@@ -0,0 +1,29 @@
{% macro run_decoded_logs_history() %}

{% set check_for_new_user_abis_query %}
select 1
from {{ ref('silver__user_verified_abis') }}
where _inserted_timestamp::date = sysdate()::date
and dayname(sysdate()) <> 'Sat'
{% endset %}

{% set results = run_query(check_for_new_user_abis_query) %}

{% if execute %}
    {% set new_user_abis = results.columns[0].values()[0] %}

    {% if new_user_abis %}
        {% set invoke_workflow_query %}
        SELECT
            github_actions.workflow_dispatches(
                'FlipsideCrypto',
                'berachain-models',
                'dbt_run_streamline_decoded_logs_history.yml',
                NULL
            )
        {% endset %}

        {% do run_query(invoke_workflow_query) %}
    {% endif %}
{% endif %}
{% endmacro %}
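This gate is the only dbt step the twice-daily user-ABI workflow runs: it dispatches the heavier history workflow only when a user-verified ABI landed today, and skips Saturdays, when the weekly scheduled pass covers history anyway. Invoked exactly as in the workflow step above:

    # Check for new user ABIs; if any, dispatch
    # dbt_run_streamline_decoded_logs_history.yml via workflow_dispatches.
    dbt run-operation run_decoded_logs_history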
@@ -1,21 +0,0 @@
{{ config (
    materialized = "view",
    post_hook = [fsc_utils.if_data_call_function_v2(
        func = 'streamline.udf_bulk_decode_logs_v2',
        target = "{{this.schema}}.{{this.identifier}}",
        params ={ "external_table" :"testnet_decoded_logs",
        "sql_limit" :"7500000",
        "producer_batch_size" :"400000",
        "worker_batch_size" :"200000",
        "sql_source" :"{{this.identifier}}" } ),
        if_data_call_wait()
    ],
    tags = ['streamline_decoded_logs_history']
) }}

{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
    start,
    stop
) }}
@@ -1,21 +0,0 @@
{{ config (
    materialized = "view",
    post_hook = [fsc_utils.if_data_call_function_v2(
        func = 'streamline.udf_bulk_decode_logs_v2',
        target = "{{this.schema}}.{{this.identifier}}",
        params ={ "external_table" :"testnet_decoded_logs",
        "sql_limit" :"7500000",
        "producer_batch_size" :"400000",
        "worker_batch_size" :"200000",
        "sql_source" :"{{this.identifier}}" } ),
        if_data_call_wait()
    ],
    tags = ['streamline_decoded_logs_history']
) }}

{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
    start,
    stop
) }}
@@ -1,21 +0,0 @@
{{ config (
    materialized = "view",
    post_hook = [fsc_utils.if_data_call_function_v2(
        func = 'streamline.udf_bulk_decode_logs_v2',
        target = "{{this.schema}}.{{this.identifier}}",
        params ={ "external_table" :"testnet_decoded_logs",
        "sql_limit" :"7500000",
        "producer_batch_size" :"400000",
        "worker_batch_size" :"200000",
        "sql_source" :"{{this.identifier}}" } ),
        if_data_call_wait()
    ],
    tags = ['streamline_decoded_logs_history']
) }}

{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
    start,
    stop
) }}
@@ -1,21 +0,0 @@
{{ config (
    materialized = "view",
    post_hook = [fsc_utils.if_data_call_function_v2(
        func = 'streamline.udf_bulk_decode_logs_v2',
        target = "{{this.schema}}.{{this.identifier}}",
        params ={ "external_table" :"testnet_decoded_logs",
        "sql_limit" :"7500000",
        "producer_batch_size" :"400000",
        "worker_batch_size" :"200000",
        "sql_source" :"{{this.identifier}}" } ),
        if_data_call_wait()
    ],
    tags = ['streamline_decoded_logs_history']
) }}

{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
    start,
    stop
) }}
@@ -1,21 +0,0 @@
{{ config (
    materialized = "view",
    post_hook = [fsc_utils.if_data_call_function_v2(
        func = 'streamline.udf_bulk_decode_logs_v2',
        target = "{{this.schema}}.{{this.identifier}}",
        params ={ "external_table" :"testnet_decoded_logs",
        "sql_limit" :"7500000",
        "producer_batch_size" :"400000",
        "worker_batch_size" :"200000",
        "sql_source" :"{{this.identifier}}" } ),
        if_data_call_wait()
    ],
    tags = ['streamline_decoded_logs_history']
) }}

{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
    start,
    stop
) }}
@@ -1,21 +0,0 @@
{{ config (
    materialized = "view",
    post_hook = [fsc_utils.if_data_call_function_v2(
        func = 'streamline.udf_bulk_decode_logs_v2',
        target = "{{this.schema}}.{{this.identifier}}",
        params ={ "external_table" :"testnet_decoded_logs",
        "sql_limit" :"7500000",
        "producer_batch_size" :"400000",
        "worker_batch_size" :"200000",
        "sql_source" :"{{this.identifier}}" } ),
        if_data_call_wait()
    ],
    tags = ['streamline_decoded_logs_history']
) }}

{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
    start,
    stop
) }}
@@ -1,21 +0,0 @@
{{ config (
    materialized = "view",
    post_hook = [fsc_utils.if_data_call_function_v2(
        func = 'streamline.udf_bulk_decode_logs_v2',
        target = "{{this.schema}}.{{this.identifier}}",
        params ={ "external_table" :"testnet_decoded_logs",
        "sql_limit" :"7500000",
        "producer_batch_size" :"400000",
        "worker_batch_size" :"200000",
        "sql_source" :"{{this.identifier}}" } ),
        if_data_call_wait()
    ],
    tags = ['streamline_decoded_logs_history']
) }}

{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
    start,
    stop
) }}
@@ -1,21 +0,0 @@
{{ config (
    materialized = "view",
    post_hook = [fsc_utils.if_data_call_function_v2(
        func = 'streamline.udf_bulk_decode_logs_v2',
        target = "{{this.schema}}.{{this.identifier}}",
        params ={ "external_table" :"testnet_decoded_logs",
        "sql_limit" :"7500000",
        "producer_batch_size" :"400000",
        "worker_batch_size" :"200000",
        "sql_source" :"{{this.identifier}}" } ),
        if_data_call_wait()
    ],
    tags = ['streamline_decoded_logs_history']
) }}

{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
    start,
    stop
) }}
@@ -1,21 +0,0 @@
{{ config (
    materialized = "view",
    post_hook = [fsc_utils.if_data_call_function_v2(
        func = 'streamline.udf_bulk_decode_logs_v2',
        target = "{{this.schema}}.{{this.identifier}}",
        params ={ "external_table" :"testnet_decoded_logs",
        "sql_limit" :"7500000",
        "producer_batch_size" :"400000",
        "worker_batch_size" :"200000",
        "sql_source" :"{{this.identifier}}" } ),
        if_data_call_wait()
    ],
    tags = ['streamline_decoded_logs_history']
) }}

{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
    start,
    stop
) }}
@@ -1,21 +0,0 @@
{{ config (
    materialized = "view",
    post_hook = [fsc_utils.if_data_call_function_v2(
        func = 'streamline.udf_bulk_decode_logs_v2',
        target = "{{this.schema}}.{{this.identifier}}",
        params ={ "external_table" :"testnet_decoded_logs",
        "sql_limit" :"7500000",
        "producer_batch_size" :"400000",
        "worker_batch_size" :"200000",
        "sql_source" :"{{this.identifier}}" } ),
        if_data_call_wait()
    ],
    tags = ['streamline_decoded_logs_history']
) }}

{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
    start,
    stop
) }}
@@ -1,21 +0,0 @@
{{ config (
    materialized = "view",
    post_hook = [fsc_utils.if_data_call_function_v2(
        func = 'streamline.udf_bulk_decode_logs_v2',
        target = "{{this.schema}}.{{this.identifier}}",
        params ={ "external_table" :"testnet_decoded_logs",
        "sql_limit" :"7500000",
        "producer_batch_size" :"400000",
        "worker_batch_size" :"200000",
        "sql_source" :"{{this.identifier}}" } ),
        if_data_call_wait()
    ],
    tags = ['streamline_decoded_logs_history']
) }}

{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
    start,
    stop
) }}
@@ -12,12 +12,50 @@
     tags = ['streamline_decoded_logs_realtime']
 ) }}

-WITH look_back AS (
+WITH target_blocks AS (

     SELECT
         block_number
     FROM
-        {{ ref("_24_hour_lookback") }}
+        {{ ref('testnet__fact_blocks') }}
+    WHERE
+        block_number >= (
+            SELECT
+                block_number
+            FROM
+                {{ ref("_block_lookback") }}
+        )
+),
+existing_logs_to_exclude AS (
+    SELECT
+        _log_id
+    FROM
+        {{ ref('streamline__testnet_complete_decode_logs') }}
+        l
+        INNER JOIN target_blocks b USING (block_number)
+    WHERE
+        l._inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE())
+),
+candidate_logs AS (
+    SELECT
+        l.block_number,
+        l.tx_hash,
+        l.event_index,
+        l.contract_address,
+        l.topics,
+        l.data,
+        CONCAT(
+            l.tx_hash :: STRING,
+            '-',
+            l.event_index :: STRING
+        ) AS _log_id
+    FROM
+        target_blocks b
+        INNER JOIN {{ ref('testnet__fact_event_logs') }}
+        l USING (block_number)
+    WHERE
+        l.tx_succeeded
+        AND l.inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE())
 )
 SELECT
     l.block_number,
@@ -32,35 +70,19 @@ SELECT
         l.contract_address
     ) AS DATA
 FROM
-    {{ ref("silver_testnet__logs") }}
-    l
-    INNER JOIN {{ ref("silver_testnet__complete_event_abis") }} A
+    candidate_logs l
+    INNER JOIN {{ ref('silver_testnet__complete_event_abis') }} A
     ON A.parent_contract_address = l.contract_address
     AND A.event_signature = l.topics [0] :: STRING
     AND l.block_number BETWEEN A.start_block
     AND A.end_block
 WHERE
-    (
-        l.block_number >= (
-            SELECT
-                block_number
-            FROM
-                look_back
-        )
-    )
-    AND l.block_number IS NOT NULL
-    AND l.block_timestamp >= DATEADD('day', -2, CURRENT_DATE())
-    AND _log_id NOT IN (
+    NOT EXISTS (
         SELECT
-            _log_id
+            1
         FROM
-            {{ ref("streamline__testnet_complete_decode_logs") }}
+            existing_logs_to_exclude e
         WHERE
-            block_number >= (
-                SELECT
-                    block_number
-                FROM
-                    look_back
-            )
-            AND _inserted_timestamp >= DATEADD('day', -2, CURRENT_DATE())
-    )
+            e._log_id = l._log_id
     )
+limit 7500000