From ef0f78b9a35aa27905d707a6c4d1802e6eb20247 Mon Sep 17 00:00:00 2001 From: Austin <93135983+austinFlipside@users.noreply.github.com> Date: Tue, 19 Nov 2024 15:43:19 -0500 Subject: [PATCH] An 5441/upgrade decoder history process (#81) * upgrades * ignore --- .github/workflows/dbt_run_dev_refresh.yml | 4 +- ...eduled_decoded_logs_history_user_abis.yml} | 20 ++- ...bt_run_streamline_decoded_logs_history.yml | 49 +++++++ .gitignore | 4 +- data/github_actions__workflows.csv | 4 +- macros/decoder/decoded_logs_history.sql | 127 ++++++++++++++++++ macros/decoder/run_decoded_logs_history.sql | 29 ++++ ...ecode_logs_history_000000001_000100001.sql | 16 --- ...ecode_logs_history_000100002_000200002.sql | 16 --- ...ecode_logs_history_000200003_000300003.sql | 16 --- ...ecode_logs_history_000300004_000400004.sql | 16 --- ...ecode_logs_history_000400005_000500005.sql | 16 --- ...ecode_logs_history_000500006_000600006.sql | 16 --- ...ecode_logs_history_000600007_000700007.sql | 16 --- ...ecode_logs_history_000700008_000800008.sql | 16 --- ...ecode_logs_history_000800009_000900009.sql | 16 --- ...ecode_logs_history_000900010_001000010.sql | 16 --- ...ecode_logs_history_001000011_001100011.sql | 16 --- ...ecode_logs_history_001100012_001200012.sql | 16 --- ...ecode_logs_history_001200013_001300013.sql | 16 --- ...ecode_logs_history_001300014_001400014.sql | 16 --- ...ecode_logs_history_001400015_001500015.sql | 16 --- ...ecode_logs_history_001500016_001600016.sql | 16 --- ...ecode_logs_history_001600017_001700017.sql | 16 --- ...ecode_logs_history_001700018_001800018.sql | 16 --- ...ecode_logs_history_001800019_001900019.sql | 16 --- ...ecode_logs_history_001900020_002000020.sql | 16 --- ...ecode_logs_history_002000021_002100021.sql | 16 --- ...ecode_logs_history_002100022_002200022.sql | 16 --- ...ecode_logs_history_002200023_002300023.sql | 16 --- ...ecode_logs_history_002300024_002400024.sql | 16 --- ...ecode_logs_history_002400025_002500000.sql | 16 --- ...ecode_logs_history_002500001_005000000.sql | 16 --- ...ecode_logs_history_005000001_007500000.sql | 16 --- ...ecode_logs_history_010000001_012500000.sql | 16 --- ...ecode_logs_history_012500001_015000000.sql | 16 --- ...ecode_logs_history_015000001_017500000.sql | 16 --- ...ecode_logs_history_017500001_020000000.sql | 16 --- ...ecode_logs_history_020000001_022500000.sql | 16 --- ...ecode_logs_history_022500001_025000000.sql | 16 --- ...ecode_logs_history_025000001_027500000.sql | 16 --- ...ecode_logs_history_027500001_030000000.sql | 16 --- ...ecode_logs_history_030000001_032500000.sql | 16 --- ...ecode_logs_history_032500001_035000000.sql | 16 --- ...ecode_logs_history_035000001_037500000.sql | 16 --- ...ecode_logs_history_037500001_040000000.sql | 16 --- ...ecode_logs_history_040000001_042500000.sql | 16 --- ...ecode_logs_history_042500001_045000000.sql | 16 --- ...ecode_logs_history_045000001_047500000.sql | 16 --- ...ecode_logs_history_047500001_050000000.sql | 16 --- .../streamline__decode_logs_realtime.sql | 74 ++++++---- 51 files changed, 270 insertions(+), 729 deletions(-) rename .github/workflows/{dbt_run_streamline_decoder_history.yml => dbt_run_scheduled_decoded_logs_history_user_abis.yml} (63%) create mode 100644 .github/workflows/dbt_run_streamline_decoded_logs_history.yml create mode 100644 macros/decoder/decoded_logs_history.sql create mode 100644 macros/decoder/run_decoded_logs_history.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_000000001_000100001.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_000100002_000200002.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_000200003_000300003.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_000300004_000400004.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_000400005_000500005.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_000500006_000600006.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_000600007_000700007.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_000700008_000800008.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_000800009_000900009.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_000900010_001000010.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_001000011_001100011.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_001100012_001200012.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_001200013_001300013.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_001300014_001400014.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_001400015_001500015.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_001500016_001600016.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_001600017_001700017.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_001700018_001800018.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_001800019_001900019.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_001900020_002000020.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_002000021_002100021.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_002100022_002200022.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_002200023_002300023.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_002300024_002400024.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_002400025_002500000.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_002500001_005000000.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_005000001_007500000.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_010000001_012500000.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_012500001_015000000.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_015000001_017500000.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_017500001_020000000.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_020000001_022500000.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_022500001_025000000.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_025000001_027500000.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_027500001_030000000.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_030000001_032500000.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_032500001_035000000.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_035000001_037500000.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_037500001_040000000.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_040000001_042500000.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_042500001_045000000.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_045000001_047500000.sql delete mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_047500001_050000000.sql diff --git a/.github/workflows/dbt_run_dev_refresh.yml b/.github/workflows/dbt_run_dev_refresh.yml index 1d96e63..93187a6 100644 --- a/.github/workflows/dbt_run_dev_refresh.yml +++ b/.github/workflows/dbt_run_dev_refresh.yml @@ -4,8 +4,8 @@ run-name: dbt_run_dev_refresh on: workflow_dispatch: schedule: - # Runs "at 9:00 UTC" (see https://crontab.guru) - - cron: '0 9 * * *' + # Runs "at 9:30 UTC every Monday" (see https://crontab.guru) + - cron: '30 9 * * 1' env: DBT_PROFILES_DIR: ./ diff --git a/.github/workflows/dbt_run_streamline_decoder_history.yml b/.github/workflows/dbt_run_scheduled_decoded_logs_history_user_abis.yml similarity index 63% rename from .github/workflows/dbt_run_streamline_decoder_history.yml rename to .github/workflows/dbt_run_scheduled_decoded_logs_history_user_abis.yml index 001afe1..63bf5a2 100644 --- a/.github/workflows/dbt_run_streamline_decoder_history.yml +++ b/.github/workflows/dbt_run_scheduled_decoded_logs_history_user_abis.yml @@ -1,12 +1,11 @@ -name: dbt_run_streamline_decoder_history -run-name: dbt_run_streamline_decoder_history +name: dbt_run_scheduled_decoded_logs_history_user_abis +run-name: dbt_run_scheduled_decoded_logs_history_user_abis on: workflow_dispatch: - schedule: - # Runs "at 1:05 UTC AM" (see https://crontab.guru) - - cron: '5 1 * * *' - + branches: + - "main" + env: DBT_PROFILES_DIR: ./ @@ -22,12 +21,10 @@ env: concurrency: group: ${{ github.workflow }} - - jobs: run_dbt_jobs: runs-on: ubuntu-latest - environment: + environment: name: workflow_prod steps: @@ -42,6 +39,7 @@ jobs: run: | pip install -r requirements.txt dbt deps - - name: Run DBT Jobs + + - name: Kick off decoded logs history, if there are new ABIs from users run: | - dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":120}' -m "blast_models,tag:streamline_decoded_logs_complete" "blast_models,tag:streamline_decoded_logs_history" \ No newline at end of file + dbt run-operation run_decoded_logs_history \ No newline at end of file diff --git a/.github/workflows/dbt_run_streamline_decoded_logs_history.yml b/.github/workflows/dbt_run_streamline_decoded_logs_history.yml new file mode 100644 index 0000000..dd30bc3 --- /dev/null +++ b/.github/workflows/dbt_run_streamline_decoded_logs_history.yml @@ -0,0 +1,49 @@ +name: dbt_run_streamline_decoded_logs_history +run-name: dbt_run_streamline_decoded_logs_history + +on: + workflow_dispatch: + branches: + - "main" + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + + - name: Update complete table + run: | + dbt run -m "blast_models,tag:streamline_decoded_logs_complete" + + - name: Decode historical logs + run: | + dbt run-operation decoded_logs_history --vars '{"STREAMLINE_INVOKE_STREAMS":True}' \ No newline at end of file diff --git a/.gitignore b/.gitignore index d99e9bd..2e8fe84 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,6 @@ logs/ **/.DS_Store .vscode/ .env -dbt-env/ \ No newline at end of file +dbt-env/ + +package-lock.yml \ No newline at end of file diff --git a/data/github_actions__workflows.csv b/data/github_actions__workflows.csv index 504d91e..3f7d109 100644 --- a/data/github_actions__workflows.csv +++ b/data/github_actions__workflows.csv @@ -3,4 +3,6 @@ dbt_run_scheduled_non_realtime,"17,47 * * * *" dbt_run_streamline_chainhead,"10,40 * * * *" dbt_run_streamline_decoder,"25,55 * * * *" dbt_test_tasks,"10 * * * *" -dbt_run_scheduled_curated,"30 * * * *" \ No newline at end of file +dbt_run_scheduled_curated,"30 * * * *" +dbt_run_streamline_decoded_logs_history,"5 3 * * 6" +dbt_run_scheduled_decoded_logs_history_user_abis,"56 23 * * *" \ No newline at end of file diff --git a/macros/decoder/decoded_logs_history.sql b/macros/decoder/decoded_logs_history.sql new file mode 100644 index 0000000..72f462b --- /dev/null +++ b/macros/decoder/decoded_logs_history.sql @@ -0,0 +1,127 @@ +{% macro decoded_logs_history(backfill_mode=false) %} + + {%- set params = { + "sql_limit": var("DECODED_LOGS_HISTORY_SQL_LIMIT", 7500000), + "producer_batch_size": var("DECODED_LOGS_HISTORY_PRODUCER_BATCH_SIZE", 400000), + "worker_batch_size": var("DECODED_LOGS_HISTORY_WORKER_BATCH_SIZE", 100000) + } -%} + + {% set wait_time = var("DECODED_LOGS_HISTORY_WAIT_TIME", 60) %} + + {% set find_months_query %} + SELECT + DISTINCT date_trunc('month', block_timestamp)::date as month + FROM {{ ref('core__fact_blocks') }} + ORDER BY month ASC + {% endset %} + + {% set results = run_query(find_months_query) %} + + {% if execute %} + {% set months = results.columns[0].values() %} + + {% for month in months %} + {% set view_name = 'decoded_logs_history_' ~ month.strftime('%Y_%m') %} + + {% set create_view_query %} + create or replace view streamline.{{view_name}} as ( + WITH target_blocks AS ( + SELECT + block_number + FROM {{ ref('core__fact_blocks') }} + WHERE date_trunc('month', block_timestamp) = '{{month}}'::timestamp + ), + new_abis AS ( + SELECT + abi, + parent_contract_address, + event_signature, + start_block, + end_block + FROM {{ ref('silver__complete_event_abis') }} + {% if not backfill_mode %} + WHERE inserted_timestamp > dateadd('day', -30, sysdate()) + {% endif %} + ), + existing_logs_to_exclude AS ( + SELECT _log_id + FROM {{ ref('streamline__complete_decode_logs') }} l + INNER JOIN target_blocks b using (block_number) + ), + candidate_logs AS ( + SELECT + l.block_number, + l.tx_hash, + l.event_index, + l.contract_address, + l.topics, + l.data, + concat(l.tx_hash::string, '-', l.event_index::string) as _log_id + FROM target_blocks b + INNER JOIN {{ ref('core__fact_event_logs') }} l using (block_number) + WHERE l.tx_status = 'SUCCESS' and date_trunc('month', l.block_timestamp) = '{{month}}'::timestamp + ) + SELECT + l.block_number, + l._log_id, + A.abi, + OBJECT_CONSTRUCT( + 'topics', l.topics, + 'data', l.data, + 'address', l.contract_address + ) AS data + FROM candidate_logs l + INNER JOIN new_abis A + ON A.parent_contract_address = l.contract_address + AND A.event_signature = l.topics[0]::STRING + AND l.block_number BETWEEN A.start_block AND A.end_block + WHERE NOT EXISTS ( + SELECT 1 + FROM existing_logs_to_exclude e + WHERE e._log_id = l._log_id + ) + LIMIT {{ params.sql_limit }} + ) + {% endset %} + + {# Create the view #} + {% do run_query(create_view_query) %} + {{ log("Created view for month " ~ month.strftime('%Y-%m'), info=True) }} + + {% if var("STREAMLINE_INVOKE_STREAMS", false) %} + {# Check if rows exist first #} + {% set check_rows_query %} + SELECT EXISTS(SELECT 1 FROM streamline.{{view_name}} LIMIT 1) + {% endset %} + + {% set results = run_query(check_rows_query) %} + {% set has_rows = results.columns[0].values()[0] %} + + {% if has_rows %} + {# Invoke streamline since rows exist to decode #} + {% set decode_query %} + SELECT streamline.udf_bulk_decode_logs( + object_construct( + 'sql_source', '{{view_name}}', + 'external_table', 'DECODED_LOGS', + 'sql_limit', {{ params.sql_limit }}, + 'producer_batch_size', {{ params.producer_batch_size }}, + 'worker_batch_size', {{ params.worker_batch_size }}) + ); + {% endset %} + + {% do run_query(decode_query) %} + {{ log("Triggered decoding for month " ~ month.strftime('%Y-%m'), info=True) }} + + {# Call wait since we actually did some decoding #} + {% do run_query("call system$wait(" ~ wait_time ~ ")") %} + {{ log("Completed wait after decoding for month " ~ month.strftime('%Y-%m'), info=True) }} + {% else %} + {{ log("No rows to decode for month " ~ month.strftime('%Y-%m'), info=True) }} + {% endif %} + {% endif %} + + {% endfor %} + {% endif %} + +{% endmacro %} \ No newline at end of file diff --git a/macros/decoder/run_decoded_logs_history.sql b/macros/decoder/run_decoded_logs_history.sql new file mode 100644 index 0000000..83c4365 --- /dev/null +++ b/macros/decoder/run_decoded_logs_history.sql @@ -0,0 +1,29 @@ +{% macro run_decoded_logs_history() %} + +{% set check_for_new_user_abis_query %} + select 1 + from {{ ref('silver__user_verified_abis') }} + where _inserted_timestamp::date = sysdate()::date + and dayname(sysdate()) <> 'Sat' +{% endset %} + +{% set results = run_query(check_for_new_user_abis_query) %} + +{% if execute %} + {% set new_user_abis = results.columns[0].values()[0] %} + + {% if new_user_abis %} + {% set invoke_workflow_query %} + SELECT + github_actions.workflow_dispatches( + 'FlipsideCrypto', + 'arbitrum-models', + 'dbt_run_streamline_decoded_logs_history.yml', + NULL + ) + {% endset %} + + {% do run_query(invoke_workflow_query) %} + {% endif %} +{% endif %} +{% endmacro %} \ No newline at end of file diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000000001_000100001.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_000000001_000100001.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000000001_000100001.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000100002_000200002.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_000100002_000200002.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000100002_000200002.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000200003_000300003.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_000200003_000300003.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000200003_000300003.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000300004_000400004.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_000300004_000400004.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000300004_000400004.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000400005_000500005.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_000400005_000500005.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000400005_000500005.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000500006_000600006.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_000500006_000600006.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000500006_000600006.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000600007_000700007.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_000600007_000700007.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000600007_000700007.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000700008_000800008.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_000700008_000800008.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000700008_000800008.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000800009_000900009.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_000800009_000900009.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000800009_000900009.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000900010_001000010.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_000900010_001000010.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000900010_001000010.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001000011_001100011.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_001000011_001100011.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001000011_001100011.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001100012_001200012.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_001100012_001200012.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001100012_001200012.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001200013_001300013.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_001200013_001300013.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001200013_001300013.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001300014_001400014.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_001300014_001400014.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001300014_001400014.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001400015_001500015.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_001400015_001500015.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001400015_001500015.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001500016_001600016.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_001500016_001600016.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001500016_001600016.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001600017_001700017.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_001600017_001700017.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001600017_001700017.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001700018_001800018.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_001700018_001800018.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001700018_001800018.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001800019_001900019.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_001800019_001900019.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001800019_001900019.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001900020_002000020.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_001900020_002000020.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_001900020_002000020.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_002000021_002100021.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_002000021_002100021.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_002000021_002100021.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_002100022_002200022.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_002100022_002200022.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_002100022_002200022.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_002200023_002300023.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_002200023_002300023.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_002200023_002300023.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_002300024_002400024.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_002300024_002400024.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_002300024_002400024.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_002400025_002500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_002400025_002500000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_002400025_002500000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_002500001_005000000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_002500001_005000000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_002500001_005000000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_005000001_007500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_005000001_007500000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_005000001_007500000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_010000001_012500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_010000001_012500000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_010000001_012500000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_012500001_015000000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_012500001_015000000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_012500001_015000000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_015000001_017500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_015000001_017500000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_015000001_017500000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_017500001_020000000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_017500001_020000000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_017500001_020000000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_020000001_022500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_020000001_022500000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_020000001_022500000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_022500001_025000000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_022500001_025000000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_022500001_025000000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_025000001_027500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_025000001_027500000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_025000001_027500000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_027500001_030000000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_027500001_030000000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_027500001_030000000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_030000001_032500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_030000001_032500000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_030000001_032500000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_032500001_035000000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_032500001_035000000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_032500001_035000000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_035000001_037500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_035000001_037500000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_035000001_037500000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_037500001_040000000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_037500001_040000000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_037500001_040000000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_040000001_042500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_040000001_042500000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_040000001_042500000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_042500001_045000000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_042500001_045000000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_042500001_045000000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_045000001_047500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_045000001_047500000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_045000001_047500000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_047500001_050000000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_047500001_050000000.sql deleted file mode 100644 index ab65ae5..0000000 --- a/models/streamline/silver/decoder/history/streamline__decode_logs_history_047500001_050000000.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( - func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", - target = "{{model.schema}}.{{model.alias}}" - ), - if_data_call_wait()], - tags = ['streamline_decoded_logs_history'] -) }} - -{% set start = this.identifier.split("_") [-2] %} -{% set stop = this.identifier.split("_") [-1] %} -{{ fsc_utils.decode_logs_history( - start, - stop -) }} diff --git a/models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql b/models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql index 33f2f29..2d5ad4f 100644 --- a/models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql +++ b/models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql @@ -8,12 +8,50 @@ tags = ['streamline_decoded_logs_realtime'] ) }} -WITH look_back AS ( +WITH target_blocks AS ( - SELECT + SELECT block_number + FROM + {{ ref('core__fact_blocks') }} + WHERE + block_number >= ( + SELECT + block_number + FROM + {{ ref("_block_lookback") }} + ) +), +existing_logs_to_exclude AS ( + SELECT + _log_id FROM - {{ ref("_24_hour_lookback") }} + {{ ref('streamline__complete_decode_logs') }} + l + INNER JOIN target_blocks b USING (block_number) + WHERE + l._inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE()) +), +candidate_logs AS ( + SELECT + l.block_number, + l.tx_hash, + l.event_index, + l.contract_address, + l.topics, + l.data, + CONCAT( + l.tx_hash :: STRING, + '-', + l.event_index :: STRING + ) AS _log_id + FROM + target_blocks b + INNER JOIN {{ ref('core__fact_event_logs') }} + l USING (block_number) + WHERE + l.tx_status = 'SUCCESS' + AND l.inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE()) ) SELECT l.block_number, @@ -28,35 +66,19 @@ SELECT l.contract_address ) AS DATA FROM - {{ ref("silver__logs") }} - l - INNER JOIN {{ ref("silver__complete_event_abis") }} A + candidate_logs l + INNER JOIN {{ ref('silver__complete_event_abis') }} A ON A.parent_contract_address = l.contract_address AND A.event_signature = l.topics [0] :: STRING AND l.block_number BETWEEN A.start_block AND A.end_block WHERE - ( - l.block_number >= ( - SELECT - block_number - FROM - look_back - ) - ) - AND l.block_number IS NOT NULL - AND l.block_timestamp >= DATEADD('day', -2, CURRENT_DATE()) - AND _log_id NOT IN ( + NOT EXISTS ( SELECT - _log_id + 1 FROM - {{ ref("streamline__complete_decode_logs") }} + existing_logs_to_exclude e WHERE - block_number >= ( - SELECT - block_number - FROM - look_back - ) - AND _inserted_timestamp >= DATEADD('day', -2, CURRENT_DATE()) + e._log_id = l._log_id ) +limit 7500000 \ No newline at end of file