An 3886/curate-new-base-dexes (#67)

* add aerodrome swaps and pools

* curve models -- not totally accurate yet

* add curve, add to complete swaps and pools, add to gold platform header

* format models

* fix formatting on curve pools

* fix curve union

* PR comment fixes

* doc updates

* doc update 2

* added e
This commit is contained in:
Matt Romano 2023-09-06 16:17:57 -07:00 committed by GitHub
parent 9155512738
commit 8d4699052f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1297 additions and 9 deletions

View File

@ -1,5 +1,5 @@
{% docs eth_dex_lp_table_doc %}
This table contains details on decentralized exchange (DEX) liquidity pools (LP) on the base blockchain, including the tokens, symbols and decimals within each pool alongside the following protocols: SUSHI, UNISWAP, BALANCER, SWAPBASED, BASESWAP, DACKIE, WOOFI, and MAVERICK.
This table contains details on decentralized exchange (DEX) liquidity pools (LP) on the base blockchain, including the tokens, symbols and decimals within each pool alongside the following protocols: SUSHI, UNISWAP, BALANCER, SWAPBASED, BASESWAP, DACKIE, WOOFI, AERODROME, CURVE, and MAVERICK.
{% enddocs %}

View File

@ -1,6 +1,6 @@
{% docs eth_ez_dex_swaps_table_doc %}
This table currently contains swap events from the ```fact_event_logs``` table for SUSHI, UNISWAP, BALANCER, SWAPBASED, BASESWAP, DACKIE, WOOFI, and MAVERICK. along with other helpful columns including an amount USD where possible. Other dexes coming soon!
This table currently contains swap events from the ```fact_event_logs``` table for SUSHI, UNISWAP, BALANCER, SWAPBASED, BASESWAP, DACKIE, WOOFI, AERODROME, CURVE, and MAVERICK. along with other helpful columns including an amount USD where possible. Other dexes coming soon!
Note: A rule has been put in place to null out the amount_USD if that number is too divergent between amount_in_USD and amount_out_usd. This can happen for swaps of less liquid tokens during very high fluctuation of price.
{% enddocs %}

View File

@ -5,7 +5,7 @@
meta={
'database_tags':{
'table': {
'PROTOCOL': 'SUSHI, UNISWAP, BALANCER, SWAPBASED, BASESWAP, MAVERICK, DACKIE, WOOFI',
'PROTOCOL': 'SUSHI, UNISWAP, BALANCER, SWAPBASED, BASESWAP, MAVERICK, DACKIE, WOOFI, AERODROME, CURVE',
'PURPOSE': 'DEX, LIQUIDITY, POOLS, LP, SWAPS',
}
}

View File

@ -2,7 +2,7 @@
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true },
meta ={ 'database_tags':{ 'table':{ 'PROTOCOL': 'SUSHI, UNISWAP, BALANCER, SWAPBASED, BASESWAP, MAVERICK, DACKIE, WOOFI',
meta ={ 'database_tags':{ 'table':{ 'PROTOCOL': 'SUSHI, UNISWAP, BALANCER, SWAPBASED, BASESWAP, MAVERICK, DACKIE, WOOFI, AERODROME, CURVE',
'PURPOSE': 'DEX, SWAPS' } } }
) }}

View File

@ -0,0 +1,63 @@
{{ config(
materialized = 'incremental',
unique_key = "pool_address",
tags = ['non_realtime']
) }}
WITH created_pools AS(
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
LOWER(CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40))) AS token0,
LOWER(CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40))) AS token1,
LOWER(CONCAT('0x', SUBSTR(topics [3] :: STRING, 27, 40))) AS stable,
utils.udf_hex_to_int(
's2c',
segmented_data [1] :: STRING
) :: INTEGER AS pool_number,
CONCAT('0x', SUBSTR(segmented_data [0] :: STRING, 25, 40)) AS pool_address,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__logs') }}
WHERE
topics [0] = '0x2128d88d14c80cb081c1252a5acff7a264671bf199ce226b53788fb26065005e'
AND contract_address = '0x420dd381b31aef6683db6b902084cb0ffece40da'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) :: DATE
FROM
{{ this }}
)
AND pool_address NOT IN (
SELECT
DISTINCT pool_address
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
segmented_data,
token0,
token1,
CASE
WHEN stable = '0x0000000000000000000000000000000000000001' THEN TRUE
WHEN stable = '0x0000000000000000000000000000000000000000' THEN FALSE
END AS stable,
pool_number,
pool_address,
_log_id,
_inserted_timestamp
FROM
created_pools

View File

@ -0,0 +1,23 @@
version: 2
models:
- name: silver_dex__aerodrome_pools
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- POOL_ADDRESS
columns:
- name: POOL_ADDRESS
tests:
- not_null
- name: TOKEN0
tests:
- not_null
- name: TOKEN1
tests:
- not_null
- name: _INSERTED_TIMESTAMP
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- TIMESTAMP_LTZ

View File

@ -0,0 +1,116 @@
{{ config(
materialized = 'incremental',
unique_key = '_log_id',
cluster_by = ['block_timestamp::DATE'],
tags = ['non_realtime']
) }}
WITH pools AS (
SELECT
pool_address,
token0,
token1
FROM
{{ ref('silver_dex__aerodrome_pools') }}
),
swaps_base AS (
SELECT
block_number,
origin_function_signature,
origin_from_address,
origin_to_address,
block_timestamp,
tx_hash,
event_index,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [0] :: STRING
)
) AS amount0In,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [1] :: STRING
)
) AS amount1In,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [2] :: STRING
)
) AS amount0Out,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [3] :: STRING
)
) AS amount1Out,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS sender,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS tx_to,
token0,
token1,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__logs') }}
INNER JOIN pools p
ON p.pool_address = contract_address
WHERE
topics [0] :: STRING = '0xb3e2773606abfd36b5bd91394b3a54d1398336c65005baf7bf7a05efeffaf75b'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) :: DATE
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
origin_function_signature,
origin_from_address,
origin_to_address,
tx_hash,
event_index,
contract_address,
sender,
tx_to,
amount0In,
amount1In,
amount0Out,
amount1Out,
token0,
token1,
CASE
WHEN amount0In <> 0
AND amount1In <> 0
AND amount0Out <> 0 THEN amount1In
WHEN amount0In <> 0 THEN amount0In
WHEN amount1In <> 0 THEN amount1In
END AS amount_in_unadj,
CASE
WHEN amount0Out <> 0 THEN amount0Out
WHEN amount1Out <> 0 THEN amount1Out
END AS amount_out_unadj,
CASE
WHEN amount0In <> 0
AND amount1In <> 0
AND amount0Out <> 0 THEN token1
WHEN amount0In <> 0 THEN token0
WHEN amount1In <> 0 THEN token1
END AS token_in,
CASE
WHEN amount0Out <> 0 THEN token0
WHEN amount1Out <> 0 THEN token1
END AS token_out,
'Swap' AS event_name,
'aerodrome' AS platform,
_log_id,
_inserted_timestamp
FROM
swaps_base
WHERE
token_in <> token_out

View File

@ -0,0 +1,116 @@
version: 2
models:
- name: silver_dex__aerodrome_swaps
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- TIMESTAMP_LTZ
- name: TX_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: AMOUNT_IN_UNADJ
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: AMOUNT_IN
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: AMOUNT_OUT_UNADJ
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: AMOUNT_OUT
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TOKEN_IN
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TOKEN_OUT
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: SYMBOL_IN
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: SYMBOL_OUT
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TX_TO
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: PLATFORM
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: EVENT_INDEX
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: _LOG_ID
tests:
- not_null
- name: ORIGIN_FUNCTION_SIGNATURE
tests:
- not_null
- name: ORIGIN_FROM_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: ORIGIN_TO_ADDRESS
tests:
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+

View File

@ -0,0 +1,348 @@
{{ config(
materialized = 'incremental',
unique_key = "pool_id",
tags = ['non_realtime']
) }}
-- full_refresh = false,
WITH contract_deployments AS (
SELECT
tx_hash,
block_number,
block_timestamp,
from_address AS deployer_address,
to_address AS contract_address,
_call_id,
_inserted_timestamp,
ROW_NUMBER() over (
ORDER BY
contract_address
) AS row_num
FROM
{{ ref(
'silver__traces'
) }}
WHERE
-- curve contract deployers
from_address IN (
'0xa5961898870943c68037f6848d2d866ed2016bcb',
'0x3093f9b57a428f3eb6285a589cb35bea6e78c336',
'0x5ef72230578b3e399e6c6f4f6360edf95e83bbfd'
)
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) :: DATE - 3
FROM
{{ this }}
)
AND to_address NOT IN (
SELECT
DISTINCT pool_address
FROM
{{ this }}
)
{% endif %}
QUALIFY(ROW_NUMBER() OVER(PARTITION BY to_address ORDER BY block_timestamp ASC)) = 1
),
function_sigs AS (
SELECT
'0x87cb4f57' AS function_sig,
'base_coins' AS function_name
UNION ALL
SELECT
'0xb9947eb0' AS function_sig,
'underlying_coins' AS function_name
UNION ALL
SELECT
'0xc6610657' AS function_sig,
'coins' AS function_name
UNION ALL
SELECT
'0x06fdde03' AS function_sig,
'name' AS function_name
UNION ALL
SELECT
'0x95d89b41' AS function_sig,
'symbol' AS function_name
UNION ALL
SELECT
'0x313ce567' AS function_sig,
'decimals' AS function_name
),
function_inputs AS (
SELECT
SEQ4() AS function_input
FROM
TABLE(GENERATOR(rowcount => 8))
),
inputs_coins AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
(ROW_NUMBER() OVER (PARTITION BY contract_address
ORDER BY block_number)) - 1 AS function_input
FROM contract_deployments
JOIN function_sigs ON 1=1
JOIN function_inputs ON 1=1
WHERE function_name = 'coins'
),
inputs_base_coins AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
(ROW_NUMBER() OVER (PARTITION BY contract_address
ORDER BY block_number)) - 1 AS function_input
FROM contract_deployments
JOIN function_sigs ON 1=1
JOIN function_inputs ON 1=1
WHERE function_name = 'base_coins'
),
inputs_underlying_coins AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
(ROW_NUMBER() OVER (PARTITION BY contract_address
ORDER BY block_number)) - 1 AS function_input
FROM contract_deployments
JOIN function_sigs ON 1=1
JOIN function_inputs ON 1=1
WHERE function_name = 'underlying_coins'
),
inputs_pool_details AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
NULL AS function_input
FROM contract_deployments
JOIN function_sigs ON 1=1
WHERE function_name IN ('name','symbol','decimals')
),
all_inputs AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM inputs_coins
UNION ALL
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM inputs_base_coins
UNION ALL
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM inputs_underlying_coins
UNION ALL
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM inputs_pool_details
),
pool_token_reads AS (
{% for item in range(10) %}
(
SELECT
ethereum.streamline.udf_json_rpc_read_calls(
node_url,
headers,
PARSE_JSON(batch_read)
) AS read_output,
SYSDATE() AS _inserted_timestamp
FROM (
SELECT
CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read
FROM (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input,
CONCAT(
'[\'',
contract_address,
'\',',
block_number,
',\'',
function_sig,
'\',\'',
(CASE WHEN function_input IS NULL THEN '' ELSE function_input::STRING END),
'\']'
) AS read_input,
row_num
FROM all_inputs
LEFT JOIN contract_deployments USING(contract_address)
) ready_reads_pools
WHERE row_num BETWEEN {{ item * 50 + 1 }} AND {{ (item + 1) * 50}}
) batch_reads_pools
JOIN {{ source(
'streamline_crosschain',
'node_mapping'
) }} ON 1=1
AND chain = 'base'
) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}
),
reads_adjusted AS (
SELECT
VALUE :id :: STRING AS read_id,
VALUE :result :: STRING AS read_result,
SPLIT(
read_id,
'-'
) AS read_id_object,
read_id_object [0] :: STRING AS contract_address,
read_id_object [1] :: STRING AS block_number,
read_id_object [2] :: STRING AS function_sig,
read_id_object [3] :: STRING AS function_input,
_inserted_timestamp
FROM
pool_token_reads,
LATERAL FLATTEN(
input => read_output [0] :data
)
),
tokens AS (
SELECT
contract_address,
function_sig,
function_name,
function_input,
read_result,
regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}')[0]AS segmented_token_address,
_inserted_timestamp
FROM reads_adjusted
LEFT JOIN function_sigs USING(function_sig)
WHERE function_name IN ('coins','base_coins','underlying_coins')
AND read_result IS NOT NULL
),
pool_details AS (
SELECT
contract_address,
function_sig,
function_name,
function_input,
read_result,
regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output,
_inserted_timestamp
FROM reads_adjusted
LEFT JOIN function_sigs USING(function_sig)
WHERE function_name IN ('name','symbol','decimals')
AND read_result IS NOT NULL
),
all_pools AS (
SELECT
t.contract_address AS pool_address,
CONCAT('0x',SUBSTRING(t.segmented_token_address,25,40)) AS token_address,
function_input AS token_id,
function_name AS token_type,
MIN(CASE WHEN p.function_name = 'symbol' THEN utils.udf_hex_to_string(RTRIM(p.segmented_output [2] :: STRING,0)) END) AS pool_symbol,
MIN(CASE WHEN p.function_name = 'name' THEN CONCAT(utils.udf_hex_to_string(p.segmented_output [2] :: STRING),
utils.udf_hex_to_string(segmented_output [3] :: STRING)) END) AS pool_name,
MIN(CASE
WHEN p.read_result::STRING = '0x' THEN NULL
ELSE utils.udf_hex_to_int(LEFT(p.read_result::STRING,66))
END)::INTEGER AS pool_decimals,
CONCAT(
t.contract_address,
'-',
CONCAT('0x',SUBSTRING(t.segmented_token_address,25,40)),
'-',
function_input,
'-',
function_name
) AS pool_id,
MAX(t._inserted_timestamp) AS _inserted_timestamp
FROM tokens t
LEFT JOIN pool_details p USING(contract_address)
WHERE token_address IS NOT NULL
AND token_address <> '0x0000000000000000000000000000000000000000'
GROUP BY 1,2,3,4
),
FINAL AS (
SELECT
block_number,
block_timestamp,
tx_hash,
deployer_address,
pool_address,
CASE
WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '0x4200000000000000000000000000000000000006'
ELSE token_address
END AS token_address,
token_id,
token_type,
CASE
WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN 'WETH'
WHEN pool_symbol IS NULL THEN c.token_symbol
ELSE pool_symbol
END AS pool_symbol,
pool_name,
CASE
WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '18'
WHEN pool_decimals IS NULL THEN c.token_decimals
ELSE pool_decimals
END AS pool_decimals,
pool_id,
_call_id,
a._inserted_timestamp
FROM all_pools a
LEFT JOIN {{ ref('silver__contracts') }} c
ON a.token_address = c.contract_address
LEFT JOIN contract_deployments d
ON a.pool_address = d.contract_address
QUALIFY(ROW_NUMBER() OVER(PARTITION BY pool_address, token_address ORDER BY a._inserted_timestamp DESC)) = 1
)
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY pool_address ORDER BY token_address ASC) AS token_num
FROM FINAL

View File

@ -0,0 +1,29 @@
version: 2
models:
- name: silver_dex__curve_pools
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- POOL_ID
columns:
- name: POOL_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- varchar
- name: TOKEN_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- varchar
- name: POOL_SYMBOL
- name: POOL_NAME
- name: POOL_DECIMALS
- name: _INSERTED_TIMESTAMP

View File

@ -0,0 +1,248 @@
{{ config(
materialized = 'incremental',
unique_key = "_log_id",
cluster_by = ['block_timestamp::DATE'],
tags = ['non_realtime']
) }}
WITH pool_meta AS (
SELECT
DISTINCT pool_address,
pool_name,
token_address,
pool_symbol AS symbol,
token_id :: INTEGER AS token_id,
token_type :: STRING AS token_type
FROM
{{ ref('silver_dex__curve_pools') }}
),
pools AS (
SELECT
DISTINCT pool_address,
pool_name
FROM
pool_meta qualify (ROW_NUMBER() over (PARTITION BY pool_address
ORDER BY
pool_name ASC nulls last)) = 1
),
curve_base AS (
SELECT
block_number,
block_timestamp,
tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
event_index,
CASE
WHEN topics [0] :: STRING = '0xd013ca23e77a65003c2c659c5442c00c805371b7fc1ebd4c206c41d1536bd90b' THEN 'TokenExchangeUnderlying'
ELSE 'TokenExchange'
END AS event_name,
contract_address AS pool_address,
pool_name,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS sender,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [0] :: STRING
)
) AS sold_id,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [1] :: STRING
)
) AS tokens_sold,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [2] :: STRING
)
) AS bought_id,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [3] :: STRING
)
) AS tokens_bought,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__logs') }}
l
INNER JOIN pools p
ON p.pool_address = l.contract_address
WHERE
topics [0] :: STRING IN (
'0x143f1f8e861fbdeddd5b46e844b7d3ac7b86a122f36e8c463859ee6811b1f29c',
'0xb2e76ae99761dc136e598d4a629bb347eccb9532a5f8bbd72e18467c3c34cc98',
'0xd013ca23e77a65003c2c659c5442c00c805371b7fc1ebd4c206c41d1536bd90b'
)
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) :: DATE
FROM
{{ this }}
)
{% endif %}
),
token_exchange AS (
SELECT
_log_id,
MAX(
CASE
WHEN sold_id = token_id THEN token_address
END
) AS token_in,
MAX(
CASE
WHEN bought_id = token_id THEN token_address
END
) AS token_out,
MAX(
CASE
WHEN sold_id = token_id THEN symbol
END
) AS symbol_in,
MAX(
CASE
WHEN bought_id = token_id THEN symbol
END
) AS symbol_out
FROM
curve_base t
LEFT JOIN pool_meta p
ON p.pool_address = t.pool_address
AND (
p.token_id = t.sold_id
OR p.token_id = t.bought_id
)
WHERE
token_type = 'coins'
GROUP BY
1
),
token_transfers AS (
SELECT
tx_hash,
contract_address AS token_address,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
DATA :: STRING
)
) AS amount,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS from_address,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS to_address
FROM
{{ ref('silver__logs') }}
WHERE
topics [0] :: STRING = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
AND tx_hash IN (
SELECT
DISTINCT tx_hash
FROM
curve_base
)
AND CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) <> '0x0000000000000000000000000000000000000000'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) :: DATE
FROM
{{ this }}
)
{% endif %}
),
from_transfers AS (
SELECT
DISTINCT tx_hash,
token_address,
from_address,
amount
FROM
token_transfers
),
to_transfers AS (
SELECT
DISTINCT tx_hash,
token_address,
to_address,
amount
FROM
token_transfers
),
ready_pool_info AS (
SELECT
s.block_number,
s.block_timestamp,
s.tx_hash,
s.origin_function_signature,
s.origin_from_address,
s.origin_from_address AS tx_to,
s.origin_to_address,
event_index,
event_name,
pool_address,
pool_address AS contract_address,
pool_name,
sender,
sold_id,
tokens_sold,
COALESCE(
sold.token_address,
e.token_in
) AS token_in,
e.symbol_in AS symbol_in,
bought_id,
tokens_bought,
COALESCE(
bought.token_address,
e.token_out
) AS token_out,
e.symbol_out AS symbol_out,
s._log_id,
_inserted_timestamp
FROM
curve_base s
LEFT JOIN token_exchange e
ON s._log_id = e._log_id
LEFT JOIN from_transfers sold
ON tokens_sold = sold.amount
AND s.tx_hash = sold.tx_hash
LEFT JOIN to_transfers bought
ON tokens_bought = bought.amount
AND s.tx_hash = bought.tx_hash
WHERE
tokens_sold <> 0 qualify(ROW_NUMBER() over(PARTITION BY s._log_id
ORDER BY
_inserted_timestamp DESC)) = 1
)
SELECT
block_number,
block_timestamp,
tx_hash,
origin_function_signature,
origin_from_address,
tx_to,
origin_to_address,
event_index,
event_name,
pool_address,
contract_address,
pool_name,
sender,
sold_id,
tokens_sold,
token_in,
symbol_in,
bought_id,
tokens_bought,
token_out,
symbol_out,
_log_id,
_inserted_timestamp,
'curve' AS platform
FROM
ready_pool_info

View File

@ -0,0 +1,54 @@
version: 2
models:
- name: silver_dex__curve_swaps
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _log_id
columns:
- name: TOKENS_SOLD
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- decimal
- float
- number
- name: TOKENS_BOUGHT
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- decimal
- float
- number
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- name: EVENT_INDEX
tests:
- not_null
- name: POOL_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: POOL_NAME
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- varchar
- name: SENDER
tests:
- not_null
- name: TX_TO
tests:
- not_null
- name: TOKEN_IN
- name: TOKEN_OUT
- name: TX_HASH
tests:
- not_null

View File

@ -15,6 +15,72 @@ WITH contracts AS (
FROM
{{ ref('silver__contracts') }}
),
curve AS (
SELECT
block_number,
block_timestamp,
tx_hash,
deployer_address AS contract_address,
pool_address,
pool_name,
'curve' AS platform,
_call_id AS _id,
_inserted_timestamp,
MAX(
CASE
WHEN token_num = 1 THEN token_address
END
) AS token0,
MAX(
CASE
WHEN token_num = 2 THEN token_address
END
) AS token1,
MAX(
CASE
WHEN token_num = 3 THEN token_address
END
) AS token2,
MAX(
CASE
WHEN token_num = 4 THEN token_address
END
) AS token3,
MAX(
CASE
WHEN token_num = 5 THEN token_address
END
) AS token4,
MAX(
CASE
WHEN token_num = 6 THEN token_address
END
) AS token5,
MAX(
CASE
WHEN token_num = 7 THEN token_address
END
) AS token6,
MAX(
CASE
WHEN token_num = 8 THEN token_address
END
) AS token7
FROM
{{ ref('silver_dex__curve_pools') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) :: DATE - 1
FROM
{{ this }}
)
{% endif %}
GROUP BY
ALL
),
balancer AS (
SELECT
block_number,
@ -170,6 +236,32 @@ swapbased AS (
FROM
{{ ref('silver_dex__swapbased_pools') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) :: DATE - 1
FROM
{{ this }}
)
{% endif %}
),
aerodrome AS (
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
pool_address,
NULL AS pool_name,
token0,
token1,
'aerodrome' AS platform,
_log_id AS _id,
_inserted_timestamp
FROM
{{ ref('silver_dex__aerodrome_pools') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
@ -221,6 +313,11 @@ all_pools_standard AS (
*
FROM
maverick
UNION ALL
SELECT
*
FROM
aerodrome
),
all_pools_v3 AS (
SELECT
@ -243,6 +340,11 @@ all_pools_other AS (
*
FROM
balancer
UNION ALL
SELECT
*
FROM
curve
),
FINAL AS (
SELECT

View File

@ -39,6 +39,74 @@ AND HOUR >= (
)
{% endif %}
),
curve_swaps AS (
SELECT
block_number,
block_timestamp,
tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
event_name,
s.tokens_sold AS amount_in_unadj,
s.tokens_bought AS amount_out_unadj,
sender,
tx_to,
event_index,
platform,
token_in,
token_out,
COALESCE(
c1.symbol,
s.symbol_in
) AS token_symbol_in,
COALESCE(
c2.symbol,
s.symbol_out
) AS token_symbol_out,
NULL AS pool_name,
c1.decimals AS decimals_in,
CASE
WHEN decimals_in IS NOT NULL THEN s.tokens_sold / pow(
10,
decimals_in
)
ELSE s.tokens_sold
END AS amount_in,
c2.decimals AS decimals_out,
CASE
WHEN decimals_out IS NOT NULL THEN s.tokens_bought / pow(
10,
decimals_out
)
ELSE s.tokens_bought
END AS amount_out,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver_dex__curve_swaps') }}
s
LEFT JOIN contracts c1
ON c1.address = s.token_in
LEFT JOIN contracts c2
ON c2.address = s.token_out
WHERE
amount_out <> 0
AND COALESCE(
token_symbol_in,
'null'
) <> COALESCE(token_symbol_out, 'null')
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) :: DATE
FROM
{{ this }}
)
{% endif %}
),
dackie_swaps AS (
SELECT
block_number,
@ -423,15 +491,27 @@ woofi_swaps AS (
platform,
token_in,
token_out,
CONCAT(
CONCAT(
LEAST(
COALESCE(symbol_in, CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42))),
COALESCE(symbol_out, CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42)))
COALESCE(
symbol_in,
CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42))
),
COALESCE(
symbol_out,
CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42))
)
),
'-',
GREATEST(
COALESCE(symbol_in, CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42))),
COALESCE(symbol_out, CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42)))
COALESCE(
symbol_in,
CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42))
),
COALESCE(
symbol_out,
CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42))
)
)
) AS pool_name,
_log_id,
@ -597,6 +677,57 @@ swapbased AS (
LEFT JOIN contracts c2
ON s.token_out = c2.address
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) :: DATE - 1
FROM
{{ this }}
)
{% endif %}
),
aerodrome AS (
SELECT
block_number,
block_timestamp,
tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
event_name,
c1.decimals AS decimals_in,
c1.symbol AS symbol_in,
amount_in_unadj,
CASE
WHEN decimals_in IS NULL THEN amount_in_unadj
ELSE (amount_in_unadj / pow(10, decimals_in))
END AS amount_in,
c2.decimals AS decimals_out,
c2.symbol AS symbol_out,
amount_out_unadj,
CASE
WHEN decimals_out IS NULL THEN amount_out_unadj
ELSE (amount_out_unadj / pow(10, decimals_out))
END AS amount_out,
sender,
tx_to,
event_index,
platform,
token_in,
token_out,
NULL AS pool_name,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver_dex__aerodrome_swaps') }}
s
LEFT JOIN contracts c1
ON s.token_in = c1.address
LEFT JOIN contracts c2
ON s.token_out = c2.address
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
@ -638,6 +769,35 @@ all_dex_standard AS (
FROM
swapbased
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
pool_name,
event_name,
amount_in_unadj,
amount_out_unadj,
amount_in,
amount_out,
sender,
tx_to,
event_index,
platform,
token_in,
token_out,
symbol_in,
symbol_out,
decimals_in,
decimals_out,
_log_id,
_inserted_timestamp
FROM
aerodrome
UNION ALL
SELECT
block_number,
block_timestamp,
@ -667,6 +827,35 @@ all_dex_standard AS (
FROM
baseswap
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
pool_name,
event_name,
amount_in_unadj,
amount_out_unadj,
amount_in,
amount_out,
sender,
tx_to,
event_index,
platform,
token_in,
token_out,
token_symbol_in AS symbol_in,
token_symbol_out AS symbol_out,
decimals_in,
decimals_out,
_log_id,
_inserted_timestamp
FROM
curve_swaps
UNION ALL
SELECT
block_number,
block_timestamp,