Orca v2 lp actions (#750)

* wip model

* initial schema and tests

* update unique key

* separate out single token actions to their own cte

* formatting

* add to scheduler
This commit is contained in:
desmond-hui 2025-01-07 21:03:49 -08:00 committed by GitHub
parent bdc81a8957
commit 1dcd928f73
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 414 additions and 0 deletions

View File

@ -0,0 +1,5 @@
{% docs token_a_amount %}
Amount of the first token in a liquidity pool pair transferred during a liquidity pool action
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs token_b_amount %}
Amount of the second token in a liquidity pool pair transferred during a liquidity pool action
{% enddocs %}

View File

@ -0,0 +1,306 @@
-- depends_on: {{ ref('silver__decoded_instructions_combined') }}
{{
config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = ['block_timestamp::date','liquidity_pool_actions_orcav2_id'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::date','modified_timestamp::date'],
post_hook = enable_search_optimization(
'{{this.schema}}',
'{{this.identifier}}',
'ON EQUALITY(tx_id, provider_address, token_a_mint, token_b_mint, liquidity_pool_actions_orcav2_id)'
),
tags = ['scheduled_non_core']
)
}}
{% if execute %}
{% if is_incremental() %}
{% set max_timestamp_query %}
SELECT max(_inserted_timestamp) FROM {{ this }}
{% endset %}
{% set max_timestamp = run_query(max_timestamp_query)[0][0] %}
{% endif %}
{% set base_query %}
CREATE OR REPLACE TEMPORARY TABLE silver.liquidity_pool_actions_orcav2__intermediate_tmp AS
SELECT
block_id,
block_timestamp,
tx_id,
index,
inner_index,
succeeded,
event_type,
decoded_instruction:accounts AS accounts,
program_id,
_inserted_timestamp
FROM
{{ ref('silver__decoded_instructions_combined')}}
WHERE
program_id = '9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP'
AND event_type IN (
'depositAllTokenTypes',
'depositSingleTokenTypeExactAmountIn',
'withdrawAllTokenTypes',
'withdrawSingleTokenTypeExactAmountOut'
)
{% if is_incremental() %}
AND _inserted_timestamp > '{{ max_timestamp }}'
{% else %}
AND block_timestamp::date BETWEEN '2021-06-10' AND '2023-01-01'
{% endif %}
{% endset %}
{% do run_query(base_query) %}
{% set between_stmts = fsc_utils.dynamic_range_predicate("silver.liquidity_pool_actions_orcav2__intermediate_tmp","block_timestamp::date") %}
{% endif %}
WITH base AS (
SELECT
* exclude(accounts),
silver.udf_get_account_pubkey_by_name('swap', accounts) AS pool_address,
silver.udf_get_account_pubkey_by_name('userTransferAuthority', accounts) AS provider_address,
silver.udf_get_account_pubkey_by_name('swapTokenA', accounts) AS pool_token_a_account,
silver.udf_get_account_pubkey_by_name('swapTokenB', accounts) AS pool_token_b_account,
CASE
WHEN event_type = 'depositAllTokenTypes' THEN
silver.udf_get_account_pubkey_by_name('depositTokenA', accounts)
WHEN event_type = 'depositSingleTokenTypeExactAmountIn' THEN
silver.udf_get_account_pubkey_by_name('sourceToken', accounts)
WHEN event_type = 'withdrawAllTokenTypes' THEN
silver.udf_get_account_pubkey_by_name('destinationTokenA', accounts)
WHEN event_type = 'withdrawSingleTokenTypeExactAmountOut' THEN
silver.udf_get_account_pubkey_by_name('destination', accounts)
ELSE NULL
END AS token_a_account,
CASE
WHEN event_type = 'depositAllTokenTypes' THEN
silver.udf_get_account_pubkey_by_name('depositTokenB', accounts)
WHEN event_type = 'withdrawAllTokenTypes' THEN
silver.udf_get_account_pubkey_by_name('destinationTokenB', accounts)
ELSE NULL
END AS token_b_account,
coalesce(lead(inner_index) OVER (
PARTITION BY tx_id, index
ORDER BY inner_index
), 9999) AS next_lp_action_inner_index
FROM
silver.liquidity_pool_actions_orcav2__intermediate_tmp
),
transfers AS (
SELECT
* exclude(index),
split_part(index,'.',1)::int AS index,
nullif(split_part(index,'.',2),'')::int AS inner_index
FROM
{{ ref('silver__transfers') }}
WHERE
{{ between_stmts }}
),
deposit_transfers AS (
SELECT
b.*,
t.mint AS token_a_mint,
t.amount AS token_a_amount,
t2.mint AS token_b_mint,
t2.amount AS token_b_amount
FROM
base AS b
LEFT JOIN
transfers AS t
ON t.block_timestamp::date = b.block_timestamp::date
AND t.tx_id = b.tx_id
AND t.index = b.index
AND coalesce(t.inner_index,0) > coalesce(b.inner_index,-1)
AND coalesce(t.inner_index,0) < coalesce(b.next_lp_action_inner_index,9999)
AND t.source_token_account = b.token_a_account
AND t.dest_token_account = b.pool_token_a_account
LEFT JOIN
transfers AS t2
ON t2.block_timestamp::date = b.block_timestamp::date
AND t2.tx_id = b.tx_id
AND t2.index = b.index
AND coalesce(t2.inner_index,0) > coalesce(b.inner_index,-1)
AND coalesce(t2.inner_index,0) < coalesce(b.next_lp_action_inner_index,9999)
AND t2.source_token_account = b.token_b_account
AND t2.dest_token_account = b.pool_token_b_account
WHERE
b.event_type = 'depositAllTokenTypes'
),
single_deposit_transfers AS (
SELECT
b.*,
t.mint AS token_a_mint,
t.amount AS token_a_amount,
NULL AS token_b_mint,
NULL AS token_b_amount
FROM
base AS b
LEFT JOIN
transfers AS t
ON t.block_timestamp::date = b.block_timestamp::date
AND t.tx_id = b.tx_id
AND t.index = b.index
AND coalesce(t.inner_index,0) > coalesce(b.inner_index,-1)
AND coalesce(t.inner_index,0) < coalesce(b.next_lp_action_inner_index,9999)
AND t.source_token_account = b.token_a_account
AND t.dest_token_account IN (b.pool_token_a_account, b.pool_token_b_account)
WHERE
b.event_type = 'depositSingleTokenTypeExactAmountIn'
),
withdraw_transfers AS (
SELECT
b.*,
t.mint AS token_a_mint,
t.amount AS token_a_amount,
t2.mint AS token_b_mint,
t2.amount AS token_b_amount
FROM
base AS b
LEFT JOIN
transfers AS t
ON t.block_timestamp::date = b.block_timestamp::date
AND t.tx_id = b.tx_id
AND t.index = b.index
AND coalesce(t.inner_index,0) > coalesce(b.inner_index,-1)
AND coalesce(t.inner_index,0) < coalesce(b.next_lp_action_inner_index,9999)
AND t.dest_token_account = b.token_a_account
AND t.source_token_account = b.pool_token_a_account
LEFT JOIN
transfers AS t2
ON t2.block_timestamp::date = b.block_timestamp::date
AND t2.tx_id = b.tx_id
AND t2.index = b.index
AND coalesce(t2.inner_index,0) > coalesce(b.inner_index,-1)
AND coalesce(t2.inner_index,0) < coalesce(b.next_lp_action_inner_index,9999)
AND t2.dest_token_account = b.token_b_account
AND t2.source_token_account = b.pool_token_b_account
WHERE
b.event_type = 'withdrawAllTokenTypes'
),
single_withdraw_transfers AS (
SELECT
b.*,
t.mint AS token_a_mint,
t.amount AS token_a_amount,
NULL AS token_b_mint,
NULL AS token_b_amount
FROM
base AS b
LEFT JOIN
transfers AS t
ON t.block_timestamp::date = b.block_timestamp::date
AND t.tx_id = b.tx_id
AND t.index = b.index
AND coalesce(t.inner_index,0) > coalesce(b.inner_index,-1)
AND coalesce(t.inner_index,0) < coalesce(b.next_lp_action_inner_index,9999)
AND t.dest_token_account = b.token_a_account
AND t.source_token_account IN (b.pool_token_a_account, b.pool_token_b_account)
WHERE
b.event_type = 'withdrawSingleTokenTypeExactAmountOut'
)
SELECT
block_id,
block_timestamp,
tx_id,
index,
inner_index,
succeeded,
event_type,
pool_address,
provider_address,
token_a_mint,
token_a_amount,
token_b_mint,
token_b_amount,
program_id,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['block_id', 'tx_id', 'index', 'inner_index']) }} AS liquidity_pool_actions_orcav2_id,
sysdate() AS inserted_timestamp,
sysdate() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
deposit_transfers
UNION ALL
SELECT
block_id,
block_timestamp,
tx_id,
index,
inner_index,
succeeded,
event_type,
pool_address,
provider_address,
token_a_mint,
token_a_amount,
token_b_mint,
token_b_amount,
program_id,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['block_id', 'tx_id', 'index', 'inner_index']) }} AS liquidity_pool_actions_orcav2_id,
sysdate() AS inserted_timestamp,
sysdate() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
withdraw_transfers
UNION ALL
SELECT
block_id,
block_timestamp,
tx_id,
index,
inner_index,
succeeded,
event_type,
pool_address,
provider_address,
token_a_mint,
token_a_amount,
token_b_mint,
token_b_amount,
program_id,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['block_id', 'tx_id', 'index', 'inner_index']) }} AS liquidity_pool_actions_orcav2_id,
sysdate() AS inserted_timestamp,
sysdate() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
single_deposit_transfers
UNION ALL
SELECT
block_id,
block_timestamp,
tx_id,
index,
inner_index,
succeeded,
event_type,
pool_address,
provider_address,
token_a_mint,
token_a_amount,
token_b_mint,
token_b_amount,
program_id,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['block_id', 'tx_id', 'index', 'inner_index']) }} AS liquidity_pool_actions_orcav2_id,
sysdate() AS inserted_timestamp,
sysdate() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
single_withdraw_transfers

View File

@ -0,0 +1,98 @@
version: 2
models:
- name: silver__liquidity_pool_actions_orcav2
recent_date_filter: &recent_date_filter
config:
where: >
_inserted_timestamp < current_date - 7
data_tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_ID
- INDEX
- INNER_INDEX
columns:
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
data_tests:
- not_null: *recent_date_filter
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
data_tests:
- not_null: *recent_date_filter
- name: TX_ID
description: "{{ doc('tx_id') }}"
data_tests:
- not_null: *recent_date_filter
- name: INDEX
description: "{{ doc('event_index') }}"
data_tests:
- not_null: *recent_date_filter
- name: INNER_INDEX
description: "{{ doc('inner_index') }}"
- name: SUCCEEDED
description: "{{ doc('tx_succeeded') }}"
data_tests:
- not_null: *recent_date_filter
- name: EVENT_TYPE
description: "{{ doc('event_type') }}"
data_tests:
- not_null: *recent_date_filter
- name: POOL_ADDRESS
description: "{{ doc('liquidity_pool_address') }}"
data_tests:
- not_null: *recent_date_filter
- name: PROVIDER_ADDRESS
description: "{{ doc('liquidity_provider') }}"
data_tests:
- not_null: *recent_date_filter
- name: TOKEN_A_MINT
description: "{{ doc('token_a_mint') }}"
data_tests:
- not_null: *recent_date_filter
- name: TOKEN_A_AMOUNT
description: "{{ doc('token_a_amount') }}"
data_tests:
- not_null: *recent_date_filter
- name: TOKEN_B_MINT
description: "{{ doc('token_b_mint') }}"
data_tests:
- not_null:
config:
where: >
_inserted_timestamp < current_date - 7
AND event_type IN ('withdrawAllTokenTypes','depositAllTokenTypes')
- name: TOKEN_B_AMOUNT
description: "{{ doc('token_b_amount') }}"
data_tests:
- not_null:
config:
where: >
_inserted_timestamp < current_date - 7
AND event_type IN ('withdrawAllTokenTypes','depositAllTokenTypes')
- name: PROGRAM_ID
description: "{{ doc('program_id') }}"
data_tests:
- not_null: *recent_date_filter
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"
data_tests:
- not_null
- name: LIQUIDITY_POOL_ACTIONS_ORCAV2_ID
description: '{{ doc("pk") }}'
data_tests:
- unique
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
data_tests:
- not_null: *recent_date_filter
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'
data_tests:
- not_null: *recent_date_filter
- name: _INVOCATION_ID
description: '{{ doc("_invocation_id") }}'
data_tests:
- not_null:
name: test_silver__not_null_liquidity_pool_actions_orcav2_invocation_id
<<: *recent_date_filter