AN-5462 dragonswap and ez_dex_swaps (#84)

New dragonswap model
New combined models (with heal logic)
New EZ Dex Swaps model
This commit is contained in:
eric-laurello 2024-12-09 16:28:39 -05:00 committed by GitHub
parent e26efa7f0b
commit 3ac6452fbe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 1197 additions and 1 deletions

View File

@ -48,6 +48,8 @@ vars:
UPDATE_UDFS_AND_SPS: False
WAIT: 0
START_GHA_TASKS: False
HEAL_MODELS: []
TEST_DAYS_THRESHOLD: 7
#### STREAMLINE 2.0 BEGIN ####

View File

@ -0,0 +1,56 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'pool_address',
merge_exclude_columns = ["inserted_timestamp"],
tags = ['noncore']
) }}
WITH created_pools AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
decoded_log :token0 :: STRING AS token0,
decoded_log :token1 :: STRING AS token1,
decoded_log :pair :: STRING AS pool_address,
decoded_log :length :: INT AS pool_id
FROM
{{ ref ('core_evm__ez_decoded_event_logs') }}
WHERE
tx_succeeded
AND contract_address = LOWER('0x71f6b49ae1558357bbb5a6074f1143c46cbca03d') --DragonswapFactory
AND event_name = 'PairCreated'
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp) - INTERVAL '5 minutes'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
token0,
token1,
pool_address,
pool_id,
{{ dbt_utils.generate_surrogate_key(
['pool_address']
) }} AS dragonswap_pools_v1_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
created_pools qualify(ROW_NUMBER() over (PARTITION BY pool_address
ORDER BY
block_timestamp DESC)) = 1

View File

@ -0,0 +1,61 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'pool_address',
merge_exclude_columns = ["inserted_timestamp"],
tags = ['noncore']
) }}
WITH created_pools AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS token0,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS token1,
CONCAT('0x', SUBSTR(segmented_data [1] :: STRING, 25, 40)) AS pool_address,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [0] :: STRING
)
) AS pool_id,
FROM
{{ ref ('silver_evm__logs') }}
WHERE
tx_status = 'SUCCESS'
AND contract_address = LOWER('0x179d9a5592bc77050796f7be28058c51ca575df4') --DragonswapFactoryV2
AND topics [0] :: STRING = '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118'
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp) - INTERVAL '5 minutes'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
token0,
token1,
pool_address,
pool_id,
{{ dbt_utils.generate_surrogate_key(
['pool_address']
) }} AS dragonswap_pools_v2_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
created_pools qualify(ROW_NUMBER() over (PARTITION BY pool_address
ORDER BY
block_timestamp DESC)) = 1

View File

@ -0,0 +1,62 @@
{{ config(
materialized = 'view',
tags = ['noncore']
) }}
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_name,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
tx_to,
sender,
ABS(amount_in_unadj) AS amount_in_unadj,
ABS(amount_out_unadj) AS amount_out_unadj,
token_in,
token_out,
dragonswap_swaps_decoded_id,
inserted_timestamp,
modified_timestamp,
_invocation_id
FROM
{{ ref('silver_evm_dex__dragonswap_swaps_decoded') }}
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_name,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
tx_to,
sender,
CASE
WHEN amount0_unadj < 0 THEN ABS(amount0_unadj)
ELSE amount1_unadj
END AS amount_in_unadj,
CASE
WHEN amount0_unadj < 0 THEN ABS(amount1_unadj)
ELSE amount0_unadj
END AS amount_out_unadj,
CASE
WHEN amount0_unadj < 0 THEN token0
ELSE token1
END AS token_in,
CASE
WHEN amount0_unadj < 0 THEN token1
ELSE token0
END AS token_out,
dragonswap_swaps_undecoded_id,
inserted_timestamp,
modified_timestamp,
_invocation_id
FROM
{{ ref('silver_evm_dex__dragonswap_swaps_undecoded') }}

View File

@ -0,0 +1,38 @@
version: 2
models:
- name: silver_evm_dex__dragonswap_swaps_combined
description: Records of swaps that occurred on the dragonswap platform
columns:
- name: BLOCK_NUMBER
- name: BLOCK_TIMESTAMP
tests:
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- name: tx_hash
- name: event_index
- name: event_name
- name: origin_function_signature
- name: origin_from_address
- name: origin_to_address
- name: contract_address
- name: tx_to
- name: sender
- name: amount_in_unadj
- name: amount_out_unadj
- name: token_in
- name: token_out

View File

@ -0,0 +1,113 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'dragonswap_swaps_decoded_id',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH pools AS (
SELECT
pool_address,
token0,
token1
FROM
{{ ref('silver_evm_dex__dragonswap_pools_v1') }}
UNION ALL
SELECT
pool_address,
token0,
token1
FROM
{{ ref('silver_evm_dex__dragonswap_pools_v2') }}
),
swaps_base AS (
SELECT
l.block_number,
l.block_timestamp,
l.tx_hash,
l.event_index,
l.event_name,
origin_function_signature,
origin_from_address,
origin_to_address,
l.contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
decoded_log: amount0In :: bigint AS amount0In,
decoded_log: amount1In :: bigint AS amount1In,
decoded_log: amount0Out :: bigint AS amount0Out,
decoded_log: amount1Out :: bigint AS amount1Out,
decoded_log :sender :: STRING AS sender,
decoded_log :to :: STRING AS tx_to,
token0,
token1
FROM
{{ ref('core_evm__ez_decoded_event_logs') }}
l
INNER JOIN pools p
ON p.pool_address = l.contract_address
WHERE
event_name = 'Swap'
AND tx_succeeded
{% if is_incremental() %}
AND l.modified_timestamp >= (
SELECT
MAX(modified_timestamp) - INTERVAL '5 minutes'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_name,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
sender,
tx_to,
amount0In,
amount1In,
amount0Out,
amount1Out,
token0,
token1,
CASE
WHEN amount0In <> 0
AND amount1In <> 0
AND amount0Out <> 0 THEN amount1In
WHEN amount0In <> 0 THEN amount0In
WHEN amount1In <> 0 THEN amount1In
END AS amount_in_unadj,
CASE
WHEN amount0Out <> 0 THEN amount0Out
WHEN amount1Out <> 0 THEN amount1Out
END AS amount_out_unadj,
CASE
WHEN amount0In <> 0
AND amount1In <> 0
AND amount0Out <> 0 THEN token1
WHEN amount0In <> 0 THEN token0
WHEN amount1In <> 0 THEN token1
END AS token_in,
CASE
WHEN amount0Out <> 0 THEN token0
WHEN amount1Out <> 0 THEN token1
END AS token_out,
{{ dbt_utils.generate_surrogate_key(
['tx_hash','event_index']
) }} AS dragonswap_swaps_decoded_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
swaps_base
WHERE
token_in <> token_out

View File

@ -0,0 +1,96 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'dragonswap_swaps_undecoded_id',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH pools AS (
SELECT
pool_address,
token0,
token1
FROM
{{ ref('silver_evm_dex__dragonswap_pools_v1') }}
UNION ALL
SELECT
pool_address,
token0,
token1
FROM
{{ ref('silver_evm_dex__dragonswap_pools_v2') }}
),
swaps_base AS (
SELECT
l.block_number,
l.block_timestamp,
l.tx_hash,
l.event_index,
NULL AS event_name,
l.origin_function_signature,
l.origin_from_address,
l.origin_to_address,
l.contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS sender,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS tx_to,
utils.udf_hex_to_int(
's2c',
segmented_data [0] :: STRING
) :: FLOAT AS amount0_unadj,
utils.udf_hex_to_int(
's2c',
segmented_data [1] :: STRING
) :: FLOAT AS amount1_unadj,
token0,
token1,
pool_address
FROM
{{ ref('silver_evm__logs') }}
l
INNER JOIN pools p
ON p.pool_address = l.contract_address
WHERE
l.block_timestamp :: DATE >= '2024-01-01'
AND topics [0] :: STRING = '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND l.modified_timestamp >= (
SELECT
MAX(
modified_timestamp
) - INTERVAL '5 minutes'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_name,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
pool_address,
tx_to,
sender,
token0,
token1,
amount0_unadj,
amount1_unadj,
{{ dbt_utils.generate_surrogate_key(
['tx_hash','event_index']
) }} AS dragonswap_swaps_undecoded_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
swaps_base

View File

@ -0,0 +1,260 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'swaps_combined_id',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['modified_timestamp::DATE'],
tags = ['noncore']
) }}
{% if execute %}
{% if is_incremental() %}
-- get the max modified_timestamp from the target table
{% set max_m_query %}
SELECT
MAX(modified_timestamp) - INTERVAL '{{ var("LOOKBACK", "30 minutes") }}' AS modified_timestamp
FROM
{{ this }}
{% endset %}
{% set max_mod_timestamp = run_query(max_m_query).columns [0].values() [0] %}
{% endif %}
{% endif %}
WITH inc AS (
SELECT
'dragonswap' AS platform,
block_number,
block_timestamp,
tx_hash,
event_index,
event_name,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
tx_to,
sender,
amount_in_unadj,
amount_out_unadj,
token_in,
token_out,
dragonswap_swaps_decoded_id AS uk
FROM
{{ ref('silver_evm_dex__dragonswap_swaps_combined') }}
{% if is_incremental() and 'dragonswap' not in var('HEAL_MODELS') %}
WHERE
modified_timestamp >= '{{ max_mod_timestamp }}'
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY tx_hash,
event_index
ORDER BY
modified_timestamp DESC
) = 1 -- add other dexes
)
{% if is_incremental() %},
mod_price AS (
SELECT
token_address,
HOUR,
price
FROM
{{ ref('price__ez_prices_hourly') }}
WHERE
modified_timestamp >= '{{ max_mod_timestamp }}'
),
mod_decimal AS (
SELECT
contract_address,
token_decimals,
token_symbol
FROM
{{ ref('silver_evm__contracts') }}
WHERE
modified_timestamp >= '{{ max_mod_timestamp }}'
),
mod_map AS (
SELECT
evm_address,
sei_address
FROM
{{ ref('core__dim_address_mapping') }}
WHERE
modified_timestamp >= '{{ max_mod_timestamp }}'
)
{% endif %},
fin AS (
SELECT
A.block_number AS block_id,
A.block_timestamp,
A.tx_hash AS tx_id,
'evm' AS originated_from,
A.platform,
d.sei_address AS swapper,
A.origin_from_address,
A.contract_address AS pool_address,
A.amount_in_unadj,
CASE
WHEN c_in.token_decimals IS NOT NULL THEN (amount_in_unadj / pow(10, c_in.token_decimals))
END AS amount_in,
CASE
WHEN c_in.token_decimals IS NOT NULL THEN amount_in * b_in.price
END AS amount_in_usd,
A.amount_out_unadj,
CASE
WHEN c_out.token_decimals IS NOT NULL THEN (amount_out_unadj / pow(10, c_out.token_decimals))
END AS amount_out,
CASE
WHEN c_out.token_decimals IS NOT NULL THEN amount_out * b_out.price
END AS amount_out_usd,
A.token_in,
c_in.token_symbol AS symbol_in,
A.token_out,
c_out.token_symbol AS symbol_out,
A.origin_function_signature,
A.event_index AS INDEX,
A.origin_to_address,
A.sender,
A.tx_to,
A.event_name,
uk AS swaps_combined_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
inc A
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
b_in
ON A.token_in = b_in.token_address
AND DATE_TRUNC(
'hour',
A.block_timestamp
) = b_in.hour
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
b_out
ON A.token_out = b_out.token_address
AND DATE_TRUNC(
'hour',
A.block_timestamp
) = b_out.hour
LEFT JOIN {{ ref('silver_evm__contracts') }}
c_in
ON A.token_in = c_in.contract_address
LEFT JOIN {{ ref('silver_evm__contracts') }}
c_out
ON A.token_out = c_out.contract_address
LEFT JOIN {{ ref('core__dim_address_mapping') }}
d
ON A.origin_from_address = d.evm_address
{% if is_incremental() %}
UNION ALL
SELECT
A.block_id,
A.block_timestamp,
A.tx_id,
A.originated_from,
A.platform,
COALESCE(
A.swapper,
d.sei_address
) AS swapper,
A.origin_from_address,
A.pool_address,
A.amount_in_unadj,
COALESCE(
A.amount_in,
CASE
WHEN c_in.token_decimals IS NOT NULL THEN (A.amount_in_unadj / pow(10, c_in.token_decimals))
END
) AS amount_in,
COALESCE(
A.amount_in_usd,
CASE
WHEN c_in.token_decimals IS NOT NULL THEN (A.amount_in_unadj / pow(10, c_in.token_decimals)) * b_in.price
END
) AS amount_in_usd,
A.amount_out_unadj,
COALESCE(
A.amount_out,
CASE
WHEN c_out.token_decimals IS NOT NULL THEN (A.amount_out_unadj / pow(10, c_in.token_decimals))
END
) AS amount_out,
COALESCE(
A.amount_out_usd,
CASE
WHEN c_out.token_decimals IS NOT NULL THEN (A.amount_out_unadj / pow(10, c_in.token_decimals)) * b_out.price
END
) AS amount_out_usd,
A.token_in,
COALESCE(
A.symbol_in,
c_in.token_symbol
) AS symbol_in,
A.token_out,
COALESCE(
A.symbol_out,
c_out.token_symbol
) AS symbol_out,
A.origin_function_signature,
A.index,
A.origin_to_address,
A.sender,
A.tx_to,
A.event_name,
A.swaps_combined_id,
A.inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ this }} A
LEFT JOIN mod_price b_in
ON A.token_in = b_in.token_address
AND DATE_TRUNC(
'hour',
A.block_timestamp
) = b_in.hour
LEFT JOIN mod_price b_out
ON A.token_out = b_out.token_address
AND DATE_TRUNC(
'hour',
A.block_timestamp
) = b_out.hour
LEFT JOIN mod_decimal c_in
ON A.token_in = c_in.contract_address
LEFT JOIN mod_decimal c_out
ON A.token_out = c_out.contract_address
LEFT JOIN mod_map d
ON A.origin_from_address = d.evm_address
WHERE
(
A.amount_in IS NULL
OR A.amount_in_usd IS NULL
OR A.amount_out IS NULL
OR A.amount_out_usd IS NULL
OR A.symbol_in IS NULL
OR A.symbol_out IS NULL
OR A.swapper IS NULL
)
AND (
b_in.price IS NOT NULL
OR b_out.price IS NOT NULL
OR c_in.token_decimals IS NOT NULL
OR c_out.token_decimals IS NOT NULL
OR d.sei_address IS NOT NULL
)
{% endif %}
)
SELECT
*
FROM
fin qualify(ROW_NUMBER() over (PARTITION BY swaps_combined_id
ORDER BY
inserted_timestamp DESC) = 1)

View File

@ -0,0 +1,93 @@
{{ config(
materialized = 'incremental',
unique_key = 'ez_dex_swaps_id',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','originated_from','platform'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_id,origin_from_address,swapper,token_in,token_out,symbol_in,symbol_out);",
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'SWAPS' }} },
tags = ['noncore','recent_test']
) }}
SELECT
block_id,
block_timestamp,
tx_id,
originated_from,
platform,
swapper,
origin_from_address,
pool_address,
amount_in_unadj,
amount_in,
amount_in_usd,
amount_out_unadj,
amount_out,
amount_out_usd,
token_in,
symbol_in,
token_out,
symbol_out,
origin_function_signature,
INDEX,
origin_to_address,
sender,
tx_to,
event_name,
swaps_combined_id AS ez_dex_swaps_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('silver_evm_dex__swaps_combined') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp) - INTERVAL '{{ var("LOOKBACK", "30 minutes") }}'
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
block_id,
block_timestamp,
tx_id,
originated_from,
platform,
swapper,
origin_from_address,
pool_address,
amount_in_unadj,
amount_in,
amount_in_usd,
amount_out_unadj,
amount_out,
amount_out_usd,
token_in,
symbol_in,
token_out,
symbol_out,
origin_function_signature,
INDEX,
origin_to_address,
sender,
tx_to,
event_name,
swaps_combined_id AS ez_dex_swaps_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('silver__dex_swaps_combined') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp) - INTERVAL '{{ var("LOOKBACK", "30 minutes") }}'
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,107 @@
version: 2
models:
- name: defi__ez_dex_swaps
description: Records swap transactions on Sei (both native sei and evm protocols) for Astroport, Fuzio, Seaswap, and Dragonswap
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
data_tests:
- not_null:
where: block_timestamp > current_date() - {{ var('test_days_threshold', 7) }}
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
data_tests:
- not_null:
where: block_timestamp > current_date() - {{ var('test_days_threshold', 7) }} or block_timestamp is null
- name: TX_ID
description: This will be either the native sei tx_id or the evm tx_hash. Swaps that are initiated on and native sei protocol will have a tx_id that can be find in CORE.FAC_TRANSACTIONS. Swaps that are initiated on an evm protocol will have a tx_id that is the tx_hash of the evm transaction that can be found in CORE_EVM.FACT_TRANSACTIONS.
data_tests:
- not_null:
where: block_timestamp > current_date() - {{ var('test_days_threshold', 7) }}
- name: originated_from
description: The evm from address of this transaction.
data_tests:
- not_null:
where: block_timestamp > current_date() - {{ var('test_days_threshold', 7) }}
- name: PLATFORM
description: The platform that the swap was initiated on. This will be either astroport, fuzio, seaswap, or dragonswap.
data_tests:
- not_null:
where: block_timestamp > current_date() - {{ var('test_days_threshold', 7) }}
- name: SWAPPER
description: "{{ doc('swapper') }}"
data_tests:
- not_null:
where: originated_from = 'sei' and block_timestamp > current_date() - {{ var('test_days_threshold', 7) }}
- name: origin_from_address
description: "{{doc('sei_origin_from')}}"
data_tests:
- not_null:
where: originated_from = 'evm' and block_timestamp > current_date() - {{ var('test_days_threshold', 7) }}
- name: pool_address
description: The contract address for the liquidity pool.
data_tests:
- not_null:
where: block_timestamp > current_date() - {{ var('test_days_threshold', 7) }}
- name: amount_in_unadj
description: The non-decimal adjusted amount of tokens put into the swap.
data_tests:
- not_null:
where: block_timestamp > current_date() - {{ var('test_days_threshold', 7) }}
- name: amount_in
description: The amount of tokens put into the swap. This will be null if we do not have the decimal adjustment for the token.
- name: amount_in_usd
description: The amount of tokens put into the swap converted to USD using the price of the token. This will be null if we do not have the decimal adjustment or price for the token.
- name: amount_out_unadj
description: The non-decimal adjusted amount of tokens taken out of or received from the swap. This will be null if we do not have the decimal adjustment for the token.
data_tests:
- not_null:
where: block_timestamp > current_date() - {{ var('test_days_threshold', 7) }}
- name: amount_out
description: The amount of tokens taken out or received from the swap. This will be null if we do not have the decimal adjustment for the token.
- name: amount_out_usd
description: The amount of tokens taken out or received from the swap converted to USD using the price of the token. This will be null if we do not have the decimal adjustment or price for the token.
- name: token_in
description: The address of the token sent for swap.
data_tests:
- not_null:
where: block_timestamp > current_date() - {{ var('test_days_threshold', 7) }}
- name: symbol_in
description: The symbol of the token sent for swap.
- name: token_out
description: The address of the token being swapped to.
- name: symbol_out
description: The symbol of the token being swapped to.
- name: origin_function_signature
description: "{{doc('sei_tx_origin_sig')}}"
data_tests:
- not_null:
where: originated_from = 'evm' and block_timestamp > current_date() - {{ var('test_days_threshold', 7) }}
- name: INDEX
description: This is the event_index for evm transactions or the msg_index for native sei transactions. This is used to uniquely identify the action within the transaction.
data_tests:
- not_null:
where: block_timestamp > current_date() - {{ var('test_days_threshold', 7) }}
- name: origin_to_address
description: "{{doc('sei_eth_origin_to')}} Note: this will only be populated for evm transactions."
data_tests:
- not_null:
where: originated_from = 'evm' and block_timestamp > current_date() - {{ var('test_days_threshold', 7) }}
- name: sender
description: "The Router is the Sender in the swap function. Note: this will only be populated for evm transactions."
data_tests:
- not_null:
where: originated_from = 'evm' and block_timestamp > current_date() - {{ var('test_days_threshold', 7) }}
- name: tx_to
description: 'The tx_to is the address who receives the swapped token. This corresponds to the "to" field in the swap function. Note: this will only be populated for evm transactions.'
data_tests:
- not_null:
where: originated_from = 'evm' and block_timestamp > current_date() - {{ var('test_days_threshold', 7) }}
- name: event_name
description: The decoded event name for a given event.
- name: EZ_DEX_SWAPS_ID
description: '{{ doc("pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: defi__fact_dex_swaps
description: Records swap transactions on Sei for Astroport, Fuzio, and Seaswap
description: "Deprecating soon!Records swap transactions on Sei for Astroport, Fuzio, and Seaswap. Note: this only contains swaps from native sei protocols."
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:

View File

@ -0,0 +1,296 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'swaps_combined_id',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['modified_timestamp::DATE'],
tags = ['noncore']
) }}
{% if execute %}
{% if is_incremental() %}
-- get the max modified_timestamp from the target table
{% set max_m_query %}
SELECT
MAX(modified_timestamp) - INTERVAL '{{ var("LOOKBACK", "30 minutes") }}' AS modified_timestamp
FROM
{{ this }}
{% endset %}
{% set max_mod_timestamp = run_query(max_m_query).columns [0].values() [0] %}
{% endif %}
{% endif %}
WITH inc AS (
SELECT
'astroport' AS platform,
block_id,
block_timestamp,
tx_succeeded,
tx_id,
swapper,
msg_index,
amount_in,
currency_in,
amount_out,
currency_out,
pool_address,
dex_swaps_astroport_id AS uk
FROM
{{ ref('silver__dex_swaps_astroport') }}
{% if is_incremental() and 'astroport' not in var('HEAL_MODELS') %}
WHERE
modified_timestamp >= '{{ max_mod_timestamp }}'
{% endif %}
UNION ALL
SELECT
'fuzio' AS platform,
block_id,
block_timestamp,
tx_succeeded,
tx_id,
swapper,
msg_index,
amount_in,
currency_in,
amount_out,
currency_out,
pool_address,
dex_swaps_fuzio_id AS uk
FROM
{{ ref('silver__dex_swaps_fuzio') }}
{% if is_incremental() and 'fuzio' not in var('HEAL_MODELS') %}
WHERE
modified_timestamp >= '{{ max_mod_timestamp }}'
{% endif %}
UNION ALL
SELECT
'seaswap' AS platform,
block_id,
block_timestamp,
tx_succeeded,
tx_id,
swapper,
msg_index,
amount_in,
currency_in,
amount_out,
currency_out,
pool_address,
dex_swaps_seaswap_id AS uk
FROM
{{ source(
'silver',
'dex_swaps_seaswap'
) }}
{% if is_incremental() and 'seaswap' not in var('HEAL_MODELS') %}
WHERE
modified_timestamp >= '{{ max_mod_timestamp }}'
{% endif %}
)
{% if is_incremental() %},
mod_price AS (
SELECT
token_address,
HOUR,
price
FROM
{{ ref('price__ez_prices_hourly') }}
WHERE
modified_timestamp >= '{{ max_mod_timestamp }}'
),
mod_decimal AS (
SELECT
currency,
decimals,
symbol
FROM
{{ ref('core__dim_tokens') }}
WHERE
modified_timestamp >= '{{ max_mod_timestamp }}'
),
mod_map AS (
SELECT
evm_address,
sei_address
FROM
{{ ref('core__dim_address_mapping') }}
WHERE
modified_timestamp >= '{{ max_mod_timestamp }}'
)
{% endif %},
fin AS (
SELECT
A.block_id,
A.block_timestamp,
A.tx_id AS tx_id,
'sei' AS originated_from,
A.platform,
A.swapper,
d.evm_address AS origin_from_address,
A.pool_address,
A.amount_in AS amount_in_unadj,
CASE
WHEN c_in.decimals IS NOT NULL THEN (amount_in_unadj / pow(10, c_in.decimals))
END AS amount_in,
CASE
WHEN c_in.decimals IS NOT NULL THEN amount_in * b_in.price
END AS amount_in_usd,
A.amount_out AS amount_out_unadj,
CASE
WHEN c_out.decimals IS NOT NULL THEN (amount_out_unadj / pow(10, c_out.decimals))
END AS amount_out,
CASE
WHEN c_out.decimals IS NOT NULL THEN amount_out * b_out.price
END AS amount_out_usd,
A.currency_in AS token_in,
c_in.symbol AS symbol_in,
A.currency_out AS token_out,
c_out.symbol AS symbol_out,
NULL AS origin_function_signature,
A.msg_index AS INDEX,
NULL AS origin_to_address,
NULL AS sender,
NULL AS tx_to,
NULL AS event_name,
uk AS swaps_combined_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
inc A
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
b_in
ON A.currency_in = b_in.token_address
AND DATE_TRUNC(
'hour',
A.block_timestamp
) = b_in.hour
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
b_out
ON A.currency_out = b_out.token_address
AND DATE_TRUNC(
'hour',
A.block_timestamp
) = b_out.hour
LEFT JOIN {{ ref('core__dim_tokens') }}
c_in
ON A.currency_in = c_in.currency
LEFT JOIN {{ ref('core__dim_tokens') }}
c_out
ON A.currency_out = c_out.currency
LEFT JOIN {{ ref('core__dim_address_mapping') }}
d
ON A.swapper = d.sei_address
{% if is_incremental() %}
UNION ALL
SELECT
A.block_id,
A.block_timestamp,
A.tx_id,
A.originated_from,
A.platform,
A.swapper,
COALESCE(
A.origin_from_address,
d.evm_address
) AS origin_from_address,
A.pool_address,
A.amount_in_unadj,
COALESCE(
A.amount_in,
CASE
WHEN c_in.decimals IS NOT NULL THEN (A.amount_in_unadj / pow(10, c_in.decimals))
END
) AS amount_in,
COALESCE(
A.amount_in_usd,
CASE
WHEN c_in.decimals IS NOT NULL THEN (A.amount_in_unadj / pow(10, c_in.decimals)) * b_in.price
END
) AS amount_in_usd,
A.amount_out_unadj,
COALESCE(
A.amount_out,
CASE
WHEN c_out.decimals IS NOT NULL THEN (A.amount_out_unadj / pow(10, c_in.decimals))
END
) AS amount_out,
COALESCE(
A.amount_out_usd,
CASE
WHEN c_out.decimals IS NOT NULL THEN (A.amount_out_unadj / pow(10, c_in.decimals)) * b_out.price
END
) AS amount_out_usd,
A.token_in,
COALESCE(
A.symbol_in,
c_in.symbol
) AS symbol_in,
A.token_out,
COALESCE(
A.symbol_out,
c_out.symbol
) AS symbol_out,
A.origin_function_signature,
A.index,
A.origin_to_address,
A.sender,
A.tx_to,
A.event_name,
A.swaps_combined_id,
A.inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ this }} A
LEFT JOIN mod_price b_in
ON A.token_in = b_in.token_address
AND DATE_TRUNC(
'hour',
A.block_timestamp
) = b_in.hour
LEFT JOIN mod_price b_out
ON A.token_out = b_out.token_address
AND DATE_TRUNC(
'hour',
A.block_timestamp
) = b_out.hour
LEFT JOIN mod_decimal c_in
ON A.token_in = c_in.currency
LEFT JOIN mod_decimal c_out
ON A.token_out = c_out.currency
LEFT JOIN mod_map d
ON A.swapper = d.sei_address
WHERE
(
A.amount_in IS NULL
OR A.amount_in_usd IS NULL
OR A.amount_out IS NULL
OR A.amount_out_usd IS NULL
OR A.symbol_in IS NULL
OR A.symbol_out IS NULL
OR A.swapper IS NULL
)
AND (
b_in.price IS NOT NULL
OR b_out.price IS NOT NULL
OR c_in.decimals IS NOT NULL
OR c_out.decimals IS NOT NULL
OR d.evm_address IS NOT NULL
)
{% endif %}
)
SELECT
*
FROM
fin qualify(ROW_NUMBER() over (PARTITION BY swaps_combined_id
ORDER BY
inserted_timestamp DESC) = 1)

View File

@ -0,0 +1,12 @@
version: 2
models:
- name: silver__dex_swaps_combined
columns:
- name: BLOCK_NUMBER
- name: BLOCK_TIMESTAMP
tests:
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1