fix price join, rm dev 30d restriction

This commit is contained in:
Jack Forgash 2025-08-14 10:15:09 -04:00
parent 8db8378dfd
commit 6cbe9555f6
6 changed files with 98 additions and 40 deletions

View File

@ -722,4 +722,20 @@ The human-readable name of the trader, derived from address labeling or defaulti
{% docs trader_project_name %}
The project or organization name associated with the trader address, extracted from address labeling data. This field provides organizational context for the trader, enabling analysis of institutional vs. retail trading patterns. May be NULL for individual traders without established project labels. Useful for understanding trading behavior by entity type.
{% enddocs %}
{% docs package_index %}
Zero-based index of the package within a transaction that contains multiple package calls. Used to distinguish between different package invocations in the same transaction. Essential for multi-package transaction analysis and package-level flow tracking. Example: 0 for the first package call, 1 for the second, etc.
{% enddocs %}
{% docs swap_index %}
One-based sequential index of the swap within a transaction that contains multiple swaps. Used to order and identify individual swaps when a transaction performs multiple DEX operations. Essential for multi-swap transaction analysis and swap-level flow tracking. Example: 1 for the first swap, 2 for the second, etc.
{% enddocs %}
{% docs token_in_from_txs %}
Boolean flag indicating whether the input token type was derived from transaction payload data rather than event data. True when the token type could not be determined from the event and was inferred from transaction type arguments. Important for data quality assessment and understanding token identification reliability. Example: true when event data was insufficient for token identification.
{% enddocs %}
{% docs token_out_from_txs %}
Boolean flag indicating whether the output token type was derived from transaction payload data rather than event data. True when the token type could not be determined from the event and was inferred from transaction type arguments. Important for data quality assessment and understanding token identification reliability. Example: true when event data was insufficient for token identification.
{% enddocs %}

View File

@ -1,15 +1,15 @@
{{ config (
materialized = "incremental",
unique_key = "dex_swaps_id",
cluster_by = ['modified_timestamp::DATE','block_timestamp::DATE'],
unique_key = "ez_dex_swaps_id",
cluster_by = ['block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_digest, trader_address, platform_address);",
tags = ['gold','defi']
tags = ['non_core']
) }}
WITH base_swaps AS (
-- Union regular DEX swaps with Aftermath DEX swaps
SELECT
checkpoint_number,
block_timestamp,
@ -30,9 +30,8 @@ WITH base_swaps AS (
dex_swaps_id,
modified_timestamp
FROM {{ ref('silver__dex_swaps') }}
WHERE 1=1
{% if is_incremental() %}
AND modified_timestamp >= (
WHERE modified_timestamp >= (
SELECT COALESCE(MAX(modified_timestamp), '1900-01-01'::TIMESTAMP)
FROM {{ this }}
)
@ -60,9 +59,8 @@ WITH base_swaps AS (
dex_swaps_id,
modified_timestamp,
FROM {{ ref('silver__aftermath_dex_swaps') }}
WHERE 1=1
{% if is_incremental() %}
AND modified_timestamp >= (
WHERE modified_timestamp >= (
SELECT COALESCE(MAX(modified_timestamp), '1900-01-01'::TIMESTAMP)
FROM {{ this }}
)
@ -117,7 +115,8 @@ token_prices_in AS (
-- Standard token address join
LEFT JOIN prices p_in_std
ON LOWER(SPLIT(bs.token_in_type, '::')[0]) = LOWER(p_in_std.token_address)
ON (LOWER(SPLIT(bs.token_in_type, '::')[0]) = LOWER(p_in_std.token_address)
OR LOWER(bs.token_in_type) = LOWER(p_in_std.token_address))
AND p_in_std.blockchain = 'sui'
AND p_in_std.hour = DATE_TRUNC('hour', bs.block_timestamp)
@ -167,7 +166,8 @@ with_all_prices AS (
-- Standard token address join
LEFT JOIN crosschain.price.ez_prices_hourly p_out_std
ON LOWER(tpi.token_out_address) = LOWER(p_out_std.token_address)
ON (LOWER(tpi.token_out_address) = LOWER(p_out_std.token_address)
OR LOWER(tpi.token_out_type) = LOWER(p_out_std.token_address))
AND p_out_std.blockchain = 'sui'
AND p_out_std.hour = DATE_TRUNC('hour', tpi.block_timestamp)
@ -288,7 +288,7 @@ SELECT
trader_address,
-- Metadata
dex_swaps_id,
dex_swaps_id AS ez_dex_swaps_id,
SYSDATE() AS inserted_timestamp,
modified_timestamp,
'{{ invocation_id }}' AS _invocation_id

View File

@ -190,7 +190,7 @@ models:
description: "{{ doc('trader_address') }}"
tests:
- not_null
- name: DEX_SWAPS_ID
- name: EZ_DEX_SWAPS_ID
description: "{{ doc('dex_swaps_id') }}"
tests:
- not_null

View File

@ -5,7 +5,7 @@
cluster_by = ['modified_timestamp::DATE','block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
tags = ['scheduled_non_core']
tags = ['non_core']
) }}
WITH core_events AS (
@ -41,9 +41,6 @@ WITH core_events AS (
'SwapEvent',
'SwapEventV2'
)
-- limit to 30 days for dev
AND block_timestamp >= sysdate() - interval '30 days'
),
swaps AS (

View File

@ -5,10 +5,30 @@
cluster_by = ['modified_timestamp::DATE','block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
tags = ['scheduled_non_core']
tags = ['non_core']
) }}
-- TODO: use front matter IF EXECUTE block to set blockdate for table scans due to ongoing backfill
-- Pre-query that sets the Jinja variable min_bd.
-- It only runs when dbt is executing (not parsing) and the model is in incremental mode,
-- so min_bd is left undefined on full-refresh / first-build runs.
-- The query returns the earliest block date among core__fact_events rows whose
-- modified_timestamp is at or after the latest modified_timestamp already stored in
-- this model's table; the first column of the first result row is kept as min_bd.
-- NOTE(review): MAX(modified_timestamp) is not wrapped in COALESCE here; if the target
-- table is empty the comparison is against NULL and min_bd would presumably be empty - confirm.
-- NOTE(review): where min_bd is consumed is not visible in this section - verify it is
-- only referenced under the same is_incremental() guard.
{% if execute %}
{% if is_incremental() %}
{% set min_bd_query %}
SELECT
MIN(
block_timestamp :: DATE
)
FROM
{{ ref('core__fact_events') }}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
) {% endset %}
{% set min_bd = run_query(min_bd_query) [0] [0] %}
{% endif %}
{% endif %}
WITH core_events AS (
SELECT
@ -38,20 +58,16 @@ WITH core_events AS (
AND
{% endif %}
(
-- primary swap resources for this model
event_resource ILIKE '%swap%'
-- primary swap resources for this model use SwapEvent or PLATFORMSwapEvent
-- Haedal-specific resources use buy and sell
event_resource ILIKE ANY ('%swapevent%', '%buy%', '%sell%')
OR event_resource IN (
'Swap',
'OrderFilled',
'TradeEvent',
'SwapEvent'
)
-- Haedal-specific resources
OR event_resource ILIKE '%buy%'
OR event_resource ILIKE '%sell%'
)
-- exclude modules that require special handling
AND event_resource NOT IN (
'RepayFlashSwapEvent',
@ -61,15 +77,14 @@ WITH core_events AS (
)
AND transaction_module NOT IN (
'aftermath',
'scallop'
'scallop',
'fulfill_swap',
'slippage'
)
AND transaction_module NOT ILIKE '%steamm%'
-- exclude limit orders from base model
AND event_module NOT IN ('settle')
-- limit to 30 days for dev
AND block_timestamp >= sysdate() - interval '30 days'
),
core_transactions AS (
@ -86,16 +101,15 @@ core_transactions AS (
{{ ref('core__fact_transactions') }}
WHERE
{% if is_incremental() %}
modified_timestamp >= (
block_timestamp :: DATE >= (
SELECT
COALESCE(MAX(modified_timestamp), '1900-01-01'::TIMESTAMP) AS modified_timestamp
COALESCE(MAX(block_timestamp :: DATE), '1900-01-01'::DATE) AS block_timestamp
FROM
{{ this }}
)
AND
{% endif %}
date_trunc('day', block_timestamp) >= (SELECT MIN(date_trunc('day', block_timestamp)) FROM core_events)
AND tx_digest IN (SELECT DISTINCT tx_digest FROM core_events)
tx_digest IN (SELECT DISTINCT tx_digest FROM core_events)
),
swaps AS (
@ -421,14 +435,9 @@ SELECT
package_index,
parsed_json, -- TEMP
payload_details, -- TEMP
{{ dbt_utils.generate_surrogate_key(['tx_digest', 'trader_address', 'token_in_type', 'token_out_type', 'amount_in_raw', 'amount_out_raw']) }} AS dex_swaps_id,
{{ dbt_utils.generate_surrogate_key(['tx_digest', 'trader_address', 'token_in_type', 'token_out_type', 'amount_in_raw', 'amount_out_raw', 'swap_index']) }} AS dex_swaps_id,
SYSDATE() AS inserted_timestamp,
modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
append_transaction_data
-- qualify row_number() over (
-- partition by tx_digest, token_in_type, token_out_type, amount_in_raw, amount_out_raw
-- order by token_in_type IS NOT NULL DESC, token_out_type IS NOT NULL DESC
-- ) = 1

View File

@ -79,7 +79,7 @@ models:
description: "{{ doc('a_to_b') }}"
- name: FEE_AMOUNT_RAW
data_type: NUMBER(38,0)
data_type: TEXT
description: "{{ doc('fee_amount_raw') }}"
- name: PARTNER_ADDRESS
@ -127,6 +127,14 @@ models:
data_type: TIMESTAMP_NTZ
description: "{{ doc('modified_timestamp') }}"
- name: PACKAGE_ID
data_type: TEXT
description: "{{ doc('package_id') }}"
- name: PARSED_JSON
data_type: VARIANT
description: "{{ doc('parsed_json') }}"
- name: _INVOCATION_ID
data_type: TEXT
description: "{{ doc('_invocation_id') }}"
@ -262,6 +270,34 @@ models:
data_type: TIMESTAMP_NTZ
description: "{{ doc('modified_timestamp') }}"
- name: PACKAGE_ID
data_type: TEXT
description: "{{ doc('package_id') }}"
- name: PACKAGE_INDEX
data_type: NUMBER
description: "{{ doc('package_index') }}"
- name: PARSED_JSON
data_type: VARIANT
description: "{{ doc('parsed_json') }}"
- name: PAYLOAD_DETAILS
data_type: VARIANT
description: "{{ doc('payload_details') }}"
- name: SWAP_INDEX
data_type: NUMBER
description: "{{ doc('swap_index') }}"
- name: TOKEN_IN_FROM_TXS
data_type: BOOLEAN
description: "{{ doc('token_in_from_txs') }}"
- name: TOKEN_OUT_FROM_TXS
data_type: BOOLEAN
description: "{{ doc('token_out_from_txs') }}"
- name: _INVOCATION_ID
data_type: TEXT
description: "{{ doc('_invocation_id') }}"