From 12bf4e2e860764f491ad7ea38055d4c958521cf1 Mon Sep 17 00:00:00 2001 From: Jack Forgash <58153492+forgxyz@users.noreply.github.com> Date: Wed, 12 Jun 2024 12:10:27 -0600 Subject: [PATCH] AN-4799/Backfill Migration (#329) * move models to bronze * blocks backfill logic ez ez ez * streamline_load models for all 4 methods * add back a set of views for the complete models to use * del dbt run history gha workflow * migrate txs final to modified ts * upd events to modified * set default val for LOAD_BACKFILL_VERSION in project yml * update curated model incr logic --- .github/workflows/dbt_run_history.yml | 53 ------------------ dbt_project.yml | 1 + models/bronze/streamline/history/README.md | 54 +++++++++++++++++++ .../bronze__streamline_blocks_history.sql | 36 +++++++++++++ ...bronze__streamline_collections_history.sql | 37 +++++++++++++ ...e__streamline_complete_blocks_history.sql} | 0 ...reamline_complete_collections_history.sql} | 0 ..._complete_transaction_results_history.sql} | 0 ...eamline_complete_transactions_history.sql} | 0 ...streamline_transaction_results_history.sql | 37 +++++++++++++ ...ronze__streamline_transactions_history.sql | 37 +++++++++++++ models/bronze/streamline/realtime/README.md | 3 ++ .../realtime/bronze__streamline_blocks.sql | 0 .../bronze__streamline_collections.sql | 0 .../realtime/bronze__streamline_fr_blocks.sql | 0 .../bronze__streamline_fr_collections.sql | 0 ...nze__streamline_fr_transaction_results.sql | 0 .../bronze__streamline_fr_transactions.sql | 0 ...bronze__streamline_transaction_results.sql | 0 .../bronze__streamline_transactions.sql | 0 .../silver/core/silver__streamline_blocks.sql | 23 ++++++-- .../core/silver__streamline_collections.sql | 8 +++ .../silver/core/silver__streamline_events.sql | 6 +-- ...silver__streamline_transaction_results.sql | 8 +++ .../core/silver__streamline_transactions.sql | 8 +++ .../silver__streamline_transactions_final.sql | 14 ++--- models/silver/defi/silver__swaps_events_s.sql | 28 +++++++--- models/silver/defi/silver__swaps_s.sql | 20 +++++-- .../defi/silver__swaps_single_trade_s.sql | 18 +++++-- .../labels/silver__contract_labels_s.sql | 14 +++-- .../silver__labels_pools_metapier_s.sql | 22 ++++++-- .../silver/labels/silver__labels_pools_s.sql | 21 ++++++-- .../onchain/silver__nft_moment_editions_s.sql | 28 ++++++++-- .../onchain/silver__nft_moment_metadata_s.sql | 34 +++++++++--- .../onchain/silver__nft_moment_minted_2_s.sql | 28 ++++++++-- .../onchain/silver__nft_moment_minted_s.sql | 28 ++++++++-- .../onchain/silver__nft_moment_series_s.sql | 28 ++++++++-- .../onchain/silver__nft_moment_set_s.sql | 28 ++++++++-- .../onchain/silver__nft_moments_s.sql | 46 +++++++++------- .../metadata/silver__nft_allday_metadata.sql | 1 + models/silver/nft/silver__nft_giglabs_s.sql | 38 ++++++++++--- models/silver/nft/silver__nft_sales_s.sql | 19 ++++--- .../nft/silver__nft_topshot_sales_s.sql | 23 ++++++-- ...r__nft_transactions_secondary_market_s.sql | 25 ++++++--- .../staking/silver__staking_actions_s.sql | 28 +++++++--- ...treamline__complete_get_blocks_history.sql | 4 +- ...line__complete_get_collections_history.sql | 4 +- ...mplete_get_transaction_results_history.sql | 4 +- ...ine__complete_get_transactions_history.sql | 4 +- .../transfers/silver__bridge_blocto_s.sql | 31 ++++++++--- .../transfers/silver__bridge_celer_s.sql | 32 ++++++++--- .../transfers/silver__token_transfers_s.sql | 17 ++++-- 52 files changed, 691 insertions(+), 207 deletions(-) delete mode 100644 .github/workflows/dbt_run_history.yml create mode 100644 models/bronze/streamline/history/README.md create mode 100644 models/bronze/streamline/history/bronze__streamline_blocks_history.sql create mode 100644 models/bronze/streamline/history/bronze__streamline_collections_history.sql rename models/{silver/streamline/bronze/core/history/bronze__streamline_blocks_history.sql => bronze/streamline/history/bronze__streamline_complete_blocks_history.sql} (100%) rename models/{silver/streamline/bronze/core/history/bronze__streamline_collections_history.sql => bronze/streamline/history/bronze__streamline_complete_collections_history.sql} (100%) rename models/{silver/streamline/bronze/core/history/bronze__streamline_transaction_results_history.sql => bronze/streamline/history/bronze__streamline_complete_transaction_results_history.sql} (100%) rename models/{silver/streamline/bronze/core/history/bronze__streamline_transactions_history.sql => bronze/streamline/history/bronze__streamline_complete_transactions_history.sql} (100%) create mode 100644 models/bronze/streamline/history/bronze__streamline_transaction_results_history.sql create mode 100644 models/bronze/streamline/history/bronze__streamline_transactions_history.sql create mode 100644 models/bronze/streamline/realtime/README.md rename models/{silver/streamline/bronze/core => bronze/streamline}/realtime/bronze__streamline_blocks.sql (100%) rename models/{silver/streamline/bronze/core => bronze/streamline}/realtime/bronze__streamline_collections.sql (100%) rename models/{silver/streamline/bronze/core => bronze/streamline}/realtime/bronze__streamline_fr_blocks.sql (100%) rename models/{silver/streamline/bronze/core => bronze/streamline}/realtime/bronze__streamline_fr_collections.sql (100%) rename models/{silver/streamline/bronze/core => bronze/streamline}/realtime/bronze__streamline_fr_transaction_results.sql (100%) rename models/{silver/streamline/bronze/core => bronze/streamline}/realtime/bronze__streamline_fr_transactions.sql (100%) rename models/{silver/streamline/bronze/core => bronze/streamline}/realtime/bronze__streamline_transaction_results.sql (100%) rename models/{silver/streamline/bronze/core => bronze/streamline}/realtime/bronze__streamline_transactions.sql (100%) diff --git a/.github/workflows/dbt_run_history.yml b/.github/workflows/dbt_run_history.yml deleted file mode 100644 index e547bfc..0000000 --- a/.github/workflows/dbt_run_history.yml +++ /dev/null @@ -1,53 +0,0 @@ -name: dbt_run_history -run-name: dbt_run_history - -on: - workflow_dispatch: - schedule: - # Runs every 2 hours - # - cron: "0 */2 * * *" - # Runs every hour - - cron: "0 * * * *" - -env: - USE_VARS: "${{ vars.USE_VARS }}" - DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" - DBT_VERSION: "${{ vars.DBT_VERSION }}" - ACCOUNT: "${{ vars.ACCOUNT }}" - ROLE: "${{ vars.ROLE }}" - USER: "${{ vars.USER }}" - PASSWORD: "${{ secrets.PASSWORD }}" - REGION: "${{ vars.REGION }}" - DATABASE: "${{ vars.DATABASE }}" - WAREHOUSE: "${{ vars.WAREHOUSE }}" - SCHEMA: "${{ vars.SCHEMA }}" - -concurrency: - group: ${{ github.workflow }} - -jobs: - dbt: - runs-on: ubuntu-latest - environment: - name: workflow_prod - steps: - - uses: actions/checkout@v3 - - uses: actions/setup-python@v4 - with: - python-version: "3.10" - cache: "pip" - - - name: install dependencies - run: | - pip install -r requirements.txt - dbt deps - - - name: Run DBT Jobs - run: | - dbt run -s \ - 2+streamline__get_transaction_results_history_mainnet_16 \ - 2+streamline__get_transaction_results_history_mainnet_17 \ - 2+streamline__get_transaction_results_history_mainnet_18 \ - 2+streamline__get_transaction_results_history_mainnet_19 \ - 2+streamline__get_transaction_results_history_mainnet_22 \ - --vars '{"STREAMLINE_INVOKE_STREAMS": True}' diff --git a/dbt_project.yml b/dbt_project.yml index e03185c..2da0cdc 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -66,6 +66,7 @@ vars: REST_API_PREFIX_PROD: quxfxtl934.execute-api.us-east-1.amazonaws.com/prod/ REST_API_PREFIX_DEV: ul6x832e8l.execute-api.us-east-1.amazonaws.com/dev/ STREAMLINE_START_BLOCK: 55114467 + LOAD_BACKFILL_VERSION: CANDIDATE_07 dispatch: - macro_namespace: dbt diff --git a/models/bronze/streamline/history/README.md b/models/bronze/streamline/history/README.md new file mode 100644 index 0000000..7f0f50e --- /dev/null +++ b/models/bronze/streamline/history/README.md @@ -0,0 +1,54 @@ +# Backfill + +Bronze backfill models have been parametrized to load one network version at a time, as each set of NVs and method responses is a separate bucket and external table. + +Run either an individual model type (blocks, collections, transactions, transaction_results) or all 4 at once with `tag:streamline_load`. + +```shell +dbt run -s 1+tag:streamline_load --vars '{"LOAD_BACKFILL": True, "LOAD_BACKFILL_VERSION": ""}' +``` + +## Valid Network Versions + - CANDIDATE_07 + - CANDIDATE_08 + - CANDIDATE_09 + - MAINNET_01 + - MAINNET_02 + - MAINNET_03 + - MAINNET_04 + - MAINNET_05 + - MAINNET_06 + - MAINNET_07 + - MAINNET_08 + - MAINNET_09 + - MAINNET_10 + - MAINNET_11 + - MAINNET_12 + - MAINNET_13 + - MAINNET_14 + - MAINNET_15 + - MAINNET_16 + - MAINNET_17 + - MAINNET_18 + - MAINNET_19 + - MAINNET_20 + - MAINNET_21 + - MAINNET_22 + +## View Types +Views with the word `complete` in the name are used in the complete history models at `models/silver/streamline/core/complete`. These use a macro to scan multiple external tables in one call, and feed the streamline backfill process. + +The views `bronze__streamline__history` query just one network version based on the `LOAD_BACKFILL_VERSION` argument passed at runtime. No default is set for this variable so execution fails if it is forgottten. + +## Running Streamline Backfill +If a a network version requires more backfill due to missing blocks or transactions (at present, there are 5800 missing transaction results), run the following command as the workflow dbt_run_history has been deleted. +```shell +dbt run -s 2+streamline__get__history_ --vars '{"STREAMLINE_INVOKE_STREAMS": True}' +``` + +i.e. +```shell +dbt run -s \ +2+streamline__get_transaction_results_history_mainnet_22 \ +--vars '{"STREAMLINE_INVOKE_STREAMS": True}' +``` diff --git a/models/bronze/streamline/history/bronze__streamline_blocks_history.sql b/models/bronze/streamline/history/bronze__streamline_blocks_history.sql new file mode 100644 index 0000000..3ed5294 --- /dev/null +++ b/models/bronze/streamline/history/bronze__streamline_blocks_history.sql @@ -0,0 +1,36 @@ +{{ config ( + materialized = 'ephemeral' +) }} + +{% set history_model = "BLOCKS_" ~ var('LOAD_BACKFILL_VERSION') %} + +WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", history_model ) }}' + ) + ) A +) +SELECT + block_number, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST(block_number AS text), '' :: STRING) AS text + ) + ) AS _fsc_id, + s._partition_by_block_id, + s.value AS VALUE +FROM + {{ source("bronze_streamline", history_model ) }} s + JOIN meta b + ON b.file_name = metadata$filename + AND b._partition_by_block_id = s._partition_by_block_id +WHERE + b._partition_by_block_id = s._partition_by_block_id diff --git a/models/bronze/streamline/history/bronze__streamline_collections_history.sql b/models/bronze/streamline/history/bronze__streamline_collections_history.sql new file mode 100644 index 0000000..bbef965 --- /dev/null +++ b/models/bronze/streamline/history/bronze__streamline_collections_history.sql @@ -0,0 +1,37 @@ +{{ config ( + materialized = 'ephemeral' +) }} + +{% set history_model = "COLLECTIONS_" ~ var('LOAD_BACKFILL_VERSION') %} + +WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", history_model ) }}' + ) + ) A +) +SELECT + block_number, + id, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST(block_number AS text), '' :: STRING) AS text + ) + ) AS _fsc_id, + s._partition_by_block_id, + s.value AS VALUE +FROM + {{ source("bronze_streamline", history_model ) }} s + JOIN meta b + ON b.file_name = metadata$filename + AND b._partition_by_block_id = s._partition_by_block_id +WHERE + b._partition_by_block_id = s._partition_by_block_id diff --git a/models/silver/streamline/bronze/core/history/bronze__streamline_blocks_history.sql b/models/bronze/streamline/history/bronze__streamline_complete_blocks_history.sql similarity index 100% rename from models/silver/streamline/bronze/core/history/bronze__streamline_blocks_history.sql rename to models/bronze/streamline/history/bronze__streamline_complete_blocks_history.sql diff --git a/models/silver/streamline/bronze/core/history/bronze__streamline_collections_history.sql b/models/bronze/streamline/history/bronze__streamline_complete_collections_history.sql similarity index 100% rename from models/silver/streamline/bronze/core/history/bronze__streamline_collections_history.sql rename to models/bronze/streamline/history/bronze__streamline_complete_collections_history.sql diff --git a/models/silver/streamline/bronze/core/history/bronze__streamline_transaction_results_history.sql b/models/bronze/streamline/history/bronze__streamline_complete_transaction_results_history.sql similarity index 100% rename from models/silver/streamline/bronze/core/history/bronze__streamline_transaction_results_history.sql rename to models/bronze/streamline/history/bronze__streamline_complete_transaction_results_history.sql diff --git a/models/silver/streamline/bronze/core/history/bronze__streamline_transactions_history.sql b/models/bronze/streamline/history/bronze__streamline_complete_transactions_history.sql similarity index 100% rename from models/silver/streamline/bronze/core/history/bronze__streamline_transactions_history.sql rename to models/bronze/streamline/history/bronze__streamline_complete_transactions_history.sql diff --git a/models/bronze/streamline/history/bronze__streamline_transaction_results_history.sql b/models/bronze/streamline/history/bronze__streamline_transaction_results_history.sql new file mode 100644 index 0000000..97a0e77 --- /dev/null +++ b/models/bronze/streamline/history/bronze__streamline_transaction_results_history.sql @@ -0,0 +1,37 @@ +{{ config ( + materialized = 'ephemeral' +) }} + +{% set history_model = "TRANSACTION_RESULTS_" ~ var('LOAD_BACKFILL_VERSION') %} + +WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", history_model ) }}' + ) + ) A +) +SELECT + block_number, + id, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST(block_number AS text), '' :: STRING) AS text + ) + ) AS _fsc_id, + s._partition_by_block_id, + s.value AS VALUE +FROM + {{ source("bronze_streamline", history_model ) }} s + JOIN meta b + ON b.file_name = metadata$filename + AND b._partition_by_block_id = s._partition_by_block_id +WHERE + b._partition_by_block_id = s._partition_by_block_id diff --git a/models/bronze/streamline/history/bronze__streamline_transactions_history.sql b/models/bronze/streamline/history/bronze__streamline_transactions_history.sql new file mode 100644 index 0000000..5084d26 --- /dev/null +++ b/models/bronze/streamline/history/bronze__streamline_transactions_history.sql @@ -0,0 +1,37 @@ +{{ config ( + materialized = 'ephemeral' +) }} + +{% set history_model = "TRANSACTIONS_" ~ var('LOAD_BACKFILL_VERSION') %} + +WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", history_model ) }}' + ) + ) A +) +SELECT + block_number, + id, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST(block_number AS text), '' :: STRING) AS text + ) + ) AS _fsc_id, + s._partition_by_block_id, + s.value AS VALUE +FROM + {{ source("bronze_streamline", history_model ) }} s + JOIN meta b + ON b.file_name = metadata$filename + AND b._partition_by_block_id = s._partition_by_block_id +WHERE + b._partition_by_block_id = s._partition_by_block_id diff --git a/models/bronze/streamline/realtime/README.md b/models/bronze/streamline/realtime/README.md new file mode 100644 index 0000000..e958584 --- /dev/null +++ b/models/bronze/streamline/realtime/README.md @@ -0,0 +1,3 @@ +# Streamline Realtime Models + +Flow migrated to Streamline with Mainnet-23. These views point to the external tables from there forward. diff --git a/models/silver/streamline/bronze/core/realtime/bronze__streamline_blocks.sql b/models/bronze/streamline/realtime/bronze__streamline_blocks.sql similarity index 100% rename from models/silver/streamline/bronze/core/realtime/bronze__streamline_blocks.sql rename to models/bronze/streamline/realtime/bronze__streamline_blocks.sql diff --git a/models/silver/streamline/bronze/core/realtime/bronze__streamline_collections.sql b/models/bronze/streamline/realtime/bronze__streamline_collections.sql similarity index 100% rename from models/silver/streamline/bronze/core/realtime/bronze__streamline_collections.sql rename to models/bronze/streamline/realtime/bronze__streamline_collections.sql diff --git a/models/silver/streamline/bronze/core/realtime/bronze__streamline_fr_blocks.sql b/models/bronze/streamline/realtime/bronze__streamline_fr_blocks.sql similarity index 100% rename from models/silver/streamline/bronze/core/realtime/bronze__streamline_fr_blocks.sql rename to models/bronze/streamline/realtime/bronze__streamline_fr_blocks.sql diff --git a/models/silver/streamline/bronze/core/realtime/bronze__streamline_fr_collections.sql b/models/bronze/streamline/realtime/bronze__streamline_fr_collections.sql similarity index 100% rename from models/silver/streamline/bronze/core/realtime/bronze__streamline_fr_collections.sql rename to models/bronze/streamline/realtime/bronze__streamline_fr_collections.sql diff --git a/models/silver/streamline/bronze/core/realtime/bronze__streamline_fr_transaction_results.sql b/models/bronze/streamline/realtime/bronze__streamline_fr_transaction_results.sql similarity index 100% rename from models/silver/streamline/bronze/core/realtime/bronze__streamline_fr_transaction_results.sql rename to models/bronze/streamline/realtime/bronze__streamline_fr_transaction_results.sql diff --git a/models/silver/streamline/bronze/core/realtime/bronze__streamline_fr_transactions.sql b/models/bronze/streamline/realtime/bronze__streamline_fr_transactions.sql similarity index 100% rename from models/silver/streamline/bronze/core/realtime/bronze__streamline_fr_transactions.sql rename to models/bronze/streamline/realtime/bronze__streamline_fr_transactions.sql diff --git a/models/silver/streamline/bronze/core/realtime/bronze__streamline_transaction_results.sql b/models/bronze/streamline/realtime/bronze__streamline_transaction_results.sql similarity index 100% rename from models/silver/streamline/bronze/core/realtime/bronze__streamline_transaction_results.sql rename to models/bronze/streamline/realtime/bronze__streamline_transaction_results.sql diff --git a/models/silver/streamline/bronze/core/realtime/bronze__streamline_transactions.sql b/models/bronze/streamline/realtime/bronze__streamline_transactions.sql similarity index 100% rename from models/silver/streamline/bronze/core/realtime/bronze__streamline_transactions.sql rename to models/bronze/streamline/realtime/bronze__streamline_transactions.sql diff --git a/models/silver/core/silver__streamline_blocks.sql b/models/silver/core/silver__streamline_blocks.sql index 8359745..74b7d51 100644 --- a/models/silver/core/silver__streamline_blocks.sql +++ b/models/silver/core/silver__streamline_blocks.sql @@ -1,4 +1,6 @@ -- depends_on: {{ ref('bronze__streamline_blocks') }} +-- depends_on: {{ ref('bronze__streamline_blocks_history') }} + {{ config( materialized = 'incremental', unique_key = "block_number", @@ -10,7 +12,7 @@ WITH -{% if is_incremental() %} +{% if is_incremental() and not var('LOAD_BACKFILL', False) %} tx_count_lookback AS ( -- lookback to ensure tx count is correct @@ -21,7 +23,7 @@ tx_count_lookback AS ( WHERE block_height >= {{ var( 'STREAMLINE_START_BLOCK' - ) }} + ) }} -- TODO, remove AFTER backfill is complete -- limit to 3 day lookback for performance AND _inserted_timestamp >= SYSDATE() - INTERVAL '3 days' AND ( @@ -47,7 +49,11 @@ streamline_blocks AS ( _partition_by_block_id, _inserted_timestamp FROM - +{% if var('LOAD_BACKFILL', False) %} + {{ ref('bronze__streamline_blocks_history') }} + -- TODO need incremental logic of some sort probably (for those 5800 missing txs) + -- where inserted timestamp >= max from this where network version = backfill version OR block range between root and end +{% else %} {% if is_incremental() %} {{ ref('bronze__streamline_blocks') }} WHERE @@ -66,6 +72,7 @@ WHERE {% else %} {{ ref('bronze__streamline_fr_blocks') }} {% endif %} +{% endif %} qualify(ROW_NUMBER() over (PARTITION BY block_number ORDER BY @@ -86,7 +93,14 @@ collections AS ( * FROM {{ ref('silver__streamline_collections') }} - +{% if var('LOAD_BACKFILL', False) %} +WHERE + block_number between ( + SELECT root_height FROM network_version WHERE lower(network_version) = lower('{{ var('LOAD_BACKFILL_VERSION').replace('_', '-') }}') + ) AND ( + SELECT end_height FROM network_version WHERE lower(network_version) = lower('{{ var('LOAD_BACKFILL_VERSION').replace('_', '-') }}') + ) +{% else %} {% if is_incremental() %} WHERE _inserted_timestamp >= ( @@ -102,6 +116,7 @@ WHERE tx_count_lookback ) {% endif %} +{% endif %} ), tx_count AS ( SELECT diff --git a/models/silver/core/silver__streamline_collections.sql b/models/silver/core/silver__streamline_collections.sql index 2f1f7af..d7d1bc4 100644 --- a/models/silver/core/silver__streamline_collections.sql +++ b/models/silver/core/silver__streamline_collections.sql @@ -25,6 +25,12 @@ SELECT '{{ invocation_id }}' AS _invocation_id FROM +{% if var('LOAD_BACKFILL', False) %} + {{ ref('bronze__streamline_collections_history') }} + -- TODO need incremental logic of some sort probably (for those 5800 missing txs) + -- where inserted timestamp >= max from this where network version = backfill version OR block range between root and end +{% else %} + {% if is_incremental() %} {{ ref('bronze__streamline_collections') }} WHERE @@ -38,6 +44,8 @@ WHERE {{ ref('bronze__streamline_fr_collections') }} {% endif %} +{% endif %} + qualify(ROW_NUMBER() over (PARTITION BY collection_id ORDER BY _inserted_timestamp DESC)) = 1 diff --git a/models/silver/core/silver__streamline_events.sql b/models/silver/core/silver__streamline_events.sql index e8f7a9f..707c1ac 100644 --- a/models/silver/core/silver__streamline_events.sql +++ b/models/silver/core/silver__streamline_events.sql @@ -14,12 +14,12 @@ WITH transactions AS ( FROM {{ ref('silver__streamline_transactions_final') }} WHERE - NOT pending_result_response -- inserted timestamp will update w TR ingestion, so should flow thru to events and curated + NOT pending_result_response {% if is_incremental() %} -AND _inserted_timestamp >= ( +AND modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) diff --git a/models/silver/core/silver__streamline_transaction_results.sql b/models/silver/core/silver__streamline_transaction_results.sql index 2436a39..48fe987 100644 --- a/models/silver/core/silver__streamline_transaction_results.sql +++ b/models/silver/core/silver__streamline_transaction_results.sql @@ -26,6 +26,12 @@ SELECT '{{ invocation_id }}' AS _invocation_id FROM +{% if var('LOAD_BACKFILL', False) %} + {{ ref('bronze__streamline_transaction_results_history') }} + -- TODO need incremental logic of some sort probably (for those 5800 missing txs) + -- where inserted timestamp >= max from this where network version = backfill version OR block range between root and end +{% else %} + {% if is_incremental() %} {{ ref('bronze__streamline_transaction_results') }} WHERE @@ -39,6 +45,8 @@ WHERE {{ ref('bronze__streamline_fr_transaction_results') }} {% endif %} +{% endif %} + qualify(ROW_NUMBER() over (PARTITION BY tx_id ORDER BY _inserted_timestamp DESC)) = 1 diff --git a/models/silver/core/silver__streamline_transactions.sql b/models/silver/core/silver__streamline_transactions.sql index d05fb23..cf7b680 100644 --- a/models/silver/core/silver__streamline_transactions.sql +++ b/models/silver/core/silver__streamline_transactions.sql @@ -30,6 +30,12 @@ SELECT _inserted_timestamp FROM +{% if var('LOAD_BACKFILL', False) %} + {{ ref('bronze__streamline_transactions_history') }} + -- TODO need incremental logic of some sort probably (for those 5800 missing txs) + -- where inserted timestamp >= max from this where network version = backfill version OR block range between root and end +{% else %} + {% if is_incremental() %} {{ ref('bronze__streamline_transactions') }} WHERE @@ -43,6 +49,8 @@ WHERE {{ ref('bronze__streamline_fr_transactions') }} {% endif %} +{% endif %} + qualify(ROW_NUMBER() over (PARTITION BY tx_id ORDER BY _inserted_timestamp DESC)) = 1 diff --git a/models/silver/core/silver__streamline_transactions_final.sql b/models/silver/core/silver__streamline_transactions_final.sql index f9a7339..6d619d5 100644 --- a/models/silver/core/silver__streamline_transactions_final.sql +++ b/models/silver/core/silver__streamline_transactions_final.sql @@ -17,7 +17,7 @@ block_height FROM """ ~ this ~ """ WHERE - _inserted_timestamp >= SYSDATE() - INTERVAL '3 days' + modified_timestamp >= SYSDATE() - INTERVAL '3 days' AND ( block_timestamp IS NULL OR pending_result_response @@ -32,15 +32,15 @@ {% if is_incremental() %} {% set incr = """ WHERE - _inserted_timestamp >= ( + modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) _inserted_timestamp + MAX(modified_timestamp) modified_timestamp FROM """ ~ this ~ """ ) OR -- re-run record if block comes in later than tx records ( - _inserted_timestamp >= SYSDATE() - INTERVAL '3 days' + modified_timestamp >= SYSDATE() - INTERVAL '3 days' AND tx_id IN ( SELECT @@ -60,7 +60,7 @@ Coalesce in case there are 0 txs returned by the temp table */ {% if execute %} - {% set min_time = run_query("select coalesce(min(_inserted_timestamp),current_timestamp()) from silver.streamline_transactions_final_intermediate_tmp").columns [0].values() [0] %} + {% set min_time = run_query("select coalesce(min(modified_timestamp),sysdate()) from silver.streamline_transactions_final_intermediate_tmp").columns [0].values() [0] %} {% endif %} WITH txs AS ( @@ -78,7 +78,7 @@ tx_results AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= SYSDATE() - INTERVAL '3 days' + modified_timestamp >= SYSDATE() - INTERVAL '3 days' AND tx_id IN ( SELECT DISTINCT tx_id @@ -95,7 +95,7 @@ blocks AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= SYSDATE() - INTERVAL '3 days' + modified_timestamp >= SYSDATE() - INTERVAL '3 days' AND block_number IN ( SELECT DISTINCT block_number diff --git a/models/silver/defi/silver__swaps_events_s.sql b/models/silver/defi/silver__swaps_events_s.sql index 16ed125..d263242 100644 --- a/models/silver/defi/silver__swaps_events_s.sql +++ b/models/silver/defi/silver__swaps_events_s.sql @@ -9,16 +9,20 @@ WITH swaps_txs AS ( SELECT - * + block_height, + tx_id, + _inserted_timestamp, + _partition_by_block_id, + modified_timestamp FROM {{ ref('silver__streamline_events') }} WHERE event_contract LIKE '%SwapPair%' {% if is_incremental() %} -AND _inserted_timestamp >= ( +AND modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -26,7 +30,19 @@ AND _inserted_timestamp >= ( ), swap_events AS ( SELECT - * + tx_id, + block_height, + block_timestamp, + event_id, + event_index, + events_count, + payload, + event_contract, + event_type, + event_data, + tx_succeeded, + _inserted_timestamp, + _partition_by_block_id FROM {{ ref('silver__streamline_events') }} WHERE @@ -40,9 +56,9 @@ swap_events AS ( AND event_index < events_count - 3 {% if is_incremental() %} -AND _inserted_timestamp >= ( +AND modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) diff --git a/models/silver/defi/silver__swaps_s.sql b/models/silver/defi/silver__swaps_s.sql index eca6d66..8dc97e1 100644 --- a/models/silver/defi/silver__swaps_s.sql +++ b/models/silver/defi/silver__swaps_s.sql @@ -9,15 +9,23 @@ WITH events AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + event_index, + event_contract, + event_type, + event_data, + _inserted_timestamp, + _partition_by_block_id FROM {{ ref('silver__swaps_events_s') }} {% if is_incremental() %} WHERE - _inserted_timestamp >= ( + modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -112,7 +120,8 @@ token_withdraws AS ( ) - 1 AS unique_order, event_contract, event_data, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM swap_events WHERE @@ -166,6 +175,7 @@ link_token_movement AS ( w.block_timestamp, w.block_height, w._inserted_timestamp, + w._partition_by_block_id, -- set transfer index based on execution via deposit, not withdraw, event RANK() over ( PARTITION BY w.tx_id @@ -283,6 +293,7 @@ boilerplate AS ( block_timestamp, block_height, _inserted_timestamp, + _partition_by_block_id, withdraw_from AS trader FROM link_token_movement @@ -310,6 +321,7 @@ FINAL AS ( tokens :token1 :: STRING AS token_in_contract, amounts :amount1 :: DOUBLE AS token_in_amount, _inserted_timestamp, + _partition_by_block_id, {{ dbt_utils.generate_surrogate_key( ['tx_id', 'swap_index'] ) }} AS swaps_id, diff --git a/models/silver/defi/silver__swaps_single_trade_s.sql b/models/silver/defi/silver__swaps_single_trade_s.sql index 2ca4c73..218ef43 100644 --- a/models/silver/defi/silver__swaps_single_trade_s.sql +++ b/models/silver/defi/silver__swaps_single_trade_s.sql @@ -9,15 +9,23 @@ WITH swaps_events AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + event_index, + event_contract, + event_type, + event_data, + _inserted_timestamp, + _partition_by_block_id FROM {{ ref('silver__swaps_events_s') }} {% if is_incremental() %} WHERE - _inserted_timestamp >= ( + modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -97,7 +105,8 @@ token_out_data AS ( LOWER( event_data :from :: STRING ) AS trader_token_out, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM index_id ii LEFT JOIN swaps_single_trade sst USING ( @@ -160,6 +169,7 @@ combo AS ( td.token_1_amount, td.token_2_amount, tod._inserted_timestamp, + tod._partition_by_block_id, {{ dbt_utils.generate_surrogate_key( ['tx_id'] ) }} AS swaps_single_trade_id, diff --git a/models/silver/labels/silver__contract_labels_s.sql b/models/silver/labels/silver__contract_labels_s.sql index 54e6daf..4294dd6 100644 --- a/models/silver/labels/silver__contract_labels_s.sql +++ b/models/silver/labels/silver__contract_labels_s.sql @@ -14,15 +14,17 @@ WITH splt AS ( event_contract, '.' ) AS ec_s, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id, + modified_timestamp AS _modified_timestamp FROM {{ ref('silver__streamline_events') }} {% if is_incremental() %} WHERE - _inserted_timestamp >= ( + modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) _inserted_timestamp + MAX(modified_timestamp) FROM {{ this }} ) @@ -36,7 +38,9 @@ FINAL AS ( '0x', ec_s [array_size(ec_s)-2] :: STRING ) AS account_address, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id, + _modified_timestamp FROM splt WHERE @@ -54,5 +58,5 @@ FROM FINAL qualify ROW_NUMBER() over ( PARTITION BY event_contract ORDER BY - _inserted_timestamp DESC + _modified_timestamp DESC ) = 1 diff --git a/models/silver/labels/silver__labels_pools_metapier_s.sql b/models/silver/labels/silver__labels_pools_metapier_s.sql index 35ad0e1..434e85f 100644 --- a/models/silver/labels/silver__labels_pools_metapier_s.sql +++ b/models/silver/labels/silver__labels_pools_metapier_s.sql @@ -9,15 +9,24 @@ WITH events AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + event_index, + event_type, + event_contract, + event_data, + _inserted_timestamp, + _partition_by_block_id, + modified_timestamp FROM {{ ref('silver__streamline_events') }} {% if is_incremental() %} WHERE - _inserted_timestamp >= ( + modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -56,7 +65,8 @@ token_withdraws AS ( block_timestamp, event_contract, event_index, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM pier_events WHERE @@ -73,7 +83,8 @@ pairs AS ( ORDER BY event_index ) AS token1_contract, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM token_withdraws ), @@ -106,6 +117,7 @@ FINAL AS ( e.vault_address, pa.swap_contract, C._inserted_timestamp, + C._partition_by_block_id, {{ dbt_utils.generate_surrogate_key( ['tx_id'] ) }} AS labels_pools_metapier_id, diff --git a/models/silver/labels/silver__labels_pools_s.sql b/models/silver/labels/silver__labels_pools_s.sql index 115139c..ffb63af 100644 --- a/models/silver/labels/silver__labels_pools_s.sql +++ b/models/silver/labels/silver__labels_pools_s.sql @@ -9,15 +9,24 @@ WITH events AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + event_index, + event_type, + event_contract, + event_data, + _inserted_timestamp, + _partition_by_block_id, + modified_timestamp FROM {{ ref('silver__streamline_events') }} {% if is_incremental() %} WHERE - _inserted_timestamp >= ( + modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -40,7 +49,8 @@ pair_creation AS ( event_data :pairAddress :: STRING AS account_address, event_data :token0Key :: STRING AS token0_contract, event_data :token1Key :: STRING AS token1_contract, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM events WHERE @@ -61,7 +71,8 @@ FINAL AS ( SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM pair_creation p LEFT JOIN pair_labels l USING (account_address) diff --git a/models/silver/nft/metadata/onchain/silver__nft_moment_editions_s.sql b/models/silver/nft/metadata/onchain/silver__nft_moment_editions_s.sql index 9e68670..454c8c0 100644 --- a/models/silver/nft/metadata/onchain/silver__nft_moment_editions_s.sql +++ b/models/silver/nft/metadata/onchain/silver__nft_moment_editions_s.sql @@ -9,7 +9,17 @@ WITH events AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + event_id, + event_index, + event_type, + event_contract, + event_data, + _inserted_timestamp, + _partition_by_block_id, + modified_timestamp FROM {{ ref('silver__streamline_events') }} WHERE @@ -21,9 +31,9 @@ WITH events AS ( ) {% if is_incremental() %} -AND _inserted_timestamp >= ( +AND modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -32,6 +42,7 @@ AND _inserted_timestamp >= ( org AS ( SELECT tx_id, + event_id, block_timestamp, event_contract, event_data :id :: STRING AS edition_id, @@ -40,11 +51,18 @@ org AS ( event_data :seriesID :: STRING AS series_id, event_data :setID :: STRING AS set_id, event_data :tier :: STRING AS tier, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM events ) SELECT - * + *, + {{ dbt_utils.generate_surrogate_key( + ['event_id'] + ) }} AS nft_moment_editions_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM org diff --git a/models/silver/nft/metadata/onchain/silver__nft_moment_metadata_s.sql b/models/silver/nft/metadata/onchain/silver__nft_moment_metadata_s.sql index 3fb7299..c0efb1b 100644 --- a/models/silver/nft/metadata/onchain/silver__nft_moment_metadata_s.sql +++ b/models/silver/nft/metadata/onchain/silver__nft_moment_metadata_s.sql @@ -9,7 +9,17 @@ WITH play_creation AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + event_id, + event_index, + event_type, + event_contract, + event_data, + _inserted_timestamp, + _partition_by_block_id, + modified_timestamp FROM {{ ref('silver__streamline_events') }} WHERE @@ -28,9 +38,9 @@ A.87ca73a41bb50ad5.Golazos {% if is_incremental() %} -AND _inserted_timestamp >= ( +AND modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -39,12 +49,14 @@ AND _inserted_timestamp >= ( play_metadata AS ( SELECT tx_id, + event_id, block_timestamp, event_contract, event_data :id :: NUMBER AS play_id, VALUE :key :value :: STRING AS column_header, VALUE :value :value :: STRING AS column_value, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM play_creation, LATERAL FLATTEN(input => TRY_PARSE_JSON(event_data :metadata)) @@ -52,10 +64,12 @@ play_metadata AS ( FINAL AS ( SELECT tx_id, + event_id, block_timestamp, event_contract, play_id, _inserted_timestamp, + _partition_by_block_id, OBJECT_AGG( column_header :: variant, column_value :: variant @@ -67,9 +81,17 @@ FINAL AS ( 2, 3, 4, - 5 + 5, + 6, + 7 ) SELECT - * + *, + {{ dbt_utils.generate_surrogate_key( + ['event_id'] + ) }} AS nft_moment_metadata_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM FINAL diff --git a/models/silver/nft/metadata/onchain/silver__nft_moment_minted_2_s.sql b/models/silver/nft/metadata/onchain/silver__nft_moment_minted_2_s.sql index 1df7d8a..472193f 100644 --- a/models/silver/nft/metadata/onchain/silver__nft_moment_minted_2_s.sql +++ b/models/silver/nft/metadata/onchain/silver__nft_moment_minted_2_s.sql @@ -9,16 +9,26 @@ WITH events AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + event_id, + event_index, + event_type, + event_contract, + event_data, + _inserted_timestamp, + _partition_by_block_id, + modified_timestamp FROM {{ ref('silver__streamline_events') }} WHERE event_type = 'MomentMinted' {% if is_incremental() %} -AND _inserted_timestamp >= ( +AND modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -27,17 +37,25 @@ AND _inserted_timestamp >= ( org AS ( SELECT tx_id, + event_id, block_timestamp, event_contract, event_data :momentID :: STRING AS moment_id, event_data :serialNumber :: STRING AS serial_number, event_data :seriesID :: STRING AS series_id, event_data :setID :: STRING AS set_id, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM events ) SELECT - * + *, + {{ dbt_utils.generate_surrogate_key( + ['event_id'] + ) }} AS nft_moment_minted_2_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM org diff --git a/models/silver/nft/metadata/onchain/silver__nft_moment_minted_s.sql b/models/silver/nft/metadata/onchain/silver__nft_moment_minted_s.sql index 94e1008..9d2e765 100644 --- a/models/silver/nft/metadata/onchain/silver__nft_moment_minted_s.sql +++ b/models/silver/nft/metadata/onchain/silver__nft_moment_minted_s.sql @@ -9,16 +9,26 @@ WITH events AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + event_id, + event_index, + event_type, + event_contract, + event_data, + _inserted_timestamp, + _partition_by_block_id, + modified_timestamp FROM {{ ref('silver__streamline_events') }} WHERE event_type = 'MomentNFTMinted' {% if is_incremental() %} -AND _inserted_timestamp >= ( +AND modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -27,16 +37,24 @@ AND _inserted_timestamp >= ( org AS ( SELECT tx_id, + event_id, block_timestamp, event_contract, event_data :editionID :: STRING AS edition_id, event_data :id :: STRING AS nft_id, event_data :serialNumber :: STRING AS serial_number, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM events ) SELECT - * + *, + {{ dbt_utils.generate_surrogate_key( + ['event_id'] + ) }} AS nft_moment_minted_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM org diff --git a/models/silver/nft/metadata/onchain/silver__nft_moment_series_s.sql b/models/silver/nft/metadata/onchain/silver__nft_moment_series_s.sql index 91a363b..938676b 100644 --- a/models/silver/nft/metadata/onchain/silver__nft_moment_series_s.sql +++ b/models/silver/nft/metadata/onchain/silver__nft_moment_series_s.sql @@ -9,7 +9,17 @@ WITH events AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + event_id, + event_index, + event_type, + event_contract, + event_data, + _inserted_timestamp, + _partition_by_block_id, + modified_timestamp FROM {{ ref('silver__streamline_events') }} WHERE @@ -17,9 +27,9 @@ WITH events AS ( AND ARRAY_CONTAINS('name' :: variant, object_keys(event_data)) {% if is_incremental() %} -AND _inserted_timestamp >= ( +AND modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -28,15 +38,23 @@ AND _inserted_timestamp >= ( org AS ( SELECT tx_id, + event_id, block_timestamp, event_contract, event_data :id :: STRING AS series_id, event_data :name :: STRING AS series_name, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM events ) SELECT - * + *, + {{ dbt_utils.generate_surrogate_key( + ['event_id'] + ) }} AS nft_moment_series_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM org diff --git a/models/silver/nft/metadata/onchain/silver__nft_moment_set_s.sql b/models/silver/nft/metadata/onchain/silver__nft_moment_set_s.sql index 39d69f6..d0973c8 100644 --- a/models/silver/nft/metadata/onchain/silver__nft_moment_set_s.sql +++ b/models/silver/nft/metadata/onchain/silver__nft_moment_set_s.sql @@ -9,7 +9,17 @@ WITH events AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + event_id, + event_index, + event_type, + event_contract, + event_data, + _inserted_timestamp, + _partition_by_block_id, + modified_timestamp FROM {{ ref('silver__streamline_events') }} WHERE @@ -17,9 +27,9 @@ WITH events AS ( AND ARRAY_CONTAINS('name' :: variant, object_keys(event_data)) {% if is_incremental() %} -AND _inserted_timestamp >= ( +AND modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -28,17 +38,25 @@ AND _inserted_timestamp >= ( org AS ( SELECT tx_id, + event_id, block_timestamp, event_contract, event_data :id :: STRING AS set_id, event_data :name :: STRING AS set_name, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM events WHERE set_id IS NOT NULL ) SELECT - * + *, + {{ dbt_utils.generate_surrogate_key( + ['event_id'] + ) }} AS nft_moment_set_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM org diff --git a/models/silver/nft/metadata/onchain/silver__nft_moments_s.sql b/models/silver/nft/metadata/onchain/silver__nft_moments_s.sql index 8bfd1c2..3839d89 100644 --- a/models/silver/nft/metadata/onchain/silver__nft_moments_s.sql +++ b/models/silver/nft/metadata/onchain/silver__nft_moments_s.sql @@ -6,28 +6,21 @@ tags = ['nft', 'scheduled', 'streamline_scheduled', 'scheduled_non_core'] ) }} -WITH events AS ( +WITH moment_events AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + event_id, + event_index, + event_type, + event_contract, + event_data, + _inserted_timestamp, + _partition_by_block_id FROM {{ ref('silver__streamline_events') }} - -{% if is_incremental() %} -WHERE - _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} - ) -{% endif %} -), -moment_events AS ( - SELECT - * - FROM - events WHERE event_type IN ( 'MomentPurchased', @@ -40,8 +33,23 @@ moment_events AS ( 'MomentMinted', 'MomentNFTMinted' ) +{% if is_incremental() %} +AND + modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) +{% endif %} ) SELECT - * + *, + {{ dbt_utils.generate_surrogate_key( + ['event_id'] + ) }} AS nft_moments_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM moment_events diff --git a/models/silver/nft/metadata/silver__nft_allday_metadata.sql b/models/silver/nft/metadata/silver__nft_allday_metadata.sql index e07f0b1..b4ab825 100644 --- a/models/silver/nft/metadata/silver__nft_allday_metadata.sql +++ b/models/silver/nft/metadata/silver__nft_allday_metadata.sql @@ -6,6 +6,7 @@ ) }} {# Note - removed schedule tag as the legacy lambda workflow is inactive. No need to query external table #} +{# Not updating incremental to modts logic due to above comment JMF 6/7/2024 #} WITH metadata AS ( SELECT diff --git a/models/silver/nft/silver__nft_giglabs_s.sql b/models/silver/nft/silver__nft_giglabs_s.sql index 8ca3c8b..7c4d9d3 100644 --- a/models/silver/nft/silver__nft_giglabs_s.sql +++ b/models/silver/nft/silver__nft_giglabs_s.sql @@ -9,15 +9,25 @@ WITH events AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + tx_succeeded, + event_index, + event_type, + event_contract, + event_data, + _inserted_timestamp, + _partition_by_block_id, + modified_timestamp FROM {{ ref('silver__streamline_events') }} {% if is_incremental() %} WHERE - _inserted_timestamp >= ( + modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -36,9 +46,9 @@ mapped_sales AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= ( + modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -54,7 +64,16 @@ duc AS ( ), duc_events AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + event_index, + event_type, + event_contract, + event_data, + _inserted_timestamp, + _partition_by_block_id, + modified_timestamp FROM events WHERE @@ -132,6 +151,7 @@ missing_contract AS ( block_height, tx_succeeded, _inserted_timestamp, + _partition_by_block_id, event_contract AS currency, event_data :amount :: DOUBLE AS amount, event_data :from :: STRING AS forwarded_from, @@ -149,6 +169,7 @@ purchase_amt AS ( block_height, tx_succeeded, _inserted_timestamp, + _partition_by_block_id, 'A.ead892083b3e2c6c.DapperUtilityCoin' AS currency, event_data :amount :: DOUBLE AS amount, event_data :from :: STRING AS forwarded_from, @@ -212,6 +233,7 @@ gl_sales AS ( p.block_height, p.tx_succeeded, p._inserted_timestamp, + p._partition_by_block_id, 'Gigantik Primary Market' AS marketplace, p.missing, p.currency, @@ -266,7 +288,8 @@ giglabs_final AS ( withdraw_nft_id AS nft_id, m.nfts, tx_succeeded, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM gl_sales s LEFT JOIN multi m USING (tx_id) @@ -330,6 +353,7 @@ FINAL AS ( counterparties, tx_succeeded, _inserted_timestamp, + _partition_by_block_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id diff --git a/models/silver/nft/silver__nft_sales_s.sql b/models/silver/nft/silver__nft_sales_s.sql index c693b83..78752fa 100644 --- a/models/silver/nft/silver__nft_sales_s.sql +++ b/models/silver/nft/silver__nft_sales_s.sql @@ -15,9 +15,9 @@ WITH topshot AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= ( + modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -31,9 +31,9 @@ secondary_mkts AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= ( + modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -47,9 +47,9 @@ giglabs AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= ( + modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -69,6 +69,7 @@ combo AS ( currency, tx_succeeded, _inserted_timestamp, + _partition_by_block_id, {{ dbt_utils.generate_surrogate_key( ['tx_id','seller', 'buyer', 'nft_collection', 'nft_id'] ) }} AS nft_sales_id, @@ -79,7 +80,7 @@ combo AS ( counterparties FROM topshot - UNION + UNION ALL SELECT tx_id, block_height, @@ -93,6 +94,7 @@ combo AS ( currency, tx_succeeded, _inserted_timestamp, + _partition_by_block_id, {{ dbt_utils.generate_surrogate_key( ['tx_id','seller', 'buyer', 'nft_collection', 'nft_id'] ) }} AS nft_sales_id, @@ -103,7 +105,7 @@ combo AS ( counterparties FROM secondary_mkts - UNION + UNION ALL SELECT tx_id, block_height, @@ -117,6 +119,7 @@ combo AS ( currency, tx_succeeded, _inserted_timestamp, + _partition_by_block_id, {{ dbt_utils.generate_surrogate_key( ['tx_id','seller', 'buyer', 'nft_collection', 'nft_id'] ) }} AS nft_sales_id, diff --git a/models/silver/nft/silver__nft_topshot_sales_s.sql b/models/silver/nft/silver__nft_topshot_sales_s.sql index 3787826..87f823a 100644 --- a/models/silver/nft/silver__nft_topshot_sales_s.sql +++ b/models/silver/nft/silver__nft_topshot_sales_s.sql @@ -9,7 +9,17 @@ WITH silver_events AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + tx_succeeded, + event_index, + event_type, + event_contract, + event_data, + _inserted_timestamp, + _partition_by_block_id, + modified_timestamp FROM {{ ref('silver__streamline_events') }} -- WHERE @@ -17,9 +27,9 @@ WITH silver_events AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= ( + modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -35,7 +45,8 @@ moment_data AS ( event_data :price :: DOUBLE AS price, event_data :seller :: STRING AS seller, tx_succeeded, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM silver_events WHERE @@ -86,7 +97,8 @@ combo AS ( price, currency, tx_succeeded, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM moment_data LEFT JOIN currency_data USING (tx_id) @@ -143,6 +155,7 @@ FINAL AS ( currency, tx_succeeded, _inserted_timestamp, + _partition_by_block_id, {{ dbt_utils.generate_surrogate_key( ['tx_id'] ) }} AS nft_topshot_sales_id, diff --git a/models/silver/nft/silver__nft_transactions_secondary_market_s.sql b/models/silver/nft/silver__nft_transactions_secondary_market_s.sql index 7a95dd1..61d19c9 100644 --- a/models/silver/nft/silver__nft_transactions_secondary_market_s.sql +++ b/models/silver/nft/silver__nft_transactions_secondary_market_s.sql @@ -9,15 +9,25 @@ WITH silver_events AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + tx_succeeded, + event_index, + event_type, + event_contract, + event_data, + _inserted_timestamp, + _partition_by_block_id, + modified_timestamp FROM {{ ref('silver__streamline_events') }} {% if is_incremental() %} WHERE - _inserted_timestamp >= ( + modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -42,7 +52,8 @@ sale_trigger AS ( ), TRUE ) AS is_purchased, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM silver_events WHERE @@ -389,7 +400,8 @@ nft_sales AS ( b.nft_collection_deposit, b.nft_id_deposit, b.buyer_deposit, - e._inserted_timestamp + e._inserted_timestamp, + e._partition_by_block_id FROM sale_trigger e LEFT JOIN token_withdraw_event w USING (tx_id) @@ -470,7 +482,8 @@ FINAL AS ( cd.step_data, cd.counterparties, tx_succeeded, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM nft_sales ns LEFT JOIN counterparty_data cd USING (tx_id) diff --git a/models/silver/staking/silver__staking_actions_s.sql b/models/silver/staking/silver__staking_actions_s.sql index b20741c..b3f5892 100644 --- a/models/silver/staking/silver__staking_actions_s.sql +++ b/models/silver/staking/silver__staking_actions_s.sql @@ -9,15 +9,25 @@ WITH silver_events AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + tx_succeeded, + event_index, + event_type, + event_contract, + event_data, + _inserted_timestamp, + _partition_by_block_id, + modified_timestamp FROM {{ ref('silver__streamline_events') }} {% if is_incremental() %} WHERE - _inserted_timestamp >= ( + modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -27,7 +37,7 @@ WHERE FROM {{ this }} WHERE - _inserted_timestamp >= SYSDATE() - INTERVAL '3 days' + modified_timestamp >= SYSDATE() - INTERVAL '3 days' AND delegator IS NULL ) {% endif %} @@ -44,7 +54,8 @@ flow_staking AS ( event_data :amount :: FLOAT AS amount, event_data :delegatorID :: STRING AS delegator_id, event_data :nodeID :: STRING AS node_id, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM silver_events WHERE @@ -77,9 +88,9 @@ add_auth AS ( {% if is_incremental() %} AND ( - _inserted_timestamp >= ( + modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -89,7 +100,7 @@ AND ( FROM {{ this }} WHERE - _inserted_timestamp >= SYSDATE() - INTERVAL '3 days' + modified_timestamp >= SYSDATE() - INTERVAL '3 days' AND delegator IS NULL ) ) @@ -107,6 +118,7 @@ FINAL AS ( amount, node_id, _inserted_timestamp, + _partition_by_block_id, {{ dbt_utils.generate_surrogate_key( ['tx_id', 'event_index', 'action'] ) }} AS staking_actions_id, diff --git a/models/silver/streamline/core/complete/streamline__complete_get_blocks_history.sql b/models/silver/streamline/core/complete/streamline__complete_get_blocks_history.sql index d0c04e5..2901691 100644 --- a/models/silver/streamline/core/complete/streamline__complete_get_blocks_history.sql +++ b/models/silver/streamline/core/complete/streamline__complete_get_blocks_history.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_blocks_history') }} +-- depends_on: {{ ref('bronze__streamline_complete_blocks_history') }} {{ config ( materialized = "incremental", unique_key = "block_number", @@ -15,7 +15,7 @@ SELECT _inserted_timestamp FROM -{{ ref('bronze__streamline_blocks_history') }} +{{ ref('bronze__streamline_complete_blocks_history') }} WHERE TRUE diff --git a/models/silver/streamline/core/complete/streamline__complete_get_collections_history.sql b/models/silver/streamline/core/complete/streamline__complete_get_collections_history.sql index 79ec85d..a1c306c 100644 --- a/models/silver/streamline/core/complete/streamline__complete_get_collections_history.sql +++ b/models/silver/streamline/core/complete/streamline__complete_get_collections_history.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_collections_history') }} +-- depends_on: {{ ref('bronze__streamline_complete_collections_history') }} {{ config ( materialized = "incremental", unique_key = "id", @@ -16,7 +16,7 @@ SELECT _inserted_timestamp FROM -{{ ref('bronze__streamline_collections_history') }} +{{ ref('bronze__streamline_complete_collections_history') }} WHERE TRUE {% if is_incremental() %} diff --git a/models/silver/streamline/core/complete/streamline__complete_get_transaction_results_history.sql b/models/silver/streamline/core/complete/streamline__complete_get_transaction_results_history.sql index dc42240..c7e4a7b 100644 --- a/models/silver/streamline/core/complete/streamline__complete_get_transaction_results_history.sql +++ b/models/silver/streamline/core/complete/streamline__complete_get_transaction_results_history.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_transaction_results_history') }} +-- depends_on: {{ ref('bronze__streamline_complete_transaction_results_history') }} {{ config ( materialized = "incremental", unique_key = "id", @@ -16,7 +16,7 @@ SELECT _inserted_timestamp FROM -{{ ref('bronze__streamline_transaction_results_history') }} +{{ ref('bronze__streamline_complete_transaction_results_history') }} WHERE TRUE {% if is_incremental() %} diff --git a/models/silver/streamline/core/complete/streamline__complete_get_transactions_history.sql b/models/silver/streamline/core/complete/streamline__complete_get_transactions_history.sql index b1b2e09..71c676d 100644 --- a/models/silver/streamline/core/complete/streamline__complete_get_transactions_history.sql +++ b/models/silver/streamline/core/complete/streamline__complete_get_transactions_history.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_transactions_history') }} +-- depends_on: {{ ref('bronze__streamline_complete_transactions_history') }} {{ config ( materialized = "incremental", unique_key = "id", @@ -16,7 +16,7 @@ SELECT _inserted_timestamp FROM -{{ ref('bronze__streamline_transactions_history') }} +{{ ref('bronze__streamline_complete_transactions_history') }} WHERE TRUE {% if is_incremental() %} diff --git a/models/silver/transfers/silver__bridge_blocto_s.sql b/models/silver/transfers/silver__bridge_blocto_s.sql index 589ab4c..4aa3a73 100644 --- a/models/silver/transfers/silver__bridge_blocto_s.sql +++ b/models/silver/transfers/silver__bridge_blocto_s.sql @@ -9,7 +9,16 @@ WITH events AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + event_index, + event_type, + event_contract, + event_data, + _inserted_timestamp, + _partition_by_block_id, + modified_timestamp FROM {{ ref('silver__streamline_events') }} -- WHERE @@ -17,9 +26,9 @@ WITH events AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= ( + modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -33,7 +42,8 @@ teleport_events AS ( event_contract AS teleport_contract_fee, event_data :amount :: DOUBLE AS amount_fee, event_data :type :: NUMBER AS teleport_direction, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM events WHERE @@ -113,7 +123,8 @@ blocto_inbound AS ( d.to_deposits AS flow_wallet_address, f.teleport_direction, 'blocto' AS bridge, - f._inserted_timestamp + f._inserted_timestamp, + f._partition_by_block_id FROM teleports_in t LEFT JOIN deposits d USING (tx_id) @@ -217,7 +228,8 @@ blocto_outbound AS ( w.from_withdraw AS flow_wallet_address, f.teleport_direction, 'blocto' AS bridge, - f._inserted_timestamp + f._inserted_timestamp, + f._partition_by_block_id FROM teleports_out t LEFT JOIN teleports_out_withdraw w USING (tx_id) @@ -236,7 +248,8 @@ tbl_union AS ( flow_wallet_address, teleport_direction, bridge, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM blocto_inbound UNION @@ -252,7 +265,8 @@ tbl_union AS ( flow_wallet_address, teleport_direction, bridge, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM blocto_outbound ), @@ -281,6 +295,7 @@ FINAL AS ( l.blockchain, bridge, _inserted_timestamp, + _partition_by_block_id, {{ dbt_utils.generate_surrogate_key( ['tx_id'] ) }} AS bridge_blocto_id, diff --git a/models/silver/transfers/silver__bridge_celer_s.sql b/models/silver/transfers/silver__bridge_celer_s.sql index 7c5a339..f8a4597 100644 --- a/models/silver/transfers/silver__bridge_celer_s.sql +++ b/models/silver/transfers/silver__bridge_celer_s.sql @@ -9,7 +9,17 @@ WITH events AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + tx_succeeded, + event_index, + event_type, + event_contract, + event_data, + _inserted_timestamp, + _partition_by_block_id, + modified_timestamp FROM {{ ref('silver__streamline_events') }} -- WHERE @@ -17,9 +27,9 @@ WITH events AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= ( + modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -35,7 +45,8 @@ cbridge_txs AS ( event_contract, event_type, event_data, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM events WHERE @@ -54,7 +65,8 @@ inbound AS ( event_data :refChId :: NUMBER AS chain_id, 'inbound' AS direction, 'cbridge' AS bridge, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM events WHERE @@ -82,7 +94,8 @@ outbound AS ( event_data :toChain :: NUMBER AS chain_id, 'outbound' AS direction, 'cbridge' AS bridge, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM events WHERE @@ -107,7 +120,8 @@ tbl_union AS ( chain_id, direction, bridge, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM inbound UNION @@ -123,7 +137,8 @@ tbl_union AS ( chain_id, direction, bridge, - _inserted_timestamp + _inserted_timestamp, + _partition_by_block_id FROM outbound ), @@ -149,6 +164,7 @@ FINAL AS ( direction, bridge, _inserted_timestamp, + _partition_by_block_id, {{ dbt_utils.generate_surrogate_key( ['tx_id'] ) }} AS bridge_celer_id, diff --git a/models/silver/transfers/silver__token_transfers_s.sql b/models/silver/transfers/silver__token_transfers_s.sql index cadea9d..ace4ffd 100644 --- a/models/silver/transfers/silver__token_transfers_s.sql +++ b/models/silver/transfers/silver__token_transfers_s.sql @@ -9,7 +9,17 @@ WITH events AS ( SELECT - * + block_height, + block_timestamp, + tx_id, + tx_succeeded, + event_index, + event_type, + event_contract, + event_data, + _inserted_timestamp, + _partition_by_block_id, + modified_timestamp FROM {{ ref('silver__streamline_events') }} -- WHERE @@ -17,9 +27,9 @@ WITH events AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= ( + modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -144,6 +154,7 @@ FINAL AS ( ) SELECT *, + round(block_height, -5) AS _partition_by_block_id, {{ dbt_utils.generate_surrogate_key( ['tx_id','sender', 'recipient','token_contract', 'amount'] ) }} AS token_transfers_id,