From 91b038d427aa009d4bcd9f5ce40865307d7ceae9 Mon Sep 17 00:00:00 2001 From: forgash_ Date: Mon, 24 Apr 2023 16:33:42 -0600 Subject: [PATCH] add condition to load --- README.md | 7 +++++-- models/silver/streamline/silver__load_blocks.sql | 9 ++++++++- models/silver/streamline/silver__load_shards.sql | 9 ++++++++- models/silver/streamline/silver__streamline_blocks.sql | 2 +- .../streamline/silver__streamline_chunk_receipts.sql | 2 +- models/silver/streamline/silver__streamline_chunks.sql | 2 +- .../silver__streamline_receipt_execution_outcome.sql | 2 +- models/silver/streamline/silver__streamline_receipts.sql | 2 +- models/silver/streamline/silver__streamline_shards.sql | 2 +- .../streamline/silver__streamline_transactions.sql | 2 +- 10 files changed, 28 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index bd3b280..f1970b5 100644 --- a/README.md +++ b/README.md @@ -92,7 +92,7 @@ Any data refresh will need to be done in a batch due to the nature of the receip Models in the `streamline` folder can be run with standard incremental logic up until the 2 final receipt and transaction tables (tagged as such, see below). The next step, mapping receipts to tx hash over a range, can be run with the following command: ``` -dbt run -s tag:receipt_map tag:curated --vars '{"range_start": X, "range_end": Y, "front_buffer": 1, "end_buffer": 1}' -t manual_fix_dev +dbt run -s tag:load_shards tag:receipt_map --vars '{"range_start": X, "range_end": Y, "front_buffer": 1, "end_buffer": 1}' -t manual_fix_dev ``` The target name will determine how the model operates, calling a macro `partition_load_manual()` which takes the variables input in the command to set the range. @@ -115,12 +115,15 @@ To help with targeted refreshes, a number of tags have been applied to the model | Tag | Description | | --- | --- | | load | Runs models that load data into Snowflake from S3. The 2 `load_X` models are staging tables for data, which is then parsed and transformed up until the final txs/receipts models. | +| load_shards | Just the `load` models that touch shards | +| load_blocks | Just the `load` models that touch blocks | | receipt_map | Runs the receipt-mapping models that must use a partition. This set of models cannot simply run with incremental logic due to the recursive tree used to map receipt IDs to Tx Hashes. | | actions | Just the 3 action events models, an important set of intermediary models before curated activity. Note: These are also tagged with `s3_curated`. | | curated | Models that are used to generate the curated tables | | core | All public views are tagged with core, regardless of schema. At this time, that includes `core` and `social`. | -Note: there are other tags that are currently not used. All legacy models are tagged with something that includes `rpc`, but all are presently disabled to avoid an accidental run. +Note: there are other tags that are currently not used. All legacy models are tagged with something that includes `rpc`, but all are presently disabled to avoid an accidental run. +You can visualize these tags by using the DAG Explorer in the [docs](https://flipsidecrypto.github.io/near-models/#!/overview?g_v=1&g_i=tag:load). ### Incremental Load Strategy Because data must be run in batches to properly map receipts to a transaction hash, a conditional is added to curated models using jinja. This should be present on everything after the mapping process. diff --git a/models/silver/streamline/silver__load_blocks.sql b/models/silver/streamline/silver__load_blocks.sql index ff79b1d..976100b 100644 --- a/models/silver/streamline/silver__load_blocks.sql +++ b/models/silver/streamline/silver__load_blocks.sql @@ -4,7 +4,7 @@ cluster_by = ['_partition_by_block_number', '_load_timestamp::DATE'], unique_key = 'block_id', full_refresh = False, - tags = ['load'] + tags = ['load', 'load_blocks'] ) }} WITH blocks_json AS ( @@ -17,8 +17,15 @@ WITH blocks_json AS ( _partition_by_block_number FROM {{ ref('bronze__streamline_blocks') }} + + {% if target.name == 'manual_fix' or target.name == 'manual_fix_dev' %} + WHERE + {{ partition_load_manual('no_buffer') }} + {% else %} WHERE {{ partition_batch_load(150000) }} + {% endif %} + ) SELECT * diff --git a/models/silver/streamline/silver__load_shards.sql b/models/silver/streamline/silver__load_shards.sql index baeb6cd..789c4f0 100644 --- a/models/silver/streamline/silver__load_shards.sql +++ b/models/silver/streamline/silver__load_shards.sql @@ -4,7 +4,7 @@ cluster_by = ['_partition_by_block_number', '_load_timestamp::DATE'], unique_key = 'shard_id', full_refresh = False, - tags = ['load'] + tags = ['load', 'load_shards'] ) }} WITH shards_json AS ( @@ -23,8 +23,15 @@ WITH shards_json AS ( _partition_by_block_number FROM {{ ref('bronze__streamline_shards') }} + + {% if target.name == 'manual_fix' or target.name == 'manual_fix_dev' %} + WHERE + {{ partition_load_manual('no_buffer') }} + {% else %} WHERE {{ partition_batch_load(150000) }} + {% endif %} + ) SELECT * diff --git a/models/silver/streamline/silver__streamline_blocks.sql b/models/silver/streamline/silver__streamline_blocks.sql index 64c1f55..1ac4de7 100644 --- a/models/silver/streamline/silver__streamline_blocks.sql +++ b/models/silver/streamline/silver__streamline_blocks.sql @@ -3,7 +3,7 @@ incremental_strategy = 'delete+insert', cluster_by = ['_partition_by_block_number', '_load_timestamp::DATE'], unique_key = 'block_id', - tags = ['load'] + tags = ['load', 'load_blocks'] ) }} WITH blocksjson AS ( diff --git a/models/silver/streamline/silver__streamline_chunk_receipts.sql b/models/silver/streamline/silver__streamline_chunk_receipts.sql index d28d17d..6327454 100644 --- a/models/silver/streamline/silver__streamline_chunk_receipts.sql +++ b/models/silver/streamline/silver__streamline_chunk_receipts.sql @@ -3,7 +3,7 @@ incremental_strategy = 'merge', unique_key = 'receipt_id', cluster_by = ['_load_timestamp::date', 'block_id'], - tags = ['load'] + tags = ['load', 'load_shards'] ) }} WITH chunks AS ( diff --git a/models/silver/streamline/silver__streamline_chunks.sql b/models/silver/streamline/silver__streamline_chunks.sql index 802d96d..c8d7468 100644 --- a/models/silver/streamline/silver__streamline_chunks.sql +++ b/models/silver/streamline/silver__streamline_chunks.sql @@ -3,7 +3,7 @@ incremental_strategy = 'merge', unique_key = 'chunk_hash', cluster_by = ['_load_timestamp::date','height_created','height_included'], - tags = ['load'] + tags = ['load', 'load_shards'] ) }} WITH shards AS ( diff --git a/models/silver/streamline/silver__streamline_receipt_execution_outcome.sql b/models/silver/streamline/silver__streamline_receipt_execution_outcome.sql index 96f1d14..7851099 100644 --- a/models/silver/streamline/silver__streamline_receipt_execution_outcome.sql +++ b/models/silver/streamline/silver__streamline_receipt_execution_outcome.sql @@ -3,7 +3,7 @@ incremental_strategy = 'merge', unique_key = 'receipt_execution_outcome_id', cluster_by = ['_load_timestamp::date','block_id','chunk_hash'], - tags = ['load'] + tags = ['load', 'load_shards'] ) }} WITH shards AS ( diff --git a/models/silver/streamline/silver__streamline_receipts.sql b/models/silver/streamline/silver__streamline_receipts.sql index 7a073eb..526a6d5 100644 --- a/models/silver/streamline/silver__streamline_receipts.sql +++ b/models/silver/streamline/silver__streamline_receipts.sql @@ -3,7 +3,7 @@ incremental_strategy = 'merge', unique_key = 'receipt_id', cluster_by = ['_load_timestamp::date', 'block_id'], - tags = ['load'] + tags = ['load', 'load_shards'] ) }} WITH receipt_execution_outcomes AS ( diff --git a/models/silver/streamline/silver__streamline_shards.sql b/models/silver/streamline/silver__streamline_shards.sql index 588316e..d915893 100644 --- a/models/silver/streamline/silver__streamline_shards.sql +++ b/models/silver/streamline/silver__streamline_shards.sql @@ -3,7 +3,7 @@ incremental_strategy = 'delete+insert', cluster_by = ['_partition_by_block_number', '_load_timestamp::DATE'], unique_key = ['shard_id'], - tags = ['load'] + tags = ['load', 'load_shards'] ) }} WITH shardsjson AS ( diff --git a/models/silver/streamline/silver__streamline_transactions.sql b/models/silver/streamline/silver__streamline_transactions.sql index e7f3474..7062e9b 100644 --- a/models/silver/streamline/silver__streamline_transactions.sql +++ b/models/silver/streamline/silver__streamline_transactions.sql @@ -3,7 +3,7 @@ incremental_strategy = 'merge', unique_key = 'tx_hash', cluster_by = ['_load_timestamp::date', 'block_id', 'tx_hash'], - tags = ['load'] + tags = ['load', 'load_shards'] ) }} WITH chunks AS (