add condition to load

forgash_ 2023-04-24 16:33:42 -06:00
parent 9900c087e3
commit 91b038d427
10 changed files with 28 additions and 11 deletions

View File

@@ -92,7 +92,7 @@ Any data refresh will need to be done in a batch due to the nature of the receip
Models in the `streamline` folder can be run with standard incremental logic up to the two final receipt and transaction tables (tagged as such, see below). The next step, mapping receipts to a tx hash over a range, can be run with the following command:
```
dbt run -s tag:receipt_map tag:curated --vars '{"range_start": X, "range_end": Y, "front_buffer": 1, "end_buffer": 1}' -t manual_fix_dev
dbt run -s tag:load_shards tag:receipt_map --vars '{"range_start": X, "range_end": Y, "front_buffer": 1, "end_buffer": 1}' -t manual_fix_dev
```
The target name determines how the model operates: the `manual_fix` and `manual_fix_dev` targets call the `partition_load_manual()` macro, which uses the variables passed in the command to set the partition range.
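As a rough illustration only, `partition_load_manual()` plausibly expands to a predicate on `_partition_by_block_number` built from those vars. The body below is a hypothetical sketch, not the repo's actual macro; the buffer arithmetic and the 10000-block partition size are assumptions:
```
{% macro partition_load_manual(buffer_mode) %}
    {# Hypothetical sketch: build a partition predicate from the CLI vars.
       'no_buffer' uses the exact range; otherwise pad each side using
       front_buffer / end_buffer (10000-block partitions assumed). #}
    {% if buffer_mode == 'no_buffer' %}
        _partition_by_block_number BETWEEN {{ var('range_start') }} AND {{ var('range_end') }}
    {% else %}
        _partition_by_block_number BETWEEN {{ var('range_start') }} - ({{ var('front_buffer') }} * 10000)
            AND {{ var('range_end') }} + ({{ var('end_buffer') }} * 10000)
    {% endif %}
{% endmacro %}
```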
@@ -115,12 +115,15 @@ To help with targeted refreshes, a number of tags have been applied to the model
| Tag | Description |
| --- | --- |
| load | Runs models that load data into Snowflake from S3. The two `load_X` models are staging tables for raw data, which is then parsed and transformed up through the final txs/receipts models. |
| load_shards | Just the `load` models that touch shards |
| load_blocks | Just the `load` models that touch blocks |
| receipt_map | Runs the receipt-mapping models that must use a partition range. This set of models cannot simply run with incremental logic due to the recursive tree used to map receipt IDs to tx hashes (see the sketch below this table). |
| actions | Just the 3 action events models, an important set of intermediary models before curated activity. Note: These are also tagged with `s3_curated`. |
| curated | Models that are used to generate the curated tables |
| core | All public views are tagged with core, regardless of schema. At this time, that includes `core` and `social`. |
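For intuition, here is a minimal sketch of that recursive walk, with hypothetical table and column names. Because NEAR receipts can spawn child receipts, a transaction's full receipt tree can span many blocks, which is why a partition range rather than plain incremental logic is required:
```
-- Hypothetical sketch: map every receipt in the tree back to its tx_hash
WITH RECURSIVE receipt_map AS (
    -- anchor: the receipt created directly by each transaction
    SELECT tx_hash, converted_receipt_id AS receipt_id
    FROM transactions
    UNION ALL
    -- step: child receipts produced by an already-mapped receipt
    SELECT m.tx_hash, o.produced_receipt_id AS receipt_id
    FROM receipt_outcomes o
    JOIN receipt_map m
        ON o.parent_receipt_id = m.receipt_id
)
SELECT tx_hash, receipt_id
FROM receipt_map
```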
Note: there are other tags that are currently not used. All legacy models are tagged with something that includes `rpc`, but all are presently disabled to avoid an accidental run.
You can visualize these tags by using the DAG Explorer in the [docs](https://flipsidecrypto.github.io/near-models/#!/overview?g_v=1&g_i=tag:load).
### Incremental Load Strategy
Because data must be run in batches to properly map receipts to a transaction hash, a Jinja conditional is added to the curated models. This conditional should be present on every model downstream of the mapping process; the snippet below shows its shape.
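The version added to the load models in this commit (see the file diffs below) illustrates the pattern; curated models carry the analogous condition:
```
{% if target.name == 'manual_fix' or target.name == 'manual_fix_dev' %}
WHERE
    {{ partition_load_manual('no_buffer') }}
{% else %}
WHERE
    {{ partition_batch_load(150000) }}
{% endif %}
```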

View File

@@ -4,7 +4,7 @@
cluster_by = ['_partition_by_block_number', '_load_timestamp::DATE'],
unique_key = 'block_id',
full_refresh = False,
tags = ['load']
tags = ['load', 'load_blocks']
) }}
WITH blocks_json AS (
@@ -17,8 +17,15 @@ WITH blocks_json AS (
_partition_by_block_number
FROM
{{ ref('bronze__streamline_blocks') }}
{% if target.name == 'manual_fix' or target.name == 'manual_fix_dev' %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% else %}
WHERE
{{ partition_batch_load(150000) }}
{% endif %}
)
SELECT
*
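For contrast, a hypothetical sketch of what `partition_batch_load(150000)` could look like on a normal run, assuming it advances through `_partition_by_block_number` in fixed-size batches. `is_incremental()` and `{{ this }}` are standard dbt; everything else here is an assumption, not the repo's actual macro:
```
{% macro partition_batch_load(batch_size) %}
    {# Hypothetical sketch: load the next batch_size worth of block
       partitions beyond what the target table already contains. #}
    {% if is_incremental() %}
        _partition_by_block_number >= (SELECT MAX(_partition_by_block_number) FROM {{ this }})
        AND _partition_by_block_number < (
            SELECT MAX(_partition_by_block_number) + {{ batch_size }} FROM {{ this }}
        )
    {% else %}
        _partition_by_block_number < {{ batch_size }}
    {% endif %}
{% endmacro %}
```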

View File

@@ -4,7 +4,7 @@
cluster_by = ['_partition_by_block_number', '_load_timestamp::DATE'],
unique_key = 'shard_id',
full_refresh = False,
tags = ['load']
tags = ['load', 'load_shards']
) }}
WITH shards_json AS (
@@ -23,8 +23,15 @@ WITH shards_json AS (
_partition_by_block_number
FROM
{{ ref('bronze__streamline_shards') }}
{% if target.name == 'manual_fix' or target.name == 'manual_fix_dev' %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% else %}
WHERE
{{ partition_batch_load(150000) }}
{% endif %}
)
SELECT
*

View File

@@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
cluster_by = ['_partition_by_block_number', '_load_timestamp::DATE'],
unique_key = 'block_id',
tags = ['load']
tags = ['load', 'load_blocks']
) }}
WITH blocksjson AS (

View File

@@ -3,7 +3,7 @@
incremental_strategy = 'merge',
unique_key = 'receipt_id',
cluster_by = ['_load_timestamp::date', 'block_id'],
tags = ['load']
tags = ['load', 'load_shards']
) }}
WITH chunks AS (

View File

@@ -3,7 +3,7 @@
incremental_strategy = 'merge',
unique_key = 'chunk_hash',
cluster_by = ['_load_timestamp::date','height_created','height_included'],
tags = ['load']
tags = ['load', 'load_shards']
) }}
WITH shards AS (

View File

@@ -3,7 +3,7 @@
incremental_strategy = 'merge',
unique_key = 'receipt_execution_outcome_id',
cluster_by = ['_load_timestamp::date','block_id','chunk_hash'],
tags = ['load']
tags = ['load', 'load_shards']
) }}
WITH shards AS (

View File

@@ -3,7 +3,7 @@
incremental_strategy = 'merge',
unique_key = 'receipt_id',
cluster_by = ['_load_timestamp::date', 'block_id'],
tags = ['load']
tags = ['load', 'load_shards']
) }}
WITH receipt_execution_outcomes AS (

View File

@@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
cluster_by = ['_partition_by_block_number', '_load_timestamp::DATE'],
unique_key = ['shard_id'],
tags = ['load']
tags = ['load', 'load_shards']
) }}
WITH shardsjson AS (

View File

@@ -3,7 +3,7 @@
incremental_strategy = 'merge',
unique_key = 'tx_hash',
cluster_by = ['_load_timestamp::date', 'block_id', 'tx_hash'],
tags = ['load']
tags = ['load', 'load_shards']
) }}
WITH chunks AS (