From f626b51957934f8e1d1d7deedb4cb310a638f052 Mon Sep 17 00:00:00 2001 From: forgash_ Date: Tue, 24 Jan 2023 16:37:26 -0700 Subject: [PATCH] macro for manual batch refresh --- macros/manual_batch_refresh.sql | 37 +++++++++++++++++++ macros/partition_batch_load.sql | 27 +------------- macros/{ => tests}/sequence_gaps.sql | 0 macros/{ => tests}/tx_gaps.sql | 0 .../silver__flatten_receipts.sql | 8 +++- .../silver__receipt_tx_hash_mapping.sql | 34 ++++++++++------- .../silver__streamline_receipts_final.sql | 12 ++++-- .../silver__streamline_transactions_final.sql | 34 +++++++++++------ 8 files changed, 96 insertions(+), 56 deletions(-) create mode 100644 macros/manual_batch_refresh.sql rename macros/{ => tests}/sequence_gaps.sql (100%) rename macros/{ => tests}/tx_gaps.sql (100%) rename models/silver/streamline/{ => helpers}/silver__flatten_receipts.sql (82%) rename models/silver/streamline/{ => helpers}/silver__receipt_tx_hash_mapping.sql (60%) diff --git a/macros/manual_batch_refresh.sql b/macros/manual_batch_refresh.sql new file mode 100644 index 0000000..1861d77 --- /dev/null +++ b/macros/manual_batch_refresh.sql @@ -0,0 +1,37 @@ +{% macro partition_load_manual( + scope = 'no_buffer' + ) %} + {# if range_start and range_end not set in cli, default to earliest rpc data #} + {% set range_start = var( + 'range_start', + 46700000 + ) %} + {% set range_end = var( + 'range_end', + 47000000 + ) %} + {% set front_buffer = var( + 'front_buffer', + 0 + ) %} + {% set end_buffer = var( + 'end_buffer', + 0 + ) %} + {% if scope == 'front' %} + _partition_by_block_number BETWEEN {{ range_start }} - ( + 10000 * {{ front_buffer }} + ) + AND {{ range_end }} + + {% elif scope == 'end' %} + _partition_by_block_number BETWEEN {{ range_start }} + AND {{ range_end }} + ( + 10000 * {{ end_buffer }} + ) {% elif scope == 'no_buffer' %} + _partition_by_block_number BETWEEN {{ range_start }} + AND {{ range_end }} + {% else %} + TRUE + {% endif %} +{%- endmacro %} diff --git a/macros/partition_batch_load.sql b/macros/partition_batch_load.sql index 4f5ba89..35ab2d5 100644 --- a/macros/partition_batch_load.sql +++ b/macros/partition_batch_load.sql @@ -23,32 +23,6 @@ WHERE {% endif %} {%- endmacro %} -{% macro partition_batch_load_dev(batch_size) %} - -{% if is_incremental() %} -WHERE - _partition_by_block_number > ( - SELECT - MAX(_partition_by_block_number) - FROM - {{ this }} - ) - AND _partition_by_block_number <= ( - ( - SELECT - MAX(_partition_by_block_number) - FROM - {{ this }} - ) + {{ batch_size }} - ) -{%- else -%} -WHERE - {# earliest block in RPC data, use for comparison testing #} - _partition_by_block_number BETWEEN 46670000 - AND 47000000 -{% endif %} -{%- endmacro %} - {% macro partition_incremental_load( batch_size, front_buffer = 0, @@ -76,3 +50,4 @@ WHERE TRUE {% endif %} {%- endmacro %} + diff --git a/macros/sequence_gaps.sql b/macros/tests/sequence_gaps.sql similarity index 100% rename from macros/sequence_gaps.sql rename to macros/tests/sequence_gaps.sql diff --git a/macros/tx_gaps.sql b/macros/tests/tx_gaps.sql similarity index 100% rename from macros/tx_gaps.sql rename to macros/tests/tx_gaps.sql diff --git a/models/silver/streamline/silver__flatten_receipts.sql b/models/silver/streamline/helpers/silver__flatten_receipts.sql similarity index 82% rename from models/silver/streamline/silver__flatten_receipts.sql rename to models/silver/streamline/helpers/silver__flatten_receipts.sql index 04bda30..84b49e3 100644 --- a/models/silver/streamline/silver__flatten_receipts.sql +++ b/models/silver/streamline/helpers/silver__flatten_receipts.sql @@ -1,6 +1,6 @@ {{ config( materialized = 'view', - tags = ['s3', 's3_helper'] + tags = ['s3', 's3_helper', 's3_manual'] ) }} WITH receipts AS ( @@ -17,6 +17,11 @@ WITH receipts AS ( A.outcome_receipts, outer => TRUE ) b + + {% if target.name == 'manual_fix' %} + WHERE + {{ partition_load_manual('front') }} + {% else %} WHERE _partition_by_block_number >= ( SELECT @@ -30,6 +35,7 @@ WITH receipts AS ( FROM silver.streamline_receipts_final ) + 220000 + {% endif %} ) SELECT * diff --git a/models/silver/streamline/silver__receipt_tx_hash_mapping.sql b/models/silver/streamline/helpers/silver__receipt_tx_hash_mapping.sql similarity index 60% rename from models/silver/streamline/silver__receipt_tx_hash_mapping.sql rename to models/silver/streamline/helpers/silver__receipt_tx_hash_mapping.sql index caeac29..6fc4e32 100644 --- a/models/silver/streamline/silver__receipt_tx_hash_mapping.sql +++ b/models/silver/streamline/helpers/silver__receipt_tx_hash_mapping.sql @@ -1,7 +1,7 @@ {{ config( materalized = 'view', unique_key = 'receipt_id', - tags = ['s3', 's3_helper'] + tags = ['s3', 's3_helper', 's3_manual'] ) }} WITH recursive ancestrytree AS ( @@ -28,19 +28,25 @@ txs AS ( * FROM {{ ref('silver__streamline_transactions') }} - WHERE - _partition_by_block_number >= ( - SELECT - MAX(_partition_by_block_number) - FROM - silver.streamline_receipts_final - ) - 20000 - AND _partition_by_block_number <= ( - SELECT - MAX(_partition_by_block_number) - FROM - silver.streamline_receipts_final - ) + 220000 + + {% if target.name == 'manual_fix' %} + WHERE + {{ partition_load_manual('front') }} + {% else %} + WHERE + _partition_by_block_number >= ( + SELECT + MAX(_partition_by_block_number) + FROM + silver.streamline_receipts_final + ) - 20000 + AND _partition_by_block_number <= ( + SELECT + MAX(_partition_by_block_number) + FROM + silver.streamline_receipts_final + ) + 220000 + {% endif %} ), FINAL AS ( SELECT diff --git a/models/silver/streamline/silver__streamline_receipts_final.sql b/models/silver/streamline/silver__streamline_receipts_final.sql index 4c74839..329e33b 100644 --- a/models/silver/streamline/silver__streamline_receipts_final.sql +++ b/models/silver/streamline/silver__streamline_receipts_final.sql @@ -3,7 +3,7 @@ incremental_strategy = 'delete+insert', unique_key = 'receipt_object_id', cluster_by = ['_load_timestamp::date', 'block_id'], - tags = ['s3', 's3_final'] + tags = ['s3', 's3_final', 's3_manual'] ) }} WITH base_receipts AS ( @@ -12,8 +12,14 @@ WITH base_receipts AS ( * FROM {{ ref('silver__streamline_receipts') }} - {{ partition_batch_load(150000) }} - AND {{ incremental_load_filter('_load_timestamp') }} + + {% if target.name == 'manual_fix' %} + WHERE + {{ partition_load_manual('no_buffer') }} + {% else %} + {{ partition_batch_load(150000) }} + AND {{ incremental_load_filter('_load_timestamp') }} + {% endif %} ), blocks AS ( SELECT diff --git a/models/silver/streamline/silver__streamline_transactions_final.sql b/models/silver/streamline/silver__streamline_transactions_final.sql index 477a491..0d66665 100644 --- a/models/silver/streamline/silver__streamline_transactions_final.sql +++ b/models/silver/streamline/silver__streamline_transactions_final.sql @@ -1,9 +1,9 @@ {{ config( materialized = 'incremental', unique_key = 'tx_hash', - incremental_strategy = 'merge', + incremental_strategy = 'delete+insert', cluster_by = ['_load_timestamp::date', 'block_timestamp::date'], - tags = ['s3', 's3_final'] + tags = ['s3', 's3_final', 's3_manual'] ) }} WITH int_txs AS ( @@ -12,24 +12,34 @@ WITH int_txs AS ( * FROM {{ ref('silver__streamline_transactions') }} - {{ partition_incremental_load( - 150000, - 10000, - 0 - ) }} + {% if target.name == 'manual_fix' %} + WHERE + {{ partition_load_manual('no_buffer') }} + {% else %} + {{ partition_incremental_load( + 150000, + 10000, + 0 + ) }} + {% endif %} ), int_receipts AS ( SELECT * FROM {{ ref('silver__streamline_receipts_final') }} - {{ partition_incremental_load( - 150000, - 10000, - 0 - ) }} + {% if target.name == 'manual_fix' %} + WHERE + {{ partition_load_manual('end') }} + {% else %} + {{ partition_incremental_load( + 150000, + 10000, + 0 + ) }} + {% endif %} ), int_blocks AS ( SELECT