diff --git a/models/silver/core/lake/silver__streamline_blocks.sql b/models/silver/core/lake/silver__streamline_blocks.sql index e76813b..123de22 100644 --- a/models/silver/core/lake/silver__streamline_blocks.sql +++ b/models/silver/core/lake/silver__streamline_blocks.sql @@ -6,7 +6,7 @@ merge_exclude_columns = ['inserted_timestamp'], cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE', '_partition_by_block_number'], unique_key = 'block_id', - tags = ['load', 'load_blocks','scheduled_core'], + tags = ['load', 'load_blocks','scheduled_core', 'deprecated_lake_archive'], full_refresh = False ) }} diff --git a/models/silver/core/lake/silver__streamline_receipts.sql b/models/silver/core/lake/silver__streamline_receipts.sql index 1da7fc2..18963f7 100644 --- a/models/silver/core/lake/silver__streamline_receipts.sql +++ b/models/silver/core/lake/silver__streamline_receipts.sql @@ -5,7 +5,7 @@ merge_exclude_columns = ['inserted_timestamp'], unique_key = 'receipt_id', cluster_by = ['modified_timestamp::date', '_partition_by_block_number'], - tags = ['load', 'load_shards','scheduled_core'] + tags = ['load', 'load_shards','scheduled_core', 'deprecated_lake_archive'] ) }} WITH shards AS ( diff --git a/models/silver/core/lake/silver__streamline_receipts_final.sql b/models/silver/core/lake/silver__streamline_receipts_final.sql index 260dbf5..cd1ebea 100644 --- a/models/silver/core/lake/silver__streamline_receipts_final.sql +++ b/models/silver/core/lake/silver__streamline_receipts_final.sql @@ -6,7 +6,7 @@ unique_key = 'receipt_object_id', cluster_by = ['block_timestamp::DATE','modified_timestamp::DATE', '_partition_by_block_number', ], post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,receiver_id,predecessor_id);", - tags = ['receipt_map','scheduled_core'], + tags = ['receipt_map','scheduled_core', 'deprecated_lake_archive'], full_refresh = False ) }} diff --git a/models/silver/core/lake/silver__streamline_shards.sql b/models/silver/core/lake/silver__streamline_shards.sql index e7682bb..bb50f80 100644 --- a/models/silver/core/lake/silver__streamline_shards.sql +++ b/models/silver/core/lake/silver__streamline_shards.sql @@ -5,7 +5,7 @@ merge_exclude_columns = ['inserted_timestamp'], cluster_by = ['_inserted_timestamp::DATE', '_partition_by_block_number'], unique_key = 'shard_id', - tags = ['load', 'load_shards','scheduled_core'], + tags = ['load', 'load_shards','scheduled_core', 'deprecated_lake_archive'], full_refresh = False ) }} diff --git a/models/silver/core/lake/silver__streamline_transactions_final.sql b/models/silver/core/lake/silver__streamline_transactions_final.sql index b2d772d..7fad2bc 100644 --- a/models/silver/core/lake/silver__streamline_transactions_final.sql +++ b/models/silver/core/lake/silver__streamline_transactions_final.sql @@ -3,7 +3,7 @@ incremental_strategy = 'delete+insert', unique_key = 'tx_hash', cluster_by = ['block_timestamp::DATE','modified_timestamp::DATE', '_partition_by_block_number'], - tags = ['receipt_map','scheduled_core'] + tags = ['receipt_map','scheduled_core', 'deprecated_lake_archive'] ) }} WITH int_txs AS ( diff --git a/models/silver/core/migration/_migrate_blocks.sql b/models/silver/core/migration/_migrate_blocks.sql new file mode 100644 index 0000000..1bbe6f5 --- /dev/null +++ b/models/silver/core/migration/_migrate_blocks.sql @@ -0,0 +1,24 @@ +{{ config( + materialized = 'ephemeral' +) }} + +SELECT + block_id, + block_timestamp, + block_hash, + prev_hash, + block_author, + chunks AS chunks_json, + header AS header_json, + _partition_by_block_number, + streamline_blocks_id, + inserted_timestamp, + modified_timestamp, + _invocation_id +FROM + {{ ref('silver__streamline_blocks') }} + + {% if var("BATCH_MIGRATE") %} + WHERE + {{ partition_load_manual('no_buffer') }} + {% endif %} diff --git a/models/silver/core/migration/_migrate_receipts.sql b/models/silver/core/migration/_migrate_receipts.sql index 5a88f64..b32b5e1 100644 --- a/models/silver/core/migration/_migrate_receipts.sql +++ b/models/silver/core/migration/_migrate_receipts.sql @@ -1,17 +1,27 @@ -{{ - config( - 'materialized' = 'ephemeral' - ) -}} +{{ config( + 'materialized' = 'ephemeral' +) }} SELECT chunk_hash, block_id, block_timestamp, tx_hash, - COALESCE(receipt_id, receipt_object_id) AS receipt_id, + COALESCE( + receipt_id, + receipt_object_id + ) AS receipt_id, receipt_actions AS receipt_json, execution_outcome AS outcome_json, - _partition_by_block_number + _partition_by_block_number, + streamline_receipts_final_id, + inserted_timestamp, + modified_timestamp, + _invocation_id FROM {{ ref('silver__streamline_receipts_final') }} + + {% if var("BATCH_MIGRATE") %} + WHERE + {{ partition_load_manual('no_buffer') }} + {% endif %} diff --git a/models/silver/core/migration/_migrate_txs.sql b/models/silver/core/migration/_migrate_txs.sql index f203ca5..42a0405 100644 --- a/models/silver/core/migration/_migrate_txs.sql +++ b/models/silver/core/migration/_migrate_txs.sql @@ -1,7 +1,7 @@ {{ config( materialized = 'ephemeral' ) }} --- likely need to add batch logic for the migration + WITH lake_transactions_final AS ( SELECT @@ -13,9 +13,18 @@ WITH lake_transactions_final AS ( gas_used, transaction_fee, attached_gas, - _partition_by_block_number + _partition_by_block_number, + streamline_transactions_final_id, + inserted_timestamp, + modified_timestamp, + _invocation_id FROM {{ ref('silver__streamline_transactions_final') }} + + {% if var("BATCH_MIGRATE") %} + WHERE + {{ partition_load_manual('no_buffer') }} + {% endif %} ), lake_transactions_int AS ( SELECT @@ -28,6 +37,11 @@ lake_transactions_int AS ( _partition_by_block_number FROM {{ ref('silver__streamline_transactions') }} + + {% if var("BATCH_MIGRATE") %} + WHERE + {{ partition_load_manual('no_buffer') }} + {% endif %} ), transaction_archive AS ( SELECT diff --git a/models/silver/core/silver__blocks_final.sql b/models/silver/core/silver__blocks_final.sql new file mode 100644 index 0000000..f5f47b0 --- /dev/null +++ b/models/silver/core/silver__blocks_final.sql @@ -0,0 +1,54 @@ +{{ config( + materialized = 'incremental', + incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"], + incremental_strategy = 'merge', + merge_exclude_columns = ['inserted_timestamp'], + unique_key = 'block_id', + cluster_by = ['block_timestamp::DATE','modified_timestamp::DATE', '_partition_by_block_number'], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(block_id);", + tags = ['scheduled_core'] +) }} + +{% if var('NEAR_MIGRATE_ARCHIVE', False) %} + + SELECT + block_id, + block_timestamp, + block_hash, + prev_hash, + block_author, + chunks_json, + header_json, + _partition_by_block_number, + streamline_blocks_id AS blocks_final_id, + inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id + FROM + {{ ref('silver__streamline_blocks') }} + +{% else %} + +WITH blocks AS ( + SELECT + block_id, + block_timestamp, + block_hash, + block_json :prev_hash :: STRING AS prev_hash, + block_json :author :: STRING AS block_author, + block_json :chunks :: ARRAY AS chunks_json, + block_json :header :: OBJECT AS header_json, + partition_key AS _partition_by_block_number + FROM + {{ ref('silver__blocks_v2') }} +) +SELECT + *, + {{ dbt_utils.generate_surrogate_key( + ['block_id'] + ) }} AS blocks_final_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + blocks diff --git a/models/silver/core/silver__receipts_final.sql b/models/silver/core/silver__receipts_final.sql index 7aae758..6837c20 100644 --- a/models/silver/core/silver__receipts_final.sql +++ b/models/silver/core/silver__receipts_final.sql @@ -20,13 +20,13 @@ receipt_id, receipt_json, outcome_json, - _partition_by_block_number + _partition_by_block_number, + streamline_receipts_final_id AS receipts_final_id, + inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM {{ ref('_migrate_receipts') }} - {% if var("BATCH_MIGRATE") %} - WHERE - {{ partition_load_manual('no_buffer') }} - {% endif %} {% else %} @@ -59,7 +59,8 @@ flatten_receipts AS ( chunk_hash, tx_hash, tx_succeeded, - VALUE :: variant AS receipt_json + VALUE :: variant AS receipt_json, + _partition_by_block_number FROM txs_with_receipts, LATERAL FLATTEN( @@ -88,7 +89,8 @@ receipts_full AS ( r.receipt_id, receipt_json, outcome_json, - tx_succeeded + tx_succeeded, + _partition_by_block_number FROM flatten_receipts r LEFT JOIN flatten_receipt_outcomes ro diff --git a/models/silver/core/silver__transactions_final.sql b/models/silver/core/silver__transactions_final.sql index a592d0a..f6e5c2e 100644 --- a/models/silver/core/silver__transactions_final.sql +++ b/models/silver/core/silver__transactions_final.sql @@ -30,18 +30,12 @@ transaction_fee, attached_gas, _partition_by_block_number, - {{ dbt_utils.generate_surrogate_key( - ['tx_hash'] - ) }} AS transactions_final_id, - SYSDATE() AS inserted_timestamp, + streamline_transactions_final_id AS transactions_final_id, + inserted_timestamp, SYSDATE() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id FROM {{ ref('_migrate_txs') }} - {% if var("BATCH_MIGRATE") %} - WHERE - {{ partition_load_manual('no_buffer') }} - {% endif %} {% else %}