AN-5301/dex-tables (#75)

* blaster ring sushi thruster complete and gold dex models

* pool reads for Ring

* heal logic

* wrapped tokens model, remove reads model

* bladeswap and standardized all models/ymls

* ambient swaps

* edge case for sender on ambient

* tests

* chain ref
This commit is contained in:
drethereum 2024-10-10 08:59:41 -06:00 committed by GitHub
parent ab8ac0a2f9
commit b46c0040fd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
55 changed files with 5399 additions and 2 deletions

View File

@ -1,5 +1,5 @@
{% docs eth_dex_lp_table_doc %}
This table contains details on decentralized exchange (DEX) liquidity pools (LP) on the base blockchain, including the tokens, symbols and decimals within each pool alongside the following protocols: SUSHI, UNISWAP, BALANCER, SWAPBASED, BASESWAP, DACKIE, WOOFI, AERODROME, CURVE, and MAVERICK.
This table contains details on decentralized exchange (DEX) liquidity pools (LP) on the base blockchain, including the tokens, symbols and decimals within each pool alongside the following protocols: BLADESWAP, BLASTERSWAP, RING, SUSHI, and THRUSTER.
{% enddocs %}

View File

@ -1,6 +1,6 @@
{% docs eth_ez_dex_swaps_table_doc %}
This table currently contains swap events from the ```fact_event_logs``` table for SUSHI, UNISWAP, BALANCER, SWAPBASED, BASESWAP, DACKIE, WOOFI, AERODROME, CURVE, MAVERICK, and VOODOO. along with other helpful columns including an amount USD where possible. Other dexes coming soon!
This table currently contains swap events from the ```fact_event_logs``` table for AMBIENT, BLADESWAP, BLASTERSWAP, RING, SUSHI, and THRUSTER. along with other helpful columns including an amount USD where possible. Other dexes coming soon!
Note: A rule has been put in place to null out the amount_USD if that number is too divergent between amount_in_USD and amount_out_usd. This can happen for swaps of less liquid tokens during very high fluctuation of price.
{% enddocs %}

View File

@ -41,6 +41,10 @@ There is more information on how to use dbt docs in the last section of this doc
- [ez_asset_metadata](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.price__ez_asset_metadata)
- [ez_prices_hourly](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.price__ez_prices_hourly)
### DeFi Tables (blast.defi)
- [ez_dex_swaps](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.defi__ez_dex_swaps)
- [dim_dex_liquidity_pools](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.defi__dim_dex_liquidity_pools)
### Stats Tables (blast.stats)
- [ez_core_metrics_hourly](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.stats__ez_core_metrics_hourly)

View File

@ -0,0 +1,24 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true },
meta ={ 'database_tags':{ 'table':{ 'PROTOCOL': 'BLADESWAP, BLASTERSWAP, RING, SUSHI, THRUSTER',
'PURPOSE': 'DEX, LIQUIDITY, POOLS, LP, SWAPS',} } }
) }}
SELECT
block_number AS creation_block,
block_timestamp AS creation_time,
tx_hash AS creation_tx,
platform,
contract_address AS factory_address,
pool_address,
pool_name,
tokens,
symbols,
decimals,
complete_dex_liquidity_pools_id AS dim_dex_liquidity_pools_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver_dex__complete_dex_liquidity_pools') }}

View File

@ -0,0 +1,32 @@
version: 2
models:
- name: defi__dim_dex_liquidity_pools
description: '{{ doc("eth_dex_lp_table_doc") }}'
columns:
- name: CREATION_BLOCK
description: '{{ doc("eth_dex_creation_block") }}'
- name: CREATION_TIME
description: '{{ doc("eth_dex_creation_time") }}'
- name: CREATION_TX
description: '{{ doc("eth_dex_creation_tx") }}'
- name: FACTORY_ADDRESS
description: '{{ doc("eth_dex_factory_address") }}'
- name: PLATFORM
description: '{{ doc("eth_dex_platform") }}'
- name: POOL_ADDRESS
description: '{{ doc("eth_dex_pool_address") }}'
- name: POOL_NAME
description: '{{ doc("eth_dex_pool_name") }}'
- name: TOKENS
description: '{{ doc("eth_dex_lp_tokens") }}'
- name: SYMBOLS
description: '{{ doc("eth_dex_lp_symbols") }}'
- name: DECIMALS
description: '{{ doc("eth_dex_lp_decimals") }}'
- name: DIM_DEX_LIQUIDITY_POOLS_ID
description: '{{ doc("pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'

View File

@ -0,0 +1,61 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true },
meta ={ 'database_tags':{ 'table':{ 'PROTOCOL': 'AMBIENT, BLADESWAP, BLASTERSWAP, RING, SUSHI, THRUSTER',
'PURPOSE': 'DEX, SWAPS',
} } }
) }}
SELECT
block_number,
block_timestamp,
tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
pool_name,
event_name,
amount_in_unadj,
amount_in,
ROUND(
CASE
WHEN token_in <> '0x4300000000000000000000000000000000000004'
AND (
amount_out_usd IS NULL
OR ABS((amount_in_usd - amount_out_usd) / NULLIF(amount_out_usd, 0)) > 0.75
OR ABS((amount_in_usd - amount_out_usd) / NULLIF(amount_in_usd, 0)) > 0.75
) THEN NULL
ELSE amount_in_usd
END,
2
) AS amount_in_usd,
amount_out_unadj,
amount_out,
ROUND(
CASE
WHEN token_out <> '0x4300000000000000000000000000000000000004'
AND (
amount_in_usd IS NULL
OR ABS((amount_out_usd - amount_in_usd) / NULLIF(amount_in_usd, 0)) > 0.75
OR ABS((amount_out_usd - amount_in_usd) / NULLIF(amount_out_usd, 0)) > 0.75
) THEN NULL
ELSE amount_out_usd
END,
2
) AS amount_out_usd,
sender,
tx_to,
event_index,
platform,
token_in,
token_out,
symbol_in,
symbol_out,
_log_id,
complete_dex_swaps_id AS ez_dex_swaps_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver_dex__complete_dex_swaps') }}

View File

@ -0,0 +1,60 @@
version: 2
models:
- name: defi__ez_dex_swaps
description: '{{ doc("eth_ez_dex_swaps_table_doc") }}'
columns:
- name: BLOCK_NUMBER
description: '{{ doc("blast_block_number") }}'
- name: BLOCK_TIMESTAMP
description: '{{ doc("blast_block_timestamp") }}'
- name: TX_HASH
description: '{{ doc("blast_logs_tx_hash") }}'
- name: CONTRACT_ADDRESS
description: '{{ doc("blast_logs_contract_address") }}'
- name: EVENT_NAME
description: '{{ doc("blast_event_name") }}'
- name: AMOUNT_IN
description: '{{ doc("eth_dex_swaps_amount_in") }}'
- name: AMOUNT_OUT
description: '{{ doc("eth_dex_swaps_amount_out") }}'
- name: AMOUNT_IN_USD
description: '{{ doc("eth_dex_swaps_amount_in_usd") }}'
- name: AMOUNT_OUT_USD
description: '{{ doc("eth_dex_swaps_amount_out_usd") }}'
- name: TOKEN_IN
description: '{{ doc("eth_dex_swaps_token_in") }}'
- name: TOKEN_OUT
description: '{{ doc("eth_dex_swaps_token_out") }}'
- name: SYMBOL_IN
description: '{{ doc("eth_dex_swaps_symbol_in") }}'
- name: SYMBOL_OUT
description: '{{ doc("eth_dex_swaps_symbol_out") }}'
- name: SENDER
description: '{{ doc("eth_dex_swaps_sender") }}'
- name: TX_TO
description: '{{ doc("eth_dex_swaps_tx_to") }}'
- name: PLATFORM
description: '{{ doc("eth_dex_platform") }}'
- name: EVENT_INDEX
description: '{{ doc("blast_event_index") }}'
- name: _LOG_ID
description: '{{ doc("internal_column") }}'
- name: ORIGIN_FUNCTION_SIGNATURE
description: '{{ doc("blast_tx_origin_sig") }}'
- name: ORIGIN_FROM_ADDRESS
description: '{{ doc("blast_origin_from") }}'
- name: ORIGIN_TO_ADDRESS
description: '{{ doc("blast_origin_to") }}'
- name: AMOUNT_IN_UNADJ
description: '{{ doc("eth_dex_swaps_amount_in_unadj") }}'
- name: AMOUNT_OUT_UNADJ
description: '{{ doc("eth_dex_swaps_amount_out_unadj") }}'
- name: EZ_DEX_SWAPS_ID
description: '{{ doc("pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'

View File

@ -0,0 +1,115 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['curated','reorg']
) }}
WITH swaps_base AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CASE
WHEN topics [1] :: STRING = '0x0000000000000000000000000000000000000000000000000000000000000000' THEN '0x4300000000000000000000000000000000000004'
ELSE CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40))
END AS base_token,
CASE
WHEN topics [2] :: STRING = '0x0000000000000000000000000000000000000000000000000000000000000000' THEN '0x4300000000000000000000000000000000000004'
ELSE CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40))
END AS quote_token,
TRY_TO_BOOLEAN(RIGHT(segmented_data [1], 1)) AS isBuy,
-- TRUE when user pays in base token, FALSE when user pays in quote token
TRY_TO_BOOLEAN(RIGHT(segmented_data [2], 1)) AS inBaseQty,
-- TRUE when fixed qty is in base token, FALSE when fixed qty is in quote token
utils.udf_hex_to_int(
's2c',
segmented_data [3] :: STRING
) :: FLOAT AS qty,
-- in qty for fixed qty in swap or out qty for fixed qty out swap
utils.udf_hex_to_int(
's2c',
segmented_data [6] :: STRING
) :: FLOAT AS non_fixed_qty,
--minOut for fixed qty in swaps or maxIn for fixed qty out swaps
utils.udf_hex_to_int(
's2c',
segmented_data [8] :: STRING
) :: FLOAT AS base_qty,
utils.udf_hex_to_int(
's2c',
segmented_data [9] :: STRING
) :: FLOAT AS quote_qty,
CASE
WHEN base_qty > 0 THEN base_qty
WHEN quote_qty > 0 THEN quote_qty
ELSE NULL
END AS amount_in_unadj,
CASE
WHEN base_qty > 0 THEN ABS(quote_qty)
WHEN quote_qty > 0 THEN ABS(base_qty)
ELSE NULL
END AS amount_out_unadj,
CASE
WHEN base_qty > 0 THEN base_token
WHEN quote_qty > 0 THEN quote_token
ELSE NULL
END AS token_in,
CASE
WHEN base_qty > 0 THEN quote_token
WHEN quote_qty > 0 THEN base_token
ELSE NULL
END AS token_out,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__logs') }}
WHERE
contract_address = '0xaaaaaaaaffe404ee9433eef0094b6382d81fb958' --CrocSwapDex
AND topics [0] :: STRING = '0x5d7a6c346454f5c536b7f52655e780f6db27b15b489f80f2dbb288c9e4f366bd'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
origin_from_address AS sender,
CASE
WHEN origin_function_signature = '0xa62ea46d' THEN origin_to_address
ELSE origin_from_address
END AS tx_to,
token_in,
token_out,
amount_in_unadj,
amount_out_unadj,
'Swap' AS event_name,
'ambient' AS platform,
_log_id,
_inserted_timestamp
FROM
swaps_base
WHERE
token_in IS NOT NULL
AND token_out IS NOT NULL

View File

@ -0,0 +1,83 @@
version: 2
models:
- name: silver_dex__ambient_swaps
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: AMOUNT_IN_UNADJ
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: AMOUNT_IN
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: AMOUNT_OUT_UNADJ
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: AMOUNT_OUT
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TOKEN_IN
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TOKEN_OUT
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: SYMBOL_IN
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: SYMBOL_OUT
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TX_TO
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+

View File

@ -0,0 +1,58 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'pool_address',
tags = ['curated']
) }}
WITH created_pools AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS pool_address,
CASE
WHEN segmented_data [0] :: STRING = '000000000000000000000000eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '0x4300000000000000000000000000000000000004'
ELSE CONCAT('0x', SUBSTR(segmented_data [0] :: STRING, 25, 40))
END AS token0,
CASE
WHEN segmented_data [1] :: STRING = '000000000000000000000000eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '0x4300000000000000000000000000000000000004'
ELSE CONCAT('0x', SUBSTR(segmented_data [1] :: STRING, 25, 40))
END AS token1,
_log_id,
_inserted_timestamp
FROM
{{ ref ('silver__logs') }}
WHERE
contract_address = LOWER('0x75cb3ec310d3d1e22637f79d61eab5d9abcd68bd') --XYKPoolFactory
AND topics [0] :: STRING = '0x5f28326c23d3e7607ba7f55dcee451cc4dbb16e3f016b27f013c99fb1d7d4168' --PairCreated
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
token0,
token1,
pool_address,
_log_id,
_inserted_timestamp
FROM
created_pools qualify(ROW_NUMBER() over (PARTITION BY pool_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,20 @@
version: 2
models:
- name: silver_dex__bladeswap_pools
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- POOL_ADDRESS
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: POOL_ADDRESS
tests:
- not_null
- name: TOKEN0
tests:
- not_null
- name: TOKEN1
tests:
- not_null

View File

@ -0,0 +1,162 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'pool_address',
cluster_by = ['block_timestamp::DATE'],
tags = ['curated']
) }}
WITH created_pools AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
LOWER(CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40))) AS token0_address,
LOWER(CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40))) AS token1_address,
CONCAT('0x', SUBSTR(segmented_data [0] :: STRING, 25, 40)) AS pool_address,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__logs') }}
WHERE
topics [0] :: STRING = '0x91ccaa7a278130b65168c3a0c8d3bcae84cf5e43704342bd3ec0b59e59c036db'
AND contract_address = '0xa87dbf5082af26c9a6ab2b854e378f704638cca5' --AlgebraFactory
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
),
pool_info AS (
SELECT
contract_address,
topics [0] :: STRING AS topics_0,
topics [1] :: STRING AS topics_1,
topics [2] :: STRING AS topics_2,
topics [3] :: STRING AS topics_3,
DATA,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__logs') }}
WHERE
topics [0] :: STRING IN (
'0x98636036cb66a9c19a37435efc1e90142190214e8abeb821bdba3f2990dd4c95',
'0x598b9f043c813aa6be3426ca60d1c65d17256312890be5118dab55b0775ebe2a',
'0x01413b1d5d4c359e9a0daa7909ecda165f6e8c51fe2ff529d74b22a5a7c02645'
)
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
),
initial_info AS (
SELECT
contract_address,
segmented_data,
utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [0] :: STRING)) :: FLOAT AS init_sqrtPriceX96,
utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [1] :: STRING)) :: FLOAT AS init_tick,
pow(
1.0001,
init_tick
) AS init_price_1_0_unadj,
_log_id,
_inserted_timestamp
FROM
pool_info
WHERE
topics_0 = '0x98636036cb66a9c19a37435efc1e90142190214e8abeb821bdba3f2990dd4c95'
),
fee_info AS (
SELECT
contract_address,
segmented_data,
utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [0] :: STRING)) :: FLOAT AS fee,
_log_id,
_inserted_timestamp
FROM
pool_info
WHERE
topics_0 = '0x598b9f043c813aa6be3426ca60d1c65d17256312890be5118dab55b0775ebe2a'
),
tickspacing_info AS (
SELECT
contract_address,
segmented_data,
utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [0] :: STRING)) :: FLOAT AS tick_spacing,
_log_id,
_inserted_timestamp
FROM
pool_info
WHERE
topics_0 = '0x01413b1d5d4c359e9a0daa7909ecda165f6e8c51fe2ff529d74b22a5a7c02645'
),
FINAL AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
p.contract_address,
token0_address,
token1_address,
fee :: INTEGER AS fee,
(
fee / 10000
) :: FLOAT AS fee_percent,
tick_spacing,
pool_address,
COALESCE(
init_tick,
0
) AS init_tick,
p._log_id,
p._inserted_timestamp
FROM
created_pools p
LEFT JOIN initial_info i
ON p.pool_address = i.contract_address
LEFT JOIN fee_info f
ON p.pool_address = f.contract_address
LEFT JOIN tickspacing_info t
ON p.pool_address = t.contract_address
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
token0_address,
token1_address,
fee,
fee_percent,
tick_spacing,
pool_address,
init_tick,
_log_id,
_inserted_timestamp
FROM
FINAL qualify(ROW_NUMBER() over(PARTITION BY pool_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,23 @@
version: 2
models:
- name: silver_dex__bladeswap_pools_v3
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- POOL_ADDRESS
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: POOL_ADDRESS
tests:
- not_null
- name: TOKEN0_ADDRESS
tests:
- not_null
- name: TOKEN1_ADDRESS
tests:
- not_null

View File

@ -0,0 +1,112 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['curated','reorg']
) }}
WITH swaps_base AS (
SELECT
l.block_number,
l.block_timestamp,
l.tx_hash,
l.event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
l.contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [0] :: STRING
)
) AS amount0In,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [1] :: STRING
)
) AS amount1In,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [2] :: STRING
)
) AS amount0Out,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [3] :: STRING
)
) AS amount1Out,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS sender,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS tx_to,
token0,
token1,
l._log_id,
l._inserted_timestamp
FROM
{{ ref('silver__logs') }}
l
INNER JOIN {{ ref('silver_dex__bladeswap_pools') }}
p
ON p.pool_address = l.contract_address
WHERE
topics [0] :: STRING = '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND l._inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
sender,
tx_to,
amount0In,
amount1In,
amount0Out,
amount1Out,
token0,
token1,
CASE
WHEN amount0In <> 0
AND amount1In <> 0
AND amount0Out <> 0 THEN amount1In
WHEN amount0In <> 0 THEN amount0In
WHEN amount1In <> 0 THEN amount1In
END AS amount_in_unadj,
CASE
WHEN amount0Out <> 0 THEN amount0Out
WHEN amount1Out <> 0 THEN amount1Out
END AS amount_out_unadj,
CASE
WHEN amount0In <> 0
AND amount1In <> 0
AND amount0Out <> 0 THEN token1
WHEN amount0In <> 0 THEN token0
WHEN amount1In <> 0 THEN token1
END AS token_in,
CASE
WHEN amount0Out <> 0 THEN token0
WHEN amount1Out <> 0 THEN token1
END AS token_out,
'Swap' AS event_name,
'bladeswap' AS platform,
_log_id,
_inserted_timestamp
FROM
swaps_base
WHERE
token_in <> token_out

View File

@ -0,0 +1,64 @@
version: 2
models:
- name: silver_dex__bladeswap_swaps
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: AMOUNT_IN_UNADJ
tests:
- not_null
- name: AMOUNT_OUT_UNADJ
tests:
- not_null
- name: TOKEN_IN
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TOKEN_OUT
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: SYMBOL_IN
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: SYMBOL_OUT
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TX_TO
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+

View File

@ -0,0 +1,97 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['curated','reorg']
) }}
WITH swaps_base AS (
SELECT
l.block_number,
l.block_timestamp,
l.tx_hash,
l.event_index,
l.origin_function_signature,
l.origin_from_address,
l.origin_to_address,
l.contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS sender,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS recipient,
utils.udf_hex_to_int(
's2c',
segmented_data [0] :: STRING
) :: FLOAT AS amount0_unadj,
utils.udf_hex_to_int(
's2c',
segmented_data [1] :: STRING
) :: FLOAT AS amount1_unadj,
utils.udf_hex_to_int(
's2c',
segmented_data [2] :: STRING
) :: FLOAT AS sqrtPriceX96,
utils.udf_hex_to_int(
's2c',
segmented_data [3] :: STRING
) :: FLOAT AS liquidity,
utils.udf_hex_to_int(
's2c',
segmented_data [4] :: STRING
) :: FLOAT AS tick,
token0_address,
token1_address,
pool_address,
tick_spacing,
fee,
l._log_id,
l._inserted_timestamp
FROM
{{ ref('silver__logs') }}
l
INNER JOIN {{ ref('silver_dex__bladeswap_pools_v3') }}
p
ON p.pool_address = l.contract_address
WHERE
l.block_timestamp :: DATE >= '2024-01-01'
AND topics [0] :: STRING = '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND l._inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
pool_address,
recipient,
sender,
fee,
tick,
tick_spacing,
liquidity,
token0_address,
token1_address,
amount0_unadj,
amount1_unadj,
_log_id,
_inserted_timestamp
FROM
swaps_base qualify(ROW_NUMBER() over(PARTITION BY _log_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,38 @@
version: 2
models:
- name: silver_dex__bladeswap_swaps_v3
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- name: POOL_ADDRESS
tests:
- not_null
- name: RECIPIENT
tests:
- not_null
- name: SENDER
tests:
- not_null
- name: TOKEN0_ADDRESS
tests:
- not_null
- name: TOKEN1_ADDRESS
tests:
- not_null
- name: AMOUNT0_UNADJ
tests:
- not_null
- name: AMOUNT1_UNADJ
tests:
- not_null

View File

@ -0,0 +1,58 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'pool_address',
tags = ['curated']
) }}
WITH created_pools AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS token0,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS token1,
CONCAT('0x', SUBSTR(segmented_data [0] :: STRING, 25, 40)) AS pool_address,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [1] :: STRING
)
) AS pool_id,
_log_id,
_inserted_timestamp
FROM
{{ ref ('silver__logs') }}
WHERE
contract_address = LOWER('0x9cc1599d4378ea41d444642d18aa9be44f709ffd') --BlasterswapV2Factory
AND topics [0] :: STRING = '0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9' --PairCreated
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
token0,
token1,
pool_address,
pool_id,
_log_id,
_inserted_timestamp
FROM
created_pools qualify(ROW_NUMBER() over (PARTITION BY pool_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,20 @@
version: 2
models:
- name: silver_dex__blaster_pools
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- POOL_ADDRESS
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: POOL_ADDRESS
tests:
- not_null
- name: TOKEN0
tests:
- not_null
- name: TOKEN1
tests:
- not_null

View File

@ -0,0 +1,126 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'pool_address',
cluster_by = ['block_timestamp::DATE'],
tags = ['curated']
) }}
WITH created_pools AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
LOWER(CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40))) AS token0_address,
LOWER(CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40))) AS token1_address,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
's2c',
topics [3] :: STRING
)
) AS fee,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
's2c',
segmented_data [0] :: STRING
)
) AS tick_spacing,
CONCAT('0x', SUBSTR(segmented_data [1] :: STRING, 25, 40)) AS pool_address,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__logs') }}
WHERE
topics [0] :: STRING = '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118'
AND contract_address = '0x1a8027625c830aac43ad82a3f7cd6d5fdce89d78' --BlasterSwapV3Factory
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
),
initial_info AS (
SELECT
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [0] :: STRING)) :: FLOAT AS init_sqrtPriceX96,
utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [1] :: STRING)) :: FLOAT AS init_tick,
pow(
1.0001,
init_tick
) AS init_price_1_0_unadj,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__logs') }}
WHERE
topics [0] :: STRING = '0x98636036cb66a9c19a37435efc1e90142190214e8abeb821bdba3f2990dd4c95'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
),
FINAL AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
p.contract_address,
token0_address,
token1_address,
fee :: INTEGER AS fee,
(
fee / 10000
) :: FLOAT AS fee_percent,
tick_spacing,
pool_address,
COALESCE(
init_tick,
0
) AS init_tick,
p._log_id,
p._inserted_timestamp
FROM
created_pools p
LEFT JOIN initial_info i
ON p.pool_address = i.contract_address
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
token0_address,
token1_address,
fee,
fee_percent,
tick_spacing,
pool_address,
init_tick,
_log_id,
_inserted_timestamp
FROM
FINAL qualify(ROW_NUMBER() over(PARTITION BY pool_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,20 @@
version: 2
models:
- name: silver_dex__blaster_pools_v3
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- POOL_ADDRESS
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: POOL_ADDRESS
tests:
- not_null
- name: TOKEN0_ADDRESS
tests:
- not_null
- name: TOKEN1_ADDRESS
tests:
- not_null

View File

@ -0,0 +1,112 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['curated','reorg']
) }}
WITH swaps_base AS (
SELECT
l.block_number,
l.block_timestamp,
l.tx_hash,
l.event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
l.contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [0] :: STRING
)
) AS amount0In,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [1] :: STRING
)
) AS amount1In,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [2] :: STRING
)
) AS amount0Out,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [3] :: STRING
)
) AS amount1Out,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS sender,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS tx_to,
token0,
token1,
l._log_id,
l._inserted_timestamp
FROM
{{ ref('silver__logs') }}
l
INNER JOIN {{ ref('silver_dex__blaster_pools') }}
p
ON p.pool_address = l.contract_address
WHERE
topics [0] :: STRING = '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND l._inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
sender,
tx_to,
amount0In,
amount1In,
amount0Out,
amount1Out,
token0,
token1,
CASE
WHEN amount0In <> 0
AND amount1In <> 0
AND amount0Out <> 0 THEN amount1In
WHEN amount0In <> 0 THEN amount0In
WHEN amount1In <> 0 THEN amount1In
END AS amount_in_unadj,
CASE
WHEN amount0Out <> 0 THEN amount0Out
WHEN amount1Out <> 0 THEN amount1Out
END AS amount_out_unadj,
CASE
WHEN amount0In <> 0
AND amount1In <> 0
AND amount0Out <> 0 THEN token1
WHEN amount0In <> 0 THEN token0
WHEN amount1In <> 0 THEN token1
END AS token_in,
CASE
WHEN amount0Out <> 0 THEN token0
WHEN amount1Out <> 0 THEN token1
END AS token_out,
'Swap' AS event_name,
'blasterswap' AS platform,
_log_id,
_inserted_timestamp
FROM
swaps_base
WHERE
token_in <> token_out

View File

@ -0,0 +1,64 @@
version: 2
models:
- name: silver_dex__blaster_swaps
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: AMOUNT_IN_UNADJ
tests:
- not_null
- name: AMOUNT_OUT_UNADJ
tests:
- not_null
- name: TOKEN_IN
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TOKEN_OUT
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: SYMBOL_IN
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: SYMBOL_OUT
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TX_TO
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+

View File

@ -0,0 +1,97 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['curated','reorg']
) }}
WITH swaps_base AS (
SELECT
l.block_number,
l.block_timestamp,
l.tx_hash,
l.event_index,
l.origin_function_signature,
l.origin_from_address,
l.origin_to_address,
l.contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS sender,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS recipient,
utils.udf_hex_to_int(
's2c',
segmented_data [0] :: STRING
) :: FLOAT AS amount0_unadj,
utils.udf_hex_to_int(
's2c',
segmented_data [1] :: STRING
) :: FLOAT AS amount1_unadj,
utils.udf_hex_to_int(
's2c',
segmented_data [2] :: STRING
) :: FLOAT AS sqrtPriceX96,
utils.udf_hex_to_int(
's2c',
segmented_data [3] :: STRING
) :: FLOAT AS liquidity,
utils.udf_hex_to_int(
's2c',
segmented_data [4] :: STRING
) :: FLOAT AS tick,
token0_address,
token1_address,
pool_address,
tick_spacing,
fee,
l._log_id,
l._inserted_timestamp
FROM
{{ ref('silver__logs') }}
l
INNER JOIN {{ ref('silver_dex__blaster_pools_v3') }}
p
ON p.pool_address = l.contract_address
WHERE
l.block_timestamp :: DATE >= '2024-01-01'
AND topics [0] :: STRING = '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND l._inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
pool_address,
recipient,
sender,
fee,
tick,
tick_spacing,
liquidity,
token0_address,
token1_address,
amount0_unadj,
amount1_unadj,
_log_id,
_inserted_timestamp
FROM
swaps_base qualify(ROW_NUMBER() over(PARTITION BY _log_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,38 @@
version: 2
models:
- name: silver_dex__blaster_swaps_v3
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- name: POOL_ADDRESS
tests:
- not_null
- name: RECIPIENT
tests:
- not_null
- name: SENDER
tests:
- not_null
- name: TOKEN0_ADDRESS
tests:
- not_null
- name: TOKEN1_ADDRESS
tests:
- not_null
- name: AMOUNT0_UNADJ
tests:
- not_null
- name: AMOUNT1_UNADJ
tests:
- not_null

View File

@ -0,0 +1,58 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'pool_address',
tags = ['curated']
) }}
WITH created_pools AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS token0,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS token1,
CONCAT('0x', SUBSTR(segmented_data [0] :: STRING, 25, 40)) AS pool_address,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [1] :: STRING
)
) AS pool_id,
_log_id,
_inserted_timestamp
FROM
{{ ref ('silver__logs') }}
WHERE
contract_address = LOWER('0x24f5ac9a706de0cf795a8193f6ab3966b14ecfe6') --SwapV2Factory
AND topics [0] :: STRING = '0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9' --PairCreated
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
token0,
token1,
pool_address,
pool_id,
_log_id,
_inserted_timestamp
FROM
created_pools qualify(ROW_NUMBER() over (PARTITION BY pool_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,20 @@
version: 2
models:
- name: silver_dex__ring_pools
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- POOL_ADDRESS
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: POOL_ADDRESS
tests:
- not_null
- name: TOKEN0
tests:
- not_null
- name: TOKEN1
tests:
- not_null

View File

@ -0,0 +1,35 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
tags = ['curated']
) }}
SELECT
block_number,
tx_hash,
event_index,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS original_token,
CONCAT('0x', SUBSTR(segmented_data [0] :: STRING, 25, 40)) AS wrapped_token,
utils.udf_hex_to_int(
segmented_data [1] :: STRING
) :: INTEGER AS asset_id,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__logs') }}
WHERE
contract_address = '0x455b20131d59f01d082df1225154fda813e8cee9' --FewFactory
AND topics [0] :: STRING = '0x940398321949af993516f7d144a2b9f43100b1de59365ba376e2b458d840c091' --WrappedTokenCreated
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,16 @@
version: 2
models:
- name: silver_dex__ring_pools_tokens_wrapped
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- WRAPPED_TOKEN
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: ORIGINAL_TOKEN
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+

View File

@ -0,0 +1,122 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'pool_address',
cluster_by = ['block_timestamp::DATE'],
tags = ['curated']
) }}
WITH created_pools AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
LOWER(CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40))) AS token0_address,
LOWER(CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40))) AS token1_address,
utils.udf_hex_to_int(
's2c',
topics [3] :: STRING
) :: INTEGER AS fee,
utils.udf_hex_to_int(
's2c',
segmented_data [0] :: STRING
) :: INTEGER AS tick_spacing,
CONCAT('0x', SUBSTR(segmented_data [1] :: STRING, 25, 40)) AS pool_address,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__logs') }}
WHERE
topics [0] :: STRING = '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118'
AND contract_address = '0x890509fab3dd11d4ff57d8471b5eac74687e4c75' --Ring/UniswapV3Factory
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
),
initial_info AS (
SELECT
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [0] :: STRING)) :: FLOAT AS init_sqrtPriceX96,
utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [1] :: STRING)) :: FLOAT AS init_tick,
pow(
1.0001,
init_tick
) AS init_price_1_0_unadj,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__logs') }}
WHERE
topics [0] :: STRING = '0x98636036cb66a9c19a37435efc1e90142190214e8abeb821bdba3f2990dd4c95'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
),
FINAL AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
p.contract_address,
token0_address,
token1_address,
fee :: INTEGER AS fee,
(
fee / 10000
) :: FLOAT AS fee_percent,
tick_spacing,
pool_address,
COALESCE(
init_tick,
0
) AS init_tick,
p._log_id,
p._inserted_timestamp
FROM
created_pools p
LEFT JOIN initial_info i
ON p.pool_address = i.contract_address
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
token0_address,
token1_address,
fee,
fee_percent,
tick_spacing,
pool_address,
init_tick,
_log_id,
_inserted_timestamp
FROM
FINAL qualify(ROW_NUMBER() over(PARTITION BY pool_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,22 @@
version: 2
models:
- name: silver_dex__ring_pools_v3
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- POOL_ADDRESS
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: POOL_ADDRESS
tests:
- not_null
- name: TOKEN0_ADDRESS
tests:
- not_null
- name: TOKEN1_ADDRESS
tests:
- not_null

View File

@ -0,0 +1,112 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['curated','reorg']
) }}
WITH swaps_base AS (
SELECT
l.block_number,
l.block_timestamp,
l.tx_hash,
l.event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
l.contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [0] :: STRING
)
) AS amount0In,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [1] :: STRING
)
) AS amount1In,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [2] :: STRING
)
) AS amount0Out,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [3] :: STRING
)
) AS amount1Out,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS sender,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS tx_to,
token0,
token1,
l._log_id,
l._inserted_timestamp
FROM
{{ ref('silver__logs') }}
l
INNER JOIN {{ ref('silver_dex__ring_pools') }}
p
ON p.pool_address = l.contract_address
WHERE
topics [0] :: STRING = '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND l._inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
sender,
tx_to,
amount0In,
amount1In,
amount0Out,
amount1Out,
token0,
token1,
CASE
WHEN amount0In <> 0
AND amount1In <> 0
AND amount0Out <> 0 THEN amount1In
WHEN amount0In <> 0 THEN amount0In
WHEN amount1In <> 0 THEN amount1In
END AS amount_in_unadj,
CASE
WHEN amount0Out <> 0 THEN amount0Out
WHEN amount1Out <> 0 THEN amount1Out
END AS amount_out_unadj,
CASE
WHEN amount0In <> 0
AND amount1In <> 0
AND amount0Out <> 0 THEN token1
WHEN amount0In <> 0 THEN token0
WHEN amount1In <> 0 THEN token1
END AS token_in,
CASE
WHEN amount0Out <> 0 THEN token0
WHEN amount1Out <> 0 THEN token1
END AS token_out,
'Swap' AS event_name,
'ring' AS platform,
_log_id,
_inserted_timestamp
FROM
swaps_base
WHERE
token_in <> token_out

View File

@ -0,0 +1,64 @@
version: 2
models:
- name: silver_dex__ring_swaps
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: AMOUNT_IN_UNADJ
tests:
- not_null
- name: AMOUNT_OUT_UNADJ
tests:
- not_null
- name: TOKEN_IN
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TOKEN_OUT
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: SYMBOL_IN
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: SYMBOL_OUT
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TX_TO
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+

View File

@ -0,0 +1,97 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['curated','reorg']
) }}
WITH swaps_base AS (
SELECT
l.block_number,
l.block_timestamp,
l.tx_hash,
l.event_index,
l.origin_function_signature,
l.origin_from_address,
l.origin_to_address,
l.contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS sender,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS recipient,
utils.udf_hex_to_int(
's2c',
segmented_data [0] :: STRING
) :: FLOAT AS amount0_unadj,
utils.udf_hex_to_int(
's2c',
segmented_data [1] :: STRING
) :: FLOAT AS amount1_unadj,
utils.udf_hex_to_int(
's2c',
segmented_data [2] :: STRING
) :: FLOAT AS sqrtPriceX96,
utils.udf_hex_to_int(
's2c',
segmented_data [3] :: STRING
) :: FLOAT AS liquidity,
utils.udf_hex_to_int(
's2c',
segmented_data [4] :: STRING
) :: FLOAT AS tick,
token0_address,
token1_address,
pool_address,
tick_spacing,
fee,
l._log_id,
l._inserted_timestamp
FROM
{{ ref('silver__logs') }}
l
INNER JOIN {{ ref('silver_dex__ring_pools_v3') }}
p
ON p.pool_address = l.contract_address
WHERE
l.block_timestamp :: DATE >= '2024-01-01'
AND topics [0] :: STRING = '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND l._inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
pool_address,
recipient,
sender,
fee,
tick,
tick_spacing,
liquidity,
token0_address,
token1_address,
amount0_unadj,
amount1_unadj,
_log_id,
_inserted_timestamp
FROM
swaps_base qualify(ROW_NUMBER() over(PARTITION BY _log_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,38 @@
version: 2
models:
- name: silver_dex__ring_swaps_v3
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 3
- name: POOL_ADDRESS
tests:
- not_null
- name: RECIPIENT
tests:
- not_null
- name: SENDER
tests:
- not_null
- name: TOKEN0_ADDRESS
tests:
- not_null
- name: TOKEN1_ADDRESS
tests:
- not_null
- name: AMOUNT0_UNADJ
tests:
- not_null
- name: AMOUNT1_UNADJ
tests:
- not_null

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,44 @@
version: 2
models:
- name: silver_dex__complete_dex_liquidity_pools
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: TX_HASH
tests:
- not_null
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: POOL_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: PLATFORM
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: _ID
tests:
- not_null

View File

@ -0,0 +1,997 @@
-- depends_on: {{ ref('silver__complete_token_prices') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = ['block_number','platform','version'],
cluster_by = ['block_timestamp::DATE','platform'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash, origin_function_signature, origin_from_address, origin_to_address, contract_address, pool_name, event_name, sender, tx_to, token_in, token_out, symbol_in, symbol_out), SUBSTRING(origin_function_signature, pool_name, event_name, sender, tx_to, token_in, token_out, symbol_in, symbol_out)",
tags = ['curated','reorg','heal']
) }}
WITH ambient AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
event_name,
amount_in_unadj,
amount_out_unadj,
token_in,
token_out,
sender,
tx_to,
platform,
'v1' AS version,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver_dex__ambient_swaps') }}
{% if is_incremental() and 'ambient' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
bladeswap AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
event_name,
amount_in_unadj,
amount_out_unadj,
token_in,
token_out,
sender,
tx_to,
platform,
'v1' AS version,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver_dex__bladeswap_swaps') }}
{% if is_incremental() and 'bladeswap' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
bladeswap_v3 AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
pool_address AS contract_address,
'Swap' AS event_name,
CASE
WHEN amount0_unadj > 0 THEN ABS(amount0_unadj)
ELSE ABS(amount1_unadj)
END AS amount_in_unadj,
CASE
WHEN amount0_unadj < 0 THEN ABS(amount0_unadj)
ELSE ABS(amount1_unadj)
END AS amount_out_unadj,
CASE
WHEN amount0_unadj > 0 THEN token0_address
ELSE token1_address
END AS token_in,
CASE
WHEN amount0_unadj < 0 THEN token0_address
ELSE token1_address
END AS token_out,
sender,
recipient AS tx_to,
'bladeswap-v3' AS platform,
'v3' AS version,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver_dex__bladeswap_swaps_v3') }}
{% if is_incremental() and 'bladeswap_v3' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
blaster AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
event_name,
amount_in_unadj,
amount_out_unadj,
token_in,
token_out,
sender,
tx_to,
platform,
'v1' AS version,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver_dex__blaster_swaps') }}
{% if is_incremental() and 'blaster' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
blaster_v3 AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
pool_address AS contract_address,
'Swap' AS event_name,
CASE
WHEN amount0_unadj > 0 THEN ABS(amount0_unadj)
ELSE ABS(amount1_unadj)
END AS amount_in_unadj,
CASE
WHEN amount0_unadj < 0 THEN ABS(amount0_unadj)
ELSE ABS(amount1_unadj)
END AS amount_out_unadj,
CASE
WHEN amount0_unadj > 0 THEN token0_address
ELSE token1_address
END AS token_in,
CASE
WHEN amount0_unadj < 0 THEN token0_address
ELSE token1_address
END AS token_out,
sender,
recipient AS tx_to,
'blasterswap-v3' AS platform,
'v3' AS version,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver_dex__blaster_swaps_v3') }}
{% if is_incremental() and 'blaster_v3' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
ring AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
event_name,
amount_in_unadj,
amount_out_unadj,
token_in,
token_out,
sender,
tx_to,
platform,
'v1' AS version,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver_dex__ring_swaps') }}
{% if is_incremental() and 'ring' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
ring_v3 AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
pool_address AS contract_address,
'Swap' AS event_name,
CASE
WHEN amount0_unadj > 0 THEN ABS(amount0_unadj)
ELSE ABS(amount1_unadj)
END AS amount_in_unadj,
CASE
WHEN amount0_unadj < 0 THEN ABS(amount0_unadj)
ELSE ABS(amount1_unadj)
END AS amount_out_unadj,
CASE
WHEN amount0_unadj > 0 THEN token0_address
ELSE token1_address
END AS token_in,
CASE
WHEN amount0_unadj < 0 THEN token0_address
ELSE token1_address
END AS token_out,
sender,
recipient AS tx_to,
'ring-v3' AS platform,
'v3' AS version,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver_dex__ring_swaps_v3') }}
{% if is_incremental() and 'ring_v3' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
sushi AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
event_name,
amount_in_unadj,
amount_out_unadj,
token_in,
token_out,
sender,
tx_to,
platform,
'v1' AS version,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver_dex__sushi_swaps') }}
{% if is_incremental() and 'sushi' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
sushi_v3 AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
pool_address AS contract_address,
'Swap' AS event_name,
CASE
WHEN amount0_unadj > 0 THEN ABS(amount0_unadj)
ELSE ABS(amount1_unadj)
END AS amount_in_unadj,
CASE
WHEN amount0_unadj < 0 THEN ABS(amount0_unadj)
ELSE ABS(amount1_unadj)
END AS amount_out_unadj,
CASE
WHEN amount0_unadj > 0 THEN token0_address
ELSE token1_address
END AS token_in,
CASE
WHEN amount0_unadj < 0 THEN token0_address
ELSE token1_address
END AS token_out,
sender,
recipient AS tx_to,
'sushiswap-v3' AS platform,
'v3' AS version,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver_dex__sushi_swaps_v3') }}
{% if is_incremental() and 'sushi' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
thruster AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
event_name,
amount_in_unadj,
amount_out_unadj,
token_in,
token_out,
sender,
tx_to,
platform,
'v1' AS version,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver_dex__thruster_swaps') }}
{% if is_incremental() and 'thruster' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
thruster_v3 AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
pool_address AS contract_address,
'Swap' AS event_name,
CASE
WHEN amount0_unadj > 0 THEN ABS(amount0_unadj)
ELSE ABS(amount1_unadj)
END AS amount_in_unadj,
CASE
WHEN amount0_unadj < 0 THEN ABS(amount0_unadj)
ELSE ABS(amount1_unadj)
END AS amount_out_unadj,
CASE
WHEN amount0_unadj > 0 THEN token0_address
ELSE token1_address
END AS token_in,
CASE
WHEN amount0_unadj < 0 THEN token0_address
ELSE token1_address
END AS token_out,
sender,
recipient AS tx_to,
'thruster-v3' AS platform,
'v3' AS version,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver_dex__thruster_swaps_v3') }}
{% if is_incremental() and 'thruster_v3' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
all_dex AS (
SELECT
*
FROM
ambient
UNION ALL
SELECT
*
FROM
bladeswap
UNION ALL
SELECT
*
FROM
bladeswap_v3
UNION ALL
SELECT
*
FROM
blaster
UNION ALL
SELECT
*
FROM
blaster_v3
UNION ALL
SELECT
*
FROM
ring
UNION ALL
SELECT
*
FROM
ring_v3
UNION ALL
SELECT
*
FROM
sushi
UNION ALL
SELECT
*
FROM
sushi_v3
UNION ALL
SELECT
*
FROM
thruster
UNION ALL
SELECT
*
FROM
thruster_v3
),
complete_dex_swaps AS (
SELECT
s.block_number,
s.block_timestamp,
s.tx_hash,
s.event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
s.contract_address,
event_name,
token_in,
c1.token_decimals AS decimals_in,
c1.token_symbol AS symbol_in,
amount_in_unadj,
CASE
WHEN decimals_in IS NULL THEN amount_in_unadj
ELSE (amount_in_unadj / pow(10, decimals_in))
END AS amount_in,
CASE
WHEN decimals_in IS NOT NULL THEN amount_in * p1.price
ELSE NULL
END AS amount_in_usd,
token_out,
c2.token_decimals AS decimals_out,
c2.token_symbol AS symbol_out,
amount_out_unadj,
CASE
WHEN decimals_out IS NULL THEN amount_out_unadj
ELSE (amount_out_unadj / pow(10, decimals_out))
END AS amount_out,
CASE
WHEN decimals_out IS NOT NULL THEN amount_out * p2.price
ELSE NULL
END AS amount_out_usd,
CASE
WHEN lp.pool_name IS NULL THEN CONCAT(
LEAST(
COALESCE(
symbol_in,
CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42))
),
COALESCE(
symbol_out,
CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42))
)
),
'-',
GREATEST(
COALESCE(
symbol_in,
CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42))
),
COALESCE(
symbol_out,
CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42))
)
)
)
ELSE lp.pool_name
END AS pool_name,
sender,
tx_to,
s.platform,
s.version,
s._log_id,
s._inserted_timestamp
FROM
all_dex s
LEFT JOIN {{ ref('silver_dex__ring_pools_tokens_wrapped') }}
rp1 -- join ring_pools_tokens to get prices for underlying, wrapped token addresses
ON s.token_in = rp1.wrapped_token
LEFT JOIN {{ ref('silver_dex__ring_pools_tokens_wrapped') }}
rp2
ON s.token_out = rp2.wrapped_token
LEFT JOIN {{ ref('silver__contracts') }}
c1
ON s.token_in = c1.contract_address
LEFT JOIN {{ ref('silver__contracts') }}
c2
ON s.token_out = c2.contract_address
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
p1
ON (
CASE
WHEN rp1.original_token IS NOT NULL THEN rp1.original_token
ELSE s.token_in
END
) = p1.token_address
AND DATE_TRUNC(
'hour',
block_timestamp
) = p1.hour
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
p2
ON (
CASE
WHEN rp2.original_token IS NOT NULL THEN rp2.original_token
ELSE s.token_out
END
) = p2.token_address
AND DATE_TRUNC(
'hour',
block_timestamp
) = p2.hour
LEFT JOIN {{ ref('silver_dex__complete_dex_liquidity_pools') }}
lp
ON s.contract_address = lp.pool_address
),
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
heal_model AS (
SELECT
t0.block_number,
t0.block_timestamp,
t0.tx_hash,
t0.event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
t0.contract_address,
event_name,
token_in,
c1.token_decimals AS decimals_in,
c1.token_symbol AS symbol_in,
amount_in_unadj,
CASE
WHEN c1.token_decimals IS NULL THEN amount_in_unadj
ELSE (amount_in_unadj / pow(10, c1.token_decimals))
END AS amount_in_heal,
CASE
WHEN c1.token_decimals IS NOT NULL THEN amount_in_heal * p1.price
ELSE NULL
END AS amount_in_usd_heal,
token_out,
c2.token_decimals AS decimals_out,
c2.token_symbol AS symbol_out,
amount_out_unadj,
CASE
WHEN c2.token_decimals IS NULL THEN amount_out_unadj
ELSE (amount_out_unadj / pow(10, c2.token_decimals))
END AS amount_out_heal,
CASE
WHEN c2.token_decimals IS NOT NULL THEN amount_out_heal * p2.price
ELSE NULL
END AS amount_out_usd_heal,
CASE
WHEN lp.pool_name IS NULL THEN CONCAT(
LEAST(
COALESCE(
c1.token_symbol,
CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42))
),
COALESCE(
c2.token_symbol,
CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42))
)
),
'-',
GREATEST(
COALESCE(
c1.token_symbol,
CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42))
),
COALESCE(
c2.token_symbol,
CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42))
)
)
)
ELSE lp.pool_name
END AS pool_name_heal,
sender,
tx_to,
t0.platform,
t0.version,
t0._log_id,
t0._inserted_timestamp
FROM
{{ this }}
t0
LEFT JOIN {{ ref('silver_dex__ring_pools_tokens_wrapped') }}
rp1
ON t0.token_in = rp1.wrapped_token
LEFT JOIN {{ ref('silver_dex__ring_pools_tokens_wrapped') }}
rp2
ON t0.token_out = rp2.wrapped_token
LEFT JOIN {{ ref('silver__contracts') }}
c1
ON t0.token_in = c1.contract_address
LEFT JOIN {{ ref('silver__contracts') }}
c2
ON t0.token_out = c2.contract_address
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
p1
ON (
CASE
WHEN rp1.original_token IS NOT NULL THEN rp1.original_token
ELSE t0.token_in
END
) = p1.token_address
AND DATE_TRUNC(
'hour',
block_timestamp
) = p1.hour
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
p2
ON (
CASE
WHEN rp2.original_token IS NOT NULL THEN rp2.original_token
ELSE t0.token_out
END
) = p2.token_address
AND DATE_TRUNC(
'hour',
block_timestamp
) = p2.hour
LEFT JOIN {{ ref('silver_dex__complete_dex_liquidity_pools') }}
lp
ON t0.contract_address = lp.pool_address
WHERE
CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t1.block_number,
'-',
t1.platform,
'-',
t1.version
)
FROM
{{ this }}
t1
WHERE
t1.decimals_in IS NULL
AND t1._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t1.token_in)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t2.block_number,
'-',
t2.platform,
'-',
t2.version
)
FROM
{{ this }}
t2
WHERE
t2.decimals_out IS NULL
AND t2._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t2.token_out)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t3.block_number,
'-',
t3.platform,
'-',
t3.version
)
FROM
{{ this }}
t3
LEFT JOIN {{ ref('silver_dex__ring_pools_tokens_wrapped') }}
rp1
ON t3.token_in = rp1.wrapped_token
WHERE
t3.amount_in_usd IS NULL
AND t3._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__complete_token_prices') }}
p
WHERE
p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND p.price IS NOT NULL
AND p.token_address = (
CASE
WHEN rp1.original_token IS NOT NULL THEN rp1.original_token
ELSE t3.token_in
END
)
AND p.hour = DATE_TRUNC(
'hour',
t3.block_timestamp
)
)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t4.block_number,
'-',
t4.platform,
'-',
t4.version
)
FROM
{{ this }}
t4
LEFT JOIN {{ ref('silver_dex__ring_pools_tokens_wrapped') }}
rp2
ON t4.token_out = rp2.wrapped_token
WHERE
t4.amount_out_usd IS NULL
AND t4._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__complete_token_prices') }}
p
WHERE
p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND p.price IS NOT NULL
AND p.token_address = (
CASE
WHEN rp2.original_token IS NOT NULL THEN rp2.original_token
ELSE t4.token_out
END
)
AND p.hour = DATE_TRUNC(
'hour',
t4.block_timestamp
)
)
GROUP BY
1
)
),
{% endif %}
FINAL AS (
SELECT
*
FROM
complete_dex_swaps
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
event_name,
token_in,
decimals_in,
symbol_in,
amount_in_unadj,
amount_in_heal AS amount_in,
amount_in_usd_heal AS amount_in_usd,
token_out,
decimals_out,
symbol_out,
amount_out_unadj,
amount_out_heal AS amount_out,
amount_out_usd_heal AS amount_out_usd,
pool_name_heal AS pool_name,
sender,
tx_to,
platform,
version,
_log_id,
_inserted_timestamp
FROM
heal_model
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
pool_name,
event_name,
amount_in_unadj,
amount_in,
amount_in_usd,
amount_out_unadj,
amount_out,
amount_out_usd,
sender,
tx_to,
platform,
version,
token_in,
token_out,
symbol_in,
symbol_out,
decimals_in,
decimals_out,
_log_id,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['tx_hash','event_index']
) }} AS complete_dex_swaps_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL qualify (ROW_NUMBER() over (PARTITION BY _log_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,128 @@
version: 2
models:
- name: silver_dex__complete_dex_swaps
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: TX_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: EVENT_NAME
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: AMOUNT_IN
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: AMOUNT_OUT
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: AMOUNT_IN_USD
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: AMOUNT_OUT_USD
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TOKEN_IN
tests:
- not_null:
where: PLATFORM NOT IN ('uniswap-v3','curve')
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TOKEN_OUT
tests:
- not_null:
where: PLATFORM NOT IN ('uniswap-v3','curve')
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: SYMBOL_IN
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: SYMBOL_OUT
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: SENDER
tests:
- not_null:
where: BLOCK_TIMESTAMP > '2021-08-01'
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TX_TO
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
enabled: False
- name: PLATFORM
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: EVENT_INDEX
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: _LOG_ID
tests:
- not_null
- name: ORIGIN_FUNCTION_SIGNATURE
tests:
- not_null
- name: ORIGIN_FROM_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: ORIGIN_TO_ADDRESS
tests:
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+

View File

@ -0,0 +1,58 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'pool_address',
tags = ['curated']
) }}
WITH created_pools AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS token0,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS token1,
CONCAT('0x', SUBSTR(segmented_data [0] :: STRING, 25, 40)) AS pool_address,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [1] :: STRING
)
) AS pool_id,
_log_id,
_inserted_timestamp
FROM
{{ ref ('silver__logs') }}
WHERE
contract_address = LOWER('0x42fa929fc636e657ac568c0b5cf38e203b67ac2b') --Sushi/UniswapV2Factory
AND topics [0] :: STRING = '0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9' --PairCreated
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
token0,
token1,
pool_address,
pool_id,
_log_id,
_inserted_timestamp
FROM
created_pools qualify(ROW_NUMBER() over (PARTITION BY pool_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,20 @@
version: 2
models:
- name: silver_dex__sushi_pools
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- POOL_ADDRESS
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: POOL_ADDRESS
tests:
- not_null
- name: TOKEN0
tests:
- not_null
- name: TOKEN1
tests:
- not_null

View File

@ -0,0 +1,122 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'pool_address',
cluster_by = ['block_timestamp::DATE'],
tags = ['curated']
) }}
WITH created_pools AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
LOWER(CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40))) AS token0_address,
LOWER(CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40))) AS token1_address,
utils.udf_hex_to_int(
's2c',
topics [3] :: STRING
) :: INTEGER AS fee,
utils.udf_hex_to_int(
's2c',
segmented_data [0] :: STRING
) :: INTEGER AS tick_spacing,
CONCAT('0x', SUBSTR(segmented_data [1] :: STRING, 25, 40)) AS pool_address,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__logs') }}
WHERE
topics [0] :: STRING = '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118'
AND contract_address = '0x7680d4b43f3d1d54d6cfeeb2169463bfa7a6cf0d' --Sushi/UniswapV3Factory
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
),
initial_info AS (
SELECT
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [0] :: STRING)) :: FLOAT AS init_sqrtPriceX96,
utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [1] :: STRING)) :: FLOAT AS init_tick,
pow(
1.0001,
init_tick
) AS init_price_1_0_unadj,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__logs') }}
WHERE
topics [0] :: STRING = '0x98636036cb66a9c19a37435efc1e90142190214e8abeb821bdba3f2990dd4c95'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
),
FINAL AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
p.contract_address,
token0_address,
token1_address,
fee :: INTEGER AS fee,
(
fee / 10000
) :: FLOAT AS fee_percent,
tick_spacing,
pool_address,
COALESCE(
init_tick,
0
) AS init_tick,
p._log_id,
p._inserted_timestamp
FROM
created_pools p
LEFT JOIN initial_info i
ON p.pool_address = i.contract_address
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
token0_address,
token1_address,
fee,
fee_percent,
tick_spacing,
pool_address,
init_tick,
_log_id,
_inserted_timestamp
FROM
FINAL qualify(ROW_NUMBER() over(PARTITION BY pool_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,23 @@
version: 2
models:
- name: silver_dex__sushi_pools_v3
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- POOL_ADDRESS
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: POOL_ADDRESS
tests:
- not_null
- name: TOKEN0_ADDRESS
tests:
- not_null
- name: TOKEN1_ADDRESS
tests:
- not_null

View File

@ -0,0 +1,112 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['curated','reorg']
) }}
WITH swaps_base AS (
SELECT
l.block_number,
l.block_timestamp,
l.tx_hash,
l.event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
l.contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [0] :: STRING
)
) AS amount0In,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [1] :: STRING
)
) AS amount1In,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [2] :: STRING
)
) AS amount0Out,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [3] :: STRING
)
) AS amount1Out,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS sender,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS tx_to,
token0,
token1,
l._log_id,
l._inserted_timestamp
FROM
{{ ref('silver__logs') }}
l
INNER JOIN {{ ref('silver_dex__sushi_pools') }}
p
ON p.pool_address = l.contract_address
WHERE
topics [0] :: STRING = '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND l._inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
sender,
tx_to,
amount0In,
amount1In,
amount0Out,
amount1Out,
token0,
token1,
CASE
WHEN amount0In <> 0
AND amount1In <> 0
AND amount0Out <> 0 THEN amount1In
WHEN amount0In <> 0 THEN amount0In
WHEN amount1In <> 0 THEN amount1In
END AS amount_in_unadj,
CASE
WHEN amount0Out <> 0 THEN amount0Out
WHEN amount1Out <> 0 THEN amount1Out
END AS amount_out_unadj,
CASE
WHEN amount0In <> 0
AND amount1In <> 0
AND amount0Out <> 0 THEN token1
WHEN amount0In <> 0 THEN token0
WHEN amount1In <> 0 THEN token1
END AS token_in,
CASE
WHEN amount0Out <> 0 THEN token0
WHEN amount1Out <> 0 THEN token1
END AS token_out,
'Swap' AS event_name,
'sushiswap' AS platform,
_log_id,
_inserted_timestamp
FROM
swaps_base
WHERE
token_in <> token_out

View File

@ -0,0 +1,64 @@
version: 2
models:
- name: silver_dex__sushi_swaps
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: AMOUNT_IN_UNADJ
tests:
- not_null
- name: AMOUNT_OUT_UNADJ
tests:
- not_null
- name: TOKEN_IN
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TOKEN_OUT
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: SYMBOL_IN
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: SYMBOL_OUT
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TX_TO
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+

View File

@ -0,0 +1,97 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['curated','reorg']
) }}
WITH swaps_base AS (
SELECT
l.block_number,
l.block_timestamp,
l.tx_hash,
l.event_index,
l.origin_function_signature,
l.origin_from_address,
l.origin_to_address,
l.contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS sender,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS recipient,
utils.udf_hex_to_int(
's2c',
segmented_data [0] :: STRING
) :: FLOAT AS amount0_unadj,
utils.udf_hex_to_int(
's2c',
segmented_data [1] :: STRING
) :: FLOAT AS amount1_unadj,
utils.udf_hex_to_int(
's2c',
segmented_data [2] :: STRING
) :: FLOAT AS sqrtPriceX96,
utils.udf_hex_to_int(
's2c',
segmented_data [3] :: STRING
) :: FLOAT AS liquidity,
utils.udf_hex_to_int(
's2c',
segmented_data [4] :: STRING
) :: FLOAT AS tick,
token0_address,
token1_address,
pool_address,
tick_spacing,
fee,
l._log_id,
l._inserted_timestamp
FROM
{{ ref('silver__logs') }}
l
INNER JOIN {{ ref('silver_dex__sushi_pools_v3') }}
p
ON p.pool_address = l.contract_address
WHERE
l.block_timestamp :: DATE >= '2024-01-01'
AND topics [0] :: STRING = '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND l._inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
pool_address,
recipient,
sender,
fee,
tick,
tick_spacing,
liquidity,
token0_address,
token1_address,
amount0_unadj,
amount1_unadj,
_log_id,
_inserted_timestamp
FROM
swaps_base qualify(ROW_NUMBER() over(PARTITION BY _log_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,38 @@
version: 2
models:
- name: silver_dex__sushi_swaps_v3
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- name: POOL_ADDRESS
tests:
- not_null
- name: RECIPIENT
tests:
- not_null
- name: SENDER
tests:
- not_null
- name: TOKEN0_ADDRESS
tests:
- not_null
- name: TOKEN1_ADDRESS
tests:
- not_null
- name: AMOUNT0_UNADJ
tests:
- not_null
- name: AMOUNT1_UNADJ
tests:
- not_null

View File

@ -0,0 +1,58 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'pool_address',
tags = ['curated']
) }}
WITH created_pools AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS token0,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS token1,
CONCAT('0x', SUBSTR(segmented_data [0] :: STRING, 25, 40)) AS pool_address,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [1] :: STRING
)
) AS pool_id,
_log_id,
_inserted_timestamp
FROM
{{ ref ('silver__logs') }}
WHERE
contract_address = LOWER('0xb4a7d971d0adea1c73198c97d7ab3f9ce4aafa13') --Thruster: ThrusterRouter (0.30%)
AND topics [0] :: STRING = '0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9' --PairCreated
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
token0,
token1,
pool_address,
pool_id,
_log_id,
_inserted_timestamp
FROM
created_pools qualify(ROW_NUMBER() over (PARTITION BY pool_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,20 @@
version: 2
models:
- name: silver_dex__thruster_pools
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- POOL_ADDRESS
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: POOL_ADDRESS
tests:
- not_null
- name: TOKEN0
tests:
- not_null
- name: TOKEN1
tests:
- not_null

View File

@ -0,0 +1,122 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'pool_address',
cluster_by = ['block_timestamp::DATE'],
tags = ['curated']
) }}
WITH created_pools AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
LOWER(CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40))) AS token0_address,
LOWER(CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40))) AS token1_address,
utils.udf_hex_to_int(
's2c',
topics [3] :: STRING
) :: INTEGER AS fee,
utils.udf_hex_to_int(
's2c',
segmented_data [0] :: STRING
) :: INTEGER AS tick_spacing,
CONCAT('0x', SUBSTR(segmented_data [1] :: STRING, 25, 40)) AS pool_address,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__logs') }}
WHERE
topics [0] :: STRING = '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118'
AND contract_address = '0x71b08f13b3c3af35aadeb3949afeb1ded1016127' --ThrusterPoolFactory
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
),
initial_info AS (
SELECT
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [0] :: STRING)) :: FLOAT AS init_sqrtPriceX96,
utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [1] :: STRING)) :: FLOAT AS init_tick,
pow(
1.0001,
init_tick
) AS init_price_1_0_unadj,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__logs') }}
WHERE
topics [0] :: STRING = '0x98636036cb66a9c19a37435efc1e90142190214e8abeb821bdba3f2990dd4c95'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
),
FINAL AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
p.contract_address,
token0_address,
token1_address,
fee :: INTEGER AS fee,
(
fee / 10000
) :: FLOAT AS fee_percent,
tick_spacing,
pool_address,
COALESCE(
init_tick,
0
) AS init_tick,
p._log_id,
p._inserted_timestamp
FROM
created_pools p
LEFT JOIN initial_info i
ON p.pool_address = i.contract_address
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
token0_address,
token1_address,
fee,
fee_percent,
tick_spacing,
pool_address,
init_tick,
_log_id,
_inserted_timestamp
FROM
FINAL qualify(ROW_NUMBER() over(PARTITION BY pool_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,23 @@
version: 2
models:
- name: silver_dex__thruster_pools_v3
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- POOL_ADDRESS
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: POOL_ADDRESS
tests:
- not_null
- name: TOKEN0_ADDRESS
tests:
- not_null
- name: TOKEN1_ADDRESS
tests:
- not_null

View File

@ -0,0 +1,112 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['curated','reorg']
) }}
WITH swaps_base AS (
SELECT
l.block_number,
l.block_timestamp,
l.tx_hash,
l.event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
l.contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [0] :: STRING
)
) AS amount0In,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [1] :: STRING
)
) AS amount1In,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [2] :: STRING
)
) AS amount0Out,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [3] :: STRING
)
) AS amount1Out,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS sender,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS tx_to,
token0,
token1,
l._log_id,
l._inserted_timestamp
FROM
{{ ref('silver__logs') }}
l
INNER JOIN {{ ref('silver_dex__thruster_pools') }}
p
ON p.pool_address = l.contract_address
WHERE
topics [0] :: STRING = '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND l._inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
sender,
tx_to,
amount0In,
amount1In,
amount0Out,
amount1Out,
token0,
token1,
CASE
WHEN amount0In <> 0
AND amount1In <> 0
AND amount0Out <> 0 THEN amount1In
WHEN amount0In <> 0 THEN amount0In
WHEN amount1In <> 0 THEN amount1In
END AS amount_in_unadj,
CASE
WHEN amount0Out <> 0 THEN amount0Out
WHEN amount1Out <> 0 THEN amount1Out
END AS amount_out_unadj,
CASE
WHEN amount0In <> 0
AND amount1In <> 0
AND amount0Out <> 0 THEN token1
WHEN amount0In <> 0 THEN token0
WHEN amount1In <> 0 THEN token1
END AS token_in,
CASE
WHEN amount0Out <> 0 THEN token0
WHEN amount1Out <> 0 THEN token1
END AS token_out,
'Swap' AS event_name,
'thruster' AS platform,
_log_id,
_inserted_timestamp
FROM
swaps_base
WHERE
token_in <> token_out

View File

@ -0,0 +1,65 @@
version: 2
models:
- name: silver_dex__thruster_swaps
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: AMOUNT_IN_UNADJ
tests:
- not_null
- name: AMOUNT_OUT_UNADJ
tests:
- not_null
- name: TOKEN_IN
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TOKEN_OUT
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: SYMBOL_IN
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: SYMBOL_OUT
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TX_TO
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+

View File

@ -0,0 +1,97 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['curated','reorg']
) }}
WITH swaps_base AS (
SELECT
l.block_number,
l.block_timestamp,
l.tx_hash,
l.event_index,
l.origin_function_signature,
l.origin_from_address,
l.origin_to_address,
l.contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS sender,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS recipient,
utils.udf_hex_to_int(
's2c',
segmented_data [0] :: STRING
) :: FLOAT AS amount0_unadj,
utils.udf_hex_to_int(
's2c',
segmented_data [1] :: STRING
) :: FLOAT AS amount1_unadj,
utils.udf_hex_to_int(
's2c',
segmented_data [2] :: STRING
) :: FLOAT AS sqrtPriceX96,
utils.udf_hex_to_int(
's2c',
segmented_data [3] :: STRING
) :: FLOAT AS liquidity,
utils.udf_hex_to_int(
's2c',
segmented_data [4] :: STRING
) :: FLOAT AS tick,
token0_address,
token1_address,
pool_address,
tick_spacing,
fee,
l._log_id,
l._inserted_timestamp
FROM
{{ ref('silver__logs') }}
l
INNER JOIN {{ ref('silver_dex__thruster_pools_v3') }}
p
ON p.pool_address = l.contract_address
WHERE
l.block_timestamp :: DATE >= '2024-01-01'
AND topics [0] :: STRING = '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND l._inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
pool_address,
recipient,
sender,
fee,
tick,
tick_spacing,
liquidity,
token0_address,
token1_address,
amount0_unadj,
amount1_unadj,
_log_id,
_inserted_timestamp
FROM
swaps_base qualify(ROW_NUMBER() over(PARTITION BY _log_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,38 @@
version: 2
models:
- name: silver_dex__thruster_swaps_v3
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- name: POOL_ADDRESS
tests:
- not_null
- name: RECIPIENT
tests:
- not_null
- name: SENDER
tests:
- not_null
- name: TOKEN0_ADDRESS
tests:
- not_null
- name: TOKEN1_ADDRESS
tests:
- not_null
- name: AMOUNT0_UNADJ
tests:
- not_null
- name: AMOUNT1_UNADJ
tests:
- not_null