diff --git a/models/gold/defi/defi__ez_dex_swaps.sql b/models/gold/defi/defi__ez_dex_swaps.sql index 9d61b7f..4d3fc78 100644 --- a/models/gold/defi/defi__ez_dex_swaps.sql +++ b/models/gold/defi/defi__ez_dex_swaps.sql @@ -1,6 +1,6 @@ {{ config ( materialized = "incremental", - unique_key = ["tx_digest", "event_index"], + unique_key = "dex_swaps_id", cluster_by = ['modified_timestamp::DATE','block_timestamp::DATE'], incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], merge_exclude_columns = ["inserted_timestamp"], @@ -14,6 +14,7 @@ WITH base_swaps AS ( block_timestamp, tx_digest, event_index, + swap_index, event_module, platform_address, pool_address, @@ -174,6 +175,7 @@ SELECT block_timestamp, tx_digest, event_index, + swap_index, -- Platform information platform_address, diff --git a/models/silver/defi/silver__dex_swaps.sql b/models/silver/defi/silver__dex_swaps.sql index 5d284bc..f5a8f28 100644 --- a/models/silver/defi/silver__dex_swaps.sql +++ b/models/silver/defi/silver__dex_swaps.sql @@ -1,13 +1,15 @@ -- depends_on: {{ ref('core__fact_events') }} {{ config ( materialized = "incremental", - unique_key = ["tx_digest", "event_index"], + unique_key = "dex_swaps_id", cluster_by = ['modified_timestamp::DATE','block_timestamp::DATE'], incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], merge_exclude_columns = ["inserted_timestamp"], tags = ['scheduled_non_core'] ) }} +-- TODO: use front matter IF EXECUTE block to set blockdate for table scans due to ongoing backfill + WITH core_events AS ( SELECT checkpoint_number, @@ -19,6 +21,7 @@ WITH core_events AS ( event_address, event_module, event_resource, + package_id, transaction_module, parsed_json, modified_timestamp @@ -34,35 +37,47 @@ WITH core_events AS ( ) AND {% endif %} - event_module IN ( - 'aftermath', - 'alphafi', - 'bluemove', - 'bluefin', - 'cetus', - 'deepbookv3', - 'flowx_amm', - 'flowx_clmm', - 'haedal', - 'haedalpmm', - 'kriya_amm', - 'kriya_clmm', - 'magma', - 'metastable', - 'momentum', - 'obric', - 'scallop', - 'sevenk', - 'steamm_cpmm', - 'steamm_omm_v2', - 'steammfe', - 'suilend', - 'turbos', - 'volo' + ( + event_resource ILIKE '%swap%' + OR event_resource IN ( + 'Swap', + 'OrderFilled', + 'TradeEvent' + ) ) + AND event_resource NOT IN ( + 'RepayFlashSwapEvent' + ) + -- limit to 30 days for dev AND block_timestamp >= sysdate() - interval '30 days' ), +core_transactions AS ( + SELECT + tx_digest, + payload_index, + payload_details:function::string as function, + payload_details:module::string as module, + payload_details:package::string as package, + payload_details:type_arguments::ARRAY as type_arguments, + payload_details, + row_number() over (partition by tx_digest, package, module order by payload_index) as package_index + FROM + {{ ref('core__fact_transactions') }} + WHERE +{% if is_incremental() %} + modified_timestamp >= ( + SELECT + COALESCE(MAX(modified_timestamp), '1900-01-01'::TIMESTAMP) AS modified_timestamp + FROM + {{ this }} + ) + AND +{% endif %} + date_trunc('day', block_timestamp) >= (SELECT MIN(date_trunc('day', block_timestamp)) FROM core_events) + AND tx_digest IN (SELECT DISTINCT tx_digest FROM core_events) +), + swaps AS ( SELECT checkpoint_number, @@ -73,30 +88,158 @@ swaps AS ( type, event_module, event_resource, + package_id, transaction_module, + ROW_NUMBER() OVER (PARTITION BY tx_digest, package_id, transaction_module ORDER BY event_index) AS package_index, event_address AS platform_address, - COALESCE(parsed_json:pool::STRING, parsed_json:pool_address::STRING, parsed_json:pool_id::STRING) AS pool_address, - parsed_json:amount_in::NUMBER AS amount_in_raw, - parsed_json:amount_out::NUMBER AS amount_out_raw, - parsed_json:a2b::BOOLEAN AS a_to_b, - parsed_json:fee_amount::NUMBER AS fee_amount_raw, - parsed_json:partner_id::STRING AS partner_address, - COALESCE(parsed_json:steps::NUMBER, 1) AS steps, - IFF( + COALESCE( + parsed_json:pool::STRING, + parsed_json:pool_address::STRING, + parsed_json:pool_id::STRING + ) AS pool_address, + + -- Handle different direction field patterns + COALESCE( parsed_json:a2b::BOOLEAN, - parsed_json:coin_a:name::STRING, - parsed_json:coin_b:name::STRING + parsed_json:a_to_b::BOOLEAN, + parsed_json:atob::BOOLEAN, + parsed_json:x_for_y::BOOLEAN, + parsed_json:event:a2b::BOOLEAN + ) AS a_to_b, + + -- Token In + IFF(a_to_b, + COALESCE( + parsed_json:amount_in::NUMBER, + parsed_json:amount_a::NUMBER, + parsed_json:amount_x::NUMBER, + parsed_json:event:amount_in::NUMBER, + parsed_json:coin_in_amount::NUMBER + ), + COALESCE( + parsed_json:amount_in::NUMBER, + parsed_json:amount_b::NUMBER, + parsed_json:amount_y::NUMBER, + parsed_json:event:amount_in::NUMBER, + parsed_json:coin_in_amount::NUMBER + ) + ) AS amount_in_raw, + IFF(a_to_b, + COALESCE( + parsed_json:coin_a:name::STRING, + parsed_json:coin_in:name::STRING, + parsed_json:coin_in::STRING, + parsed_json:type_in::STRING, + parsed_json:event:coin_in::STRING, + parsed_json:coin_in_type:name::STRING + ), + COALESCE( + parsed_json:coin_b:name::STRING, + parsed_json:coin_in:name::STRING, + parsed_json:coin_in::STRING, + parsed_json:type_in::STRING, + parsed_json:event:coin_in::STRING, + parsed_json:coin_in_type:name::STRING + ) ) AS token_in_type, - IFF( - parsed_json:a2b::BOOLEAN, - parsed_json:coin_b:name::STRING, - parsed_json:coin_a:name::STRING + -- Token Out + IFF(a_to_b, + COALESCE( + parsed_json:amount_out::NUMBER, + parsed_json:amount_b::NUMBER, + parsed_json:amount_y::NUMBER, + parsed_json:event:amount_out::NUMBER, + parsed_json:coin_out_amount::NUMBER + ), + COALESCE( + parsed_json:amount_out::NUMBER, + parsed_json:amount_a::NUMBER, + parsed_json:amount_x::NUMBER, + parsed_json:event:amount_out::NUMBER, + parsed_json:coin_out_amount::NUMBER + ) + ) AS amount_out_raw, + IFF(a_to_b, + COALESCE( + parsed_json:coin_b:name::STRING, + parsed_json:coin_out:name::STRING, + parsed_json:coin_out::STRING, + parsed_json:type_out::STRING, + parsed_json:event:coin_out::STRING, + parsed_json:coin_out_type:name::STRING + ), + COALESCE( + parsed_json:coin_a:name::STRING, + parsed_json:coin_out:name::STRING, + parsed_json:coin_out::STRING, + parsed_json:type_out::STRING, + parsed_json:event:coin_out::STRING, + parsed_json:coin_out_type:name::STRING + ) ) AS token_out_type, + + COALESCE( + parsed_json:fee_amount::NUMBER, + parsed_json:protocol_fee_amount::NUMBER, + parsed_json:protocol_fee::NUMBER + ) AS fee_amount_raw, + + COALESCE( + parsed_json:partner_id::STRING, + parsed_json:partner::STRING + ) AS partner_address, + COALESCE(parsed_json:steps::NUMBER, 1) AS steps, + tx_sender AS trader_address, modified_timestamp, parsed_json FROM core_events - WHERE event_resource like '%SwapEvent' +), + +append_transaction_data AS ( + SELECT + s.checkpoint_number, + s.block_timestamp, + s.tx_digest, + s.event_index, + s.type, + s.event_module, + s.package_id, + s.package_index, + s.transaction_module, + s.event_resource, + s.platform_address, + s.trader_address, + s.pool_address, + s.amount_in_raw, + COALESCE( + s.token_in_type, + IFF(a_to_b, + t.type_arguments[0] :: STRING, + t.type_arguments[1] :: STRING + ) + ) AS token_in_type, + s.amount_out_raw, + COALESCE( + s.token_out_type, + IFF(a_to_b, + t.type_arguments[1] :: STRING, + t.type_arguments[0] :: STRING + ) + ) AS token_out_type, + s.a_to_b, + s.fee_amount_raw, + s.partner_address, + s.steps, + s.parsed_json, + s.modified_timestamp + FROM + swaps s + LEFT JOIN core_transactions t + ON s.tx_digest = t.tx_digest + AND s.package_id = t.package + AND s.transaction_module = t.module + AND s.package_index = t.package_index ) SELECT checkpoint_number, @@ -105,29 +248,39 @@ SELECT event_index, type, event_module, + package_id, + transaction_module, event_resource, platform_address, + trader_address, pool_address, amount_in_raw, - amount_out_raw, - a_to_b, - fee_amount_raw, - partner_address, - steps, IFF( LEFT(token_in_type, 2) = '0x', token_in_type, '0x' || token_in_type ) AS token_in_type, + amount_out_raw, IFF( LEFT(token_out_type, 2) = '0x', token_out_type, '0x' || token_out_type ) AS token_out_type, - trader_address, - {{ dbt_utils.generate_surrogate_key(['tx_digest', 'platform_address', 'trader_address', 'token_in_type', 'token_out_type', 'amount_in_raw', 'amount_out_raw']) }} AS dex_swaps_id, + a_to_b, + fee_amount_raw, + partner_address, + steps, + ROW_NUMBER() OVER (PARTITION BY tx_digest ORDER BY event_index) AS swap_index, + package_index, + parsed_json, + {{ dbt_utils.generate_surrogate_key(['tx_digest', 'trader_address', 'token_in_type', 'token_out_type', 'amount_in_raw', 'amount_out_raw']) }} AS dex_swaps_id, SYSDATE() AS inserted_timestamp, modified_timestamp, '{{ invocation_id }}' AS _invocation_id FROM - swaps + append_transaction_data + +qualify row_number() over ( + partition by tx_digest, token_in_type, token_out_type, amount_in_raw, amount_out_raw + order by token_in_type IS NOT NULL DESC, token_out_type IS NOT NULL DESC + ) = 1