migrate txs logic

This commit is contained in:
Jack Forgash 2025-02-19 15:43:11 -07:00
parent 3ea67df7bb
commit b698c5865d
5 changed files with 185 additions and 117 deletions

View File

@ -5,7 +5,7 @@
merge_exclude_columns = ['inserted_timestamp'],
unique_key = 'tx_hash',
cluster_by = ['modified_timestamp::date', '_partition_by_block_number'],
tags = ['load', 'load_shards','scheduled_core']
tags = ['load', 'load_shards', 'scheduled_core', 'deprecated_lake_archive']
) }}
WITH chunks AS (

View File

@ -0,0 +1,56 @@
{{ config(
materialized = 'ephemeral'
) }}
-- likely need to add batch logic for the migration
WITH lake_transactions_final AS (
SELECT
block_id,
block_timestamp,
tx_hash,
block_hash,
tx_succeeded,
gas_used,
transaction_fee,
attached_gas,
_partition_by_block_number
FROM
{{ ref('silver__streamline_transactions_final') }}
),
lake_transactions_int AS (
SELECT
tx_hash,
block_id,
shard_number,
chunk_hash,
tx :transaction :: variant AS transaction_json,
tx :outcome :execution_outcome :: variant AS outcome_json,
_partition_by_block_number
FROM
{{ ref('silver__streamline_transactions') }}
),
transaction_archive AS (
SELECT
i.chunk_hash,
i.shard_number AS shard_id,
f.block_hash,
f.block_id,
f.block_timestamp,
f.tx_hash,
i.transaction_json,
i.outcome_json,
f.tx_succeeded,
f.gas_used,
f.transaction_fee,
f.attached_gas,
f._partition_by_block_number
FROM
lake_transactions_final f
LEFT JOIN lake_transactions_int i
ON f.tx_hash = i.tx_hash
AND f._partition_by_block_number = i._partition_by_block_number
)
SELECT
*
FROM
transaction_archive

View File

@ -11,7 +11,7 @@
) }}
-- TODO if execute block for incremental min blockdate
WITH
{% if var('NEAR_MIGRATE_RECEIPTS', False) %}
{% if var('NEAR_MIGRATE_ARCHIVE', False) %}
lake_receipts_final as (
select * from {{ ref('silver__streamline_receipts_final') }}
-- TODO incrementally load?

View File

@ -9,147 +9,155 @@
tags = ['scheduled_core']
) }}
WITH
txs_with_receipts as (
select * from {{ ref('silver__streamline_transactions_final') }}
-- TODO incrementally load
{% if var('NEAR_MIGRATE_ARCHIVE', False) %}
SELECT
chunk_hash,
shard_id,
block_id,
block_timestamp,
tx_hash,
transaction_json :actions :: ARRAY AS actions,
transaction_json :nonce :: INT AS nonce,
transaction_json :priority_fee :: INT AS priority_fee,
transaction_json :public_key :: STRING AS public_key,
transaction_json :receiver_id :: STRING AS receiver_id,
transaction_json :signature :: STRING AS signature,
transaction_json :signer_id :: STRING AS signer_id,
outcome_json,
OBJECT_CONSTRUCT() AS status_json,
tx_succeeded,
gas_used,
transaction_fee,
attached_gas,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(
['tx_hash']
) }} AS transactions_final_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('_migrate_txs') }}
-- TODO batch logic if needed
{% else %}
WITH txs_with_receipts AS (
SELECT
chunk_hash,
shard_id,
origin_block_id AS block_id,
origin_block_timestamp AS block_timestamp,
tx_hash,
response_json :transaction :: variant AS transaction_json,
response_json :transaction_outcome :outcome :: variant AS outcome_json,
response_json :status :: variant AS status_json,
response_json :receipts_outcome :: ARRAY AS receipts_outcome_json,
response_json :status :Failure IS NULL AS tx_succeeded,
partition_key AS _partition_by_block_number
FROM
{{ ref('silver__transactions_v2') }}
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
),
transaction_full as (
select
-- shard_id,
-- chunk_hash,
origin_block_id AS block_id,
origin_block_timestamp AS block_timestamp,
tx_hash,
response_json :transaction:actions :: ARRAY as actions,
response_json :transaction:nonce :: INT as nonce,
response_json:transaction:priority_fee :: INT as priority_fee,
response_json:transaction:public_key :: STRING as public_key,
response_json:transaction:receiver_id :: STRING as receiver_id,
response_json:transaction:signature :: STRING as signature,
response_json:transaction:signer_id :: STRING as signer_id,
response_json:transaction_outcome:block_hash :: STRING as block_hash,
response_json:transaction_outcome:outcome :: variant as outcome_json,
response_json:status::VARIANT as status_json
from txs_with_receipts
)
-- do we still want to calculate total used gas ?
-- well, no need to pull in any other table anymore as the outcome exists in the underlying record
-- can just flatten array and sum values
actions AS (
determine_receipt_gas_burnt AS (
SELECT
tx_hash,
SUM(
VALUE :FunctionCall :gas :: NUMBER
) AS attached_gas
VALUE :outcome :gas_burnt :: INT
) AS total_gas_burnt_receipts,
SUM(
VALUE :outcome :tokens_burnt :: INT
) AS total_tokens_burnt_receipts
FROM
base_transactions,
LATERAL FLATTEN(
input => tx :actions
txs_with_receipts,
LATERAL FLATTEN (
input => receipts_outcome_json
)
GROUP BY
1
),
transactions AS (
SELECT
block_id,
tx :outcome :block_hash :: STRING AS block_hash,
tx_hash,
block_timestamp,
tx :nonce :: NUMBER AS nonce,
tx :signature :: STRING AS signature,
tx :receiver_id :: STRING AS tx_receiver,
tx :signer_id :: STRING AS tx_signer,
tx,
tx :outcome :outcome :gas_burnt :: NUMBER AS transaction_gas_burnt,
tx :outcome :outcome :tokens_burnt :: NUMBER AS transaction_tokens_burnt,
_partition_by_block_number,
_inserted_timestamp
FROM
base_transactions
),
gas_burnt AS (
determine_attached_gas AS (
SELECT
tx_hash,
SUM(gas_burnt) AS receipt_gas_burnt,
SUM(execution_outcome :outcome :tokens_burnt :: NUMBER) AS receipt_tokens_burnt
SUM(
VALUE :FunctionCall :gas :: INT
) AS total_attached_gas
FROM
int_receipts
WHERE
execution_outcome :outcome: tokens_burnt :: NUMBER != 0
GROUP BY
txs_with_receipts,
LATERAL FLATTEN (
input => transaction_json :actions :: ARRAY
)
GROUP BY
1
),
determine_tx_status AS (
transactions_final AS (
SELECT
DISTINCT tx_hash,
LAST_VALUE(
receipt_succeeded
) over (
PARTITION BY tx_hash
ORDER BY
block_id ASC
) AS tx_succeeded
FROM
int_receipts
),
FINAL AS (
SELECT
t.block_id,
t.block_hash,
chunk_hash,
shard_id,
block_hash,
block_id,
block_timestamp,
t.tx_hash,
t.block_timestamp,
t.nonce,
t.signature,
t.tx_receiver,
t.tx_signer,
t.tx,
t.transaction_gas_burnt + g.receipt_gas_burnt AS gas_used,
t.transaction_tokens_burnt + g.receipt_tokens_burnt AS transaction_fee,
COALESCE(
actions.attached_gas,
gas_used
) AS attached_gas,
s.tx_succeeded,
IFF (
tx_succeeded,
'Success',
'Fail'
) AS tx_status, -- DEPRECATE TX_STATUS IN GOLD
t._partition_by_block_number,
t._inserted_timestamp
transaction_json,
outcome_json,
status_json,
total_gas_burnt_receipts,
total_tokens_burnt_receipts,
total_attached_gas,
tx_succeeded,
_partition_by_block_number
FROM
transactions AS t
INNER JOIN determine_tx_status s
ON t.tx_hash = s.tx_hash
INNER JOIN actions
ON t.tx_hash = actions.tx_hash
INNER JOIN gas_burnt g
ON t.tx_hash = g.tx_hash
txs_with_receipts t
LEFT JOIN determine_receipt_gas_burnt d USING (tx_hash)
LEFT JOIN determine_attached_gas A USING (tx_hash)
)
SELECT
tx_hash,
block_id,
chunk_hash,
shard_id,
block_hash,
block_id,
block_timestamp,
nonce,
signature,
tx_receiver,
tx_signer,
tx,
gas_used,
transaction_fee,
attached_gas,
tx_hash,
transaction_json :actions :: ARRAY AS actions,
transaction_json :nonce :: INT AS nonce,
transaction_json :priority_fee :: INT AS priority_fee,
transaction_json :public_key :: STRING AS public_key,
transaction_json :receiver_id :: STRING AS receiver_id,
transaction_json :signature :: STRING AS signature,
transaction_json :signer_id :: STRING AS signer_id,
outcome_json,
status_json,
tx_succeeded,
tx_status,
outcome_json :outcome :gas_burnt :: INT + total_gas_burnt_receipts AS gas_used,
outcome_json :outcome :tokens_burnt :: INT + total_tokens_burnt_receipts AS transaction_fee,
COALESCE(
total_attached_gas,
gas_used
) AS attached_gas,
_partition_by_block_number,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['tx_hash']
) }} AS streamline_transactions_final_id,
) }} AS transactions_final_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL
transactions_final
{% endif %}

View File

@ -15,6 +15,8 @@ WITH bronze_transactions AS (
SELECT
VALUE :BLOCK_ID :: INT AS origin_block_id,
VALUE :BLOCK_TIMESTAMP_EPOCH :: INT AS origin_block_timestamp_epoch,
VALUE :SHARD_ID :: INT AS shard_id,
VALUE :CHUNK_HASH :: STRING AS chunk_hash,
DATA :transaction :hash :: STRING AS tx_hash,
DATA :transaction :signer_id :: STRING AS signer_id,
partition_key,
@ -41,6 +43,8 @@ SELECT
origin_block_id,
origin_block_timestamp_epoch,
TO_TIMESTAMP_NTZ(origin_block_timestamp_epoch, 9) AS origin_block_timestamp,
shard_id,
chunk_hash,
tx_hash,
signer_id,
partition_key,