From ab0437333be097d8486adca5d1cef2bb973092f3 Mon Sep 17 00:00:00 2001 From: Austin Date: Tue, 19 Nov 2024 11:07:32 -0500 Subject: [PATCH] updates --- ...eduled_decoded_logs_history_user_abis.yml} | 20 ++- ...bt_run_streamline_decoded_logs_history.yml | 49 +++++++ data/github_actions__workflows.csv | 4 +- macros/decoder/decoded_logs_history.sql | 129 ++++++++++++++++++ macros/decoder/run_decoded_logs_history.sql | 29 ++++ ...ode_logs_history_0000000000_0000817537.sql | 21 --- ...ode_logs_history_0000817538_0001295975.sql | 21 --- ...ode_logs_history_0001295976_0001526397.sql | 21 --- ...ode_logs_history_0001526398_0001712198.sql | 21 --- ...ode_logs_history_0001712199_0002027723.sql | 21 --- ...ode_logs_history_0002027724_0002221974.sql | 21 --- ...ode_logs_history_0002221975_0003000000.sql | 21 --- ...ode_logs_history_0003000001_0004000000.sql | 21 --- ...ode_logs_history_0004000001_0005000000.sql | 21 --- ...ode_logs_history_0005000001_0006000000.sql | 21 --- ...ode_logs_history_0006000001_0007000000.sql | 21 --- ...reamline__testnet_decode_logs_realtime.sql | 76 +++++++---- 17 files changed, 268 insertions(+), 270 deletions(-) rename .github/workflows/{dbt_run_streamline_decoder_history.yml => dbt_run_scheduled_decoded_logs_history_user_abis.yml} (61%) create mode 100644 .github/workflows/dbt_run_streamline_decoded_logs_history.yml create mode 100644 macros/decoder/decoded_logs_history.sql create mode 100644 macros/decoder/run_decoded_logs_history.sql delete mode 100644 models/streamline/decoder/history/streamline__decode_logs_history_0000000000_0000817537.sql delete mode 100644 models/streamline/decoder/history/streamline__decode_logs_history_0000817538_0001295975.sql delete mode 100644 models/streamline/decoder/history/streamline__decode_logs_history_0001295976_0001526397.sql delete mode 100644 models/streamline/decoder/history/streamline__decode_logs_history_0001526398_0001712198.sql delete mode 100644 
models/streamline/decoder/history/streamline__decode_logs_history_0001712199_0002027723.sql delete mode 100644 models/streamline/decoder/history/streamline__decode_logs_history_0002027724_0002221974.sql delete mode 100644 models/streamline/decoder/history/streamline__decode_logs_history_0002221975_0003000000.sql delete mode 100644 models/streamline/decoder/history/streamline__decode_logs_history_0003000001_0004000000.sql delete mode 100644 models/streamline/decoder/history/streamline__decode_logs_history_0004000001_0005000000.sql delete mode 100644 models/streamline/decoder/history/streamline__decode_logs_history_0005000001_0006000000.sql delete mode 100644 models/streamline/decoder/history/streamline__decode_logs_history_0006000001_0007000000.sql diff --git a/.github/workflows/dbt_run_streamline_decoder_history.yml b/.github/workflows/dbt_run_scheduled_decoded_logs_history_user_abis.yml similarity index 61% rename from .github/workflows/dbt_run_streamline_decoder_history.yml rename to .github/workflows/dbt_run_scheduled_decoded_logs_history_user_abis.yml index 44e2a5f..63bf5a2 100644 --- a/.github/workflows/dbt_run_streamline_decoder_history.yml +++ b/.github/workflows/dbt_run_scheduled_decoded_logs_history_user_abis.yml @@ -1,12 +1,11 @@ -name: dbt_run_streamline_decoder_history -run-name: dbt_run_streamline_decoder_history +name: dbt_run_scheduled_decoded_logs_history_user_abis +run-name: dbt_run_scheduled_decoded_logs_history_user_abis on: workflow_dispatch: - schedule: - # Runs “At minute 32 past every 12th hour.” (see https://crontab.guru) - - cron: '32 */12 * * *' - + branches: + - "main" + env: DBT_PROFILES_DIR: ./ @@ -22,12 +21,10 @@ env: concurrency: group: ${{ github.workflow }} - - jobs: run_dbt_jobs: runs-on: ubuntu-latest - environment: + environment: name: workflow_prod steps: @@ -42,6 +39,7 @@ jobs: run: | pip install -r requirements.txt dbt deps - - name: Run DBT Jobs + + - name: Kick off decoded logs history, if there are new ABIs from users run: 
| - dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":240}' -m "berachain_models,tag:streamline_decoded_logs_history" "berachain_models,tag:streamline_decoded_logs_complete" \ No newline at end of file + dbt run-operation run_decoded_logs_history \ No newline at end of file diff --git a/.github/workflows/dbt_run_streamline_decoded_logs_history.yml b/.github/workflows/dbt_run_streamline_decoded_logs_history.yml new file mode 100644 index 0000000..15d5d30 --- /dev/null +++ b/.github/workflows/dbt_run_streamline_decoded_logs_history.yml @@ -0,0 +1,49 @@ +name: dbt_run_streamline_decoded_logs_history +run-name: dbt_run_streamline_decoded_logs_history + +on: + workflow_dispatch: + branches: + - "main" + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + + - name: Update complete table + run: | + dbt run -m "berachain_models,tag:streamline_decoded_logs_complete" + + - name: Decode historical logs + run: | + dbt run-operation decoded_logs_history --vars '{"STREAMLINE_INVOKE_STREAMS":True}' \ No newline at end of file diff --git a/data/github_actions__workflows.csv b/data/github_actions__workflows.csv index 34bbb32..606b400 100644 --- a/data/github_actions__workflows.csv +++ b/data/github_actions__workflows.csv @@ -2,4 +2,6 @@ workflow_name,workflow_schedule dbt_run_streamline_chainhead,"3,33 * * * *" dbt_run_scheduled_non_realtime,"19,49 * * * *" dbt_test_tasks,"8 * * * *" 
-dbt_run_streamline_decoder,"27,57 * * * *" \ No newline at end of file +dbt_run_streamline_decoder,"27,57 * * * *" +dbt_run_streamline_decoded_logs_history,"5 1 * * 6" +dbt_run_scheduled_decoded_logs_history_user_abis,"21 23 * * *" \ No newline at end of file diff --git a/macros/decoder/decoded_logs_history.sql b/macros/decoder/decoded_logs_history.sql new file mode 100644 index 0000000..73f4b51 --- /dev/null +++ b/macros/decoder/decoded_logs_history.sql @@ -0,0 +1,129 @@ +{% macro decoded_logs_history(backfill_mode=false) %} + + {%- set params = { + "sql_limit": var("DECODED_LOGS_HISTORY_SQL_LIMIT", 7500000), + "producer_batch_size": var("DECODED_LOGS_HISTORY_PRODUCER_BATCH_SIZE", 400000), + "worker_batch_size": var("DECODED_LOGS_HISTORY_WORKER_BATCH_SIZE", 100000) + } -%} + + {% set wait_time = var("DECODED_LOGS_HISTORY_WAIT_TIME", 60) %} + + {% set find_months_query %} + SELECT + DISTINCT date_trunc('month', block_timestamp)::date as month + FROM {{ ref('testnet__fact_blocks') }} + ORDER BY month ASC + {% endset %} + + {% set results = run_query(find_months_query) %} + + {% if execute %} + {% set months = results.columns[0].values() %} + + {% for month in months %} + {% set view_name = 'decoded_logs_history_' ~ month.strftime('%Y_%m') %} + + {% set create_view_query %} + create or replace view streamline.{{view_name}} as ( + WITH target_blocks AS ( + SELECT + block_number + FROM {{ ref('testnet__fact_blocks') }} + WHERE date_trunc('month', block_timestamp) = '{{month}}'::timestamp + ), + new_abis AS ( + SELECT + abi, + parent_contract_address, + event_signature, + start_block, + end_block + FROM {{ ref('silver_testnet__complete_event_abis') }} + {% if not backfill_mode %} + WHERE inserted_timestamp > dateadd('day', -30, sysdate()) + {% endif %} + ), + existing_logs_to_exclude AS ( + SELECT _log_id + FROM {{ ref('streamline__testnet_complete_decode_logs') }} l + INNER JOIN target_blocks b using (block_number) + ), + candidate_logs AS ( + SELECT + l.block_number, 
+ l.tx_hash, + l.event_index, + l.contract_address, + l.topics, + l.data, + concat(l.tx_hash::string, '-', l.event_index::string) as _log_id + FROM target_blocks b + INNER JOIN {{ ref('testnet__fact_event_logs') }} l using (block_number) + WHERE l.tx_succeeded and date_trunc('month', l.block_timestamp) = '{{month}}'::timestamp + ) + SELECT + l.block_number, + l._log_id, + A.abi, + OBJECT_CONSTRUCT( + 'topics', l.topics, + 'data', l.data, + 'address', l.contract_address + ) AS data + FROM candidate_logs l + INNER JOIN new_abis A + ON A.parent_contract_address = l.contract_address + AND A.event_signature = l.topics[0]::STRING + AND l.block_number BETWEEN A.start_block AND A.end_block + WHERE NOT EXISTS ( + SELECT 1 + FROM existing_logs_to_exclude e + WHERE e._log_id = l._log_id + ) + LIMIT {{ params.sql_limit }} + ) + {% endset %} + + {# Create the view #} + {% do run_query(create_view_query) %} + {{ log("Created view for month " ~ month.strftime('%Y-%m'), info=True) }} + + {% if var("STREAMLINE_INVOKE_STREAMS", false) %} + {# Check if rows exist first; note: Snowflake only allows EXISTS as a WHERE-clause predicate, so use COUNT over a LIMIT 1 subquery instead #} + {% set check_rows_query %} + SELECT COUNT(1) > 0 FROM (SELECT 1 FROM streamline.{{view_name}} LIMIT 1) + {% endset %} + + {% set results = run_query(check_rows_query) %} + {% set has_rows = results.columns[0].values()[0] %} + + {% if has_rows %} + {# Invoke streamline since rows exist to decode #} + {% set decode_query %} + SELECT + streamline.udf_bulk_decode_logs_v2( + PARSE_JSON( + $${ "external_table": "testnet_decoded_logs", + "producer_batch_size": {{ params.producer_batch_size }}, + "sql_limit": {{ params.sql_limit }}, + "sql_source": "{{view_name}}", + "worker_batch_size": {{ params.worker_batch_size }} }$$ + ) + ); + {% endset %} + + {% do run_query(decode_query) %} + {{ log("Triggered decoding for month " ~ month.strftime('%Y-%m'), info=True) }} + + {# Call wait since we actually did some decoding #} + {% do run_query("call system$wait(" ~ wait_time ~ ")") %} + {{ log("Completed wait after decoding for month " ~ 
month.strftime('%Y-%m'), info=True) }} + {% else %} + {{ log("No rows to decode for month " ~ month.strftime('%Y-%m'), info=True) }} + {% endif %} + {% endif %} + + {% endfor %} + {% endif %} + +{% endmacro %} \ No newline at end of file diff --git a/macros/decoder/run_decoded_logs_history.sql b/macros/decoder/run_decoded_logs_history.sql new file mode 100644 index 0000000..bbc320b --- /dev/null +++ b/macros/decoder/run_decoded_logs_history.sql @@ -0,0 +1,29 @@ +{% macro run_decoded_logs_history() %} + +{% set check_for_new_user_abis_query %} + select 1 + from {{ ref('silver__user_verified_abis') }} + where _inserted_timestamp::date = sysdate()::date + and dayname(sysdate()) <> 'Sat' +{% endset %} + +{% set results = run_query(check_for_new_user_abis_query) %} + +{% if execute %} + {% set new_user_abis = results.columns[0].values() | length > 0 %} + + {% if new_user_abis %} + {% set invoke_workflow_query %} + SELECT + github_actions.workflow_dispatches( + 'FlipsideCrypto', + 'berachain-models', + 'dbt_run_streamline_decoded_logs_history.yml', + NULL + ) + {% endset %} + + {% do run_query(invoke_workflow_query) %} + {% endif %} +{% endif %} +{% endmacro %} \ No newline at end of file diff --git a/models/streamline/decoder/history/streamline__decode_logs_history_0000000000_0000817537.sql b/models/streamline/decoder/history/streamline__decode_logs_history_0000000000_0000817537.sql deleted file mode 100644 index 047890b..0000000 --- a/models/streamline/decoder/history/streamline__decode_logs_history_0000000000_0000817537.sql +++ /dev/null @@ -1,21 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [fsc_utils.if_data_call_function_v2( - func = 'streamline.udf_bulk_decode_logs_v2', - target = "{{this.schema}}.{{this.identifier}}", - params ={ "external_table" :"testnet_decoded_logs", - "sql_limit" :"7500000", - "producer_batch_size" :"400000", - "worker_batch_size" :"200000", - "sql_source" :"{{this.identifier}}" } ), - if_data_call_wait() - ], - tags = 
['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/decoder/history/streamline__decode_logs_history_0000817538_0001295975.sql b/models/streamline/decoder/history/streamline__decode_logs_history_0000817538_0001295975.sql deleted file mode 100644 index 047890b..0000000 --- a/models/streamline/decoder/history/streamline__decode_logs_history_0000817538_0001295975.sql +++ /dev/null @@ -1,21 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [fsc_utils.if_data_call_function_v2( - func = 'streamline.udf_bulk_decode_logs_v2', - target = "{{this.schema}}.{{this.identifier}}", - params ={ "external_table" :"testnet_decoded_logs", - "sql_limit" :"7500000", - "producer_batch_size" :"400000", - "worker_batch_size" :"200000", - "sql_source" :"{{this.identifier}}" } ), - if_data_call_wait() - ], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/decoder/history/streamline__decode_logs_history_0001295976_0001526397.sql b/models/streamline/decoder/history/streamline__decode_logs_history_0001295976_0001526397.sql deleted file mode 100644 index 047890b..0000000 --- a/models/streamline/decoder/history/streamline__decode_logs_history_0001295976_0001526397.sql +++ /dev/null @@ -1,21 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [fsc_utils.if_data_call_function_v2( - func = 'streamline.udf_bulk_decode_logs_v2', - target = "{{this.schema}}.{{this.identifier}}", - params ={ "external_table" :"testnet_decoded_logs", - "sql_limit" :"7500000", - "producer_batch_size" :"400000", - "worker_batch_size" :"200000", - "sql_source" :"{{this.identifier}}" } ), - if_data_call_wait() - ], - tags = ['streamline_decoded_logs_history'] 
-) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/decoder/history/streamline__decode_logs_history_0001526398_0001712198.sql b/models/streamline/decoder/history/streamline__decode_logs_history_0001526398_0001712198.sql deleted file mode 100644 index 047890b..0000000 --- a/models/streamline/decoder/history/streamline__decode_logs_history_0001526398_0001712198.sql +++ /dev/null @@ -1,21 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [fsc_utils.if_data_call_function_v2( - func = 'streamline.udf_bulk_decode_logs_v2', - target = "{{this.schema}}.{{this.identifier}}", - params ={ "external_table" :"testnet_decoded_logs", - "sql_limit" :"7500000", - "producer_batch_size" :"400000", - "worker_batch_size" :"200000", - "sql_source" :"{{this.identifier}}" } ), - if_data_call_wait() - ], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/decoder/history/streamline__decode_logs_history_0001712199_0002027723.sql b/models/streamline/decoder/history/streamline__decode_logs_history_0001712199_0002027723.sql deleted file mode 100644 index 047890b..0000000 --- a/models/streamline/decoder/history/streamline__decode_logs_history_0001712199_0002027723.sql +++ /dev/null @@ -1,21 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [fsc_utils.if_data_call_function_v2( - func = 'streamline.udf_bulk_decode_logs_v2', - target = "{{this.schema}}.{{this.identifier}}", - params ={ "external_table" :"testnet_decoded_logs", - "sql_limit" :"7500000", - "producer_batch_size" :"400000", - "worker_batch_size" :"200000", - "sql_source" :"{{this.identifier}}" } ), - if_data_call_wait() - ], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = 
this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/decoder/history/streamline__decode_logs_history_0002027724_0002221974.sql b/models/streamline/decoder/history/streamline__decode_logs_history_0002027724_0002221974.sql deleted file mode 100644 index 047890b..0000000 --- a/models/streamline/decoder/history/streamline__decode_logs_history_0002027724_0002221974.sql +++ /dev/null @@ -1,21 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [fsc_utils.if_data_call_function_v2( - func = 'streamline.udf_bulk_decode_logs_v2', - target = "{{this.schema}}.{{this.identifier}}", - params ={ "external_table" :"testnet_decoded_logs", - "sql_limit" :"7500000", - "producer_batch_size" :"400000", - "worker_batch_size" :"200000", - "sql_source" :"{{this.identifier}}" } ), - if_data_call_wait() - ], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/decoder/history/streamline__decode_logs_history_0002221975_0003000000.sql b/models/streamline/decoder/history/streamline__decode_logs_history_0002221975_0003000000.sql deleted file mode 100644 index 047890b..0000000 --- a/models/streamline/decoder/history/streamline__decode_logs_history_0002221975_0003000000.sql +++ /dev/null @@ -1,21 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [fsc_utils.if_data_call_function_v2( - func = 'streamline.udf_bulk_decode_logs_v2', - target = "{{this.schema}}.{{this.identifier}}", - params ={ "external_table" :"testnet_decoded_logs", - "sql_limit" :"7500000", - "producer_batch_size" :"400000", - "worker_batch_size" :"200000", - "sql_source" :"{{this.identifier}}" } ), - if_data_call_wait() - ], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} 
-{% set stop = this.identifier.split("_") [-1] %} -{{ decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/decoder/history/streamline__decode_logs_history_0003000001_0004000000.sql b/models/streamline/decoder/history/streamline__decode_logs_history_0003000001_0004000000.sql deleted file mode 100644 index 047890b..0000000 --- a/models/streamline/decoder/history/streamline__decode_logs_history_0003000001_0004000000.sql +++ /dev/null @@ -1,21 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [fsc_utils.if_data_call_function_v2( - func = 'streamline.udf_bulk_decode_logs_v2', - target = "{{this.schema}}.{{this.identifier}}", - params ={ "external_table" :"testnet_decoded_logs", - "sql_limit" :"7500000", - "producer_batch_size" :"400000", - "worker_batch_size" :"200000", - "sql_source" :"{{this.identifier}}" } ), - if_data_call_wait() - ], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/decoder/history/streamline__decode_logs_history_0004000001_0005000000.sql b/models/streamline/decoder/history/streamline__decode_logs_history_0004000001_0005000000.sql deleted file mode 100644 index 047890b..0000000 --- a/models/streamline/decoder/history/streamline__decode_logs_history_0004000001_0005000000.sql +++ /dev/null @@ -1,21 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [fsc_utils.if_data_call_function_v2( - func = 'streamline.udf_bulk_decode_logs_v2', - target = "{{this.schema}}.{{this.identifier}}", - params ={ "external_table" :"testnet_decoded_logs", - "sql_limit" :"7500000", - "producer_batch_size" :"400000", - "worker_batch_size" :"200000", - "sql_source" :"{{this.identifier}}" } ), - if_data_call_wait() - ], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = 
this.identifier.split("_") [-1] %} -{{ decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/decoder/history/streamline__decode_logs_history_0005000001_0006000000.sql b/models/streamline/decoder/history/streamline__decode_logs_history_0005000001_0006000000.sql deleted file mode 100644 index 047890b..0000000 --- a/models/streamline/decoder/history/streamline__decode_logs_history_0005000001_0006000000.sql +++ /dev/null @@ -1,21 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [fsc_utils.if_data_call_function_v2( - func = 'streamline.udf_bulk_decode_logs_v2', - target = "{{this.schema}}.{{this.identifier}}", - params ={ "external_table" :"testnet_decoded_logs", - "sql_limit" :"7500000", - "producer_batch_size" :"400000", - "worker_batch_size" :"200000", - "sql_source" :"{{this.identifier}}" } ), - if_data_call_wait() - ], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/decoder/history/streamline__decode_logs_history_0006000001_0007000000.sql b/models/streamline/decoder/history/streamline__decode_logs_history_0006000001_0007000000.sql deleted file mode 100644 index 047890b..0000000 --- a/models/streamline/decoder/history/streamline__decode_logs_history_0006000001_0007000000.sql +++ /dev/null @@ -1,21 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [fsc_utils.if_data_call_function_v2( - func = 'streamline.udf_bulk_decode_logs_v2', - target = "{{this.schema}}.{{this.identifier}}", - params ={ "external_table" :"testnet_decoded_logs", - "sql_limit" :"7500000", - "producer_batch_size" :"400000", - "worker_batch_size" :"200000", - "sql_source" :"{{this.identifier}}" } ), - if_data_call_wait() - ], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} 
-{{ decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/decoder/realtime/streamline__testnet_decode_logs_realtime.sql b/models/streamline/decoder/realtime/streamline__testnet_decode_logs_realtime.sql index 41dbc33..1f9ddcc 100644 --- a/models/streamline/decoder/realtime/streamline__testnet_decode_logs_realtime.sql +++ b/models/streamline/decoder/realtime/streamline__testnet_decode_logs_realtime.sql @@ -12,12 +12,50 @@ tags = ['streamline_decoded_logs_realtime'] ) }} -WITH look_back AS ( +WITH target_blocks AS ( - SELECT + SELECT block_number + FROM + {{ ref('testnet__fact_blocks') }} + WHERE + block_number >= ( + SELECT + block_number + FROM + {{ ref("_block_lookback") }} + ) +), +existing_logs_to_exclude AS ( + SELECT + _log_id FROM - {{ ref("_24_hour_lookback") }} + {{ ref('streamline__testnet_complete_decode_logs') }} + l + INNER JOIN target_blocks b USING (block_number) + WHERE + l._inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE()) +), +candidate_logs AS ( + SELECT + l.block_number, + l.tx_hash, + l.event_index, + l.contract_address, + l.topics, + l.data, + CONCAT( + l.tx_hash :: STRING, + '-', + l.event_index :: STRING + ) AS _log_id + FROM + target_blocks b + INNER JOIN {{ ref('testnet__fact_event_logs') }} + l USING (block_number) + WHERE + l.tx_succeeded + AND l.inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE()) ) SELECT l.block_number, @@ -32,35 +70,19 @@ SELECT l.contract_address ) AS DATA FROM - {{ ref("silver_testnet__logs") }} - l - INNER JOIN {{ ref("silver_testnet__complete_event_abis") }} A + candidate_logs l + INNER JOIN {{ ref('silver_testnet__complete_event_abis') }} A ON A.parent_contract_address = l.contract_address AND A.event_signature = l.topics [0] :: STRING AND l.block_number BETWEEN A.start_block AND A.end_block WHERE - ( - l.block_number >= ( - SELECT - block_number - FROM - look_back - ) - ) - AND l.block_number IS NOT NULL - AND l.block_timestamp >= DATEADD('day', -2, CURRENT_DATE()) - AND 
_log_id NOT IN ( + NOT EXISTS ( SELECT - _log_id + 1 FROM - {{ ref("streamline__testnet_complete_decode_logs") }} + existing_logs_to_exclude e WHERE - block_number >= ( - SELECT - block_number - FROM - look_back - ) - AND _inserted_timestamp >= DATEADD('day', -2, CURRENT_DATE()) - ) \ No newline at end of file + e._log_id = l._log_id + ) +LIMIT 7500000 \ No newline at end of file