Marinade ez pool lp actions (#768)

* initial model, docs and tests

* formatting

* tmp view to pivot meteora lp data

* tmp view to pivot meteora dlmm lp data

* initial model, docs and tests

* add meta tags

* add test

* fix regex

* add whirlpools

* fix SO
This commit is contained in:
desmond-hui 2025-01-13 17:54:19 -08:00 committed by GitHub
parent f7aba13964
commit b10cb4a44d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 509 additions and 0 deletions

View File

@ -0,0 +1,154 @@
{{
config(
materialized = 'incremental',
incremental_strategy = 'merge',
meta={
'database_tags':{
'table': {
'PROTOCOL': 'MARINADE',
'PURPOSE': 'STAKING'
}
}
},
unique_key = ['ez_liquidity_pool_actions_id'],
merge_exclude_columns = ['inserted_timestamp'],
post_hook = enable_search_optimization(
'{{this.schema}}',
'{{this.identifier}}',
'ON EQUALITY(ez_liquidity_pool_actions_id, pool_address, provider_address, tx_id, action_type, token_a_mint, token_b_mint)'),
tags = ['scheduled_non_core'],
)
}}
{% set pool_platforms = [
'raydiumv4',
'raydium_cpmm',
'raydium_clmm',
'orcav1',
'orcav2',
'meteora',
'meteora_dlmm',
'orca_whirlpool',
] %}
WITH marinade_pool_tokens AS (
SELECT DISTINCT
token_a_mint AS token_address
FROM
{{ ref('marinade__dim_pools') }}
UNION
SELECT DISTINCT
token_b_mint
FROM
{{ ref('marinade__dim_pools') }}
),
base AS (
{% for platform in pool_platforms %}
select
lp.block_id,
lp.block_timestamp,
lp.tx_id,
lp.index,
lp.inner_index,
case
WHEN REGEXP_LIKE(lp.event_type, '^(increase|add|deposit|bootstrap|open).*', 'i') THEN
'deposit'
WHEN REGEXP_LIKE(lp.event_type, '^(decrease|remove|withdraw|close).*', 'i') THEN
'withdraw'
ELSE
lp.event_type
END AS action_type,
lp.provider_address,
lp.token_a_mint,
case
when lp.token_a_mint = m.token_a_mint then
m.token_a_symbol
when lp.token_a_mint = m.token_b_mint then
m.token_b_symbol
end AS token_a_symbol,
lp.token_a_amount,
lp.token_b_mint,
case
when lp.token_b_mint = m.token_a_mint then
m.token_a_symbol
when lp.token_b_mint = m.token_b_mint then
m.token_b_symbol
end AS token_b_symbol,
lp.token_b_amount,
lp.pool_address,
m.pool_name,
m.is_msol_pool,
m.is_mnde_pool,
lp.program_id,
m.platform,
lp.liquidity_pool_actions_{{ platform }}_id AS ez_liquidity_pool_actions_id
from
{% if platform == 'meteora' or platform == 'meteora_dlmm' %}
{{ ref('marinade__' ~ platform ~ '_pivot') }} AS lp
{% else %}
{{ ref('silver__liquidity_pool_actions_' ~ platform) }} AS lp
{% endif %}
inner join
{{ ref('marinade__dim_pools') }} AS m
using(pool_address)
{% if is_incremental() %}
where lp.modified_timestamp > (select max(modified_timestamp) from {{ this }})
{% endif %}
{% if not loop.last %}
union all
{% endif %}
{% endfor %}
),
token_prices AS (
SELECT
HOUR,
p.token_address,
price
FROM
{{ ref('price__ez_prices_hourly') }} AS p
INNER JOIN
marinade_pool_tokens AS m
ON p.token_address = m.token_address
WHERE
HOUR :: DATE IN (
SELECT
DISTINCT block_timestamp :: DATE
FROM
base
)
)
SELECT
block_id,
block_timestamp,
tx_id,
index,
inner_index,
action_type,
provider_address,
token_a_mint,
token_a_symbol,
token_a_amount,
(token_a_amount * tp_a.price)::numeric(20,8) AS token_a_amount_usd,
token_b_mint,
token_b_symbol,
token_b_amount,
(token_b_amount * tp_b.price)::numeric(20,8) AS token_b_amount_usd,
pool_address,
pool_name,
is_msol_pool,
is_mnde_pool,
program_id,
platform,
ez_liquidity_pool_actions_id,
sysdate() AS inserted_timestamp,
sysdate() AS modified_timestamp
FROM
base AS b
LEFT JOIN
token_prices AS tp_a
ON date_trunc('hour', b.block_timestamp) = tp_a.HOUR
AND b.token_a_mint = tp_a.token_address
LEFT JOIN
token_prices AS tp_b
ON date_trunc('hour', b.block_timestamp) = tp_b.HOUR
AND b.token_b_mint = tp_b.token_address

View File

@ -0,0 +1,103 @@
version: 2
models:
- name: marinade__ez_liquidity_pool_actions
recent_date_filter: &recent_date_filter
config:
where: >
modified_timestamp > current_date - 7
data_tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_ID
- INDEX
- INNER_INDEX
columns:
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
data_tests:
- not_null: *recent_date_filter
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
data_tests:
- not_null: *recent_date_filter
- name: TX_ID
description: "{{ doc('tx_id') }}"
data_tests:
- not_null: *recent_date_filter
- name: INDEX
description: "{{ doc('event_index') }}"
data_tests:
- not_null: *recent_date_filter
- name: INNER_INDEX
description: "{{ doc('inner_index') }}"
- name: ACTION_TYPE
description: "{{ doc('event_type') }}"
data_tests:
- not_null: *recent_date_filter
- accepted_values:
values: ["deposit", "withdraw"]
<<: *recent_date_filter
- name: PROVIDER_ADDRESS
description: "{{ doc('liquidity_provider') }}"
data_tests:
- not_null: *recent_date_filter
- name: TOKEN_A_MINT
description: "{{ doc('token_a_mint') }}"
data_tests:
- not_null: *recent_date_filter
- name: TOKEN_A_SYMBOL
description: "{{ doc('prices_symbol') }}"
- name: TOKEN_A_AMOUNT
description: "{{ doc('token_a_amount') }}"
data_tests:
- not_null: *recent_date_filter
- name: TOKEN_A_AMOUNT_USD
description: >
The amount of token A in USD.
- name: TOKEN_B_MINT
description: "{{ doc('token_b_mint') }}"
- name: TOKEN_B_SYMBOL
description: "{{ doc('prices_symbol') }}"
- name: TOKEN_B_AMOUNT
description: "{{ doc('token_b_amount') }}"
- name: TOKEN_B_AMOUNT_USD
description: >
The amount of token B in USD.
- name: POOL_ADDRESS
description: "{{ doc('liquidity_pool_address') }}"
data_tests:
- not_null: *recent_date_filter
- name: POOL_NAME
description: >
Name of the liquidity pool
- name: IS_MSOL_POOL
description: >
Whether the pool has MSOL as one of the tokens
data_tests:
- not_null
- name: IS_MNDE_POOL
description: >
Whether the pool has MNDE as one of the tokens
data_tests:
- not_null
- name: PROGRAM_ID
description: "{{ doc('program_id') }}"
data_tests:
- not_null: *recent_date_filter
- name: PLATFORM
description: >
Name of the liquidity pool protocol
data_tests:
- not_null
- name: EZ_LIQUIDITY_POOL_ACTIONS_ID
description: '{{ doc("pk") }}'
data_tests:
- unique
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
data_tests:
- not_null: *recent_date_filter
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'
data_tests:
- not_null: *recent_date_filter

View File

@ -0,0 +1,78 @@
/* TODO: ephemeral names are working properly with our custom naming macro, has to be view for now
but external user should not have perms to select from this view */
/* this only needs to be run once */
{{
config(
materialized = 'view',
tags = ['exclude_change_tracking']
)
}}
with base as (
select
lp.*,
row_number() over (partition by lp.tx_id, lp.index, lp.action order by lp.inner_index) as rn
from
{{ ref('silver__liquidity_pool_actions_meteora_dlmm') }} AS lp
/*
TODO: Add this when meteora pools have been added to dim pools
inner join {{ ref('marinade__dim_pools') }} AS m
on lp.liquidity_pool_address = m.pool_address
*/
where action in (
'removeLiquidityByRange',
'removeLiquidity',
'removeAllLiquidity',
'addLiquidityByStrategyOneSide',
'addLiquidityOneSide',
'addLiquidity',
'addLiquidityByWeight',
'addLiquidityByStrategy'
)
),
pre_final as (
select
b1.* exclude(inner_index),
case when b1.action IN ('addLiquidityByStrategyOneSide','addLiquidityOneSide') then
NULL
else
iff(b1.inner_index=0,NULL,b1.inner_index-1)
end AS inner_index,
b2.mint as b_mint,
b2.amount AS b_amount
from base b1
left join
base b2
on b1.tx_id = b2.tx_id
and b1.index = b2.index
and b1.action = b2.action
and b1.rn = 1
and b2.rn <> 1
where
b1.rn = 1
/*
have to put this here because upstream has bad data
ex: fYzxEgSFVmK24twSBaVBSue1evhFnzTBE31SGduCC28Z6BAbx68BCQJJe4PVsKnRpHof5rM9hBF5GJr3w8C755B
*/
qualify
row_number() over (partition by b1.tx_id, b1.index, b1.action order by b2.rn) = 1
)
select
block_id,
block_timestamp,
tx_id,
index,
inner_index,
succeeded,
action AS event_type,
liquidity_pool_address AS pool_address,
liquidity_provider AS provider_address,
mint AS token_a_mint,
amount AS token_a_amount,
b_mint AS token_b_mint,
b_amount AS token_b_amount,
program_id,
modified_timestamp,
{{ dbt_utils.generate_surrogate_key(['block_id', 'tx_id', 'index', 'inner_index']) }} AS liquidity_pool_actions_meteora_dlmm_id
from pre_final

View File

@ -0,0 +1,54 @@
version: 2
models:
- name: marinade__meteora_dlmm_pivot
data_tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_ID
- INDEX
- INNER_INDEX
- dbt_utils.expression_is_true:
expression: "inner_index >= 0"
columns:
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
- name: TX_ID
description: "{{ doc('tx_id') }}"
- name: INDEX
description: "{{ doc('event_index') }}"
- name: INNER_INDEX
description: "{{ doc('inner_index') }}"
- name: SUCCEEDED
description: "{{ doc('tx_succeeded') }}"
- name: EVENT_TYPE
description: "{{ doc('event_type') }}"
- name: POOL_ADDRESS
description: "{{ doc('liquidity_pool_address') }}"
- name: PROVIDER_ADDRESS
description: "{{ doc('liquidity_provider') }}"
- name: TOKEN_A_MINT
description: "{{ doc('token_a_mint') }}"
data_tests:
- not_null
- name: TOKEN_A_AMOUNT
description: "{{ doc('token_a_amount') }}"
data_tests:
- not_null
- name: TOKEN_B_MINT
description: "{{ doc('token_b_mint') }}"
- name: TOKEN_B_AMOUNT
description: "{{ doc('token_b_amount') }}"
- name: PROGRAM_ID
description: "{{ doc('program_id') }}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"
- name: LIQUIDITY_POOL_ACTIONS_METEORA_DLMM_ID
description: '{{ doc("pk") }}'
data_tests:
- unique
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'

View File

@ -0,0 +1,66 @@
/* TODO: ephemeral names are working properly with our custom naming macro, has to be view for now
but external user should not have perms to select from this view */
/* this only needs to be run once */
{{
config(
materialized = 'view',
tags = ['exclude_change_tracking']
)
}}
with base as (
select
lp.*,
row_number() over (partition by lp.tx_id, lp.index, lp.action order by lp.inner_index) as rn
from
{{ ref('silver__liquidity_pool_actions_meteora') }} AS lp
/*
TODO: Add this when meteora pools have been added to dim pools
inner join {{ ref('marinade__dim_pools') }} AS m
on lp.liquidity_pool_address = m.pool_address
*/
where
lp.succeeded
AND action in (
'addBalanceLiquidity',
'addImbalanceLiquidity',
'bootstrapLiquidity',
'removeBalanceLiquidity',
'removeLiquiditySingleSide'
)
),
pre_final as (
select
b1.* exclude(inner_index),
iff(b1.inner_index=1,NULL,b1.inner_index-2) AS inner_index,
b2.mint as b_mint,
b2.amount AS b_amount
from base b1
left join
base b2
on b1.tx_id = b2.tx_id
and b1.index = b2.index
and b1.action = b2.action
and b1.rn = 1
and b2.rn <> 1
where
b1.rn = 1
)
select
block_id,
block_timestamp,
tx_id,
index,
inner_index,
action AS event_type,
liquidity_pool_address AS pool_address,
liquidity_provider AS provider_address,
mint AS token_a_mint,
amount AS token_a_amount,
b_mint AS token_b_mint,
b_amount AS token_b_amount,
program_id,
modified_timestamp,
{{ dbt_utils.generate_surrogate_key(['block_id', 'tx_id', 'index', 'inner_index']) }} AS liquidity_pool_actions_meteora_id
from pre_final

View File

@ -0,0 +1,54 @@
version: 2
models:
- name: marinade__meteora_pivot
data_tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_ID
- INDEX
- INNER_INDEX
- dbt_utils.expression_is_true:
expression: "inner_index >= 0"
columns:
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
- name: TX_ID
description: "{{ doc('tx_id') }}"
- name: INDEX
description: "{{ doc('event_index') }}"
- name: INNER_INDEX
description: "{{ doc('inner_index') }}"
- name: SUCCEEDED
description: "{{ doc('tx_succeeded') }}"
- name: EVENT_TYPE
description: "{{ doc('event_type') }}"
- name: POOL_ADDRESS
description: "{{ doc('liquidity_pool_address') }}"
- name: PROVIDER_ADDRESS
description: "{{ doc('liquidity_provider') }}"
- name: TOKEN_A_MINT
description: "{{ doc('token_a_mint') }}"
data_tests:
- not_null
- name: TOKEN_A_AMOUNT
description: "{{ doc('token_a_amount') }}"
data_tests:
- not_null
- name: TOKEN_B_MINT
description: "{{ doc('token_b_mint') }}"
- name: TOKEN_B_AMOUNT
description: "{{ doc('token_b_amount') }}"
- name: PROGRAM_ID
description: "{{ doc('program_id') }}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"
- name: LIQUIDITY_POOL_ACTIONS_METEORA_ID
description: '{{ doc("pk") }}'
data_tests:
- unique
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'