starting to add IS_MIGRATION: txs, receipts_final, txs_final

This commit is contained in:
forgash_ 2024-02-05 22:41:19 -07:00
parent 8fc10e95bd
commit 71b41749a7
4 changed files with 45 additions and 16 deletions

View File

@ -62,6 +62,7 @@ vars:
OBSERV_FULL_TEST: False
DBT_FULL_TEST: False
STREAMLINE_LOAD_LOOKBACK_HOURS: 3
IS_MIGRATION: False
dispatch:
- macro_namespace: dbt

View File

@ -3,10 +3,11 @@
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
unique_key = 'receipt_object_id',
cluster_by = ['_inserted_timestamp::date', '_partition_by_block_number'],
cluster_by = ['_inserted_timestamp::date', '_modified_timestamp::DATE', '_partition_by_block_number'],
tags = ['receipt_map'],
full_refresh = False
) }}
{# TODO - check clustering. Add SO? #}
WITH retry_range AS (
@ -14,16 +15,28 @@ WITH retry_range AS (
receipt_object_id,
block_id,
_partition_by_block_number,
_inserted_timestamp,
_modified_timestamp
_inserted_timestamp
{% if not var('IS_MIGRATION') %}
, _modified_timestamp
{% endif %}
FROM
{{ this }}
{% if var('IS_MIGRATION') %}
WHERE
_inserted_timestamp >= SYSDATE() - INTERVAL '1 day'
AND (
tx_hash IS NULL
OR block_timestamp IS NULL
)
{% else %}
WHERE
_modified_timestamp >= SYSDATE() - INTERVAL '1 day'
AND (
tx_hash IS NULL
OR block_timestamp IS NULL
)
{% endif %}
),
base_receipts AS (
SELECT

View File

@ -19,8 +19,14 @@ WITH chunks AS (
FROM
{{ ref('silver__streamline_shards') }}
WHERE
{{ incremental_load_filter('_inserted_timestamp') }}
AND chunk != 'null'
chunk != 'null'
{% if var('IS_MIGRATION') %}
AND
{{ incremental_load_filter('_inserted_timestamp') }}
{% else %}
AND
{{ incremental_load_filter('_modified_timestamp') }}
{% endif %}
),
flatten_transactions AS (
SELECT
@ -86,10 +92,15 @@ FINAL AS (
_inserted_timestamp,
_modified_timestamp
FROM
txs qualify ROW_NUMBER() over (
txs
qualify ROW_NUMBER() over (
PARTITION BY tx_hash
ORDER BY
{% if var('IS_MIGRATION') %}
_inserted_timestamp DESC
{% else %}
_modified_timestamp DESC
{% endif %}
) = 1
)
SELECT

View File

@ -2,10 +2,12 @@
materialized = 'incremental',
unique_key = 'tx_hash',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::date', 'block_timestamp::date'],
cluster_by = ['_inserted_timestamp::date', '_modified_timestamp::DATE', '_partition_by_block_number'],
tags = ['receipt_map']
) }}
{# TODO - upd * to col selection #}
{# TODO - add _modified_timestamp column #}
{# TODO - check clustering. Add SO? #}
WITH int_txs AS (
SELECT
@ -13,16 +15,16 @@ WITH int_txs AS (
FROM
{{ ref('silver__streamline_transactions') }}
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% else %}
{% if var('IS_MIGRATION') %}
WHERE
{{ partition_incremental_load(
150000,
10000,
0
) }}
{% else %}
WHERE
{{ incremental_load_filter('_modified_timestamp') }}
{% endif %}
),
int_receipts AS (
@ -31,23 +33,25 @@ int_receipts AS (
FROM
{{ ref('silver__streamline_receipts_final') }}
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('end') }}
{% else %}
{% if var('IS_MIGRATION') %}
WHERE
{{ partition_incremental_load(
150000,
10000,
0
) }}
{% else %}
WHERE
{{ incremental_load_filter('_modified_timestamp') }}
{% endif %}
),
int_blocks AS (
SELECT
*
FROM
{{ ref('silver__streamline_blocks') }}
{# TODO add WHERE #}
),
receipt_array AS (
SELECT