From b698c5865d06c8a464427a0222e381f02e487ed8 Mon Sep 17 00:00:00 2001 From: Jack Forgash <58153492+forgxyz@users.noreply.github.com> Date: Wed, 19 Feb 2025 15:43:11 -0700 Subject: [PATCH] migrate txs logic --- .../lake/silver__streamline_transactions.sql | 2 +- models/silver/core/migration/_migrate_txs.sql | 56 +++++ models/silver/core/silver__receipts_final.sql | 2 +- .../core/silver__transactions_final.sql | 238 +++++++++--------- .../silver/core/silver__transactions_v2.sql | 4 + 5 files changed, 185 insertions(+), 117 deletions(-) create mode 100644 models/silver/core/migration/_migrate_txs.sql diff --git a/models/silver/core/lake/silver__streamline_transactions.sql b/models/silver/core/lake/silver__streamline_transactions.sql index a4502d7..51cac91 100644 --- a/models/silver/core/lake/silver__streamline_transactions.sql +++ b/models/silver/core/lake/silver__streamline_transactions.sql @@ -5,7 +5,7 @@ merge_exclude_columns = ['inserted_timestamp'], unique_key = 'tx_hash', cluster_by = ['modified_timestamp::date', '_partition_by_block_number'], - tags = ['load', 'load_shards','scheduled_core'] + tags = ['load', 'load_shards', 'scheduled_core', 'deprecated_lake_archive'] ) }} WITH chunks AS ( diff --git a/models/silver/core/migration/_migrate_txs.sql b/models/silver/core/migration/_migrate_txs.sql new file mode 100644 index 0000000..f203ca5 --- /dev/null +++ b/models/silver/core/migration/_migrate_txs.sql @@ -0,0 +1,56 @@ +{{ config( + materialized = 'ephemeral' +) }} +-- likely need to add batch logic for the migration +WITH lake_transactions_final AS ( + + SELECT + block_id, + block_timestamp, + tx_hash, + block_hash, + tx_succeeded, + gas_used, + transaction_fee, + attached_gas, + _partition_by_block_number + FROM + {{ ref('silver__streamline_transactions_final') }} +), +lake_transactions_int AS ( + SELECT + tx_hash, + block_id, + shard_number, + chunk_hash, + tx :transaction :: variant AS transaction_json, + tx :outcome :execution_outcome :: variant AS outcome_json, + _partition_by_block_number + FROM + {{ ref('silver__streamline_transactions') }} +), +transaction_archive AS ( + SELECT + i.chunk_hash, + i.shard_number AS shard_id, + f.block_hash, + f.block_id, + f.block_timestamp, + f.tx_hash, + i.transaction_json, + i.outcome_json, + f.tx_succeeded, + f.gas_used, + f.transaction_fee, + f.attached_gas, + f._partition_by_block_number + FROM + lake_transactions_final f + LEFT JOIN lake_transactions_int i + ON f.tx_hash = i.tx_hash + AND f._partition_by_block_number = i._partition_by_block_number +) +SELECT + * +FROM + transaction_archive diff --git a/models/silver/core/silver__receipts_final.sql b/models/silver/core/silver__receipts_final.sql index e8c601a..834ff2e 100644 --- a/models/silver/core/silver__receipts_final.sql +++ b/models/silver/core/silver__receipts_final.sql @@ -11,7 +11,7 @@ ) }} -- TODO if execute block for incremental min blockdate WITH -{% if var('NEAR_MIGRATE_RECEIPTS', False) %} +{% if var('NEAR_MIGRATE_ARCHIVE', False) %} lake_receipts_final as ( select * from {{ ref('silver__streamline_receipts_final') }} -- TODO incrementally load? diff --git a/models/silver/core/silver__transactions_final.sql b/models/silver/core/silver__transactions_final.sql index a7eeb65..e04da27 100644 --- a/models/silver/core/silver__transactions_final.sql +++ b/models/silver/core/silver__transactions_final.sql @@ -9,147 +9,155 @@ tags = ['scheduled_core'] ) }} -WITH -txs_with_receipts as ( -select * from {{ ref('silver__streamline_transactions_final') }} --- TODO incrementally load +{% if var('NEAR_MIGRATE_ARCHIVE', False) %} + + SELECT + chunk_hash, + shard_id, + block_id, + block_timestamp, + tx_hash, + transaction_json :actions :: ARRAY AS actions, + transaction_json :nonce :: INT AS nonce, + transaction_json :priority_fee :: INT AS priority_fee, + transaction_json :public_key :: STRING AS public_key, + transaction_json :receiver_id :: STRING AS receiver_id, + transaction_json :signature :: STRING AS signature, + transaction_json :signer_id :: STRING AS signer_id, + outcome_json, + OBJECT_CONSTRUCT() AS status_json, + tx_succeeded, + gas_used, + transaction_fee, + attached_gas, + _partition_by_block_number, + {{ dbt_utils.generate_surrogate_key( + ['tx_hash'] + ) }} AS transactions_final_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id + FROM + {{ ref('_migrate_txs') }} + -- TODO batch logic if needed + + {% else %} + +WITH txs_with_receipts AS ( + SELECT + chunk_hash, + shard_id, + origin_block_id AS block_id, + origin_block_timestamp AS block_timestamp, + tx_hash, + response_json :transaction :: variant AS transaction_json, + response_json :transaction_outcome :outcome :: variant AS outcome_json, + response_json :status :: variant AS status_json, + response_json :receipts_outcome :: ARRAY AS receipts_outcome_json, + response_json :status :Failure IS NULL AS tx_succeeded, + partition_key AS _partition_by_block_number + FROM + {{ ref('silver__transactions_v2') }} + + {% if var("MANUAL_FIX") %} + WHERE + {{ partition_load_manual('no_buffer') }} + {% else %} + + {% if is_incremental() %} + WHERE + modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) + {% endif %} + + {% endif %} ), -transaction_full as ( - select - -- shard_id, - -- chunk_hash, - origin_block_id AS block_id, - origin_block_timestamp AS block_timestamp, - tx_hash, - response_json :transaction:actions :: ARRAY as actions, - response_json :transaction:nonce :: INT as nonce, - response_json:transaction:priority_fee :: INT as priority_fee, - response_json:transaction:public_key :: STRING as public_key, - response_json:transaction:receiver_id :: STRING as receiver_id, - response_json:transaction:signature :: STRING as signature, - response_json:transaction:signer_id :: STRING as signer_id, - response_json:transaction_outcome:block_hash :: STRING as block_hash, - response_json:transaction_outcome:outcome :: variant as outcome_json, - response_json:status::VARIANT as status_json - from txs_with_receipts -) --- do we still want to calculate total used gas ? --- well, no need to pull in any other table anymore as the outcome exists in the underlying record --- can just flatten array and sum values -actions AS ( +determine_receipt_gas_burnt AS ( SELECT tx_hash, SUM( - VALUE :FunctionCall :gas :: NUMBER - ) AS attached_gas + VALUE :outcome :gas_burnt :: INT + ) AS total_gas_burnt_receipts, + SUM( + VALUE :outcome :tokens_burnt :: INT + ) AS total_tokens_burnt_receipts FROM - base_transactions, - LATERAL FLATTEN( - input => tx :actions + txs_with_receipts, + LATERAL FLATTEN ( + input => receipts_outcome_json ) GROUP BY 1 ), -transactions AS ( - SELECT - block_id, - tx :outcome :block_hash :: STRING AS block_hash, - tx_hash, - block_timestamp, - tx :nonce :: NUMBER AS nonce, - tx :signature :: STRING AS signature, - tx :receiver_id :: STRING AS tx_receiver, - tx :signer_id :: STRING AS tx_signer, - tx, - tx :outcome :outcome :gas_burnt :: NUMBER AS transaction_gas_burnt, - tx :outcome :outcome :tokens_burnt :: NUMBER AS transaction_tokens_burnt, - _partition_by_block_number, - _inserted_timestamp - FROM - base_transactions -), -gas_burnt AS ( +determine_attached_gas AS ( SELECT tx_hash, - SUM(gas_burnt) AS receipt_gas_burnt, - SUM(execution_outcome :outcome :tokens_burnt :: NUMBER) AS receipt_tokens_burnt + SUM( + VALUE :FunctionCall :gas :: INT + ) AS total_attached_gas FROM - int_receipts - WHERE - execution_outcome :outcome: tokens_burnt :: NUMBER != 0 - GROUP BY + txs_with_receipts, + LATERAL FLATTEN ( + input => transaction_json :actions :: ARRAY + ) + GROUP BY 1 ), -determine_tx_status AS ( +transactions_final AS ( SELECT - DISTINCT tx_hash, - LAST_VALUE( - receipt_succeeded - ) over ( - PARTITION BY tx_hash - ORDER BY - block_id ASC - ) AS tx_succeeded - FROM - int_receipts -), -FINAL AS ( - SELECT - t.block_id, - t.block_hash, + chunk_hash, + shard_id, + block_hash, + block_id, + block_timestamp, t.tx_hash, - t.block_timestamp, - t.nonce, - t.signature, - t.tx_receiver, - t.tx_signer, - t.tx, - t.transaction_gas_burnt + g.receipt_gas_burnt AS gas_used, - t.transaction_tokens_burnt + g.receipt_tokens_burnt AS transaction_fee, - COALESCE( - actions.attached_gas, - gas_used - ) AS attached_gas, - s.tx_succeeded, - IFF ( - tx_succeeded, - 'Success', - 'Fail' - ) AS tx_status, -- DEPRECATE TX_STATUS IN GOLD - t._partition_by_block_number, - t._inserted_timestamp + transaction_json, + outcome_json, + status_json, + total_gas_burnt_receipts, + total_tokens_burnt_receipts, + total_attached_gas, + tx_succeeded, + _partition_by_block_number FROM - transactions AS t - INNER JOIN determine_tx_status s - ON t.tx_hash = s.tx_hash - INNER JOIN actions - ON t.tx_hash = actions.tx_hash - INNER JOIN gas_burnt g - ON t.tx_hash = g.tx_hash + txs_with_receipts t + LEFT JOIN determine_receipt_gas_burnt d USING (tx_hash) + LEFT JOIN determine_attached_gas A USING (tx_hash) ) SELECT - tx_hash, - block_id, + chunk_hash, + shard_id, block_hash, + block_id, block_timestamp, - nonce, - signature, - tx_receiver, - tx_signer, - tx, - gas_used, - transaction_fee, - attached_gas, + tx_hash, + transaction_json :actions :: ARRAY AS actions, + transaction_json :nonce :: INT AS nonce, + transaction_json :priority_fee :: INT AS priority_fee, + transaction_json :public_key :: STRING AS public_key, + transaction_json :receiver_id :: STRING AS receiver_id, + transaction_json :signature :: STRING AS signature, + transaction_json :signer_id :: STRING AS signer_id, + outcome_json, + status_json, tx_succeeded, - tx_status, + outcome_json :outcome :gas_burnt :: INT + total_gas_burnt_receipts AS gas_used, + outcome_json :outcome :tokens_burnt :: INT + total_tokens_burnt_receipts AS transaction_fee, + COALESCE( + total_attached_gas, + gas_used + ) AS attached_gas, _partition_by_block_number, - _inserted_timestamp, {{ dbt_utils.generate_surrogate_key( ['tx_hash'] - ) }} AS streamline_transactions_final_id, + ) }} AS transactions_final_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id FROM - FINAL - + transactions_final +{% endif %} diff --git a/models/silver/core/silver__transactions_v2.sql b/models/silver/core/silver__transactions_v2.sql index 6435c6f..d1835c6 100644 --- a/models/silver/core/silver__transactions_v2.sql +++ b/models/silver/core/silver__transactions_v2.sql @@ -15,6 +15,8 @@ WITH bronze_transactions AS ( SELECT VALUE :BLOCK_ID :: INT AS origin_block_id, VALUE :BLOCK_TIMESTAMP_EPOCH :: INT AS origin_block_timestamp_epoch, + VALUE :SHARD_ID :: INT AS shard_id, + VALUE :CHUNK_HASH :: STRING AS chunk_hash, DATA :transaction :hash :: STRING AS tx_hash, DATA :transaction :signer_id :: STRING AS signer_id, partition_key, @@ -41,6 +43,8 @@ SELECT origin_block_id, origin_block_timestamp_epoch, TO_TIMESTAMP_NTZ(origin_block_timestamp_epoch, 9) AS origin_block_timestamp, + shard_id, + chunk_hash, tx_hash, signer_id, partition_key,