nft transfers and start a changelog

This commit is contained in:
Jack Forgash 2025-04-15 11:06:42 -06:00
parent d863635805
commit 5b67eda0a0
7 changed files with 111 additions and 82 deletions

View File

@ -0,0 +1,40 @@
# Migration Changelog: Actions Model Refactor
This changelog tracks all model changes related to the deprecation of silver actions models in favor of core__ez_actions.
## silver__nft_transfers
### Major Changes
- **REQUIRES FULL REFRESH** - Model has been completely refactored to use `silver__logs_s3` directly
- Previous version had data duplication due to incorrect handling of many-to-many relationship between actions and logs
- New version correctly processes NFT transfer logs without duplication
- No manual column changes needed as model will be rebuilt from scratch
### Column Changes
#### Columns Removed
- `action_id` (was used for unique identification, replaced by receipt_id)
- `_inserted_timestamp` (deprecated)
#### Columns Added
- `receipt_id` (from logs table)
- `predecessor_id` (from logs table)
- `_partition_by_block_number` (calculated as `FLOOR(block_id, -3)`)
#### Column Modifications
- Changed `contract_id` to `receiver_id` in initial CTE, still aliased as `contract_address` in final output
- Added type casting for `token_ids` as ARRAY
- Added type casting for event parsing with `TRY_PARSE_JSON(clean_log)`
### Configuration Changes
- Updated incremental predicates to use `dynamic_range_predicate_custom`
- Added `modified_timestamp::DATE` to clustering keys
- Updated search optimization to use raw column names (removed block_timestamp::DATE expression)
- Updated unique key components in surrogate key generation
### Downstream Impact
The gold model `nft__fact_nft_transfers.sql` needs to be updated:
1. Replace `action_id` with `receipt_id` in SELECT statement
2. Remove `_inserted_timestamp` from COALESCE logic in inserted_timestamp/modified_timestamp
---

View File

@ -0,0 +1,5 @@
{% docs batch_index %}
The index of an individual mint in a batch transaction.
{% enddocs %}

View File

@ -0,0 +1,6 @@
{% docs log_counter %}
The count of the referenced log in a receipt.
{% enddocs %}

View File

@ -0,0 +1,6 @@
{% docs token_index %}
The index of a token in an array from a batch transaction of some kind.
{% enddocs %}

View File

@ -16,13 +16,13 @@ SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
receipt_id,
contract_address,
from_address,
to_address,
token_id,
nft_transfers_id AS fact_nft_transfers_id,
COALESCE(inserted_timestamp, _inserted_timestamp, '2000-01-01' :: TIMESTAMP_NTZ) AS inserted_timestamp,
COALESCE(modified_timestamp, _inserted_timestamp, '2000-01-01' :: TIMESTAMP_NTZ) AS modified_timestamp
COALESCE(inserted_timestamp, '2000-01-01' :: TIMESTAMP_NTZ) AS inserted_timestamp,
COALESCE(modified_timestamp, '2000-01-01' :: TIMESTAMP_NTZ) AS modified_timestamp
FROM
nft_token_transfers

View File

@ -1,35 +1,39 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE'],
cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'],
unique_key = 'nft_transfers_id',
incremental_strategy = 'merge',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,action_id,contract_address,from_address,to_address,token_id);",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,contract_address,from_address,to_address,token_id);",
tags = ['curated','scheduled_non_core']
) }}
WITH actions_events AS (
WITH nft_logs AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
receipt_id,
signer_id,
receiver_id,
logs,
_inserted_timestamp,
_partition_by_block_number
predecessor_id,
TRY_PARSE_JSON(clean_log) AS DATA,
log_index AS logs_rn,
FLOOR(block_id, -3) AS _partition_by_block_number
FROM
{{ ref('silver__actions_events_function_call_s3') }}
{{ ref('silver__logs_s3') }}
WHERE
receipt_succeeded = TRUE
AND logs [0] IS NOT NULL
is_standard
AND receipt_succeeded
AND TRY_PARSE_JSON(clean_log) :event :: STRING IN (
'nft_transfer',
'nft_mint'
)
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
AND {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }}
{% else %}
{% if is_incremental() %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
@ -38,40 +42,15 @@ WITH actions_events AS (
)
{% endif %}
{% endif %}
),
-------------------------------- NFT Transfers --------------------------------
nft_logs AS (
SELECT
block_id,
signer_id,
block_timestamp,
tx_hash,
action_id,
TRY_PARSE_JSON(REPLACE(b.value, 'EVENT_JSON:')) AS DATA,
receiver_id AS contract_id,
b.index as logs_rn,
_inserted_timestamp,
_partition_by_block_number
FROM
actions_events
JOIN LATERAL FLATTEN(
input => logs
) b
WHERE
DATA :event IN (
'nft_transfer',
'nft_mint'
)
),
-------------------------------- FINAL --------------------------------
nft_transfers AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_id :: STRING AS contract_address,
receipt_id,
predecessor_id,
receiver_id AS contract_address,
COALESCE(
A.value :old_owner_id,
signer_id
@ -80,62 +59,54 @@ nft_transfers AS (
A.value :new_owner_id,
A.value :owner_id
) :: STRING AS to_address,
A.value :token_ids AS token_ids,
A.value :token_ids :: ARRAY AS token_ids,
token_ids [0] :: STRING AS token_id,
logs_rn + A.index as transfer_rn,
_inserted_timestamp,
_partition_by_block_number
FROM
nft_logs
JOIN LATERAL FLATTEN(
input => DATA :data
nft_logs,
LATERAL FLATTEN(
input => DATA :data :: ARRAY
) A
WHERE
token_id IS NOT NULL
),
nft_final AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
receipt_id,
predecessor_id,
contract_address,
from_address,
to_address,
B.value :: STRING AS token_id,
transfer_rn + B.index as rn,
_inserted_timestamp,
_partition_by_block_number
FROM
nft_transfers
JOIN LATERAL FLATTEN(
nft_transfers,
LATERAL FLATTEN(
input => token_ids
) B
),
FINAL AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
contract_address,
from_address,
to_address,
token_id,
_inserted_timestamp,
_partition_by_block_number
FROM
nft_final
)
SELECT
*,
block_id,
block_timestamp,
tx_hash,
receipt_id,
predecessor_id,
rn,
contract_address,
from_address,
to_address,
token_id,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'action_id','contract_address','from_address','to_address','token_id','rn']
['tx_hash', 'receipt_id', 'contract_address', 'from_address', 'to_address', 'token_id', 'rn']
) }} AS nft_transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL
nft_final

View File

@ -6,7 +6,7 @@ models:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- tx_hash
- action_id
- receipt_id
- contract_address
- from_address
- to_address
@ -28,12 +28,15 @@ models:
- name: TX_HASH
description: "{{ doc('tx_hash')}}"
- name: RECEIPT_ID
description: "{{ doc('receipt_id')}}"
- name: PREDECESSOR_ID
description: "{{ doc('predecessor_id')}}"
- name: RN
description: "Row number"
- name: ACTION_ID
description: "{{ doc('action_id')}}"
- name: CONTRACT_ADDRESS
description: "{{ doc('tx_signer')}}"
@ -46,10 +49,8 @@ models:
- name: TOKEN_ID
description: "{{ doc('token_id')}}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp')}}"
- name: _PARTITION_BY_BLOCK_NUMBER
description: "{{ doc('_partition_by_block_number')}}"
- name: NFT_TRANSFERS_ID
description: "{{doc('id')}}"