near-models/models/silver/core/silver__receipts_final.sql
2025-03-14 16:24:32 -06:00

196 lines
5.7 KiB
SQL

{{ config(
materialized = 'incremental',
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
unique_key = 'receipt_id',
cluster_by = ['block_timestamp::DATE','modified_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,receiver_id,predecessor_id);",
tags = ['scheduled_core', 'core_v2'],
full_refresh = false
) }}
{% if var('NEAR_MIGRATE_ARCHIVE', False) %}
{% if execute %}
{% do log('Migrating receipts ' ~ var('RANGE_START') ~ ' to ' ~ var('RANGE_END'), info=True) %}
{% do log('Invocation ID: ' ~ invocation_id, info=True) %}
{% endif %}
SELECT
chunk_hash,
block_id,
block_timestamp,
tx_hash,
receipt_id,
predecessor_id,
receiver_id,
receipt_json,
outcome_json,
tx_succeeded,
receipt_succeeded,
_partition_by_block_number,
receipts_final_id,
inserted_timestamp,
modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('_migrate_receipts') }}
{% else %}
{% if execute and not var("MANUAL_FIX") %}
{% if is_incremental() %}
{% set max_mod_query %}
SELECT
COALESCE(MAX(modified_timestamp), '1970-01-01') modified_timestamp
FROM
{{ this }}
{% endset %}
{% set max_mod = run_query(max_mod_query) [0] [0] %}
{% do log('max_mod: ' ~ max_mod, info=True) %}
{% set min_block_date_query %}
SELECT
MIN(origin_block_timestamp :: DATE) block_timestamp
FROM
{{ ref('silver__transactions_v2') }}
WHERE
modified_timestamp >= '{{max_mod}}'
{% endset %}
{% set min_bd = run_query(min_block_date_query) [0] [0] %}
{% do log('min_bd: ' ~ min_bd, info=True) %}
{% if not min_bd or min_bd == 'None' %}
{% set min_bd = '2099-01-01' %}
{% do log('min_bd: ' ~ min_bd, info=True) %}
{% endif %}
{% do log('min_block_date: ' ~ min_bd, info=True) %}
{% endif %}
{% endif %}
WITH txs_with_receipts AS (
SELECT
chunk_hash,
origin_block_id,
origin_block_timestamp,
tx_hash,
response_json :receipts :: ARRAY AS receipts_json,
response_json :receipts_outcome :: ARRAY AS receipts_outcome_json,
response_json :status :Failure IS NULL AS tx_succeeded,
partition_key AS _partition_by_block_number,
modified_timestamp
FROM
{{ ref('silver__transactions_v2') }}
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer', 'partition_key') }}
{% else %}
{% if is_incremental() %}
WHERE origin_block_timestamp :: DATE >= '{{min_bd}}'
{% endif %}
{% endif %}
),
blocks AS (
SELECT
block_id,
block_hash,
block_timestamp,
modified_timestamp
FROM
{{ ref('silver__blocks_v2') }}
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer', 'partition_key') }}
{% else %}
{% if is_incremental() %}
WHERE block_timestamp :: DATE >= '{{min_bd}}' :: DATE
{% endif %}
{% endif %}
),
flatten_receipts AS (
SELECT
origin_block_timestamp,
chunk_hash,
tx_hash,
tx_succeeded,
VALUE :receipt_id :: STRING AS receipt_id,
VALUE :: variant AS receipt_json,
_partition_by_block_number,
modified_timestamp
FROM
txs_with_receipts,
LATERAL FLATTEN(
input => receipts_json
)
),
flatten_receipt_outcomes AS (
SELECT
VALUE :block_hash :: STRING AS block_hash,
tx_hash,
VALUE :id :: STRING AS receipt_id,
VALUE :: variant AS outcome_json
FROM
txs_with_receipts,
LATERAL FLATTEN(
input => receipts_outcome_json
)
),
receipts_full AS (
SELECT
chunk_hash,
ro.block_hash,
block_id,
block_timestamp,
r.tx_hash,
r.receipt_id,
receipt_json,
outcome_json,
tx_succeeded,
_partition_by_block_number
FROM
flatten_receipts r
LEFT JOIN flatten_receipt_outcomes ro
ON r.receipt_id = ro.receipt_id
LEFT JOIN blocks b
ON ro.block_hash = b.block_hash
{% if is_incremental() and not var("MANUAL_FIX") %}
WHERE
GREATEST(
COALESCE(r.modified_timestamp, '1970-01-01'),
COALESCE(b.modified_timestamp, '1970-01-01')
) >= '{{max_mod}}'
{% endif %}
),
FINAL AS (
SELECT
chunk_hash,
block_id,
block_timestamp,
tx_hash,
receipt_id,
receipt_json :predecessor_id :: STRING AS predecessor_id,
receipt_json :receiver_id :: STRING AS receiver_id,
receipt_json,
outcome_json,
tx_succeeded,
outcome_json :outcome :status :Failure IS NULL AS receipt_succeeded,
_partition_by_block_number
FROM
receipts_full
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['receipt_id']
) }} AS receipts_final_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL
qualify(row_number() over (partition by receipt_id order by block_id is not null desc, modified_timestamp desc) = 1)
{% endif %}