AN-4236 swaps overhaul (#148)

* load for all blocks

* merge main & cl

* finalize logic
This commit is contained in:
eric-laurello 2024-01-11 21:40:41 -05:00 committed by GitHub
parent 7fa382ee39
commit eb02a0e9c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 875 additions and 370 deletions

View File

@ -0,0 +1,158 @@
{{ config(
materialized = 'incremental',
full_refresh = false,
tags = ['daily']
) }}
WITH call_1 AS (
SELECT
live.udf_api(
'GET',
'https://lcd.osmosis.zone/osmosis/concentratedliquidity/v1beta1/pools',{},{}
) AS resp,
SYSDATE() AS _inserted_timestamp
),
call_2 AS (
SELECT
live.udf_api(
'GET',
'https://lcd.osmosis.zone/osmosis/concentratedliquidity/v1beta1/pools?pagination.key=' || resp :data :pagination :next_key,{},{}
) AS resp,
SYSDATE() AS _inserted_timestamp
FROM
call_1
),
call_3 AS (
SELECT
live.udf_api(
'GET',
'https://lcd.osmosis.zone/osmosis/concentratedliquidity/v1beta1/pools?pagination.key=' || resp :data :pagination :next_key,{},{}
) AS resp,
SYSDATE() AS _inserted_timestamp
FROM
call_2
WHERE
resp :data :pagination :next_key <> 'null'
),
call_4 AS (
SELECT
live.udf_api(
'GET',
'https://lcd.osmosis.zone/osmosis/concentratedliquidity/v1beta1/pools?pagination.key=' || resp :data :pagination :next_key,{},{}
) AS resp,
SYSDATE() AS _inserted_timestamp
FROM
call_3
WHERE
resp :data :pagination :next_key <> 'null'
),
call_5 AS (
SELECT
live.udf_api(
'GET',
'https://lcd.osmosis.zone/osmosis/concentratedliquidity/v1beta1/pools?pagination.key=' || resp :data :pagination :next_key,{},{}
) AS resp,
SYSDATE() AS _inserted_timestamp
FROM
call_4
WHERE
resp :data :pagination :next_key <> 'null'
),
call_6 AS (
SELECT
live.udf_api(
'GET',
'https://lcd.osmosis.zone/osmosis/concentratedliquidity/v1beta1/pools?pagination.key=' || resp :data :pagination :next_key,{},{}
) AS resp,
SYSDATE() AS _inserted_timestamp
FROM
call_5
WHERE
resp :data :pagination :next_key <> 'null'
),
call_7 AS (
SELECT
live.udf_api(
'GET',
'https://lcd.osmosis.zone/osmosis/concentratedliquidity/v1beta1/pools?pagination.key=' || resp :data :pagination :next_key,{},{}
) AS resp,
SYSDATE() AS _inserted_timestamp
FROM
call_6
WHERE
resp :data :pagination :next_key <> 'null'
),
call_8 AS (
SELECT
live.udf_api(
'GET',
'https://lcd.osmosis.zone/osmosis/concentratedliquidity/v1beta1/pools?pagination.key=' || resp :data :pagination :next_key,{},{}
) AS resp,
SYSDATE() AS _inserted_timestamp
FROM
call_7
WHERE
resp :data :pagination :next_key <> 'null'
),
fin AS (
SELECT
resp,
_inserted_timestamp
FROM
call_1
UNION ALL
SELECT
resp,
_inserted_timestamp
FROM
call_2
UNION ALL
SELECT
resp,
_inserted_timestamp
FROM
call_3 {# WHERE
resp IS NOT NULL #}
UNION ALL
SELECT
resp,
_inserted_timestamp
FROM
call_4 {# WHERE
resp IS NOT NULL #}
UNION ALL
SELECT
resp,
_inserted_timestamp
FROM
call_5 {# WHERE
resp IS NOT NULL #}
UNION ALL
SELECT
resp,
_inserted_timestamp
FROM
call_6 {# WHERE
resp IS NOT NULL #}
UNION ALL
SELECT
resp,
_inserted_timestamp
FROM
call_7 {# WHERE
resp IS NOT NULL #}
UNION ALL
SELECT
resp,
_inserted_timestamp
FROM
call_8 {# WHERE
resp IS NOT NULL #}
)
SELECT
resp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
fin

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: core__fact_airdrop
description: Record of the OSMO airdrop to ATOM stakers. The columns blockchain, chain_id, and tx_status will be deprecating soon.
description: Record of the OSMO airdrop to ATOM stakers.
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: core__fact_msg_attributes
description: Records of all message attributes associated to messages that have occurred on Osmosis, dating back to the genesis block. The columns blockchain and chain_id will be deprecating soon.
description: Records of all message attributes associated to messages that have occurred on Osmosis, dating back to the genesis block.
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: core__fact_msgs
description: Records of all message attributes associated to messages that have occurred on Osmosis, dating back to the genesis block. The columns blockchain, chain_id, and tx_status will be deprecating soon.
description: Records of all message attributes associated to messages that have occurred on Osmosis, dating back to the genesis block.
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: core__fact_transactions
description: Records of all transactions that have occurred on Osmosis, dating back to the genesis block. The columns blockchain, chain_id, and tx_status will be deprecating soon.
description: Records of all transactions that have occurred on Osmosis, dating back to the genesis block.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: core__fact_transfers
description: Records of all transfers on Osmosis, including IBC transfers as on- and off-ramps to Osmosis and wallet to wallet transfers. The columns blockchain, chain_id, and tx_status will be deprecating soon.
description: Records of all transfers on Osmosis, including IBC transfers as on- and off-ramps to Osmosis and wallet to wallet transfers.
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: defi__fact_liquidity_provider_actions
description: Includes all actions entering and exiting liquidity pools. The columns blockchain, chain_id, and tx_status will be deprecating soon.
description: Includes all actions entering and exiting liquidity pools.
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: defi__fact_locked_liquidity_actions
description: Includes all actions related to locked LP tokens. The columns blockchain, chain_id, and tx_status will be deprecating soon.
description: Includes all actions related to locked LP tokens.
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: defi__fact_superfluid_staking
description: Records of all staking related transactions that have occurred on Osmosis, dating back to the genesis block. These actions include delegate, undelegate, and redelegate. The columns blockchain, chain_id, and tx_status will be deprecating soon.
description: Records of all staking related transactions that have occurred on Osmosis, dating back to the genesis block. These actions include delegate, undelegate, and redelegate.
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"

View File

@ -17,7 +17,8 @@ SELECT
to_currency,
TO_DECIMAL,
pool_ids,
_BODY_INDEX,
pool_id,
_body_index,
COALESCE(
swaps_id,
{{ dbt_utils.generate_surrogate_key(
@ -34,3 +35,35 @@ SELECT
) AS modified_timestamp
FROM
{{ ref('silver__swaps') }}
UNION ALL
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
trader,
from_amount,
from_currency,
from_decimal,
to_amount,
to_currency,
TO_DECIMAL,
pool_id :: ARRAY AS pool_ids,
pool_id,
msg_index AS _body_index,
COALESCE(
swaps_transfers_id,
{{ dbt_utils.generate_surrogate_key(
['tx_id','_body_index']
) }}
) AS fact_swaps_id,
COALESCE(
inserted_timestamp,
'2000-01-01'
) AS inserted_timestamp,
COALESCE(
modified_timestamp,
'2000-01-01'
) AS modified_timestamp
FROM
{{ ref('silver__swaps_transfers') }}

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: defi__fact_swaps
description: Records of all transactions that have occurred on Osmosis, dating back to the genesis block. The columns blockchain, chain_id, and tx_status will be deprecating soon.
description: Records of all swaps that have occurred on Osmosis, dating back to the genesis block. The granularity of this table is one record for each pool a swap uses. For example, if a swap uses 3 pools, there will be 3 records in this table for that swap.
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
@ -44,6 +44,10 @@ models:
- name: TO_DECIMAL
description: "{{ doc('to_decimal') }}"
- name: POOL_IDS
description: "Deprecating soon! This is an array value not an integer. Use pool_id instead. {{ doc('pool_id') }}"
tests:
- dbt_expectations.expect_column_to_exist
- name: POOL_ID
description: "{{ doc('pool_id') }}"
tests:
- dbt_expectations.expect_column_to_exist

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: gov__fact_governance_proposal_deposits
description: Records of all proposal deposits that have occurred on Osmosis, dating back to the genesis block. At least 500 OSMO must be deposited for a proposal to be voted on. The columns blockchain, chain_id, and tx_status will be deprecating soon.
description: Records of all proposal deposits that have occurred on Osmosis, dating back to the genesis block. At least 500 OSMO must be deposited for a proposal to be voted on.
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: gov__fact_governance_submit_proposal
description: Records of all proposal submissions on Osmosis, dating back to the Genesis block. The columns blockchain, chain_id, and tx_status will be deprecating soon.
description: Records of all proposal submissions on Osmosis, dating back to the Genesis block.
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: gov__fact_governance_votes
description: Records of all votes on proposals. The columns blockchain, chain_id, and tx_status will be deprecating soon.
description: Records of all votes on proposals.
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: gov__fact_staking
description: Records of all staking related transactions that have occurred on Osmosis, dating back to the genesis block. These actions include delegate, undelegate, and redelegate. The columns blockchain, chain_id, and tx_status will be deprecating soon.
description: Records of all staking related transactions that have occurred on Osmosis, dating back to the genesis block. These actions include delegate, undelegate, and redelegate.
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: gov__fact_staking_rewards
description: Records of all claimed rewards earned from staking that have occurred on Osmosis, dating back to the genesis block. These actions include claim and withdrawal_rewards. The columns blockchain, chain_id, and tx_status will be deprecating soon.
description: Records of all claimed rewards earned from staking that have occurred on Osmosis, dating back to the genesis block. These actions include claim and withdrawal_rewards.
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"

View File

@ -0,0 +1,146 @@
{{ config(
materialized = 'incremental',
unique_key = ['tx_id','msg_index'],
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE'],
tags = ['core']
) }}
WITH base_atts AS (
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
msg_group,
msg_sub_group,
msg_index,
msg_type,
attribute_key,
attribute_value,
_inserted_timestamp
FROM
{{ ref('silver__msg_attributes') }}
WHERE
msg_type IN (
'tx',
'transfer'
)
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
{{ this }}
)
{% endif %}
),
all_transfers AS (
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
msg_group,
msg_sub_group,
msg_index,
_inserted_timestamp,
OBJECT_AGG(
attribute_key :: STRING,
attribute_value :: variant
) AS j,
j :sender :: STRING AS sender,
j :recipient :: STRING AS recipient,
j :amount :: STRING AS amount
FROM
base_atts
WHERE
msg_type = 'transfer'
GROUP BY
block_id,
block_timestamp,
tx_id,
tx_succeeded,
msg_group,
msg_sub_group,
msg_index,
_inserted_timestamp
),
sender AS (
SELECT
tx_id,
SPLIT_PART(
attribute_value,
'/',
0
) AS sender
FROM
base_atts
WHERE
msg_type = 'tx'
AND attribute_key = 'acc_seq' qualify(ROW_NUMBER() over(PARTITION BY tx_id
ORDER BY
msg_index)) = 1
),
new_fin AS (
SELECT
A.block_id,
A.block_timestamp,
A.tx_id,
A.tx_succeeded,
A.msg_group,
A.msg_sub_group,
A.msg_index,
A._inserted_timestamp,
COALESCE(
A.sender,
s.sender
) AS sender,
A.recipient AS receiver,
A.amount AS amount_raw,
SPLIT_PART(
TRIM(
REGEXP_REPLACE(
amount_raw,
'[^[:digit:]]',
' '
)
),
' ',
0
) AS amount_INT,
RIGHT(amount_raw, LENGTH(amount_raw) - LENGTH(SPLIT_PART(TRIM(REGEXP_REPLACE(amount_raw, '[^[:digit:]]', ' ')), ' ', 0))) AS currency
FROM
all_transfers A
JOIN sender s
ON A.tx_id = s.tx_id
)
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
'OSMOIS' AS transfer_type,
msg_index,
sender,
receiver AS receiver,
TRY_CAST(
amount_int AS INT
) AS amount,
currency,
{{ dbt_utils.generate_surrogate_key(
['tx_id','msg_index']
) }} AS transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
new_fin
WHERE
amount IS NOT NULL

View File

@ -0,0 +1,92 @@
version: 2
models:
- name: silver__transfers_base
description: Records of all transfers on Osmosis, including IBC transfers as on- and off-ramps to Osmosis and wallet to wallet transfers
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_ID
- MSG_INDEX
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: TX_ID
description: "{{ doc('tx_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TX_SUCCEEDED
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- BOOLEAN
- name: TRANSFER_TYPE
description: "{{ doc('transfer_type') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: SENDER
description: "{{ doc('sender') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: AMOUNT
description: "{{ doc('amount') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: CURRENCY
description: "{{ doc('currency') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: DECIMAL
description: "{{ doc('decimal') }}"
- name: RECEIVER
description: "{{ doc('receiver') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: FOREIGN_ADDRESS
description: "{{ doc('foreign_address') }}"
- name: FOREIGN_CHAIN
description: "{{ doc('foreign_chain') }}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('inserted_timestamp') }}"
tests:
- not_null

View File

@ -0,0 +1,61 @@
{{ config(
materialized = 'incremental',
unique_key = ['pool_id','_inserted_timestamp'],
incremental_strategy = 'delete+insert',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['_inserted_timestamp::DATE'],
tags = ['noncore']
) }}
WITH base AS (
SELECT
resp,
_inserted_timestamp
FROM
{{ ref(
'bronze_api__concentrated_liquidity_pools'
) }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
{{ this }}
)
{% endif %}
)
SELECT
VALUE :"@type" :: STRING AS TYPE,
VALUE: "address" :: STRING AS address,
VALUE: "incentives_address" :: STRING AS incentives_address,
VALUE :"spread_rewards_address" :: STRING AS spread_rewards_address,
VALUE :"id" :: INT AS pool_id,
VALUE :"current_tick_liquidity" :: FLOAT AS current_tick_liquidity,
VALUE :"token0" :: STRING AS token0,
VALUE :"token1" :: STRING AS token1,
VALUE :"token2" :: STRING AS token2,
VALUE :"token3" :: STRING AS token3,
VALUE :"token4" :: STRING AS token4,
VALUE: "current_sqrt_price" :: FLOAT AS current_sqrt_price,
VALUE :"current_tick" :: bigint AS current_tick,
VALUE :"tick_spacing" :: INT AS tick_spacing,
VALUE :"exponent_at_price_one" :: FLOAT AS exponent_at_price_one,
VALUE :"spread_factor" :: FLOAT AS spread_factor,
VALUE: "last_liquidity_update" :: datetime AS last_liquidity_update,
{{ dbt_utils.generate_surrogate_key(
['pool_id','_inserted_timestamp']
) }} AS concentrated_liquidity_pools_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
base,
LATERAL FLATTEN(
resp :data :pools
)

View File

@ -0,0 +1,110 @@
version: 2
models:
- name: silver__concentrated_liquidity_pools
description: Includes all actions entering and exiting liquidity pools
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_ID
- MSG_INDEX
- CURRENCY
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: TX_ID
description: "{{ doc('tx_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TX_SUCCEEDED
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- BOOLEAN
- name: MSG_GROUP
description: "{{ doc('silver_msg_group') }}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: MSG_SUB_GROUP
description: "{{ doc('msg_sub_group') }}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: MSG_INDEX
description: "{{ doc('msg_index') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: LIQUIDITY_PROVIDER_ADDRESS
description: "{{ doc('liquidity_provider_address') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- dbt_expectations.expect_column_values_to_match_regex:
regex: osmo1[0-9a-z]{38,38}
- name: ACTION
description: "{{ doc('action') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: POOL_ID
description: "{{ doc('pool_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: AMOUNT
description: "{{ doc('amount') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: CURRENCY
description: "{{ doc('currency') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: DECIMAL
description: "{{ doc('decimal') }}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('inserted_timestamp') }}"
tests:
- not_null

View File

@ -1,376 +1,38 @@
{{ config(
materialized = 'incremental',
unique_key = "CONCAT_WS('-', tx_id, _body_index)",
unique_key = ['tx_id','_body_index'],
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH
{% if is_incremental() %}
max_date AS (
SELECT
MAX(
_inserted_timestamp
) _inserted_timestamp
FROM
{{ this }}
),
{% endif %}
swaps AS (
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
tx_body,
_inserted_timestamp,
ROW_NUMBER() over (
PARTITION BY tx_id
ORDER BY
_inserted_timestamp ASC
) - 1 AS rn
FROM
{{ ref('silver__transactions') }}
t,
LATERAL FLATTEN (
input => tx_body :messages,
recursive => TRUE
) b
WHERE
key = '@type'
AND VALUE :: STRING = '/osmosis.gamm.v1beta1.MsgSwapExactAmountIn'
AND tx_succeeded = TRUE
{% if is_incremental() %}
AND t._inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
max_date
)
{% endif %}
),
msg_atts AS (
SELECT
tx_id,
msg_group,
msg_index,
msg_type,
attribute_key,
attribute_value
FROM
{{ ref('silver__msg_attributes') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
max_date
)
{% endif %}
),
pre_final AS (
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
b.value,
b.value :sender :: STRING AS trader,
COALESCE(
b.value :tokenOutMinAmount :: NUMBER,
b.value :token_out_min_amount :: NUMBER
) AS to_amount,
b.value :token_in :amount :: NUMBER from_amount,
b.value :token_in :denom :: STRING from_currency,
b.value :routes AS routes,
_inserted_timestamp,
b.index AS _body_index
FROM
swaps s,
TABLE(FLATTEN(tx_body :messages)) b
WHERE
b.value :routes IS NOT NULL
AND b.index = rn
),
pools AS (
SELECT
tx_id,
_body_index,
ARRAY_AGG(
COALESCE(
r.value :poolId,
r.value :pool_id
)
) AS pool_ids
FROM
pre_final p,
TABLE(FLATTEN(routes)) r
GROUP BY
tx_id,
_body_index),
msg_idx AS (
SELECT
p.tx_id,
msg_group,
MIN(
m.msg_index
) AS min_msg_index
FROM
pre_final p
INNER JOIN msg_atts m
ON p.tx_id = m.tx_id
WHERE
(
(
msg_type = 'token_swapped'
AND attribute_key = 'tokens_in'
)
OR (
msg_type = 'transfer'
AND attribute_key = 'amount'
)
)
AND msg_group IS NOT NULL
GROUP BY
p.tx_id,
msg_group
),
from_amt AS (
SELECT
m.tx_id,
p.msg_index,
m.msg_group,
RIGHT(attribute_value, LENGTH(attribute_value) - LENGTH(SPLIT_PART(TRIM(REGEXP_REPLACE(attribute_value, '[^[:digit:]]', ' ')), ' ', 0))) AS from_currency,
SPLIT_PART(
TRIM(
REGEXP_REPLACE(
attribute_value,
'[^[:digit:]]',
' '
)
),
' ',
0
) AS from_amount
FROM
msg_atts p
INNER JOIN msg_idx m
ON p.tx_id = m.tx_id
AND p.msg_group = m.msg_group
AND p.msg_index = min_msg_index
WHERE
(
(
msg_type = 'token_swapped'
AND attribute_key = 'tokens_in'
)
OR (
msg_type = 'transfer'
AND attribute_key = 'amount'
)
)
) {# ,
fee_rec AS (
SELECT
p.tx_id,
m.sender,
m.recipient
FROM
pre_final p
INNER JOIN (
SELECT
tx_id,
OBJECT_AGG(
attribute_key :: STRING,
attribute_value :: variant
) AS j,
j :sender :: STRING AS sender,
j :recipient :: STRING AS recipient
FROM
msg_atts
WHERE
msg_type = 'transfer'
AND attribute_key IN (
'recipient',
'sender'
)
AND msg_group IS NULL
GROUP BY
tx_id
) m
ON p.tx_id = m.tx_id
AND p.trader = sender
),
ex_protorev AS (
SELECT
A.tx_id,
msg_index
FROM
msg_atts A
JOIN fee_rec b
ON A.tx_id = b.tx_id
AND A.attribute_value = b.recipient
WHERE
msg_type = 'transfer'
AND attribute_key = 'recipient'
) #},
rel_to_transfers AS (
SELECT
A.tx_id,
A.msg_index
FROM
msg_atts A
JOIN pre_final b
ON A.tx_id = b.tx_id
AND A.attribute_value = b.trader
WHERE
msg_type = 'transfer'
AND attribute_key = 'recipient'
),
max_idx2 AS (
SELECT
p.tx_id,
msg_group,
MAX(
m.msg_index
) AS max_msg_index
FROM
pre_final p
INNER JOIN msg_atts m
ON p.tx_id = m.tx_id
JOIN rel_to_transfers ex
ON m.tx_id = ex.tx_id
AND m.msg_index = ex.msg_index
WHERE
(
{# (
msg_type = 'token_swapped'
AND attribute_key = 'tokens_out'
)
OR #}(
msg_type = 'transfer'
AND attribute_key = 'amount'
)
)
AND msg_group IS NOT NULL {# AND ex.tx_id IS NULL #}
GROUP BY
p.tx_id,
msg_group
),
to_amt AS (
SELECT
mm.tx_id,
p.msg_index,
mm.msg_group,
RIGHT(attribute_value, LENGTH(attribute_value) - LENGTH(SPLIT_PART(TRIM(REGEXP_REPLACE(attribute_value, '[^[:digit:]]', ' ')), ' ', 0))) AS to_currency,
SPLIT_PART(
TRIM(
REGEXP_REPLACE(
attribute_value,
'[^[:digit:]]',
' '
)
),
' ',
0
) AS to_amount
FROM
msg_atts p
INNER JOIN max_idx2 mm
ON p.tx_id = mm.tx_id
AND p.msg_group = mm.msg_group
AND p.msg_index = max_msg_index
WHERE
(
(
msg_type = 'token_swapped'
AND attribute_key = 'tokens_out'
)
OR (
msg_type = 'transfer'
AND attribute_key = 'amount'
)
)
),
pre_final2 AS (
SELECT
block_id,
block_timestamp,
p.tx_id,
tx_succeeded,
trader,
COALESCE(TRY_CAST(f.from_amount AS bigint), TRY_CAST(p.from_amount AS bigint)) AS from_amount,
COALESCE(
f.from_currency,
p.from_currency
) AS from_currency,
CASE
WHEN COALESCE(
f.from_currency,
p.from_currency
) LIKE 'gamm/pool/%' THEN 18
ELSE l.decimal
END AS from_decimal,
tt.to_amount,
tt.to_currency,
CASE
WHEN tt.to_currency LIKE 'gamm/pool/%' THEN 18
ELSE A.decimal
END AS TO_DECIMAL,
pool_ids,
p._inserted_timestamp,
p._body_index
FROM
pre_final p
LEFT OUTER JOIN pools pp
ON p.tx_id = pp.tx_id
AND p._body_index = pp._body_index
LEFT OUTER JOIN from_amt f
ON p.tx_id = f.tx_id
AND p._body_index = f.msg_group
LEFT OUTER JOIN to_amt tt
ON p.tx_id = tt.tx_id
AND p._body_index = tt.msg_group
LEFT OUTER JOIN {{ ref('silver__asset_metadata') }} A
ON tt.to_currency = A.address
LEFT OUTER JOIN {{ ref('silver__asset_metadata') }}
l
ON COALESCE(
f.from_currency,
p.from_currency
) = l.address
)
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
trader,
from_amount :: NUMBER AS from_amount,
from_amount,
from_currency,
from_decimal,
to_amount :: NUMBER AS to_amount,
as_f.decimal AS from_decimal,
to_amount,
to_currency,
TO_DECIMAL,
pool_ids,
_inserted_timestamp,
_body_index,
as_t.decimal AS TO_DECIMAL,
pool_id :: ARRAY AS pool_ids,
pool_id :: INT AS pool_id,
msg_index AS _body_index,
{{ dbt_utils.generate_surrogate_key(
['tx_id','_body_index']
['tx_id','msg_index']
) }} AS swaps_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
A._inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
pre_final2
{{ ref('silver__token_swapped') }} A
LEFT OUTER JOIN {{ ref('silver__asset_metadata') }}
as_f
ON A.from_currency = as_f.address
LEFT OUTER JOIN {{ ref('silver__asset_metadata') }}
as_t
ON A.to_currency = as_t.address

View File

@ -0,0 +1,136 @@
{{ config(
materialized = 'incremental',
unique_key = ['tx_id','msg_index'],
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE']
) }}
WITH xfer AS (
SELECT
*
FROM
{{ ref('silver__transfers_base') }}
WHERE
block_timestamp < '2021-09-24 14:11:34.000'
),
matts AS (
SELECT
*
FROM
{{ ref('silver__msg_attributes') }}
WHERE
block_timestamp < '2021-09-24 14:11:34.000'
),
txs AS (
SELECT
tx_id
FROM
matts
WHERE
msg_type = 'message'
AND attribute_key = 'action'
AND attribute_value = 'swap_exact_amount_in' qualify(ROW_NUMBER() over(PARTITION BY tx_id
ORDER BY
msg_index)) = 1
),
trader AS (
SELECT
tx_id,
SPLIT_PART(
attribute_value,
'/',
0
) AS trader
FROM
matts
WHERE
msg_type = 'tx'
AND attribute_key = 'acc_seq' qualify(ROW_NUMBER() over(PARTITION BY tx_id
ORDER BY
msg_index)) = 1
),
pools AS (
SELECT
pool_address,
pool_id
FROM
{{ ref('silver__pool_metadata') }}
WHERE
pool_address IS NOT NULL
),
fin AS (
SELECT
A.*,
COALESCE(
b_send.pool_id,
b_rec.pool_id
) AS pool_id,
CASE
WHEN b_send.pool_address IS NOT NULL THEN 'pool_in_send'
ELSE 'pool_in_rec'
END xtype,
ROW_NUMBER() over (
PARTITION BY A.tx_id,
A.sender
ORDER BY
A.msg_index
) AS send_rank,
ROW_NUMBER() over (
PARTITION BY A.tx_id,
A.receiver
ORDER BY
A.msg_index
) AS rec_rank
FROM
xfer A
LEFT JOIN pools b_send
ON A.sender = b_send.pool_address
LEFT JOIN pools b_rec
ON A.receiver = b_rec.pool_address
WHERE
COALESCE(
b_send.pool_id,
b_rec.pool_id
) IS NOT NULL
)
SELECT
A.block_id,
A.block_timestamp,
A.tx_id,
A.tx_succeeded,
tder.trader,
A.amount AS from_amount,
A.currency AS from_currency,
as_f.decimal AS from_decimal,
b.amount AS to_amount,
b.currency AS to_currency,
as_t.decimal AS TO_DECIMAL,
A.pool_id,
A._inserted_timestamp,
A.msg_index,
{{ dbt_utils.generate_surrogate_key(
['a.tx_id','a.msg_index']
) }} AS swaps_transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
fin A
JOIN txs x
ON A.tx_id = x.tx_id
JOIN trader tder
ON A.tx_id = tder.tx_id
JOIN fin b
ON A.tx_id = b.tx_id
AND A.receiver = b.sender
AND A.rec_rank = b.send_rank
LEFT OUTER JOIN {{ ref('silver__asset_metadata') }}
as_f
ON A.currency = as_f.address
LEFT OUTER JOIN {{ ref('silver__asset_metadata') }}
as_t
ON b.currency = as_t.address
WHERE
A.xtype = 'pool_in_rec'
AND b.xtype = 'pool_in_send'

View File

@ -0,0 +1,103 @@
version: 2
models:
- name: silver__swaps_transfers
description: Records of all transactions that have occurred on Osmosis, dating back to the genesis block.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_ID
- MSG_INDEX
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: TX_ID
description: "{{ doc('tx_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TX_SUCCEEDED
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- BOOLEAN
- name: TRADER
description: "{{ doc('trader') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- dbt_expectations.expect_column_values_to_match_regex:
regex: osmo1[0-9a-z]{38,38}
- name: FROM_AMOUNT
description: "{{ doc('from_amount') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: FROM_CURRENCY
description: "{{ doc('from_currency') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: FROM_DECIMAL
description: "{{ doc('from_decimal') }}"
- name: TO_AMOUNT
description: "{{ doc('to_amount') }}"
tests:
- not_null:
config:
severity: warn
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TO_CURRENCY
description: "{{ doc('to_currency') }}"
tests:
- not_null:
config:
severity: warn
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TO_DECIMAL
description: "{{ doc('to_decimal') }}"
- name: POOL_ID
description: "{{ doc('pool_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: _INSERTED_TIMESTAMP
description: "{{ doc('inserted_timestamp') }}"
tests:
- not_null