From c3388f4e6eb8f8552a0d988e77dc70547a32baac Mon Sep 17 00:00:00 2001 From: Sam <110511194+sam-xyz@users.noreply.github.com> Date: Fri, 8 Mar 2024 09:34:48 +0800 Subject: [PATCH] add aggregator logic (#151) * aggregator logic * more filters --- .../nft/sales/silver__aggregator_list.sql | 49 ++ .../nft/sales/silver__complete_nft_sales.sql | 656 ++++++++++++++++++ .../silver__complete_nft_sales.yml | 0 .../{ => sales}/silver__seaport_1_5_sales.sql | 0 .../{ => sales}/silver__seaport_1_5_sales.yml | 0 .../nft/{ => sales}/silver__zeroex_sales.sql | 0 .../nft/{ => sales}/silver__zeroex_sales.yml | 0 .../silver/nft/silver__complete_nft_sales.sql | 418 ----------- models/silver/nft/silver__nft_transfers.sql | 176 ++--- 9 files changed, 777 insertions(+), 522 deletions(-) create mode 100644 models/silver/nft/sales/silver__aggregator_list.sql create mode 100644 models/silver/nft/sales/silver__complete_nft_sales.sql rename models/silver/nft/{ => sales}/silver__complete_nft_sales.yml (100%) rename models/silver/nft/{ => sales}/silver__seaport_1_5_sales.sql (100%) rename models/silver/nft/{ => sales}/silver__seaport_1_5_sales.yml (100%) rename models/silver/nft/{ => sales}/silver__zeroex_sales.sql (100%) rename models/silver/nft/{ => sales}/silver__zeroex_sales.yml (100%) delete mode 100644 models/silver/nft/silver__complete_nft_sales.sql diff --git a/models/silver/nft/sales/silver__aggregator_list.sql b/models/silver/nft/sales/silver__aggregator_list.sql new file mode 100644 index 0000000..2b2b80b --- /dev/null +++ b/models/silver/nft/sales/silver__aggregator_list.sql @@ -0,0 +1,49 @@ +{{ config( + materialized = 'incremental', + unique_key = 'aggregator_identifier', + merge_update_columns = ['aggregator_identifier', 'aggregator', 'aggregator_type'], + full_refresh = false +) }} + + +WITH calldata_aggregators AS ( + SELECT + * + FROM + ( + VALUES + ('0', '0', 'calldata', '2020-01-01') + ) t (aggregator_identifier, aggregator, aggregator_type, _inserted_timestamp) +), + +platform_routers as ( +SELECT + * + FROM + ( + VALUES + ('0x66950320086664429c69735318724ae24ec0d835', 'element', 'router', '2024-03-07') + + ) t (aggregator_identifier, aggregator, aggregator_type, _inserted_timestamp) +), + +combined as ( +SELECT * +FROM + calldata_aggregators + +UNION ALL + +SELECT * +FROM + platform_routers +) + +SELECT + aggregator_identifier, + aggregator, + aggregator_type, + _inserted_timestamp +FROM combined + +qualify row_number() over (partition by aggregator_identifier order by _inserted_timestamp desc ) = 1 \ No newline at end of file diff --git a/models/silver/nft/sales/silver__complete_nft_sales.sql b/models/silver/nft/sales/silver__complete_nft_sales.sql new file mode 100644 index 0000000..7052801 --- /dev/null +++ b/models/silver/nft/sales/silver__complete_nft_sales.sql @@ -0,0 +1,656 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'delete+insert', + unique_key = ['block_number','platform_exchange_version'], + cluster_by = ['block_timestamp::DATE'], + tags = ['curated','reorg', 'heal'] +) }} + +WITH nft_base_models AS ( + + SELECT + block_number, + block_timestamp, + tx_hash, + event_index, + event_type, + platform_address, + platform_name, + platform_exchange_version, + seller_address, + buyer_address, + nft_address, + erc1155_value :: STRING AS erc1155_value, + tokenId, + currency_address, + total_price_raw, + total_fees_raw, + platform_fee_raw, + creator_fee_raw, + tx_fee, + origin_from_address, + origin_to_address, + origin_function_signature, + input_data, + nft_log_id, + _log_id, + _inserted_timestamp + FROM + {{ ref('silver__seaport_1_5_sales') }} + +{% if is_incremental() %} +WHERE + _inserted_timestamp >= ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '36 hours' + FROM + {{ this }} + ) +{% endif %} +UNION ALL +SELECT + block_number, + block_timestamp, + tx_hash, + event_index, + event_type, + platform_address, + platform_name, + platform_exchange_version, + seller_address, + buyer_address, + nft_address, + erc1155_value :: STRING AS erc1155_value, + tokenId, + currency_address, + total_price_raw, + total_fees_raw, + platform_fee_raw, + creator_fee_raw, + tx_fee, + origin_from_address, + origin_to_address, + origin_function_signature, + input_data, + nft_log_id, + _log_id, + _inserted_timestamp +FROM + {{ ref('silver__zeroex_sales') }} + +{% if is_incremental() %} +WHERE + _inserted_timestamp >= ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '36 hours' + FROM + {{ this }} + ) +{% endif %} +), +prices_raw AS ( + SELECT + HOUR, + symbol, + token_address, + decimals, + price AS hourly_prices + FROM + {{ ref('price__ez_hourly_token_prices') }} + WHERE + token_address IN ( + SELECT + DISTINCT currency_address + FROM + nft_base_models + ) + AND HOUR :: DATE IN ( + SELECT + DISTINCT block_timestamp :: DATE + FROM + nft_base_models + ) + AND token_address != '0x4200000000000000000000000000000000000006' +), +eth_price AS ( + SELECT + HOUR, + 'ETH' AS symbol, + 'ETH' AS token_address, + 18 AS decimals, + price AS eth_price_hourly + FROM + {{ ref('silver__hourly_prices_priority_eth') }} + WHERE + HOUR :: DATE IN ( + SELECT + DISTINCT block_timestamp :: DATE + FROM + nft_base_models + ) +), +all_prices AS ( + SELECT + HOUR, + symbol, + token_address, + decimals, + hourly_prices + FROM + prices_raw + UNION ALL + SELECT + HOUR, + symbol, + token_address, + decimals, + eth_price_hourly + FROM + eth_price + UNION ALL + SELECT + HOUR, + 'WETH' AS symbol, + '0x4200000000000000000000000000000000000006' AS token_address, + decimals, + eth_price_hourly + FROM + eth_price +), +contracts_decimal AS ( + SELECT + contract_address AS address_contracts, + token_symbol AS symbol_contracts, + token_decimals AS decimals_contracts + FROM + {{ ref('silver__contracts') }} + WHERE + contract_address IN ( + SELECT + currency_address + FROM + nft_base_models + ) +), +final_base AS ( + SELECT + block_number, + block_timestamp, + tx_hash, + event_index, + event_type, + platform_address, + platform_name, + platform_exchange_version, + NULL AS aggregator_name, + seller_address, + buyer_address, + nft_address, + erc1155_value, + tokenId, + COALESCE( + p.symbol, + symbol_contracts + ) AS currency_symbol, + currency_address, + total_price_raw, + total_fees_raw, + platform_fee_raw, + creator_fee_raw, + CASE + WHEN currency_address IN ( + 'ETH', + '0x4200000000000000000000000000000000000006' + ) THEN total_price_raw / pow( + 10, + 18 + ) + ELSE COALESCE ( + total_price_raw / pow(10, COALESCE(p.decimals, decimals_contracts)), + total_price_raw + ) + END AS price, + IFF( + COALESCE( + p.decimals, + decimals_contracts + ) IS NULL, + 0, + price * COALESCE( + hourly_prices, + 0 + ) + ) AS price_usd, + CASE + WHEN currency_address IN ( + 'ETH', + '0x4200000000000000000000000000000000000006' + ) THEN total_fees_raw / pow( + 10, + 18 + ) + ELSE COALESCE ( + total_fees_raw / pow(10, COALESCE(p.decimals, decimals_contracts)), + total_fees_raw + ) + END AS total_fees, + IFF( + COALESCE( + p.decimals, + decimals_contracts + ) IS NULL, + 0, + total_fees * COALESCE( + hourly_prices, + 0 + ) + ) AS total_fees_usd, + CASE + WHEN currency_address IN ( + 'ETH', + '0x4200000000000000000000000000000000000006' + ) THEN platform_fee_raw / pow( + 10, + 18 + ) + ELSE COALESCE ( + platform_fee_raw / pow(10, COALESCE(p.decimals, decimals_contracts)), + platform_fee_raw + ) + END AS platform_fee, + IFF( + COALESCE( + p.decimals, + decimals_contracts + ) IS NULL, + 0, + platform_fee * COALESCE( + hourly_prices, + 0 + ) + ) AS platform_fee_usd, + CASE + WHEN currency_address IN ( + 'ETH', + '0x4200000000000000000000000000000000000006' + ) THEN creator_fee_raw / pow( + 10, + 18 + ) + ELSE COALESCE ( + creator_fee_raw / pow(10, COALESCE(p.decimals, decimals_contracts)), + creator_fee_raw + ) + END AS creator_fee, + IFF( + COALESCE( + p.decimals, + decimals_contracts + ) IS NULL, + 0, + creator_fee * COALESCE( + hourly_prices, + 0 + ) + ) AS creator_fee_usd, + tx_fee, + tx_fee * eth_price_hourly AS tx_fee_usd, + origin_from_address, + origin_to_address, + origin_function_signature, + nft_log_id, + input_data, + _log_id, + b._inserted_timestamp + FROM + nft_base_models b + LEFT JOIN all_prices p + ON DATE_TRUNC( + 'hour', + b.block_timestamp + ) = p.hour + AND b.currency_address = p.token_address + LEFT JOIN eth_price e + ON DATE_TRUNC( + 'hour', + b.block_timestamp + ) = e.hour + LEFT JOIN contracts_decimal C + ON b.currency_address = C.address_contracts +), + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +heal_model AS ( + SELECT + block_number, + block_timestamp, + tx_hash, + event_index, + event_type, + platform_address, + COALESCE( + a2.aggregator, + platform_name + ) AS platform_name, + platform_exchange_version, + COALESCE( + aggregator_name, + A.aggregator + ) AS aggregator_name, + seller_address, + buyer_address, + nft_address, + C.token_name AS project_name, + erc1155_value, + tokenId, + currency_symbol, + currency_address, + total_price_raw, + total_fees_raw, + platform_fee_raw, + creator_fee_raw, + price, + price_usd, + total_fees, + total_fees_usd, + platform_fee, + platform_fee_usd, + creator_fee, + creator_fee_usd, + tx_fee, + tx_fee_usd, + origin_from_address, + origin_to_address, + origin_function_signature, + nft_log_id, + input_data, + _log_id, + t._inserted_timestamp + FROM + {{ this }} + t + LEFT JOIN {{ ref('silver__contracts') }} C + ON t.nft_address = C.contract_address + LEFT JOIN {{ ref('silver__aggregator_list') }} A + ON RIGHT( + t.input_data, + 8 + ) = A.aggregator_identifier + AND aggregator_type = 'calldata' + LEFT JOIN {{ ref('silver__aggregator_list') }} + a2 + ON t.origin_to_address = a2.aggregator_identifier + AND a2.aggregator_type = 'router' + WHERE + ( + t.block_number IN ( + SELECT + DISTINCT t1.block_number AS block_number + FROM + {{ this }} + t1 + WHERE + t1.project_name IS NULL + AND _inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '36 hours' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_name IS NOT NULL + AND C.contract_address = t1.nft_address) + ) + ) + OR ( + t.block_number IN ( + SELECT + DISTINCT t1.block_number AS block_number + FROM + {{ this }} + t1 + WHERE + t1.aggregator_name IS NULL + AND _inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '36 hours' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__aggregator_list') }} A + WHERE + A._inserted_timestamp > DATEADD('DAY', -2, SYSDATE()) + AND A.aggregator_type = 'calldata' + AND RIGHT( + t1.input_data, + 8 + ) = A.aggregator_identifier + ) + ) + ) + OR ( + t.block_number IN ( + SELECT + DISTINCT t1.block_number AS block_number + FROM + {{ this }} + t1 + WHERE + t1.origin_to_address IN ( + SELECT + aggregator_identifier + FROM + {{ ref('silver__aggregator_list') }} + WHERE + aggregator_type = 'router' + AND _inserted_timestamp >= DATEADD('DAY', -2, SYSDATE())) + AND _inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '36 hours' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__aggregator_list') }} + a2 + WHERE + a2._inserted_timestamp > DATEADD('DAY', -2, SYSDATE()) + AND t1.origin_to_address = a2.aggregator_identifier + AND a2.aggregator_type = 'router') + ) + ) + ), + {% endif %} + + combined AS ( + SELECT + block_number, + block_timestamp, + tx_hash, + event_index, + event_type, + platform_address, + COALESCE( + a2.aggregator, + platform_name + ) AS platform_name, + platform_exchange_version, + COALESCE( + aggregator_name, + A.aggregator + ) AS aggregator_name, + seller_address, + buyer_address, + nft_address, + C.token_name AS project_name, + erc1155_value, + tokenId, + currency_symbol, + currency_address, + total_price_raw, + total_fees_raw, + platform_fee_raw, + creator_fee_raw, + price, + price_usd, + total_fees, + total_fees_usd, + platform_fee, + platform_fee_usd, + creator_fee, + creator_fee_usd, + tx_fee, + tx_fee_usd, + origin_from_address, + origin_to_address, + origin_function_signature, + nft_log_id, + input_data, + _log_id, + b._inserted_timestamp, + {{ dbt_utils.generate_surrogate_key( + ['tx_hash', 'event_index', 'nft_address','tokenId','platform_exchange_version'] + ) }} AS complete_nft_sales_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id + FROM + final_base b + LEFT JOIN {{ ref('silver__contracts') }} C + ON b.nft_address = C.contract_address + LEFT JOIN {{ ref('silver__aggregator_list') }} A + ON RIGHT( + b.input_data, + 8 + ) = A.aggregator_identifier + AND A.aggregator_type = 'calldata' + LEFT JOIN {{ ref('silver__aggregator_list') }} + a2 + ON b.origin_to_address = a2.aggregator_identifier + AND a2.aggregator_type = 'router' + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +UNION ALL +SELECT + block_number, + block_timestamp, + tx_hash, + event_index, + event_type, + platform_address, + platform_name, + platform_exchange_version, + aggregator_name, + seller_address, + buyer_address, + nft_address, + project_name, + erc1155_value, + tokenId, + currency_symbol, + currency_address, + total_price_raw, + total_fees_raw, + platform_fee_raw, + creator_fee_raw, + price, + price_usd, + total_fees, + total_fees_usd, + platform_fee, + platform_fee_usd, + creator_fee, + creator_fee_usd, + tx_fee, + tx_fee_usd, + origin_from_address, + origin_to_address, + origin_function_signature, + nft_log_id, + input_data, + _log_id, + _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key( + ['tx_hash', 'event_index', 'nft_address','tokenId','platform_exchange_version'] + ) }} AS complete_nft_sales_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + heal_model +{% endif %} +) +SELECT + block_number, + block_timestamp, + tx_hash, + event_index, + event_type, + platform_address, + platform_name, + platform_exchange_version, + aggregator_name, + seller_address, + buyer_address, + nft_address, + project_name, + erc1155_value, + tokenId, + currency_symbol, + currency_address, + total_price_raw, + total_fees_raw, + platform_fee_raw, + creator_fee_raw, + price, + price_usd, + total_fees, + total_fees_usd, + platform_fee, + platform_fee_usd, + creator_fee, + creator_fee_usd, + tx_fee, + tx_fee_usd, + origin_from_address, + origin_to_address, + origin_function_signature, + nft_log_id, + input_data, + _log_id, + _inserted_timestamp, + complete_nft_sales_id, + inserted_timestamp, + modified_timestamp, + _invocation_id +FROM + combined qualify (ROW_NUMBER() over(PARTITION BY nft_log_id +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/nft/silver__complete_nft_sales.yml b/models/silver/nft/sales/silver__complete_nft_sales.yml similarity index 100% rename from models/silver/nft/silver__complete_nft_sales.yml rename to models/silver/nft/sales/silver__complete_nft_sales.yml diff --git a/models/silver/nft/silver__seaport_1_5_sales.sql b/models/silver/nft/sales/silver__seaport_1_5_sales.sql similarity index 100% rename from models/silver/nft/silver__seaport_1_5_sales.sql rename to models/silver/nft/sales/silver__seaport_1_5_sales.sql diff --git a/models/silver/nft/silver__seaport_1_5_sales.yml b/models/silver/nft/sales/silver__seaport_1_5_sales.yml similarity index 100% rename from models/silver/nft/silver__seaport_1_5_sales.yml rename to models/silver/nft/sales/silver__seaport_1_5_sales.yml diff --git a/models/silver/nft/silver__zeroex_sales.sql b/models/silver/nft/sales/silver__zeroex_sales.sql similarity index 100% rename from models/silver/nft/silver__zeroex_sales.sql rename to models/silver/nft/sales/silver__zeroex_sales.sql diff --git a/models/silver/nft/silver__zeroex_sales.yml b/models/silver/nft/sales/silver__zeroex_sales.yml similarity index 100% rename from models/silver/nft/silver__zeroex_sales.yml rename to models/silver/nft/sales/silver__zeroex_sales.yml diff --git a/models/silver/nft/silver__complete_nft_sales.sql b/models/silver/nft/silver__complete_nft_sales.sql deleted file mode 100644 index 5f751c7..0000000 --- a/models/silver/nft/silver__complete_nft_sales.sql +++ /dev/null @@ -1,418 +0,0 @@ -{{ config( - materialized = 'incremental', - incremental_strategy = 'delete+insert', - unique_key = ['block_number','platform_name','platform_exchange_version'], - cluster_by = ['block_timestamp::DATE'], - tags = ['curated','reorg'] -) }} - -WITH nft_base_models AS ( - - SELECT - block_number, - block_timestamp, - tx_hash, - event_index, - event_type, - platform_address, - platform_name, - platform_exchange_version, - seller_address, - buyer_address, - nft_address, - erc1155_value :: STRING AS erc1155_value, - tokenId, - currency_address, - total_price_raw, - total_fees_raw, - platform_fee_raw, - creator_fee_raw, - tx_fee, - origin_from_address, - origin_to_address, - origin_function_signature, - input_data, - nft_log_id, - _log_id, - _inserted_timestamp - FROM - {{ ref('silver__seaport_1_5_sales') }} - -{% if is_incremental() %} -WHERE - _inserted_timestamp >= ( - SELECT - MAX( - _inserted_timestamp - ) - INTERVAL '36 hours' - FROM - {{ this }} - ) -{% endif %} -UNION ALL -SELECT - block_number, - block_timestamp, - tx_hash, - event_index, - event_type, - platform_address, - platform_name, - platform_exchange_version, - seller_address, - buyer_address, - nft_address, - erc1155_value :: STRING AS erc1155_value, - tokenId, - currency_address, - total_price_raw, - total_fees_raw, - platform_fee_raw, - creator_fee_raw, - tx_fee, - origin_from_address, - origin_to_address, - origin_function_signature, - input_data, - nft_log_id, - _log_id, - _inserted_timestamp -FROM - {{ ref('silver__zeroex_sales') }} - -{% if is_incremental() %} -WHERE - _inserted_timestamp >= ( - SELECT - MAX( - _inserted_timestamp - ) - INTERVAL '36 hours' - FROM - {{ this }} - ) -{% endif %} -), -prices_raw AS ( - SELECT - HOUR, - symbol, - token_address, - decimals, - price AS hourly_prices - FROM - {{ ref('price__ez_hourly_token_prices') }} - WHERE - token_address IN ( - SELECT - DISTINCT currency_address - FROM - nft_base_models - ) - AND HOUR :: DATE IN ( - SELECT - DISTINCT block_timestamp :: DATE - FROM - nft_base_models - ) - AND token_address != '0x4200000000000000000000000000000000000006' -), -eth_price AS ( - SELECT - HOUR, - 'ETH' AS symbol, - 'ETH' AS token_address, - 18 AS decimals, - price AS eth_price_hourly - FROM - {{ ref('silver__hourly_prices_priority_eth') }} - WHERE - HOUR :: DATE IN ( - SELECT - DISTINCT block_timestamp :: DATE - FROM - nft_base_models - ) -), -all_prices AS ( - SELECT - HOUR, - symbol, - token_address, - decimals, - hourly_prices - FROM - prices_raw - UNION ALL - SELECT - HOUR, - symbol, - token_address, - decimals, - eth_price_hourly - FROM - eth_price - UNION ALL - SELECT - HOUR, - 'WETH' AS symbol, - '0x4200000000000000000000000000000000000006' AS token_address, - decimals, - eth_price_hourly - FROM - eth_price -), -final_base AS ( - SELECT - block_number, - block_timestamp, - tx_hash, - event_index, - event_type, - platform_address, - platform_name, - platform_exchange_version, - NULL AS aggregator_name, - seller_address, - buyer_address, - nft_address, - C.token_name AS project_name, - erc1155_value, - tokenId, - p.symbol AS currency_symbol, - currency_address, - total_price_raw, - total_fees_raw, - platform_fee_raw, - creator_fee_raw, - CASE - WHEN currency_address IN ( - 'ETH', - '0x4200000000000000000000000000000000000006' - ) THEN total_price_raw / pow( - 10, - 18 - ) - ELSE COALESCE (total_price_raw / pow(10, p.decimals), total_price_raw) - END AS price, - IFF( - p.decimals IS NULL, - 0, - price * hourly_prices - ) AS price_usd, - CASE - WHEN currency_address IN ( - 'ETH', - '0x4200000000000000000000000000000000000006' - ) THEN total_fees_raw / pow( - 10, - 18 - ) - ELSE COALESCE (total_fees_raw / pow(10, p.decimals), total_fees_raw) - END AS total_fees, - IFF( - p.decimals IS NULL, - 0, - total_fees * hourly_prices - ) AS total_fees_usd, - CASE - WHEN currency_address IN ( - 'ETH', - '0x4200000000000000000000000000000000000006' - ) THEN platform_fee_raw / pow( - 10, - 18 - ) - ELSE COALESCE (platform_fee_raw / pow(10, p.decimals), platform_fee_raw) - END AS platform_fee, - IFF( - p.decimals IS NULL, - 0, - platform_fee * hourly_prices - ) AS platform_fee_usd, - CASE - WHEN currency_address IN ( - 'ETH', - '0x4200000000000000000000000000000000000006' - ) THEN creator_fee_raw / pow( - 10, - 18 - ) - ELSE COALESCE (creator_fee_raw / pow(10, p.decimals), creator_fee_raw) - END AS creator_fee, - IFF( - p.decimals IS NULL, - 0, - creator_fee * hourly_prices - ) AS creator_fee_usd, - tx_fee, - tx_fee * eth_price_hourly AS tx_fee_usd, - origin_from_address, - origin_to_address, - origin_function_signature, - nft_log_id, - input_data, - _log_id, - b._inserted_timestamp - FROM - nft_base_models b - LEFT JOIN all_prices p - ON DATE_TRUNC( - 'hour', - b.block_timestamp - ) = p.hour - AND b.currency_address = p.token_address - LEFT JOIN eth_price e - ON DATE_TRUNC( - 'hour', - b.block_timestamp - ) = e.hour - LEFT JOIN {{ ref('silver__contracts') }} C - ON b.nft_address = C.contract_address -) - -{% if is_incremental() %}, -label_fill_sales AS ( - SELECT - block_number, - block_timestamp, - tx_hash, - event_index, - event_type, - platform_address, - platform_name, - platform_exchange_version, - aggregator_name, - seller_address, - buyer_address, - nft_address, - C.token_name AS project_name, - erc1155_value, - tokenId, - currency_symbol, - currency_address, - total_price_raw, - total_fees_raw, - platform_fee_raw, - creator_fee_raw, - price, - price_usd, - total_fees, - total_fees_usd, - platform_fee, - platform_fee_usd, - creator_fee, - creator_fee_usd, - tx_fee, - tx_fee_usd, - origin_from_address, - origin_to_address, - origin_function_signature, - nft_log_id, - input_data, - _log_id, - GREATEST( - t._inserted_timestamp, - C._inserted_timestamp - ) AS _inserted_timestamp - FROM - {{ this }} - t - INNER JOIN {{ ref('silver__contracts') }} C - ON t.nft_address = C.contract_address - WHERE - t.project_name IS NULL - AND C.token_name IS NOT NULL -), -blocks_fill AS ( - SELECT - * exclude ( - complete_nft_sales_id, - inserted_timestamp, - modified_timestamp, - _invocation_id - ) - FROM - {{ this }} - WHERE - block_number IN ( - SELECT - block_number - FROM - label_fill_sales - ) - AND nft_log_id NOT IN ( - SELECT - nft_log_id - FROM - label_fill_sales - ) -) -{% endif %}, -final_joins AS ( - SELECT - * - FROM - final_base - -{% if is_incremental() %} -UNION ALL -SELECT - * -FROM - label_fill_sales -UNION ALL -SELECT - * -FROM - blocks_fill -{% endif %} -) -SELECT - block_number, - block_timestamp, - tx_hash, - event_index, - event_type, - platform_address, - platform_name, - platform_exchange_version, - aggregator_name, - seller_address, - buyer_address, - nft_address, - project_name, - erc1155_value, - tokenId, - currency_symbol, - currency_address, - total_price_raw, - total_fees_raw, - platform_fee_raw, - creator_fee_raw, - price, - price_usd, - total_fees, - total_fees_usd, - platform_fee, - platform_fee_usd, - creator_fee, - creator_fee_usd, - tx_fee, - tx_fee_usd, - origin_from_address, - origin_to_address, - origin_function_signature, - nft_log_id, - input_data, - _log_id, - _inserted_timestamp, - {{ dbt_utils.generate_surrogate_key( - ['tx_hash', 'event_index', 'nft_address','tokenId','platform_exchange_version'] - ) }} AS complete_nft_sales_id, - SYSDATE() AS inserted_timestamp, - SYSDATE() AS modified_timestamp, - '{{ invocation_id }}' AS _invocation_id -FROM - final_joins qualify(ROW_NUMBER() over(PARTITION BY nft_log_id -ORDER BY - _inserted_timestamp DESC)) = 1 diff --git a/models/silver/nft/silver__nft_transfers.sql b/models/silver/nft/silver__nft_transfers.sql index fc555d4..0ffb6e3 100644 --- a/models/silver/nft/silver__nft_transfers.sql +++ b/models/silver/nft/silver__nft_transfers.sql @@ -4,7 +4,7 @@ unique_key = "block_number", cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'], post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(contract_address, tx_hash)", - tags = ['curated','reorg'] + tags = ['curated','reorg', 'heal'] ) }} WITH base AS ( @@ -323,61 +323,10 @@ transfer_base AS ( to_address IS NOT NULL ) -{% if is_incremental() %}, -fill_transfers AS ( - SELECT - t.block_number, - t.block_timestamp, - t.tx_hash, - t.event_index, - t.intra_event_index, - t.contract_address, - C.token_name AS project_name, - t.from_address, - t.to_address, - t.tokenId, - t.erc1155_value, - t.event_type, - t.token_transfer_type, - t._log_id, - GREATEST( - t._inserted_timestamp, - C._inserted_timestamp - ) AS _inserted_timestamp - FROM - {{ this }} - t - INNER JOIN {{ ref('silver__contracts') }} C USING (contract_address) - WHERE - t.project_name IS NULL - AND C.token_name IS NOT NULL -), -blocks_fill AS ( - SELECT - * exclude ( - nft_transfers_id, - inserted_timestamp, - modified_timestamp, - _invocation_id - ) - FROM - {{ this }} - WHERE - block_number IN ( - SELECT - block_number - FROM - fill_transfers - ) - AND _log_id NOT IN ( - SELECT - _log_id - FROM - fill_transfers - ) -) -{% endif %}, -final_base AS ( +{% if is_incremental() and var( + 'HEAL_MODEL' +) %}, +heal_model AS ( SELECT block_number, block_timestamp, @@ -385,7 +334,7 @@ final_base AS ( event_index, intra_event_index, contract_address, - project_name, + C.token_name AS project_name, from_address, to_address, tokenId, @@ -393,51 +342,73 @@ final_base AS ( event_type, token_transfer_type, _log_id, - _inserted_timestamp + t._inserted_timestamp FROM - transfer_base + {{ this }} + t + LEFT JOIN {{ ref('silver__contracts') }} C USING (contract_address) + WHERE + t.block_number IN ( + SELECT + DISTINCT t1.block_number AS block_number + FROM + {{ this }} + t1 + WHERE + t1.project_name IS NULL + AND _inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '36 hours' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_name IS NOT NULL + AND C.contract_address = t1.contract_address) + ) + ) + {% endif %} + SELECT + block_number, + block_timestamp, + tx_hash, + event_index, + intra_event_index, + contract_address, + A.project_name, + from_address, + to_address, + tokenId, + erc1155_value, + event_type, + token_transfer_type, + _log_id, + A._inserted_timestamp, + {{ dbt_utils.generate_surrogate_key( + ['tx_hash','event_index','intra_event_index'] + ) }} AS nft_transfers_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id + FROM + transfer_base A qualify ROW_NUMBER() over ( + PARTITION BY _log_id + ORDER BY + A._inserted_timestamp DESC + ) = 1 -{% if is_incremental() %} +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} UNION ALL -SELECT - block_number, - block_timestamp, - tx_hash, - event_index, - intra_event_index, - contract_address, - project_name, - from_address, - to_address, - tokenId, - erc1155_value, - event_type, - token_transfer_type, - _log_id, - _inserted_timestamp -FROM - fill_transfers -UNION ALL -SELECT - block_number, - block_timestamp, - tx_hash, - event_index, - intra_event_index, - contract_address, - project_name, - from_address, - to_address, - tokenId, - erc1155_value, - event_type, - token_transfer_type, - _log_id, - _inserted_timestamp -FROM - blocks_fill -{% endif %} -) SELECT block_number, block_timestamp, @@ -461,8 +432,5 @@ SELECT SYSDATE() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id FROM - final_base qualify ROW_NUMBER() over ( - PARTITION BY _log_id - ORDER BY - _inserted_timestamp DESC - ) = 1 + heal_model +{% endif %}