diff --git a/models/MIGRATION_CHANGELOG.md b/models/MIGRATION_CHANGELOG.md index c99632e..1ebd7b6 100644 --- a/models/MIGRATION_CHANGELOG.md +++ b/models/MIGRATION_CHANGELOG.md @@ -941,4 +941,78 @@ Query Changes: - Maintains same lockup contract tracking and vesting calculations - Preserves all existing downstream dependencies +### silver__nft_paras_sales +**Date**: 2024-03 +**Changes**: +- Architecture: + - Migrated from `silver__actions_events_function_call_s3` to `core__ez_actions` + - Improved log handling by using `receipt_status_value` directly from `core__ez_actions` + - Updated unique key to use `receipt_id` and `action_index` + - Added dynamic range predicate for incremental updates + - Added `modified_timestamp::DATE` to clustering keys + +- Query Changes: + - Optimized data sourcing: + - Main CTE filters directly from `core__ez_actions` for both marketplace actions and NFT transfer payouts + - Added filtering for both receiver_id and predecessor_id to capture all Paras marketplace interactions + - Improved royalty handling by using `receipt_status_value` directly from actions + - Maintained complex COALESCE logic for handling various sale types (regular sales and offers) + +- Column Changes: + - Replaced `action_id` with `receipt_id` and `action_index` + - Added direct access to `receipt_status_value` for royalty extraction + - Maintained all existing business logic for: + - Price calculations using various args patterns + - Royalty extraction from SuccessValue + - Platform fee calculations (2% of sale price) + +- Configuration: + - Updated incremental predicates to use dynamic range on `block_timestamp::date` + - Modified clustering strategy to include both timestamp fields + - Preserved existing tag configuration + +### silver__nft_other_sales +**Date**: 2024-03 +**Changes**: +- Architecture: + - Migrated from `silver__actions_events_function_call_s3` to `core__ez_actions` for all action-based sales + - Migrated all log handling to `silver__logs_s3` 
with direct joins and event parsing + - Maintained separate CTEs for Mitte marketplace, using logs for event extraction + - Updated unique key to use `nft_other_sales_id` generated from `receipt_id` and `action_index` + - Added dynamic range predicate for incremental updates + - Added `modified_timestamp::DATE` to clustering keys + +- Query Changes: + - Split data sourcing into three CTEs: + - `actions`: Filters directly from `core__ez_actions` for all supported marketplaces + - `logs`: Processes event logs from `silver__logs_s3` with proper join conditions + - `mitte_nft_sales`: Handles Mitte-specific event parsing with optimized log filtering + - Preserved all COALESCE and CASE logic for extracting: + - Seller address from various args patterns + - Buyer address from direct and offer-based purchases + - NFT contract and token identification + - Price calculations with proper decimal handling + - Improved marketplace-specific logic: + - Apollo42, TradePort, UniqArt, L2E, FewAndFar: Direct args parsing + - Mitte: Enhanced log parsing with proper order array handling + - Added join condition between actions and logs on tx_hash (logs are restricted to transactions present in `actions`) + +- Column Changes: + - Replaced `action_id` with `receipt_id` and `action_index` + - Updated source fields to use receipt-level identifiers: + - `receipt_receiver_id` for platform_address + - `receipt_signer_id` for transaction attribution + - Maintained all business logic for: + - Price calculations (division by 1e24) + - Platform name mapping + - NFT contract and token ID extraction + - Added proper type casting for all extracted fields + +- Configuration: + - Updated incremental predicates to use dynamic range on `block_timestamp::date` + - Modified clustering strategy to include both timestamp fields + - Added search optimization on key fields + - Preserved existing tag configuration + - Updated merge strategy for better incremental processing + --- \ No newline at end of file diff --git
a/models/silver/curated/nft/sales/silver__nft_mintbase_sales.sql b/models/silver/curated/nft/sales/silver__nft_mintbase_sales.sql index 8546fb5..ae39ef8 100644 --- a/models/silver/curated/nft/sales/silver__nft_mintbase_sales.sql +++ b/models/silver/curated/nft/sales/silver__nft_mintbase_sales.sql @@ -1,34 +1,36 @@ {{ config( materialized = 'incremental', merge_exclude_columns = ["inserted_timestamp"], - cluster_by = ['block_timestamp::DATE'], + cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'], unique_key = 'nft_mintbase_sales_id', incremental_strategy = 'merge', + incremental_predicates = ["dynamic_range_predicate_custom", "block_timestamp::date"], tags = ['curated','scheduled_non_core'] ) }} -WITH actions_events AS ( - +WITH actions AS ( /* successful buy / resolve_nft_payout FunctionCall actions received by the two Mintbase market contracts */ SELECT block_id, block_timestamp, tx_hash, - action_id, - signer_id, - receiver_id, - method_name, - deposit, - args, - logs, - attached_gas, - _partition_by_block_number, - _inserted_timestamp + receipt_id, + action_index, + receipt_signer_id AS signer_id, + receipt_receiver_id AS receiver_id, + receipt_predecessor_id AS predecessor_id, + action_data :method_name :: STRING AS method_name, + action_data :args :: VARIANT AS args, + action_data :deposit :: INT AS deposit, + action_data :gas :: NUMBER AS attached_gas, + _partition_by_block_number FROM - {{ ref('silver__actions_events_function_call_s3') }} + {{ ref('core__ez_actions') }} WHERE - receipt_succeeded = TRUE - AND logs [0] IS NOT NULL - AND receiver_id in ('simple.market.mintbase1.near' , 'market.mintbase1.near') + receipt_succeeded + AND action_name = 'FunctionCall' + AND receipt_receiver_id in ('simple.market.mintbase1.near' , 'market.mintbase1.near') + AND method_name in ('buy', 'resolve_nft_payout') + {% if var("MANUAL_FIX") %} AND {{ partition_load_manual('no_buffer') }} {% else %} @@ -42,29 +44,49 @@ WITH actions_events AS ( {% endif %} {% endif %} ), -raw_logs AS ( +logs AS ( /* mb_market nft_sale / nft_sold events from silver__logs_s3, limited to transactions present in actions */ SELECT - *, - l.index AS logs_index, - TRY_PARSE_JSON(REPLACE(l.value ::
STRING, 'EVENT_JSON:', '')) AS event_json, - event_json:event as event_type, - event_json:standard as standard + l.tx_hash, + l.receipt_id, + l.block_id, + l.log_index, + TRY_PARSE_JSON(clean_log) AS event_json, + event_json :event :: STRING AS event_type, + event_json :standard :: STRING AS standard, + TRY_PARSE_JSON(event_json:data) as mb_logs FROM - actions_events A, - LATERAL FLATTEN( - input => A.logs - ) l + {{ ref('silver__logs_s3') }} l + WHERE + /* semi-join instead of INNER JOIN: no column of actions is selected here, and a join on tx_hash would emit each log once per matching action, duplicating the surrogate key downstream */ + l.tx_hash IN (SELECT tx_hash FROM actions) + AND is_standard + AND event_json :standard :: STRING = 'mb_market' + AND (event_type = 'nft_sale' OR event_type = 'nft_sold') + + {% if var("MANUAL_FIX") %} + AND {{ partition_load_manual('no_buffer') }} + {% else %} + {% if is_incremental() %} + AND modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) + {% endif %} + {% endif %} ), mintbase_nft_sales AS ( SELECT - action_id, + receipt_id, + action_index, block_id, block_timestamp, tx_hash, attached_gas, - IFF(method_name = 'buy', TRUE, FALSE) AS is_buy, --- else resolve_nft_payout - IFF(is_buy, args :nft_contract_id, args :token :owner_id) :: STRING AS seller_address, - IFF( is_buy, signer_id, args :token :current_offer :from) :: STRING AS buyer_address, + method_name = 'buy' AS is_buy, --- else resolve_nft_payout + IFF(is_buy, args :nft_contract_id, args :token :owner_id) :: STRING AS seller_address, + IFF(is_buy, signer_id, args :token :current_offer :from) :: STRING AS buyer_address, receiver_id AS platform_address, 'Mintbase' AS platform_name, IFF(is_buy, args :nft_contract_id, args :token :store_id) :: STRING AS nft_address, @@ -72,59 +94,52 @@ mintbase_nft_sales AS ( IFF(is_buy, deposit, args :token :current_offer :price) / 1e24 AS price, IFF(is_buy, 'nft_sale', 'nft_sold') AS method_name, args AS LOG, - logs_index, - _inserted_timestamp, _partition_by_block_number FROM - raw_logs - WHERE - ( - method_name = 'buy' - ) - OR ( - method_name = 'resolve_nft_payout' - ) -),
-mintbase_royalties AS ( - SELECT - tx_hash, - method_name, - PARSE_JSON(event_json:data) as mb_logs - FROM - raw_logs - WHERE - standard = 'mb_market' and (event_type = 'nft_sale' OR event_type = 'nft_sold') + actions ), FINAL AS ( SELECT - m.*, - mb_logs, - mb_logs:affiliate_id:: STRING AS affiliate_id, - mb_logs:affiliate_amount / 1e24 :: STRING AS affiliate_amount, - mb_logs:payout :: object AS royalties, - COALESCE(mb_logs:mintbase_amount / 1e24, + m.receipt_id, + m.action_index, + m.block_id, + m.block_timestamp, + m.tx_hash, + m.attached_gas, + m.method_name, + m.is_buy, + m.seller_address, + m.buyer_address, + m.platform_address, + m.platform_name, + m.nft_address, + m.token_id, + m.price, + m.LOG, + m._partition_by_block_number, + mb_logs :affiliate_id :: STRING AS affiliate_id, + mb_logs :affiliate_amount / 1e24 :: STRING AS affiliate_amount, /* NOTE(review): '::' binds tighter than '/', so the STRING cast applies to 1e24 only, not to the quotient - confirm the intended column type before changing */ + mb_logs :payout :: object AS royalties, + COALESCE(mb_logs :mintbase_amount / 1e24, IFF(affiliate_amount IS NULL AND royalties IS NULL, price * 0.05, price * 0.025) - ) :: FLOAT AS platform_fee + ) :: FLOAT AS platform_fee, + r.log_index /* part of the surrogate key: one output row per (action, matching log) */ FROM mintbase_nft_sales m LEFT JOIN - mintbase_royalties r + logs r ON m.tx_hash = r.tx_hash ) SELECT *, {{ dbt_utils.generate_surrogate_key( - ['action_id', 'logs_index'] + ['tx_hash', 'receipt_id', 'action_index', 'log_index'] ) }} AS nft_mintbase_sales_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id FROM FINAL - - - --- TODO clean this up and create test cases \ No newline at end of file diff --git a/models/silver/curated/nft/sales/silver__nft_other_sales.sql b/models/silver/curated/nft/sales/silver__nft_other_sales.sql index 2adcf35..70999a1 100644 --- a/models/silver/curated/nft/sales/silver__nft_other_sales.sql +++ b/models/silver/curated/nft/sales/silver__nft_other_sales.sql @@ -1,34 +1,34 @@ {{ config( materialized = 'incremental', merge_exclude_columns = ["inserted_timestamp"], - cluster_by =
['block_timestamp::DATE'], + cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'], unique_key = 'nft_other_sales_id', incremental_strategy = 'merge', + incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"], tags = ['curated','scheduled_non_core'] ) }} -WITH actions_events AS ( - +WITH actions AS ( /* successful FunctionCall actions received by the supported marketplace contracts */ SELECT block_id, block_timestamp, tx_hash, - action_id, - signer_id, - receiver_id, - method_name, - deposit, - args, - logs, - attached_gas, + receipt_id, + action_index, + receipt_signer_id AS signer_id, + receipt_receiver_id AS receiver_id, + action_data :method_name :: STRING AS method_name, + action_data :args :: VARIANT AS args, + action_data :deposit :: INT AS deposit, + action_data :gas :: NUMBER AS attached_gas, _partition_by_block_number, _inserted_timestamp FROM - {{ ref('silver__actions_events_function_call_s3') }} + {{ ref('core__ez_actions') }} WHERE - receipt_succeeded = TRUE - AND logs [0] IS NOT NULL - AND receiver_id IN ( + receipt_succeeded + AND action_name = 'FunctionCall' + AND receipt_receiver_id IN ( 'apollo42.near', 'market.tradeport.near', 'market.nft.uniqart.near', @@ -49,25 +49,45 @@ WITH actions_events AS ( {% endif %} {% endif %} ), -raw_logs AS ( +logs AS ( /* standard event logs from silver__logs_s3 for transactions present in actions; consumed by mitte_nft_sales */ SELECT - *, - l.index AS logs_index, - TRY_PARSE_JSON(REPLACE(l.value :: STRING, 'EVENT_JSON:', '')) AS event_json + l.tx_hash, + l.receipt_id, + l.block_id, + l.block_timestamp, + l.log_index, + l.receiver_id, + l.predecessor_id, + l._partition_by_block_number, /* selected by mitte_nft_sales; must be projected here or that CTE fails with an invalid identifier */ + TRY_PARSE_JSON(clean_log) AS event_json, + event_json :event :: STRING AS event_type FROM - actions_events A, - LATERAL FLATTEN( - input => A.logs - ) l + {{ ref('silver__logs_s3') }} l + WHERE + l.tx_hash IN (SELECT tx_hash FROM actions) /* semi-join: an INNER JOIN on tx_hash repeats each log once per matching action and duplicates (receipt_id, log_index) keys */ + AND l.is_standard + {% if var("MANUAL_FIX") %} + AND {{ partition_load_manual('no_buffer') }} + {% else %} + {% if is_incremental() %} + AND l.block_timestamp >= ( + SELECT + MAX(block_timestamp) - INTERVAL '48 HOURS' + FROM + {{ this }} + ) + {% endif %} + {% endif %} ),
------------------------- OTHER MARKETPLACES ------------------------------- other_nft_sales AS ( SELECT - action_id, + receipt_id, + action_index, block_id, block_timestamp, tx_hash, attached_gas, + args AS LOG, COALESCE( args :market_data :owner_id, args :sale :owner_id, @@ -98,15 +118,12 @@ other_nft_sales AS ( COALESCE( args :price, args :offer_data :price, - args: market_data :price + args :market_data :price ) / 1e24 AS price, method_name, - args AS LOG, - logs_index, - _inserted_timestamp, _partition_by_block_number FROM - raw_logs + actions WHERE receiver_id IN ( 'apollo42.near', @@ -115,129 +132,119 @@ other_nft_sales AS ( 'market.l2e.near', 'market.fewandfar.near' ) - AND method_name IN ( - 'resolve_purchase', - 'resolve_offer' - ) + AND method_name IN ('resolve_purchase', 'resolve_offer') ), -------------------------------- MITTE ------------------------------- + mitte_nft_sales AS ( SELECT - action_id, + receipt_id, + log_index AS action_index, /* Mitte rows come from logs, so the log position stands in for action_index in the surrogate key */ block_id, block_timestamp, tx_hash, - attached_gas, + NULL AS attached_gas, + event_json AS LOG, CASE WHEN SPLIT( - event_json :data :order [2], + LOG :data :order [2], ':' - ) [1] = 'near' THEN event_json :data :order [6] - ELSE event_json :data :order [1] + ) [1] = 'near' THEN LOG :data :order [6] + ELSE LOG :data :order [1] END :: STRING AS seller_address, CASE WHEN SPLIT( - event_json :data :order [2], + LOG :data :order [2], ':' - ) [1] = 'near' THEN event_json :data :order [1] - ELSE event_json :data :order [6] + ) [1] = 'near' THEN LOG :data :order [1] + ELSE LOG :data :order [6] END :: STRING AS buyer_address, - receiver_id AS platform_address, + receiver_id AS platform_address, /* the WHERE below pins receiver_id to the Mitte contract; predecessor_id would report the caller instead of the marketplace */ 'Mitte' AS platform_name, CASE WHEN SPLIT( - event_json :data :order [2], + LOG :data :order [2], ':' ) [1] = 'near' THEN SPLIT( - event_json :data :order [7], + LOG :data :order [7], ':' ) [1] ELSE SPLIT( - event_json :data :order [2], + LOG :data :order [2], ':' ) [1] END :: STRING AS nft_address, CASE WHEN SPLIT( -
event_json :data :order [2], + LOG :data :order [2], ':' ) [1] = 'near' AND SPLIT( - event_json :data :order [7], + LOG :data :order [7], ':' ) [4] IS NULL THEN SPLIT( - event_json :data :order [7], + LOG :data :order [7], ':' ) [2] WHEN SPLIT( - event_json :data :order [2], + LOG :data :order [2], ':' ) [1] = 'near' AND SPLIT( - event_json :data :order [7], + LOG :data :order [7], ':' ) [4] IS NOT NULL THEN SPLIT( - event_json :data :order [7], + LOG :data :order [7], ':' ) [2] || ':' || SPLIT( - event_json :data :order [7], + LOG :data :order [7], ':' ) [3] WHEN SPLIT( - event_json :data :order [2], + LOG :data :order [2], ':' ) [4] IS NULL THEN SPLIT( - event_json :data :order [2], + LOG :data :order [2], ':' ) [2] ELSE SPLIT( - event_json :data :order [2], + LOG :data :order [2], ':' ) [2] || ':' || SPLIT( - event_json :data :order [2], + LOG :data :order [2], ':' ) [3] END :: STRING AS token_id, CASE WHEN SPLIT( - event_json :data :order [2], + LOG :data :order [2], ':' ) [1] = 'near' THEN SPLIT( - event_json :data :order [2], + LOG :data :order [2], ':' ) [3] ELSE SPLIT( - event_json :data :order [7], + LOG :data :order [7], ':' ) [3] END / 1e24 AS price, - event_json :event :: STRING AS method_name, - event_json AS LOG, - logs_index, - _inserted_timestamp, + LOG :event :: STRING AS method_name, _partition_by_block_number FROM - raw_logs + logs WHERE receiver_id = 'a.mitte-orderbook.near' - AND event_json :event :: STRING != 'nft_mint' - AND event_json :data :order [6] :: STRING != '' + AND LOG :event :: STRING != 'nft_mint' + AND LOG :data :order [6] :: STRING != '' ), -------------------------------- FINAL ------------------------------- sales_union AS ( - SELECT - * - FROM - other_nft_sales + SELECT * FROM other_nft_sales UNION ALL - SELECT - * - FROM - mitte_nft_sales + SELECT * FROM mitte_nft_sales ), FINAL AS ( SELECT - action_id, + receipt_id, + action_index, block_id, block_timestamp, tx_hash, @@ -251,23 +258,15 @@ FINAL AS ( method_name, LOG, price,
- NULL :: STRING AS affiliate_id, - NULL :: STRING AS affiliate_amount, - '{}' :: VARIANT AS royalties, - NULL :: FLOAT AS platform_fee, - logs_index, - _inserted_timestamp, _partition_by_block_number - FROM - sales_union s + FROM sales_union ) SELECT *, {{ dbt_utils.generate_surrogate_key( - ['action_id', 'logs_index'] + ['receipt_id', 'action_index'] ) }} AS nft_other_sales_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id -FROM - FINAL +FROM FINAL diff --git a/models/silver/curated/nft/sales/silver__nft_paras_sales.sql b/models/silver/curated/nft/sales/silver__nft_paras_sales.sql index a96ebe7..68af1d7 100644 --- a/models/silver/curated/nft/sales/silver__nft_paras_sales.sql +++ b/models/silver/curated/nft/sales/silver__nft_paras_sales.sql @@ -1,41 +1,39 @@ {{ config( materialized = 'incremental', merge_exclude_columns = ["inserted_timestamp"], - cluster_by = ['block_timestamp::DATE'], + cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'], unique_key = 'nft_paras_sales_id', incremental_strategy = 'merge', incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"], tags = ['curated','scheduled_non_core'] ) }} -{# Note - multisource model #} --- TODO ez_actions refactor - -WITH actions_events AS ( +WITH actions AS ( SELECT block_id, block_timestamp, tx_hash, - action_id, - signer_id, - receiver_id, - predecessor_id, - method_name, - deposit, - args, - logs, - attached_gas, - _partition_by_block_number, - _inserted_timestamp + receipt_id, + action_index, + receipt_signer_id AS signer_id, + receipt_receiver_id AS receiver_id, + receipt_predecessor_id AS predecessor_id, + action_data :method_name :: STRING AS method_name, + action_data :args :: VARIANT AS args, + action_data :deposit :: INT AS deposit, + action_data :gas :: NUMBER AS attached_gas, + receipt_status_value, + _partition_by_block_number FROM - {{ ref('silver__actions_events_function_call_s3') }} + {{
ref('core__ez_actions') }} WHERE - receipt_succeeded = TRUE - AND logs [0] IS NOT NULL - AND receiver_id = 'marketplace.paras.near' + receipt_succeeded + AND action_name = 'FunctionCall' + AND (receipt_receiver_id = 'marketplace.paras.near' OR receipt_predecessor_id = 'marketplace.paras.near') /* predecessor match brings in the nft_transfer_payout calls the marketplace sends to NFT contracts */ AND method_name IN ( 'resolve_purchase', - 'resolve_offer' + 'resolve_offer', + 'nft_transfer_payout' ) {% if var("MANUAL_FIX") %} AND {{ partition_load_manual('no_buffer') }} @@ -50,48 +48,10 @@ WITH actions_events AS ( {% endif %} {% endif %} ), --- TODO: Delete this dependencie including SuccessValue in the new version of actions events. -status_value AS ( +paras_nft_sales AS ( /* one row per resolve_purchase / resolve_offer action; payout actions are joined back in FINAL for royalties */ SELECT - tx_hash, - outcome_json :outcome :status AS status_value, - TRY_PARSE_JSON(REPLACE(outcome_json :outcome :logs[0] :: STRING, 'EVENT_JSON:', '')) AS event, - PARSE_JSON(BASE64_DECODE_STRING(outcome_json :outcome :status :SuccessValue)) as SuccessValue, - _partition_by_block_number, - inserted_timestamp AS _inserted_timestamp - FROM - {{ ref('silver__receipts_final') }} - WHERE - predecessor_id = 'marketplace.paras.near' - AND - event:event = 'nft_transfer' - - {% if var("MANUAL_FIX") %} - AND {{ partition_load_manual('no_buffer') }} - {% else %} - {% if is_incremental() %} - AND modified_timestamp >= ( - SELECT - MAX(modified_timestamp) - FROM - {{ this }} - ) - {% endif %} - {% endif %} -), -raw_logs AS ( - SELECT - *, - l.index AS logs_index - FROM - actions_events A, - LATERAL FLATTEN( - input => A.logs - ) l -), -paras_nft AS ( - SELECT - action_id, + receipt_id, + action_index, block_id, block_timestamp, tx_hash, @@ -120,38 +80,37 @@ paras_nft AS ( COALESCE( args :price, args :offer_data :price, - args: market_data :price + args :market_data :price ) / 1e24 AS price, method_name, args AS LOG, - logs_index, - _inserted_timestamp, _partition_by_block_number FROM - raw_logs + actions + WHERE + method_name in ('resolve_purchase', 'resolve_offer') ), -------------------------------- FINAL
------------------------------- FINAL AS ( SELECT m.*, NULL :: STRING AS affiliate_id, - NULL :: STRING AS affiliate_amount, - COALESCE(SuccessValue:payout, SuccessValue) :: VARIANT AS royalties, + NULL :: STRING AS affiliate_amount, + /* decoded payload first (payout key, then the whole payload as before the migration); the raw path is kept last in case the upstream value is already an object */ COALESCE(l.success_value :payout, l.success_value, l.receipt_status_value :SuccessValue :payout) :: VARIANT AS royalties, price * 0.02 :: FLOAT AS platform_fee FROM - paras_nft m + paras_nft_sales m LEFT JOIN - status_value r + (select tx_hash, receipt_status_value, /* the removed status_value CTE base64-decoded and parsed SuccessValue before reading it; TRY_ variants return NULL instead of failing on non-base64 input */ TRY_PARSE_JSON(TRY_BASE64_DECODE_STRING(receipt_status_value :SuccessValue :: STRING)) AS success_value from actions where method_name = 'nft_transfer_payout') l ON - m.tx_hash = r.tx_hash + m.tx_hash = l.tx_hash ) SELECT *, {{ dbt_utils.generate_surrogate_key( - ['action_id', 'logs_index'] + ['receipt_id', 'action_index'] ) }} AS nft_paras_sales_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id FROM - FINAL \ No newline at end of file + FINAL