receipt map - add retry range and lookback

This commit is contained in:
Jack Forgash 2024-02-06 13:04:32 -07:00
parent e08975a5e3
commit a0f26a1597
15 changed files with 150 additions and 124 deletions

View File

@ -62,6 +62,7 @@ vars:
OBSERV_FULL_TEST: False
DBT_FULL_TEST: False
STREAMLINE_LOAD_LOOKBACK_HOURS: 3
RECEIPT_MAP_LOOKBACK_HOURS: 6
IS_MIGRATION: False
dispatch:
@ -74,4 +75,3 @@ dispatch:
query-comment:
comment: "{{ dbt_snowflake_query_tags.get_query_comment(node) }}"
append: true # Snowflake removes prefixed comments.

View File

@ -0,0 +1,6 @@
{% docs _modified_timestamp %}
The timestamp at which the underlying record was last modified by an internal process.
This is used for incrementally loading based on when the source data was last modified.
{% enddocs %}

View File

@ -0,0 +1,21 @@
{{ config(
materialized = 'ephemeral',
tags = ['helper', 'receipt_map']
) }}
SELECT
receipt_object_id,
block_id,
_partition_by_block_number,
_inserted_timestamp
{% if not var('IS_MIGRATION') %}
, _modified_timestamp
{% endif %}
FROM
{{ target.database }}.silver.streamline_receipts_final
WHERE
{{ "_inserted_timestamp" if var('IS_MIGRATION') else "_modified_timestamp" }} >= SYSDATE() - INTERVAL '3 days'
AND (
tx_hash IS NULL
OR block_timestamp IS NULL
)

View File

@ -26,16 +26,10 @@ WITH receipts AS (
WHERE
_partition_by_block_number >= (
SELECT
MAX(_partition_by_block_number)
MIN(_partition_by_block_number) - (3000 * {{ var('RECEIPT_MAP_LOOKBACK_HOURS') }})
FROM
{{ target.database }}.silver.streamline_receipts_final
) - 20000
AND _partition_by_block_number <= (
SELECT
MAX(_partition_by_block_number)
FROM
{{ target.database }}.silver.streamline_receipts_final
) + 220000
{{ ref('_retry_range')}}
)
{% endif %}
)
SELECT

View File

@ -4,7 +4,8 @@
tags = ['helper', 'receipt_map']
) }}
WITH recursive ancestrytree AS (
WITH
recursive ancestrytree AS (
SELECT
item,
@ -25,7 +26,9 @@ WITH recursive ancestrytree AS (
),
txs AS (
SELECT
*
tx_hash,
outcome_receipts,
_partition_by_block_number
FROM
{{ ref('silver__streamline_transactions') }}
@ -36,16 +39,10 @@ txs AS (
WHERE
_partition_by_block_number >= (
SELECT
MAX(_partition_by_block_number)
MIN(_partition_by_block_number) - (3000 * {{ var('RECEIPT_MAP_LOOKBACK_HOURS') }})
FROM
{{ target.database }}.silver.streamline_receipts_final
) - 20000
AND _partition_by_block_number <= (
SELECT
MAX(_partition_by_block_number)
FROM
{{ target.database }}.silver.streamline_receipts_final
) + 220000
{{ ref('_retry_range')}}
)
{% endif %}
),
FINAL AS (

View File

@ -57,16 +57,8 @@ models:
- name: _partition_by_block_number
description: "{{ doc('_partition_by_block_number')}}"
- name: _load_timestamp
description: "{{ doc('_load_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- name: _MODIFIED_TIMESTAMP
description: "{{ doc('_modified_timestamp')}}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp')}}"

View File

@ -79,8 +79,8 @@ models:
tests:
- not_null
- name: _LOAD_TIMESTAMP
description: "{{ doc('_load_timestamp')}}"
- name: _MODIFIED_TIMESTAMP
description: "{{ doc('_modified_timestamp')}}"
- name: _PARTITION_BY_BLOCK_NUMBER
description: "{{ doc('_partition_by_block_number')}}"

View File

@ -7,36 +7,18 @@
tags = ['receipt_map'],
full_refresh = False
) }}
{# TODO - check clustering. Add SO? #}
{#
TODO - check clustering. Gold view on this table, add block_ts?
Add SO? Probably
#}
WITH retry_range AS (
SELECT
receipt_object_id,
block_id,
_partition_by_block_number,
_inserted_timestamp
{% if not var('IS_MIGRATION') %}
, _modified_timestamp
{% endif %}
*
FROM
{{ this }}
{{ ref('_retry_range')}}
{% if var('IS_MIGRATION') %}
WHERE
_inserted_timestamp >= SYSDATE() - INTERVAL '1 day'
AND (
tx_hash IS NULL
OR block_timestamp IS NULL
)
{% else %}
WHERE
_modified_timestamp >= SYSDATE() - INTERVAL '1 day'
AND (
tx_hash IS NULL
OR block_timestamp IS NULL
)
{% endif %}
),
base_receipts AS (
SELECT
@ -60,23 +42,36 @@ base_receipts AS (
_inserted_timestamp,
modified_timestamp AS _modified_timestamp
FROM
{{ ref('silver__streamline_receipts') }}
WHERE
_partition_by_block_number >= (
SELECT
MIN(_partition_by_block_number)
FROM
retry_range
)
AND (
{{ incremental_load_filter('_inserted_timestamp') }}
OR receipt_id IN (
{{ ref('silver__streamline_receipts') }} r
{% if var('MANUAL_FIX') %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% else %}
WHERE
_partition_by_block_number >= (
SELECT
receipt_object_id
MIN(_partition_by_block_number) - (3000 * {{ var('RECEIPT_MAP_LOOKBACK_HOURS') }})
FROM
retry_range
)
)
AND (
{% if var('IS_MIGRATION') %}
{{ incremental_load_filter('_inserted_timestamp') }}
{% else %}
{{ incremental_load_filter('_modified_timestamp') }}
{% endif %}
OR receipt_id IN (
SELECT
DISTINCT receipt_object_id
FROM
retry_range
)
)
{% endif %}
),
blocks AS (
SELECT
@ -84,15 +79,24 @@ blocks AS (
block_timestamp,
_partition_by_block_number,
_inserted_timestamp
{% if not var('IS_MIGRATION') %}
, modified_timestamp AS _modified_timestamp
{% endif %}
FROM
{{ ref('silver__streamline_blocks') }}
WHERE
_partition_by_block_number >= (
SELECT
MIN(_partition_by_block_number)
FROM
retry_range
)
{% if var('MANUAL_FIX') %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% else %}
WHERE
_partition_by_block_number >= (
SELECT
MIN(_partition_by_block_number) - (3000 * {{ var('RECEIPT_MAP_LOOKBACK_HOURS') }})
FROM
retry_range
)
{% endif %}
),
append_tx_hash AS (
SELECT

View File

@ -84,13 +84,8 @@ models:
- name: ERROR_MESSAGE
description: "{{ doc('error_message')}}"
- name: _LOAD_TIMESTAMP
description: "{{ doc('_load_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- name: _MODIFIED_TIMESTAMP
description: "{{ doc('_modified_timestamp')}}"
- name: _PARTITION_BY_BLOCK_NUMBER
description: "{{ doc('_partition_by_block_number')}}"

View File

@ -53,16 +53,8 @@ models:
- name: _PARTITION_BY_BLOCK_NUMBER
description: "{{ doc('_partition_by_block_number')}}"
- name: _load_timestamp
description: "{{ doc('_load_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- name: _MODIFIED_TIMESTAMP
description: "{{ doc('_modified_timestamp')}}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp')}}"

View File

@ -31,10 +31,8 @@ models:
tests:
- not_null
- name: _LOAD_TIMESTAMP
description: "{{ doc('_load_timestamp')}}"
tests:
- not_null
- name: _MODIFIED_TIMESTAMP
description: "{{ doc('_modified_timestamp')}}"
- name: CHUNK_HASH
description: "{{ doc('chunk_hash')}}"

View File

@ -1,17 +1,36 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
unique_key = 'tx_hash',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::date', '_modified_timestamp::DATE', '_partition_by_block_number'],
tags = ['receipt_map']
) }}
{# TODO - upd * to col selection #}
{# TODO - add _modified_timestamp column #}
{# TODO - check clustering. Add SO? #}
{# TODO - clean up the model and joins #}
WITH int_txs AS (
SELECT
*
block_id,
tx_hash,
shard_id,
transactions_index,
chunk_hash,
outcome_receipts,
_actions,
_hash,
_nonce,
_outcome,
_public_key,
_receiver_id,
_signature,
_signer_id,
_partition_by_block_number,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp
FROM
{{ ref('silver__streamline_transactions') }}
@ -29,7 +48,15 @@ WITH int_txs AS (
),
int_receipts AS (
SELECT
*
block_id,
block_timestamp,
tx_hash,
execution_outcome,
receipt_succeeded,
gas_burnt,
_partition_by_block_number,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp
FROM
{{ ref('silver__streamline_receipts_final') }}
@ -48,7 +75,12 @@ int_receipts AS (
),
int_blocks AS (
SELECT
*
block_id,
block_hash,
block_timestamp,
_partition_by_block_number,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp
FROM
{{ ref('silver__streamline_blocks') }}
{# TODO add WHERE #}
@ -95,7 +127,6 @@ base_transactions AS (
'signer_id',
_signer_id
) AS tx,
_load_timestamp,
_partition_by_block_number,
t._inserted_timestamp
FROM
@ -108,7 +139,7 @@ actions AS (
SELECT
tx_hash,
SUM(
VALUE :FunctionCall :gas
VALUE :FunctionCall :gas :: NUMBER
) AS attached_gas
FROM
base_transactions,
@ -131,7 +162,6 @@ transactions AS (
tx,
tx :outcome :outcome :gas_burnt :: NUMBER AS transaction_gas_burnt,
tx :outcome :outcome :tokens_burnt :: NUMBER AS transaction_tokens_burnt,
_load_timestamp,
_partition_by_block_number,
_inserted_timestamp
FROM
@ -141,7 +171,7 @@ receipts AS (
SELECT
block_id,
tx_hash,
receipt_succeeded AS success_or_fail,
receipt_succeeded,
SUM(
gas_burnt
) over (
@ -160,7 +190,7 @@ receipts AS (
FROM
int_receipts
WHERE
tokens_burnt != '0'
tokens_burnt != 0 -- TODO is this a str? cast to INT
),
FINAL AS (
SELECT
@ -175,7 +205,6 @@ FINAL AS (
t.tx,
t.transaction_gas_burnt + r.receipt_gas_burnt AS gas_used,
t.transaction_tokens_burnt + r.receipt_tokens_burnt AS transaction_fee,
_load_timestamp,
_partition_by_block_number,
_inserted_timestamp,
COALESCE(
@ -183,7 +212,7 @@ FINAL AS (
gas_used
) AS attached_gas,
LAST_VALUE(
r.success_or_fail
r.receipt_succeeded
) over (
PARTITION BY r.tx_hash
ORDER BY
@ -213,12 +242,12 @@ SELECT
tx,
gas_used,
transaction_fee,
_load_timestamp,
_partition_by_block_number,
attached_gas,
tx_succeeded,
tx_status,
_inserted_timestamp,
{# TODO add _modified_timestamp #}
{{ dbt_utils.generate_surrogate_key(
['tx_hash']
) }} AS streamline_transactions_final_id,

View File

@ -27,14 +27,6 @@ models:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- name: _LOAD_TIMESTAMP
description: "{{ doc('_load_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- name: _PARTITION_BY_BLOCK_NUMBER
description: "{{ doc('_partition_by_block_number')}}"
@ -86,6 +78,9 @@ models:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- name: _MODIFIED_TIMESTAMP
description: "{{ doc('_modified_timestamp')}}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp')}}"

View File

@ -1,6 +1,6 @@
{{ config(
error_if = '>=25',
warn_if = 'BETWEEN 1 AND 24'
error_if = '>=10',
warn_if = 'BETWEEN 1 AND 9'
) }}
WITH blocks AS (
@ -73,6 +73,9 @@ FROM
comp
WHERE
chunk_ct_expected > 0
AND is_missing
AND is_missing
{# Filter out false positive from blocks at start of window #}
AND _inserted_timestamp_blocks > SYSDATE() - INTERVAL '7 days' + INTERVAL '1 hour'
AND _inserted_timestamp_shards > SYSDATE() - INTERVAL '7 days' + INTERVAL '1 hour'
ORDER BY
1

View File

@ -38,4 +38,4 @@ FROM
WHERE
prior_hash <> prev_hash
{# Filter out false positive from blocks at start of window (whose parent hash was cut off) #}
AND (_inserted_timestamp > SYSDATE() - INTERVAL '7 days' + INTERVAL '1 hour')
AND (_inserted_timestamp > SYSDATE() - INTERVAL '7 days' + INTERVAL '1 hour')