From 226615c4e6aa6a3835dfa3248d1445f928b524cb Mon Sep 17 00:00:00 2001 From: tarikceric <46071768+tarikceric@users.noreply.github.com> Date: Tue, 18 Nov 2025 16:18:09 +0100 Subject: [PATCH] new v2 jupiter event_types (#890) * new v2 jupiter event_types * update pk and qualify --- .../defi/defi__fact_swaps_jupiter_inner.yml | 1 - ...er__swaps_inner_intermediate_jupiterv6.sql | 71 +++++++++++++++++-- ...er__swaps_inner_intermediate_jupiterv6.yml | 5 +- ...silver__swaps_intermediate_jupiterv6_2.sql | 4 +- 4 files changed, 69 insertions(+), 12 deletions(-) diff --git a/models/gold/defi/defi__fact_swaps_jupiter_inner.yml b/models/gold/defi/defi__fact_swaps_jupiter_inner.yml index 8ee27c5a..c5ee4ab7 100644 --- a/models/gold/defi/defi__fact_swaps_jupiter_inner.yml +++ b/models/gold/defi/defi__fact_swaps_jupiter_inner.yml @@ -50,7 +50,6 @@ models: description: "{{ doc('swap_program_id') }}" tests: - dbt_expectations.expect_column_to_exist - - not_null: *recent_date_filter - name: AGGREGATOR_PROGRAM_ID description: "{{ doc('aggregator_program_id') }}" tests: diff --git a/models/silver/swaps/jupiter/v6/silver__swaps_inner_intermediate_jupiterv6.sql b/models/silver/swaps/jupiter/v6/silver__swaps_inner_intermediate_jupiterv6.sql index 6630a3ac..86741919 100644 --- a/models/silver/swaps/jupiter/v6/silver__swaps_inner_intermediate_jupiterv6.sql +++ b/models/silver/swaps/jupiter/v6/silver__swaps_inner_intermediate_jupiterv6.sql @@ -25,7 +25,7 @@ {{ ref('silver__decoded_logs') }} WHERE program_id = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4' - AND event_type = 'SwapEvent' + AND event_type IN ('SwapEvent', 'SwapsEvent') AND succeeded {% if is_incremental() %} AND _inserted_timestamp >= ( @@ -51,7 +51,7 @@ AND s.tx_id = l.tx_id WHERE l.program_id = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4' - AND l.event_type = 'SwapEvent' + AND l.event_type IN ('SwapEvent', 'SwapsEvent') AND l.succeeded AND s.swapper IS NULL AND s._inserted_timestamp >= current_date - 2 /* only look back 2 days */ @@ -65,6 +65,7 @@ FROM base ) + -- Handle single SwapEvent records SELECT block_timestamp, block_id, @@ -79,6 +80,7 @@ decoded_log:args:outputMint::string AS to_mint, decoded_log:args:outputAmount::string AS to_amount, _inserted_timestamp, + 0 AS swap_event_index, -- Single swap always has index 0 FROM {{ ref('silver__decoded_logs') }} JOIN @@ -105,6 +107,53 @@ {% else %} AND _inserted_timestamp::date < '2024-06-14' {% endif %} + + UNION ALL + + -- Handle multiple SwapsEvent records (flatten the nested swaps) + SELECT + l.block_timestamp, + l.block_id, + l.tx_id, + l.index, + l.inner_index, + l.succeeded, + l.event_type, + NULL AS program_id, -- SwapsEvent doesn't have amm field + s.value:inputMint::string AS from_mint, + s.value:inputAmount::string AS from_amount, + s.value:outputMint::string AS to_mint, + s.value:outputAmount::string AS to_amount, + l._inserted_timestamp, + s.key::int AS swap_event_index, -- Add unique identifier for each swap in SwapsEvent + FROM + {{ ref('silver__decoded_logs') }} l + JOIN + distinct_entities + USING(tx_id) + JOIN + table(flatten(l.decoded_log:args:swapEvents)) s + WHERE + l.program_id = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4' + AND l.event_type = 'SwapsEvent' + AND l.succeeded + AND l.block_timestamp >= ( + SELECT + MIN(block_timestamp) + FROM + distinct_entities + ) + /* need to always keep the upper bound (if there is one) to prevent time gaps in incremental loading */ + {% if is_incremental() %} + AND l._inserted_timestamp < ( + SELECT + MAX(_inserted_timestamp) + INTERVAL '100 day' + FROM + {{ this }} + ) + {% else %} + AND l._inserted_timestamp::date < '2024-06-14' + {% endif %} {% endset %} {% do run_query(base_query) %} {% set between_stmts = fsc_utils.dynamic_range_predicate("silver.swaps_inner_intermediate_jupiterv6__intermediate_tmp","block_timestamp::date") %} @@ -116,7 +165,9 @@ WITH base AS ( FROM silver.swaps_inner_intermediate_jupiterv6__intermediate_tmp QUALIFY - row_number() OVER (PARTITION BY tx_id, index, coalesce(inner_index,-1) ORDER BY _inserted_timestamp DESC) = 1 + -- For SwapsEvent, partition includes swap_event_index to handle multiple swaps in one event. + -- For SwapEvent, swap_event_index is 0 and doesn't alter the original partitioning logic. + row_number() OVER (PARTITION BY tx_id, index, coalesce(inner_index,-1), coalesce(swap_event_index, 0) ORDER BY _inserted_timestamp DESC) = 1 ), swappers AS ( SELECT @@ -134,7 +185,7 @@ swappers AS ( WHERE {{ between_stmts }} AND program_id = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4' - AND event_type IN ('exactOutRoute', 'sharedAccountsExactOutRoute', 'sharedAccountsRoute', 'routeWithTokenLedger', 'route', 'sharedAccountsRouteWithTokenLedger', 'exact_out_route', 'shared_accounts_exact_out_route', 'shared_accounts_route', 'route_with_token_ledger', 'shared_accounts_route_with_token_ledger') + AND event_type IN ('exactOutRoute', 'sharedAccountsExactOutRoute', 'sharedAccountsRoute', 'routeWithTokenLedger', 'route', 'sharedAccountsRouteWithTokenLedger', 'exact_out_route', 'shared_accounts_exact_out_route', 'shared_accounts_route', 'route_with_token_ledger', 'shared_accounts_route_with_token_ledger', 'route_v2', 'shared_accounts_route_v2', 'shared_accounts_exact_out_route_v2', 'exact_out_route_v2') AND swapper IS NOT NULL QUALIFY row_number() OVER (PARTITION BY tx_id, index, coalesce(inner_index, -1) ORDER BY _inserted_timestamp DESC) = 1 @@ -161,7 +212,10 @@ pre_final AS ( b.tx_id, b.index, b.inner_index, - row_number() OVER (PARTITION BY b.tx_id, b.index, s.inner_index ORDER BY b.inner_index)-1 AS swap_index, /* we want the swap index as it relates to the top level swap instruction */ + CASE + WHEN b.event_type = 'SwapsEvent' THEN b.swap_event_index + ELSE row_number() OVER (PARTITION BY b.tx_id, b.index, s.inner_index ORDER BY b.inner_index)-1 + END AS swap_index, /* we want the swap index as it relates to the top level swap instruction */ b.succeeded, b.program_id AS swap_program_id, 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4' AS aggregator_program_id, @@ -173,7 +227,12 @@ pre_final AS ( b.to_amount AS to_amount_int, b.to_amount * pow(10,-d2.decimal) AS to_amount, b._inserted_timestamp, - {{ dbt_utils.generate_surrogate_key(['b.tx_id','b.index','b.inner_index']) }} as swaps_inner_intermediate_jupiterv6_id, + CASE + WHEN b.event_type = 'SwapsEvent' THEN + {{ dbt_utils.generate_surrogate_key(['b.tx_id','b.index','coalesce(b.inner_index, -1)','coalesce(b.swap_event_index, 0)']) }} + ELSE + {{ dbt_utils.generate_surrogate_key(['b.tx_id','b.index','coalesce(b.inner_index, -1)']) }} + END as swaps_inner_intermediate_jupiterv6_id, sysdate() as inserted_timestamp, sysdate() as modified_timestamp, '{{ invocation_id }}' AS _invocation_id diff --git a/models/silver/swaps/jupiter/v6/silver__swaps_inner_intermediate_jupiterv6.yml b/models/silver/swaps/jupiter/v6/silver__swaps_inner_intermediate_jupiterv6.yml index 629c6acc..fb0200b1 100644 --- a/models/silver/swaps/jupiter/v6/silver__swaps_inner_intermediate_jupiterv6.yml +++ b/models/silver/swaps/jupiter/v6/silver__swaps_inner_intermediate_jupiterv6.yml @@ -7,6 +7,7 @@ models: - TX_ID - INDEX - INNER_INDEX + - SWAP_INDEX where: block_timestamp::date > current_date - 30 - compare_model_subset: name: silver__swaps_inner_intermediate_jupiterv6_business_logic_test @@ -65,9 +66,7 @@ models: tests: - not_null: *recent_date_filter - name: SWAP_PROGRAM_ID - description: "{{ doc('program_id') }}. This is the AMM performing the swap." - tests: - - not_null: *recent_date_filter + description: "{{ doc('program_id') }}. This is the AMM performing the swap. This will be NULL for new _v2 events introduced in Jupiter in Sept. 2025." - name: AGGREGATOR_PROGRAM_ID description: "{{ doc('program_id') }}. This is the aggregator calling the different AMMs." tests: diff --git a/models/silver/swaps/jupiter/v6/silver__swaps_intermediate_jupiterv6_2.sql b/models/silver/swaps/jupiter/v6/silver__swaps_intermediate_jupiterv6_2.sql index d27bc302..8cecb267 100644 --- a/models/silver/swaps/jupiter/v6/silver__swaps_intermediate_jupiterv6_2.sql +++ b/models/silver/swaps/jupiter/v6/silver__swaps_intermediate_jupiterv6_2.sql @@ -25,7 +25,7 @@ {{ ref('silver__decoded_instructions_combined') }} d WHERE program_id = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4' - AND event_type IN ('exactOutRoute','sharedAccountsExactOutRoute','sharedAccountsRoute','routeWithTokenLedger','route','sharedAccountsRouteWithTokenLedger', 'exact_out_route', 'shared_accounts_exact_out_route', 'shared_accounts_route', 'route_with_token_ledger', 'shared_accounts_route_with_token_ledger') + AND event_type IN ('exactOutRoute','sharedAccountsExactOutRoute','sharedAccountsRoute','routeWithTokenLedger','route','sharedAccountsRouteWithTokenLedger', 'exact_out_route', 'shared_accounts_exact_out_route', 'shared_accounts_route', 'route_with_token_ledger', 'shared_accounts_route_with_token_ledger', 'route_v2', 'shared_accounts_route_v2', 'shared_accounts_exact_out_route_v2', 'exact_out_route_v2') AND succeeded {% if is_incremental() %} AND _inserted_timestamp >= ( @@ -63,7 +63,7 @@ table(flatten(decoded_instruction:args:routePlan)) p WHERE program_id = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4' - AND event_type IN ('exactOutRoute','sharedAccountsExactOutRoute','sharedAccountsRoute','routeWithTokenLedger','route','sharedAccountsRouteWithTokenLedger', 'exact_out_route', 'shared_accounts_exact_out_route', 'shared_accounts_route', 'route_with_token_ledger', 'shared_accounts_route_with_token_ledger') + AND event_type IN ('exactOutRoute','sharedAccountsExactOutRoute','sharedAccountsRoute','routeWithTokenLedger','route','sharedAccountsRouteWithTokenLedger', 'exact_out_route', 'shared_accounts_exact_out_route', 'shared_accounts_route', 'route_with_token_ledger', 'shared_accounts_route_with_token_ledger', 'route_v2', 'shared_accounts_route_v2', 'shared_accounts_exact_out_route_v2', 'exact_out_route_v2') AND succeeded AND block_timestamp >= ( SELECT