native transfers

This commit is contained in:
Jack Forgash 2025-04-24 14:35:39 -06:00
parent 47a4f9a314
commit 492c6e3dc0
6 changed files with 284 additions and 168 deletions

View File

@ -514,4 +514,81 @@ Query Changes:
- Improved filtering by using native fields from `core__ez_actions`
- Maintained same business logic for identifying and processing staking pool transactions
## silver__token_transfer_deposit
### Major Changes
- Refactored to use `core__ez_actions` instead of `silver__actions_events_function_call_s3`
- Improved data quality by adding explicit receipt and transaction success checks
### Architecture Changes
- Removed dependency on deprecated `silver__actions_events_function_call_s3`
- Consolidated data sourcing into a single CTE from `core__ez_actions`
- Improved incremental processing with dynamic range predicate
### Column Changes
#### Removed
- `action_id` - Replaced with `receipt_id` and `action_index` for better granularity
#### Added
- `receipt_id` - Added for better transaction tracking
- `action_index` - Added to handle multiple actions within a receipt
- `modified_timestamp` - Added for incremental processing
### Configuration Changes
- Added dynamic range predicate for incremental processing
- Updated clustering keys to include both `block_timestamp::DATE` and `modified_timestamp::DATE`
- Changed surrogate key to use `receipt_id`, `action_index`, `predecessor_id`, `receiver_id`, and `amount_unadj`
- Added search optimization on `EQUALITY(tx_hash,receipt_id,predecessor_id,receiver_id)`
### Query Changes
- Simplified data sourcing by using `core__ez_actions` directly
- Added explicit success checks with `receipt_succeeded` and `tx_succeeded`
- Improved filtering by using native fields from `core__ez_actions`
- Maintained same business logic for identifying and processing token transfers
## silver__token_transfer_native
### Major Changes
- Refactored to use `core__ez_actions` instead of `silver__actions_events_s3`
- Improved data quality by adding explicit receipt and transaction success checks
- Enhanced tokens_burnt calculation using action_gas_price and receipt_gas_burnt
### Architecture Changes
- Removed dependency on deprecated `silver__actions_events_s3`
- Consolidated data sourcing into a single CTE from `core__ez_actions`
- Improved incremental processing with dynamic range predicate
### Column Changes
#### Removed
- `action_id` - Replaced with `receipt_id` and `action_index` for better granularity
- `_inserted_timestamp` - Deprecated column
#### Added
- `receipt_id` - Added for better transaction tracking
- `action_index` - Added to handle multiple actions within a receipt
- `modified_timestamp` - Added for incremental processing
#### Modified
- Changed source of gas fields to use native fields from `core__ez_actions`:
* `gas_price` now uses `action_gas_price`
* `gas_burnt` now uses `receipt_gas_burnt`
* `tokens_burnt` calculation updated to use these new fields
- Updated source of account fields to use receipt-level fields:
* `predecessor_id` from `receipt_predecessor_id`
* `receiver_id` from `receipt_receiver_id`
* `signer_id` from `receipt_signer_id`
### Configuration Changes
- Added dynamic range predicate for incremental processing
- Updated clustering keys to include both `block_timestamp::DATE` and `modified_timestamp::DATE`
- Changed surrogate key to use `receipt_id`, `action_index`, `predecessor_id`, `receiver_id`, and `amount_unadj`
- Added search optimization on `EQUALITY(tx_hash,receipt_id,predecessor_id,receiver_id)`
### Query Changes
- Simplified data sourcing by using `core__ez_actions` directly
- Added explicit success checks with `receipt_succeeded` and `tx_succeeded`
- Improved filtering by using native fields from `core__ez_actions`
- Enhanced tokens_burnt calculation using proper gas price and burnt values
- Maintained same business logic for identifying and processing native transfers
---

View File

@ -3,43 +3,70 @@ version: 2
models:
- name: silver__staking_pools_s3
description: |-
This table extracts all staking pools registered with NEAR.
This table extracts all staking pools registered with NEAR using core__ez_actions.
It captures both pool creation events and reward fee fraction updates.
columns:
- name: tx_hash
description: "{{ doc('tx_hash')}}"
tests:
- not_null
- name: block_timestamp
description: "{{ doc('block_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: block_id
description: "{{ doc('block_id')}}"
tests:
- not_null
- name: receipt_id
description: "{{ doc('receipt_id')}}"
tests:
- not_null
- name: owner
description: "{{ doc('staking_pool_owner')}}"
tests:
- not_null
- name: address
description: "{{ doc('staking_pool_address')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: '.*pool.*'
- name: reward_fee_fraction
description: "{{ doc('staking_pool_reward_fee_fraction')}}"
tests:
- not_null
- name: tx_type
description: "{{ doc('staking_pool_tx_type') }}"
tests:
- not_null
- accepted_values:
values: ['Create', 'Update']
- name: _PARTITION_BY_BLOCK_NUMBER
- name: _partition_by_block_number
description: "{{ doc('_partition_by_block_number')}}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp')}}"
- name: STAKING_POOLS_ID
- name: staking_pools_id
description: "{{doc('id')}}"
tests:
- unique
- not_null
- name: INSERTED_TIMESTAMP
- name: inserted_timestamp
description: "{{doc('inserted_timestamp')}}"
- name: MODIFIED_TIMESTAMP
- name: modified_timestamp
description: "{{doc('modified_timestamp')}}"
- name: _INVOCATION_ID
- name: _invocation_id
description: "{{doc('invocation_id')}}"

View File

@ -1,66 +1,66 @@
{{ config(
materialized = 'incremental',
merge_exclude_columns = ["inserted_timestamp"],
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
cluster_by = ['block_timestamp::DATE'],
unique_key = 'token_transfer_deposit_id',
incremental_strategy = 'merge',
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'],
unique_key = 'token_transfer_deposit_id',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,predecessor_id,receiver_id);",
tags = ['curated','scheduled_non_core']
) }}
WITH functioncalls AS (
WITH transfers AS (
SELECT
action_id,
tx_hash,
block_id,
block_timestamp,
predecessor_id,
signer_id,
receiver_id,
deposit,
receipt_id,
action_index,
receipt_predecessor_id AS predecessor_id,
receipt_signer_id AS signer_id,
receipt_receiver_id AS receiver_id,
action_data :deposit :: INT AS amount_unadj,
receipt_succeeded,
_inserted_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__actions_events_function_call_s3') }}
{{ ref('core__ez_actions') }}
WHERE
deposit :: INT > 0
action_name = 'FunctionCall'
AND receipt_succeeded
AND action_data :deposit :: INT > 0
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
)
SELECT
action_id,
tx_hash,
block_id,
block_timestamp,
receipt_id,
action_index,
predecessor_id,
signer_id,
receiver_id,
deposit AS amount_unadj,
deposit :: DOUBLE / pow(
10,
24
) AS amount_adj,
amount_unadj,
amount_unadj :: DOUBLE / pow(10, 24) AS amount_adj,
receipt_succeeded,
_partition_by_block_number,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['action_id', 'predecessor_id', 'receiver_id', 'amount_unadj']
['receipt_id', 'action_index', 'predecessor_id', 'receiver_id', 'amount_unadj']
) }} AS token_transfer_deposit_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
functioncalls
transfers

View File

@ -3,79 +3,87 @@ version: 2
models:
- name: silver__token_transfer_deposit
description: |-
This table records all the positive Deposit events from FunctionCalls.
This table records all positive NEAR token deposits from FunctionCalls using core__ez_actions.
It captures native NEAR token transfers through the deposit field.
columns:
- name: TX_HASH
- name: tx_hash
description: "{{ doc('tx_hash')}}"
tests:
- not_null
- name: ACTION_ID
description: "{{ doc('action_id')}}"
tests:
- not_null
- name: BLOCK_ID
- name: block_id
description: "{{ doc('block_id')}}"
tests:
- not_null
- name: BLOCK_TIMESTAMP
- name: block_timestamp
description: "{{ doc('block_timestamp')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: AMOUNT_UNADJ
description: "{{ doc('amount_unadj')}}"
- name: receipt_id
description: "{{ doc('receipt_id')}}"
tests:
- not_null
- name: AMOUNT_ADJ
description: "{{ doc('amount_adj')}}"
- name: action_index
description: "{{ doc('action_index')}}"
tests:
- not_null
- name: PREDECESSOR_ID
- name: predecessor_id
description: "{{ doc('predecessor_id')}}"
tests:
- not_null
- name: SIGNER_ID
- name: signer_id
description: "{{ doc('signer_id')}}"
tests:
- not_null
- name: RECEIVER_ID
- name: receiver_id
description: "{{ doc('receiver_id')}}"
tests:
- not_null
- name: RECEIPT_SUCCEEDED
- name: amount_unadj
description: "{{ doc('amount_unadj')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_greater_than:
value: 0
- name: amount_adj
description: "{{ doc('amount_adj')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_greater_than:
value: 0
- name: receipt_succeeded
description: "{{ doc('receipt_succeeded')}}"
tests:
- not_null
- accepted_values:
values: [TRUE]
- name: _PARTITION_BY_BLOCK_NUMBER
- name: _partition_by_block_number
description: "{{ doc('_partition_by_block_number')}}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp')}}"
- name: TOKEN_TRANSFER_DEPOSIT_ID
- name: token_transfer_deposit_id
description: "{{doc('id')}}"
tests:
- not_null
- unique
- not_null
- name: INSERTED_TIMESTAMP
- name: inserted_timestamp
description: "{{doc('inserted_timestamp')}}"
- name: MODIFIED_TIMESTAMP
- name: modified_timestamp
description: "{{doc('modified_timestamp')}}"
- name: _INVOCATION_ID
- name: _invocation_id
description: "{{doc('invocation_id')}}"

View File

@ -1,21 +1,58 @@
{{ config(
materialized = 'incremental',
merge_exclude_columns = ["inserted_timestamp"],
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
cluster_by = ['block_timestamp::DATE'],
unique_key = 'token_transfer_native_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
materialized = 'incremental',
incremental_strategy = 'merge',
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'],
unique_key = 'token_transfer_native_id',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,predecessor_id,receiver_id);",
tags = ['curated','scheduled_non_core']
) }}
WITH action_events AS(
WITH transfers AS (
SELECT
tx_hash,
block_id,
block_timestamp,
receipt_id,
action_index,
action_data :deposit :: STRING AS amount_unadj,
receipt_predecessor_id AS predecessor_id,
receipt_receiver_id AS receiver_id,
receipt_signer_id AS signer_id,
receipt_succeeded,
action_gas_price AS gas_price,
receipt_gas_burnt AS gas_burnt,
receipt_gas_burnt * action_gas_price / pow(10, 24) AS tokens_burnt,
_partition_by_block_number
FROM
{{ ref('core__ez_actions') }}
WHERE
action_name = 'Transfer'
AND receipt_succeeded
SELECT
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
)
SELECT
tx_hash,
block_id,
block_timestamp,
action_id,
action_data :deposit :: STRING AS amount_unadj,
receipt_id,
action_index,
amount_unadj,
amount_unadj :: DOUBLE / pow(10, 24) AS amount_adj,
predecessor_id,
receiver_id,
signer_id,
@ -24,52 +61,11 @@ WITH action_events AS(
gas_burnt,
tokens_burnt,
_partition_by_block_number,
_inserted_timestamp
FROM
{{ ref('silver__actions_events_s3') }}
WHERE
action_name = 'Transfer'
AND
receipt_succeeded
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
)
SELECT
tx_hash,
block_id,
block_timestamp,
action_id,
amount_unadj,
amount_unadj :: DOUBLE / pow(
10,
24
) AS amount_adj,
predecessor_id,
receiver_id,
signer_id,
receipt_succeeded,
gas_price,
gas_burnt,
tokens_burnt,
_partition_by_block_number,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['action_id', 'predecessor_id', 'receiver_id', 'amount_unadj']
) }} AS token_transfer_native_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
{{ dbt_utils.generate_surrogate_key(
['receipt_id', 'action_index', 'predecessor_id', 'receiver_id', 'amount_unadj']
) }} AS token_transfer_native_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
action_events
transfers

View File

@ -3,94 +3,102 @@ version: 2
models:
- name: silver__token_transfer_native
description: |-
This table records all the Transfer actions of the Near blockchain.
This table records all native NEAR token transfers using core__ez_actions.
It captures direct Transfer actions on the NEAR blockchain.
columns:
- name: TX_HASH
- name: tx_hash
description: "{{ doc('tx_hash')}}"
tests:
- not_null
- name: ACTION_ID
description: "{{ doc('action_id')}}"
tests:
- not_null
- name: BLOCK_ID
- name: block_id
description: "{{ doc('block_id')}}"
tests:
- not_null
- name: BLOCK_TIMESTAMP
- name: block_timestamp
description: "{{ doc('block_timestamp')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: AMOUNT_UNADJ
- name: receipt_id
description: "{{ doc('receipt_id')}}"
tests:
- not_null
- name: action_index
description: "{{ doc('action_index')}}"
tests:
- not_null
- name: amount_unadj
description: "{{ doc('amount_unadj')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_greater_than:
value: 0
- name: AMOUNT_ADJ
- name: amount_adj
description: "{{ doc('amount_adj')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_greater_than:
value: 0
- name: PREDECESSOR_ID
- name: predecessor_id
description: "{{ doc('predecessor_id')}}"
tests:
- not_null
- name: SIGNER_ID
description: "{{ doc('signer_id')}}"
tests:
- not_null
- name: RECEIVER_ID
- name: receiver_id
description: "{{ doc('receiver_id')}}"
tests:
- not_null
- name: RECEIPT_SUCCEEDED
description: "{{ doc('receipt_succeeded')}}"
- name: signer_id
description: "{{ doc('signer_id')}}"
tests:
- not_null
- name: GAS_PRICE
- name: receipt_succeeded
description: "{{ doc('receipt_succeeded')}}"
tests:
- not_null
- accepted_values:
values: [TRUE]
- name: gas_price
description: "{{ doc('gas_price')}}"
tests:
- not_null
- name: GAS_BURNT
- name: gas_burnt
description: "{{ doc('gas_burnt')}}"
tests:
- not_null
- name: TOKENS_BURNT
- name: tokens_burnt
description: "{{ doc('tokens_burnt')}}"
tests:
- not_null
- name: _PARTITION_BY_BLOCK_NUMBER
- name: _partition_by_block_number
description: "{{ doc('_partition_by_block_number')}}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp')}}"
- name: TOKEN_TRANSFER_NATIVE_ID
- name: token_transfer_native_id
description: "{{doc('id')}}"
tests:
- not_null
- unique
- not_null
- name: INSERTED_TIMESTAMP
- name: inserted_timestamp
description: "{{doc('inserted_timestamp')}}"
- name: MODIFIED_TIMESTAMP
- name: modified_timestamp
description: "{{doc('modified_timestamp')}}"
- name: _INVOCATION_ID
- name: _invocation_id
description: "{{doc('invocation_id')}}"