diff --git a/models/silver/core/silver__transactions.sql b/models/silver/core/silver__transactions.sql index 30ffaa0..ae5715c 100644 --- a/models/silver/core/silver__transactions.sql +++ b/models/silver/core/silver__transactions.sql @@ -38,12 +38,107 @@ WITH base_table AS ( {{ ref('bronze__streamline_FR_transactions') }} {% endif %} WHERE - tx_id IS NOT NULL qualify(ROW_NUMBER() over (PARTITION BY tx_id + tx_id IS NOT NULL + +{% if is_incremental() %} +AND _inserted_timestamp >= DATEADD( + MINUTE, + -30,( + SELECT + MAX( + _inserted_timestamp + ) + FROM + {{ this }} + ) +) +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY tx_id ORDER BY _inserted_timestamp DESC)) = 1 +), +NEW AS ( + SELECT + b.block_id, + block_timestamp, + tx_id, + tx_succeeded, + tx_code, + codespace, + gas_used, + gas_wanted, + tx_type, + msgs, + tx_log :: STRING AS tx_log, + tx AS full_tx, + b._inserted_timestamp + FROM + base_table b + LEFT JOIN {{ ref('silver__blocks') }} + bb + ON b.block_id = bb.block_id {# {% if is_incremental() %} + WHERE + b._inserted_timestamp >= DATEADD( + MINUTE, + -30,( + SELECT + MAX( + _inserted_timestamp + ) + FROM + {{ this }} + ) + ) + {% endif %} + + #} +), +combo AS ( + SELECT + block_id, + block_timestamp, + tx_id, + tx_succeeded, + tx_code, + codespace, + gas_used, + gas_wanted, + tx_type, + msgs, + tx_log, + full_tx, + _inserted_timestamp, + SYSDATE() AS inserted_timestamp, + FROM + NEW + UNION ALL + SELECT + b.block_id, + bb.block_timestamp, + tx_id, + tx_succeeded, + tx_code, + codespace, + gas_used, + gas_wanted, + tx_type, + msgs, + tx_log, + full_tx, + b._inserted_timestamp, + b.inserted_timestamp + FROM + {{ this }} + b + JOIN {{ ref('silver__blocks') }} + bb + ON b.block_id = bb.block_id + WHERE + b.block_timestamp IS NULL ) SELECT - b.block_id, + block_id, block_timestamp, tx_id, tx_succeeded, @@ -53,36 +148,16 @@ SELECT gas_wanted, tx_type, msgs, - tx_log :: STRING AS tx_log, - tx AS full_tx, + tx_log, + full_tx, {{ dbt_utils.generate_surrogate_key( ['tx_id'] ) }} AS transactions_id, - SYSDATE() AS inserted_timestamp, + inserted_timestamp, SYSDATE() AS modified_timestamp, - b._inserted_timestamp, + _inserted_timestamp, '{{ invocation_id }}' AS _invocation_id FROM - base_table b - JOIN {{ ref('silver__blocks') }} - bb - ON b.block_id = bb.block_id - -{% if is_incremental() %} -WHERE - b._inserted_timestamp >= DATEADD( - MINUTE, - -30,( - SELECT - MAX( - _inserted_timestamp - ) - FROM - {{ this }} - ) - ) -{% endif %} - -qualify(ROW_NUMBER() over (PARTITION BY tx_id + combo qualify(ROW_NUMBER() over (PARTITION BY tx_id, block_timestamp ORDER BY - tx_succeeded DESC, b._inserted_timestamp DESC)) = 1 + tx_succeeded DESC, _inserted_timestamp DESC)) = 1