total transfers overhaul

This commit is contained in:
Jack Forgash 2025-04-25 12:11:05 -06:00
parent 492c6e3dc0
commit 332e3e3fdc
19 changed files with 1402 additions and 538 deletions

View File

@ -591,4 +591,336 @@ Query Changes:
- Enhanced tokens_burnt calculation using proper gas price and burnt values
- Maintained same business logic for identifying and processing native transfers
## silver__token_transfer_base
### Major Changes
- Refactored to use `core__ez_actions` and `silver__logs_s3` instead of `silver__actions_events_function_call_s3`
- Improved data quality by adding explicit receipt and transaction success checks
- Enhanced log validation by using `silver__logs_s3` directly
### Architecture Changes
- Split data sourcing into two CTEs:
1. `base_actions` CTE from `core__ez_actions` for function call data
2. `logs_check` CTE from `silver__logs_s3` for log validation
- Improved incremental processing with dynamic range predicate
- Changed log validation to use clean_log from `silver__logs_s3`
### Column Changes
#### Removed
- `action_id` - Replaced with `receipt_id` and `action_index` for better granularity
- `_inserted_timestamp` - Deprecated column
- Removed dependency on deprecated `logs` column from actions table
#### Added
- `receipt_id` - Added for better transaction tracking
- `action_index` - Added to handle multiple actions within a receipt (always 0 for this model)
#### Modified
- Changed source of account fields to use receipt-level fields:
* `predecessor_id` from `receipt_predecessor_id`
* `receiver_id` from `receipt_receiver_id`
* `signer_id` from `receipt_signer_id`
- Changed `method_name` to parse from `action_data :method_name`
- Changed `deposit` to parse from `action_data :deposit`
### Configuration Changes
- Added dynamic range predicate for incremental processing
- Updated clustering keys to include both `block_timestamp::DATE` and `modified_timestamp::DATE`
- Changed surrogate key to use `receipt_id` and `action_index`
- Added search optimization on `EQUALITY(tx_hash,receipt_id,predecessor_id,receiver_id)`
### Query Changes
- Simplified data sourcing by using `core__ez_actions` directly
- Added explicit success checks with `receipt_succeeded` and `tx_succeeded`
- Improved filtering by using native fields from `core__ez_actions`
- Changed log validation to use proper join with `silver__logs_s3`
- Maintained same business logic for identifying first actions with logs
## silver__token_transfer_ft_transfers_event
### Major Changes
- Model has been refactored to use `silver__logs_s3` directly instead of `token_transfer_base`
- Improved log handling by directly filtering for standard EVENT_JSON logs with ft_transfer events
- Maintains same business logic for capturing FT transfer events
### Architecture Changes
- Removed dependency on deprecated `silver__token_transfer_base`
- Now sources data directly from `silver__logs_s3`
- Added upfront filtering for standard EVENT_JSON logs and ft_transfer events
- Improved event indexing using log_index
### Column Changes
- Added `receipt_id` for better transaction tracking
- Added `predecessor_id` and `signer_id` from logs table
- Renamed `rn` to `event_index` for clarity
- Changed `contract_address` to come directly from `receiver_id` in logs
- Removed `action_id` (replaced by receipt_id)
### Configuration Changes
- Added `dynamic_range_predicate_custom` for incremental processing
- Added `modified_timestamp::DATE` to clustering keys
- Added search optimization on key fields (tx_hash, receipt_id, contract_address, from_address, to_address)
- Changed surrogate key to use receipt_id instead of tx_hash for better uniqueness
### Query Changes
- Simplified log parsing by using `clean_log` directly from logs table
- Added direct filtering for ft_transfer events in logs CTE
- Maintained same business logic for amount validation and address extraction
- Enhanced event indexing to properly track multiple events within a transaction
### Testing Changes
- Added comprehensive column-level tests:
* Type validation for all fields
* Regex validation for NEAR addresses
* Amount validation to ensure positive values
* Not-null constraints where appropriate
- Added unique combination test across key fields
## silver__token_transfer_liquidity
⚠️ **REQUIRES FULL REFRESH** - Fixed from_address handling to to use predecessor_id
### Major Changes
- Model has been refactored to use `silver__logs_s3` directly instead of `token_transfer_base`
- Improved log handling by directly filtering for 'Liquidity added' events
- Fixed from_address to properly set NULL values for liquidity additions
- Maintains same business logic for capturing liquidity addition events
### Architecture Changes
- Removed dependency on deprecated `silver__token_transfer_base`
- Now sources data directly from `silver__logs_s3`
- Added upfront filtering for liquidity logs with log_index = 0
- Improved event indexing using log_index
### Column Changes
- Added `receipt_id` for better transaction tracking
- Added `predecessor_id` and `signer_id` from logs table
- Renamed `rn` to `event_index` for clarity
- Changed `contract_address` to come directly from regex on log value
- Removed `action_id` (replaced by receipt_id)
- Fixed `from_address` to consistently use NULL for liquidity additions
### Configuration Changes
- Added `dynamic_range_predicate_custom` for incremental processing
- Added `modified_timestamp::DATE` to clustering keys
- Added search optimization on key fields (tx_hash, receipt_id, contract_address, to_address)
- Changed surrogate key to use receipt_id instead of tx_hash for better uniqueness
### Query Changes
- Simplified log parsing by using `clean_log` directly from logs table
- Added direct filtering for liquidity events in logs CTE
- Improved log value extraction by removing unnecessary array access
- Enhanced event indexing to properly track multiple events within a transaction
- Maintained same regex pattern for extracting contract address and amount
### Testing Changes
- Added comprehensive column-level tests:
* Type validation for all fields
* Regex validation for NEAR addresses
* Amount validation to ensure positive values
* Not-null constraints where appropriate
* Value set validation for memo field
## silver__logs_s3
### Major Changes
- **REQUIRES MANUAL MIGRATION** - Column `receipt_object_id` is being removed in favor of just `receipt_id`
- Migration steps:
1. Verify no null values in receipt_id: `SELECT COUNT(*) FROM silver__logs_s3 where receipt_id is null;`
2. Execute ALTER TABLE command: `ALTER TABLE silver__logs_s3 RENAME COLUMN receipt_object_id TO receipt_id;`
### Architecture Changes
- Simplified column naming by removing redundant `receipt_object_id` column
- All references to `receipt_object_id` in downstream models already use `receipt_id` or alias it to `receipt_id`
### Column Changes
#### Removed
- `receipt_object_id` - Redundant column, functionality preserved through `receipt_id`
### Configuration Changes
- Updated search optimization to use `receipt_id` instead of `receipt_object_id`
- No changes to unique key or clustering configuration
### Query Changes
- No changes to core query logic
- Column renaming handled through ALTER TABLE command
- All downstream models already compatible with `receipt_id`
### Downstream Impact
- `defi__fact_intents.sql` - Currently aliases `receipt_object_id` to `receipt_id`, will continue to work after migration
- All other models using `silver__logs_s3` already reference `receipt_id` directly
- No changes needed in downstream models as they are already aligned with the new structure
### silver__token_transfer_mints
**Major Changes**
- Refactored model to use `silver__logs_s3` directly instead of `token_transfer_base`
- Improved handling of mint events by directly filtering for 'ft_mint' log type
- Added `event_index` to maintain event order within transactions
- Full refresh required (FR) due to improved log handling and event ordering
**Architecture**
- Data sourcing split into two main CTEs:
- `ft_mint_logs`: Filters relevant mint events from `silver__logs_s3`
- `ft_mints_final`: Processes and formats mint events with proper field extraction
**Column Changes**
- Added:
- `event_index`: Tracks event order within transactions
- `receipt_id`: Links to originating receipt
- Modified:
- `mint_id`: Now generated using consistent surrogate key pattern
- All timestamp fields now use TIMESTAMP_NTZ type
**Configuration Changes**
- Updated `incremental_predicates` to use dynamic range predicate
- Added `modified_timestamp::DATE` to clustering keys
- Maintained `merge` strategy for incremental processing
- Optimized merge exclude columns for better performance
**Query Changes**
- Enhanced log filtering to ensure only successful mint events are processed
- Improved amount validation to ensure positive values only
- Added proper handling of NEAR account formats in address fields
- Optimized field extraction from EVENT_JSON
**Testing**
- Added comprehensive column tests including:
- Data type validations
- Not null constraints where appropriate
- NEAR account format validation for addresses
- Positive amount validation
- Uniqueness check for mint_id
- Enhanced model description to better reflect its purpose and data sources
### silver__token_transfers_complete
`2024-03-XX`
### 🏗️ Architecture Changes
- Improved incremental logic to prevent data gaps:
- Added execute block to calculate minimum block date and maximum modified timestamp
- Split filtering strategy: block_timestamp at CTE level, modified_timestamp after UNION ALL
- Added final CTE for cleaner deduplication with QUALIFY clause
- Updated column names to align with new standards:
- Replaced `action_id` with `receipt_id`
- Replaced `rn` with `event_index`
- Added comprehensive tests in YAML file for all columns
- Added type validation tests for timestamps and numeric fields
- Added value validation for transfer_type field
### 🔄 Query Changes
- Optimized incremental processing by:
- Using date-based filtering at source CTEs
- Applying modified_timestamp filter after UNION ALL
- Moving QUALIFY clause to final SELECT
- Standardized WHERE clause structure across all CTEs
- Added explicit column list in final_transfers CTE
### 🎯 Functionality Changes
- Maintained all existing transfer types and sources
- Enhanced surrogate key generation using new column names
- Added search optimization on key columns including receipt_id
### silver__token_transfer_orders
**Major Changes**
- Refactored model to use `silver__logs_s3` directly instead of `token_transfer_base`
- Improved handling of order events by directly filtering for 'order_added' log type
- Added `event_index` to maintain event order within transactions
- Full refresh required (FR) due to improved log handling and event ordering
**Architecture**
- Data sourcing split into two main CTEs:
- `order_logs`: Filters relevant order events from `silver__logs_s3`
- `orders_final`: Processes and formats order events with proper field extraction
**Column Changes**
- Added:
- `event_index`: Tracks event order within transactions
- `receipt_id`: Links to originating receipt
- `predecessor_id`: Links to transaction originator
- `signer_id`: Links to transaction signer
- Modified:
- `transfers_orders_id`: Now generated using consistent surrogate key pattern
- All timestamp fields now use TIMESTAMP_NTZ type
- Removed:
- `action_id`: Replaced by receipt_id and event_index
- `_inserted_timestamp`: Deprecated column
**Configuration Changes**
- Updated `incremental_predicates` to use dynamic range predicate
- Added `modified_timestamp::DATE` to clustering keys
- Maintained `merge` strategy for incremental processing
- Added search optimization on key fields (tx_hash, receipt_id, contract_address, from_address, to_address)
**Query Changes**
- Enhanced log filtering to ensure only successful order events are processed
- Improved amount validation to ensure positive values only
- Added proper handling of NEAR account formats in address fields
- Optimized field extraction from EVENT_JSON
- Simplified log parsing by using clean_log directly
**Testing**
- Added comprehensive column tests including:
- Data type validations
- Not null constraints where appropriate
- NEAR account format validation for addresses
- Positive amount validation
- Value set validation for memo field
- Uniqueness check for transfers_orders_id
- Enhanced model description to better reflect its purpose and data sources
### silver__token_transfer_ft_transfers_method
**Major Changes**
- Refactored model to use both `core__ez_actions` and `silver__logs_s3` directly
- Improved handling of ft_transfer method calls by properly joining actions with their logs
- Added `event_index` to maintain action and log order within transactions
- Changed from flattening logs array to proper join with logs table
**Architecture**
- Split data sourcing into three CTEs:
1. `ft_transfer_actions`: Filters relevant ft_transfer actions from `core__ez_actions`
2. `ft_transfer_logs`: Joins actions with their logs from `silver__logs_s3`
3. `ft_transfers_final`: Processes and formats transfer events with proper field extraction
- Improved log handling by using clean_log field directly
- Maintained regex-based field extraction from logs
**Column Changes**
- Added:
- `event_index`: Tracks combined action and log order within transactions
- `receipt_id`: Links to originating receipt
- `predecessor_id`: Links to transaction originator
- `signer_id`: Links to transaction signer
- Modified:
- `transfers_id`: Now generated using consistent surrogate key pattern
- `from_address`: Now extracted from clean_log using regex
- `to_address`: Now extracted from clean_log using regex
- `amount_unadj`: Now extracted from clean_log using regex
- All timestamp fields now use TIMESTAMP_NTZ type
- Removed:
- `action_id`: Replaced by receipt_id and event_index
- `_inserted_timestamp`: Deprecated column
**Configuration Changes**
- Updated `incremental_predicates` to use dynamic range predicate
- Added `modified_timestamp::DATE` to clustering keys
- Changed from 'delete+insert' to 'merge' strategy for incremental processing
- Added search optimization on key fields (tx_hash, receipt_id, contract_address, from_address, to_address)
**Query Changes**
- Enhanced filtering to ensure only successful ft_transfer calls are processed
- Improved log handling by properly joining actions with their logs
- Added proper handling of NEAR account formats in address fields
- Maintained regex-based field extraction for consistency
- Added validation for required log fields (from, to, amount)
**Testing**
- Added comprehensive column tests including:
- Data type validations
- Not null constraints where appropriate
- NEAR account format validation for addresses
- Positive amount validation
- Uniqueness check for transfers_id
- Enhanced model description to better reflect its purpose and data sources
---

View File

@ -0,0 +1,5 @@
{% docs event_index %}
The sequential index of an event within a transaction, combining the log index with any sub-indices from array processing. Used to maintain the correct order of events when multiple events are emitted in a single transaction.
{% enddocs %}

View File

@ -66,7 +66,7 @@ SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
receipt_id AS action_id,
contract_address,
from_address,
to_address,

View File

@ -68,7 +68,7 @@ logs_base AS(
block_timestamp,
block_id,
tx_hash,
receipt_object_id AS receipt_id,
receipt_id,
log_index,
receiver_id,
predecessor_id,

View File

@ -5,7 +5,7 @@
cluster_by = ["block_timestamp::DATE","modified_timestamp::DATE"],
unique_key = "log_id",
incremental_strategy = "merge",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_object_id,receiver_id,predecessor_id,signer_id);",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id);",
tags = ['curated', 'scheduled_core']
) }}
@ -74,7 +74,6 @@ SELECT
block_timestamp,
block_id,
tx_hash,
receipt_id AS receipt_object_id, -- maintain for cutover
receipt_id,
log_id,
log_index,

View File

@ -2,69 +2,114 @@ version: 2
models:
- name: silver__lockup_actions
description: |-
This table records all disbursements by the contract lockup.near.
TODOs - drop _inserted_timestamp, _modified_timestamp. Rename receipt_object_ids to receipt_ids.
description: >
This model extracts data from core__ez_actions for NEAR Protocol lockup contract actions.
It captures lockup contract creation events and associated parameters like lockup duration,
vesting schedules, and release durations. The model processes actions with method names
'on_lockup_create', 'create', and 'new' from lockup.near contracts.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- tx_hash
- action_index
columns:
- name: TX_HASH
description: "{{ doc('tx_hash')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- name: LOCKUP_ACTIONS_ID
description: Unique identifier for each lockup action, generated from tx_hash and action_index
tests:
- not_null
- unique
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp')}}"
description: The timestamp of the block when the lockup action occurred
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: BLOCK_ID
description: "{{ doc('block_id')}}"
- name: TX_HASH
description: The transaction hash of the lockup action
tests:
- not_null
- name: DEPOSIT
description: "{{ doc('deposit')}}"
- name: RECEIPT_ID
description: The receipt ID associated with the lockup action
tests:
- not_null
- name: ACTION_INDEX
description: The index of the action within the transaction
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: NUMBER
- name: LOCKUP_ACCOUNT_ID
description: "{{ doc('lockup_account_id')}}"
description: The account ID of the lockup contract
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: .*\.lockup\.near$
- name: OWNER_ACCOUNT_ID
description: "{{ doc('owner_account_id')}}"
description: The account ID of the lockup contract owner
tests:
- not_null
- name: DEPOSIT
description: The amount of NEAR tokens deposited into the lockup contract
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: FLOAT
- name: LOCKUP_DURATION
description: "{{ doc('lockup_duration')}}"
description: The duration of the lockup period in nanoseconds
tests:
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: STRING
- name: LOCKUP_TIMESTAMP
description: "{{ doc('lockup_timestamp')}}"
description: The timestamp when the lockup period starts
tests:
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: STRING
- name: LOCKUP_TIMESTAMP_NTZ
description: "{{ doc('lockup_timestamp_ntz')}}"
description: The lockup timestamp converted to TIMESTAMP_NTZ format
tests:
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: RELEASE_DURATION
description: "{{ doc('release_duration')}}"
description: The duration over which tokens are released after the lockup period
tests:
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: STRING
- name: VESTING_SCHEDULE
description: "{{ doc('vesting_schedule')}}"
description: The vesting schedule configuration for the lockup contract
tests:
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: STRING
- name: TRANSFERS_INFORMATION
description: "{{ doc('transfers_information')}}"
- name: _PARTITION_BY_BLOCK_NUMBER
description: "{{ doc('_partition_by_block_number')}}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp')}}"
- name: LOCKUP_ACTIONS_ID
description: "{{doc('id')}}"
- name: INSERTED_TIMESTAMP
description: "{{doc('inserted_timestamp')}}"
description: Additional information about token transfer restrictions
tests:
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: STRING
- name: MODIFIED_TIMESTAMP
description: "{{doc('modified_timestamp')}}"
description: Timestamp when the record was last modified
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: _INVOCATION_ID
description: "{{doc('invocation_id')}}"
- name: INSERTED_TIMESTAMP
description: Timestamp when the record was first inserted
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ

View File

@ -1,57 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','modified_timestamp::Date'],
unique_key = 'transfers_base_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
) }}
WITH nep141 AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
predecessor_id,
signer_id,
receiver_id,
action_name,
method_name,
deposit,
logs,
receipt_succeeded,
_inserted_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__actions_events_function_call_s3') }}
WHERE
receipt_succeeded = TRUE
AND logs [0] IS NOT NULL
AND RIGHT(
action_id,
2
) = '-0'
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['action_id']
) }} AS transfers_base_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
nep141

View File

@ -1,70 +1,57 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','modified_timestamp::Date'],
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'],
unique_key = 'transfers_event_id',
incremental_strategy = 'delete+insert',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,contract_address,from_address,to_address);",
tags = ['curated','scheduled_non_core']
) }}
WITH actions_events AS (
WITH ft_transfer_logs AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
signer_id,
receiver_id,
action_name,
method_name,
deposit,
logs,
receipt_succeeded,
_inserted_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_base') }}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
ft_transfers_event AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
TRY_PARSE_JSON(REPLACE(VALUE, 'EVENT_JSON:')) AS DATA,
b.index AS logs_rn,
receipt_id,
receiver_id AS contract_address,
_inserted_timestamp,
predecessor_id,
signer_id,
log_index,
try_parse_json(clean_log) AS log_data,
receipt_succeeded,
_partition_by_block_number
FROM
actions_events
JOIN LATERAL FLATTEN(
input => logs
) b
WHERE
DATA :event :: STRING IN (
'ft_transfer'
FROM
{{ ref('silver__logs_s3') }}
WHERE
receipt_succeeded
AND is_standard -- Only look at EVENT_JSON formatted logs
AND try_parse_json(clean_log) :event :: STRING = 'ft_transfer'
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
),
ft_transfers_final AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
receipt_id,
contract_address,
predecessor_id,
signer_id,
log_index,
NVL(
f.value :old_owner_id,
NULL
@ -75,24 +62,35 @@ ft_transfers_final AS (
) :: STRING AS to_address,
f.value :amount :: variant AS amount_unadj,
f.value :memo :: STRING AS memo,
logs_rn + f.index AS rn,
_inserted_timestamp,
log_index + f.index AS event_index,
_partition_by_block_number
FROM
ft_transfers_event
JOIN LATERAL FLATTEN(
input => DATA :data
ft_transfer_logs,
LATERAL FLATTEN(
input => log_data :data
) f
WHERE
amount_unadj > 0
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'action_id','contract_address','amount_unadj','from_address','to_address','memo','rn']
) }} AS transfers_event_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
block_timestamp,
block_id,
tx_hash,
receipt_id,
contract_address,
predecessor_id,
signer_id,
from_address,
to_address,
amount_unadj,
memo,
event_index,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(
['receipt_id', 'contract_address', 'amount_unadj', 'from_address', 'to_address', 'event_index']
) }} AS transfers_event_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
ft_transfers_final

View File

@ -2,12 +2,107 @@ version: 2
models:
- name: silver__token_transfer_ft_transfers_event
description: |-
This table records all the FT transfers by event.
description: >
This model captures fungible token (FT) transfer events from the NEAR blockchain.
It processes standard EVENT_JSON formatted logs from token contracts that emit
'ft_transfer' events.
columns:
- name: BLOCK_TIMESTAMP
description: "{{doc('block_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: BLOCK_ID
description: "{{doc('block_id')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: NUMBER
- name: TX_HASH
description: "{{doc('tx_hash')}}"
tests:
- not_null
- name: RECEIPT_ID
description: "{{doc('receipt_id')}}"
tests:
- not_null
- name: CONTRACT_ADDRESS
description: "{{doc('contract_address')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: .*\.near$
- name: PREDECESSOR_ID
description: "{{doc('predecessor_id')}}"
tests:
- not_null
- name: SIGNER_ID
description: "{{doc('signer_id')}}"
tests:
- not_null
- name: FROM_ADDRESS
description: "{{doc('from_address')}}"
tests:
- dbt_expectations.expect_column_values_to_match_regex:
regex: .*\.near$
- name: TO_ADDRESS
description: "{{doc('to_address')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: .*\.near$
- name: AMOUNT_UNADJ
description: "{{doc('amount_unadj')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: FLOAT
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
- name: MEMO
description: "{{doc('memo')}}"
tests:
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: STRING
- name: EVENT_INDEX
description: "{{doc('event_index')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: NUMBER
- name: _PARTITION_BY_BLOCK_NUMBER
description: "{{doc('_partition_by_block_number')}}"
- name: TRANSFERS_EVENT_ID
description: "{{doc('id')}}"
tests:
- not_null
- unique
- name: INSERTED_TIMESTAMP
description: "{{doc('inserted_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: MODIFIED_TIMESTAMP
description: "{{doc('modified_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ

View File

@ -1,50 +1,78 @@
{{ config(
materialized = 'incremental',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','modified_timestamp::Date'],
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'],
unique_key = 'transfers_id',
incremental_strategy = 'delete+insert',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,contract_address,from_address,to_address);",
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
) }}
WITH actions_events AS (
WITH ft_transfer_actions AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
signer_id,
receiver_id,
action_name,
method_name,
deposit,
logs,
receipt_id,
action_index,
receipt_receiver_id AS contract_address,
receipt_predecessor_id AS predecessor_id,
receipt_signer_id AS signer_id,
receipt_succeeded,
_inserted_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_base') }}
FROM
{{ ref('core__ez_actions') }}
WHERE
action_name = 'FunctionCall'
AND action_data :method_name :: STRING = 'ft_transfer'
AND receipt_succeeded
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
),
ft_transfers_method AS (
ft_transfer_logs AS (
SELECT
l.block_id,
l.block_timestamp,
l.tx_hash,
l.receipt_id,
l.log_index,
l.clean_log AS log_value,
l._partition_by_block_number,
a.contract_address,
a.predecessor_id,
a.signer_id,
a.action_index
FROM
{{ ref('silver__logs_s3') }} l
INNER JOIN ft_transfer_actions a
ON l.tx_hash = a.tx_hash
AND l.receipt_id = a.receipt_id
WHERE
l.receipt_succeeded
),
ft_transfers_final AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
receiver_id AS contract_address,
receipt_id,
contract_address,
predecessor_id,
signer_id,
REGEXP_SUBSTR(
VALUE,
log_value,
'from ([^ ]+)',
1,
1,
@ -52,7 +80,7 @@ ft_transfers_method AS (
1
) :: STRING AS from_address,
REGEXP_SUBSTR(
VALUE,
log_value,
'to ([^ ]+)',
1,
1,
@ -60,31 +88,38 @@ ft_transfers_method AS (
1
) :: STRING AS to_address,
REGEXP_SUBSTR(
VALUE,
log_value,
'\\d+'
) :: variant AS amount_unadj,
'' AS memo,
b.index AS rn,
_inserted_timestamp,
log_index + action_index AS event_index,
_partition_by_block_number
FROM
actions_events
JOIN LATERAL FLATTEN(
input => logs
) b
ft_transfer_logs
WHERE
method_name = 'ft_transfer'
AND from_address IS NOT NULL
from_address IS NOT NULL
AND to_address IS NOT NULL
AND amount_unadj IS NOT NULL
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'action_id','contract_address','amount_unadj','from_address','to_address','memo','rn']
) }} AS transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
block_timestamp,
block_id,
tx_hash,
receipt_id,
contract_address,
predecessor_id,
signer_id,
from_address,
to_address,
amount_unadj,
memo,
event_index,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(
['receipt_id', 'contract_address', 'amount_unadj', 'from_address', 'to_address', 'event_index']
) }} AS transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
ft_transfers_method
ft_transfers_final

View File

@ -2,12 +2,107 @@ version: 2
models:
- name: silver__token_transfer_ft_transfers_method
description: |-
This table records all the FT transfers by method.
description: >
This model captures fungible token (FT) transfers from the NEAR blockchain using the ft_transfer method.
It processes function calls directly from core__ez_actions where method_name = 'ft_transfer'.
columns:
- name: BLOCK_TIMESTAMP
description: "{{doc('block_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: BLOCK_ID
description: "{{doc('block_id')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: NUMBER
- name: TX_HASH
description: "{{doc('tx_hash')}}"
tests:
- not_null
- name: RECEIPT_ID
description: "{{doc('receipt_id')}}"
tests:
- not_null
- name: CONTRACT_ADDRESS
description: "{{doc('contract_address')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: .*\.near$
- name: PREDECESSOR_ID
description: "{{doc('predecessor_id')}}"
tests:
- not_null
- name: SIGNER_ID
description: "{{doc('signer_id')}}"
tests:
- not_null
- name: FROM_ADDRESS
description: "{{doc('from_address')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: .*\.near$
- name: TO_ADDRESS
description: "{{doc('to_address')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: .*\.near$
- name: AMOUNT_UNADJ
description: "{{doc('amount_unadj')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: FLOAT
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
- name: MEMO
description: "{{doc('memo')}}"
tests:
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: STRING
- name: EVENT_INDEX
description: "{{doc('event_index')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: NUMBER
- name: _PARTITION_BY_BLOCK_NUMBER
description: "{{doc('_partition_by_block_number')}}"
- name: TRANSFERS_ID
description: "{{doc('id')}}"
tests:
- not_null
- unique
- name: INSERTED_TIMESTAMP
description: "{{doc('inserted_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: MODIFIED_TIMESTAMP
description: "{{doc('modified_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ

View File

@ -1,42 +1,45 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','modified_timestamp::Date'],
cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'],
unique_key = 'transfers_liquidity_id',
incremental_strategy = 'merge',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,contract_address,to_address);",
tags = ['curated','scheduled_non_core']
) }}
WITH actions_events AS (
WITH liquidity_logs AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
signer_id,
receipt_id,
receiver_id,
action_name,
method_name,
deposit,
logs,
predecessor_id,
signer_id,
log_index,
clean_log :: STRING AS log_data,
receipt_succeeded,
_inserted_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_base') }}
FROM
{{ ref('silver__logs_s3') }}
WHERE
receipt_succeeded
AND log_index = 0 -- Liquidity logs are always first
AND clean_log :: STRING like 'Liquidity added [%minted % shares'
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
),
add_liquidity AS (
@ -44,7 +47,7 @@ add_liquidity AS (
block_id,
block_timestamp,
tx_hash,
action_id,
receipt_id,
REGEXP_SUBSTR(
SPLIT.value,
'"\\d+ ([^"]*)["]',
@ -53,7 +56,7 @@ add_liquidity AS (
'e',
1
) :: STRING AS contract_address,
NULL AS from_address,
predecessor_id AS from_address,
receiver_id AS to_address,
REGEXP_SUBSTR(
SPLIT.value,
@ -64,30 +67,41 @@ add_liquidity AS (
1
) :: variant AS amount_unadj,
'add_liquidity' AS memo,
INDEX AS rn,
_inserted_timestamp,
log_index + INDEX AS event_index,
predecessor_id,
signer_id,
_partition_by_block_number
FROM
actions_events,
liquidity_logs,
LATERAL FLATTEN (
input => SPLIT(
REGEXP_SUBSTR(
logs [0],
log_data,
'\\["(.*?)"\\]'
),
','
)
) SPLIT
WHERE
logs [0] LIKE 'Liquidity added [%minted % shares'
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'action_id','contract_address','amount_unadj','from_address','to_address','memo','rn']
) }} AS transfers_liquidity_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
block_timestamp,
block_id,
tx_hash,
receipt_id,
contract_address,
from_address,
to_address,
amount_unadj,
memo,
event_index,
predecessor_id,
signer_id,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(
['receipt_id', 'contract_address', 'amount_unadj', 'to_address', 'event_index']
) }} AS transfers_liquidity_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
add_liquidity

View File

@ -0,0 +1,105 @@
version: 2
models:
- name: silver__token_transfer_liquidity
description: >
This model captures liquidity addition events from the NEAR blockchain.
It processes logs from liquidity pool contracts that emit 'Liquidity added' events.
columns:
- name: BLOCK_TIMESTAMP
description: "{{doc('block_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: BLOCK_ID
description: "{{doc('block_id')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: NUMBER
- name: TX_HASH
description: "{{doc('tx_hash')}}"
tests:
- not_null
- name: RECEIPT_ID
description: "{{doc('receipt_id')}}"
tests:
- not_null
- name: CONTRACT_ADDRESS
description: "{{doc('contract_address')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: .*\.near$
- name: FROM_ADDRESS
description: "{{doc('from_address')}}"
- name: TO_ADDRESS
description: "{{doc('to_address')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: .*\.near$
- name: AMOUNT_UNADJ
description: "{{doc('amount_unadj')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: FLOAT
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
- name: MEMO
description: "{{doc('memo')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_set:
value_set: ['add_liquidity']
- name: EVENT_INDEX
description: "{{doc('event_index')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: NUMBER
- name: PREDECESSOR_ID
description: "{{doc('predecessor_id')}}"
tests:
- not_null
- name: SIGNER_ID
description: "{{doc('signer_id')}}"
tests:
- not_null
- name: _PARTITION_BY_BLOCK_NUMBER
description: "{{doc('_partition_by_block_number')}}"
- name: TRANSFERS_LIQUIDITY_ID
description: "{{doc('id')}}"
tests:
- not_null
- unique
- name: INSERTED_TIMESTAMP
description: "{{doc('inserted_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: MODIFIED_TIMESTAMP
description: "{{doc('modified_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ

View File

@ -1,70 +1,56 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','modified_timestamp::Date'],
cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'],
unique_key = 'mint_id',
incremental_strategy = 'delete+insert',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,contract_address,to_address);",
tags = ['curated','scheduled_non_core']
) }}
WITH actions_events AS (
WITH ft_mint_logs AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
signer_id,
receiver_id,
action_name,
method_name,
deposit,
logs,
receipt_succeeded,
_inserted_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_base') }}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
)
,
ft_mints AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
TRY_PARSE_JSON(REPLACE(VALUE, 'EVENT_JSON:')) AS DATA,
b.index AS logs_rn,
receipt_id,
receiver_id AS contract_address,
_inserted_timestamp,
predecessor_id,
signer_id,
log_index,
try_parse_json(clean_log) AS log_data,
receipt_succeeded,
_partition_by_block_number
FROM
actions_events
JOIN LATERAL FLATTEN(
input => logs
) b
WHERE
DATA :event :: STRING IN (
'ft_mint'
FROM
{{ ref('silver__logs_s3') }}
WHERE
receipt_succeeded
AND is_standard -- Only look at EVENT_JSON formatted logs
AND try_parse_json(clean_log) :event :: STRING = 'ft_mint'
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
),
ft_mints_final AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
receipt_id,
contract_address,
predecessor_id,
signer_id,
NVL(
f.value :old_owner_id,
NULL
@ -75,26 +61,35 @@ ft_mints_final AS (
) :: STRING AS to_address,
f.value :amount :: variant AS amount_unadj,
f.value :memo :: STRING AS memo,
logs_rn + f.index AS rn,
_inserted_timestamp,
log_index + f.index AS event_index,
_partition_by_block_number
FROM
ft_mints
JOIN LATERAL FLATTEN(
input => DATA :data
ft_mint_logs,
LATERAL FLATTEN(
input => log_data :data
) f
WHERE
amount_unadj > 0
)
SELECT
*,
SELECT
block_timestamp,
block_id,
tx_hash,
receipt_id,
contract_address,
from_address,
to_address,
amount_unadj,
memo,
event_index,
predecessor_id,
signer_id,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'action_id','contract_address','amount_unadj','from_address','to_address','memo','rn']
)}} AS mint_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
['receipt_id', 'contract_address', 'amount_unadj', 'to_address', 'event_index']
) }} AS mint_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
ft_mints_final
WHERE
mint_id IS NOT NULL

View File

@ -2,12 +2,104 @@ version: 2
models:
- name: silver__token_transfer_mints
description: |-
This table records all the mint actions.
description: >
This model captures fungible token (FT) mint events from the NEAR blockchain.
It processes standard EVENT_JSON formatted logs from token contracts that emit
'ft_mint' events.
columns:
- name: BLOCK_TIMESTAMP
description: "{{doc('block_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: BLOCK_ID
description: "{{doc('block_id')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: NUMBER
- name: TX_HASH
description: "{{doc('tx_hash')}}"
tests:
- not_null
- name: RECEIPT_ID
description: "{{doc('receipt_id')}}"
tests:
- not_null
- name: CONTRACT_ADDRESS
description: "{{doc('contract_address')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: .*\.near$
- name: FROM_ADDRESS
description: "{{doc('from_address')}}"
- name: TO_ADDRESS
description: "{{doc('to_address')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: .*\.near$
- name: AMOUNT_UNADJ
description: "{{doc('amount_unadj')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: FLOAT
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
- name: MEMO
description: "{{doc('memo')}}"
tests:
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: STRING
- name: EVENT_INDEX
description: "{{doc('event_index')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: NUMBER
- name: PREDECESSOR_ID
description: "{{doc('predecessor_id')}}"
tests:
- not_null
- name: SIGNER_ID
description: "{{doc('signer_id')}}"
tests:
- not_null
- name: _PARTITION_BY_BLOCK_NUMBER
description: "{{doc('_partition_by_block_number')}}"
- name: MINT_ID
description: "{{doc('id')}}"
tests:
- not_null
- unique
- name: INSERTED_TIMESTAMP
description: "{{doc('inserted_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: MODIFIED_TIMESTAMP
description: "{{doc('modified_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ

View File

@ -2,89 +2,86 @@
materialized = 'incremental',
merge_exclude_columns = ["inserted_timestamp"],
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
cluster_by = ['block_timestamp::DATE','modified_timestamp::Date'],
cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'],
unique_key = 'transfers_orders_id',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,contract_address,from_address,to_address);",
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
) }}
WITH actions_events AS (
WITH order_logs AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
receipt_id,
receiver_id,
predecessor_id,
signer_id,
receiver_id,
action_name,
method_name,
deposit,
logs,
log_index,
try_parse_json(clean_log) AS log_data,
receipt_succeeded,
_inserted_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_base') }}
FROM
{{ ref('silver__logs_s3') }}
WHERE
receipt_succeeded
AND is_standard -- Only look at EVENT_JSON formatted logs
AND try_parse_json(clean_log) :event :: STRING = 'order_added'
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
),
orders AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
receiver_id,
TRY_PARSE_JSON(REPLACE(g.value, 'EVENT_JSON:')) AS DATA,
DATA :event :: STRING AS event,
g.index AS rn,
_inserted_timestamp,
_partition_by_block_number
FROM
actions_events
JOIN LATERAL FLATTEN(
input => logs
) g
WHERE
DATA :event :: STRING = 'order_added'
),
orders_final AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
receipt_id,
receiver_id AS to_address,
predecessor_id,
signer_id,
log_index,
f.value :sell_token :: STRING AS contract_address,
f.value :owner_id :: STRING AS from_address,
receiver_id :: STRING AS to_address,
(
f.value :original_amount
) :: variant AS amount_unadj,
f.value :original_amount :: variant AS amount_unadj,
'order' AS memo,
f.index AS rn,
_inserted_timestamp,
log_index + f.index AS event_index,
_partition_by_block_number
FROM
orders
JOIN LATERAL FLATTEN(
input => DATA :data
order_logs,
LATERAL FLATTEN(
input => log_data :data
) f
WHERE
amount_unadj > 0
)
SELECT
*,
SELECT
block_timestamp,
block_id,
tx_hash,
receipt_id,
contract_address,
predecessor_id,
signer_id,
from_address,
to_address,
amount_unadj,
memo,
event_index,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'action_id','contract_address','amount_unadj','from_address','to_address','memo','rn']
['receipt_id', 'contract_address', 'amount_unadj', 'from_address', 'to_address', 'event_index']
) }} AS transfers_orders_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,

View File

@ -0,0 +1,110 @@
version: 2
models:
- name: silver__token_transfer_orders
description: >
This model captures order addition events from the NEAR blockchain.
It processes standard EVENT_JSON formatted logs from contracts that emit
'order_added' events.
columns:
- name: BLOCK_TIMESTAMP
description: "{{doc('block_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: BLOCK_ID
description: "{{doc('block_id')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: NUMBER
- name: TX_HASH
description: "{{doc('tx_hash')}}"
tests:
- not_null
- name: RECEIPT_ID
description: "{{doc('receipt_id')}}"
tests:
- not_null
- name: CONTRACT_ADDRESS
description: "{{doc('contract_address')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: .*\.near$
- name: PREDECESSOR_ID
description: "{{doc('predecessor_id')}}"
tests:
- not_null
- name: SIGNER_ID
description: "{{doc('signer_id')}}"
tests:
- not_null
- name: FROM_ADDRESS
description: "{{doc('from_address')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: .*\.near$
- name: TO_ADDRESS
description: "{{doc('to_address')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: .*\.near$
- name: AMOUNT_UNADJ
description: "{{doc('amount_unadj')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: FLOAT
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
- name: MEMO
description: "{{doc('memo')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_set:
value_set: ['order']
- name: EVENT_INDEX
description: "{{doc('event_index')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: NUMBER
- name: _PARTITION_BY_BLOCK_NUMBER
description: "{{doc('_partition_by_block_number')}}"
- name: TRANSFERS_ORDERS_ID
description: "{{doc('id')}}"
tests:
- not_null
- unique
- name: INSERTED_TIMESTAMP
description: "{{doc('inserted_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: MODIFIED_TIMESTAMP
description: "{{doc('modified_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ

View File

@ -1,107 +1,127 @@
{{ config(
materialized = 'incremental',
cluster_by = ['block_timestamp::DATE','modified_timestamp::Date'],
cluster_by = ['block_timestamp::DATE','modified_timestamp::DATE'],
unique_key = 'transfers_complete_id',
incremental_strategy = 'merge',
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,contract_address,from_address,to_address);",
tags = ['curated','scheduled_non_core']
) }}
WITH native_transfers AS (
{% if execute %}
{% if is_incremental() and not var("MANUAL_FIX") %}
{% set max_mod_query %}
SELECT MAX(modified_timestamp) modified_timestamp
FROM {{ this }}
{% endset %}
{% set max_mod = run_query(max_mod_query)[0][0] %}
{% if not max_mod or max_mod == 'None' %}
{% set max_mod = '2099-01-01' %}
{% endif %}
{% do log("max_mod: " ~ max_mod, info=True) %}
{% set min_block_date_query %}
SELECT MIN(block_timestamp::DATE)
FROM (
SELECT MIN(block_timestamp) block_timestamp FROM {{ ref('silver__token_transfer_native') }} WHERE modified_timestamp >= '{{max_mod}}'
UNION ALL
SELECT MIN(block_timestamp) block_timestamp FROM {{ ref('silver__token_transfer_deposit') }} WHERE modified_timestamp >= '{{max_mod}}'
UNION ALL
SELECT MIN(block_timestamp) block_timestamp FROM {{ ref('silver__token_transfer_ft_transfers_method') }} WHERE modified_timestamp >= '{{max_mod}}'
UNION ALL
SELECT MIN(block_timestamp) block_timestamp FROM {{ ref('silver__token_transfer_ft_transfers_event') }} WHERE modified_timestamp >= '{{max_mod}}'
UNION ALL
SELECT MIN(block_timestamp) block_timestamp FROM {{ ref('silver__token_transfer_mints') }} WHERE modified_timestamp >= '{{max_mod}}'
UNION ALL
SELECT MIN(block_timestamp) block_timestamp FROM {{ ref('silver__token_transfer_orders') }} WHERE modified_timestamp >= '{{max_mod}}'
UNION ALL
SELECT MIN(block_timestamp) block_timestamp FROM {{ ref('silver__token_transfer_liquidity') }} WHERE modified_timestamp >= '{{max_mod}}'
)
{% endset %}
{% set min_bd = run_query(min_block_date_query)[0][0] %}
{% if not min_bd or min_bd == 'None' %}
{% set min_bd = '2099-01-01' %}
{% endif %}
{% do log("min_bd: " ~ min_bd, info=True) %}
{% endif %}
{% endif %}
WITH native_transfers AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
'0' AS rn,
receipt_id,
action_index AS event_index,
'wrap.near' AS contract_address,
predecessor_id AS from_address,
receiver_id AS to_address,
NULL AS memo,
amount_unadj,
'native' AS transfer_type,
_inserted_timestamp,
_partition_by_block_number
_partition_by_block_number,
modified_timestamp
FROM
{{ ref('silver__token_transfer_native') }}
WHERE
receipt_succeeded
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
AND {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
AND block_timestamp::DATE >= '{{min_bd}}'
{% endif %}
),
native_deposits AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
'0' AS rn,
receipt_id,
action_index AS event_index,
'wrap.near' AS contract_address,
predecessor_id AS from_address,
receiver_id AS to_address,
NULL AS memo,
amount_unadj,
'native' AS transfer_type,
_inserted_timestamp,
_partition_by_block_number
_partition_by_block_number,
modified_timestamp
FROM
{{ ref('silver__token_transfer_deposit') }}
WHERE
receipt_succeeded
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
AND {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
AND block_timestamp::DATE >= '{{min_bd}}'
{% endif %}
),
ft_transfers_method AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
receipt_id,
event_index,
contract_address,
from_address,
to_address,
memo,
amount_unadj,
'nep141' AS transfer_type,
_inserted_timestamp,
_partition_by_block_number
_partition_by_block_number,
modified_timestamp
FROM
{{ ref('silver__token_transfer_ft_transfers_method') }}
WHERE 1=1
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
AND {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
AND block_timestamp::DATE >= '{{min_bd}}'
{% endif %}
),
ft_transfers_event AS (
@ -109,31 +129,23 @@ ft_transfers_event AS (
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
receipt_id,
event_index,
contract_address,
from_address,
to_address,
memo,
amount_unadj,
'nep141' AS transfer_type,
_inserted_timestamp,
_partition_by_block_number
_partition_by_block_number,
modified_timestamp
FROM
{{ ref('silver__token_transfer_ft_transfers_event') }}
WHERE 1=1
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
AND {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
AND block_timestamp::DATE >= '{{min_bd}}'
{% endif %}
),
mints AS (
@ -141,31 +153,23 @@ mints AS (
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
receipt_id,
event_index,
contract_address,
from_address,
to_address,
memo,
amount_unadj,
'nep141' AS transfer_type,
_inserted_timestamp,
_partition_by_block_number
_partition_by_block_number,
modified_timestamp
FROM
{{ ref('silver__token_transfer_mints') }}
WHERE 1=1
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
AND {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
AND block_timestamp::DATE >= '{{min_bd}}'
{% endif %}
),
orders AS (
@ -173,31 +177,23 @@ orders AS (
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
receipt_id,
event_index,
contract_address,
from_address,
to_address,
memo,
amount_unadj,
'nep141' AS transfer_type,
_inserted_timestamp,
_partition_by_block_number
_partition_by_block_number,
modified_timestamp
FROM
{{ ref('silver__token_transfer_orders') }}
WHERE 1=1
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
AND {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
AND block_timestamp::DATE >= '{{min_bd}}'
{% endif %}
),
liquidity AS (
@ -205,90 +201,67 @@ liquidity AS (
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
receipt_id,
event_index,
contract_address,
from_address,
to_address,
memo,
amount_unadj,
'nep141' AS transfer_type,
_inserted_timestamp,
_partition_by_block_number
_partition_by_block_number,
modified_timestamp
FROM
{{ ref('silver__token_transfer_liquidity') }}
WHERE 1=1
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
AND {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
AND block_timestamp::DATE >= '{{min_bd}}'
{% endif %}
),
FINAL AS (
SELECT
*
FROM
native_transfers
all_transfers AS (
SELECT * FROM native_transfers
UNION ALL
SELECT
*
FROM
native_deposits
SELECT * FROM native_deposits
UNION ALL
SELECT
*
FROM
ft_transfers_method
SELECT * FROM ft_transfers_method
UNION ALL
SELECT
*
FROM
ft_transfers_event
SELECT * FROM ft_transfers_event
UNION ALL
SELECT
*
FROM
mints
SELECT * FROM mints
UNION ALL
SELECT
*
FROM
orders
SELECT * FROM orders
UNION ALL
SELECT
*
FROM
liquidity
)
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
contract_address,
from_address,
to_address,
memo,
amount_unadj,
transfer_type,
_inserted_timestamp,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(
['action_id','contract_address','amount_unadj','from_address','to_address','rn']
) }} AS transfers_complete_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL
SELECT * FROM liquidity
qualify(row_number() over (partition by transfers_complete_id order by modified_timestamp desc)) = 1
),
final_transfers AS (
SELECT
block_timestamp,
block_id,
tx_hash,
receipt_id,
event_index,
contract_address,
from_address,
to_address,
memo,
amount_unadj,
transfer_type,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(
['receipt_id', 'contract_address', 'amount_unadj', 'from_address', 'to_address', 'event_index']
) }} AS transfers_complete_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
all_transfers
{% if is_incremental() and not var("MANUAL_FIX") %}
WHERE modified_timestamp >= '{{max_mod}}'
{% endif %}
)
SELECT *
FROM final_transfers
QUALIFY(ROW_NUMBER() OVER (PARTITION BY transfers_complete_id ORDER BY modified_timestamp DESC)) = 1

View File

@ -4,48 +4,69 @@ models:
- name: silver__token_transfers_complete
description: |-
This table records all the Native Token + FTs Transfers of the Near blockchain.
This table records all Native Token and Fungible Token (FT) transfers on the NEAR blockchain. It combines data from multiple sources including native transfers, deposits, NEP-141 method calls, events, mints, orders, and liquidity operations.
columns:
- name: BLOCK_ID
description: "{{ doc('block_id')}}"
tests:
- not_null
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp')}}"
tests:
- not_null:
where: _inserted_timestamp <= SYSDATE() - interval '2 hours'
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: TX_HASH
description: "{{ doc('tx_hash')}}"
tests:
- not_null
- name: ACTION_ID
description: "{{ doc('action_id')}}"
- name: RECEIPT_ID
description: "{{ doc('receipt_id')}}"
tests:
- not_null
- name: EVENT_INDEX
description: "The index of the event within the receipt, used to maintain order of events"
tests:
- not_null
- name: CONTRACT_ADDRESS
description: "{{ doc('tx_signer')}}"
tests:
- not_null
- name: FROM_ADDRESS
description: "{{ doc('from_address')}}"
tests:
- not_null
- name: TO_ADDRESS
description: "{{ doc('to_address')}}"
- name: RN
description: "Row number"
tests:
- not_null
- name: MEMO
description: "{{ doc('memo')}}"
- name: AMOUNT_UNADJ
description: "{{ doc('amount_unadj')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: FLOAT
- name: TRANSFER_TYPE
description: "{{ doc('transfer_type')}}"
tests:
- not_null
- accepted_values:
values: ['native', 'nep141']
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp')}}"
- name: _PARTITION_BY_BLOCK_NUMBER
description: "{{ doc('_partition_by_block_number')}}"
- name: TRANSFERS_COMPLETE_ID
description: "{{doc('id')}}"
@ -55,9 +76,19 @@ models:
- name: INSERTED_TIMESTAMP
description: "{{doc('inserted_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: MODIFIED_TIMESTAMP
description: "{{doc('modified_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: TIMESTAMP_NTZ
- name: _INVOCATION_ID
description: "{{doc('invocation_id')}}"
tests:
- not_null