diff --git a/dbt_project.yml b/dbt_project.yml index 87e0bea..df20eec 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -62,6 +62,7 @@ vars: OBSERV_FULL_TEST: False DBT_FULL_TEST: False STREAMLINE_LOAD_LOOKBACK_HOURS: 3 + IS_MIGRATION: False dispatch: - macro_namespace: dbt diff --git a/models/silver/streamline/silver__streamline_receipts_final.sql b/models/silver/streamline/silver__streamline_receipts_final.sql index 4171db1..d599102 100644 --- a/models/silver/streamline/silver__streamline_receipts_final.sql +++ b/models/silver/streamline/silver__streamline_receipts_final.sql @@ -3,10 +3,11 @@ incremental_strategy = 'merge', merge_exclude_columns = ['inserted_timestamp'], unique_key = 'receipt_object_id', - cluster_by = ['_inserted_timestamp::date', '_partition_by_block_number'], + cluster_by = ['_inserted_timestamp::date', '_modified_timestamp::DATE', '_partition_by_block_number'], tags = ['receipt_map'], full_refresh = False ) }} +{# TODO - check clustering. Add SO? #} WITH retry_range AS ( @@ -14,16 +15,28 @@ WITH retry_range AS ( receipt_object_id, block_id, _partition_by_block_number, - _inserted_timestamp, - _modified_timestamp + _inserted_timestamp + {% if not var('IS_MIGRATION') %} + , _modified_timestamp + {% endif %} FROM {{ this }} + + {% if var('IS_MIGRATION') %} + WHERE + _inserted_timestamp >= SYSDATE() - INTERVAL '1 day' + AND ( + tx_hash IS NULL + OR block_timestamp IS NULL + ) + {% else %} WHERE _modified_timestamp >= SYSDATE() - INTERVAL '1 day' AND ( tx_hash IS NULL OR block_timestamp IS NULL ) + {% endif %} ), base_receipts AS ( SELECT diff --git a/models/silver/streamline/silver__streamline_transactions.sql b/models/silver/streamline/silver__streamline_transactions.sql index adcc424..6a4119f 100644 --- a/models/silver/streamline/silver__streamline_transactions.sql +++ b/models/silver/streamline/silver__streamline_transactions.sql @@ -19,8 +19,14 @@ WITH chunks AS ( FROM {{ ref('silver__streamline_shards') }} WHERE - {{ incremental_load_filter('_inserted_timestamp') }} - AND chunk != 'null' + chunk != 'null' + {% if var('IS_MIGRATION') %} + AND + {{ incremental_load_filter('_inserted_timestamp') }} + {% else %} + AND + {{ incremental_load_filter('_modified_timestamp') }} + {% endif %} ), flatten_transactions AS ( SELECT @@ -86,10 +92,15 @@ FINAL AS ( _inserted_timestamp, _modified_timestamp FROM - txs qualify ROW_NUMBER() over ( + txs + qualify ROW_NUMBER() over ( PARTITION BY tx_hash ORDER BY + {% if var('IS_MIGRATION') %} _inserted_timestamp DESC + {% else %} + _modified_timestamp DESC + {% endif %} ) = 1 ) SELECT diff --git a/models/silver/streamline/silver__streamline_transactions_final.sql b/models/silver/streamline/silver__streamline_transactions_final.sql index a0a0345..ab64bfa 100644 --- a/models/silver/streamline/silver__streamline_transactions_final.sql +++ b/models/silver/streamline/silver__streamline_transactions_final.sql @@ -2,10 +2,12 @@ materialized = 'incremental', unique_key = 'tx_hash', incremental_strategy = 'delete+insert', - cluster_by = ['_inserted_timestamp::date', 'block_timestamp::date'], + cluster_by = ['_inserted_timestamp::date', '_modified_timestamp::DATE', '_partition_by_block_number'], tags = ['receipt_map'] ) }} - +{# TODO - upd * to col selection #} +{# TODO - add _modified_timestamp column #} +{# TODO - check clustering. Add SO? #} WITH int_txs AS ( SELECT @@ -13,16 +15,16 @@ WITH int_txs AS ( FROM {{ ref('silver__streamline_transactions') }} - {% if var("MANUAL_FIX") %} - WHERE - {{ partition_load_manual('no_buffer') }} - {% else %} + {% if var('IS_MIGRATION') %} WHERE {{ partition_incremental_load( 150000, 10000, 0 ) }} + {% else %} + WHERE + {{ incremental_load_filter('_modified_timestamp') }} {% endif %} ), int_receipts AS ( @@ -31,23 +33,25 @@ int_receipts AS ( FROM {{ ref('silver__streamline_receipts_final') }} - {% if var("MANUAL_FIX") %} - WHERE - {{ partition_load_manual('end') }} - {% else %} + {% if var('IS_MIGRATION') %} WHERE {{ partition_incremental_load( 150000, 10000, 0 ) }} + {% else %} + WHERE + {{ incremental_load_filter('_modified_timestamp') }} {% endif %} + ), int_blocks AS ( SELECT * FROM {{ ref('silver__streamline_blocks') }} + {# TODO add WHERE #} ), receipt_array AS ( SELECT