An 5441/upgrade decoder history process (#225)

* updates

* comment

* 5 day

* monday

* ignore

* param

* quotes

* config
This commit is contained in:
Austin 2024-11-19 16:28:31 -05:00 committed by GitHub
parent ca26eb4e79
commit 3a1719aa23
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
70 changed files with 271 additions and 772 deletions

View File

@ -4,8 +4,8 @@ run-name: dbt_run_dev_refresh
on:
workflow_dispatch:
schedule:
# Runs "at 9:00 UTC" (see https://crontab.guru)
- cron: '0 9 * * *'
# Runs "at 2:10 UTC on Monday" (see https://crontab.guru)
- cron: '10 2 * * 1'
env:
DBT_PROFILES_DIR: ./

View File

@ -1,12 +1,11 @@
name: dbt_run_streamline_decoder_history
run-name: dbt_run_streamline_decoder_history
name: dbt_run_scheduled_decoded_logs_history_user_abis
run-name: dbt_run_scheduled_decoded_logs_history_user_abis
on:
workflow_dispatch:
schedule:
# Runs "at 1:00 UTC AM" (see https://crontab.guru)
- cron: '0 1 * * *'
branches:
- "main"
env:
DBT_PROFILES_DIR: ./
@ -22,12 +21,10 @@ env:
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
environment:
name: workflow_prod
steps:
@ -42,6 +39,7 @@ jobs:
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
- name: Kick off decoded logs history, if there are new ABIs from users
run: |
dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":120}' -m "base_models,tag:streamline_decoded_logs_complete" "base_models,tag:streamline_decoded_logs_history"
dbt run-operation run_decoded_logs_history

View File

@ -0,0 +1,49 @@
name: dbt_run_streamline_decoded_logs_history
run-name: dbt_run_streamline_decoded_logs_history
on:
workflow_dispatch:
branches:
- "main"
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Update complete table
run: |
dbt run -m "base_models,tag:streamline_decoded_logs_complete"
- name: Decode historical logs
run: |
dbt run-operation decoded_logs_history --vars '{"STREAMLINE_INVOKE_STREAMS":True}'

4
.gitignore vendored
View File

@ -15,4 +15,6 @@ logs/
**/.DS_Store
.vscode/
.env
dbt-env/
dbt-env/
package-lock.yml

View File

@ -3,4 +3,6 @@ dbt_run_scheduled_non_realtime,"22,52 * * * *"
dbt_run_streamline_chainhead,"15,45 * * * *"
dbt_run_streamline_decoder,"0,30 * * * *"
dbt_run_scheduled_curated,"40 * * * *"
dbt_test_tasks,"15 * * * *"
dbt_test_tasks,"15 * * * *"
dbt_run_streamline_decoded_logs_history,"45 7 * * 6"
dbt_run_scheduled_decoded_logs_history_user_abis,"39 23 * * *"
1 workflow_name workflow_schedule
3 dbt_run_streamline_chainhead 15,45 * * * *
4 dbt_run_streamline_decoder 0,30 * * * *
5 dbt_run_scheduled_curated 40 * * * *
6 dbt_test_tasks 15 * * * *
7 dbt_run_streamline_decoded_logs_history 45 7 * * 6
8 dbt_run_scheduled_decoded_logs_history_user_abis 39 23 * * *

View File

@ -0,0 +1,126 @@
{% macro decoded_logs_history(backfill_mode=false) %}
{%- set params = {
"sql_limit": var("DECODED_LOGS_HISTORY_SQL_LIMIT", 7500000),
"producer_batch_size": var("DECODED_LOGS_HISTORY_PRODUCER_BATCH_SIZE", 400000),
"worker_batch_size": var("DECODED_LOGS_HISTORY_WORKER_BATCH_SIZE", 100000),
"producer_limit_size": var("DECODED_LOGS_HISTORY_WORKER_BATCH_SIZE", 100000)
} -%}
{% set wait_time = var("DECODED_LOGS_HISTORY_WAIT_TIME", 60) %}
{% set find_months_query %}
SELECT
DISTINCT date_trunc('month', block_timestamp)::date as month
FROM {{ ref('core__fact_blocks') }}
ORDER BY month ASC
{% endset %}
{% set results = run_query(find_months_query) %}
{% if execute %}
{% set months = results.columns[0].values() %}
{% for month in months %}
{% set view_name = 'decoded_logs_history_' ~ month.strftime('%Y_%m') %}
{% set create_view_query %}
create or replace view streamline.{{view_name}} as (
WITH target_blocks AS (
SELECT
block_number
FROM {{ ref('core__fact_blocks') }}
WHERE date_trunc('month', block_timestamp) = '{{month}}'::timestamp
),
new_abis AS (
SELECT
abi,
parent_contract_address,
event_signature,
start_block,
end_block
FROM {{ ref('silver__complete_event_abis') }}
{% if not backfill_mode %}
WHERE inserted_timestamp > dateadd('day', -30, sysdate())
{% endif %}
),
existing_logs_to_exclude AS (
SELECT _log_id
FROM {{ ref('streamline__complete_decode_logs') }} l
INNER JOIN target_blocks b using (block_number)
),
candidate_logs AS (
SELECT
l.block_number,
l.tx_hash,
l.event_index,
l.contract_address,
l.topics,
l.data,
concat(l.tx_hash::string, '-', l.event_index::string) as _log_id
FROM target_blocks b
INNER JOIN {{ ref('core__fact_event_logs') }} l using (block_number)
WHERE l.tx_status = 'SUCCESS' and date_trunc('month', l.block_timestamp) = '{{month}}'::timestamp
)
SELECT
l.block_number,
l._log_id,
A.abi,
OBJECT_CONSTRUCT(
'topics', l.topics,
'data', l.data,
'address', l.contract_address
) AS data
FROM candidate_logs l
INNER JOIN new_abis A
ON A.parent_contract_address = l.contract_address
AND A.event_signature = l.topics[0]::STRING
AND l.block_number BETWEEN A.start_block AND A.end_block
WHERE NOT EXISTS (
SELECT 1
FROM existing_logs_to_exclude e
WHERE e._log_id = l._log_id
)
LIMIT {{ params.sql_limit }}
)
{% endset %}
{# Create the view #}
{% do run_query(create_view_query) %}
{{ log("Created view for month " ~ month.strftime('%Y-%m'), info=True) }}
{% if var("STREAMLINE_INVOKE_STREAMS", false) %}
{# Check if rows exist first #}
{% set check_rows_query %}
SELECT EXISTS(SELECT 1 FROM streamline.{{view_name}} LIMIT 1)
{% endset %}
{% set results = run_query(check_rows_query) %}
{% set has_rows = results.columns[0].values()[0] %}
{% if has_rows %}
{# Invoke streamline since rows exist to decode #}
{% set decode_query %}
SELECT streamline.udf_bulk_decode_logs(
object_construct(
'sql_source', '{{view_name}}',
'producer_batch_size', {{ params.producer_batch_size }},
'producer_limit_size', {{ params.producer_limit_size }})
);
{% endset %}
{% do run_query(decode_query) %}
{{ log("Triggered decoding for month " ~ month.strftime('%Y-%m'), info=True) }}
{# Call wait since we actually did some decoding #}
{% do run_query("call system$wait(" ~ wait_time ~ ")") %}
{{ log("Completed wait after decoding for month " ~ month.strftime('%Y-%m'), info=True) }}
{% else %}
{{ log("No rows to decode for month " ~ month.strftime('%Y-%m'), info=True) }}
{% endif %}
{% endif %}
{% endfor %}
{% endif %}
{% endmacro %}

View File

@ -0,0 +1,29 @@
{% macro run_decoded_logs_history() %}
{% set check_for_new_user_abis_query %}
select 1
from {{ ref('silver__user_verified_abis') }}
where _inserted_timestamp::date = sysdate()::date
and dayname(sysdate()) <> 'Sat'
{% endset %}
{% set results = run_query(check_for_new_user_abis_query) %}
{% if execute %}
{% set new_user_abis = results.columns[0].values()[0] %}
{% if new_user_abis %}
{% set invoke_workflow_query %}
SELECT
github_actions.workflow_dispatches(
'FlipsideCrypto',
'base-models',
'dbt_run_streamline_decoded_logs_history.yml',
NULL
)
{% endset %}
{% do run_query(invoke_workflow_query) %}
{% endif %}
{% endif %}
{% endmacro %}

View File

@ -7,6 +7,8 @@ models:
- dbt_expectations.expect_column_values_to_be_in_set:
value_set:
- TRUE
config:
severity: warn
- name: SUCCESSES
tests:
- dbt_expectations.expect_column_values_to_be_in_set:

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -1,12 +0,0 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function( func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}','producer_batch_size', 20000000,'producer_limit_size', {{var('row_limit',7500000)}}))", target = "{{model.schema}}.{{model.alias}}" ) ,if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}

View File

@ -4,12 +4,50 @@
tags = ['streamline_decoded_logs_realtime']
) }}
WITH look_back AS (
WITH target_blocks AS (
SELECT
block_number
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_number >= (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
)
),
existing_logs_to_exclude AS (
SELECT
_log_id
FROM
{{ ref("_24_hour_lookback") }}
{{ ref('streamline__complete_decode_logs') }}
l
INNER JOIN target_blocks b USING (block_number)
WHERE
l._inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE())
),
candidate_logs AS (
SELECT
l.block_number,
l.tx_hash,
l.event_index,
l.contract_address,
l.topics,
l.data,
CONCAT(
l.tx_hash :: STRING,
'-',
l.event_index :: STRING
) AS _log_id
FROM
target_blocks b
INNER JOIN {{ ref('core__fact_event_logs') }}
l USING (block_number)
WHERE
l.tx_status = 'SUCCESS'
AND l.inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE())
)
SELECT
l.block_number,
@ -24,34 +62,19 @@ SELECT
l.contract_address
) AS DATA
FROM
{{ ref("silver__logs") }}
l
INNER JOIN {{ ref("silver__complete_event_abis") }} A
candidate_logs l
INNER JOIN {{ ref('silver__complete_event_abis') }} A
ON A.parent_contract_address = l.contract_address
AND A.event_signature = l.topics [0] :: STRING
AND l.block_number BETWEEN A.start_block
AND A.end_block
WHERE
(
l.block_number >= (
SELECT
block_number
FROM
look_back
)
)
AND l.block_number IS NOT NULL
AND l.block_timestamp >= DATEADD('day', -2, CURRENT_DATE())
AND _log_id NOT IN (
NOT EXISTS (
SELECT
_log_id
1
FROM
{{ ref("streamline__complete_decode_logs") }}
existing_logs_to_exclude e
WHERE
block_number >= (
SELECT
block_number
FROM
look_back
)
AND _inserted_timestamp >= DATEADD('day', -2, CURRENT_DATE()))
e._log_id = l._log_id
)
limit 7500000