diff --git a/models/silver/defi/dex/bitflux/silver_dex__bitflux_pools.sql b/models/silver/defi/dex/bitflux/silver_dex__bitflux_pools.sql new file mode 100644 index 0000000..e69de29 diff --git a/models/silver/defi/dex/corex/silver_dex__corex_pools.sql b/models/silver/defi/dex/corex/silver_dex__corex_pools.sql new file mode 100644 index 0000000..6202a6b --- /dev/null +++ b/models/silver/defi/dex/corex/silver_dex__corex_pools.sql @@ -0,0 +1,62 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'delete+insert', + unique_key = 'pool_address', + cluster_by = ['block_timestamp::DATE'], + tags = ['curated'] +) }} + +WITH pool_creation AS ( + + SELECT + block_number, + block_timestamp, + tx_hash, + event_index, + contract_address, + regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data, + CONCAT('0x', SUBSTR(topic_1, 27, 40)) AS token0, + CONCAT('0x', SUBSTR(topic_2, 27, 40)) AS token1, + TRY_TO_NUMBER(utils.udf_hex_to_int(topic_3)) AS fee, + TRY_TO_NUMBER(utils.udf_hex_to_int(segmented_data [0] :: STRING)) AS tick_spacing, + CONCAT('0x', SUBSTR(segmented_data [1] :: STRING, 25, 40)) AS pool_address, + CONCAT( + tx_hash :: STRING, + '-', + event_index :: STRING + ) AS _log_id, + modified_timestamp AS _inserted_timestamp + FROM + {{ ref('core__fact_event_logs') }} + WHERE + contract_address = '0x526190295afb6b8736b14e4b42744fbd95203a3a' + AND topic_0 = '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118' + AND tx_succeeded + +{% if is_incremental() %} +AND _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) - INTERVAL '12 hours' + FROM + {{ this }} +) +AND _inserted_timestamp >= SYSDATE() - INTERVAL '7 day' +{% endif %} +) +SELECT + block_number, + block_timestamp, + tx_hash, + contract_address, + event_index, + token0, + token1, + pool_address, + fee, + tick_spacing, + _log_id, + _inserted_timestamp +FROM + pool_creation qualify(ROW_NUMBER() over (PARTITION BY pool_address +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/defi/dex/corex/silver_dex__corex_swaps.sql b/models/silver/defi/dex/corex/silver_dex__corex_swaps.sql new file mode 100644 index 0000000..507804d --- /dev/null +++ b/models/silver/defi/dex/corex/silver_dex__corex_swaps.sql @@ -0,0 +1,104 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'delete+insert', + unique_key = 'block_number', + cluster_by = ['block_timestamp::DATE'], + tags = ['curated','reorg'] +) }} + +WITH pool_data AS ( + + SELECT + token0, + token1, + fee, + tick_spacing, + pool_address + FROM + {{ ref('silver_dex__corex_pools') }} +), +base_swaps AS ( + SELECT + block_number, + block_timestamp, + tx_hash, + contract_address, + event_index, + origin_function_signature, + origin_from_address, + origin_to_address, + CONCAT('0x', SUBSTR(topic_1, 27, 40)) AS sender, + CONCAT('0x', SUBSTR(topic_2, 27, 40)) AS recipient, + regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data, + utils.udf_hex_to_int( + 's2c', + segmented_data [0] :: STRING + ) :: FLOAT AS amount0_unadj, + utils.udf_hex_to_int( + 's2c', + segmented_data [1] :: STRING + ) :: FLOAT AS amount1_unadj, + utils.udf_hex_to_int( + 's2c', + segmented_data [2] :: STRING + ) :: FLOAT AS sqrtPriceX96, + utils.udf_hex_to_int( + 's2c', + segmented_data [3] :: STRING + ) :: FLOAT AS liquidity, + utils.udf_hex_to_int( + 's2c', + segmented_data [4] :: STRING + ) :: FLOAT AS tick, + CONCAT( + tx_hash :: STRING, + '-', + event_index :: STRING + ) AS _log_id, + modified_timestamp AS _inserted_timestamp, + FROM + {{ ref('core__fact_event_logs') }} l + INNER JOIN pool_data p + ON p.pool_address = l.contract_address + WHERE + topic_0 = '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67' + AND tx_succeeded + +{% if is_incremental() %} +AND _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) - INTERVAL '12 hours' + FROM + {{ this }} +) +AND _inserted_timestamp >= SYSDATE() - INTERVAL '7 day' +{% endif %} +) +SELECT + block_number, + block_timestamp, + tx_hash, + contract_address, + pool_address, + origin_function_signature, + origin_from_address, + origin_to_address, + recipient, + sender, + fee, + tick, + tick_spacing, + liquidity, + event_index, + token0, + token1, + amount0_unadj, + amount1_unadj, + _log_id, + _inserted_timestamp +FROM + base_swaps + INNER JOIN pool_data + ON pool_data.pool_address = base_swaps.contract_address qualify(ROW_NUMBER() over(PARTITION BY _log_id +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/defi/dex/glyph/silver_dex__glyph_v4_pools.sql b/models/silver/defi/dex/glyph/silver_dex__glyph_v4_pools.sql new file mode 100644 index 0000000..21638a0 --- /dev/null +++ b/models/silver/defi/dex/glyph/silver_dex__glyph_v4_pools.sql @@ -0,0 +1,155 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'delete+insert', + unique_key = 'pool_address', + cluster_by = ['block_timestamp::DATE'], + tags = ['curated'] +) }} + +WITH pool_creation AS ( + + SELECT + block_number, + block_timestamp, + tx_hash, + event_index, + contract_address, + CONCAT('0x', SUBSTR(topic_1, 27, 40)) AS token0, + CONCAT('0x', SUBSTR(topic_2, 27, 40)) AS token1, + CONCAT('0x', SUBSTR(DATA, 27, 40)) AS pool_address, + CONCAT( + tx_hash :: STRING, + '-', + event_index :: STRING + ) AS _log_id, + modified_timestamp AS _inserted_timestamp + FROM + {{ ref('core__fact_event_logs') }} + WHERE + contract_address = '0x74efe55bea4988e7d92d03efd8ddb8bf8b7bd597' + AND topic_0 = '0x91ccaa7a278130b65168c3a0c8d3bcae84cf5e43704342bd3ec0b59e59c036db' + AND tx_succeeded + +{% if is_incremental() %} +AND _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) - INTERVAL '12 hours' + FROM + {{ this }} +) +AND _inserted_timestamp >= SYSDATE() - INTERVAL '7 day' +{% endif %} +), +initial_info AS ( + SELECT + tx_hash, + contract_address, + regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data, + utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [0] :: STRING)) AS price, + utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [1] :: STRING)) AS tick + FROM + {{ ref('core__fact_event_logs') }} + WHERE + topics [0] :: STRING = '0x98636036cb66a9c19a37435efc1e90142190214e8abeb821bdba3f2990dd4c95' + AND tx_hash IN ( + SELECT + tx_hash + FROM + pool_creation + ) + AND tx_succeeded + +{% if is_incremental() %} +AND modified_timestamp >= ( + SELECT + MAX(_inserted_timestamp) - INTERVAL '12 hours' + FROM + {{ this }} +) +AND modified_timestamp >= SYSDATE() - INTERVAL '7 day' +{% endif %} +), +tick_spacing AS ( + SELECT + tx_hash, + contract_address, + utils.udf_hex_to_int( + 's2c', + DATA :: STRING + ) AS tick_spacing + FROM + {{ ref('core__fact_event_logs') }} + WHERE + topic_0 = '0x01413b1d5d4c359e9a0daa7909ecda165f6e8c51fe2ff529d74b22a5a7c02645' + AND tx_hash IN ( + SELECT + tx_hash + FROM + pool_creation + ) + AND tx_succeeded + +{% if is_incremental() %} +AND modified_timestamp >= ( + SELECT + MAX(_inserted_timestamp) - INTERVAL '12 hours' + FROM + {{ this }} +) +AND modified_timestamp >= SYSDATE() - INTERVAL '7 day' +{% endif %} +), +fee AS ( + SELECT + tx_hash, + contract_address, + utils.udf_hex_to_int( + 's2c', + DATA :: STRING + ) AS fee + FROM + {{ ref('core__fact_event_logs') }} + WHERE + topic_0 = '0x598b9f043c813aa6be3426ca60d1c65d17256312890be5118dab55b0775ebe2a' + AND tx_hash IN ( + SELECT + tx_hash + FROM + pool_creation + ) + AND tx_succeeded + +{% if is_incremental() %} +AND modified_timestamp >= ( + SELECT + MAX(_inserted_timestamp) - INTERVAL '12 hours' + FROM + {{ this }} +) +AND modified_timestamp >= SYSDATE() - INTERVAL '7 day' +{% endif %} +) +SELECT + block_number, + block_timestamp, + p.tx_hash, + p.contract_address, + event_index, + token0, + token1, + pool_address, + fee, + tick, + tick_spacing, + _log_id, + _inserted_timestamp +FROM + pool_creation p + INNER JOIN initial_info + ON initial_info.contract_address = p.pool_address + INNER JOIN tick_spacing + ON tick_spacing.contract_address = p.pool_address + INNER JOIN fee + ON fee.contract_address = p.pool_address qualify(ROW_NUMBER() over (PARTITION BY pool_address +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/defi/dex/glyph/silver_dex__glyph_v4_swaps.sql b/models/silver/defi/dex/glyph/silver_dex__glyph_v4_swaps.sql new file mode 100644 index 0000000..fef1c36 --- /dev/null +++ b/models/silver/defi/dex/glyph/silver_dex__glyph_v4_swaps.sql @@ -0,0 +1,117 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'delete+insert', + unique_key = 'block_number', + cluster_by = ['block_timestamp::DATE'], + tags = ['curated','reorg'] +) }} + +WITH swaps_base AS ( + + SELECT + l.block_number, + l.block_timestamp, + l.tx_hash, + l.event_index, + l.origin_function_signature, + l.origin_from_address, + l.origin_to_address, + l.contract_address, + regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data, + CONCAT('0x', SUBSTR(topic_1, 27, 40)) AS sender, + CONCAT('0x', SUBSTR(topic_2, 27, 40)) AS recipient, + utils.udf_hex_to_int( + 's2c', + segmented_data [0] :: STRING + ) :: FLOAT AS amount0_unadj, + utils.udf_hex_to_int( + 's2c', + segmented_data [1] :: STRING + ) :: FLOAT AS amount1_unadj, + utils.udf_hex_to_int( + 's2c', + segmented_data [2] :: STRING + ) :: FLOAT AS sqrtPriceX96, + utils.udf_hex_to_int( + 's2c', + segmented_data [3] :: STRING + ) :: FLOAT AS liquidity, + utils.udf_hex_to_int( + 's2c', + segmented_data [4] :: STRING + ) :: FLOAT AS tick, + token0, + token1, + pool_address, + tick_spacing, + fee, + CONCAT( + l.tx_hash, + '-', + l.event_index + ) AS _log_id, + l.modified_timestamp + FROM + {{ ref('core__fact_event_logs') }} + l + INNER JOIN {{ ref('silver_dex__glyph_v4_pools') }} + p + ON p.pool_address = l.contract_address + WHERE + topic_0 = '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67' + AND tx_succeeded + +{% if is_incremental() %} +AND l.modified_timestamp >= ( + SELECT + MAX(modified_timestamp) - INTERVAL '12 hours' + FROM + {{ this }} +) +AND l.modified_timestamp >= SYSDATE() - INTERVAL '7 day' +{% endif %} +) + +SELECT + block_number, + block_timestamp, + tx_hash, + event_index, + origin_function_signature, + origin_from_address, + origin_to_address, + contract_address, + pool_address, + recipient, + recipient AS tx_to, + sender, + fee, + tick, + tick_spacing, + liquidity, + token0, + token1, + amount0_unadj, + amount1_unadj, + CASE + WHEN amount0_unadj > 0 THEN ABS(amount0_unadj) + ELSE ABS(amount1_unadj) + END AS amount_in_unadj, + CASE + WHEN amount0_unadj < 0 THEN ABS(amount0_unadj) + ELSE ABS(amount1_unadj) + END AS amount_out_unadj, + CASE + WHEN amount0_unadj > 0 THEN token0 + ELSE token1 + END AS token_in, + CASE + WHEN amount0_unadj < 0 THEN token0 + ELSE token1 + END AS token_out, + _log_id, + modified_timestamp +FROM + swaps_base qualify(ROW_NUMBER() over(PARTITION BY _log_id +ORDER BY + modified_timestamp DESC)) = 1 diff --git a/models/silver/defi/dex/silver_dex__complete_dex_liquidity_pools.sql b/models/silver/defi/dex/silver_dex__complete_dex_liquidity_pools.sql new file mode 100644 index 0000000..2e95562 --- /dev/null +++ b/models/silver/defi/dex/silver_dex__complete_dex_liquidity_pools.sql @@ -0,0 +1,869 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'delete+insert', + unique_key = ['block_number','platform','version'], + cluster_by = ['block_timestamp::DATE','platform'], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash, contract_address, pool_address, pool_name, tokens, symbols), SUBSTRING(pool_address, pool_name, tokens, symbols)", + tags = ['curated','reorg','heal'] +) }} + +WITH contracts AS ( + + SELECT + contract_address, + token_symbol, + token_decimals, + _inserted_timestamp + FROM + {{ ref('silver__contracts') }} +), +corex AS ( + SELECT + block_number, + block_timestamp, + tx_hash, + contract_address, + pool_address, + NULL as pool_name, + fee, + tick_spacing, + token0, + token1, + NULL as token2, + NULL as token3, + NULL as token4, + NULL as token5, + NULL as token6, + NULL as token7, + 'corex' AS platform, + 'v1' AS version, + _log_id AS _id, + _inserted_timestamp + FROM + {{ ref('silver_dex__corex_pools') }} + +{% if is_incremental() and 'corex' not in var('HEAL_MODELS') %} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) +{% endif %} +), +glyph_v4 AS ( + SELECT + block_number, + block_timestamp, + tx_hash, + contract_address, + pool_address, + NULL as pool_name, + fee, + tick_spacing, + token0, + token1, + NULL as token2, + NULL as token3, + NULL as token4, + NULL as token5, + NULL as token6, + NULL as token7, + 'glyph_v4' AS platform, + 'v4' AS version, + _log_id AS _id, + _inserted_timestamp + FROM + {{ ref('silver_dex__glyph_v4_pools') }} + +{% if is_incremental() and 'glyph_v4' not in var('HEAL_MODELS') %} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) +{% endif %} +), +sushi_v3 AS ( + SELECT + block_number, + block_timestamp, + tx_hash, + contract_address, + pool_address, + NULL as pool_name, + fee, + tick_spacing, + token0, + token1, + NULL as token2, + NULL as token3, + NULL as token4, + NULL as token5, + NULL as token6, + NULL as token7, + 'sushi_v3' AS platform, + 'v3' AS version, + _log_id AS _id, + modified_timestamp AS _inserted_timestamp + FROM + {{ ref('silver_dex__sushi_v3_pools') }} + +{% if is_incremental() and 'sushi_v3' not in var('HEAL_MODELS') %} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) +{% endif %} +), +all_pools AS ( + SELECT + * + FROM + corex +UNION ALL +SELECT + * +FROM + glyph_v4 +UNION ALL +SELECT + * +FROM + sushi_v3 +), +complete_lps AS ( + SELECT + block_number, + block_timestamp, + tx_hash, + p.contract_address, + pool_address, + CASE + WHEN pool_name IS NOT NULL THEN pool_name + WHEN pool_name IS NULL + AND platform IN ( + 'corex', + 'glyph_v4', + 'sushi_v3' + ) THEN CONCAT( + COALESCE( + c0.token_symbol, + CONCAT(SUBSTRING(token0, 1, 5), '...', SUBSTRING(token0, 39, 42)) + ), + '-', + COALESCE( + c1.token_symbol, + CONCAT(SUBSTRING(token1, 1, 5), '...', SUBSTRING(token1, 39, 42)) + ), + ' ', + COALESCE( + fee, + 0 + ), + ' ', + COALESCE( + tick_spacing, + 0 + ), + CASE + WHEN platform = 'corex' THEN 'COREX LP' + WHEN platform = 'glyph_v4' THEN 'GLYPH-V4 LP' + WHEN platform = 'sushi_v3' THEN 'SUSHI-V3 LP' + END + ) + WHEN pool_name IS NULL + AND platform IN ( + 'balancer', + 'curve' + ) THEN CONCAT( + COALESCE(c0.token_symbol, SUBSTRING(token0, 1, 5) || '...' || SUBSTRING(token0, 39, 42)), + CASE + WHEN token1 IS NOT NULL THEN '-' || COALESCE(c1.token_symbol, SUBSTRING(token1, 1, 5) || '...' || SUBSTRING(token1, 39, 42)) + ELSE '' + END, + CASE + WHEN token2 IS NOT NULL THEN '-' || COALESCE(c2.token_symbol, SUBSTRING(token2, 1, 5) || '...' || SUBSTRING(token2, 39, 42)) + ELSE '' + END, + CASE + WHEN token3 IS NOT NULL THEN '-' || COALESCE(c3.token_symbol, SUBSTRING(token3, 1, 5) || '...' || SUBSTRING(token3, 39, 42)) + ELSE '' + END, + CASE + WHEN token4 IS NOT NULL THEN '-' || COALESCE(c4.token_symbol, SUBSTRING(token4, 1, 5) || '...' || SUBSTRING(token4, 39, 42)) + ELSE '' + END, + CASE + WHEN token5 IS NOT NULL THEN '-' || COALESCE(c5.token_symbol, SUBSTRING(token5, 1, 5) || '...' || SUBSTRING(token5, 39, 42)) + ELSE '' + END, + CASE + WHEN token6 IS NOT NULL THEN '-' || COALESCE(c6.token_symbol, SUBSTRING(token6, 1, 5) || '...' || SUBSTRING(token6, 39, 42)) + ELSE '' + END, + CASE + WHEN token7 IS NOT NULL THEN '-' || COALESCE(c7.token_symbol, SUBSTRING(token7, 1, 5) || '...' || SUBSTRING(token7, 39, 42)) + ELSE '' + END + ) + ELSE CONCAT( + COALESCE( + c0.token_symbol, + CONCAT(SUBSTRING(token0, 1, 5), '...', SUBSTRING(token0, 39, 42)) + ), + '-', + COALESCE( + c1.token_symbol, + CONCAT(SUBSTRING(token1, 1, 5), '...', SUBSTRING(token1, 39, 42)) + ) + ) + END AS pool_name, + fee, + tick_spacing, + token0, + token1, + token2, + token3, + token4, + token5, + token6, + token7, + OBJECT_CONSTRUCT( + 'token0', + token0, + 'token1', + token1, + 'token2', + token2, + 'token3', + token3, + 'token4', + token4, + 'token5', + token5, + 'token6', + token6, + 'token7', + token7 + ) AS tokens, + OBJECT_CONSTRUCT( + 'token0', + c0.token_symbol, + 'token1', + c1.token_symbol, + 'token2', + c2.token_symbol, + 'token3', + c3.token_symbol, + 'token4', + c4.token_symbol, + 'token5', + c5.token_symbol, + 'token6', + c6.token_symbol, + 'token7', + c7.token_symbol + ) AS symbols, + OBJECT_CONSTRUCT( + 'token0', + c0.token_decimals, + 'token1', + c1.token_decimals, + 'token2', + c2.token_decimals, + 'token3', + c3.token_decimals, + 'token4', + c4.token_decimals, + 'token5', + c5.token_decimals, + 'token6', + c6.token_decimals, + 'token7', + c7.token_decimals + ) AS decimals, + platform, + version, + _id, + p._inserted_timestamp + FROM + all_pools p + LEFT JOIN contracts c0 + ON c0.contract_address = p.token0 + LEFT JOIN contracts c1 + ON c1.contract_address = p.token1 + LEFT JOIN contracts c2 + ON c2.contract_address = p.token2 + LEFT JOIN contracts c3 + ON c3.contract_address = p.token3 + LEFT JOIN contracts c4 + ON c4.contract_address = p.token4 + LEFT JOIN contracts c5 + ON c5.contract_address = p.token5 + LEFT JOIN contracts c6 + ON c6.contract_address = p.token6 + LEFT JOIN contracts c7 + ON c7.contract_address = p.token7 +), + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +heal_model AS ( + SELECT + block_number, + block_timestamp, + tx_hash, + t0.contract_address, + pool_address, + CASE + WHEN pool_name IS NOT NULL THEN pool_name + WHEN pool_name IS NULL + AND platform IN ( + 'corex', + 'glyph_v4', + 'sushi_v3' + ) THEN CONCAT( + COALESCE( + c0.token_symbol, + CONCAT(SUBSTRING(token0, 1, 5), '...', SUBSTRING(token0, 39, 42)) + ), + '-', + COALESCE( + c1.token_symbol, + CONCAT(SUBSTRING(token1, 1, 5), '...', SUBSTRING(token1, 39, 42)) + ), + ' ', + COALESCE( + fee, + 0 + ), + ' ', + COALESCE( + tick_spacing, + 0 + ), + CASE + WHEN platform = 'corex' THEN 'COREX LP' + WHEN platform = 'glyph_v4' THEN 'GLYPH-V4 LP' + WHEN platform = 'sushi_v3' THEN 'SUSHI-V3 LP' + END + ) + WHEN pool_name IS NULL + AND platform IN ( + 'balancer', + 'curve' + ) THEN CONCAT( + COALESCE(c0.token_symbol, SUBSTRING(token0, 1, 5) || '...' || SUBSTRING(token0, 39, 42)), + CASE + WHEN token1 IS NOT NULL THEN '-' || COALESCE(c1.token_symbol, SUBSTRING(token1, 1, 5) || '...' || SUBSTRING(token1, 39, 42)) + ELSE '' + END, + CASE + WHEN token2 IS NOT NULL THEN '-' || COALESCE(c2.token_symbol, SUBSTRING(token2, 1, 5) || '...' || SUBSTRING(token2, 39, 42)) + ELSE '' + END, + CASE + WHEN token3 IS NOT NULL THEN '-' || COALESCE(c3.token_symbol, SUBSTRING(token3, 1, 5) || '...' || SUBSTRING(token3, 39, 42)) + ELSE '' + END, + CASE + WHEN token4 IS NOT NULL THEN '-' || COALESCE(c4.token_symbol, SUBSTRING(token4, 1, 5) || '...' || SUBSTRING(token4, 39, 42)) + ELSE '' + END, + CASE + WHEN token5 IS NOT NULL THEN '-' || COALESCE(c5.token_symbol, SUBSTRING(token5, 1, 5) || '...' || SUBSTRING(token5, 39, 42)) + ELSE '' + END, + CASE + WHEN token6 IS NOT NULL THEN '-' || COALESCE(c6.token_symbol, SUBSTRING(token6, 1, 5) || '...' || SUBSTRING(token6, 39, 42)) + ELSE '' + END, + CASE + WHEN token7 IS NOT NULL THEN '-' || COALESCE(c7.token_symbol, SUBSTRING(token7, 1, 5) || '...' || SUBSTRING(token7, 39, 42)) + ELSE '' + END + ) + ELSE CONCAT( + COALESCE( + c0.token_symbol, + CONCAT(SUBSTRING(token0, 1, 5), '...', SUBSTRING(token0, 39, 42)) + ), + '-', + COALESCE( + c1.token_symbol, + CONCAT(SUBSTRING(token1, 1, 5), '...', SUBSTRING(token1, 39, 42)) + ) + ) + END AS pool_name_heal, + fee, + tick_spacing, + token0, + token1, + token2, + token3, + token4, + token5, + token6, + token7, + tokens, + OBJECT_CONSTRUCT( + 'token0', + c0.token_symbol, + 'token1', + c1.token_symbol, + 'token2', + c2.token_symbol, + 'token3', + c3.token_symbol, + 'token4', + c4.token_symbol, + 'token5', + c5.token_symbol, + 'token6', + c6.token_symbol, + 'token7', + c7.token_symbol + ) AS symbols_heal, + OBJECT_CONSTRUCT( + 'token0', + c0.token_decimals, + 'token1', + c1.token_decimals, + 'token2', + c2.token_decimals, + 'token3', + c3.token_decimals, + 'token4', + c4.token_decimals, + 'token5', + c5.token_decimals, + 'token6', + c6.token_decimals, + 'token7', + c7.token_decimals + ) AS decimals_heal, + platform, + version, + _id, + t0._inserted_timestamp + FROM + {{ this }} + t0 + LEFT JOIN contracts c0 + ON c0.contract_address = t0.token0 + LEFT JOIN contracts c1 + ON c1.contract_address = t0.token1 + LEFT JOIN contracts c2 + ON c2.contract_address = t0.token2 + LEFT JOIN contracts c3 + ON c3.contract_address = t0.token3 + LEFT JOIN contracts c4 + ON c4.contract_address = t0.token4 + LEFT JOIN contracts c5 + ON c5.contract_address = t0.token5 + LEFT JOIN contracts c6 + ON c6.contract_address = t0.token6 + LEFT JOIN contracts c7 + ON c7.contract_address = t0.token7 + WHERE + CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t1.block_number, + '-', + t1.platform, + '-', + t1.version + ) + FROM + {{ this }} + t1 + WHERE + t1.decimals :token0 :: INT IS NULL + AND t1._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t1.tokens :token0 :: STRING) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t2.block_number, + '-', + t2.platform, + '-', + t2.version + ) + FROM + {{ this }} + t2 + WHERE + t2.decimals :token1 :: INT IS NULL + AND t2._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t2.tokens :token1 :: STRING) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t3.block_number, + '-', + t3.platform, + '-', + t3.version + ) + FROM + {{ this }} + t3 + WHERE + t3.decimals :token2 :: INT IS NULL + AND t3._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t3.tokens :token2 :: STRING) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t4.block_number, + '-', + t4.platform, + '-', + t4.version + ) + FROM + {{ this }} + t4 + WHERE + t4.decimals :token3 :: INT IS NULL + AND t4._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t4.tokens :token3 :: STRING) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t5.block_number, + '-', + t5.platform, + '-', + t5.version + ) + FROM + {{ this }} + t5 + WHERE + t5.decimals :token4 :: INT IS NULL + AND t5._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t5.tokens :token4 :: STRING) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t6.block_number, + '-', + t6.platform, + '-', + t6.version + ) + FROM + {{ this }} + t6 + WHERE + t6.decimals :token5 :: INT IS NULL + AND t6._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t6.tokens :token5 :: STRING) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t7.block_number, + '-', + t7.platform, + '-', + t7.version + ) + FROM + {{ this }} + t7 + WHERE + t7.decimals :token6 :: INT IS NULL + AND t7._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t7.tokens :token6 :: STRING) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t8.block_number, + '-', + t8.platform, + '-', + t8.version + ) + FROM + {{ this }} + t8 + WHERE + t8.decimals :token7 :: INT IS NULL + AND t8._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t8.tokens :token7 :: STRING) + GROUP BY + 1 + ) + ), + {% endif %} + + FINAL AS ( + SELECT + * + FROM + complete_lps + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +UNION ALL +SELECT + block_number, + block_timestamp, + tx_hash, + contract_address, + pool_address, + pool_name_heal AS pool_name, + fee, + tick_spacing, + token0, + token1, + token2, + token3, + token4, + token5, + token6, + token7, + tokens, + symbols_heal AS symbols, + decimals_heal AS decimals, + platform, + version, + _id, + _inserted_timestamp +FROM + heal_model +{% endif %} +) +SELECT + block_number, + block_timestamp, + tx_hash, + platform, + version, + contract_address, + pool_address, + pool_name, + tokens, + symbols, + decimals, + fee, + tick_spacing, + token0, + token1, + token2, + token3, + token4, + token5, + token6, + token7, + _id, + _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key( + ['pool_address'] + ) }} AS complete_dex_liquidity_pools_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + FINAL diff --git a/models/silver/defi/dex/silver_dex__complete_dex_swaps.sql b/models/silver/defi/dex/silver_dex__complete_dex_swaps.sql new file mode 100644 index 0000000..789bafd --- /dev/null +++ b/models/silver/defi/dex/silver_dex__complete_dex_swaps.sql @@ -0,0 +1,512 @@ +-- depends_on: {{ ref('silver__complete_token_prices') }} +{{ config( + materialized = 'incremental', + incremental_strategy = 'delete+insert', + unique_key = ['block_number','platform','version'], + cluster_by = ['block_timestamp::DATE','platform'], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash, origin_function_signature, origin_from_address, origin_to_address, contract_address, pool_name, event_name, sender, tx_to, token_in, token_out, symbol_in, symbol_out), SUBSTRING(origin_function_signature, pool_name, event_name, sender, tx_to, token_in, token_out, symbol_in, symbol_out)", + tags = ['curated','reorg','heal'] +) }} + +WITH corex AS ( + + SELECT + block_number, + block_timestamp, + tx_hash, + origin_function_signature, + origin_from_address, + origin_to_address, + pool_address AS contract_address, + 'Swap' AS event_name, + CASE + WHEN amount0_unadj > 0 THEN ABS(amount0_unadj) + ELSE ABS(amount1_unadj) + END AS amount_in_unadj, + CASE + WHEN amount0_unadj < 0 THEN ABS(amount0_unadj) + ELSE ABS(amount1_unadj) + END AS amount_out_unadj, + CASE + WHEN amount0_unadj > 0 THEN token0_address + ELSE token1_address + END AS token_in, + CASE + WHEN amount0_unadj < 0 THEN token0_address + ELSE token1_address + END AS token_out, + sender, + recipient AS tx_to, + event_index, + 'corex' AS platform, + 'v1' AS version, + _log_id, + _inserted_timestamp + FROM + {{ ref('silver_dex__corex_swaps') }} + +{% if is_incremental() and 'corex' not in var('HEAL_MODELS') %} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) +{% endif %} +), + +all_dex AS ( + SELECT + * + FROM + corex +), +complete_dex_swaps AS ( + SELECT + s.block_number, + s.block_timestamp, + s.tx_hash, + origin_function_signature, + origin_from_address, + origin_to_address, + s.contract_address, + event_name, + token_in, + c1.token_decimals AS decimals_in, + c1.token_symbol AS symbol_in, + amount_in_unadj, + CASE + WHEN decimals_in IS NULL THEN amount_in_unadj + ELSE (amount_in_unadj / pow(10, decimals_in)) + END AS amount_in, + CASE + WHEN decimals_in IS NOT NULL THEN amount_in * p1.price + ELSE NULL + END AS amount_in_usd, + token_out, + c2.token_decimals AS decimals_out, + c2.token_symbol AS symbol_out, + amount_out_unadj, + CASE + WHEN decimals_out IS NULL THEN amount_out_unadj + ELSE (amount_out_unadj / pow(10, decimals_out)) + END AS amount_out, + CASE + WHEN decimals_out IS NOT NULL THEN amount_out * p2.price + ELSE NULL + END AS amount_out_usd, + CASE + WHEN lp.pool_name IS NULL THEN CONCAT( + LEAST( + COALESCE( + symbol_in, + CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42)) + ), + COALESCE( + symbol_out, + CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42)) + ) + ), + '-', + GREATEST( + COALESCE( + symbol_in, + CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42)) + ), + COALESCE( + symbol_out, + CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42)) + ) + ) + ) + ELSE lp.pool_name + END AS pool_name, + sender, + tx_to, + event_index, + s.platform, + s.version, + s._log_id, + s._inserted_timestamp + FROM + all_dex s + LEFT JOIN {{ ref('silver__contracts') }} + c1 + ON s.token_in = c1.contract_address + LEFT JOIN {{ ref('silver__contracts') }} + c2 + ON s.token_out = c2.contract_address + LEFT JOIN {{ ref('price__ez_prices_hourly') }} + p1 + ON s.token_in = p1.token_address + AND DATE_TRUNC( + 'hour', + block_timestamp + ) = p1.hour + LEFT JOIN {{ ref('price__ez_prices_hourly') }} + p2 + ON s.token_out = p2.token_address + AND DATE_TRUNC( + 'hour', + block_timestamp + ) = p2.hour + LEFT JOIN {{ ref('silver_dex__complete_dex_liquidity_pools') }} + lp + ON s.contract_address = lp.pool_address +), + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +heal_model AS ( + SELECT + t0.block_number, + t0.block_timestamp, + t0.tx_hash, + origin_function_signature, + origin_from_address, + origin_to_address, + t0.contract_address, + event_name, + token_in, + c1.token_decimals AS decimals_in, + c1.token_symbol AS symbol_in, + amount_in_unadj, + CASE + WHEN c1.token_decimals IS NULL THEN amount_in_unadj + ELSE (amount_in_unadj / pow(10, c1.token_decimals)) + END AS amount_in_heal, + CASE + WHEN c1.token_decimals IS NOT NULL THEN amount_in_heal * p1.price + ELSE NULL + END AS amount_in_usd_heal, + token_out, + c2.token_decimals AS decimals_out, + c2.token_symbol AS symbol_out, + amount_out_unadj, + CASE + WHEN c2.token_decimals IS NULL THEN amount_out_unadj + ELSE (amount_out_unadj / pow(10, c2.token_decimals)) + END AS amount_out_heal, + CASE + WHEN c2.token_decimals IS NOT NULL THEN amount_out_heal * p2.price + ELSE NULL + END AS amount_out_usd_heal, + CASE + WHEN lp.pool_name IS NULL THEN CONCAT( + LEAST( + COALESCE( + c1.token_symbol, + CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42)) + ), + COALESCE( + c2.token_symbol, + CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42)) + ) + ), + '-', + GREATEST( + COALESCE( + c1.token_symbol, + CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42)) + ), + COALESCE( + c2.token_symbol, + CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42)) + ) + ) + ) + ELSE lp.pool_name + END AS pool_name_heal, + sender, + tx_to, + event_index, + t0.platform, + t0.version, + t0._log_id, + t0._inserted_timestamp + FROM + {{ this }} + t0 + LEFT JOIN {{ ref('silver__contracts') }} + c1 + ON t0.token_in = c1.contract_address + LEFT JOIN {{ ref('silver__contracts') }} + c2 + ON t0.token_out = c2.contract_address + LEFT JOIN {{ ref('price__ez_prices_hourly') }} + p1 + ON t0.token_in = p1.token_address + AND DATE_TRUNC( + 'hour', + block_timestamp + ) = p1.hour + LEFT JOIN {{ ref('price__ez_prices_hourly') }} + p2 + ON t0.token_out = p2.token_address + AND DATE_TRUNC( + 'hour', + block_timestamp + ) = p2.hour + LEFT JOIN {{ ref('silver_dex__complete_dex_liquidity_pools') }} + lp + ON t0.contract_address = lp.pool_address + WHERE + CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t1.block_number, + '-', + t1.platform, + '-', + t1.version + ) + FROM + {{ this }} + t1 + WHERE + t1.decimals_in IS NULL + AND t1._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t1.token_in) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t2.block_number, + '-', + t2.platform, + '-', + t2.version + ) + FROM + {{ this }} + t2 + WHERE + t2.decimals_out IS NULL + AND t2._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t2.token_out) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t3.block_number, + '-', + t3.platform, + '-', + t3.version + ) + FROM + {{ this }} + t3 + WHERE + t3.amount_in_usd IS NULL + AND t3._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__complete_token_prices') }} + p + WHERE + p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND p.price IS NOT NULL + AND p.token_address = t3.token_in + AND p.hour = DATE_TRUNC( + 'hour', + t3.block_timestamp + ) + ) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t4.block_number, + '-', + t4.platform, + '-', + t4.version + ) + FROM + {{ this }} + t4 + WHERE + t4.amount_out_usd IS NULL + AND t4._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__complete_token_prices') }} + p + WHERE + p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND p.price IS NOT NULL + AND p.token_address = t4.token_out + AND p.hour = DATE_TRUNC( + 'hour', + t4.block_timestamp + ) + ) + GROUP BY + 1 + ) + ), + {% endif %} + + FINAL AS ( + SELECT + * + FROM + complete_dex_swaps + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +UNION ALL +SELECT + block_number, + block_timestamp, + tx_hash, + origin_function_signature, + origin_from_address, + origin_to_address, + contract_address, + event_name, + token_in, + decimals_in, + symbol_in, + amount_in_unadj, + amount_in_heal AS amount_in, + amount_in_usd_heal AS amount_in_usd, + token_out, + decimals_out, + symbol_out, + amount_out_unadj, + amount_out_heal AS amount_out, + amount_out_usd_heal AS amount_out_usd, + pool_name_heal AS pool_name, + sender, + tx_to, + event_index, + platform, + version, + _log_id, + _inserted_timestamp +FROM + heal_model +{% endif %} +) +SELECT + block_number, + block_timestamp, + tx_hash, + origin_function_signature, + origin_from_address, + origin_to_address, + contract_address, + pool_name, + event_name, + amount_in_unadj, + amount_in, + amount_in_usd, + amount_out_unadj, + amount_out, + amount_out_usd, + sender, + tx_to, + event_index, + platform, + version, + token_in, + token_out, + symbol_in, + symbol_out, + decimals_in, + decimals_out, + _log_id, + _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key( + ['tx_hash','event_index'] + ) }} AS complete_dex_swaps_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + FINAL qualify (ROW_NUMBER() over (PARTITION BY _log_id +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/defi/dex/sushi/silver_dex__sushi_v3_pools.sql b/models/silver/defi/dex/sushi/silver_dex__sushi_v3_pools.sql new file mode 100644 index 0000000..776b9ee --- /dev/null +++ b/models/silver/defi/dex/sushi/silver_dex__sushi_v3_pools.sql @@ -0,0 +1,109 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'delete+insert', + unique_key = 'pool_address', + cluster_by = ['block_timestamp::DATE'], + tags = ['curated'] +) }} + +WITH created_pools AS ( + + SELECT + block_number, + block_timestamp, + tx_hash, + event_index, + contract_address, + regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data, + LOWER(CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40))) AS token0, + LOWER(CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40))) AS token1, + utils.udf_hex_to_int( + 's2c', + topics [3] :: STRING + ) :: INTEGER AS fee, + utils.udf_hex_to_int( + 's2c', + segmented_data [0] :: STRING + ) :: INTEGER AS tick_spacing, + CONCAT('0x', SUBSTR(segmented_data [1] :: STRING, 25, 40)) AS pool_address, + CONCAT( + tx_hash, + '-', + event_index + ) AS _log_id, + modified_timestamp + FROM + {{ ref('core__fact_event_logs') }} + WHERE + topic_0 = '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118' + AND contract_address = '0xc35dadb65012ec5796536bd9864ed8773abc74c4' --Sushi/UniswapV3Factory + AND tx_succeeded + +{% if is_incremental() %} +AND modified_timestamp >= ( + SELECT + MAX(modified_timestamp) - INTERVAL '12 hours' + FROM + {{ this }} +) +AND modified_timestamp >= SYSDATE() - INTERVAL '7 day' +{% endif %} +), +initial_info AS ( + SELECT + contract_address, + regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data, + utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [0] :: STRING)) :: FLOAT AS init_sqrtPriceX96, + utils.udf_hex_to_int('s2c', CONCAT('0x', segmented_data [1] :: STRING)) :: FLOAT AS init_tick, + pow( + 1.0001, + init_tick + ) AS init_price_1_0_unadj, + CONCAT( + tx_hash, + '-', + event_index + ) AS _log_id, + modified_timestamp + FROM + {{ ref('core__fact_event_logs') }} + WHERE + topic_0 = '0x98636036cb66a9c19a37435efc1e90142190214e8abeb821bdba3f2990dd4c95' + AND tx_succeeded + +{% if is_incremental() %} +AND modified_timestamp >= ( + SELECT + MAX(modified_timestamp) - INTERVAL '12 hours' + FROM + {{ this }} +) +AND modified_timestamp >= SYSDATE() - INTERVAL '7 day' +{% endif %} +) +SELECT + block_number, + block_timestamp, + tx_hash, + event_index, + p.contract_address, + token0, + token1, + fee :: INTEGER AS fee, + ( + fee / 10000 + ) :: FLOAT AS fee_percent, + tick_spacing, + pool_address, + COALESCE( + init_tick, + 0 + ) AS init_tick, + p._log_id, + p.modified_timestamp +FROM + created_pools p + LEFT JOIN initial_info i + ON p.pool_address = i.contract_address qualify(ROW_NUMBER() over(PARTITION BY pool_address +ORDER BY + p.modified_timestamp DESC)) = 1 diff --git a/models/silver/defi/dex/sushi/silver_dex__sushi_v3_swaps.sql b/models/silver/defi/dex/sushi/silver_dex__sushi_v3_swaps.sql new file mode 100644 index 0000000..8aa9cfe --- /dev/null +++ b/models/silver/defi/dex/sushi/silver_dex__sushi_v3_swaps.sql @@ -0,0 +1,117 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'delete+insert', + unique_key = 'block_number', + cluster_by = ['block_timestamp::DATE'], + tags = ['curated','reorg'] +) }} + +WITH swaps_base AS ( + + SELECT + l.block_number, + l.block_timestamp, + l.tx_hash, + l.event_index, + l.origin_function_signature, + l.origin_from_address, + l.origin_to_address, + l.contract_address, + regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data, + CONCAT('0x', SUBSTR(topic_1, 27, 40)) AS sender, + CONCAT('0x', SUBSTR(topic_2, 27, 40)) AS recipient, + utils.udf_hex_to_int( + 's2c', + segmented_data [0] :: STRING + ) :: FLOAT AS amount0_unadj, + utils.udf_hex_to_int( + 's2c', + segmented_data [1] :: STRING + ) :: FLOAT AS amount1_unadj, + utils.udf_hex_to_int( + 's2c', + segmented_data [2] :: STRING + ) :: FLOAT AS sqrtPriceX96, + utils.udf_hex_to_int( + 's2c', + segmented_data [3] :: STRING + ) :: FLOAT AS liquidity, + utils.udf_hex_to_int( + 's2c', + segmented_data [4] :: STRING + ) :: FLOAT AS tick, + token0_address, + token1_address, + pool_address, + tick_spacing, + fee, + CONCAT( + l.tx_hash, + '-', + l.event_index + ) AS _log_id, + l.modified_timestamp + FROM + {{ ref('core__fact_event_logs') }} + l + INNER JOIN {{ ref('silver_dex__sushi_v3_pools') }} + p + ON p.pool_address = l.contract_address + WHERE + l.block_timestamp :: DATE >= '2023-04-01' + AND topic_0 = '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67' + AND tx_succeeded + +{% if is_incremental() %} +AND l.modified_timestamp >= ( + SELECT + MAX(modified_timestamp) - INTERVAL '12 hours' + FROM + {{ this }} +) +AND l.modified_timestamp >= SYSDATE() - INTERVAL '7 day' +{% endif %} +) +SELECT + block_number, + block_timestamp, + tx_hash, + event_index, + origin_function_signature, + origin_from_address, + origin_to_address, + contract_address, + pool_address, + recipient, + recipient AS tx_to, + sender, + fee, + tick, + tick_spacing, + liquidity, + token0_address, + token1_address, + amount0_unadj, + amount1_unadj, + CASE + WHEN amount0_unadj > 0 THEN ABS(amount0_unadj) + ELSE ABS(amount1_unadj) + END AS amount_in_unadj, + CASE + WHEN amount0_unadj < 0 THEN ABS(amount0_unadj) + ELSE ABS(amount1_unadj) + END AS amount_out_unadj, + CASE + WHEN amount0_unadj > 0 THEN token0_address + ELSE token1_address + END AS token_in, + CASE + WHEN amount0_unadj < 0 THEN token0_address + ELSE token1_address + END AS token_out, + _log_id, + modified_timestamp +FROM + swaps_base qualify(ROW_NUMBER() over(PARTITION BY _log_id +ORDER BY + modified_timestamp DESC)) = 1