diff --git a/models/gold/defi/defi__ez_dex_swaps.sql b/models/gold/defi/defi__ez_dex_swaps.sql
index 4d3fc78..b697bde 100644
--- a/models/gold/defi/defi__ez_dex_swaps.sql
+++ b/models/gold/defi/defi__ez_dex_swaps.sql
@@ -4,7 +4,7 @@
     cluster_by = ['modified_timestamp::DATE','block_timestamp::DATE'],
     incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
     merge_exclude_columns = ["inserted_timestamp"],
-    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_digest, event_index, trader_address, platform_address);",
+    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_digest, trader_address, platform_address);",
     tags = ['gold','defi']
 ) }}
 
@@ -28,7 +28,9 @@ WITH base_swaps AS (
         token_out_type,
         trader_address,
         dex_swaps_id,
-        modified_timestamp
+        modified_timestamp,
+        token_in_from_txs, -- TEMP
+        token_out_from_txs -- TEMP
     FROM {{ ref('silver__dex_swaps') }}
     WHERE 1=1
     {% if is_incremental() %}
@@ -38,6 +40,20 @@ WITH base_swaps AS (
     )
     {% endif %}
 ),
+prices AS (
+    SELECT
+        hour,
+        token_address,
+        symbol,
+        name,
+        decimals,
+        price,
+        blockchain,
+        is_native,
+        token_is_verified
+    FROM
+        {{ ref('price__ez_prices_hourly') }}
+),
 
 token_prices_in AS (
     SELECT
@@ -72,20 +88,20 @@ token_prices_in AS (
         ON lower(bs.token_in_type) = lower(dim_in.coin_type)
 
     -- Standard token address join
-    LEFT JOIN crosschain.price.ez_prices_hourly p_in_std
+    LEFT JOIN prices p_in_std
         ON LOWER(SPLIT(bs.token_in_type, '::')[0]) = LOWER(p_in_std.token_address)
         AND p_in_std.blockchain = 'sui'
         AND p_in_std.hour = DATE_TRUNC('hour', bs.block_timestamp)
 
     -- Native SUI join (for 0x2 addresses)
-    LEFT JOIN crosschain.price.ez_prices_hourly p_in_native
+    LEFT JOIN prices p_in_native
         ON SPLIT(bs.token_in_type, '::')[0] = '0x2'
         AND p_in_native.blockchain = 'sui'
         AND p_in_native.is_native = true
         AND p_in_native.hour = DATE_TRUNC('hour', bs.block_timestamp)
 
     -- Long-form SUI address join (0x2 -> 0x000...002)
-    LEFT JOIN crosschain.price.ez_prices_hourly p_in_long
+    LEFT JOIN prices p_in_long
         ON SPLIT(bs.token_in_type, '::')[0] = '0x2'
         AND p_in_long.token_address = '0x0000000000000000000000000000000000000000000000000000000000000002'
         AND p_in_long.blockchain = 'sui'
@@ -192,6 +208,8 @@ SELECT
     steps,
 
     -- Token information
+    token_in_from_txs, -- TEMP
+    token_out_from_txs, -- TEMP
     token_in_type,
     token_in_address,
     token_in_symbol,
@@ -201,7 +219,7 @@ SELECT
     token_out_symbol,
     token_out_name,
 
-    -- Adjusted amounts (divide by decimals)
+    -- Adjusted amounts
     amount_in_raw / POW(10, token_in_decimals) as amount_in,
     amount_out_raw / POW(10, token_out_decimals) as amount_out,
     CASE
@@ -250,4 +268,5 @@ SELECT
     modified_timestamp,
     '{{ invocation_id }}' AS _invocation_id
 
-FROM with_labels
\ No newline at end of file
+FROM
+    with_labels
diff --git a/models/silver/defi/silver__dex_swaps.sql b/models/silver/defi/silver__dex_swaps.sql
index 648262c..36eba50 100644
--- a/models/silver/defi/silver__dex_swaps.sql
+++ b/models/silver/defi/silver__dex_swaps.sql
@@ -100,7 +100,10 @@ swaps AS (
         COALESCE(
             parsed_json:pool::STRING,
             parsed_json:pool_address::STRING,
-            parsed_json:pool_id::STRING
+            parsed_json:pool_id::STRING,
+            parsed_json:event:pool::STRING,
+            parsed_json:event:pool_address::STRING,
+            parsed_json:event:pool_id::STRING
         ) AS pool_address,
 
         -- Handle different direction field patterns
@@ -117,6 +120,7 @@ swaps AS (
         IFF(a_to_b,
             COALESCE(
                 parsed_json:amount_in::NUMBER,
+                parsed_json:amounts_in[0]::NUMBER,
                 parsed_json:amount_a::NUMBER,
                 parsed_json:amount_x::NUMBER,
                 parsed_json:event:amount_in::NUMBER,
@@ -124,6 +128,7 @@ swaps AS (
             ),
             COALESCE(
                 parsed_json:amount_in::NUMBER,
+                parsed_json:amounts_in[0]::NUMBER,
                 parsed_json:amount_b::NUMBER,
                 parsed_json:amount_y::NUMBER,
                 parsed_json:event:amount_in::NUMBER,
@@ -141,7 +146,8 @@ swaps AS (
                 parsed_json:coin_in::STRING,
                 parsed_json:type_in::STRING,
                 parsed_json:event:coin_in::STRING,
-                parsed_json:coin_in_type:name::STRING
+                parsed_json:coin_in_type:name::STRING,
+                parsed_json:types_in[0]::STRING
             ),
             COALESCE(
                 parsed_json:coin_b:name::STRING,
@@ -149,7 +155,8 @@ swaps AS (
                 parsed_json:coin_in::STRING,
                 parsed_json:type_in::STRING,
                 parsed_json:event:coin_in::STRING,
-                parsed_json:coin_in_type:name::STRING
+                parsed_json:coin_in_type:name::STRING,
+                parsed_json:types_in[0]::STRING
             )
         ) AS token_in_type,
         -- Token Out - handle different event patterns
@@ -157,6 +164,7 @@ swaps AS (
         IFF(a_to_b,
             COALESCE(
                 parsed_json:amount_out::NUMBER,
+                parsed_json:amounts_out[0]::NUMBER,
                 parsed_json:amount_b::NUMBER,
                 parsed_json:amount_y::NUMBER,
                 parsed_json:event:amount_out::NUMBER,
@@ -164,6 +172,7 @@ swaps AS (
             ),
             COALESCE(
                 parsed_json:amount_out::NUMBER,
+                parsed_json:amounts_out[0]::NUMBER,
                 parsed_json:amount_a::NUMBER,
                 parsed_json:amount_x::NUMBER,
                 parsed_json:event:amount_out::NUMBER,
@@ -181,7 +190,8 @@ swaps AS (
                 parsed_json:coin_out::STRING,
                 parsed_json:type_out::STRING,
                 parsed_json:event:coin_out::STRING,
-                parsed_json:coin_out_type:name::STRING
+                parsed_json:coin_out_type:name::STRING,
+                parsed_json:types_out[0]::STRING
             ),
             COALESCE(
                 parsed_json:coin_a:name::STRING,
@@ -189,7 +199,8 @@ swaps AS (
                 parsed_json:coin_out::STRING,
                 parsed_json:type_out::STRING,
                 parsed_json:event:coin_out::STRING,
-                parsed_json:coin_out_type:name::STRING
+                parsed_json:coin_out_type:name::STRING,
+                parsed_json:types_out[0]::STRING
             )
         ) AS token_out_type,
 
@@ -208,7 +219,54 @@ swaps AS (
         tx_sender AS trader_address,
         modified_timestamp,
         parsed_json
-    FROM core_events
+    FROM
+        core_events
+),
+
+-- group swap events to determine the swap_index within the transaction
+-- several dexes will emit multiple swap events when the swap is routed
+swaps_with_group_key AS (
+    SELECT
+        *,
+        -- Composite grouping key; falls back to package/module when pool_address is null
+        CASE
+            WHEN pool_address IS NOT NULL THEN
+                CONCAT(pool_address, '|', COALESCE(amount_in_raw::STRING, '0'), '|', COALESCE(amount_out_raw::STRING, '0'))
+            ELSE
+                CONCAT(package_id, '|', transaction_module, '|', COALESCE(amount_in_raw::STRING, '0'), '|', COALESCE(amount_out_raw::STRING, '0'))
+        END AS group_key
+    FROM swaps
+),
+
+swaps_with_groups AS (
+    SELECT
+        *,
+        -- Get the minimum event_index for each group (group_key is computed once upstream)
+        MIN(event_index) OVER (PARTITION BY tx_digest, group_key) AS group_min_event_index
+    FROM swaps_with_group_key
+),
+
+swaps_with_index AS (
+    SELECT
+        *,
+        -- Use DENSE_RANK to create swap_index based on group_min_event_index
+        DENSE_RANK() OVER (
+            PARTITION BY tx_digest
+            ORDER BY group_min_event_index
+        ) as swap_index
+    FROM swaps_with_groups
+),
+
+deduplicate_swaps AS (
+    SELECT
+        *
+    FROM swaps_with_index
+
+    -- event_index is the final tie-breaker so the surviving row is deterministic across runs
+    qualify row_number() over (
+        partition by tx_digest, swap_index
+        order by token_in_type IS NOT NULL DESC, token_out_type IS NOT NULL DESC, event_index
+    ) = 1
 ),
 
 append_transaction_data AS (
@@ -217,6 +275,7 @@ append_transaction_data AS (
         s.block_timestamp,
         s.tx_digest,
         s.event_index,
+        s.swap_index,
         s.type,
         s.event_module,
         s.package_id,
@@ -239,6 +298,7 @@ append_transaction_data AS (
                 WHEN s.event_resource = 'SellQuoteTokenEvent' THEN t.type_arguments[0] :: STRING
             END
         ) AS token_in_type,
+        s.token_in_type IS NULL AS token_in_from_txs, -- TEMP
         s.amount_out_raw,
         COALESCE(
             s.token_out_type,
@@ -252,6 +312,8 @@ append_transaction_data AS (
                 WHEN s.event_resource = 'SellQuoteTokenEvent' THEN t.type_arguments[1] :: STRING
             END
         ) AS token_out_type,
+        s.token_out_type IS NULL AS token_out_from_txs, -- TEMP
+        payload_details, -- TEMP
         s.a_to_b,
         s.fee_amount_raw,
         s.partner_address,
@@ -259,12 +321,13 @@ append_transaction_data AS (
         s.parsed_json,
         s.modified_timestamp
     FROM
-        swaps s
+        deduplicate_swaps s
         LEFT JOIN core_transactions t
         ON s.tx_digest = t.tx_digest
         AND s.package_id = t.package
         AND s.transaction_module = t.module
         AND s.package_index = t.package_index
+        AND (s.token_in_type IS NULL OR s.token_out_type IS NULL)
 )
 SELECT
     checkpoint_number,
@@ -291,13 +354,16 @@ SELECT
         token_out_type,
         '0x' || token_out_type
     ) AS token_out_type,
+    token_in_from_txs, -- TEMP
+    token_out_from_txs, -- TEMP
     a_to_b,
     fee_amount_raw,
     partner_address,
     steps,
-    ROW_NUMBER() OVER (PARTITION BY tx_digest ORDER BY event_index) AS swap_index,
+    swap_index,
     package_index,
-    parsed_json,
+    parsed_json, -- TEMP
+    payload_details, -- TEMP
     {{ dbt_utils.generate_surrogate_key(['tx_digest', 'trader_address', 'token_in_type', 'token_out_type', 'amount_in_raw', 'amount_out_raw']) }} AS dex_swaps_id,
     SYSDATE() AS inserted_timestamp,
     modified_timestamp,
@@ -305,7 +371,3 @@ SELECT
 FROM
     append_transaction_data
 
-qualify row_number() over (
-    partition by tx_digest, token_in_type, token_out_type, amount_in_raw, amount_out_raw
-    order by token_in_type IS NOT NULL DESC, token_out_type IS NOT NULL DESC
-    ) = 1