Merge pull request #107 from FlipsideCrypto/AN-6241/core-dex

AN-6241/core-dex
This commit is contained in:
San Yong 2025-06-04 09:41:06 +08:00 committed by GitHub
commit 0b29942a23
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 2934 additions and 0 deletions

View File

@ -0,0 +1,27 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true },
meta ={ 'database_tags':{ 'table':{ 'PROTOCOL': 'SUSHI, BITFLUX, GLYPH, COREX',
'PURPOSE': 'DEX, LIQUIDITY, POOLS, LP, SWAPS',}}},
tags = ['gold','defi','dex','curated']
) }}
SELECT
block_number AS creation_block,
block_timestamp AS creation_time,
tx_hash AS creation_tx,
platform,
contract_address AS factory_address,
pool_address,
pool_name,
tokens,
symbols,
decimals,
{{ dbt_utils.generate_surrogate_key(
['pool_address']
) }} AS dim_dex_liquidity_pools_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver_dex__complete_dex_liquidity_pools') }}

View File

@ -0,0 +1,32 @@
version: 2
models:
- name: defi__dim_dex_liquidity_pools
description: '{{ doc("evm_dim_dex_lp_table_doc") }}'
columns:
- name: CREATION_BLOCK
description: '{{ doc("evm_dex_creation_block") }}'
- name: CREATION_TIME
description: '{{ doc("evm_dex_creation_time") }}'
- name: CREATION_TX
description: '{{ doc("evm_dex_creation_tx") }}'
- name: FACTORY_ADDRESS
description: '{{ doc("evm_dex_factory_address") }}'
- name: PLATFORM
description: '{{ doc("evm_dex_platform") }}'
- name: POOL_ADDRESS
description: '{{ doc("evm_dex_pool_address") }}'
- name: POOL_NAME
description: '{{ doc("evm_dex_pool_name") }}'
- name: TOKENS
description: '{{ doc("evm_dex_lp_tokens") }}'
- name: SYMBOLS
description: '{{ doc("evm_dex_lp_symbols") }}'
- name: DECIMALS
description: '{{ doc("evm_dex_lp_decimals") }}'
- name: DIM_DEX_LIQUIDITY_POOLS_ID
description: '{{ doc("evm_pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("evm_inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("evm_modified_timestamp") }}'

View File

@ -0,0 +1,62 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true },
meta ={ 'database_tags':{ 'table':{ 'PROTOCOL': 'SUSHI, BITFLUX, GLYPH, COREX',
'PURPOSE': 'DEX, SWAPS' }}},
tags = ['gold','defi','dex','curated','ez']
) }}
SELECT
block_number,
block_timestamp,
tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
pool_name,
event_name,
amount_in_unadj,
amount_in,
ROUND(
CASE
WHEN token_in <> '0x191e94fa59739e188dce837f7f6978d84727ad01'
AND (
amount_out_usd IS NULL
OR ABS((amount_in_usd - amount_out_usd) / NULLIF(amount_out_usd, 0)) > 0.75
OR ABS((amount_in_usd - amount_out_usd) / NULLIF(amount_in_usd, 0)) > 0.75
) THEN NULL
ELSE amount_in_usd
END,
2
) AS amount_in_usd,
amount_out_unadj,
amount_out,
ROUND(
CASE
WHEN token_out <> '0x191e94fa59739e188dce837f7f6978d84727ad01'
AND (
amount_in_usd IS NULL
OR ABS((amount_out_usd - amount_in_usd) / NULLIF(amount_in_usd, 0)) > 0.75
OR ABS((amount_out_usd - amount_in_usd) / NULLIF(amount_out_usd, 0)) > 0.75
) THEN NULL
ELSE amount_out_usd
END,
2
) AS amount_out_usd,
sender,
tx_to,
event_index,
platform,
token_in,
token_out,
symbol_in,
symbol_out,
{{ dbt_utils.generate_surrogate_key(
['tx_hash','event_index']
) }} AS ez_dex_swaps_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver_dex__complete_dex_swaps') }}

View File

@ -0,0 +1,56 @@
version: 2
models:
- name: defi__ez_dex_swaps
description: '{{ doc("evm_ez_dex_swaps_table_doc") }}'
columns:
- name: BLOCK_NUMBER
description: '{{ doc("evm_block_number") }}'
- name: BLOCK_TIMESTAMP
description: '{{ doc("evm_block_timestamp") }}'
- name: TX_HASH
description: '{{ doc("evm_tx_hash") }}'
- name: CONTRACT_ADDRESS
description: '{{ doc("evm_contract_address") }}'
- name: EVENT_NAME
description: '{{ doc("evm_event_name") }}'
- name: AMOUNT_IN
description: '{{ doc("evm_dex_swaps_amount_in") }}'
- name: AMOUNT_OUT
description: '{{ doc("evm_dex_swaps_amount_out") }}'
- name: AMOUNT_IN_USD
description: '{{ doc("evm_dex_swaps_amount_in_usd") }}'
- name: AMOUNT_OUT_USD
description: '{{ doc("evm_dex_swaps_amount_out_usd") }}'
- name: TOKEN_IN
description: '{{ doc("evm_dex_swaps_token_in") }}'
- name: TOKEN_OUT
description: '{{ doc("evm_dex_swaps_token_out") }}'
- name: SYMBOL_IN
description: '{{ doc("evm_dex_swaps_symbol_in") }}'
- name: SYMBOL_OUT
description: '{{ doc("evm_dex_swaps_symbol_out") }}'
- name: SENDER
description: '{{ doc("evm_dex_swaps_sender") }}'
- name: TX_TO
description: '{{ doc("evm_dex_swaps_tx_to") }}'
- name: PLATFORM
description: '{{ doc("evm_dex_platform") }}'
- name: EVENT_INDEX
description: '{{ doc("evm_event_index") }}'
- name: ORIGIN_FUNCTION_SIGNATURE
description: '{{ doc("evm_tx_origin_sig") }}'
- name: ORIGIN_FROM_ADDRESS
description: '{{ doc("evm_origin_from") }}'
- name: ORIGIN_TO_ADDRESS
description: '{{ doc("evm_origin_to") }}'
- name: AMOUNT_IN_UNADJ
description: '{{ doc("evm_dex_swaps_amount_in_unadj") }}'
- name: AMOUNT_OUT_UNADJ
description: '{{ doc("evm_dex_swaps_amount_out_unadj") }}'
- name: EZ_DEX_SWAPS_ID
description: '{{ doc("evm_pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("evm_inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("evm_modified_timestamp") }}'

View File

@ -0,0 +1,161 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'pool_address',
cluster_by = ['block_timestamp::DATE'],
tags = ['silver_dex','defi','dex','curated']
) }}
WITH pool_traces AS (
SELECT
block_number,
block_timestamp,
tx_hash,
to_address AS pool_address,
to_address AS contract_address,
regexp_substr_all(SUBSTR(input, 11, len(input)), '.{64}') AS segmented_data,
TRY_TO_NUMBER(utils.udf_hex_to_int(segmented_data [0] :: STRING)) / 32 AS token_index,
TRY_TO_NUMBER(utils.udf_hex_to_int(segmented_data [1] :: STRING)) / 32 AS decimal_index,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [token_index] :: STRING
)
) AS token_number,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [decimal_index] :: STRING
)
) AS decimals_number,
TRY_TO_NUMBER(utils.udf_hex_to_int(segmented_data [5] :: STRING)) * pow(
10,
-10
) AS swap_fee,
TRY_TO_NUMBER(utils.udf_hex_to_int(segmented_data [6] :: STRING)) * pow(
10,
-10
) AS admin_fee,
-- 50% of swap fee
CONCAT('0x', SUBSTR(segmented_data [7] :: STRING, 25, 40)) AS lp_token,
CONCAT(
'0x',
SUBSTR(
segmented_data [token_index+1] :: STRING,
25,
40
)
) AS token0,
CONCAT(
'0x',
SUBSTR(
segmented_data [token_index+2] :: STRING,
25,
40
)
) AS token1,
CASE
WHEN token_number > 2 THEN CONCAT(
'0x',
SUBSTR(
segmented_data [token_index+3] :: STRING,
25,
40
)
)
ELSE NULL
END AS token2,
CASE
WHEN token_number > 3 THEN CONCAT(
'0x',
SUBSTR(
segmented_data [token_index+4] :: STRING,
25,
40
)
)
ELSE NULL
END AS token3,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [decimal_index+1] :: STRING
)
) AS decimal0,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [decimal_index+2] :: STRING
)
) AS decimal1,
CASE
WHEN decimals_number > 2 THEN TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [decimal_index+3] :: STRING
)
)
ELSE NULL
END AS decimal2,
CASE
WHEN decimals_number > 3 THEN TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [decimal_index+4] :: STRING
)
)
ELSE NULL
END AS decimal3,
utils.udf_hex_to_string(
segmented_data [array_size(segmented_data)-3] :: STRING
) AS lp_name,
utils.udf_hex_to_string(
segmented_data [array_size(segmented_data)-1] :: STRING
) AS lp_symbol,
CONCAT(
tx_hash :: STRING,
'-',
trace_index :: STRING -- using trace_index instead of event_index
) AS _log_id,
modified_timestamp AS _inserted_timestamp
FROM
{{ ref('core__fact_traces') }}
WHERE
1 = 1
AND origin_function_signature = '0xb28cb6dc'
AND LEFT(
input,
10
) = '0xb28cb6dc'
AND trace_succeeded
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
AND _inserted_timestamp >= SYSDATE() - INTERVAL '7 day'
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
pool_address,
token0,
token1,
token2,
token3,
decimal0,
decimal1,
decimal2,
decimal3,
lp_name,
lp_symbol,
swap_fee,
admin_fee,
lp_token,
_log_id,
_inserted_timestamp
FROM
pool_traces qualify(ROW_NUMBER() over (PARTITION BY pool_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,23 @@
version: 2
models:
- name: silver_dex__bitflux_pools
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- POOL_ADDRESS
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: POOL_ADDRESS
tests:
- not_null
- name: TOKEN0
tests:
- not_null
- name: TOKEN1
tests:
- not_null

View File

@ -0,0 +1,116 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['silver_dex','defi','dex','curated']
) }}
WITH bitflux_pools AS (
SELECT
pool_address,
token0,
token1,
token2,
token3,
decimal0,
decimal1,
decimal2,
decimal3
FROM
{{ ref('silver_dex__bitflux_pools') }}
),
base_swaps AS (
SELECT
l.block_number,
l.block_timestamp,
l.tx_hash,
l.contract_address,
l.origin_function_signature,
l.origin_from_address,
l.origin_to_address,
event_index,
COALESCE(
p1.pool_address,
p2.pool_address
) AS pool_address,
CONCAT('0x', SUBSTR(topic_1, 27, 40)) AS buyer,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
TRY_TO_NUMBER(utils.udf_hex_to_int(segmented_data [0] :: STRING)) AS tokensSold,
TRY_TO_NUMBER(utils.udf_hex_to_int(segmented_data [1] :: STRING)) AS tokensBought,
TRY_TO_NUMBER(utils.udf_hex_to_int(segmented_data [2] :: STRING)) AS soldId,
TRY_TO_NUMBER(utils.udf_hex_to_int(segmented_data [3] :: STRING)) AS boughtId,
p1.token0,
p1.token1,
p1.token2,
p1.token3,
CASE
WHEN boughtId = 0 THEN p1.token0
WHEN boughtId = 1 THEN p1.token1
WHEN boughtId = 2 THEN p1.token2
WHEN boughtId = 3 THEN p1.token3
ELSE NULL
END AS token_out,
CASE
WHEN soldId = 0 THEN p2.token0
WHEN soldId = 1 THEN p2.token1
WHEN soldId = 2 THEN p2.token2
WHEN soldId = 3 THEN p2.token3
ELSE NULL
END AS token_in,
tokensSold AS amount_in_unadj,
tokensBought AS amount_out_unadj,
CONCAT(
l.tx_hash :: STRING,
'-',
event_index :: STRING
) AS _log_id,
modified_timestamp AS _inserted_timestamp
FROM
{{ ref('core__fact_event_logs') }}
l
INNER JOIN bitflux_pools p1
ON l.contract_address = p1.pool_address
INNER JOIN bitflux_pools p2
ON l.contract_address = p2.pool_address
WHERE
topic_0 = '0xc6c1e0630dbe9130cc068028486c0d118ddcea348550819defd5cb8c257f8a38'
AND tx_succeeded
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
AND _inserted_timestamp >= SYSDATE() - INTERVAL '7 day'
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
pool_address,
origin_function_signature,
origin_from_address,
origin_to_address,
buyer AS recipient,
buyer AS sender,
buyer AS tx_to,
'TokenSwap' AS event_name,
event_index,
token0,
token1,
token2,
token3,
token_in,
token_out,
amount_in_unadj,
amount_out_unadj,
_log_id,
_inserted_timestamp
FROM
base_swaps

View File

@ -0,0 +1,43 @@
version: 2
models:
- name: silver_dex__bitflux_swaps
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: POOL_ADDRESS
tests:
- not_null
- name: RECIPIENT
tests:
- not_null
- name: SENDER
tests:
- not_null
- name: TOKEN0
tests:
- not_null
- name: TOKEN1
tests:
- not_null
- name: AMOUNT_IN_UNADJ
tests:
- not_null
- name: AMOUNT_OUT_UNADJ
tests:
- not_null

View File

@ -0,0 +1,68 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'pool_address',
cluster_by = ['block_timestamp::DATE'],
tags = ['silver_dex','defi','dex','curated']
) }}
WITH pool_creation AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topic_1, 27, 40)) AS token0,
CONCAT('0x', SUBSTR(topic_2, 27, 40)) AS token1,
utils.udf_hex_to_int(
's2c',
topic_3
) :: INTEGER AS fee,
utils.udf_hex_to_int(
's2c',
segmented_data [0] :: STRING
) :: INTEGER AS tick_spacing,
CONCAT('0x', SUBSTR(segmented_data [1] :: STRING, 25, 40)) AS pool_address,
CONCAT(
tx_hash :: STRING,
'-',
event_index :: STRING
) AS _log_id,
modified_timestamp AS _inserted_timestamp
FROM
{{ ref('core__fact_event_logs') }}
WHERE
contract_address = '0x526190295afb6b8736b14e4b42744fbd95203a3a'
AND topic_0 = '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118'
AND tx_succeeded
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
AND _inserted_timestamp >= SYSDATE() - INTERVAL '7 day'
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
event_index,
token0,
token1,
pool_address,
fee,
tick_spacing,
_log_id,
_inserted_timestamp
FROM
pool_creation qualify(ROW_NUMBER() over (PARTITION BY pool_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,23 @@
version: 2
models:
- name: silver_dex__corex_pools
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- POOL_ADDRESS
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: POOL_ADDRESS
tests:
- not_null
- name: TOKEN0
tests:
- not_null
- name: TOKEN1
tests:
- not_null

View File

@ -0,0 +1,104 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['silver_dex','defi','dex','curated']
) }}
WITH pool_data AS (
SELECT
token0,
token1,
fee,
tick_spacing,
pool_address
FROM
{{ ref('silver_dex__corex_pools') }}
),
base_swaps AS (
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
CONCAT('0x', SUBSTR(topic_1, 27, 40)) AS sender,
CONCAT('0x', SUBSTR(topic_2, 27, 40)) AS recipient,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
utils.udf_hex_to_int(
's2c',
segmented_data [0] :: STRING
) :: FLOAT AS amount0_unadj,
utils.udf_hex_to_int(
's2c',
segmented_data [1] :: STRING
) :: FLOAT AS amount1_unadj,
utils.udf_hex_to_int(
's2c',
segmented_data [2] :: STRING
) :: FLOAT AS sqrtPriceX96,
utils.udf_hex_to_int(
's2c',
segmented_data [3] :: STRING
) :: FLOAT AS liquidity,
utils.udf_hex_to_int(
's2c',
segmented_data [4] :: STRING
) :: FLOAT AS tick,
CONCAT(
tx_hash :: STRING,
'-',
event_index :: STRING
) AS _log_id,
modified_timestamp AS _inserted_timestamp,
FROM
{{ ref('core__fact_event_logs') }} l
INNER JOIN pool_data p
ON p.pool_address = l.contract_address
WHERE
topic_0 = '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67'
AND tx_succeeded
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
AND _inserted_timestamp >= SYSDATE() - INTERVAL '7 day'
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
pool_address,
origin_function_signature,
origin_from_address,
origin_to_address,
recipient,
sender,
fee,
tick,
tick_spacing,
liquidity,
event_index,
token0,
token1,
amount0_unadj,
amount1_unadj,
_log_id,
_inserted_timestamp
FROM
base_swaps
INNER JOIN pool_data
ON pool_data.pool_address = base_swaps.contract_address qualify(ROW_NUMBER() over(PARTITION BY _log_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,43 @@
version: 2
models:
- name: silver_dex__corex_swaps
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: POOL_ADDRESS
tests:
- not_null
- name: RECIPIENT
tests:
- not_null
- name: SENDER
tests:
- not_null
- name: TOKEN0
tests:
- not_null
- name: TOKEN1
tests:
- not_null
- name: AMOUNT0_UNADJ
tests:
- not_null
- name: AMOUNT1_UNADJ
tests:
- not_null

View File

@ -0,0 +1,155 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'pool_address',
cluster_by = ['block_timestamp::DATE'],
tags = ['silver_dex','defi','dex','curated']
) }}
WITH pool_creation AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
CONCAT('0x', SUBSTR(topic_1, 27, 40)) AS token0,
CONCAT('0x', SUBSTR(topic_2, 27, 40)) AS token1,
CONCAT('0x', SUBSTR(DATA, 27, 40)) AS pool_address,
CONCAT(
tx_hash :: STRING,
'-',
event_index :: STRING
) AS _log_id,
modified_timestamp AS _inserted_timestamp
FROM
{{ ref('core__fact_event_logs') }}
WHERE
contract_address = '0x74efe55bea4988e7d92d03efd8ddb8bf8b7bd597'
AND topic_0 = '0x91ccaa7a278130b65168c3a0c8d3bcae84cf5e43704342bd3ec0b59e59c036db'
AND tx_succeeded
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
AND _inserted_timestamp >= SYSDATE() - INTERVAL '7 day'
{% endif %}
),
initial_info AS (
SELECT
tx_hash,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [0] :: STRING)) AS price,
utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [1] :: STRING)) AS tick
FROM
{{ ref('core__fact_event_logs') }}
WHERE
topics [0] :: STRING = '0x98636036cb66a9c19a37435efc1e90142190214e8abeb821bdba3f2990dd4c95'
AND tx_hash IN (
SELECT
tx_hash
FROM
pool_creation
)
AND tx_succeeded
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
AND modified_timestamp >= SYSDATE() - INTERVAL '7 day'
{% endif %}
),
tick_spacing AS (
SELECT
tx_hash,
contract_address,
utils.udf_hex_to_int(
's2c',
DATA :: STRING
) :: INTEGER AS tick_spacing
FROM
{{ ref('core__fact_event_logs') }}
WHERE
topic_0 = '0x01413b1d5d4c359e9a0daa7909ecda165f6e8c51fe2ff529d74b22a5a7c02645'
AND tx_hash IN (
SELECT
tx_hash
FROM
pool_creation
)
AND tx_succeeded
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
AND modified_timestamp >= SYSDATE() - INTERVAL '7 day'
{% endif %}
),
fee AS (
SELECT
tx_hash,
contract_address,
utils.udf_hex_to_int(
's2c',
DATA :: STRING
) :: INTEGER AS fee
FROM
{{ ref('core__fact_event_logs') }}
WHERE
topic_0 = '0x598b9f043c813aa6be3426ca60d1c65d17256312890be5118dab55b0775ebe2a'
AND tx_hash IN (
SELECT
tx_hash
FROM
pool_creation
)
AND tx_succeeded
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
AND modified_timestamp >= SYSDATE() - INTERVAL '7 day'
{% endif %}
)
SELECT
block_number,
block_timestamp,
p.tx_hash,
p.contract_address,
event_index,
token0,
token1,
pool_address,
fee,
tick,
tick_spacing,
_log_id,
_inserted_timestamp
FROM
pool_creation p
INNER JOIN initial_info
ON initial_info.contract_address = p.pool_address
INNER JOIN tick_spacing
ON tick_spacing.contract_address = p.pool_address
INNER JOIN fee
ON fee.contract_address = p.pool_address qualify(ROW_NUMBER() over (PARTITION BY pool_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,23 @@
version: 2
models:
- name: silver_dex__glyph_v4_pools
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- POOL_ADDRESS
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: POOL_ADDRESS
tests:
- not_null
- name: TOKEN0
tests:
- not_null
- name: TOKEN1
tests:
- not_null

View File

@ -0,0 +1,117 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['silver_dex','defi','dex','curated']
) }}
WITH swaps_base AS (
SELECT
l.block_number,
l.block_timestamp,
l.tx_hash,
l.event_index,
l.origin_function_signature,
l.origin_from_address,
l.origin_to_address,
l.contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topic_1, 27, 40)) AS sender,
CONCAT('0x', SUBSTR(topic_2, 27, 40)) AS recipient,
utils.udf_hex_to_int(
's2c',
segmented_data [0] :: STRING
) :: FLOAT AS amount0_unadj,
utils.udf_hex_to_int(
's2c',
segmented_data [1] :: STRING
) :: FLOAT AS amount1_unadj,
utils.udf_hex_to_int(
's2c',
segmented_data [2] :: STRING
) :: FLOAT AS sqrtPriceX96,
utils.udf_hex_to_int(
's2c',
segmented_data [3] :: STRING
) :: FLOAT AS liquidity,
utils.udf_hex_to_int(
's2c',
segmented_data [4] :: STRING
) :: FLOAT AS tick,
token0,
token1,
pool_address,
tick_spacing,
fee,
CONCAT(
l.tx_hash,
'-',
l.event_index
) AS _log_id,
l.modified_timestamp
FROM
{{ ref('core__fact_event_logs') }}
l
INNER JOIN {{ ref('silver_dex__glyph_v4_pools') }}
p
ON p.pool_address = l.contract_address
WHERE
topic_0 = '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67'
AND tx_succeeded
{% if is_incremental() %}
AND l.modified_timestamp >= (
SELECT
MAX(modified_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
AND l.modified_timestamp >= SYSDATE() - INTERVAL '7 day'
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
pool_address,
recipient,
recipient AS tx_to,
sender,
fee,
tick,
tick_spacing,
liquidity,
token0,
token1,
amount0_unadj,
amount1_unadj,
CASE
WHEN amount0_unadj > 0 THEN ABS(amount0_unadj)
ELSE ABS(amount1_unadj)
END AS amount_in_unadj,
CASE
WHEN amount0_unadj < 0 THEN ABS(amount0_unadj)
ELSE ABS(amount1_unadj)
END AS amount_out_unadj,
CASE
WHEN amount0_unadj > 0 THEN token0
ELSE token1
END AS token_in,
CASE
WHEN amount0_unadj < 0 THEN token0
ELSE token1
END AS token_out,
_log_id,
modified_timestamp
FROM
swaps_base qualify(ROW_NUMBER() over(PARTITION BY _log_id
ORDER BY
modified_timestamp DESC)) = 1

View File

@ -0,0 +1,43 @@
version: 2
models:
- name: silver_dex__glyph_v4_swaps
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: POOL_ADDRESS
tests:
- not_null
- name: RECIPIENT
tests:
- not_null
- name: SENDER
tests:
- not_null
- name: TOKEN0
tests:
- not_null
- name: TOKEN1
tests:
- not_null
- name: AMOUNT0_UNADJ
tests:
- not_null
- name: AMOUNT1_UNADJ
tests:
- not_null

View File

@ -0,0 +1,911 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = ['block_number','platform','version'],
cluster_by = ['block_timestamp::DATE','platform'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash, contract_address, pool_address, pool_name, tokens, symbols), SUBSTRING(pool_address, pool_name, tokens, symbols)",
tags = ['silver_dex','defi','dex','curated','heal']
) }}
WITH contracts AS (
SELECT
contract_address,
token_symbol,
token_decimals,
_inserted_timestamp
FROM
{{ ref('silver__contracts') }}
),
bitflux AS (
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
pool_address,
NULL AS pool_name,
NULL AS fee,
NULL AS tick_spacing,
token0,
token1,
token2,
token3,
NULL AS token4,
NULL AS token5,
NULL AS token6,
NULL AS token7,
'bitflux' AS platform,
'v1' AS version,
_log_id AS _id,
_inserted_timestamp
FROM
{{ ref('silver_dex__bitflux_pools') }}
{% if is_incremental() and 'bitflux' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
corex AS (
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
pool_address,
NULL AS pool_name,
fee,
tick_spacing,
token0,
token1,
NULL AS token2,
NULL AS token3,
NULL AS token4,
NULL AS token5,
NULL AS token6,
NULL AS token7,
'corex' AS platform,
'v1' AS version,
_log_id AS _id,
_inserted_timestamp
FROM
{{ ref('silver_dex__corex_pools') }}
{% if is_incremental() and 'corex' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
glyph_v4 AS (
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
pool_address,
NULL AS pool_name,
fee,
tick_spacing,
token0,
token1,
NULL AS token2,
NULL AS token3,
NULL AS token4,
NULL AS token5,
NULL AS token6,
NULL AS token7,
'glyph-v4' AS platform,
'v4' AS version,
_log_id AS _id,
_inserted_timestamp
FROM
{{ ref('silver_dex__glyph_v4_pools') }}
{% if is_incremental() and 'glyph_v4' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
sushi_v3 AS (
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
pool_address,
NULL AS pool_name,
fee,
tick_spacing,
token0,
token1,
NULL AS token2,
NULL AS token3,
NULL AS token4,
NULL AS token5,
NULL AS token6,
NULL AS token7,
'sushi-v3' AS platform,
'v3' AS version,
_log_id AS _id,
modified_timestamp AS _inserted_timestamp
FROM
{{ ref('silver_dex__sushi_v3_pools') }}
{% if is_incremental() and 'sushi_v3' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
all_pools AS (
SELECT
*
FROM
bitflux
UNION ALL
SELECT
*
FROM
corex
UNION ALL
SELECT
*
FROM
glyph_v4
UNION ALL
SELECT
*
FROM
sushi_v3
),
complete_lps AS (
SELECT
block_number,
block_timestamp,
tx_hash,
p.contract_address,
pool_address,
CASE
WHEN pool_name IS NOT NULL THEN pool_name
WHEN pool_name IS NULL
AND platform IN (
'corex',
'glyph-v4',
'sushi-v3'
) THEN CONCAT(
COALESCE(
c0.token_symbol,
CONCAT(SUBSTRING(token0, 1, 5), '...', SUBSTRING(token0, 39, 42))
),
'-',
COALESCE(
c1.token_symbol,
CONCAT(SUBSTRING(token1, 1, 5), '...', SUBSTRING(token1, 39, 42))
),
' ',
COALESCE(
fee,
0
),
' ',
COALESCE(
tick_spacing,
0
),
CASE
WHEN platform = 'corex' THEN ' COREX LP'
WHEN platform = 'glyph-v4' THEN ' GLYPH-V4 LP'
WHEN platform = 'sushi-v3' THEN ' SUSHI-V3 LP'
END
)
WHEN pool_name IS NULL
AND platform IN (
'balancer',
'curve',
'bitflux'
) THEN CONCAT(
COALESCE(c0.token_symbol, SUBSTRING(token0, 1, 5) || '...' || SUBSTRING(token0, 39, 42)),
CASE
WHEN token1 IS NOT NULL THEN '-' || COALESCE(c1.token_symbol, SUBSTRING(token1, 1, 5) || '...' || SUBSTRING(token1, 39, 42))
ELSE ''
END,
CASE
WHEN token2 IS NOT NULL THEN '-' || COALESCE(c2.token_symbol, SUBSTRING(token2, 1, 5) || '...' || SUBSTRING(token2, 39, 42))
ELSE ''
END,
CASE
WHEN token3 IS NOT NULL THEN '-' || COALESCE(c3.token_symbol, SUBSTRING(token3, 1, 5) || '...' || SUBSTRING(token3, 39, 42))
ELSE ''
END,
CASE
WHEN token4 IS NOT NULL THEN '-' || COALESCE(c4.token_symbol, SUBSTRING(token4, 1, 5) || '...' || SUBSTRING(token4, 39, 42))
ELSE ''
END,
CASE
WHEN token5 IS NOT NULL THEN '-' || COALESCE(c5.token_symbol, SUBSTRING(token5, 1, 5) || '...' || SUBSTRING(token5, 39, 42))
ELSE ''
END,
CASE
WHEN token6 IS NOT NULL THEN '-' || COALESCE(c6.token_symbol, SUBSTRING(token6, 1, 5) || '...' || SUBSTRING(token6, 39, 42))
ELSE ''
END,
CASE
WHEN token7 IS NOT NULL THEN '-' || COALESCE(c7.token_symbol, SUBSTRING(token7, 1, 5) || '...' || SUBSTRING(token7, 39, 42))
ELSE ''
END
)
ELSE CONCAT(
COALESCE(
c0.token_symbol,
CONCAT(SUBSTRING(token0, 1, 5), '...', SUBSTRING(token0, 39, 42))
),
'-',
COALESCE(
c1.token_symbol,
CONCAT(SUBSTRING(token1, 1, 5), '...', SUBSTRING(token1, 39, 42))
)
)
END AS pool_name,
fee,
tick_spacing,
token0,
token1,
token2,
token3,
token4,
token5,
token6,
token7,
OBJECT_CONSTRUCT(
'token0',
token0,
'token1',
token1,
'token2',
token2,
'token3',
token3,
'token4',
token4,
'token5',
token5,
'token6',
token6,
'token7',
token7
) AS tokens,
OBJECT_CONSTRUCT(
'token0',
c0.token_symbol,
'token1',
c1.token_symbol,
'token2',
c2.token_symbol,
'token3',
c3.token_symbol,
'token4',
c4.token_symbol,
'token5',
c5.token_symbol,
'token6',
c6.token_symbol,
'token7',
c7.token_symbol
) AS symbols,
OBJECT_CONSTRUCT(
'token0',
c0.token_decimals,
'token1',
c1.token_decimals,
'token2',
c2.token_decimals,
'token3',
c3.token_decimals,
'token4',
c4.token_decimals,
'token5',
c5.token_decimals,
'token6',
c6.token_decimals,
'token7',
c7.token_decimals
) AS decimals,
platform,
version,
_id,
p._inserted_timestamp
FROM
all_pools p
LEFT JOIN contracts c0
ON c0.contract_address = p.token0
LEFT JOIN contracts c1
ON c1.contract_address = p.token1
LEFT JOIN contracts c2
ON c2.contract_address = p.token2
LEFT JOIN contracts c3
ON c3.contract_address = p.token3
LEFT JOIN contracts c4
ON c4.contract_address = p.token4
LEFT JOIN contracts c5
ON c5.contract_address = p.token5
LEFT JOIN contracts c6
ON c6.contract_address = p.token6
LEFT JOIN contracts c7
ON c7.contract_address = p.token7
),
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
heal_model AS (
SELECT
block_number,
block_timestamp,
tx_hash,
t0.contract_address,
pool_address,
CASE
WHEN pool_name IS NOT NULL THEN pool_name
WHEN pool_name IS NULL
AND platform IN (
'corex',
'glyph-v4',
'sushi-v3'
) THEN CONCAT(
COALESCE(
c0.token_symbol,
CONCAT(SUBSTRING(token0, 1, 5), '...', SUBSTRING(token0, 39, 42))
),
'-',
COALESCE(
c1.token_symbol,
CONCAT(SUBSTRING(token1, 1, 5), '...', SUBSTRING(token1, 39, 42))
),
' ',
COALESCE(
fee,
0
),
' ',
COALESCE(
tick_spacing,
0
),
CASE
WHEN platform = 'corex' THEN ' COREX LP'
WHEN platform = 'glyph-v4' THEN ' GLYPH-V4 LP'
WHEN platform = 'sushi-v3' THEN ' SUSHI-V3 LP'
END
)
WHEN pool_name IS NULL
AND platform IN (
'balancer',
'curve',
'bitflux'
) THEN CONCAT(
COALESCE(c0.token_symbol, SUBSTRING(token0, 1, 5) || '...' || SUBSTRING(token0, 39, 42)),
CASE
WHEN token1 IS NOT NULL THEN '-' || COALESCE(c1.token_symbol, SUBSTRING(token1, 1, 5) || '...' || SUBSTRING(token1, 39, 42))
ELSE ''
END,
CASE
WHEN token2 IS NOT NULL THEN '-' || COALESCE(c2.token_symbol, SUBSTRING(token2, 1, 5) || '...' || SUBSTRING(token2, 39, 42))
ELSE ''
END,
CASE
WHEN token3 IS NOT NULL THEN '-' || COALESCE(c3.token_symbol, SUBSTRING(token3, 1, 5) || '...' || SUBSTRING(token3, 39, 42))
ELSE ''
END,
CASE
WHEN token4 IS NOT NULL THEN '-' || COALESCE(c4.token_symbol, SUBSTRING(token4, 1, 5) || '...' || SUBSTRING(token4, 39, 42))
ELSE ''
END,
CASE
WHEN token5 IS NOT NULL THEN '-' || COALESCE(c5.token_symbol, SUBSTRING(token5, 1, 5) || '...' || SUBSTRING(token5, 39, 42))
ELSE ''
END,
CASE
WHEN token6 IS NOT NULL THEN '-' || COALESCE(c6.token_symbol, SUBSTRING(token6, 1, 5) || '...' || SUBSTRING(token6, 39, 42))
ELSE ''
END,
CASE
WHEN token7 IS NOT NULL THEN '-' || COALESCE(c7.token_symbol, SUBSTRING(token7, 1, 5) || '...' || SUBSTRING(token7, 39, 42))
ELSE ''
END
)
ELSE CONCAT(
COALESCE(
c0.token_symbol,
CONCAT(SUBSTRING(token0, 1, 5), '...', SUBSTRING(token0, 39, 42))
),
'-',
COALESCE(
c1.token_symbol,
CONCAT(SUBSTRING(token1, 1, 5), '...', SUBSTRING(token1, 39, 42))
)
)
END AS pool_name_heal,
fee,
tick_spacing,
token0,
token1,
token2,
token3,
token4,
token5,
token6,
token7,
tokens,
OBJECT_CONSTRUCT(
'token0',
c0.token_symbol,
'token1',
c1.token_symbol,
'token2',
c2.token_symbol,
'token3',
c3.token_symbol,
'token4',
c4.token_symbol,
'token5',
c5.token_symbol,
'token6',
c6.token_symbol,
'token7',
c7.token_symbol
) AS symbols_heal,
OBJECT_CONSTRUCT(
'token0',
c0.token_decimals,
'token1',
c1.token_decimals,
'token2',
c2.token_decimals,
'token3',
c3.token_decimals,
'token4',
c4.token_decimals,
'token5',
c5.token_decimals,
'token6',
c6.token_decimals,
'token7',
c7.token_decimals
) AS decimals_heal,
platform,
version,
_id,
t0._inserted_timestamp
FROM
{{ this }}
t0
LEFT JOIN contracts c0
ON c0.contract_address = t0.token0
LEFT JOIN contracts c1
ON c1.contract_address = t0.token1
LEFT JOIN contracts c2
ON c2.contract_address = t0.token2
LEFT JOIN contracts c3
ON c3.contract_address = t0.token3
LEFT JOIN contracts c4
ON c4.contract_address = t0.token4
LEFT JOIN contracts c5
ON c5.contract_address = t0.token5
LEFT JOIN contracts c6
ON c6.contract_address = t0.token6
LEFT JOIN contracts c7
ON c7.contract_address = t0.token7
WHERE
CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t1.block_number,
'-',
t1.platform,
'-',
t1.version
)
FROM
{{ this }}
t1
WHERE
t1.decimals :token0 :: INT IS NULL
AND t1._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t1.tokens :token0 :: STRING)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t2.block_number,
'-',
t2.platform,
'-',
t2.version
)
FROM
{{ this }}
t2
WHERE
t2.decimals :token1 :: INT IS NULL
AND t2._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t2.tokens :token1 :: STRING)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t3.block_number,
'-',
t3.platform,
'-',
t3.version
)
FROM
{{ this }}
t3
WHERE
t3.decimals :token2 :: INT IS NULL
AND t3._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t3.tokens :token2 :: STRING)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t4.block_number,
'-',
t4.platform,
'-',
t4.version
)
FROM
{{ this }}
t4
WHERE
t4.decimals :token3 :: INT IS NULL
AND t4._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t4.tokens :token3 :: STRING)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t5.block_number,
'-',
t5.platform,
'-',
t5.version
)
FROM
{{ this }}
t5
WHERE
t5.decimals :token4 :: INT IS NULL
AND t5._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t5.tokens :token4 :: STRING)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t6.block_number,
'-',
t6.platform,
'-',
t6.version
)
FROM
{{ this }}
t6
WHERE
t6.decimals :token5 :: INT IS NULL
AND t6._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t6.tokens :token5 :: STRING)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t7.block_number,
'-',
t7.platform,
'-',
t7.version
)
FROM
{{ this }}
t7
WHERE
t7.decimals :token6 :: INT IS NULL
AND t7._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t7.tokens :token6 :: STRING)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t8.block_number,
'-',
t8.platform,
'-',
t8.version
)
FROM
{{ this }}
t8
WHERE
t8.decimals :token7 :: INT IS NULL
AND t8._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t8.tokens :token7 :: STRING)
GROUP BY
1
)
),
{% endif %}
FINAL AS (
SELECT
*
FROM
complete_lps
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
pool_address,
pool_name_heal AS pool_name,
fee,
tick_spacing,
token0,
token1,
token2,
token3,
token4,
token5,
token6,
token7,
tokens,
symbols_heal AS symbols,
decimals_heal AS decimals,
platform,
version,
_id,
_inserted_timestamp
FROM
heal_model
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
platform,
version,
contract_address,
pool_address,
pool_name,
tokens,
symbols,
decimals,
fee,
tick_spacing,
token0,
token1,
token2,
token3,
token4,
token5,
token6,
token7,
_id,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['pool_address']
) }} AS complete_dex_liquidity_pools_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL

View File

@ -0,0 +1,635 @@
-- depends_on: {{ ref('silver__complete_token_prices') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = ['block_number','platform','version'],
cluster_by = ['block_timestamp::DATE','platform'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash, origin_function_signature, origin_from_address, origin_to_address, contract_address, pool_name, event_name, sender, tx_to, token_in, token_out, symbol_in, symbol_out), SUBSTRING(origin_function_signature, pool_name, event_name, sender, tx_to, token_in, token_out, symbol_in, symbol_out)",
tags = ['silver_dex','defi','dex','curated','heal']
) }}
WITH
bitflux AS (
SELECT
block_number,
block_timestamp,
tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
pool_address AS contract_address,
event_name,
amount_in_unadj,
amount_out_unadj,
token_in,
token_out,
sender,
tx_to,
event_index,
'bitflux' AS platform,
'v1' AS version,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver_dex__bitflux_swaps') }}
{% if is_incremental() and 'bitflux' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
corex AS (
SELECT
block_number,
block_timestamp,
tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
pool_address AS contract_address,
'Swap' AS event_name,
CASE
WHEN amount0_unadj > 0 THEN ABS(amount0_unadj)
ELSE ABS(amount1_unadj)
END AS amount_in_unadj,
CASE
WHEN amount0_unadj < 0 THEN ABS(amount0_unadj)
ELSE ABS(amount1_unadj)
END AS amount_out_unadj,
CASE
WHEN amount0_unadj > 0 THEN token0
ELSE token1
END AS token_in,
CASE
WHEN amount0_unadj < 0 THEN token0
ELSE token1
END AS token_out,
sender,
recipient AS tx_to,
event_index,
'corex' AS platform,
'v1' AS version,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver_dex__corex_swaps') }}
{% if is_incremental() and 'corex' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
glyph_v4 AS (
SELECT
block_number,
block_timestamp,
tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
pool_address AS contract_address,
'Swap' AS event_name,
amount_in_unadj,
amount_out_unadj,
token_in,
token_out,
sender,
recipient AS tx_to,
event_index,
'glyph-v4' AS platform,
'v4' AS version,
_log_id,
modified_timestamp AS _inserted_timestamp
FROM
{{ ref('silver_dex__glyph_v4_swaps') }}
{% if is_incremental() and 'glyph_v4' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
sushi_v3 AS (
SELECT
block_number,
block_timestamp,
tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
pool_address AS contract_address,
'Swap' AS event_name,
amount_in_unadj,
amount_out_unadj,
token_in,
token_out,
sender,
recipient AS tx_to,
event_index,
'sushi-v3' AS platform,
'v3' AS version,
_log_id,
modified_timestamp AS _inserted_timestamp
FROM
{{ ref('silver_dex__sushi_v3_swaps') }}
{% if is_incremental() and 'sushi_v3' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
all_dex AS (
SELECT
*
FROM
bitflux
UNION ALL
SELECT
*
FROM
corex
UNION ALL
SELECT
*
FROM
glyph_v4
UNION ALL
SELECT
*
FROM
sushi_v3
),
complete_dex_swaps AS (
SELECT
s.block_number,
s.block_timestamp,
s.tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
s.contract_address,
event_name,
token_in,
c1.token_decimals AS decimals_in,
c1.token_symbol AS symbol_in,
amount_in_unadj,
CASE
WHEN decimals_in IS NULL THEN amount_in_unadj
ELSE (amount_in_unadj / pow(10, decimals_in))
END AS amount_in,
CASE
WHEN decimals_in IS NOT NULL THEN amount_in * p1.price
ELSE NULL
END AS amount_in_usd,
token_out,
c2.token_decimals AS decimals_out,
c2.token_symbol AS symbol_out,
amount_out_unadj,
CASE
WHEN decimals_out IS NULL THEN amount_out_unadj
ELSE (amount_out_unadj / pow(10, decimals_out))
END AS amount_out,
CASE
WHEN decimals_out IS NOT NULL THEN amount_out * p2.price
ELSE NULL
END AS amount_out_usd,
CASE
WHEN lp.pool_name IS NULL THEN CONCAT(
LEAST(
COALESCE(
symbol_in,
CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42))
),
COALESCE(
symbol_out,
CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42))
)
),
'-',
GREATEST(
COALESCE(
symbol_in,
CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42))
),
COALESCE(
symbol_out,
CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42))
)
)
)
ELSE lp.pool_name
END AS pool_name,
sender,
tx_to,
event_index,
s.platform,
s.version,
s._log_id,
s._inserted_timestamp
FROM
all_dex s
LEFT JOIN {{ ref('silver__contracts') }}
c1
ON s.token_in = c1.contract_address
LEFT JOIN {{ ref('silver__contracts') }}
c2
ON s.token_out = c2.contract_address
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
p1
ON s.token_in = p1.token_address
AND DATE_TRUNC(
'hour',
block_timestamp
) = p1.hour
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
p2
ON s.token_out = p2.token_address
AND DATE_TRUNC(
'hour',
block_timestamp
) = p2.hour
LEFT JOIN {{ ref('silver_dex__complete_dex_liquidity_pools') }}
lp
ON s.contract_address = lp.pool_address
),
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
heal_model AS (
SELECT
t0.block_number,
t0.block_timestamp,
t0.tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
t0.contract_address,
event_name,
token_in,
c1.token_decimals AS decimals_in,
c1.token_symbol AS symbol_in,
amount_in_unadj,
CASE
WHEN c1.token_decimals IS NULL THEN amount_in_unadj
ELSE (amount_in_unadj / pow(10, c1.token_decimals))
END AS amount_in_heal,
CASE
WHEN c1.token_decimals IS NOT NULL THEN amount_in_heal * p1.price
ELSE NULL
END AS amount_in_usd_heal,
token_out,
c2.token_decimals AS decimals_out,
c2.token_symbol AS symbol_out,
amount_out_unadj,
CASE
WHEN c2.token_decimals IS NULL THEN amount_out_unadj
ELSE (amount_out_unadj / pow(10, c2.token_decimals))
END AS amount_out_heal,
CASE
WHEN c2.token_decimals IS NOT NULL THEN amount_out_heal * p2.price
ELSE NULL
END AS amount_out_usd_heal,
CASE
WHEN lp.pool_name IS NULL THEN CONCAT(
LEAST(
COALESCE(
c1.token_symbol,
CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42))
),
COALESCE(
c2.token_symbol,
CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42))
)
),
'-',
GREATEST(
COALESCE(
c1.token_symbol,
CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42))
),
COALESCE(
c2.token_symbol,
CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42))
)
)
)
ELSE lp.pool_name
END AS pool_name_heal,
sender,
tx_to,
event_index,
t0.platform,
t0.version,
t0._log_id,
t0._inserted_timestamp
FROM
{{ this }}
t0
LEFT JOIN {{ ref('silver__contracts') }}
c1
ON t0.token_in = c1.contract_address
LEFT JOIN {{ ref('silver__contracts') }}
c2
ON t0.token_out = c2.contract_address
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
p1
ON t0.token_in = p1.token_address
AND DATE_TRUNC(
'hour',
block_timestamp
) = p1.hour
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
p2
ON t0.token_out = p2.token_address
AND DATE_TRUNC(
'hour',
block_timestamp
) = p2.hour
LEFT JOIN {{ ref('silver_dex__complete_dex_liquidity_pools') }}
lp
ON t0.contract_address = lp.pool_address
WHERE
CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t1.block_number,
'-',
t1.platform,
'-',
t1.version
)
FROM
{{ this }}
t1
WHERE
t1.decimals_in IS NULL
AND t1._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t1.token_in)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t2.block_number,
'-',
t2.platform,
'-',
t2.version
)
FROM
{{ this }}
t2
WHERE
t2.decimals_out IS NULL
AND t2._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t2.token_out)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t3.block_number,
'-',
t3.platform,
'-',
t3.version
)
FROM
{{ this }}
t3
WHERE
t3.amount_in_usd IS NULL
AND t3._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__complete_token_prices') }}
p
WHERE
p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND p.price IS NOT NULL
AND p.token_address = t3.token_in
AND p.hour = DATE_TRUNC(
'hour',
t3.block_timestamp
)
)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t4.block_number,
'-',
t4.platform,
'-',
t4.version
)
FROM
{{ this }}
t4
WHERE
t4.amount_out_usd IS NULL
AND t4._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__complete_token_prices') }}
p
WHERE
p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND p.price IS NOT NULL
AND p.token_address = t4.token_out
AND p.hour = DATE_TRUNC(
'hour',
t4.block_timestamp
)
)
GROUP BY
1
)
),
{% endif %}
FINAL AS (
SELECT
*
FROM
complete_dex_swaps
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
event_name,
token_in,
decimals_in,
symbol_in,
amount_in_unadj,
amount_in_heal AS amount_in,
amount_in_usd_heal AS amount_in_usd,
token_out,
decimals_out,
symbol_out,
amount_out_unadj,
amount_out_heal AS amount_out,
amount_out_usd_heal AS amount_out_usd,
pool_name_heal AS pool_name,
sender,
tx_to,
event_index,
platform,
version,
_log_id,
_inserted_timestamp
FROM
heal_model
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
pool_name,
event_name,
amount_in_unadj,
amount_in,
amount_in_usd,
amount_out_unadj,
amount_out,
amount_out_usd,
sender,
tx_to,
event_index,
platform,
version,
token_in,
token_out,
symbol_in,
symbol_out,
decimals_in,
decimals_out,
_log_id,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['tx_hash','event_index']
) }} AS complete_dex_swaps_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL qualify (ROW_NUMBER() over (PARTITION BY _log_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,109 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'pool_address',
cluster_by = ['block_timestamp::DATE'],
tags = ['silver_dex','defi','dex','curated']
) }}
WITH created_pools AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
LOWER(CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40))) AS token0,
LOWER(CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40))) AS token1,
utils.udf_hex_to_int(
's2c',
topics [3] :: STRING
) :: INTEGER AS fee,
utils.udf_hex_to_int(
's2c',
segmented_data [0] :: STRING
) :: INTEGER AS tick_spacing,
CONCAT('0x', SUBSTR(segmented_data [1] :: STRING, 25, 40)) AS pool_address,
CONCAT(
tx_hash,
'-',
event_index
) AS _log_id,
modified_timestamp
FROM
{{ ref('core__fact_event_logs') }}
WHERE
topic_0 = '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118'
AND contract_address = '0xc35dadb65012ec5796536bd9864ed8773abc74c4' --Sushi/UniswapV3Factory
AND tx_succeeded
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
AND modified_timestamp >= SYSDATE() - INTERVAL '7 day'
{% endif %}
),
initial_info AS (
SELECT
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [0] :: STRING)) :: FLOAT AS init_sqrtPriceX96,
utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [1] :: STRING)) :: FLOAT AS init_tick,
pow(
1.0001,
init_tick
) AS init_price_1_0_unadj,
CONCAT(
tx_hash,
'-',
event_index
) AS _log_id,
modified_timestamp
FROM
{{ ref('core__fact_event_logs') }}
WHERE
topic_0 = '0x98636036cb66a9c19a37435efc1e90142190214e8abeb821bdba3f2990dd4c95'
AND tx_succeeded
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
AND modified_timestamp >= SYSDATE() - INTERVAL '7 day'
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
p.contract_address,
token0,
token1,
fee,
(
fee / 10000
) :: FLOAT AS fee_percent,
tick_spacing,
pool_address,
COALESCE(
init_tick,
0
) AS init_tick,
p._log_id,
p.modified_timestamp
FROM
created_pools p
LEFT JOIN initial_info i
ON p.pool_address = i.contract_address qualify(ROW_NUMBER() over(PARTITION BY pool_address
ORDER BY
p.modified_timestamp DESC)) = 1

View File

@ -0,0 +1,23 @@
version: 2
models:
- name: silver_dex__sushi_v3_pools
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- POOL_ADDRESS
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: POOL_ADDRESS
tests:
- not_null
- name: TOKEN0
tests:
- not_null
- name: TOKEN1
tests:
- not_null

View File

@ -0,0 +1,117 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['silver_dex','defi','dex','curated']
) }}
WITH swaps_base AS (
SELECT
l.block_number,
l.block_timestamp,
l.tx_hash,
l.event_index,
l.origin_function_signature,
l.origin_from_address,
l.origin_to_address,
l.contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topic_1, 27, 40)) AS sender,
CONCAT('0x', SUBSTR(topic_2, 27, 40)) AS recipient,
utils.udf_hex_to_int(
's2c',
segmented_data [0] :: STRING
) :: FLOAT AS amount0_unadj,
utils.udf_hex_to_int(
's2c',
segmented_data [1] :: STRING
) :: FLOAT AS amount1_unadj,
utils.udf_hex_to_int(
's2c',
segmented_data [2] :: STRING
) :: FLOAT AS sqrtPriceX96,
utils.udf_hex_to_int(
's2c',
segmented_data [3] :: STRING
) :: FLOAT AS liquidity,
utils.udf_hex_to_int(
's2c',
segmented_data [4] :: STRING
) :: FLOAT AS tick,
token0,
token1,
pool_address,
tick_spacing,
fee,
CONCAT(
l.tx_hash,
'-',
l.event_index
) AS _log_id,
l.modified_timestamp
FROM
{{ ref('core__fact_event_logs') }}
l
INNER JOIN {{ ref('silver_dex__sushi_v3_pools') }}
p
ON p.pool_address = l.contract_address
WHERE
l.block_timestamp :: DATE >= '2023-04-01'
AND topic_0 = '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67'
AND tx_succeeded
{% if is_incremental() %}
AND l.modified_timestamp >= (
SELECT
MAX(modified_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
AND l.modified_timestamp >= SYSDATE() - INTERVAL '7 day'
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
pool_address,
recipient,
recipient AS tx_to,
sender,
fee,
tick,
tick_spacing,
liquidity,
token0,
token1,
amount0_unadj,
amount1_unadj,
CASE
WHEN amount0_unadj > 0 THEN ABS(amount0_unadj)
ELSE ABS(amount1_unadj)
END AS amount_in_unadj,
CASE
WHEN amount0_unadj < 0 THEN ABS(amount0_unadj)
ELSE ABS(amount1_unadj)
END AS amount_out_unadj,
CASE
WHEN amount0_unadj > 0 THEN token0
ELSE token1
END AS token_in,
CASE
WHEN amount0_unadj < 0 THEN token0
ELSE token1
END AS token_out,
_log_id,
modified_timestamp
FROM
swaps_base qualify(ROW_NUMBER() over(PARTITION BY _log_id
ORDER BY
modified_timestamp DESC)) = 1

View File

@ -0,0 +1,43 @@
version: 2
models:
- name: silver_dex__sushi_v3_swaps
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: POOL_ADDRESS
tests:
- not_null
- name: RECIPIENT
tests:
- not_null
- name: SENDER
tests:
- not_null
- name: TOKEN0
tests:
- not_null
- name: TOKEN1
tests:
- not_null
- name: AMOUNT0_UNADJ
tests:
- not_null
- name: AMOUNT1_UNADJ
tests:
- not_null