split out aftermath

This commit is contained in:
Jack Forgash 2025-08-12 16:00:57 -04:00
parent c1cd794e16
commit efd3f3b718
4 changed files with 574 additions and 0 deletions

View File

@ -0,0 +1,61 @@
-- Query to handle Aftermath duplicate events by detecting module patterns
WITH aftermath_events AS (
SELECT
*,
-- Detect Aftermath duplicate pattern: events module followed by aftermath module
CASE
WHEN package_id = '0x8ae871505a80d8bf6bf9c05906cda6edfeea460c85bebe2e26a4313f5e67874a'
AND pool_address IS NOT NULL
AND event_module = 'aftermath'
AND LAG(event_module) OVER (PARTITION BY tx_digest ORDER BY event_index) = 'events'
AND LAG(pool_address) OVER (PARTITION BY tx_digest ORDER BY event_index) = pool_address
AND LAG(event_index) OVER (PARTITION BY tx_digest ORDER BY event_index) = event_index - 1
AND LAG(amount_out_raw) OVER (PARTITION BY tx_digest ORDER BY event_index) = amount_out_raw
-- Check if amount_in differs by ~0.05% (Aftermath fee)
AND ABS(LAG(amount_in_raw) OVER (PARTITION BY tx_digest ORDER BY event_index) - amount_in_raw) / NULLIF(amount_in_raw, 0) BETWEEN 0.0004 AND 0.0006
THEN TRUE
ELSE FALSE
END AS is_aftermath_duplicate,
-- Detect Kriya AMM duplicate pattern: spot_dex module followed by kriya_amm module
CASE
WHEN package_id = '0x8ae871505a80d8bf6bf9c05906cda6edfeea460c85bebe2e26a4313f5e67874a'
AND pool_address IS NOT NULL
AND event_module = 'kriya_amm'
AND LAG(event_module) OVER (PARTITION BY tx_digest ORDER BY event_index) = 'spot_dex'
AND LAG(pool_address) OVER (PARTITION BY tx_digest ORDER BY event_index) = pool_address
AND LAG(event_index) OVER (PARTITION BY tx_digest ORDER BY event_index) = event_index - 1
AND LAG(amount_out_raw) OVER (PARTITION BY tx_digest ORDER BY event_index) = amount_out_raw
-- Check if amount_in differs by ~0.063% (Kriya fee)
AND ABS(LAG(amount_in_raw) OVER (PARTITION BY tx_digest ORDER BY event_index) - amount_in_raw) / NULLIF(amount_in_raw, 0) BETWEEN 0.0005 AND 0.0008
THEN TRUE
ELSE FALSE
END AS is_kriya_duplicate,
-- Get previous event's key fields for grouping
LAG(amount_in_raw) OVER (PARTITION BY tx_digest ORDER BY event_index) AS prev_amount_in,
LAG(event_module) OVER (PARTITION BY tx_digest ORDER BY event_index) AS prev_event_module
FROM swaps
),
swaps_with_groups AS (
SELECT
*,
-- Create base group key - for duplicates, use the normalized amounts
CASE
WHEN is_aftermath_duplicate OR is_kriya_duplicate THEN
-- For duplicates, group them together by using the larger amount_in (actual amount)
CONCAT(
pool_address,
'|',
GREATEST(COALESCE(amount_in_raw::STRING, '0'), COALESCE(prev_amount_in::STRING, '0')),
'|',
COALESCE(amount_out_raw::STRING, '0')
)
WHEN pool_address IS NOT NULL THEN
CONCAT(pool_address, '|', COALESCE(amount_in_raw::STRING, '0'), '|', COALESCE(amount_out_raw::STRING, '0'))
ELSE
CONCAT(package_id, '|', transaction_module, '|', COALESCE(amount_in_raw::STRING, '0'), '|', COALESCE(amount_out_raw::STRING, '0'))
END AS base_group_key
FROM aftermath_events
)
-- Continue with the rest of the grouping logic...

View File

@ -9,6 +9,7 @@
) }}
WITH base_swaps AS (
-- Union regular DEX swaps with Aftermath DEX swaps
SELECT
checkpoint_number,
block_timestamp,
@ -39,6 +40,39 @@ WITH base_swaps AS (
FROM {{ this }}
)
{% endif %}
UNION ALL
SELECT
checkpoint_number,
block_timestamp,
tx_digest,
event_index,
swap_index,
event_module,
platform_address,
pool_address,
amount_in_raw,
amount_out_raw,
a_to_b,
fee_amount_raw,
partner_address,
steps,
token_in_type,
token_out_type,
trader_address,
dex_swaps_id,
modified_timestamp,
token_in_from_txs, -- TEMP
token_out_from_txs -- TEMP
FROM {{ ref('silver__aftermath_dex_swaps') }}
WHERE 1=1
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT COALESCE(MAX(modified_timestamp), '1900-01-01'::TIMESTAMP)
FROM {{ this }}
)
{% endif %}
),
prices AS (
SELECT

View File

@ -0,0 +1,476 @@
-- depends_on: {{ ref('core__fact_events') }}
{{ config (
materialized = "incremental",
unique_key = "dex_swaps_id",
cluster_by = ['modified_timestamp::DATE','block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
tags = ['scheduled_non_core']
) }}
WITH core_events AS (
SELECT
checkpoint_number,
block_timestamp,
tx_digest,
tx_sender,
event_index,
type,
event_address,
event_module,
event_resource,
package_id,
transaction_module,
parsed_json,
modified_timestamp
FROM
{{ ref('core__fact_events') }}
WHERE
{% if is_incremental() %}
modified_timestamp >= (
SELECT
COALESCE(MAX(modified_timestamp), '1900-01-01'::TIMESTAMP) AS modified_timestamp
FROM
{{ this }}
)
AND
{% endif %}
-- Only include Aftermath module swaps
transaction_module = 'aftermath'
AND (
event_resource ILIKE '%swap%'
OR event_resource IN (
'Swap',
'OrderFilled',
'TradeEvent'
)
)
-- edge cases that are not swaps
AND event_resource NOT IN (
'RepayFlashSwapEvent'
)
AND event_resource NOT ILIKE '%bondingcurve%'
-- limit to 30 days for dev
AND block_timestamp >= sysdate() - interval '30 days'
),
core_transactions AS (
SELECT
tx_digest,
payload_index,
payload_details:function::string as function,
payload_details:module::string as module,
payload_details:package::string as package,
payload_details:type_arguments::ARRAY as type_arguments,
payload_details,
row_number() over (partition by tx_digest, package, module order by payload_index) as package_index
FROM
{{ ref('core__fact_transactions') }}
WHERE
{% if is_incremental() %}
modified_timestamp >= (
SELECT
COALESCE(MAX(modified_timestamp), '1900-01-01'::TIMESTAMP) AS modified_timestamp
FROM
{{ this }}
)
AND
{% endif %}
date_trunc('day', block_timestamp) >= (SELECT MIN(date_trunc('day', block_timestamp)) FROM core_events)
AND tx_digest IN (SELECT DISTINCT tx_digest FROM core_events)
),
swaps AS (
SELECT
checkpoint_number,
block_timestamp,
tx_digest,
tx_sender,
event_index,
type,
event_module,
event_resource,
package_id,
transaction_module,
ROW_NUMBER() OVER (PARTITION BY tx_digest, package_id, transaction_module ORDER BY event_index) AS package_index,
event_address AS platform_address,
COALESCE(
parsed_json:pool::STRING,
parsed_json:pool_address::STRING,
parsed_json:pool_id::STRING,
parsed_json:event:pool::STRING,
parsed_json:event:pool_address::STRING,
parsed_json:event:pool_id::STRING
) AS pool_address,
-- Handle different direction field patterns
COALESCE(
parsed_json:a2b::BOOLEAN,
parsed_json:a_to_b::BOOLEAN,
parsed_json:atob::BOOLEAN,
parsed_json:x_for_y::BOOLEAN,
parsed_json:event:a2b::BOOLEAN
) AS a_to_b,
-- Token In amounts
IFF(a_to_b,
COALESCE(
parsed_json:amount_in::NUMBER,
parsed_json:amounts_in[0]::NUMBER,
parsed_json:amount_a::NUMBER,
parsed_json:amount_x::NUMBER,
parsed_json:event:amount_in::NUMBER,
parsed_json:coin_in_amount::NUMBER
),
COALESCE(
parsed_json:amount_in::NUMBER,
parsed_json:amounts_in[0]::NUMBER,
parsed_json:amount_b::NUMBER,
parsed_json:amount_y::NUMBER,
parsed_json:event:amount_in::NUMBER,
parsed_json:coin_in_amount::NUMBER
)
) AS amount_in_raw,
-- Token In types
IFF(a_to_b,
COALESCE(
parsed_json:coin_a:name::STRING,
parsed_json:coin_in:name::STRING,
parsed_json:coin_in::STRING,
parsed_json:type_in::STRING,
parsed_json:event:coin_in::STRING,
parsed_json:coin_in_type:name::STRING,
parsed_json:types_in[0]::STRING
),
COALESCE(
parsed_json:coin_b:name::STRING,
parsed_json:coin_in:name::STRING,
parsed_json:coin_in::STRING,
parsed_json:type_in::STRING,
parsed_json:event:coin_in::STRING,
parsed_json:coin_in_type:name::STRING,
parsed_json:types_in[0]::STRING
)
) AS token_in_type,
-- Token Out amounts
IFF(a_to_b,
COALESCE(
parsed_json:amount_out::NUMBER,
parsed_json:amounts_out[0]::NUMBER,
parsed_json:amount_b::NUMBER,
parsed_json:amount_y::NUMBER,
parsed_json:event:amount_out::NUMBER,
parsed_json:coin_out_amount::NUMBER
),
COALESCE(
parsed_json:amount_out::NUMBER,
parsed_json:amounts_out[0]::NUMBER,
parsed_json:amount_a::NUMBER,
parsed_json:amount_x::NUMBER,
parsed_json:event:amount_out::NUMBER,
parsed_json:coin_out_amount::NUMBER
)
) AS amount_out_raw,
-- Token Out types
IFF(a_to_b,
COALESCE(
parsed_json:coin_b:name::STRING,
parsed_json:coin_out:name::STRING,
parsed_json:coin_out::STRING,
parsed_json:type_out::STRING,
parsed_json:event:coin_out::STRING,
parsed_json:coin_out_type:name::STRING,
parsed_json:types_out[0]::STRING
),
COALESCE(
parsed_json:coin_a:name::STRING,
parsed_json:coin_out:name::STRING,
parsed_json:coin_out::STRING,
parsed_json:type_out::STRING,
parsed_json:event:coin_out::STRING,
parsed_json:coin_out_type:name::STRING,
parsed_json:types_out[0]::STRING
)
) AS token_out_type,
COALESCE(
parsed_json:fee_amount::NUMBER,
parsed_json:protocol_fee_amount::NUMBER,
parsed_json:protocol_fee::NUMBER
) AS fee_amount_raw,
COALESCE(
parsed_json:partner_id::STRING,
parsed_json:partner::STRING
) AS partner_address,
COALESCE(parsed_json:steps::NUMBER, 1) AS steps,
tx_sender AS trader_address,
modified_timestamp,
parsed_json
FROM
core_events
),
-- Aftermath-specific duplicate detection and grouping
aftermath_dedup AS (
SELECT
*,
-- Detect duplicate pattern: 'events' module followed by 'aftermath' module
CASE
WHEN event_module = 'aftermath'
AND pool_address IS NOT NULL
AND LAG(event_module) OVER (PARTITION BY tx_digest ORDER BY event_index) = 'events'
AND LAG(pool_address) OVER (PARTITION BY tx_digest ORDER BY event_index) = pool_address
AND LAG(event_index) OVER (PARTITION BY tx_digest ORDER BY event_index) = event_index - 1
AND LAG(amount_out_raw) OVER (PARTITION BY tx_digest ORDER BY event_index) = amount_out_raw
-- Check if amount_in differs by ~0.05% (Aftermath fee)
AND ABS(LAG(amount_in_raw) OVER (PARTITION BY tx_digest ORDER BY event_index) - amount_in_raw) / NULLIF(amount_in_raw, 0) BETWEEN 0.0004 AND 0.0006
THEN TRUE
ELSE FALSE
END AS is_duplicate_event,
-- Get the actual amount (larger amount_in) for duplicates
CASE
WHEN event_module = 'aftermath'
AND LAG(event_module) OVER (PARTITION BY tx_digest ORDER BY event_index) = 'events'
AND LAG(pool_address) OVER (PARTITION BY tx_digest ORDER BY event_index) = pool_address
AND LAG(amount_out_raw) OVER (PARTITION BY tx_digest ORDER BY event_index) = amount_out_raw
THEN GREATEST(amount_in_raw, LAG(amount_in_raw) OVER (PARTITION BY tx_digest ORDER BY event_index))
ELSE amount_in_raw
END AS normalized_amount_in
FROM swaps
),
-- Add LEAD values for filtering
events_with_lead AS (
SELECT
*,
LEAD(event_module) OVER (PARTITION BY tx_digest ORDER BY event_index) AS next_event_module,
LEAD(pool_address) OVER (PARTITION BY tx_digest ORDER BY event_index) AS next_pool_address,
LEAD(event_index) OVER (PARTITION BY tx_digest ORDER BY event_index) AS next_event_index,
LEAD(amount_out_raw) OVER (PARTITION BY tx_digest ORDER BY event_index) AS next_amount_out_raw
FROM aftermath_dedup
),
-- Filter out the duplicate 'events' module entries
filtered_swaps AS (
SELECT
checkpoint_number,
block_timestamp,
tx_digest,
tx_sender,
event_index,
type,
event_module,
event_resource,
package_id,
transaction_module,
package_index,
platform_address,
pool_address,
a_to_b,
normalized_amount_in,
token_in_type,
amount_out_raw,
token_out_type,
fee_amount_raw,
partner_address,
steps,
trader_address,
modified_timestamp,
parsed_json,
is_duplicate_event
FROM events_with_lead
WHERE
-- Keep 'aftermath' module entries (with normalized amounts)
-- Skip 'events' module entries that have a duplicate 'aftermath' entry
NOT (
event_module = 'events'
AND next_event_module = 'aftermath'
AND next_pool_address = pool_address
AND next_event_index = event_index + 1
AND next_amount_out_raw = amount_out_raw
)
),
-- Group swap events to determine the swap_index within the transaction
swaps_with_groups AS (
SELECT
*,
-- Use normalized amounts for grouping
CASE
WHEN pool_address IS NOT NULL THEN
CONCAT(pool_address, '|', COALESCE(normalized_amount_in::STRING, '0'), '|', COALESCE(amount_out_raw::STRING, '0'))
ELSE
CONCAT(package_id, '|', transaction_module, '|', COALESCE(normalized_amount_in::STRING, '0'), '|', COALESCE(amount_out_raw::STRING, '0'))
END AS base_group_key,
-- Find gaps in event_index sequence within the same base group
LAG(event_index) OVER (
PARTITION BY tx_digest,
CASE
WHEN pool_address IS NOT NULL THEN
CONCAT(pool_address, '|', COALESCE(normalized_amount_in::STRING, '0'), '|', COALESCE(amount_out_raw::STRING, '0'))
ELSE
CONCAT(package_id, '|', transaction_module, '|', COALESCE(normalized_amount_in::STRING, '0'), '|', COALESCE(amount_out_raw::STRING, '0'))
END
ORDER BY event_index
) AS prev_event_index
FROM filtered_swaps
),
swaps_with_gap_detection AS (
SELECT
*,
-- Detect if there's a significant gap (>3) between consecutive events with same base_group_key
CASE
WHEN prev_event_index IS NULL THEN 0
WHEN (event_index - prev_event_index) > 3 THEN 1
ELSE 0
END AS is_new_group,
-- Create running sum to generate unique group identifiers
SUM(
CASE
WHEN prev_event_index IS NULL THEN 0
WHEN (event_index - prev_event_index) > 3 THEN 1
ELSE 0
END
) OVER (
PARTITION BY tx_digest, base_group_key
ORDER BY event_index
ROWS UNBOUNDED PRECEDING
) AS group_sequence
FROM swaps_with_groups
),
swaps_with_final_groups AS (
SELECT
*,
-- Create final group key that includes the sequence number for gap detection
CONCAT(base_group_key, '|seq:', group_sequence::STRING) AS final_group_key,
-- Get minimum event_index for each final group
MIN(event_index) OVER (
PARTITION BY tx_digest, CONCAT(base_group_key, '|seq:', group_sequence::STRING)
) AS group_min_event_index
FROM swaps_with_gap_detection
),
swaps_with_index AS (
SELECT
*,
-- Use DENSE_RANK to create swap_index based on group_min_event_index
DENSE_RANK() OVER (
PARTITION BY tx_digest
ORDER BY group_min_event_index
) as swap_index
FROM swaps_with_final_groups
),
deduplicate_swaps AS (
SELECT
*
FROM swaps_with_index
qualify row_number() over (
partition by tx_digest, swap_index
order by token_in_type IS NOT NULL DESC, token_out_type IS NOT NULL DESC
) = 1
),
append_transaction_data AS (
SELECT
s.checkpoint_number,
s.block_timestamp,
s.tx_digest,
s.event_index,
s.swap_index,
s.type,
s.event_module,
s.package_id,
s.package_index,
s.transaction_module,
s.event_resource,
s.platform_address,
s.trader_address,
s.pool_address,
s.normalized_amount_in AS amount_in_raw, -- Use normalized amount
COALESCE(
s.token_in_type,
IFF(a_to_b,
t.type_arguments[0] :: STRING,
t.type_arguments[1] :: STRING
)
) AS token_in_type,
s.token_in_type IS NULL AS token_in_from_txs,
s.amount_out_raw,
COALESCE(
s.token_out_type,
IFF(a_to_b,
t.type_arguments[1] :: STRING,
t.type_arguments[0] :: STRING
)
) AS token_out_type,
s.token_out_type IS NULL AS token_out_from_txs,
payload_details,
s.a_to_b,
s.fee_amount_raw,
s.partner_address,
s.steps,
s.parsed_json,
s.modified_timestamp
FROM
deduplicate_swaps s
LEFT JOIN core_transactions t
ON s.tx_digest = t.tx_digest
AND s.package_id = t.package
AND s.transaction_module = t.module
AND s.package_index = t.package_index
AND (s.token_in_type IS NULL OR s.token_out_type IS NULL)
)
SELECT
checkpoint_number,
block_timestamp,
tx_digest,
event_index,
type,
event_module,
package_id,
transaction_module,
event_resource,
platform_address,
trader_address,
pool_address,
amount_in_raw,
IFF(
LEFT(token_in_type, 2) = '0x',
token_in_type,
'0x' || token_in_type
) AS token_in_type,
amount_out_raw,
IFF(
LEFT(token_out_type, 2) = '0x',
token_out_type,
'0x' || token_out_type
) AS token_out_type,
token_in_from_txs,
token_out_from_txs,
a_to_b,
fee_amount_raw,
partner_address,
steps,
swap_index,
package_index,
parsed_json,
payload_details,
{{ dbt_utils.generate_surrogate_key(['tx_digest', 'trader_address', 'token_in_type', 'token_out_type', 'amount_in_raw', 'amount_out_raw']) }} AS dex_swaps_id,
SYSDATE() AS inserted_timestamp,
modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
append_transaction_data

View File

@ -54,6 +54,9 @@ WITH core_events AS (
)
AND event_resource NOT ILIKE '%bondingcurve%'
-- exclude aftermath module swaps
AND transaction_module != 'aftermath'
-- limit to 30 days for dev
AND block_timestamp >= sysdate() - interval '30 days'
),