From 251ddd2ac10e9f82a235a4eac0b16f5fa497441d Mon Sep 17 00:00:00 2001 From: drethereum <71602799+drethereum@users.noreply.github.com> Date: Wed, 13 Mar 2024 11:21:32 -0600 Subject: [PATCH] AN-4606/decoded-logs (#20) * decoded logs streamline, silver, core and macros * realtime job * vars in workflow * v2 * wait * overview docs * decoder sched * remove coalesce --- .../dbt_run_scheduled_non_realtime.yml | 2 +- .../workflows/dbt_run_streamline_decoder.yml | 44 ++++ .../dbt_run_streamline_decoder_history.yml | 45 ++++ data/github_actions__workflows.csv | 1 + macros/create_udfs.sql | 3 +- macros/streamline/streamline_udfs.sql | 11 + .../doc_descriptions/general/__overview__.md | 2 + .../gold/core/core__ez_decoded_event_logs.sql | 30 +++ .../gold/core/core__ez_decoded_event_logs.yml | 65 +++++ .../core/core__fact_decoded_event_logs.sql | 20 ++ .../core/core__fact_decoded_event_logs.yml | 49 ++++ models/silver/core/silver__decoded_logs.sql | 238 ++++++++++++++++++ .../test_silver__decoded_logs_full.sql | 9 + .../test_silver__decoded_logs_full.yml | 52 ++++ .../test_silver__decoded_logs_recent.sql | 23 ++ .../test_silver__decoded_logs_recent.yml | 56 +++++ .../silver__asset_metadata_all_providers.sql | 0 .../silver__asset_metadata_all_providers.yml | 0 .../silver__asset_metadata_priority.sql | 0 .../silver__asset_metadata_priority.yml | 0 .../silver__hourly_prices_all_providers.sql | 0 .../silver__hourly_prices_all_providers.yml | 0 .../prices/silver__hourly_prices_priority.sql | 0 .../prices/silver__hourly_prices_priority.yml | 0 .../silver__hourly_prices_priority_eth.sql | 0 .../silver__hourly_prices_priority_eth.yml | 0 .../bronze__streamline_FR_blocks.sql | 0 .../bronze__streamline_FR_confirm_blocks.sql | 0 .../bronze__streamline_FR_receipts.sql | 0 .../bronze__streamline_FR_traces.sql | 0 .../bronze__streamline_FR_transactions.sql | 0 .../{ => core}/bronze__streamline_blocks.sql | 0 .../bronze__streamline_confirm_blocks.sql | 0 .../bronze__streamline_receipts.sql | 0 .../{ => core}/bronze__streamline_traces.sql | 0 .../bronze__streamline_transactions.sql | 0 .../bronze/decoder/bronze__decoded_logs.sql | 41 +++ .../decoder/bronze__fr_decoded_logs.sql | 40 +++ .../streamline__complete_decode_logs.sql | 32 +++ ...ecode_logs_history_000000001_002500000.sql | 16 ++ ...ecode_logs_history_002500001_005000000.sql | 16 ++ ...ecode_logs_history_005000001_007500000.sql | 16 ++ ...ecode_logs_history_010000001_012500000.sql | 16 ++ ...ecode_logs_history_012500001_015000000.sql | 16 ++ ...ecode_logs_history_015000001_017500000.sql | 16 ++ ...ecode_logs_history_017500001_020000000.sql | 16 ++ ...ecode_logs_history_020000001_022500000.sql | 16 ++ ...ecode_logs_history_022500001_025000000.sql | 16 ++ ...ecode_logs_history_025000001_027500000.sql | 16 ++ ...ecode_logs_history_027500001_030000000.sql | 16 ++ ...ecode_logs_history_030000001_032500000.sql | 16 ++ ...ecode_logs_history_032500001_035000000.sql | 16 ++ ...ecode_logs_history_035000001_037500000.sql | 16 ++ ...ecode_logs_history_037500001_040000000.sql | 16 ++ ...ecode_logs_history_040000001_042500000.sql | 16 ++ ...ecode_logs_history_042500001_045000000.sql | 16 ++ ...ecode_logs_history_045000001_047500000.sql | 16 ++ ...ecode_logs_history_047500001_050000000.sql | 16 ++ .../streamline__decode_logs_realtime.sql | 66 +++++ 59 files changed, 1131 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/dbt_run_streamline_decoder.yml create mode 100644 .github/workflows/dbt_run_streamline_decoder_history.yml create mode 100644 
models/gold/core/core__ez_decoded_event_logs.sql create mode 100644 models/gold/core/core__ez_decoded_event_logs.yml create mode 100644 models/gold/core/core__fact_decoded_event_logs.sql create mode 100644 models/gold/core/core__fact_decoded_event_logs.yml create mode 100644 models/silver/core/silver__decoded_logs.sql create mode 100644 models/silver/core/tests/decoded_logs/test_silver__decoded_logs_full.sql create mode 100644 models/silver/core/tests/decoded_logs/test_silver__decoded_logs_full.yml create mode 100644 models/silver/core/tests/decoded_logs/test_silver__decoded_logs_recent.sql create mode 100644 models/silver/core/tests/decoded_logs/test_silver__decoded_logs_recent.yml rename models/silver/{core => }/prices/silver__asset_metadata_all_providers.sql (100%) rename models/silver/{core => }/prices/silver__asset_metadata_all_providers.yml (100%) rename models/silver/{core => }/prices/silver__asset_metadata_priority.sql (100%) rename models/silver/{core => }/prices/silver__asset_metadata_priority.yml (100%) rename models/silver/{core => }/prices/silver__hourly_prices_all_providers.sql (100%) rename models/silver/{core => }/prices/silver__hourly_prices_all_providers.yml (100%) rename models/silver/{core => }/prices/silver__hourly_prices_priority.sql (100%) rename models/silver/{core => }/prices/silver__hourly_prices_priority.yml (100%) rename models/silver/{core => }/prices/silver__hourly_prices_priority_eth.sql (100%) rename models/silver/{core => }/prices/silver__hourly_prices_priority_eth.yml (100%) rename models/streamline/bronze/{ => core}/bronze__streamline_FR_blocks.sql (100%) rename models/streamline/bronze/{ => core}/bronze__streamline_FR_confirm_blocks.sql (100%) rename models/streamline/bronze/{ => core}/bronze__streamline_FR_receipts.sql (100%) rename models/streamline/bronze/{ => core}/bronze__streamline_FR_traces.sql (100%) rename models/streamline/bronze/{ => core}/bronze__streamline_FR_transactions.sql (100%) rename models/streamline/bronze/{ => core}/bronze__streamline_blocks.sql (100%) rename models/streamline/bronze/{ => core}/bronze__streamline_confirm_blocks.sql (100%) rename models/streamline/bronze/{ => core}/bronze__streamline_receipts.sql (100%) rename models/streamline/bronze/{ => core}/bronze__streamline_traces.sql (100%) rename models/streamline/bronze/{ => core}/bronze__streamline_transactions.sql (100%) create mode 100644 models/streamline/bronze/decoder/bronze__decoded_logs.sql create mode 100644 models/streamline/bronze/decoder/bronze__fr_decoded_logs.sql create mode 100644 models/streamline/silver/decoder/complete/streamline__complete_decode_logs.sql create mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_000000001_002500000.sql create mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_002500001_005000000.sql create mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_005000001_007500000.sql create mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_010000001_012500000.sql create mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_012500001_015000000.sql create mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_015000001_017500000.sql create mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_017500001_020000000.sql create mode 100644 
models/streamline/silver/decoder/history/streamline__decode_logs_history_020000001_022500000.sql create mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_022500001_025000000.sql create mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_025000001_027500000.sql create mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_027500001_030000000.sql create mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_030000001_032500000.sql create mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_032500001_035000000.sql create mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_035000001_037500000.sql create mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_037500001_040000000.sql create mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_040000001_042500000.sql create mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_042500001_045000000.sql create mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_045000001_047500000.sql create mode 100644 models/streamline/silver/decoder/history/streamline__decode_logs_history_047500001_050000000.sql create mode 100644 models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql diff --git a/.github/workflows/dbt_run_scheduled_non_realtime.yml b/.github/workflows/dbt_run_scheduled_non_realtime.yml index 8579184..0919170 100644 --- a/.github/workflows/dbt_run_scheduled_non_realtime.yml +++ b/.github/workflows/dbt_run_scheduled_non_realtime.yml @@ -41,4 +41,4 @@ jobs: dbt deps - name: Run DBT Jobs run: | - dbt run -m "blast_models,tag:non_realtime" \ No newline at end of file + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "blast_models,tag:non_realtime" "blast_models,tag:streamline_decoded_logs_complete" "blast_models,tag:streamline_decoded_logs_realtime" \ No newline at end of file diff --git a/.github/workflows/dbt_run_streamline_decoder.yml b/.github/workflows/dbt_run_streamline_decoder.yml new file mode 100644 index 0000000..60755a4 --- /dev/null +++ b/.github/workflows/dbt_run_streamline_decoder.yml @@ -0,0 +1,44 @@ +name: dbt_run_streamline_decoder +run-name: dbt_run_streamline_decoder + +on: + workflow_dispatch: + branches: + - "main" + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + - name: Run DBT Jobs + run: | + dbt run -m "blast_models,tag:decoded_logs" \ No newline at end of file diff --git a/.github/workflows/dbt_run_streamline_decoder_history.yml b/.github/workflows/dbt_run_streamline_decoder_history.yml new file mode 100644 index 0000000..8913a3d --- /dev/null +++ b/.github/workflows/dbt_run_streamline_decoder_history.yml @@ -0,0 +1,45 @@ +name: dbt_run_streamline_decoder_history +run-name: 
dbt_run_streamline_decoder_history + +on: + workflow_dispatch: + schedule: + # Runs at 01:05 AM UTC (see https://crontab.guru) + - cron: '5 1 * * *' + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + - name: Run DBT Jobs + run: | + dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":120}' -m "blast_models,tag:streamline_decoded_logs_complete" "blast_models,tag:streamline_decoded_logs_history" \ No newline at end of file diff --git a/data/github_actions__workflows.csv b/data/github_actions__workflows.csv index c7fccc6..6876a8e 100644 --- a/data/github_actions__workflows.csv +++ b/data/github_actions__workflows.csv @@ -1,4 +1,5 @@ workflow_name,workflow_schedule dbt_run_scheduled_non_realtime,"17,47 * * * *" dbt_run_streamline_chainhead,"10,40 * * * *" +dbt_run_streamline_decoder,"25,55 * * * *" dbt_test_tasks,"10 * * * *" \ No newline at end of file diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index bb72c5e..efe9eaf 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -8,7 +8,8 @@ ) }} {{ create_udf_rest_api() }} {{ create_aws_blast_api() }} - + {{ create_udf_bulk_decode_logs() }} + {% endset %} {% do run_query(sql) %} {{- fsc_utils.create_udfs() -}} diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql index d33d670..9ed17c4 100644 --- a/macros/streamline/streamline_udfs.sql +++ b/macros/streamline/streamline_udfs.sql @@ -8,4 +8,15 @@ {% else %} aws_blast_api_dev AS 'https://y9d0tuavh6.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api' {%- endif %}; +{% endmacro %} + +{% macro create_udf_bulk_decode_logs() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_logs( + json OBJECT + ) returns ARRAY api_integration = {% if target.name == "prod" %} + aws_blast_api AS 'https://42gzudc5si.execute-api.us-east-1.amazonaws.com/prod/bulk_decode_logs' + {% else %} + aws_blast_api_dev AS 'https://y9d0tuavh6.execute-api.us-east-1.amazonaws.com/stg/bulk_decode_logs' + {%- endif %}; {% endmacro %} \ No newline at end of file
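A note on the external function above: the decoder models later in this patch invoke it from their post_hooks via `if_data_call_function`. A minimal sketch of the call shape — the payload keys mirror those models, and the values here are illustrative only:

```sql
-- Illustrative only: mirrors the object_construct payload used by the
-- realtime/history models' post_hooks further down in this patch.
SELECT
    streamline.udf_bulk_decode_logs(
        OBJECT_CONSTRUCT(
            'sql_source', 'decode_logs_realtime',   -- view that supplies rows to decode
            'external_table', 'DECODED_LOGS',       -- external table that receives results
            'sql_limit', 2000000,                   -- max rows pulled per invocation
            'producer_batch_size', 400000,
            'worker_batch_size', 200000
        )
    );
```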
diff --git a/models/doc_descriptions/general/__overview__.md b/models/doc_descriptions/general/__overview__.md index e388d95..2a09227 100644 --- a/models/doc_descriptions/general/__overview__.md +++ b/models/doc_descriptions/general/__overview__.md @@ -25,6 +25,7 @@ There is more information on how to use dbt docs in the last section of this doc **Fact Tables:** - [fact_blocks](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.core__fact_blocks) - [fact_event_logs](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.core__fact_event_logs) +- [fact_decoded_event_logs](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.core__fact_decoded_event_logs) - [fact_traces](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.core__fact_traces) - [fact_transactions](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.core__fact_transactions) - [fact_token_transfers](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.core__fact_token_transfers) @@ -32,6 +33,7 @@ There is more information on how to use dbt docs in the last section of this doc **Convenience Tables:** - [ez_native_transfers](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.core__ez_native_transfers) - [ez_token_transfers](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.core__ez_token_transfers) +- [ez_decoded_event_logs](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.core__ez_decoded_event_logs) ### Price Tables (blast.price) - [fact_hourly_token_prices](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.price__fact_hourly_token_prices) diff --git a/models/gold/core/core__ez_decoded_event_logs.sql b/models/gold/core/core__ez_decoded_event_logs.sql new file mode 100644 index 0000000..792e1bf --- /dev/null +++ b/models/gold/core/core__ez_decoded_event_logs.sql @@ -0,0 +1,30 @@ +{{ config( + materialized = 'view', + persist_docs ={ "relation": true, + "columns": true } +) }} + +SELECT + block_number, + block_timestamp, + tx_hash, + event_index, + contract_address, + token_name AS contract_name, + event_name, + decoded_flat AS decoded_log, + decoded_data AS full_decoded_log, + origin_function_signature, + origin_from_address, + origin_to_address, + topics, + DATA, + event_removed, + tx_status, + decoded_logs_id AS ez_decoded_event_logs_id, + GREATEST(COALESCE(l.inserted_timestamp, '2000-01-01'), COALESCE(C.inserted_timestamp, '2000-01-01')) AS inserted_timestamp, + GREATEST(COALESCE(l.modified_timestamp, '2000-01-01'), COALESCE(C.modified_timestamp, '2000-01-01')) AS modified_timestamp +FROM + {{ ref('silver__decoded_logs') }} + l + LEFT JOIN {{ ref('silver__contracts') }} C USING (contract_address) diff --git a/models/gold/core/core__ez_decoded_event_logs.yml b/models/gold/core/core__ez_decoded_event_logs.yml new file mode 100644 index 0000000..a1b55fe --- /dev/null +++ b/models/gold/core/core__ez_decoded_event_logs.yml @@ -0,0 +1,65 @@ +version: 2 +models: + - name: core__ez_decoded_event_logs + description: > + 'For information on how to submit a contract for decoding, as well as how ABIs are sourced, please visit [here](https://science.flipsidecrypto.xyz/abi-requestor/). + This model contains decoded event logs for contracts that we have an ABI for. Please note, this table does not include all event logs, only those that we have an ABI for. + The `decoded_log` column is the easiest place to query decoded data. It is a JSON object, where the keys are the names of the event parameters, and the values are the values of the event parameters. + You can select from this column using the following sample format, `decoded_log:from::string` or more generally, `decoded_log:parameter_name::datatype`. See below for a full sample query. + The `full_decoded_log` column contains the same information, as well as additional fields such as the datatype of the decoded data. You may need to laterally flatten this column to query the data, as in the sketch below. 
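+ A minimal flattening sketch, one row per decoded parameter (the `blast.core` naming follows the overview above; the name/type/value fields are assumed from the decoder output this description names): + ```sql + select + tx_hash, + event_index, + p.value:name::string as param_name, + p.value:type::string as param_type, + p.value:value as param_value + from blast.core.ez_decoded_event_logs, + lateral flatten(input => full_decoded_log:data) p + ```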
+ + Sample query for USDC Transfer events: + + ```sql + select + tx_hash, + block_number, + contract_address, + decoded_log:from::string as from_address, + decoded_log:to::string as to_address, + decoded_log:value::integer as value + from ethereum.core.fact_decoded_event_logs + where contract_address = lower('0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48') + and block_number between 16400000 and 16405000 + and event_name = 'Transfer' + limit 50```' + + columns: + - name: BLOCK_NUMBER + description: '{{ doc("blast_block_number") }}' + - name: BLOCK_TIMESTAMP + description: '{{ doc("blast_block_timestamp") }}' + - name: TX_HASH + description: '{{ doc("blast_logs_tx_hash") }}' + - name: EVENT_INDEX + description: '{{ doc("blast_event_index") }}' + - name: CONTRACT_ADDRESS + description: '{{ doc("blast_logs_contract_address") }}' + - name: CONTRACT_NAME + description: 'The name of the contract, if the contract has a name() function.' + - name: EVENT_NAME + description: 'The name of the event, as defined in the contract ABI.' + - name: DECODED_LOG + description: 'The flattened decoded log, where the keys are the names of the event parameters, and the values are the values of the event parameters.' + - name: FULL_DECODED_LOG + description: 'The full decoded log, including the event name, the event parameters, and the data type of the event parameters.' + - name: ORIGIN_FUNCTION_SIGNATURE + description: '{{ doc("blast_tx_origin_sig") }}' + - name: ORIGIN_FROM_ADDRESS + description: '{{ doc("blast_origin_from") }}' + - name: ORIGIN_TO_ADDRESS + description: '{{ doc("blast_origin_to") }}' + - name: TOPICS + description: '{{ doc("blast_topics") }}' + - name: DATA + description: '{{ doc("blast_logs_data") }}' + - name: EVENT_REMOVED + description: '{{ doc("blast_event_removed") }}' + - name: TX_STATUS + description: '{{ doc("blast_tx_status") }}' + - name: EZ_DECODED_EVENT_LOGS_ID + description: '{{ doc("pk") }}' + - name: INSERTED_TIMESTAMP + description: '{{ doc("inserted_timestamp") }}' + - name: MODIFIED_TIMESTAMP + description: '{{ doc("modified_timestamp") }}' \ No newline at end of file diff --git a/models/gold/core/core__fact_decoded_event_logs.sql b/models/gold/core/core__fact_decoded_event_logs.sql new file mode 100644 index 0000000..7a2a518 --- /dev/null +++ b/models/gold/core/core__fact_decoded_event_logs.sql @@ -0,0 +1,20 @@ +{{ config( + materialized = 'view', + persist_docs ={ "relation": true, + "columns": true } +) }} + +SELECT + block_number, + block_timestamp, + tx_hash, + event_index, + contract_address, + event_name, + decoded_flat AS decoded_log, + decoded_data AS full_decoded_log, + decoded_logs_id AS fact_decoded_event_logs_id, + inserted_timestamp, + modified_timestamp +FROM + {{ ref('silver__decoded_logs') }} diff --git a/models/gold/core/core__fact_decoded_event_logs.yml b/models/gold/core/core__fact_decoded_event_logs.yml new file mode 100644 index 0000000..dc25608 --- /dev/null +++ b/models/gold/core/core__fact_decoded_event_logs.yml @@ -0,0 +1,49 @@ +version: 2 +models: + - name: core__fact_decoded_event_logs + description: > + 'For information on how to submit a contract for decoding, as well as how ABIs are sourced, please visit [here](https://science.flipsidecrypto.xyz/abi-requestor/). + This model contains decoded event logs for contracts that we have an ABI for. Please note, this table does not include all event logs, only those that we have an ABI for. + This table will perform better than the `core__ez_decoded_event_logs` table, but does not include as many columns. 
+ The `decoded_log` column is the easiest place to query decoded data. It is a JSON object, where the keys are the names of the event parameters, and the values are the values of the event parameters. + You can select from this column using the following sample format, `decoded_log:from::string` or more generally, `decoded_log:parameter_name::datatype`. See below for a full sample query. + The `full_decoded_log` column contains the same information, as well as additional fields such as the datatype of the decoded data. You may need to laterally flatten this column to query the data. + + Sample query for USDC Transfer events: + ```sql + select + tx_hash, + block_number, + contract_address, + decoded_log:from::string as from_address, + decoded_log:to::string as to_address, + decoded_log:value::integer as value + from ethereum.core.fact_decoded_event_logs + where contract_address = lower('0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48') + and block_number between 16400000 and 16405000 + and event_name = 'Transfer' + limit 50```' + + columns: + - name: BLOCK_NUMBER + description: '{{ doc("blast_block_number") }}' + - name: BLOCK_TIMESTAMP + description: '{{ doc("blast_block_timestamp") }}' + - name: TX_HASH + description: '{{ doc("blast_logs_tx_hash") }}' + - name: EVENT_INDEX + description: '{{ doc("blast_event_index") }}' + - name: CONTRACT_ADDRESS + description: '{{ doc("blast_logs_contract_address") }}' + - name: EVENT_NAME + description: 'The name of the event, as defined in the contract ABI.' + - name: DECODED_LOG + description: 'The flattened decoded log, where the keys are the names of the event parameters, and the values are the values of the event parameters.' + - name: FULL_DECODED_LOG + description: 'The full decoded log, including the event name, the event parameters, and the data type of the event parameters.' 
+ - name: FACT_DECODED_EVENT_LOGS_ID + description: '{{ doc("pk") }}' + - name: INSERTED_TIMESTAMP + description: '{{ doc("inserted_timestamp") }}' + - name: MODIFIED_TIMESTAMP + description: '{{ doc("modified_timestamp") }}' diff --git a/models/silver/core/silver__decoded_logs.sql b/models/silver/core/silver__decoded_logs.sql new file mode 100644 index 0000000..eaa7341 --- /dev/null +++ b/models/silver/core/silver__decoded_logs.sql @@ -0,0 +1,238 @@ +-- depends_on: {{ ref('bronze__decoded_logs') }} +{{ config ( + materialized = "incremental", + unique_key = ['block_number', 'event_index'], + cluster_by = "block_timestamp::date", + incremental_predicates = ["dynamic_range", "block_number"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION", + merge_exclude_columns = ["inserted_timestamp"], + full_refresh = false, + tags = ['decoded_logs','reorg'] +) }} + +WITH base_data AS ( + + SELECT + block_number :: INTEGER AS block_number, + SPLIT( + id, + '-' + ) [0] :: STRING AS tx_hash, + SPLIT( + id, + '-' + ) [1] :: INTEGER AS event_index, + DATA :name :: STRING AS event_name, + LOWER( + DATA :address :: STRING + ) :: STRING AS contract_address, + DATA AS decoded_data, + id :: STRING AS _log_id, + TO_TIMESTAMP_NTZ(_inserted_timestamp) AS _inserted_timestamp + FROM + +{% if is_incremental() %} +{{ ref('bronze__decoded_logs') }} +WHERE + TO_TIMESTAMP_NTZ(_inserted_timestamp) >= ( + SELECT + MAX(_inserted_timestamp) + FROM + {{ this }} + ) + AND DATA NOT ILIKE '%Event topic is not present in given ABI%' +{% else %} + {{ ref('bronze__fr_decoded_logs') }} +WHERE + DATA NOT ILIKE '%Event topic is not present in given ABI%' +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY block_number, event_index +ORDER BY + _inserted_timestamp DESC, _partition_by_created_date DESC)) = 1 +), +transformed_logs AS ( + SELECT + block_number, + tx_hash, + event_index, + contract_address, + event_name, + decoded_data, + _inserted_timestamp, + _log_id, + utils.udf_transform_logs(decoded_data) AS transformed + FROM + base_data +), +FINAL AS ( + SELECT + b.tx_hash, + b.block_number, + b.event_index, + b.event_name, + b.contract_address, + b.decoded_data, + transformed, + b._log_id, + b._inserted_timestamp, + OBJECT_AGG( + DISTINCT CASE + WHEN v.value :name = '' THEN CONCAT( + 'anonymous_', + v.index + ) + ELSE v.value :name + END, + v.value :value + ) AS decoded_flat + FROM + transformed_logs b, + LATERAL FLATTEN( + input => transformed :data + ) v + GROUP BY + b.tx_hash, + b.block_number, + b.event_index, + b.event_name, + b.contract_address, + b.decoded_data, + transformed, + b._log_id, + b._inserted_timestamp +), +new_records AS ( + SELECT + b.tx_hash, + b.block_number, + b.event_index, + b.event_name, + b.contract_address, + b.decoded_data, + b.transformed, + b._log_id, + b._inserted_timestamp, + b.decoded_flat, + block_timestamp, + origin_function_signature, + origin_from_address, + origin_to_address, + topics, + DATA, + event_removed :: STRING AS event_removed, + tx_status, + CASE + WHEN block_timestamp IS NULL THEN TRUE + ELSE FALSE + END AS is_pending + FROM + FINAL b + LEFT JOIN {{ ref('silver__logs') }} USING ( + block_number, + _log_id + ) +) + +{% if is_incremental() %}, +missing_data AS ( + SELECT + t.tx_hash, + t.block_number, + t.event_index, + t.event_name, + t.contract_address, + t.decoded_data, + t.transformed, + t._log_id, + GREATEST( + TO_TIMESTAMP_NTZ( + t._inserted_timestamp + ), + TO_TIMESTAMP_NTZ( + l._inserted_timestamp + ) + ) AS _inserted_timestamp, + t.decoded_flat, + 
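+ -- the log-derived columns below are re-read from silver__logs: rows that were decoded
+ -- before their raw log landed carry is_pending = TRUE with NULL log fields, and this
+ -- CTE backfills them once the matching silver__logs row exists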
l.block_timestamp, + l.origin_function_signature, + l.origin_from_address, + l.origin_to_address, + l.topics, + l.data, + l.event_removed :: STRING AS event_removed, + l.tx_status, + FALSE AS is_pending + FROM + {{ this }} + t + INNER JOIN {{ ref('silver__logs') }} + l USING ( + block_number, + _log_id + ) + WHERE + t.is_pending + AND l.block_timestamp IS NOT NULL +) +{% endif %} +SELECT + tx_hash, + block_number, + event_index, + event_name, + contract_address, + decoded_data, + transformed, + _log_id, + _inserted_timestamp, + decoded_flat, + block_timestamp, + origin_function_signature, + origin_from_address, + origin_to_address, + topics, + DATA, + event_removed, + tx_status, + is_pending, + {{ dbt_utils.generate_surrogate_key( + ['tx_hash', 'event_index'] + ) }} AS decoded_logs_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + new_records + +{% if is_incremental() %} +UNION +SELECT + tx_hash, + block_number, + event_index, + event_name, + contract_address, + decoded_data, + transformed, + _log_id, + _inserted_timestamp, + decoded_flat, + block_timestamp, + origin_function_signature, + origin_from_address, + origin_to_address, + topics, + DATA, + event_removed, + tx_status, + is_pending, + {{ dbt_utils.generate_surrogate_key( + ['tx_hash', 'event_index'] + ) }} AS decoded_logs_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + missing_data +{% endif %} diff --git a/models/silver/core/tests/decoded_logs/test_silver__decoded_logs_full.sql b/models/silver/core/tests/decoded_logs/test_silver__decoded_logs_full.sql new file mode 100644 index 0000000..1dcc890 --- /dev/null +++ b/models/silver/core/tests/decoded_logs/test_silver__decoded_logs_full.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view', + tags = ['full_test'] +) }} + +SELECT + * +FROM + {{ ref('silver__decoded_logs') }} diff --git a/models/silver/core/tests/decoded_logs/test_silver__decoded_logs_full.yml b/models/silver/core/tests/decoded_logs/test_silver__decoded_logs_full.yml new file mode 100644 index 0000000..436c8e7 --- /dev/null +++ b/models/silver/core/tests/decoded_logs/test_silver__decoded_logs_full.yml @@ -0,0 +1,52 @@ +version: 2 +models: + - name: test_silver__decoded_logs_full + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - _LOG_ID + - dbt_utils.recency: + datepart: day + field: _INSERTED_TIMESTAMP + interval: 1 + + columns: + - name: BLOCK_NUMBER + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - FLOAT + - name: TX_HASH + tests: + - not_null + - dbt_expectations.expect_column_values_to_match_regex: + regex: 0[xX][0-9a-fA-F]+ + - name: EVENT_INDEX + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - FLOAT + - name: CONTRACT_ADDRESS + tests: + - not_null + - dbt_expectations.expect_column_values_to_match_regex: + regex: 0[xX][0-9a-fA-F]+ + - name: _INSERTED_TIMESTAMP + tests: + - not_null + - name: EVENT_NAME + tests: + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR + + + + + + diff --git a/models/silver/core/tests/decoded_logs/test_silver__decoded_logs_recent.sql b/models/silver/core/tests/decoded_logs/test_silver__decoded_logs_recent.sql new file mode 100644 index 0000000..3587de3 --- /dev/null +++ 
b/models/silver/core/tests/decoded_logs/test_silver__decoded_logs_recent.sql @@ -0,0 +1,23 @@ +{{ config ( + materialized = 'view', + tags = ['recent_test'] +) }} + +WITH last_3_days AS ( + + SELECT + block_number + FROM + {{ ref("_block_lookback") }} +) +SELECT + * +FROM + {{ ref('silver__decoded_logs') }} +WHERE + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) diff --git a/models/silver/core/tests/decoded_logs/test_silver__decoded_logs_recent.yml b/models/silver/core/tests/decoded_logs/test_silver__decoded_logs_recent.yml new file mode 100644 index 0000000..d331274 --- /dev/null +++ b/models/silver/core/tests/decoded_logs/test_silver__decoded_logs_recent.yml @@ -0,0 +1,56 @@ +version: 2 +models: + - name: test_silver__decoded_logs_recent + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - _LOG_ID + - dbt_utils.recency: + datepart: day + field: _INSERTED_TIMESTAMP + interval: 1 + - fsc_utils.recent_decoded_logs_match: + config: + severity: error + error_if: ">0" + + columns: + - name: BLOCK_NUMBER + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - FLOAT + - name: TX_HASH + tests: + - not_null + - dbt_expectations.expect_column_values_to_match_regex: + regex: 0[xX][0-9a-fA-F]+ + - name: EVENT_INDEX + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - FLOAT + - name: CONTRACT_ADDRESS + tests: + - not_null + - dbt_expectations.expect_column_values_to_match_regex: + regex: 0[xX][0-9a-fA-F]+ + - name: _INSERTED_TIMESTAMP + tests: + - not_null + - name: EVENT_NAME + tests: + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR + + + + + + diff --git a/models/silver/core/prices/silver__asset_metadata_all_providers.sql b/models/silver/prices/silver__asset_metadata_all_providers.sql similarity index 100% rename from models/silver/core/prices/silver__asset_metadata_all_providers.sql rename to models/silver/prices/silver__asset_metadata_all_providers.sql diff --git a/models/silver/core/prices/silver__asset_metadata_all_providers.yml b/models/silver/prices/silver__asset_metadata_all_providers.yml similarity index 100% rename from models/silver/core/prices/silver__asset_metadata_all_providers.yml rename to models/silver/prices/silver__asset_metadata_all_providers.yml diff --git a/models/silver/core/prices/silver__asset_metadata_priority.sql b/models/silver/prices/silver__asset_metadata_priority.sql similarity index 100% rename from models/silver/core/prices/silver__asset_metadata_priority.sql rename to models/silver/prices/silver__asset_metadata_priority.sql diff --git a/models/silver/core/prices/silver__asset_metadata_priority.yml b/models/silver/prices/silver__asset_metadata_priority.yml similarity index 100% rename from models/silver/core/prices/silver__asset_metadata_priority.yml rename to models/silver/prices/silver__asset_metadata_priority.yml diff --git a/models/silver/core/prices/silver__hourly_prices_all_providers.sql b/models/silver/prices/silver__hourly_prices_all_providers.sql similarity index 100% rename from models/silver/core/prices/silver__hourly_prices_all_providers.sql rename to models/silver/prices/silver__hourly_prices_all_providers.sql diff --git a/models/silver/core/prices/silver__hourly_prices_all_providers.yml b/models/silver/prices/silver__hourly_prices_all_providers.yml similarity index 100% rename from 
models/silver/core/prices/silver__hourly_prices_all_providers.yml rename to models/silver/prices/silver__hourly_prices_all_providers.yml diff --git a/models/silver/core/prices/silver__hourly_prices_priority.sql b/models/silver/prices/silver__hourly_prices_priority.sql similarity index 100% rename from models/silver/core/prices/silver__hourly_prices_priority.sql rename to models/silver/prices/silver__hourly_prices_priority.sql diff --git a/models/silver/core/prices/silver__hourly_prices_priority.yml b/models/silver/prices/silver__hourly_prices_priority.yml similarity index 100% rename from models/silver/core/prices/silver__hourly_prices_priority.yml rename to models/silver/prices/silver__hourly_prices_priority.yml diff --git a/models/silver/core/prices/silver__hourly_prices_priority_eth.sql b/models/silver/prices/silver__hourly_prices_priority_eth.sql similarity index 100% rename from models/silver/core/prices/silver__hourly_prices_priority_eth.sql rename to models/silver/prices/silver__hourly_prices_priority_eth.sql diff --git a/models/silver/core/prices/silver__hourly_prices_priority_eth.yml b/models/silver/prices/silver__hourly_prices_priority_eth.yml similarity index 100% rename from models/silver/core/prices/silver__hourly_prices_priority_eth.yml rename to models/silver/prices/silver__hourly_prices_priority_eth.yml diff --git a/models/streamline/bronze/bronze__streamline_FR_blocks.sql b/models/streamline/bronze/core/bronze__streamline_FR_blocks.sql similarity index 100% rename from models/streamline/bronze/bronze__streamline_FR_blocks.sql rename to models/streamline/bronze/core/bronze__streamline_FR_blocks.sql diff --git a/models/streamline/bronze/bronze__streamline_FR_confirm_blocks.sql b/models/streamline/bronze/core/bronze__streamline_FR_confirm_blocks.sql similarity index 100% rename from models/streamline/bronze/bronze__streamline_FR_confirm_blocks.sql rename to models/streamline/bronze/core/bronze__streamline_FR_confirm_blocks.sql diff --git a/models/streamline/bronze/bronze__streamline_FR_receipts.sql b/models/streamline/bronze/core/bronze__streamline_FR_receipts.sql similarity index 100% rename from models/streamline/bronze/bronze__streamline_FR_receipts.sql rename to models/streamline/bronze/core/bronze__streamline_FR_receipts.sql diff --git a/models/streamline/bronze/bronze__streamline_FR_traces.sql b/models/streamline/bronze/core/bronze__streamline_FR_traces.sql similarity index 100% rename from models/streamline/bronze/bronze__streamline_FR_traces.sql rename to models/streamline/bronze/core/bronze__streamline_FR_traces.sql diff --git a/models/streamline/bronze/bronze__streamline_FR_transactions.sql b/models/streamline/bronze/core/bronze__streamline_FR_transactions.sql similarity index 100% rename from models/streamline/bronze/bronze__streamline_FR_transactions.sql rename to models/streamline/bronze/core/bronze__streamline_FR_transactions.sql diff --git a/models/streamline/bronze/bronze__streamline_blocks.sql b/models/streamline/bronze/core/bronze__streamline_blocks.sql similarity index 100% rename from models/streamline/bronze/bronze__streamline_blocks.sql rename to models/streamline/bronze/core/bronze__streamline_blocks.sql diff --git a/models/streamline/bronze/bronze__streamline_confirm_blocks.sql b/models/streamline/bronze/core/bronze__streamline_confirm_blocks.sql similarity index 100% rename from models/streamline/bronze/bronze__streamline_confirm_blocks.sql rename to models/streamline/bronze/core/bronze__streamline_confirm_blocks.sql diff --git 
a/models/streamline/bronze/bronze__streamline_receipts.sql b/models/streamline/bronze/core/bronze__streamline_receipts.sql similarity index 100% rename from models/streamline/bronze/bronze__streamline_receipts.sql rename to models/streamline/bronze/core/bronze__streamline_receipts.sql diff --git a/models/streamline/bronze/bronze__streamline_traces.sql b/models/streamline/bronze/core/bronze__streamline_traces.sql similarity index 100% rename from models/streamline/bronze/bronze__streamline_traces.sql rename to models/streamline/bronze/core/bronze__streamline_traces.sql diff --git a/models/streamline/bronze/bronze__streamline_transactions.sql b/models/streamline/bronze/core/bronze__streamline_transactions.sql similarity index 100% rename from models/streamline/bronze/bronze__streamline_transactions.sql rename to models/streamline/bronze/core/bronze__streamline_transactions.sql diff --git a/models/streamline/bronze/decoder/bronze__decoded_logs.sql b/models/streamline/bronze/decoder/bronze__decoded_logs.sql new file mode 100644 index 0000000..bd43f6f --- /dev/null +++ b/models/streamline/bronze/decoder/bronze__decoded_logs.sql @@ -0,0 +1,41 @@ +{{ config ( + materialized = 'view' +) }} + +WITH meta AS ( + + SELECT + last_modified AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number, + TO_DATE( + concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5)) + ) AS _partition_by_created_date + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", "decoded_logs") }}') + ) A + ) + SELECT + block_number, + id :: STRING AS id, + DATA, + _inserted_timestamp, + s._partition_by_block_number AS _partition_by_block_number, + s._partition_by_created_date AS _partition_by_created_date + FROM + {{ source( + "bronze_streamline", + "decoded_logs" + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b._partition_by_block_number = s._partition_by_block_number + AND b._partition_by_created_date = s._partition_by_created_date + WHERE + b._partition_by_block_number = s._partition_by_block_number + AND b._partition_by_created_date = s._partition_by_created_date + AND s._partition_by_created_date >= DATEADD('day', -2, CURRENT_TIMESTAMP()) diff --git a/models/streamline/bronze/decoder/bronze__fr_decoded_logs.sql b/models/streamline/bronze/decoder/bronze__fr_decoded_logs.sql new file mode 100644 index 0000000..4e4a1c8 --- /dev/null +++ b/models/streamline/bronze/decoder/bronze__fr_decoded_logs.sql @@ -0,0 +1,40 @@ +{{ config ( + materialized = 'view' +) }} + +WITH meta AS ( + + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number, + TO_DATE( + concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5)) + ) AS _partition_by_created_date + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "decoded_logs") }}' + ) + ) A +) +SELECT + block_number, + id :: STRING AS id, + DATA, + _inserted_timestamp, + s._partition_by_block_number AS _partition_by_block_number, + s._partition_by_created_date AS _partition_by_created_date +FROM + {{ source( + "bronze_streamline", + "decoded_logs" + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND 
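+ -- note: the _partition_by_* join predicates are repeated in the WHERE clause below,
+ -- presumably so Snowflake can prune external-table partitions rather than scan
+ -- every registered file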
b._partition_by_block_number = s._partition_by_block_number + AND b._partition_by_created_date = s._partition_by_created_date +WHERE + b._partition_by_block_number = s._partition_by_block_number + AND b._partition_by_created_date = s._partition_by_created_date diff --git a/models/streamline/silver/decoder/complete/streamline__complete_decode_logs.sql b/models/streamline/silver/decoder/complete/streamline__complete_decode_logs.sql new file mode 100644 index 0000000..4130953 --- /dev/null +++ b/models/streamline/silver/decoder/complete/streamline__complete_decode_logs.sql @@ -0,0 +1,32 @@ +-- depends_on: {{ ref('bronze__decoded_logs') }} +{{ config ( + materialized = "incremental", + unique_key = "_log_id", + cluster_by = "ROUND(block_number, -3)", + incremental_predicates = ["dynamic_range", "block_number"], + merge_update_columns = ["_log_id"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(_log_id)", + tags = ['streamline_decoded_logs_complete'] +) }} + +SELECT + block_number, + id AS _log_id, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__decoded_logs') }} +WHERE + TO_TIMESTAMP_NTZ(_inserted_timestamp) >= ( + SELECT + COALESCE(MAX(TO_TIMESTAMP_NTZ(_inserted_timestamp)), '1970-01-01 00:00:00') _inserted_timestamp + FROM + {{ this }}) + {% else %} + {{ ref('bronze__fr_decoded_logs') }} + {% endif %} + + qualify(ROW_NUMBER() over (PARTITION BY id + ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_000000001_002500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_000000001_002500000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ b/models/streamline/silver/decoder/history/streamline__decode_logs_history_000000001_002500000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_002500001_005000000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_002500001_005000000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ b/models/streamline/silver/decoder/history/streamline__decode_logs_history_002500001_005000000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ 
fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_005000001_007500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_005000001_007500000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ b/models/streamline/silver/decoder/history/streamline__decode_logs_history_005000001_007500000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_010000001_012500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_010000001_012500000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ b/models/streamline/silver/decoder/history/streamline__decode_logs_history_010000001_012500000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_012500001_015000000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_012500001_015000000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ b/models/streamline/silver/decoder/history/streamline__decode_logs_history_012500001_015000000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_015000001_017500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_015000001_017500000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ 
b/models/streamline/silver/decoder/history/streamline__decode_logs_history_015000001_017500000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_017500001_020000000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_017500001_020000000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ b/models/streamline/silver/decoder/history/streamline__decode_logs_history_017500001_020000000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_020000001_022500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_020000001_022500000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ b/models/streamline/silver/decoder/history/streamline__decode_logs_history_020000001_022500000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_022500001_025000000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_022500001_025000000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ b/models/streamline/silver/decoder/history/streamline__decode_logs_history_022500001_025000000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 
'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_025000001_027500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_025000001_027500000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ b/models/streamline/silver/decoder/history/streamline__decode_logs_history_025000001_027500000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_027500001_030000000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_027500001_030000000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ b/models/streamline/silver/decoder/history/streamline__decode_logs_history_027500001_030000000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_030000001_032500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_030000001_032500000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ b/models/streamline/silver/decoder/history/streamline__decode_logs_history_030000001_032500000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ 
fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_032500001_035000000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_032500001_035000000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ b/models/streamline/silver/decoder/history/streamline__decode_logs_history_032500001_035000000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_035000001_037500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_035000001_037500000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ b/models/streamline/silver/decoder/history/streamline__decode_logs_history_035000001_037500000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_037500001_040000000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_037500001_040000000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ b/models/streamline/silver/decoder/history/streamline__decode_logs_history_037500001_040000000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_040000001_042500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_040000001_042500000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ 
b/models/streamline/silver/decoder/history/streamline__decode_logs_history_040000001_042500000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_042500001_045000000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_042500001_045000000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ b/models/streamline/silver/decoder/history/streamline__decode_logs_history_042500001_045000000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_045000001_047500000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_045000001_047500000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ b/models/streamline/silver/decoder/history/streamline__decode_logs_history_045000001_047500000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/history/streamline__decode_logs_history_047500001_050000000.sql b/models/streamline/silver/decoder/history/streamline__decode_logs_history_047500001_050000000.sql new file mode 100644 index 0000000..ab65ae5 --- /dev/null +++ b/models/streamline/silver/decoder/history/streamline__decode_logs_history_047500001_050000000.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 
'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))", + target = "{{model.schema}}.{{model.alias}}" + ), + if_data_call_wait()], + tags = ['streamline_decoded_logs_history'] +) }} + +{% set start = this.identifier.split("_") [-2] %} +{% set stop = this.identifier.split("_") [-1] %} +{{ fsc_utils.decode_logs_history( + start, + stop +) }} diff --git a/models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql b/models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql new file mode 100644 index 0000000..2a12b12 --- /dev/null +++ b/models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql @@ -0,0 +1,66 @@ +{{ config ( + materialized = "view", + post_hook = [if_data_call_function( + func = "{{this.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','2000000')}}, 'producer_batch_size', {{var('producer_batch_size','400000')}}, 'worker_batch_size', {{var('worker_batch_size','200000')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ), + "call system$wait(" ~ var("WAIT", 400) ~ ")" ], + tags = ['streamline_decoded_logs_realtime'] +) }} + +WITH look_back AS ( + + SELECT + block_number + FROM + {{ ref("_max_block_by_date") }} + qualify ROW_NUMBER() over ( + ORDER BY + block_number DESC + ) = 1 +) +SELECT + l.block_number, + l._log_id, + A.abi AS abi, + OBJECT_CONSTRUCT( + 'topics', + l.topics, + 'data', + l.data, + 'address', + l.contract_address + ) AS DATA +FROM + {{ ref("silver__logs") }} + l + INNER JOIN {{ ref("silver__complete_event_abis") }} A + ON A.parent_contract_address = l.contract_address + AND A.event_signature = l.topics [0] :: STRING + AND l.block_number BETWEEN A.start_block + AND A.end_block +WHERE + ( + l.block_number >= ( + SELECT + block_number + FROM + look_back + ) + ) + AND l.block_number IS NOT NULL + AND l.block_timestamp >= DATEADD('day', -2, CURRENT_DATE()) + AND _log_id NOT IN ( + SELECT + _log_id + FROM + {{ ref("streamline__complete_decode_logs") }} + WHERE + block_number >= ( + SELECT + block_number + FROM + look_back + ) + AND _inserted_timestamp >= DATEADD('day', -2, CURRENT_DATE()) + )
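
A rough post-run spot-check of decode coverage, assuming the `blast.core` naming used in the overview doc. Decoded rows are expected to be a subset of raw logs, since only contracts with a known ABI decode:

```sql
-- Illustrative only: compares the last day of raw event logs against decoded rows.
SELECT
    COUNT(*) AS raw_events,
    COUNT(d.fact_decoded_event_logs_id) AS decoded_events
FROM blast.core.fact_event_logs f
LEFT JOIN blast.core.fact_decoded_event_logs d
    ON d.tx_hash = f.tx_hash
    AND d.event_index = f.event_index
WHERE f.block_timestamp >= DATEADD('day', -1, CURRENT_DATE());
```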