From 155ae341329f74281d2e1273c935c1da05ccb811 Mon Sep 17 00:00:00 2001 From: drethereum <71602799+drethereum@users.noreply.github.com> Date: Mon, 27 Jan 2025 16:01:39 -0700 Subject: [PATCH] AN-5715/polygon-sl2-decoded-logs (#415) * sl2 decoded logs * package --- dbt_project.yml | 8 ++ macros/decoder/decoded_logs_history.sql | 34 +++--- macros/decoder/run_decoded_logs_history.sql | 6 +- ...eamline_external_table_queries_decoder.sql | 101 ++++++++++++++++ .../main_package/logging/logging.sql | 36 ++++++ models/silver/core/silver__decoded_logs.sql | 2 +- models/sources.yml | 1 + .../bronze/decoder/bronze__decoded_logs.sql | 58 ++++----- .../decoder/bronze__decoded_logs_fr.sql | 20 ++++ .../decoder/bronze__decoded_logs_fr_v1.sql | 23 ++++ .../decoder/bronze__decoded_logs_fr_v2.sql | 23 ++++ .../decoder/bronze__fr_decoded_logs.sql | 40 ------- .../streamline__decoded_logs_complete.sql | 50 ++++++++ .../streamline__decoded_logs_realtime.sql | 110 ++++++++++++++++++ .../streamline__complete_decode_logs.sql | 34 ------ .../streamline__decode_logs_realtime.sql | 80 ------------- package-lock.yml | 4 +- packages.yml | 2 +- 18 files changed, 415 insertions(+), 217 deletions(-) create mode 100644 macros/fsc_evm_temp/decoder_package/streamline_external_table_queries_decoder.sql create mode 100644 macros/fsc_evm_temp/main_package/logging/logging.sql create mode 100644 models/streamline/bronze/decoder/bronze__decoded_logs_fr.sql create mode 100644 models/streamline/bronze/decoder/bronze__decoded_logs_fr_v1.sql create mode 100644 models/streamline/bronze/decoder/bronze__decoded_logs_fr_v2.sql delete mode 100644 models/streamline/bronze/decoder/bronze__fr_decoded_logs.sql create mode 100644 models/streamline/silver/decoded_logs/complete/streamline__decoded_logs_complete.sql create mode 100644 models/streamline/silver/decoded_logs/realtime/streamline__decoded_logs_realtime.sql delete mode 100644 models/streamline/silver/decoder/complete/streamline__complete_decode_logs.sql delete 
mode 100644 models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql diff --git a/dbt_project.yml b/dbt_project.yml index 4357c90..d5903c2 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -132,4 +132,12 @@ vars: ### MAIN_PACKAGE VARIABLES END ### + ### DECODER_PACKAGE VARIABLES BEGIN ### + + ## REQUIRED + + ## OPTIONAL + + ### DECODER_PACKAGE VARIABLES END ### + #### FSC_EVM END #### diff --git a/macros/decoder/decoded_logs_history.sql b/macros/decoder/decoded_logs_history.sql index 91b064c..4fe2282 100644 --- a/macros/decoder/decoded_logs_history.sql +++ b/macros/decoder/decoded_logs_history.sql @@ -1,29 +1,26 @@ {% macro decoded_logs_history(backfill_mode=false) %} {%- set params = { - "sql_limit": var("DECODED_LOGS_HISTORY_SQL_LIMIT", 7500000), + "sql_limit": var("DECODED_LOGS_HISTORY_SQL_LIMIT", 8000000), "producer_batch_size": var("DECODED_LOGS_HISTORY_PRODUCER_BATCH_SIZE", 400000), - "worker_batch_size": var("DECODED_LOGS_HISTORY_WORKER_BATCH_SIZE", 100000), - "producer_limit_size": var("DECODED_LOGS_HISTORY_WORKER_BATCH_SIZE", 100000) + "worker_batch_size": var("DECODED_LOGS_HISTORY_WORKER_BATCH_SIZE", 100000) } -%} {% set wait_time = var("DECODED_LOGS_HISTORY_WAIT_TIME", 60) %} - {% set find_months_query %} SELECT DISTINCT date_trunc('month', block_timestamp)::date as month FROM {{ ref('core__fact_blocks') }} ORDER BY month ASC {% endset %} - {% set results = run_query(find_months_query) %} {% if execute %} {% set months = results.columns[0].values() %} - + {% for month in months %} {% set view_name = 'decoded_logs_history_' ~ month.strftime('%Y_%m') %} - + {% set create_view_query %} create or replace view streamline.{{view_name}} as ( WITH target_blocks AS ( @@ -46,7 +43,7 @@ ), existing_logs_to_exclude AS ( SELECT _log_id - FROM {{ ref('streamline__complete_decode_logs') }} l + FROM {{ ref('streamline__decoded_logs_complete') }} l INNER JOIN target_blocks b using (block_number) ), candidate_logs AS ( @@ -84,11 +81,9 @@ LIMIT {{ 
params.sql_limit }} ) {% endset %} - {# Create the view #} {% do run_query(create_view_query) %} {{ log("Created view for month " ~ month.strftime('%Y-%m'), info=True) }} - {% if var("STREAMLINE_INVOKE_STREAMS", false) %} {# Check if rows exist first #} {% set check_rows_query %} @@ -99,19 +94,22 @@ {% set has_rows = results.columns[0].values()[0] %} {% if has_rows %} - {# Invoke streamline since rows exist to decode #} + {# Invoke streamline, if rows exist to decode #} {% set decode_query %} - SELECT streamline.udf_bulk_decode_logs( - object_construct( - 'sql_source', '{{view_name}}', - 'producer_batch_size', {{ params.producer_batch_size }}, - 'producer_limit_size', {{ params.producer_limit_size }}) + SELECT + streamline.udf_bulk_decode_logs_v2( + PARSE_JSON( + $${ "external_table": "decoded_logs", + "producer_batch_size": {{ params.producer_batch_size }}, + "sql_limit": {{ params.sql_limit }}, + "sql_source": "{{view_name}}", + "worker_batch_size": {{ params.worker_batch_size }} }$$ + ) ); {% endset %} {% do run_query(decode_query) %} {{ log("Triggered decoding for month " ~ month.strftime('%Y-%m'), info=True) }} - {# Call wait since we actually did some decoding #} {% do run_query("call system$wait(" ~ wait_time ~ ")") %} {{ log("Completed wait after decoding for month " ~ month.strftime('%Y-%m'), info=True) }} @@ -119,7 +117,7 @@ {{ log("No rows to decode for month " ~ month.strftime('%Y-%m'), info=True) }} {% endif %} {% endif %} - + {% endfor %} {% endif %} diff --git a/macros/decoder/run_decoded_logs_history.sql b/macros/decoder/run_decoded_logs_history.sql index 560a355..d38c39d 100644 --- a/macros/decoder/run_decoded_logs_history.sql +++ b/macros/decoder/run_decoded_logs_history.sql @@ -1,14 +1,14 @@ {% macro run_decoded_logs_history() %} +{% set blockchain = var('GLOBAL_PROD_DB_NAME','').lower() %} + {% set check_for_new_user_abis_query %} select 1 from {{ ref('silver__user_verified_abis') }} where _inserted_timestamp::date = sysdate()::date and 
dayname(sysdate()) <> 'Sat' {% endset %} - {% set results = run_query(check_for_new_user_abis_query) %} - {% if execute %} {% set new_user_abis = results.columns[0].values()[0] %} @@ -17,7 +17,7 @@ SELECT github_actions.workflow_dispatches( 'FlipsideCrypto', - 'polygon-models', + {# repo name is built in a single literal; the Jinja ~ operator is not evaluated in raw SQL text #} '{{ blockchain }}-models', 'dbt_run_streamline_decoded_logs_history.yml', NULL ) diff --git a/macros/fsc_evm_temp/decoder_package/streamline_external_table_queries_decoder.sql b/macros/fsc_evm_temp/decoder_package/streamline_external_table_queries_decoder.sql new file mode 100644 index 0000000..318a3da --- /dev/null +++ b/macros/fsc_evm_temp/decoder_package/streamline_external_table_queries_decoder.sql @@ -0,0 +1,101 @@ +{% macro streamline_external_table_query_decoder( + source_name, + source_version + ) %} + + {% if source_version != '' %} + {% set source_version = '_' ~ source_version.lower() %} + {% endif %} + + WITH meta AS ( + SELECT + job_created_time AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number, + TO_DATE( + concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5)) + ) AS _partition_by_created_date + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", source_name ~ source_version) }}') + ) A + ) + SELECT + block_number, + id :: STRING AS id, + DATA, + metadata, + b.file_name, + _inserted_timestamp, + s._partition_by_block_number AS _partition_by_block_number, + s._partition_by_created_date AS _partition_by_created_date + FROM + {{ source( + "bronze_streamline", + source_name ~ source_version + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b._partition_by_block_number = s._partition_by_block_number + AND b._partition_by_created_date = s._partition_by_created_date + WHERE + 
b._partition_by_block_number = s._partition_by_block_number + AND b._partition_by_created_date = s._partition_by_created_date + AND s._partition_by_created_date >= DATEADD('day', -2, CURRENT_TIMESTAMP()) + AND DATA :error IS NULL + AND DATA IS NOT NULL +{% endmacro %} + + +{% macro streamline_external_table_query_decoder_fr( + source_name, + source_version + ) %} + + {% if source_version != '' %} + {% set source_version = '_' ~ source_version.lower() %} + {% endif %} + + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number, + TO_DATE( + concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5)) + ) AS _partition_by_created_date + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", source_name ~ source_version) }}' + ) + ) A + ) +SELECT + block_number, + id :: STRING AS id, + DATA, + metadata, + b.file_name, + _inserted_timestamp, + s._partition_by_block_number AS _partition_by_block_number, + s._partition_by_created_date AS _partition_by_created_date +FROM + {{ source( + "bronze_streamline", + source_name ~ source_version + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b._partition_by_block_number = s._partition_by_block_number + AND b._partition_by_created_date = s._partition_by_created_date +WHERE + b._partition_by_block_number = s._partition_by_block_number + AND b._partition_by_created_date = s._partition_by_created_date + AND DATA :error IS NULL + AND DATA IS NOT NULL +{% endmacro %} diff --git a/macros/fsc_evm_temp/main_package/logging/logging.sql b/macros/fsc_evm_temp/main_package/logging/logging.sql new file mode 100644 index 0000000..f368685 --- /dev/null +++ b/macros/fsc_evm_temp/main_package/logging/logging.sql @@ -0,0 +1,36 @@ +{% macro log_model_details(vars=false, params=false) %} + +{%- if execute -%} +/* 
+DBT Model Config: +{{ model.config | tojson(indent=2) }} +*/ + +{% if vars is not false %} + +{% if var('LOG_MODEL_DETAILS', false) %} +{{ log( vars | tojson(indent=2), info=True) }} +{% endif %} +/* +Variables: +{{ vars | tojson(indent=2) }} +*/ +{% endif %} + +{% if params is not false %} + +{% if var('LOG_MODEL_DETAILS', false) %} +{{ log( params | tojson(indent=2), info=True) }} +{% endif %} +/* +Parameters: +{{ params | tojson(indent=2) }} +*/ +{% endif %} + +/* +Raw Code: +{{ model.raw_code }} +*/ +{%- endif -%} +{% endmacro %} \ No newline at end of file diff --git a/models/silver/core/silver__decoded_logs.sql b/models/silver/core/silver__decoded_logs.sql index 798368d..a589781 100644 --- a/models/silver/core/silver__decoded_logs.sql +++ b/models/silver/core/silver__decoded_logs.sql @@ -42,7 +42,7 @@ WHERE ) AND DATA NOT ILIKE '%Event topic is not present in given ABI%' {% else %} - {{ ref('bronze__fr_decoded_logs') }} + {{ ref('bronze__decoded_logs_fr') }} WHERE _partition_by_block_number <= 2500000 AND DATA NOT ILIKE '%Event topic is not present in given ABI%' diff --git a/models/sources.yml b/models/sources.yml index 02d1bf1..31b53ea 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -30,6 +30,7 @@ sources: - name: receipts_v2 - name: traces_v2 - name: confirm_blocks_v2 + - name: decoded_logs_v2 - name: streamline_crosschain database: streamline schema: crosschain diff --git a/models/streamline/bronze/decoder/bronze__decoded_logs.sql b/models/streamline/bronze/decoder/bronze__decoded_logs.sql index bd43f6f..8a339c6 100644 --- a/models/streamline/bronze/decoder/bronze__decoded_logs.sql +++ b/models/streamline/bronze/decoder/bronze__decoded_logs.sql @@ -1,41 +1,23 @@ -{{ config ( - materialized = 'view' +{# Set variables #} +{% set source_name = 'DECODED_LOGS' %} +{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %} +{% set model_type = '' %} + +{%- set default_vars = set_default_variables_bronze(source_name, 
model_type) -%} + +{# Log configuration details #} +{{ log_model_details( + vars = default_vars ) }} -WITH meta AS ( +{# Set up dbt configuration #} +{{ config ( + materialized = 'view', + tags = ['bronze_decoded_logs'] +) }} - SELECT - last_modified AS _inserted_timestamp, - file_name, - CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number, - TO_DATE( - concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5)) - ) AS _partition_by_created_date - FROM - TABLE( - information_schema.external_table_file_registration_history( - start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), - table_name => '{{ source( "bronze_streamline", "decoded_logs") }}') - ) A - ) - SELECT - block_number, - id :: STRING AS id, - DATA, - _inserted_timestamp, - s._partition_by_block_number AS _partition_by_block_number, - s._partition_by_created_date AS _partition_by_created_date - FROM - {{ source( - "bronze_streamline", - "decoded_logs" - ) }} - s - JOIN meta b - ON b.file_name = metadata$filename - AND b._partition_by_block_number = s._partition_by_block_number - AND b._partition_by_created_date = s._partition_by_created_date - WHERE - b._partition_by_block_number = s._partition_by_block_number - AND b._partition_by_created_date = s._partition_by_created_date - AND s._partition_by_created_date >= DATEADD('day', -2, CURRENT_TIMESTAMP()) +{# Main query starts here #} +{{ streamline_external_table_query_decoder( + source_name = source_name.lower(), + source_version = source_version.lower() +) }} \ No newline at end of file diff --git a/models/streamline/bronze/decoder/bronze__decoded_logs_fr.sql b/models/streamline/bronze/decoder/bronze__decoded_logs_fr.sql new file mode 100644 index 0000000..bbe5955 --- /dev/null +++ b/models/streamline/bronze/decoder/bronze__decoded_logs_fr.sql @@ -0,0 +1,20 @@ +{# Log configuration details #} +{{ log_model_details() }} + +{# Set up dbt configuration #} +{{ config 
( + materialized = 'view', + tags = ['bronze_decoded_logs'] +) }} + +SELECT + * +FROM + {{ ref('bronze__decoded_logs_fr_v2') }} +{% if var('GLOBAL_USES_STREAMLINE_V1', false) %} +UNION ALL +SELECT + * +FROM + {{ ref('bronze__decoded_logs_fr_v1') }} +{% endif %} diff --git a/models/streamline/bronze/decoder/bronze__decoded_logs_fr_v1.sql b/models/streamline/bronze/decoder/bronze__decoded_logs_fr_v1.sql new file mode 100644 index 0000000..8c122fa --- /dev/null +++ b/models/streamline/bronze/decoder/bronze__decoded_logs_fr_v1.sql @@ -0,0 +1,23 @@ +{# Set variables #} +{% set source_name = 'DECODED_LOGS' %} +{% set source_version = '' %} +{% set model_type = 'FR' %} + +{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%} + +{# Log configuration details #} +{{ log_model_details( + vars = default_vars +) }} + +{# Set up dbt configuration #} +{{ config ( + materialized = 'view', + tags = ['bronze_decoded_logs_streamline_v1'] +) }} + +{# Main query starts here #} +{{ streamline_external_table_query_decoder_fr( + source_name = source_name.lower(), + source_version = source_version.lower() +) }} \ No newline at end of file diff --git a/models/streamline/bronze/decoder/bronze__decoded_logs_fr_v2.sql b/models/streamline/bronze/decoder/bronze__decoded_logs_fr_v2.sql new file mode 100644 index 0000000..2bd430a --- /dev/null +++ b/models/streamline/bronze/decoder/bronze__decoded_logs_fr_v2.sql @@ -0,0 +1,23 @@ +{# Set variables #} +{% set source_name = 'DECODED_LOGS' %} +{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %} +{% set model_type = 'FR' %} + +{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%} + +{# Log configuration details #} +{{ log_model_details( + vars = default_vars +) }} + +{# Set up dbt configuration #} +{{ config ( + materialized = 'view', + tags = ['bronze_decoded_logs'] +) }} + +{# Main query starts here #} +{{ streamline_external_table_query_decoder_fr( + source_name = 
source_name.lower(), + source_version = source_version.lower() +) }} \ No newline at end of file diff --git a/models/streamline/bronze/decoder/bronze__fr_decoded_logs.sql b/models/streamline/bronze/decoder/bronze__fr_decoded_logs.sql deleted file mode 100644 index 4e4a1c8..0000000 --- a/models/streamline/bronze/decoder/bronze__fr_decoded_logs.sql +++ /dev/null @@ -1,40 +0,0 @@ -{{ config ( - materialized = 'view' -) }} - -WITH meta AS ( - - SELECT - registered_on AS _inserted_timestamp, - file_name, - CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number, - TO_DATE( - concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5)) - ) AS _partition_by_created_date - FROM - TABLE( - information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "decoded_logs") }}' - ) - ) A -) -SELECT - block_number, - id :: STRING AS id, - DATA, - _inserted_timestamp, - s._partition_by_block_number AS _partition_by_block_number, - s._partition_by_created_date AS _partition_by_created_date -FROM - {{ source( - "bronze_streamline", - "decoded_logs" - ) }} - s - JOIN meta b - ON b.file_name = metadata$filename - AND b._partition_by_block_number = s._partition_by_block_number - AND b._partition_by_created_date = s._partition_by_created_date -WHERE - b._partition_by_block_number = s._partition_by_block_number - AND b._partition_by_created_date = s._partition_by_created_date diff --git a/models/streamline/silver/decoded_logs/complete/streamline__decoded_logs_complete.sql b/models/streamline/silver/decoded_logs/complete/streamline__decoded_logs_complete.sql new file mode 100644 index 0000000..3e80589 --- /dev/null +++ b/models/streamline/silver/decoded_logs/complete/streamline__decoded_logs_complete.sql @@ -0,0 +1,50 @@ +{# Set variables #} +{%- set source_name = 'DECODED_LOGS' -%} +{%- set model_type = 'COMPLETE' -%} + +{%- set full_refresh_type = var((source_name ~ 
'_complete_full_refresh').upper(), false) -%} + +{% set post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(_log_id)" %} + +{# Log configuration details #} +{{ log_model_details() }} + +{# Set up dbt configuration #} +-- depends_on: {{ ref('bronze__' ~ source_name.lower()) }} + +{{ config ( + materialized = "incremental", + unique_key = "_log_id", + cluster_by = "ROUND(block_number, -3)", + incremental_predicates = ["dynamic_range", "block_number"], + merge_update_columns = ["_log_id"], + post_hook = post_hook, + full_refresh = full_refresh_type, + tags = ['streamline_decoded_logs_complete'] +) }} + +{# Main query starts here #} +SELECT + block_number, + file_name, + id AS _log_id, + {{ dbt_utils.generate_surrogate_key(['id']) }} AS complete_{{ source_name.lower() }}_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + _inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + {% if is_incremental() %} + {{ ref('bronze__' ~ source_name.lower()) }} + WHERE + _inserted_timestamp >= ( + SELECT + COALESCE(MAX(_inserted_timestamp), '1970-01-01'::TIMESTAMP) AS _inserted_timestamp + FROM + {{ this }} + ) + {% else %} + {{ ref('bronze__' ~ source_name.lower() ~ '_fr') }} + {% endif %} + +QUALIFY (ROW_NUMBER() OVER (PARTITION BY id ORDER BY _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/silver/decoded_logs/realtime/streamline__decoded_logs_realtime.sql b/models/streamline/silver/decoded_logs/realtime/streamline__decoded_logs_realtime.sql new file mode 100644 index 0000000..3c624d2 --- /dev/null +++ b/models/streamline/silver/decoded_logs/realtime/streamline__decoded_logs_realtime.sql @@ -0,0 +1,110 @@ +{%- set testing_limit = var('DECODED_LOGS_REALTIME_TESTING_LIMIT', none) -%} + +{%- set streamline_params = { + "external_table": var("DECODED_LOGS_REALTIME_EXTERNAL_TABLE", "decoded_logs"), + "sql_limit": var("DECODED_LOGS_REALTIME_SQL_LIMIT", 10000000), + "producer_batch_size": 
var("DECODED_LOGS_REALTIME_PRODUCER_BATCH_SIZE", 400000), + "worker_batch_size": var("DECODED_LOGS_REALTIME_WORKER_BATCH_SIZE", 200000), + "sql_source": "decoded_logs_realtime" +} -%} + +{# Log configuration details #} +{{ log_model_details( + params = streamline_params +) }} + +{# Set up dbt configuration #} +{{ config ( + materialized = "view", + post_hook = [fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_decode_logs_v2', + target = "{{this.schema}}.{{this.identifier}}", + params = { + "external_table": streamline_params['external_table'], + "sql_limit": streamline_params['sql_limit'], + "producer_batch_size": streamline_params['producer_batch_size'], + "worker_batch_size": streamline_params['worker_batch_size'], + "sql_source": streamline_params['sql_source'] + } + ), + fsc_utils.if_data_call_wait()], + tags = ['streamline_decoded_logs_realtime'] +) }} + +WITH target_blocks AS ( + SELECT + block_number + FROM + {{ ref('core__fact_blocks') }} + WHERE + block_number >= ( + SELECT + block_number + FROM + {{ ref('_24_hour_lookback') }} + ) +), +existing_logs_to_exclude AS ( + SELECT + _log_id + FROM + {{ ref('streamline__decoded_logs_complete') }} + l + INNER JOIN target_blocks b USING (block_number) + WHERE + l.inserted_timestamp :: DATE >= DATEADD('day', -2, SYSDATE()) +), +candidate_logs AS ( + SELECT + l.block_number, + l.tx_hash, + l.event_index, + l.contract_address, + l.topics, + l.data, + CONCAT( + l.tx_hash :: STRING, + '-', + l.event_index :: STRING + ) AS _log_id + FROM + target_blocks b + INNER JOIN {{ ref('core__fact_event_logs') }} + l USING (block_number) + WHERE + l.tx_status = 'SUCCESS' + AND l.inserted_timestamp :: DATE >= DATEADD('day', -2, SYSDATE()) +) +SELECT + l.block_number, + l._log_id, + A.abi, + OBJECT_CONSTRUCT( + 'topics', + l.topics, + 'data', + l.data, + 'address', + l.contract_address + ) AS DATA +FROM + candidate_logs l + INNER JOIN {{ ref('silver__complete_event_abis') }} A + ON A.parent_contract_address = 
l.contract_address + AND A.event_signature = l.topics [0] :: STRING + AND l.block_number BETWEEN A.start_block + AND A.end_block +WHERE + NOT EXISTS ( + SELECT + 1 + FROM + existing_logs_to_exclude e + WHERE + e._log_id = l._log_id + ) + +{% if testing_limit is not none %} + LIMIT + {{ testing_limit }} +{% endif %} \ No newline at end of file diff --git a/models/streamline/silver/decoder/complete/streamline__complete_decode_logs.sql b/models/streamline/silver/decoder/complete/streamline__complete_decode_logs.sql deleted file mode 100644 index 00a4c22..0000000 --- a/models/streamline/silver/decoder/complete/streamline__complete_decode_logs.sql +++ /dev/null @@ -1,34 +0,0 @@ --- depends_on: {{ ref('bronze__decoded_logs') }} -{{ config ( - materialized = "incremental", - unique_key = "_log_id", - cluster_by = "ROUND(block_number, -3)", - incremental_predicates = ["dynamic_range", "block_number"], - merge_update_columns = ["_log_id"], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(_log_id)", - full_refresh = false, - tags = ['streamline_decoded_logs_complete'] -) }} - -SELECT - block_number, - id AS _log_id, - _inserted_timestamp -FROM - -{% if is_incremental() %} -{{ ref('bronze__decoded_logs') }} -WHERE - TO_TIMESTAMP_NTZ(_inserted_timestamp) >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} - ) -{% else %} - {{ ref('bronze__fr_decoded_logs') }} -{% endif %} - -qualify(ROW_NUMBER() over (PARTITION BY id -ORDER BY - _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql b/models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql deleted file mode 100644 index 97e293a..0000000 --- a/models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql +++ /dev/null @@ -1,80 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = [if_data_call_function( func = "{{this.schema}}.udf_bulk_decode_logs(object_construct('sql_source', 
'{{this.identifier}}','producer_batch_size', 20000000,'producer_limit_size', 20000000))", target = "{{this.schema}}.{{this.identifier}}" ),"call system$wait(" ~ var("WAIT", 400) ~ ")" ], - tags = ['streamline_decoded_logs_realtime'] -) }} - -WITH target_blocks AS ( - - SELECT - block_number - FROM - {{ ref('core__fact_blocks') }} - WHERE - block_number >= ( - SELECT - block_number - FROM - {{ ref("_block_lookback") }} - ) -), -existing_logs_to_exclude AS ( - SELECT - _log_id - FROM - {{ ref('streamline__complete_decode_logs') }} - l - INNER JOIN target_blocks b USING (block_number) - WHERE - l._inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE()) -), -candidate_logs AS ( - SELECT - l.block_number, - l.tx_hash, - l.event_index, - l.contract_address, - l.topics, - l.data, - CONCAT( - l.tx_hash :: STRING, - '-', - l.event_index :: STRING - ) AS _log_id - FROM - target_blocks b - INNER JOIN {{ ref('core__fact_event_logs') }} - l USING (block_number) - WHERE - l.tx_status = 'SUCCESS' - AND l.inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE()) -) -SELECT - l.block_number, - l._log_id, - A.abi AS abi, - OBJECT_CONSTRUCT( - 'topics', - l.topics, - 'data', - l.data, - 'address', - l.contract_address - ) AS DATA -FROM - candidate_logs l - INNER JOIN {{ ref('silver__complete_event_abis') }} A - ON A.parent_contract_address = l.contract_address - AND A.event_signature = l.topics [0] :: STRING - AND l.block_number BETWEEN A.start_block - AND A.end_block -WHERE - NOT EXISTS ( - SELECT - 1 - FROM - existing_logs_to_exclude e - WHERE - e._log_id = l._log_id - ) -limit 7500000 \ No newline at end of file diff --git a/package-lock.yml b/package-lock.yml index ee9ea8a..54b0cb5 100644 --- a/package-lock.yml +++ b/package-lock.yml @@ -6,7 +6,7 @@ packages: - package: dbt-labs/dbt_utils version: 1.0.0 - git: https://github.com/FlipsideCrypto/fsc-utils.git - revision: eb33ac727af26ebc8a8cc9711d4a6ebc3790a107 + revision: c3ab97e8e06d31e8c6f63819714e0a2d45c45e82 - package: 
get-select/dbt_snowflake_query_tags version: 2.5.0 - git: https://github.com/FlipsideCrypto/fsc-evm.git @@ -15,4 +15,4 @@ packages: version: 0.7.2 - git: https://github.com/FlipsideCrypto/livequery-models.git revision: b024188be4e9c6bc00ed77797ebdc92d351d620e -sha1_hash: 622a679ecf98e6ebf3c904241902ce5328c77e52 +sha1_hash: b53a8dd1a1f99a375e8f024920d5eb7630a0765f diff --git a/packages.yml b/packages.yml index 4fb6364..e90ff6f 100644 --- a/packages.yml +++ b/packages.yml @@ -6,7 +6,7 @@ packages: - package: dbt-labs/dbt_utils version: 1.0.0 - git: https://github.com/FlipsideCrypto/fsc-utils.git - revision: v1.29.0 + revision: v1.31.0 - package: get-select/dbt_snowflake_query_tags version: [">=2.0.0", "<3.0.0"] - git: https://github.com/FlipsideCrypto/fsc-evm.git