nft sales refactor

This commit is contained in:
Jack Forgash 2025-04-28 09:27:57 -06:00
parent c19a4a6763
commit bc94aa57dc
4 changed files with 274 additions and 227 deletions

View File

@ -941,4 +941,78 @@ Query Changes:
- Maintains same lockup contract tracking and vesting calculations
- Preserves all existing downstream dependencies
### silver__nft_paras_sales
**Date**: 2024-03
**Changes**:
- Architecture:
- Migrated from `silver__actions_events_function_call_s3` to `core__ez_actions`
- Improved log handling by using `receipt_status_value` directly from `core__ez_actions`
- Updated unique key to use `receipt_id` and `action_index`
- Added dynamic range predicate for incremental updates
- Added `modified_timestamp::DATE` to clustering keys
- Query Changes:
- Optimized data sourcing:
- Main CTE filters directly from `core__ez_actions` for both marketplace actions and NFT transfer payouts
- Added filtering for both receiver_id and predecessor_id to capture all Paras marketplace interactions
- Improved royalty handling by using `receipt_status_value` directly from actions
- Maintained complex COALESCE logic for handling various sale types (regular sales and offers)
- Column Changes:
- Replaced `action_id` with `receipt_id` and `action_index`
- Added direct access to `receipt_status_value` for royalty extraction
- Maintained all existing business logic for:
- Price calculations using various args patterns
- Royalty extraction from SuccessValue
- Platform fee calculations (2% of sale price)
- Configuration:
- Updated incremental predicates to use dynamic range on `block_timestamp::date`
- Modified clustering strategy to include both timestamp fields
- Preserved existing tag configuration
### silver__nft_other_sales
**Date**: 2024-03
**Changes**:
- Architecture:
- Migrated from `silver__actions_events_function_call_s3` to `core__ez_actions` for all action-based sales
- Migrated all log handling to `silver__logs_s3` with direct joins and event parsing
- Maintained separate CTEs for Mitte marketplace, using logs for event extraction
- Updated unique key to use `nft_other_sales_id` generated from `receipt_id` and `action_index`
- Added dynamic range predicate for incremental updates
- Added `modified_timestamp::DATE` to clustering keys
- Query Changes:
- Split data sourcing into three CTEs:
- `actions`: Filters directly from `core__ez_actions` for all supported marketplaces
- `logs`: Processes event logs from `silver__logs_s3` with proper join conditions
- `mitte_logs`: Handles Mitte-specific event parsing with optimized log filtering
- Preserved all COALESCE and CASE logic for extracting:
- Seller address from various args patterns
- Buyer address from direct and offer-based purchases
- NFT contract and token identification
- Price calculations with proper decimal handling
- Improved marketplace-specific logic:
- Apollo42, TradePort, UniqArt, L2E, FewAndFar: Direct args parsing
- Mitte: Enhanced log parsing with proper order array handling
- Added proper join conditions between actions and logs using tx_hash and receipt_id
- Column Changes:
- Replaced `action_id` with `receipt_id` and `action_index`
- Updated source fields to use receipt-level identifiers:
- `receipt_receiver_id` for platform_address
- `receipt_signer_id` for transaction attribution
- Maintained all business logic for:
- Price calculations (division by 1e24)
- Platform name mapping
- NFT contract and token ID extraction
- Added proper type casting for all extracted fields
- Configuration:
- Updated incremental predicates to use dynamic range on `block_timestamp::date`
- Modified clustering strategy to include both timestamp fields
- Added search optimization on key fields
- Preserved existing tag configuration
- Updated merge strategy for better incremental processing
---

View File

@ -1,34 +1,36 @@
{{ config(
materialized = 'incremental',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE'],
cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'],
unique_key = 'nft_mintbase_sales_id',
incremental_strategy = 'merge',
incremental_predicates = ["dynamic_range_predicate_custom", "block_timestamp::date"],
tags = ['curated','scheduled_non_core']
) }}
WITH actions_events AS (
WITH actions AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
signer_id,
receiver_id,
method_name,
deposit,
args,
logs,
attached_gas,
_partition_by_block_number,
_inserted_timestamp
receipt_id,
action_index,
receipt_signer_id AS signer_id,
receipt_receiver_id AS receiver_id,
receipt_predecessor_id AS predecessor_id,
action_data :method_name :: STRING AS method_name,
action_data :args :: VARIANT AS args,
action_data :deposit :: INT AS deposit,
action_data :gas :: NUMBER AS attached_gas,
_partition_by_block_number
FROM
{{ ref('silver__actions_events_function_call_s3') }}
{{ ref('core__ez_actions') }}
WHERE
receipt_succeeded = TRUE
AND logs [0] IS NOT NULL
AND receiver_id in ('simple.market.mintbase1.near' , 'market.mintbase1.near')
receipt_succeeded
AND action_name = 'FunctionCall'
AND receipt_receiver_id in ('simple.market.mintbase1.near' , 'market.mintbase1.near')
AND method_name in ('buy', 'resolve_nft_payout')
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
@ -42,29 +44,49 @@ WITH actions_events AS (
{% endif %}
{% endif %}
),
raw_logs AS (
logs AS (
SELECT
*,
l.index AS logs_index,
TRY_PARSE_JSON(REPLACE(l.value :: STRING, 'EVENT_JSON:', '')) AS event_json,
event_json:event as event_type,
event_json:standard as standard
l.tx_hash,
l.receipt_id,
l.block_id,
l.log_index,
TRY_PARSE_JSON(clean_log) AS event_json,
event_json :event :: STRING AS event_type,
event_json :standard :: STRING AS standard,
TRY_PARSE_JSON(event_json:data) as mb_logs
FROM
actions_events A,
LATERAL FLATTEN(
input => A.logs
) l
{{ ref('silver__logs_s3') }} l
INNER JOIN actions a
ON a.tx_hash = l.tx_hash
WHERE
is_standard
AND event_json :standard :: STRING = 'mb_market'
AND (event_type = 'nft_sale' OR event_type = 'nft_sold')
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
),
mintbase_nft_sales AS (
SELECT
action_id,
receipt_id,
action_index,
block_id,
block_timestamp,
tx_hash,
attached_gas,
IFF(method_name = 'buy', TRUE, FALSE) AS is_buy, --- else resolve_nft_payout
IFF(is_buy, args :nft_contract_id, args :token :owner_id) :: STRING AS seller_address,
IFF( is_buy, signer_id, args :token :current_offer :from) :: STRING AS buyer_address,
method_name = 'buy' AS is_buy, --- else resolve_nft_payout
IFF(is_buy, args :nft_contract_id, args :token :owner_id) :: STRING AS seller_address,
IFF(is_buy, signer_id, args :token :current_offer :from) :: STRING AS buyer_address,
receiver_id AS platform_address,
'Mintbase' AS platform_name,
IFF(is_buy, args :nft_contract_id, args :token :store_id) :: STRING AS nft_address,
@ -72,59 +94,52 @@ mintbase_nft_sales AS (
IFF(is_buy, deposit, args :token :current_offer :price) / 1e24 AS price,
IFF(is_buy, 'nft_sale', 'nft_sold') AS method_name,
args AS LOG,
logs_index,
_inserted_timestamp,
_partition_by_block_number
FROM
raw_logs
WHERE
(
method_name = 'buy'
)
OR (
method_name = 'resolve_nft_payout'
)
),
mintbase_royalties AS (
SELECT
tx_hash,
method_name,
PARSE_JSON(event_json:data) as mb_logs
FROM
raw_logs
WHERE
standard = 'mb_market' and (event_type = 'nft_sale' OR event_type = 'nft_sold')
actions
),
FINAL AS (
SELECT
m.*,
mb_logs,
mb_logs:affiliate_id:: STRING AS affiliate_id,
mb_logs:affiliate_amount / 1e24 :: STRING AS affiliate_amount,
mb_logs:payout :: object AS royalties,
COALESCE(mb_logs:mintbase_amount / 1e24,
m.receipt_id,
m.action_index,
m.block_id,
m.block_timestamp,
m.tx_hash,
m.attached_gas,
m.method_name,
m.is_buy,
m.seller_address,
m.buyer_address,
m.platform_address,
m.platform_name,
m.nft_address,
m.token_id,
m.price,
m.LOG,
m._partition_by_block_number,
mb_logs :affiliate_id :: STRING AS affiliate_id,
mb_logs :affiliate_amount / 1e24 :: STRING AS affiliate_amount,
mb_logs :payout :: object AS royalties,
COALESCE(mb_logs :mintbase_amount / 1e24,
IFF(affiliate_amount IS NULL AND royalties IS NULL,
price * 0.05,
price * 0.025)
) :: FLOAT AS platform_fee
) :: FLOAT AS platform_fee,
r.log_index
FROM
mintbase_nft_sales m
LEFT JOIN
mintbase_royalties r
logs r
ON
m.tx_hash = r.tx_hash
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['action_id', 'logs_index']
['tx_hash', 'receipt_id', 'action_index', 'log_index']
) }} AS nft_mintbase_sales_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL
-- TODO clean this up and create test cases

View File

@ -1,34 +1,34 @@
{{ config(
materialized = 'incremental',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE'],
cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'],
unique_key = 'nft_other_sales_id',
incremental_strategy = 'merge',
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
tags = ['curated','scheduled_non_core']
) }}
WITH actions_events AS (
WITH actions AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
signer_id,
receiver_id,
method_name,
deposit,
args,
logs,
attached_gas,
receipt_id,
action_index,
receipt_signer_id AS signer_id,
receipt_receiver_id AS receiver_id,
action_data :method_name :: STRING AS method_name,
action_data :args :: VARIANT AS args,
action_data :deposit :: INT AS deposit,
action_data :gas :: NUMBER AS attached_gas,
_partition_by_block_number,
_inserted_timestamp
FROM
{{ ref('silver__actions_events_function_call_s3') }}
{{ ref('core__ez_actions') }}
WHERE
receipt_succeeded = TRUE
AND logs [0] IS NOT NULL
AND receiver_id IN (
receipt_succeeded
AND action_name = 'FunctionCall'
AND receipt_receiver_id IN (
'apollo42.near',
'market.tradeport.near',
'market.nft.uniqart.near',
@ -49,25 +49,45 @@ WITH actions_events AS (
{% endif %}
{% endif %}
),
raw_logs AS (
logs AS (
SELECT
*,
l.index AS logs_index,
TRY_PARSE_JSON(REPLACE(l.value :: STRING, 'EVENT_JSON:', '')) AS event_json
l.tx_hash,
l.receipt_id,
l.block_id,
l.block_timestamp,
l.log_index,
l.receiver_id,
l.predecessor_id,
TRY_PARSE_JSON(clean_log) AS event_json,
event_json :event :: STRING AS event_type
FROM
actions_events A,
LATERAL FLATTEN(
input => A.logs
) l
{{ ref('silver__logs_s3') }} l
INNER JOIN actions a
ON a.tx_hash = l.tx_hash
WHERE
l.is_standard
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND l.block_timestamp >= (
SELECT
MAX(block_timestamp) - INTERVAL '48 HOURS'
FROM
{{ this }}
)
{% endif %}
{% endif %}
),
------------------------ OTHER MARKETPLACES -------------------------------
other_nft_sales AS (
SELECT
action_id,
receipt_id,
action_index,
block_id,
block_timestamp,
tx_hash,
attached_gas,
args AS LOG,
COALESCE(
args :market_data :owner_id,
args :sale :owner_id,
@ -98,15 +118,12 @@ other_nft_sales AS (
COALESCE(
args :price,
args :offer_data :price,
args: market_data :price
args :market_data :price
) / 1e24 AS price,
method_name,
args AS LOG,
logs_index,
_inserted_timestamp,
_partition_by_block_number
FROM
raw_logs
actions
WHERE
receiver_id IN (
'apollo42.near',
@ -115,129 +132,119 @@ other_nft_sales AS (
'market.l2e.near',
'market.fewandfar.near'
)
AND method_name IN (
'resolve_purchase',
'resolve_offer'
)
AND method_name IN ('resolve_purchase', 'resolve_offer')
),
------------------------------- MITTE -------------------------------
mitte_nft_sales AS (
SELECT
action_id,
receipt_id,
log_index AS action_index,
block_id,
block_timestamp,
tx_hash,
attached_gas,
NULL AS attached_gas,
event_json AS LOG,
CASE
WHEN SPLIT(
event_json :data :order [2],
LOG :data :order [2],
':'
) [1] = 'near' THEN event_json :data :order [6]
ELSE event_json :data :order [1]
) [1] = 'near' THEN LOG :data :order [6]
ELSE LOG :data :order [1]
END :: STRING AS seller_address,
CASE
WHEN SPLIT(
event_json :data :order [2],
LOG :data :order [2],
':'
) [1] = 'near' THEN event_json :data :order [1]
ELSE event_json :data :order [6]
) [1] = 'near' THEN LOG :data :order [1]
ELSE LOG :data :order [6]
END :: STRING AS buyer_address,
receiver_id AS platform_address,
predecessor_id AS platform_address,
'Mitte' AS platform_name,
CASE
WHEN SPLIT(
event_json :data :order [2],
LOG :data :order [2],
':'
) [1] = 'near' THEN SPLIT(
event_json :data :order [7],
LOG :data :order [7],
':'
) [1]
ELSE SPLIT(
event_json :data :order [2],
LOG :data :order [2],
':'
) [1]
END :: STRING AS nft_address,
CASE
WHEN SPLIT(
event_json :data :order [2],
LOG :data :order [2],
':'
) [1] = 'near'
AND SPLIT(
event_json :data :order [7],
LOG :data :order [7],
':'
) [4] IS NULL THEN SPLIT(
event_json :data :order [7],
LOG :data :order [7],
':'
) [2]
WHEN SPLIT(
event_json :data :order [2],
LOG :data :order [2],
':'
) [1] = 'near'
AND SPLIT(
event_json :data :order [7],
LOG :data :order [7],
':'
) [4] IS NOT NULL THEN SPLIT(
event_json :data :order [7],
LOG :data :order [7],
':'
) [2] || ':' || SPLIT(
event_json :data :order [7],
LOG :data :order [7],
':'
) [3]
WHEN SPLIT(
event_json :data :order [2],
LOG :data :order [2],
':'
) [4] IS NULL THEN SPLIT(
event_json :data :order [2],
LOG :data :order [2],
':'
) [2]
ELSE SPLIT(
event_json :data :order [2],
LOG :data :order [2],
':'
) [2] || ':' || SPLIT(
event_json :data :order [2],
LOG :data :order [2],
':'
) [3]
END :: STRING AS token_id,
CASE
WHEN SPLIT(
event_json :data :order [2],
LOG :data :order [2],
':'
) [1] = 'near' THEN SPLIT(
event_json :data :order [2],
LOG :data :order [2],
':'
) [3]
ELSE SPLIT(
event_json :data :order [7],
LOG :data :order [7],
':'
) [3]
END / 1e24 AS price,
event_json :event :: STRING AS method_name,
event_json AS LOG,
logs_index,
_inserted_timestamp,
LOG :event :: STRING AS method_name,
_partition_by_block_number
FROM
raw_logs
logs
WHERE
receiver_id = 'a.mitte-orderbook.near'
AND event_json :event :: STRING != 'nft_mint'
AND event_json :data :order [6] :: STRING != ''
AND LOG :event :: STRING != 'nft_mint'
AND LOG :data :order [6] :: STRING != ''
),
------------------------------- FINAL -------------------------------
sales_union AS (
SELECT
*
FROM
other_nft_sales
SELECT * FROM other_nft_sales
UNION ALL
SELECT
*
FROM
mitte_nft_sales
SELECT * FROM mitte_nft_sales
),
FINAL AS (
SELECT
action_id,
receipt_id,
action_index,
block_id,
block_timestamp,
tx_hash,
@ -251,23 +258,15 @@ FINAL AS (
method_name,
LOG,
price,
NULL :: STRING AS affiliate_id,
NULL :: STRING AS affiliate_amount,
'{}' :: VARIANT AS royalties,
NULL :: FLOAT AS platform_fee,
logs_index,
_inserted_timestamp,
_partition_by_block_number
FROM
sales_union s
FROM sales_union
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['action_id', 'logs_index']
['receipt_id', 'action_index']
) }} AS nft_other_sales_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL
FROM FINAL

View File

@ -1,41 +1,39 @@
{{ config(
materialized = 'incremental',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE'],
cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'],
unique_key = 'nft_paras_sales_id',
incremental_strategy = 'merge',
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
tags = ['curated','scheduled_non_core']
) }}
{# Note - multisource model #}
-- TODO ez_actions refactor
WITH actions_events AS (
WITH actions AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
signer_id,
receiver_id,
predecessor_id,
method_name,
deposit,
args,
logs,
attached_gas,
_partition_by_block_number,
_inserted_timestamp
receipt_id,
action_index,
receipt_signer_id AS signer_id,
receipt_receiver_id AS receiver_id,
receipt_predecessor_id AS predecessor_id,
action_data :method_name :: STRING AS method_name,
action_data :args :: VARIANT AS args,
action_data :deposit :: INT AS deposit,
action_data :gas :: NUMBER AS attached_gas,
receipt_status_value,
_partition_by_block_number
FROM
{{ ref('silver__actions_events_function_call_s3') }}
{{ ref('core__ez_actions') }}
WHERE
receipt_succeeded = TRUE
AND logs [0] IS NOT NULL
AND receiver_id = 'marketplace.paras.near'
receipt_succeeded
AND action_name = 'FunctionCall'
AND (receipt_receiver_id = 'marketplace.paras.near' OR receipt_predecessor_id = 'marketplace.paras.near')
AND method_name IN (
'resolve_purchase',
'resolve_offer'
'resolve_offer',
'nft_transfer_payout'
)
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
@ -50,48 +48,10 @@ WITH actions_events AS (
{% endif %}
{% endif %}
),
-- TODO: Delete this dependencie including SuccessValue in the new version of actions events.
status_value AS (
paras_nft_sales AS (
SELECT
tx_hash,
outcome_json :outcome :status AS status_value,
TRY_PARSE_JSON(REPLACE(outcome_json :outcome :logs[0] :: STRING, 'EVENT_JSON:', '')) AS event,
PARSE_JSON(BASE64_DECODE_STRING(outcome_json :outcome :status :SuccessValue)) as SuccessValue,
_partition_by_block_number,
inserted_timestamp AS _inserted_timestamp
FROM
{{ ref('silver__receipts_final') }}
WHERE
predecessor_id = 'marketplace.paras.near'
AND
event:event = 'nft_transfer'
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
),
raw_logs AS (
SELECT
*,
l.index AS logs_index
FROM
actions_events A,
LATERAL FLATTEN(
input => A.logs
) l
),
paras_nft AS (
SELECT
action_id,
receipt_id,
action_index,
block_id,
block_timestamp,
tx_hash,
@ -120,38 +80,37 @@ paras_nft AS (
COALESCE(
args :price,
args :offer_data :price,
args: market_data :price
args :market_data :price
) / 1e24 AS price,
method_name,
args AS LOG,
logs_index,
_inserted_timestamp,
_partition_by_block_number
FROM
raw_logs
actions
WHERE
method_name in ('resolve_purchase', 'resolve_offer')
),
------------------------------- FINAL -------------------------------
FINAL AS (
SELECT
m.*,
NULL :: STRING AS affiliate_id,
NULL :: STRING AS affiliate_amount,
COALESCE(SuccessValue:payout, SuccessValue) :: VARIANT AS royalties,
NULL :: STRING AS affiliate_amount,
l.receipt_status_value :SuccessValue :payout :: VARIANT AS royalties,
price * 0.02 :: FLOAT AS platform_fee
FROM
paras_nft m
paras_nft_sales m
LEFT JOIN
status_value r
(select tx_hash, receipt_status_value from actions where method_name = 'nft_transfer_payout') l
ON
m.tx_hash = r.tx_hash
m.tx_hash = l.tx_hash
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['action_id', 'logs_index']
['receipt_id', 'action_index']
) }} AS nft_paras_sales_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL
FINAL