diff --git a/.github/workflows/dbt_run_dev_refresh.yml b/.github/workflows/dbt_run_dev_refresh.yml index 9de3592..041398a 100644 --- a/.github/workflows/dbt_run_dev_refresh.yml +++ b/.github/workflows/dbt_run_dev_refresh.yml @@ -4,8 +4,8 @@ run-name: dbt_run_dev_refresh on: workflow_dispatch: schedule: - # Runs "at 9:00 UTC" (see https://crontab.guru) - - cron: '0 9 * * *' + # Runs "at 2:10 UTC on Monday" (see https://crontab.guru) + - cron: '10 2 * * 1' env: DBT_PROFILES_DIR: ./ diff --git a/.github/workflows/dbt_run_streamline_decoder_history.yml b/.github/workflows/dbt_run_scheduled_decoded_logs_history_user_abis.yml similarity index 63% rename from .github/workflows/dbt_run_streamline_decoder_history.yml rename to .github/workflows/dbt_run_scheduled_decoded_logs_history_user_abis.yml index 0d76e4b..63bf5a2 100644 --- a/.github/workflows/dbt_run_streamline_decoder_history.yml +++ b/.github/workflows/dbt_run_scheduled_decoded_logs_history_user_abis.yml @@ -1,12 +1,11 @@ -name: dbt_run_streamline_decoder_history -run-name: dbt_run_streamline_decoder_history +name: dbt_run_scheduled_decoded_logs_history_user_abis +run-name: dbt_run_scheduled_decoded_logs_history_user_abis on: workflow_dispatch: - schedule: - # Runs "at 1:00 UTC AM" (see https://crontab.guru) - - cron: '0 1 * * *' - + branches: + - "main" + env: DBT_PROFILES_DIR: ./ @@ -22,12 +21,10 @@ env: concurrency: group: ${{ github.workflow }} - - jobs: run_dbt_jobs: runs-on: ubuntu-latest - environment: + environment: name: workflow_prod steps: @@ -42,6 +39,7 @@ jobs: run: | pip install -r requirements.txt dbt deps - - name: Run DBT Jobs + + - name: Kick off decoded logs history, if there are new ABIs from users run: | - dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":120}' -m "base_models,tag:streamline_decoded_logs_complete" "base_models,tag:streamline_decoded_logs_history" \ No newline at end of file + dbt run-operation run_decoded_logs_history \ No newline at end of file diff --git a/.github/workflows/dbt_run_streamline_decoded_logs_history.yml b/.github/workflows/dbt_run_streamline_decoded_logs_history.yml new file mode 100644 index 0000000..34ff672 --- /dev/null +++ b/.github/workflows/dbt_run_streamline_decoded_logs_history.yml @@ -0,0 +1,49 @@ +name: dbt_run_streamline_decoded_logs_history +run-name: dbt_run_streamline_decoded_logs_history + +on: + workflow_dispatch: + branches: + - "main" + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + + - name: Update complete table + run: | + dbt run -m "base_models,tag:streamline_decoded_logs_complete" + + - name: Decode historical logs + run: | + dbt run-operation decoded_logs_history --vars '{"STREAMLINE_INVOKE_STREAMS":True}' \ No newline at end of file diff --git a/.gitignore b/.gitignore index d99e9bd..2e8fe84 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,6 @@ logs/ **/.DS_Store .vscode/ .env -dbt-env/ \ No newline at end of file +dbt-env/ + +package-lock.yml \ No newline at end of file diff --git a/data/github_actions__workflows.csv b/data/github_actions__workflows.csv index b17ab47..e565102 100644 --- a/data/github_actions__workflows.csv +++ b/data/github_actions__workflows.csv @@ -3,4 +3,6 @@ dbt_run_scheduled_non_realtime,"22,52 * * * *" dbt_run_streamline_chainhead,"15,45 * * * *" dbt_run_streamline_decoder,"0,30 * * * *" dbt_run_scheduled_curated,"40 * * * *" -dbt_test_tasks,"15 * * * *" \ No newline at end of file +dbt_test_tasks,"15 * * * *" +dbt_run_streamline_decoded_logs_history,"45 7 * * 6" +dbt_run_scheduled_decoded_logs_history_user_abis,"39 23 * * *" \ No newline at end of file diff --git a/macros/decoder/decoded_logs_history.sql b/macros/decoder/decoded_logs_history.sql new file mode 100644 index 0000000..91b064c --- /dev/null +++ b/macros/decoder/decoded_logs_history.sql @@ -0,0 +1,126 @@ +{% macro decoded_logs_history(backfill_mode=false) %} + + {%- set params = { + "sql_limit": var("DECODED_LOGS_HISTORY_SQL_LIMIT", 7500000), + "producer_batch_size": var("DECODED_LOGS_HISTORY_PRODUCER_BATCH_SIZE", 400000), + "worker_batch_size": var("DECODED_LOGS_HISTORY_WORKER_BATCH_SIZE", 100000), + "producer_limit_size": var("DECODED_LOGS_HISTORY_WORKER_BATCH_SIZE", 100000) + } -%} + + {% set wait_time = var("DECODED_LOGS_HISTORY_WAIT_TIME", 60) %} + + {% set find_months_query %} + SELECT + DISTINCT date_trunc('month', block_timestamp)::date as month + FROM {{ ref('core__fact_blocks') }} + ORDER BY month ASC + {% endset %} + + {% set results = run_query(find_months_query) %} + + {% if execute %} + {% set months = results.columns[0].values() %} + + {% for month in months %} + {% set view_name = 'decoded_logs_history_' ~ month.strftime('%Y_%m') %} + + {% set create_view_query %} + create or replace view streamline.{{view_name}} as ( + WITH target_blocks AS ( + SELECT + block_number + FROM {{ ref('core__fact_blocks') }} + WHERE date_trunc('month', block_timestamp) = '{{month}}'::timestamp + ), + new_abis AS ( + SELECT + abi, + parent_contract_address, + event_signature, + start_block, + end_block + FROM {{ ref('silver__complete_event_abis') }} + {% if not backfill_mode %} + WHERE inserted_timestamp > dateadd('day', -30, sysdate()) + {% endif %} + ), + existing_logs_to_exclude AS ( + SELECT _log_id + FROM {{ ref('streamline__complete_decode_logs') }} l + INNER JOIN target_blocks b using (block_number) + ), + candidate_logs AS ( + SELECT + l.block_number, + l.tx_hash, + l.event_index, + l.contract_address, + l.topics, + l.data, + concat(l.tx_hash::string, '-', l.event_index::string) as _log_id + FROM target_blocks b + INNER JOIN {{ ref('core__fact_event_logs') }} l using (block_number) + WHERE l.tx_status = 'SUCCESS' and date_trunc('month', l.block_timestamp) = '{{month}}'::timestamp + ) + SELECT + l.block_number, + l._log_id, + A.abi, + OBJECT_CONSTRUCT( + 'topics', l.topics, + 'data', l.data, + 'address', l.contract_address + ) AS data + FROM candidate_logs l + INNER JOIN new_abis A + ON A.parent_contract_address = l.contract_address + AND A.event_signature = l.topics[0]::STRING + AND l.block_number BETWEEN A.start_block AND A.end_block + WHERE NOT EXISTS ( + SELECT 1 + FROM existing_logs_to_exclude e + WHERE e._log_id = l._log_id + ) + LIMIT {{ params.sql_limit }} + ) + {% endset %} + + {# Create the view #} + {% do run_query(create_view_query) %} + {{ log("Created view for month " ~ month.strftime('%Y-%m'), info=True) }} + + {% if var("STREAMLINE_INVOKE_STREAMS", false) %} + {# Check if rows exist first #} + {% set check_rows_query %} + SELECT EXISTS(SELECT 1 FROM streamline.{{view_name}} LIMIT 1) + {% endset %} + + {% set results = run_query(check_rows_query) %} + {% set has_rows = results.columns[0].values()[0] %} + + {% if has_rows %} + {# Invoke streamline since rows exist to decode #} + {% set decode_query %} + SELECT streamline.udf_bulk_decode_logs( + object_construct( + 'sql_source', '{{view_name}}', + 'producer_batch_size', {{ params.producer_batch_size }}, + 'producer_limit_size', {{ params.producer_limit_size }}) + ); + {% endset %} + + {% do run_query(decode_query) %} + {{ log("Triggered decoding for month " ~ month.strftime('%Y-%m'), info=True) }} + + {# Call wait since we actually did some decoding #} + {% do run_query("call system$wait(" ~ wait_time ~ ")") %} + {{ log("Completed wait after decoding for month " ~ month.strftime('%Y-%m'), info=True) }} + {% else %} + {{ log("No rows to decode for month " ~ month.strftime('%Y-%m'), info=True) }} + {% endif %} + {% endif %} + + {% endfor %} + {% endif %} + +{% endmacro %} \ No newline at end of file diff --git a/macros/decoder/run_decoded_logs_history.sql b/macros/decoder/run_decoded_logs_history.sql new file mode 100644 index 0000000..dd388a4 --- /dev/null +++ b/macros/decoder/run_decoded_logs_history.sql @@ -0,0 +1,29 @@ +{% macro run_decoded_logs_history() %} + +{% set check_for_new_user_abis_query %} + select 1 + from {{ ref('silver__user_verified_abis') }} + where _inserted_timestamp::date = sysdate()::date + and dayname(sysdate()) <> 'Sat' +{% endset %} + +{% set results = run_query(check_for_new_user_abis_query) %} + +{% if execute %} + {% set new_user_abis = results.columns[0].values()[0] %} + + {% if new_user_abis %} + {% set invoke_workflow_query %} + SELECT + github_actions.workflow_dispatches( + 'FlipsideCrypto', + 'base-models', + 'dbt_run_streamline_decoded_logs_history.yml', + NULL + ) + {% endset %} + + {% do run_query(invoke_workflow_query) %} + {% endif %} +{% endif %} +{% endmacro %} \ No newline at end of file diff --git a/models/github_actions/github_actions__current_task_status.yml b/models/github_actions/github_actions__current_task_status.yml index 7923144..7f9db96 100644 --- a/models/github_actions/github_actions__current_task_status.yml +++ b/models/github_actions/github_actions__current_task_status.yml @@ -7,6 +7,8 @@ models: - dbt_expectations.expect_column_values_to_be_in_set: value_set: - TRUE + config: + severity: warn - name: SUCCESSES tests: - dbt_expectations.expect_column_values_to_be_in_set: diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000000001_000500001.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_000000001_000500001.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000000001_000500001.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000500002_001000002.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_000500002_001000002.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000500002_001000002.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001000003_001500003.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_001000003_001500003.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001000003_001500003.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001500004_002000004.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_001500004_002000004.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001500004_002000004.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_002000005_002500005.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_002000005_002500005.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_002000005_002500005.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_002500006_003000006.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_002500006_003000006.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_002500006_003000006.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_003000007_003500007.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_003000007_003500007.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_003000007_003500007.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_003500008_004000008.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_003500008_004000008.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_003500008_004000008.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_004000009_004500009.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_004000009_004500009.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_004000009_004500009.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_004500010_005000010.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_004500010_005000010.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_004500010_005000010.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_005000011_005500011.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_005000011_005500011.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_005000011_005500011.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_005500012_006000012.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_005500012_006000012.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_005500012_006000012.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_006000013_006500013.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_006000013_006500013.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_006000013_006500013.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_006500014_007000014.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_006500014_007000014.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_006500014_007000014.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_007000015_007500015.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_007000015_007500015.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_007000015_007500015.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_007500016_008000016.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_007500016_008000016.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_007500016_008000016.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_008000017_008500017.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_008000017_008500017.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_008000017_008500017.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_008500018_009000018.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_008500018_009000018.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_008500018_009000018.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_009000019_009500019.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_009000019_009500019.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_009000019_009500019.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_009500020_010000020.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_009500020_010000020.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_009500020_010000020.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_010000021_010500021.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_010000021_010500021.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_010000021_010500021.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_010500022_011000022.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_010500022_011000022.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_010500022_011000022.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_011000023_011500023.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_011000023_011500023.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_011000023_011500023.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_011500024_012000024.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_011500024_012000024.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_011500024_012000024.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_012000025_012500025.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_012000025_012500025.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_012000025_012500025.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_012500026_013000026.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_012500026_013000026.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_012500026_013000026.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_013000027_013500027.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_013000027_013500027.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_013000027_013500027.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_013500028_014000028.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_013500028_014000028.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_013500028_014000028.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_014000029_014500029.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_014000029_014500029.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_014000029_014500029.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_014500030_015000030.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_014500030_015000030.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_014500030_015000030.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_015000031_015500031.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_015000031_015500031.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_015000031_015500031.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_015500032_016000032.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_015500032_016000032.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_015500032_016000032.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_016000033_016500033.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_016000033_016500033.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_016000033_016500033.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_016500034_017000034.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_016500034_017000034.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_016500034_017000034.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_017000035_017500035.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_017000035_017500035.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_017000035_017500035.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_017500036_018000036.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_017500036_018000036.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_017500036_018000036.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_018000037_018500037.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_018000037_018500037.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_018000037_018500037.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_018500038_019000038.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_018500038_019000038.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_018500038_019000038.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_019000039_019500039.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_019000039_019500039.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_019000039_019500039.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_019500040_020000040.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_019500040_020000040.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_019500040_020000040.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_020000041_020500041.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_020000041_020500041.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_020000041_020500041.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_020500042_021000042.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_020500042_021000042.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_020500042_021000042.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_021000043_021500043.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_021000043_021500043.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_021000043_021500043.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_021500044_022000044.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_021500044_022000044.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_021500044_022000044.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_022000045_022500045.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_022000045_022500045.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_022000045_022500045.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_022500046_023000046.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_022500046_023000046.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_022500046_023000046.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_023000047_023500047.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_023000047_023500047.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_023000047_023500047.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_023500048_024000048.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_023500048_024000048.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_023500048_024000048.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_024000049_024500049.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_024000049_024500049.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_024000049_024500049.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_024500050_025000050.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_024500050_025000050.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_024500050_025000050.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_025000051_025500051.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_025000051_025500051.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_025000051_025500051.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_025500052_026000052.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_025500052_026000052.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_025500052_026000052.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_026000053_026500053.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_026000053_026500053.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_026000053_026500053.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_026500054_027000054.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_026500054_027000054.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_026500054_027000054.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_027000055_027500055.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_027000055_027500055.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_027000055_027500055.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_027500056_028000056.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_027500056_028000056.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_027500056_028000056.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_028000057_028500057.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_028000057_028500057.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_028000057_028500057.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_028500058_029000058.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_028500058_029000058.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_028500058_029000058.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_029000059_029500059.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_029000059_029500059.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_029000059_029500059.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_029500060_030000060.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_029500060_030000060.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_029500060_030000060.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_030000061_030500061.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_030000061_030500061.sql deleted file mode 100644 index cb4ad80..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_030000061_030500061.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql b/models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql index 3c9f43f..97e293a 100644 --- a/models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql +++ b/models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql @@ -4,12 +4,50 @@ tags = ['streamline_decoded_logs_realtime'] ) }} -WITH look_back AS ( +WITH target_blocks AS ( SELECT block_number + FROM + {{ ref('core__fact_blocks') }} + WHERE + block_number >= ( + SELECT + block_number + FROM + {{ ref("_block_lookback") }} + ) +), +existing_logs_to_exclude AS ( + SELECT + _log_id FROM - {{ ref("_24_hour_lookback") }} + {{ ref('streamline__complete_decode_logs') }} + l + INNER JOIN target_blocks b USING (block_number) + WHERE + l._inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE()) +), +candidate_logs AS ( + SELECT + l.block_number, + l.tx_hash, + l.event_index, + l.contract_address, + l.topics, + l.data, + CONCAT( + l.tx_hash :: STRING, + '-', + l.event_index :: STRING + ) AS _log_id + FROM + target_blocks b + INNER JOIN {{ ref('core__fact_event_logs') }} + l USING (block_number) + WHERE + l.tx_status = 'SUCCESS' + AND l.inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE()) ) SELECT l.block_number, @@ -24,34 +62,19 @@ SELECT l.contract_address ) AS DATA FROM - {{ ref("silver__logs") }} - l - INNER JOIN {{ ref("silver__complete_event_abis") }} A + candidate_logs l + INNER JOIN {{ ref('silver__complete_event_abis') }} A ON A.parent_contract_address = l.contract_address AND A.event_signature = l.topics [0] :: STRING AND l.block_number BETWEEN A.start_block AND A.end_block WHERE - ( - l.block_number >= ( - SELECT - block_number - FROM - look_back - ) - ) - AND l.block_number IS NOT NULL - AND l.block_timestamp >= DATEADD('day', -2, CURRENT_DATE()) - AND _log_id NOT IN ( + NOT EXISTS ( SELECT - _log_id + 1 FROM - {{ ref("streamline__complete_decode_logs") }} + existing_logs_to_exclude e WHERE - block_number >= ( - SELECT - block_number - FROM - look_back - ) - AND _inserted_timestamp >= DATEADD('day', -2, CURRENT_DATE())) + e._log_id = l._log_id + ) +limit 7500000 \ No newline at end of file