new v2 jupiter event_types (#890)

* new v2 jupiter event_types

* update pk and qualify
This commit is contained in:
tarikceric 2025-11-18 16:18:09 +01:00 committed by GitHub
parent 3e58156377
commit 226615c4e6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 69 additions and 12 deletions

View File

@ -50,7 +50,6 @@ models:
description: "{{ doc('swap_program_id') }}" description: "{{ doc('swap_program_id') }}"
tests: tests:
- dbt_expectations.expect_column_to_exist - dbt_expectations.expect_column_to_exist
- not_null: *recent_date_filter
- name: AGGREGATOR_PROGRAM_ID - name: AGGREGATOR_PROGRAM_ID
description: "{{ doc('aggregator_program_id') }}" description: "{{ doc('aggregator_program_id') }}"
tests: tests:

View File

@ -25,7 +25,7 @@
{{ ref('silver__decoded_logs') }} {{ ref('silver__decoded_logs') }}
WHERE WHERE
program_id = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4' program_id = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4'
AND event_type = 'SwapEvent' AND event_type IN ('SwapEvent', 'SwapsEvent')
AND succeeded AND succeeded
{% if is_incremental() %} {% if is_incremental() %}
AND _inserted_timestamp >= ( AND _inserted_timestamp >= (
@ -51,7 +51,7 @@
AND s.tx_id = l.tx_id AND s.tx_id = l.tx_id
WHERE WHERE
l.program_id = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4' l.program_id = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4'
AND l.event_type = 'SwapEvent' AND l.event_type IN ('SwapEvent', 'SwapsEvent')
AND l.succeeded AND l.succeeded
AND s.swapper IS NULL AND s.swapper IS NULL
AND s._inserted_timestamp >= current_date - 2 /* only look back 2 days */ AND s._inserted_timestamp >= current_date - 2 /* only look back 2 days */
@ -65,6 +65,7 @@
FROM FROM
base base
) )
-- Handle single SwapEvent records
SELECT SELECT
block_timestamp, block_timestamp,
block_id, block_id,
@ -79,6 +80,7 @@
decoded_log:args:outputMint::string AS to_mint, decoded_log:args:outputMint::string AS to_mint,
decoded_log:args:outputAmount::string AS to_amount, decoded_log:args:outputAmount::string AS to_amount,
_inserted_timestamp, _inserted_timestamp,
0 AS swap_event_index, -- Single swap always has index 0
FROM FROM
{{ ref('silver__decoded_logs') }} {{ ref('silver__decoded_logs') }}
JOIN JOIN
@ -105,6 +107,53 @@
{% else %} {% else %}
AND _inserted_timestamp::date < '2024-06-14' AND _inserted_timestamp::date < '2024-06-14'
{% endif %} {% endif %}
UNION ALL
-- Handle multiple SwapsEvent records (flatten the nested swaps)
SELECT
l.block_timestamp,
l.block_id,
l.tx_id,
l.index,
l.inner_index,
l.succeeded,
l.event_type,
NULL AS program_id, -- SwapsEvent doesn't have amm field
s.value:inputMint::string AS from_mint,
s.value:inputAmount::string AS from_amount,
s.value:outputMint::string AS to_mint,
s.value:outputAmount::string AS to_amount,
l._inserted_timestamp,
s.key::int AS swap_event_index, -- Add unique identifier for each swap in SwapsEvent
FROM
{{ ref('silver__decoded_logs') }} l
JOIN
distinct_entities
USING(tx_id)
JOIN
table(flatten(l.decoded_log:args:swapEvents)) s
WHERE
l.program_id = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4'
AND l.event_type = 'SwapsEvent'
AND l.succeeded
AND l.block_timestamp >= (
SELECT
MIN(block_timestamp)
FROM
distinct_entities
)
/* need to always keep the upper bound (if there is one) to prevent time gaps in incremental loading */
{% if is_incremental() %}
AND l._inserted_timestamp < (
SELECT
MAX(_inserted_timestamp) + INTERVAL '100 day'
FROM
{{ this }}
)
{% else %}
AND l._inserted_timestamp::date < '2024-06-14'
{% endif %}
{% endset %} {% endset %}
{% do run_query(base_query) %} {% do run_query(base_query) %}
{% set between_stmts = fsc_utils.dynamic_range_predicate("silver.swaps_inner_intermediate_jupiterv6__intermediate_tmp","block_timestamp::date") %} {% set between_stmts = fsc_utils.dynamic_range_predicate("silver.swaps_inner_intermediate_jupiterv6__intermediate_tmp","block_timestamp::date") %}
@ -116,7 +165,9 @@ WITH base AS (
FROM FROM
silver.swaps_inner_intermediate_jupiterv6__intermediate_tmp silver.swaps_inner_intermediate_jupiterv6__intermediate_tmp
QUALIFY QUALIFY
row_number() OVER (PARTITION BY tx_id, index, coalesce(inner_index,-1) ORDER BY _inserted_timestamp DESC) = 1 -- For SwapsEvent, partition includes swap_event_index to handle multiple swaps in one event.
-- For SwapEvent, swap_event_index is 0 and doesn't alter the original partitioning logic.
row_number() OVER (PARTITION BY tx_id, index, coalesce(inner_index,-1), coalesce(swap_event_index, 0) ORDER BY _inserted_timestamp DESC) = 1
), ),
swappers AS ( swappers AS (
SELECT SELECT
@ -134,7 +185,7 @@ swappers AS (
WHERE WHERE
{{ between_stmts }} {{ between_stmts }}
AND program_id = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4' AND program_id = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4'
AND event_type IN ('exactOutRoute', 'sharedAccountsExactOutRoute', 'sharedAccountsRoute', 'routeWithTokenLedger', 'route', 'sharedAccountsRouteWithTokenLedger', 'exact_out_route', 'shared_accounts_exact_out_route', 'shared_accounts_route', 'route_with_token_ledger', 'shared_accounts_route_with_token_ledger') AND event_type IN ('exactOutRoute', 'sharedAccountsExactOutRoute', 'sharedAccountsRoute', 'routeWithTokenLedger', 'route', 'sharedAccountsRouteWithTokenLedger', 'exact_out_route', 'shared_accounts_exact_out_route', 'shared_accounts_route', 'route_with_token_ledger', 'shared_accounts_route_with_token_ledger', 'route_v2', 'shared_accounts_route_v2', 'shared_accounts_exact_out_route_v2', 'exact_out_route_v2')
AND swapper IS NOT NULL AND swapper IS NOT NULL
QUALIFY QUALIFY
row_number() OVER (PARTITION BY tx_id, index, coalesce(inner_index, -1) ORDER BY _inserted_timestamp DESC) = 1 row_number() OVER (PARTITION BY tx_id, index, coalesce(inner_index, -1) ORDER BY _inserted_timestamp DESC) = 1
@ -161,7 +212,10 @@ pre_final AS (
b.tx_id, b.tx_id,
b.index, b.index,
b.inner_index, b.inner_index,
row_number() OVER (PARTITION BY b.tx_id, b.index, s.inner_index ORDER BY b.inner_index)-1 AS swap_index, /* we want the swap index as it relates to the top level swap instruction */ CASE
WHEN b.event_type = 'SwapsEvent' THEN b.swap_event_index
ELSE row_number() OVER (PARTITION BY b.tx_id, b.index, s.inner_index ORDER BY b.inner_index)-1
END AS swap_index, /* we want the swap index as it relates to the top level swap instruction */
b.succeeded, b.succeeded,
b.program_id AS swap_program_id, b.program_id AS swap_program_id,
'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4' AS aggregator_program_id, 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4' AS aggregator_program_id,
@ -173,7 +227,12 @@ pre_final AS (
b.to_amount AS to_amount_int, b.to_amount AS to_amount_int,
b.to_amount * pow(10,-d2.decimal) AS to_amount, b.to_amount * pow(10,-d2.decimal) AS to_amount,
b._inserted_timestamp, b._inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['b.tx_id','b.index','b.inner_index']) }} as swaps_inner_intermediate_jupiterv6_id, CASE
WHEN b.event_type = 'SwapsEvent' THEN
{{ dbt_utils.generate_surrogate_key(['b.tx_id','b.index','coalesce(b.inner_index, -1)','coalesce(b.swap_event_index, 0)']) }}
ELSE
{{ dbt_utils.generate_surrogate_key(['b.tx_id','b.index','coalesce(b.inner_index, -1)']) }}
END as swaps_inner_intermediate_jupiterv6_id,
sysdate() as inserted_timestamp, sysdate() as inserted_timestamp,
sysdate() as modified_timestamp, sysdate() as modified_timestamp,
'{{ invocation_id }}' AS _invocation_id '{{ invocation_id }}' AS _invocation_id

View File

@ -7,6 +7,7 @@ models:
- TX_ID - TX_ID
- INDEX - INDEX
- INNER_INDEX - INNER_INDEX
- SWAP_INDEX
where: block_timestamp::date > current_date - 30 where: block_timestamp::date > current_date - 30
- compare_model_subset: - compare_model_subset:
name: silver__swaps_inner_intermediate_jupiterv6_business_logic_test name: silver__swaps_inner_intermediate_jupiterv6_business_logic_test
@ -65,9 +66,7 @@ models:
tests: tests:
- not_null: *recent_date_filter - not_null: *recent_date_filter
- name: SWAP_PROGRAM_ID - name: SWAP_PROGRAM_ID
description: "{{ doc('program_id') }}. This is the AMM performing the swap." description: "{{ doc('program_id') }}. This is the AMM performing the swap. This will be NULL for new _v2 events introduced in Jupiter in Sept. 2025."
tests:
- not_null: *recent_date_filter
- name: AGGREGATOR_PROGRAM_ID - name: AGGREGATOR_PROGRAM_ID
description: "{{ doc('program_id') }}. This is the aggregator calling the different AMMs." description: "{{ doc('program_id') }}. This is the aggregator calling the different AMMs."
tests: tests:

View File

@ -25,7 +25,7 @@
{{ ref('silver__decoded_instructions_combined') }} d {{ ref('silver__decoded_instructions_combined') }} d
WHERE WHERE
program_id = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4' program_id = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4'
AND event_type IN ('exactOutRoute','sharedAccountsExactOutRoute','sharedAccountsRoute','routeWithTokenLedger','route','sharedAccountsRouteWithTokenLedger', 'exact_out_route', 'shared_accounts_exact_out_route', 'shared_accounts_route', 'route_with_token_ledger', 'shared_accounts_route_with_token_ledger') AND event_type IN ('exactOutRoute','sharedAccountsExactOutRoute','sharedAccountsRoute','routeWithTokenLedger','route','sharedAccountsRouteWithTokenLedger', 'exact_out_route', 'shared_accounts_exact_out_route', 'shared_accounts_route', 'route_with_token_ledger', 'shared_accounts_route_with_token_ledger', 'route_v2', 'shared_accounts_route_v2', 'shared_accounts_exact_out_route_v2', 'exact_out_route_v2')
AND succeeded AND succeeded
{% if is_incremental() %} {% if is_incremental() %}
AND _inserted_timestamp >= ( AND _inserted_timestamp >= (
@ -63,7 +63,7 @@
table(flatten(decoded_instruction:args:routePlan)) p table(flatten(decoded_instruction:args:routePlan)) p
WHERE WHERE
program_id = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4' program_id = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4'
AND event_type IN ('exactOutRoute','sharedAccountsExactOutRoute','sharedAccountsRoute','routeWithTokenLedger','route','sharedAccountsRouteWithTokenLedger', 'exact_out_route', 'shared_accounts_exact_out_route', 'shared_accounts_route', 'route_with_token_ledger', 'shared_accounts_route_with_token_ledger') AND event_type IN ('exactOutRoute','sharedAccountsExactOutRoute','sharedAccountsRoute','routeWithTokenLedger','route','sharedAccountsRouteWithTokenLedger', 'exact_out_route', 'shared_accounts_exact_out_route', 'shared_accounts_route', 'route_with_token_ledger', 'shared_accounts_route_with_token_ledger', 'route_v2', 'shared_accounts_route_v2', 'shared_accounts_exact_out_route_v2', 'exact_out_route_v2')
AND succeeded AND succeeded
AND block_timestamp >= ( AND block_timestamp >= (
SELECT SELECT