mirror of
https://github.com/FlipsideCrypto/near-models.git
synced 2026-02-06 14:26:52 +00:00
update incremental logic
This commit is contained in:
parent
df9176137b
commit
91f1bdb572
29
README.md
29
README.md
@ -159,3 +159,32 @@ When creating a PR please include the following details in the PR description:
|
||||
* Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
|
||||
* Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
|
||||
* Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices
|
||||
|
||||
## Fixing Data Issues
|
||||
|
||||
### Manual Batch Refresh
|
||||
|
||||
If data needs to be re-run for some reason, partitions of data can be re-reun through the models by utilizing the column `_partition_by_block_number` and the dbt profile name.
|
||||
|
||||
Any data refresh will need to be done in a batch due to the nature of the receipt x tx hash mapping. The view `silver__receipt_tx_hash_mapping` is a recursive AncestryTree that follows linked receipt outcome ids to map all receipts generated in a transaction back to the primary hash. Receipts can be generated many blocks after the transaction occurs, so a generous buffer is required to ensure all receipts are captured.
|
||||
|
||||
Models in the `streamline` folder can be run with standard incremental logic up until the 2 final receipt and transaction tables. The next step, mapping receipts to tx hash over a range, can be run with the following command:
|
||||
|
||||
```
|
||||
dbt run -s tag:s3_manual --vars '{"range_start": 82700000, "range_end": 82750000, "front_buffer": 1, "end_buffer": 1}' -t manual_fix_dev
|
||||
```
|
||||
|
||||
The target name will determine how the model operates, calling a macro `partition_load_manual()` which takes the variables input in the command to set the range.
|
||||
|
||||
`front_buffer` and `end_buffer` are set to 1 by default and indicate how many partitions (10k blocks) should be added on to the front or end of the range.
|
||||
- Flatten receipts is set to look back 1 partition to grab receipts that may have occurred prior to the start of the range.
|
||||
- Receipt tx hash mapping is set to look back 1 partition to grab receipts that may have occurred prior to the start of the range.
|
||||
- Receipts final does not buffer the range to only map receipts that occurred within the range to a tx hash (but the lookback is necessary in case the tree is truncated by the partition break).
|
||||
- Transactions final does not add a buffer when grabbing transactions from the int txs model, but does add an end buffer when selecting from receipts final to include mapped receipts that may have occurred after the end of the range.
|
||||
|
||||
A range is necessary for the mapping view as it consumes a significant amount of memory and will otherwise run out.
|
||||
|
||||
Actions and curated models include the conditional based on target name so the tags `s3_actions` and `s3_curated` can be included to re-run the fixed data in downstream silver models.
|
||||
- if missing data is loaded in new, this is not necessary as `_load_timestamp` will be set to when the data hits snowflake and will flow through the standard incremental logic in the curated models.
|
||||
|
||||
Note: certain views, like `core__fact_blocks` are not tagged, so running all views in with `-s models/core/` is recommended after changes are made.
|
||||
|
||||
@ -7,7 +7,7 @@
|
||||
tags = ['s3_load']
|
||||
) }}
|
||||
|
||||
WITH blocksjson AS (
|
||||
WITH blocks_json AS (
|
||||
|
||||
SELECT
|
||||
block_id,
|
||||
@ -23,4 +23,4 @@ WITH blocksjson AS (
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
blocksjson
|
||||
blocks_json
|
||||
|
||||
@ -7,17 +7,7 @@
|
||||
tags = ['s3_load']
|
||||
) }}
|
||||
|
||||
{% set target_partition = 82200000 %}
|
||||
-- 82200000
|
||||
-- 82680000
|
||||
|
||||
-- split into like 50k at a time
|
||||
|
||||
{% set target_partition_low = 82240000 %}
|
||||
{% set target_partition_high = 82680000 %}
|
||||
|
||||
|
||||
WITH shardsjson AS (
|
||||
WITH shards_json AS (
|
||||
|
||||
SELECT
|
||||
block_id,
|
||||
@ -33,13 +23,10 @@ WITH shardsjson AS (
|
||||
_partition_by_block_number
|
||||
FROM
|
||||
{{ ref('bronze__streamline_shards') }}
|
||||
where _partition_by_block_number between {{ target_partition_low }} and {{ target_partition_high }}
|
||||
and block_id in (
|
||||
select value from NEAR_DEV.tests.chunk_gaps, lateral flatten (blocks_to_walk)
|
||||
where _partition_by_block_number between {{ target_partition_low }} and {{ target_partition_high }}
|
||||
)
|
||||
WHERE
|
||||
{{ partition_batch_load(150000) }}
|
||||
)
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
shardsjson
|
||||
shards_json
|
||||
|
||||
@ -3,7 +3,7 @@
|
||||
incremental_strategy = 'delete+insert',
|
||||
cluster_by = ['_partition_by_block_number', '_load_timestamp::DATE'],
|
||||
unique_key = 'block_id',
|
||||
tags = ['s3_load', 's3']
|
||||
tags = ['s3_load']
|
||||
) }}
|
||||
|
||||
WITH blocksjson AS (
|
||||
|
||||
@ -13,11 +13,7 @@ WITH shards AS (
|
||||
FROM
|
||||
{{ ref('silver__streamline_shards') }}
|
||||
WHERE
|
||||
(
|
||||
{{ partition_batch_load(150000) }}
|
||||
OR
|
||||
{{ incremental_load_filter('_load_timestamp') }}
|
||||
)
|
||||
{{ incremental_load_filter('_load_timestamp') }}
|
||||
AND chunk != 'null'
|
||||
),
|
||||
FINAL AS (
|
||||
|
||||
@ -12,14 +12,9 @@ WITH shards AS (
|
||||
*
|
||||
FROM
|
||||
{{ ref('silver__streamline_shards') }}
|
||||
WHERE
|
||||
WHERE
|
||||
ARRAY_SIZE(receipt_execution_outcomes) > 0
|
||||
AND
|
||||
(
|
||||
{{ partition_batch_load(150000) }}
|
||||
OR
|
||||
{{ incremental_load_filter('_load_timestamp') }}
|
||||
)
|
||||
AND {{ incremental_load_filter('_load_timestamp') }}
|
||||
),
|
||||
FINAL AS (
|
||||
SELECT
|
||||
|
||||
@ -3,7 +3,7 @@
|
||||
incremental_strategy = 'delete+insert',
|
||||
unique_key = 'receipt_object_id',
|
||||
cluster_by = ['_load_timestamp::date', 'block_id'],
|
||||
tags = ['s3', 's3_final', 's3_manual']
|
||||
tags = ['s3_final', 's3_manual']
|
||||
) }}
|
||||
|
||||
WITH base_receipts AS (
|
||||
|
||||
@ -3,7 +3,7 @@
|
||||
unique_key = 'tx_hash',
|
||||
incremental_strategy = 'delete+insert',
|
||||
cluster_by = ['_load_timestamp::date', 'block_timestamp::date'],
|
||||
tags = ['s3', 's3_final', 's3_manual']
|
||||
tags = ['s3_final', 's3_manual']
|
||||
) }}
|
||||
|
||||
WITH int_txs AS (
|
||||
|
||||
@ -1,4 +1,9 @@
|
||||
{{ config(
|
||||
severity = 'error'
|
||||
) }}
|
||||
|
||||
WITH block_chunks_included AS (
|
||||
|
||||
SELECT
|
||||
block_id,
|
||||
VALUE :header :chunks_included AS chunks_included,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user