better handle redundant SwapEvent modules, use price tbls

This commit is contained in:
Jack Forgash 2025-08-12 14:49:41 -04:00
parent 5ee320ebc3
commit 349ff9a663
2 changed files with 105 additions and 20 deletions

View File

@ -4,7 +4,7 @@
cluster_by = ['modified_timestamp::DATE','block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_digest, event_index, trader_address, platform_address);",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_digest, trader_address, platform_address);",
tags = ['gold','defi']
) }}
@ -28,7 +28,9 @@ WITH base_swaps AS (
token_out_type,
trader_address,
dex_swaps_id,
modified_timestamp
modified_timestamp,
token_in_from_txs, -- TEMP
token_out_from_txs -- TEMP
FROM {{ ref('silver__dex_swaps') }}
WHERE 1=1
{% if is_incremental() %}
@ -38,6 +40,20 @@ WITH base_swaps AS (
)
{% endif %}
),
prices AS (
SELECT
hour,
token_address,
symbol,
name,
decimals,
price,
blockchain,
is_native,
token_is_verified
FROM
{{ ref('price__ez_prices_hourly') }}
),
token_prices_in AS (
SELECT
@ -72,20 +88,20 @@ token_prices_in AS (
ON lower(bs.token_in_type) = lower(dim_in.coin_type)
-- Standard token address join
LEFT JOIN crosschain.price.ez_prices_hourly p_in_std
LEFT JOIN prices p_in_std
ON LOWER(SPLIT(bs.token_in_type, '::')[0]) = LOWER(p_in_std.token_address)
AND p_in_std.blockchain = 'sui'
AND p_in_std.hour = DATE_TRUNC('hour', bs.block_timestamp)
-- Native SUI join (for 0x2 addresses)
LEFT JOIN crosschain.price.ez_prices_hourly p_in_native
LEFT JOIN prices p_in_native
ON SPLIT(bs.token_in_type, '::')[0] = '0x2'
AND p_in_native.blockchain = 'sui'
AND p_in_native.is_native = true
AND p_in_native.hour = DATE_TRUNC('hour', bs.block_timestamp)
-- Long-form SUI address join (0x2 -> 0x000...002)
LEFT JOIN crosschain.price.ez_prices_hourly p_in_long
LEFT JOIN prices p_in_long
ON SPLIT(bs.token_in_type, '::')[0] = '0x2'
AND p_in_long.token_address = '0x0000000000000000000000000000000000000000000000000000000000000002'
AND p_in_long.blockchain = 'sui'
@ -192,6 +208,8 @@ SELECT
steps,
-- Token information
token_in_from_txs, -- TEMP
token_out_from_txs, -- TEMP
token_in_type,
token_in_address,
token_in_symbol,
@ -201,7 +219,7 @@ SELECT
token_out_symbol,
token_out_name,
-- Adjusted amounts (divide by decimals)
-- Adjusted amounts
amount_in_raw / POW(10, token_in_decimals) as amount_in,
amount_out_raw / POW(10, token_out_decimals) as amount_out,
CASE
@ -250,4 +268,5 @@ SELECT
modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM with_labels
FROM
with_labels

View File

@ -100,7 +100,10 @@ swaps AS (
COALESCE(
parsed_json:pool::STRING,
parsed_json:pool_address::STRING,
parsed_json:pool_id::STRING
parsed_json:pool_id::STRING,
parsed_json:event:pool::STRING,
parsed_json:event:pool_address::STRING,
parsed_json:event:pool_id::STRING
) AS pool_address,
-- Handle different direction field patterns
@ -117,6 +120,7 @@ swaps AS (
IFF(a_to_b,
COALESCE(
parsed_json:amount_in::NUMBER,
parsed_json:amounts_in[0]::NUMBER,
parsed_json:amount_a::NUMBER,
parsed_json:amount_x::NUMBER,
parsed_json:event:amount_in::NUMBER,
@ -124,6 +128,7 @@ swaps AS (
),
COALESCE(
parsed_json:amount_in::NUMBER,
parsed_json:amounts_in[0]::NUMBER,
parsed_json:amount_b::NUMBER,
parsed_json:amount_y::NUMBER,
parsed_json:event:amount_in::NUMBER,
@ -141,7 +146,8 @@ swaps AS (
parsed_json:coin_in::STRING,
parsed_json:type_in::STRING,
parsed_json:event:coin_in::STRING,
parsed_json:coin_in_type:name::STRING
parsed_json:coin_in_type:name::STRING,
parsed_json:types_in[0]::STRING
),
COALESCE(
parsed_json:coin_b:name::STRING,
@ -149,7 +155,8 @@ swaps AS (
parsed_json:coin_in::STRING,
parsed_json:type_in::STRING,
parsed_json:event:coin_in::STRING,
parsed_json:coin_in_type:name::STRING
parsed_json:coin_in_type:name::STRING,
parsed_json:types_in[0]::STRING
)
) AS token_in_type,
-- Token Out - handle different event patterns
@ -157,6 +164,7 @@ swaps AS (
IFF(a_to_b,
COALESCE(
parsed_json:amount_out::NUMBER,
parsed_json:amounts_out[0]::NUMBER,
parsed_json:amount_b::NUMBER,
parsed_json:amount_y::NUMBER,
parsed_json:event:amount_out::NUMBER,
@ -164,6 +172,7 @@ swaps AS (
),
COALESCE(
parsed_json:amount_out::NUMBER,
parsed_json:amounts_out[0]::NUMBER,
parsed_json:amount_a::NUMBER,
parsed_json:amount_x::NUMBER,
parsed_json:event:amount_out::NUMBER,
@ -181,7 +190,8 @@ swaps AS (
parsed_json:coin_out::STRING,
parsed_json:type_out::STRING,
parsed_json:event:coin_out::STRING,
parsed_json:coin_out_type:name::STRING
parsed_json:coin_out_type:name::STRING,
parsed_json:types_out[0]::STRING
),
COALESCE(
parsed_json:coin_a:name::STRING,
@ -189,7 +199,8 @@ swaps AS (
parsed_json:coin_out::STRING,
parsed_json:type_out::STRING,
parsed_json:event:coin_out::STRING,
parsed_json:coin_out_type:name::STRING
parsed_json:coin_out_type:name::STRING,
parsed_json:types_out[0]::STRING
)
) AS token_out_type,
@ -208,7 +219,54 @@ swaps AS (
tx_sender AS trader_address,
modified_timestamp,
parsed_json
FROM core_events
FROM
core_events
),
-- group swap events to determine the swap_index within the transaction
-- several dexes will emit multiple swap events when the swap is routed
swaps_with_groups AS (
SELECT
*,
-- Create a composite grouping key that handles null pool_address
CASE
WHEN pool_address IS NOT NULL THEN
CONCAT(pool_address, '|', COALESCE(amount_in_raw::STRING, '0'), '|', COALESCE(amount_out_raw::STRING, '0'))
ELSE
CONCAT(package_id, '|', transaction_module, '|', COALESCE(amount_in_raw::STRING, '0'), '|', COALESCE(amount_out_raw::STRING, '0'))
END AS group_key,
-- Get the minimum event_index for each group
MIN(event_index) OVER (PARTITION BY tx_digest,
CASE
WHEN pool_address IS NOT NULL THEN
CONCAT(pool_address, '|', COALESCE(amount_in_raw::STRING, '0'), '|', COALESCE(amount_out_raw::STRING, '0'))
ELSE
CONCAT(package_id, '|', transaction_module, '|', COALESCE(amount_in_raw::STRING, '0'), '|', COALESCE(amount_out_raw::STRING, '0'))
END
) AS group_min_event_index
FROM swaps
),
swaps_with_index AS (
SELECT
*,
-- Use DENSE_RANK to create swap_index based on group_min_event_index
DENSE_RANK() OVER (
PARTITION BY tx_digest
ORDER BY group_min_event_index
) as swap_index
FROM swaps_with_groups
),
deduplicate_swaps AS (
SELECT
*
FROM swaps_with_index
qualify row_number() over (
partition by tx_digest, swap_index
order by token_in_type IS NOT NULL DESC, token_out_type IS NOT NULL DESC
) = 1
),
append_transaction_data AS (
@ -217,6 +275,7 @@ append_transaction_data AS (
s.block_timestamp,
s.tx_digest,
s.event_index,
s.swap_index,
s.type,
s.event_module,
s.package_id,
@ -239,6 +298,7 @@ append_transaction_data AS (
WHEN s.event_resource = 'SellQuoteTokenEvent' THEN t.type_arguments[0] :: STRING
END
) AS token_in_type,
s.token_in_type IS NULL AS token_in_from_txs, -- TEMP
s.amount_out_raw,
COALESCE(
s.token_out_type,
@ -252,6 +312,8 @@ append_transaction_data AS (
WHEN s.event_resource = 'SellQuoteTokenEvent' THEN t.type_arguments[1] :: STRING
END
) AS token_out_type,
s.token_out_type IS NULL AS token_out_from_txs, -- TEMP
payload_details, -- TEMP
s.a_to_b,
s.fee_amount_raw,
s.partner_address,
@ -259,12 +321,13 @@ append_transaction_data AS (
s.parsed_json,
s.modified_timestamp
FROM
swaps s
deduplicate_swaps s
LEFT JOIN core_transactions t
ON s.tx_digest = t.tx_digest
AND s.package_id = t.package
AND s.transaction_module = t.module
AND s.package_index = t.package_index
AND (s.token_in_type IS NULL OR s.token_out_type IS NULL)
)
SELECT
checkpoint_number,
@ -291,13 +354,16 @@ SELECT
token_out_type,
'0x' || token_out_type
) AS token_out_type,
token_in_from_txs, -- TEMP
token_out_from_txs, -- TEMP
a_to_b,
fee_amount_raw,
partner_address,
steps,
ROW_NUMBER() OVER (PARTITION BY tx_digest ORDER BY event_index) AS swap_index,
swap_index,
package_index,
parsed_json,
parsed_json, -- TEMP
payload_details, -- TEMP
{{ dbt_utils.generate_surrogate_key(['tx_digest', 'trader_address', 'token_in_type', 'token_out_type', 'amount_in_raw', 'amount_out_raw']) }} AS dex_swaps_id,
SYSDATE() AS inserted_timestamp,
modified_timestamp,
@ -305,7 +371,7 @@ SELECT
FROM
append_transaction_data
qualify row_number() over (
partition by tx_digest, token_in_type, token_out_type, amount_in_raw, amount_out_raw
order by token_in_type IS NOT NULL DESC, token_out_type IS NOT NULL DESC
) = 1
-- qualify row_number() over (
-- partition by tx_digest, token_in_type, token_out_type, amount_in_raw, amount_out_raw
-- order by token_in_type IS NOT NULL DESC, token_out_type IS NOT NULL DESC
-- ) = 1