AN-4558/add-univ2-to-all-evms (#146)

* add univ2 and heal models

* quick fixes

* fix label

* add liq heal + fixes
This commit is contained in:
Matt Romano 2024-03-07 09:27:14 -08:00 committed by GitHub
parent d6bc9c4707
commit 8919a4287e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 452 additions and 26 deletions

View File

@ -71,7 +71,7 @@ curve AS (
FROM
{{ ref('silver_dex__curve_pools') }}
{% if is_incremental() %}
{% if is_incremental() and 'curve' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -106,7 +106,7 @@ balancer AS (
FROM
{{ ref('silver_dex__balancer_pools') }}
{% if is_incremental() %}
{% if is_incremental() and 'balancer' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -134,7 +134,34 @@ uni_v3 AS (
FROM
{{ ref('silver_dex__univ3_pools') }}
{% if is_incremental() %}
{% if is_incremental() and 'uni_v3' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
),
uni_v2 AS (
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
pool_address,
NULL AS pool_name,
token0,
token1,
'uniswap-v2' AS platform,
'v2' AS version,
_log_id AS _id,
_inserted_timestamp
FROM
{{ ref('silver_dex__univ2_pools') }}
{% if is_incremental() and 'uni_v2' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -162,7 +189,7 @@ dackieswap AS (
FROM
{{ ref('silver_dex__dackie_pools') }}
{% if is_incremental() %}
{% if is_incremental() and 'dackieswap' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -190,7 +217,7 @@ sushi AS (
FROM
{{ ref('silver_dex__sushi_pools') }}
{% if is_incremental() %}
{% if is_incremental() and 'sushi' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -217,7 +244,7 @@ maverick AS (
FROM
{{ ref('silver_dex__maverick_pools') }}
{% if is_incremental() %}
{% if is_incremental() and 'maverick' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -244,7 +271,7 @@ swapbased AS (
FROM
{{ ref('silver_dex__swapbased_pools') }}
{% if is_incremental() %}
{% if is_incremental() and 'swapbased' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -271,7 +298,7 @@ aerodrome AS (
FROM
{{ ref('silver_dex__aerodrome_pools') }}
{% if is_incremental() %}
{% if is_incremental() and 'aerodrome' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -298,7 +325,7 @@ baseswap AS (
FROM
{{ ref('silver_dex__baseswap_pools') }}
{% if is_incremental() %}
{% if is_incremental() and 'baseswap' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -319,6 +346,11 @@ all_pools_standard AS (
FROM
swapbased
UNION ALL
SELECT
*
FROM
uni_v2
UNION ALL
SELECT
*
FROM

View File

@ -91,7 +91,7 @@ curve_swaps AS (
'null'
) <> COALESCE(token_symbol_out, 'null')
{% if is_incremental() %}
{% if is_incremental() and 'curve_swaps' not in var('HEAL_CURATED_MODEL') %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
@ -192,7 +192,7 @@ dackie_swaps AS (
block_timestamp
) = p2.hour
{% if is_incremental() %}
{% if is_incremental() and 'dackie_swaps' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -294,7 +294,59 @@ univ3_swaps AS (
block_timestamp
) = p2.hour
{% if is_incremental() %}
{% if is_incremental() and 'univ3_swaps' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
FROM
{{ this }}
)
{% endif %}
),
univ2_swaps AS (
SELECT
block_number,
block_timestamp,
tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
event_name,
c1.decimals AS decimals_in,
c1.symbol AS symbol_in,
amount_in_unadj,
CASE
WHEN decimals_in IS NULL THEN amount_in_unadj
ELSE (amount_in_unadj / pow(10, decimals_in))
END AS amount_in,
c2.decimals AS decimals_out,
c2.symbol AS symbol_out,
amount_out_unadj,
CASE
WHEN decimals_out IS NULL THEN amount_out_unadj
ELSE (amount_out_unadj / pow(10, decimals_out))
END AS amount_out,
sender,
tx_to,
event_index,
platform,
'v2' AS version,
token_in,
token_out,
NULL AS pool_name,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver_dex__univ2_swaps') }}
s
LEFT JOIN contracts c1
ON s.token_in = c1.address
LEFT JOIN contracts c2
ON s.token_out = c2.address
{% if is_incremental() and 'univ2_swaps' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -396,7 +448,7 @@ sushi_swaps AS (
block_timestamp
) = p2.hour
{% if is_incremental() %}
{% if is_incremental() and 'sushi_swaps' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -448,7 +500,7 @@ maverick_swaps AS (
LEFT JOIN contracts c2
ON s.token_out = c2.address
{% if is_incremental() %}
{% if is_incremental() and 'maverick_swaps' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -522,7 +574,7 @@ woofi_swaps AS (
LEFT JOIN contracts c2
ON s.token_out = c2.address
{% if is_incremental() %}
{% if is_incremental() and 'woofi_swaps' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -574,7 +626,7 @@ balancer_swaps AS (
LEFT JOIN contracts c2
ON s.token_out = c2.address
{% if is_incremental() %}
{% if is_incremental() and 'balancer_swaps' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -584,7 +636,7 @@ WHERE
)
{% endif %}
),
baseswap AS (
baseswap_swaps AS (
SELECT
block_number,
block_timestamp,
@ -626,7 +678,7 @@ baseswap AS (
LEFT JOIN contracts c2
ON s.token_out = c2.address
{% if is_incremental() %}
{% if is_incremental() and 'baseswap_swaps' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -636,7 +688,7 @@ WHERE
)
{% endif %}
),
swapbased AS (
swapbased_swaps AS (
SELECT
block_number,
block_timestamp,
@ -678,7 +730,7 @@ swapbased AS (
LEFT JOIN contracts c2
ON s.token_out = c2.address
{% if is_incremental() %}
{% if is_incremental() and 'swapbased_swaps' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -688,7 +740,7 @@ WHERE
)
{% endif %}
),
aerodrome AS (
aerodrome_swaps AS (
SELECT
block_number,
block_timestamp,
@ -730,7 +782,7 @@ aerodrome AS (
LEFT JOIN contracts c2
ON s.token_out = c2.address
{% if is_incremental() %}
{% if is_incremental() and 'aerodrome_swaps' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -792,7 +844,7 @@ voodoo_swaps AS (
LEFT JOIN contracts c2
ON s.token_out = c2.address
{% if is_incremental() %}
{% if is_incremental() and 'voodoo_swaps' not in var('HEAL_CURATED_MODEL') %}
WHERE
_inserted_timestamp >= (
SELECT
@ -832,7 +884,7 @@ all_dex_standard AS (
_log_id,
_inserted_timestamp
FROM
swapbased
swapbased_swaps
UNION ALL
SELECT
block_number,
@ -862,7 +914,7 @@ all_dex_standard AS (
_log_id,
_inserted_timestamp
FROM
aerodrome
univ2_swaps
UNION ALL
SELECT
block_number,
@ -892,7 +944,37 @@ all_dex_standard AS (
_log_id,
_inserted_timestamp
FROM
baseswap
aerodrome_swaps
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
pool_name,
event_name,
amount_in_unadj,
amount_out_unadj,
amount_in,
amount_out,
sender,
tx_to,
event_index,
platform,
version,
token_in,
token_out,
symbol_in,
symbol_out,
decimals_in,
decimals_out,
_log_id,
_inserted_timestamp
FROM
baseswap_swaps
UNION ALL
SELECT
block_number,

View File

@ -0,0 +1,55 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'pool_address',
tags = ['curated']
) }}
WITH pool_creation AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS token0,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS token1,
CONCAT('0x', SUBSTR(segmented_data [0] :: STRING, 25, 40)) AS pool_address,
utils.udf_hex_to_int(
segmented_data [1] :: STRING
) :: INT AS pool_id,
_log_id,
_inserted_timestamp
FROM
{{ ref ('silver__logs') }}
WHERE
contract_address = LOWER('0x8909Dc15e40173Ff4699343b6eB8132c65e18eC6')
AND topics [0] :: STRING = '0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9' --PairCreated
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
event_index,
token0,
token1,
pool_address,
pool_id,
_log_id,
_inserted_timestamp
FROM
pool_creation qualify(ROW_NUMBER() over (PARTITION BY pool_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,23 @@
version: 2
models:
- name: silver_dex__univ2_pools
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- POOL_ADDRESS
columns:
- name: POOL_ADDRESS
tests:
- not_null
- name: TOKEN0
tests:
- not_null
- name: TOKEN1
tests:
- not_null
- name: _INSERTED_TIMESTAMP
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ

View File

@ -0,0 +1,118 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['curated','reorg']
) }}
WITH pools AS (
SELECT
pool_address,
token0,
token1
FROM
{{ ref('silver_dex__univ2_pools') }}
),
swaps_base AS (
SELECT
block_number,
origin_function_signature,
origin_from_address,
origin_to_address,
block_timestamp,
tx_hash,
event_index,
contract_address,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [0] :: STRING
) :: INTEGER
) AS amount0In,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [1] :: STRING
) :: INTEGER
) AS amount1In,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [2] :: STRING
) :: INTEGER
) AS amount0Out,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [3] :: STRING
) :: INTEGER
) AS amount1Out,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS sender,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS tx_to,
token0,
token1,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__logs') }}
INNER JOIN pools p
ON p.pool_address = contract_address
WHERE
topics [0] :: STRING = '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
block_timestamp,
origin_function_signature,
origin_from_address,
origin_to_address,
tx_hash,
event_index,
contract_address,
sender,
tx_to,
amount0In,
amount1In,
amount0Out,
amount1Out,
token0,
token1,
CASE
WHEN amount0In <> 0
AND amount1In <> 0
AND amount0Out <> 0 THEN amount1In
WHEN amount0In <> 0 THEN amount0In
WHEN amount1In <> 0 THEN amount1In
END AS amount_in_unadj,
CASE
WHEN amount0Out <> 0 THEN amount0Out
WHEN amount1Out <> 0 THEN amount1Out
END AS amount_out_unadj,
CASE
WHEN amount0In <> 0
AND amount1In <> 0
AND amount0Out <> 0 THEN token1
WHEN amount0In <> 0 THEN token0
WHEN amount1In <> 0 THEN token1
END AS token_in,
CASE
WHEN amount0Out <> 0 THEN token0
WHEN amount1Out <> 0 THEN token1
END AS token_out,
'Swap' AS event_name,
'uniswap-v2' AS platform,
_log_id,
_inserted_timestamp
FROM
swaps_base
WHERE
token_in <> token_out

View File

@ -0,0 +1,116 @@
version: 2
models:
- name: silver_dex__univ2_swaps
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2 #might be normal for swaps not to happen on a day
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: TX_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: AMOUNT_IN_UNADJ
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: AMOUNT_IN
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: AMOUNT_OUT_UNADJ
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: AMOUNT_OUT
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TOKEN_IN
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TOKEN_OUT
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: SYMBOL_IN
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: SYMBOL_OUT
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TX_TO
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: PLATFORM
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: EVENT_INDEX
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: _LOG_ID
tests:
- not_null
- name: ORIGIN_FUNCTION_SIGNATURE
tests:
- not_null
- name: ORIGIN_FROM_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: ORIGIN_TO_ADDRESS
tests:
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+