add aggregator logic (#260)

* add aggregator logic

* heal tag

* full refresh false tag

* more filters
This commit is contained in:
Sam 2024-03-08 09:34:10 +08:00 committed by GitHub
parent 3b13d77871
commit 1ace5e7edb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 893 additions and 641 deletions

View File

@ -0,0 +1,51 @@
{{ config(
materialized = 'incremental',
unique_key = 'aggregator_identifier',
merge_update_columns = ['aggregator_identifier', 'aggregator', 'aggregator_type'],
full_refresh = false
) }}
WITH calldata_aggregators AS (
SELECT
*
FROM
(
VALUES
('0', '0', 'calldata', '2020-01-01')
) t (aggregator_identifier, aggregator, aggregator_type, _inserted_timestamp)
),
platform_routers as (
SELECT
*
FROM
(
VALUES
('0xc9605a76b0370e148b4a510757685949f13248c7', 'element', 'router', '2024-03-07'),
('0xbbbbbbbe843515689f3182b748b5671665541e58', 'bluesweep', 'router', '2024-03-07')
) t (aggregator_identifier, aggregator, aggregator_type, _inserted_timestamp)
),
combined as (
SELECT *
FROM
calldata_aggregators
UNION ALL
SELECT *
FROM
platform_routers
)
SELECT
aggregator_identifier,
aggregator,
aggregator_type,
_inserted_timestamp
FROM combined
qualify row_number() over (partition by aggregator_identifier order by _inserted_timestamp desc ) = 1

View File

@ -0,0 +1,770 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = ['block_number','platform_exchange_version'],
cluster_by = ['block_timestamp::DATE'],
tags = ['curated','reorg','heal']
) }}
WITH nft_base_models AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_type,
platform_address,
platform_name,
platform_exchange_version,
seller_address,
buyer_address,
nft_address,
erc1155_value :: STRING AS erc1155_value,
tokenId,
currency_address,
total_price_raw,
total_fees_raw,
platform_fee_raw,
creator_fee_raw,
tx_fee,
origin_from_address,
origin_to_address,
origin_function_signature,
input_data,
nft_log_id,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__quix_sales') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_type,
platform_address,
platform_name,
platform_exchange_version,
seller_address,
buyer_address,
nft_address,
erc1155_value :: STRING AS erc1155_value,
tokenId,
currency_address,
total_price_raw,
total_fees_raw,
platform_fee_raw,
creator_fee_raw,
tx_fee,
origin_from_address,
origin_to_address,
origin_function_signature,
input_data,
nft_log_id,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__quix_seaport_sales') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_type,
platform_address,
platform_name,
platform_exchange_version,
seller_address,
buyer_address,
nft_address,
erc1155_value :: STRING AS erc1155_value,
tokenId,
currency_address,
total_price_raw,
total_fees_raw,
platform_fee_raw,
creator_fee_raw,
tx_fee,
origin_from_address,
origin_to_address,
origin_function_signature,
input_data,
nft_log_id,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__seaport_1_1_sales') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_type,
platform_address,
platform_name,
platform_exchange_version,
seller_address,
buyer_address,
nft_address,
erc1155_value :: STRING AS erc1155_value,
tokenId,
currency_address,
total_price_raw,
total_fees_raw,
platform_fee_raw,
creator_fee_raw,
tx_fee,
origin_from_address,
origin_to_address,
origin_function_signature,
input_data,
nft_log_id,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__seaport_1_4_sales') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_type,
platform_address,
platform_name,
platform_exchange_version,
seller_address,
buyer_address,
nft_address,
erc1155_value :: STRING AS erc1155_value,
tokenId,
currency_address,
total_price_raw,
total_fees_raw,
platform_fee_raw,
creator_fee_raw,
tx_fee,
origin_from_address,
origin_to_address,
origin_function_signature,
input_data,
nft_log_id,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__seaport_1_5_sales') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
{% endif %}
),
prices_raw AS (
SELECT
HOUR,
symbol,
token_address,
decimals,
price AS hourly_prices
FROM
{{ ref('price__ez_hourly_token_prices') }}
WHERE
token_address IN (
SELECT
DISTINCT currency_address
FROM
nft_base_models
)
AND HOUR :: DATE IN (
SELECT
DISTINCT block_timestamp :: DATE
FROM
nft_base_models
)
),
all_prices AS (
SELECT
HOUR,
symbol,
token_address,
decimals,
hourly_prices
FROM
prices_raw
UNION ALL
SELECT
HOUR,
'ETH' AS symbol,
'ETH' AS token_address,
decimals,
hourly_prices
FROM
prices_raw
WHERE
token_address = '0x4200000000000000000000000000000000000006'
),
eth_price AS (
SELECT
HOUR,
hourly_prices AS eth_price_hourly
FROM
prices_raw
WHERE
token_address = '0x4200000000000000000000000000000000000006'
),
contracts_decimal AS (
SELECT
contract_address AS address_contracts,
token_symbol AS symbol_contracts,
token_decimals AS decimals_contracts
FROM
{{ ref('silver__contracts') }}
WHERE
contract_address IN (
SELECT
currency_address
FROM
nft_base_models
)
),
final_base AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_type,
platform_address,
platform_name,
platform_exchange_version,
NULL AS aggregator_name,
seller_address,
buyer_address,
nft_address,
erc1155_value,
tokenId,
COALESCE(
p.symbol,
symbol_contracts
) AS currency_symbol,
currency_address,
total_price_raw,
total_fees_raw,
platform_fee_raw,
creator_fee_raw,
CASE
WHEN currency_address IN (
'ETH',
'0x4200000000000000000000000000000000000042',
'0x4200000000000000000000000000000000000006'
) THEN total_price_raw / pow(
10,
18
)
ELSE COALESCE (
total_price_raw / pow(10, COALESCE(p.decimals, decimals_contracts)),
total_price_raw
)
END AS price,
IFF(
COALESCE(
p.decimals,
decimals_contracts
) IS NULL,
0,
price * COALESCE(
hourly_prices,
0
)
) AS price_usd,
CASE
WHEN currency_address IN (
'ETH',
'0x4200000000000000000000000000000000000042',
'0x4200000000000000000000000000000000000006'
) THEN total_fees_raw / pow(
10,
18
)
ELSE COALESCE (
total_fees_raw / pow(10, COALESCE(p.decimals, decimals_contracts)),
total_fees_raw
)
END AS total_fees,
IFF(
COALESCE(
p.decimals,
decimals_contracts
) IS NULL,
0,
total_fees * COALESCE(
hourly_prices,
0
)
) AS total_fees_usd,
CASE
WHEN currency_address IN (
'ETH',
'0x4200000000000000000000000000000000000042',
'0x4200000000000000000000000000000000000006'
) THEN platform_fee_raw / pow(
10,
18
)
ELSE COALESCE (
platform_fee_raw / pow(10, COALESCE(p.decimals, decimals_contracts)),
platform_fee_raw
)
END AS platform_fee,
IFF(
COALESCE(
p.decimals,
decimals_contracts
) IS NULL,
0,
platform_fee * COALESCE(
hourly_prices,
0
)
) AS platform_fee_usd,
CASE
WHEN currency_address IN (
'ETH',
'0x4200000000000000000000000000000000000042',
'0x4200000000000000000000000000000000000006'
) THEN creator_fee_raw / pow(
10,
18
)
ELSE COALESCE (
creator_fee_raw / pow(10, COALESCE(p.decimals, decimals_contracts)),
creator_fee_raw
)
END AS creator_fee,
IFF(
COALESCE(
p.decimals,
decimals_contracts
) IS NULL,
0,
creator_fee * COALESCE(
hourly_prices,
0
)
) AS creator_fee_usd,
tx_fee,
tx_fee * eth_price_hourly AS tx_fee_usd,
origin_from_address,
origin_to_address,
origin_function_signature,
nft_log_id,
input_data,
_log_id,
b._inserted_timestamp
FROM
nft_base_models b
LEFT JOIN all_prices p
ON DATE_TRUNC(
'hour',
b.block_timestamp
) = p.hour
AND b.currency_address = p.token_address
LEFT JOIN eth_price e
ON DATE_TRUNC(
'hour',
b.block_timestamp
) = e.hour
LEFT JOIN contracts_decimal C
ON b.currency_address = C.address_contracts
),
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
heal_model AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_type,
platform_address,
COALESCE(
a2.aggregator,
platform_name
) AS platform_name,
platform_exchange_version,
COALESCE(
aggregator_name,
A.aggregator
) AS aggregator_name,
seller_address,
buyer_address,
nft_address,
C.token_name AS project_name,
erc1155_value,
tokenId,
currency_symbol,
currency_address,
total_price_raw,
total_fees_raw,
platform_fee_raw,
creator_fee_raw,
price,
price_usd,
total_fees,
total_fees_usd,
platform_fee,
platform_fee_usd,
creator_fee,
creator_fee_usd,
tx_fee,
tx_fee_usd,
origin_from_address,
origin_to_address,
origin_function_signature,
nft_log_id,
input_data,
_log_id,
t._inserted_timestamp
FROM
{{ this }}
t
LEFT JOIN {{ ref('silver__contracts') }} C
ON t.nft_address = C.contract_address
LEFT JOIN {{ ref('silver__aggregator_list') }} A
ON RIGHT(
t.input_data,
8
) = A.aggregator_identifier
AND aggregator_type = 'calldata'
LEFT JOIN {{ ref('silver__aggregator_list') }}
a2
ON t.origin_to_address = a2.aggregator_identifier
AND a2.aggregator_type = 'router'
WHERE
(
t.block_number IN (
SELECT
DISTINCT t1.block_number AS block_number
FROM
{{ this }}
t1
WHERE
t1.project_name IS NULL
AND _inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_name IS NOT NULL
AND C.contract_address = t1.nft_address)
)
)
OR (
t.block_number IN (
SELECT
DISTINCT t1.block_number AS block_number
FROM
{{ this }}
t1
WHERE
t1.aggregator_name IS NULL
AND _inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__aggregator_list') }} A
WHERE
A._inserted_timestamp > DATEADD('DAY', -2, SYSDATE())
AND A.aggregator_type = 'calldata'
AND RIGHT(
t1.input_data,
8
) = A.aggregator_identifier
)
)
)
OR (
t.block_number IN (
SELECT
DISTINCT t1.block_number AS block_number
FROM
{{ this }}
t1
WHERE
t1.origin_to_address IN (
SELECT
aggregator_identifier
FROM
{{ ref('silver__aggregator_list') }}
WHERE
aggregator_type = 'router'
AND _inserted_timestamp >= DATEADD('DAY', -2, SYSDATE()))
AND _inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__aggregator_list') }}
a2
WHERE
a2._inserted_timestamp > DATEADD('DAY', -2, SYSDATE())
AND t1.origin_to_address = a2.aggregator_identifier
AND a2.aggregator_type = 'router')
)
)
),
{% endif %}
combined AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_type,
platform_address,
COALESCE(
a2.aggregator,
platform_name
) AS platform_name,
platform_exchange_version,
COALESCE(
aggregator_name,
A.aggregator
) AS aggregator_name,
seller_address,
buyer_address,
nft_address,
C.token_name AS project_name,
erc1155_value,
tokenId,
currency_symbol,
currency_address,
total_price_raw,
total_fees_raw,
platform_fee_raw,
creator_fee_raw,
price,
price_usd,
total_fees,
total_fees_usd,
platform_fee,
platform_fee_usd,
creator_fee,
creator_fee_usd,
tx_fee,
tx_fee_usd,
origin_from_address,
origin_to_address,
origin_function_signature,
nft_log_id,
input_data,
_log_id,
b._inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'event_index', 'nft_address','tokenId','platform_exchange_version']
) }} AS complete_nft_sales_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
final_base b
LEFT JOIN {{ ref('silver__contracts') }} C
ON b.nft_address = C.contract_address
LEFT JOIN {{ ref('silver__aggregator_list') }} A
ON RIGHT(
b.input_data,
8
) = A.aggregator_identifier
AND A.aggregator_type = 'calldata'
LEFT JOIN {{ ref('silver__aggregator_list') }}
a2
ON b.origin_to_address = a2.aggregator_identifier
AND a2.aggregator_type = 'router'
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_type,
platform_address,
platform_name,
platform_exchange_version,
aggregator_name,
seller_address,
buyer_address,
nft_address,
project_name,
erc1155_value,
tokenId,
currency_symbol,
currency_address,
total_price_raw,
total_fees_raw,
platform_fee_raw,
creator_fee_raw,
price,
price_usd,
total_fees,
total_fees_usd,
platform_fee,
platform_fee_usd,
creator_fee,
creator_fee_usd,
tx_fee,
tx_fee_usd,
origin_from_address,
origin_to_address,
origin_function_signature,
nft_log_id,
input_data,
_log_id,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'event_index', 'nft_address','tokenId','platform_exchange_version']
) }} AS complete_nft_sales_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
heal_model
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_type,
platform_address,
platform_name,
platform_exchange_version,
aggregator_name,
seller_address,
buyer_address,
nft_address,
project_name,
erc1155_value,
tokenId,
currency_symbol,
currency_address,
total_price_raw,
total_fees_raw,
platform_fee_raw,
creator_fee_raw,
price,
price_usd,
total_fees,
total_fees_usd,
platform_fee,
platform_fee_usd,
creator_fee,
creator_fee_usd,
tx_fee,
tx_fee_usd,
origin_from_address,
origin_to_address,
origin_function_signature,
nft_log_id,
input_data,
_log_id,
_inserted_timestamp,
complete_nft_sales_id,
inserted_timestamp,
modified_timestamp,
_invocation_id
FROM
combined qualify (ROW_NUMBER() over(PARTITION BY nft_log_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -1,537 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = ['block_number','platform_name','platform_exchange_version'],
cluster_by = ['block_timestamp::DATE'],
tags = ['curated','reorg']
) }}
WITH nft_base_models AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_type,
platform_address,
platform_name,
platform_exchange_version,
seller_address,
buyer_address,
nft_address,
erc1155_value :: STRING AS erc1155_value,
tokenId,
currency_address,
total_price_raw,
total_fees_raw,
platform_fee_raw,
creator_fee_raw,
tx_fee,
origin_from_address,
origin_to_address,
origin_function_signature,
input_data,
nft_log_id,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__quix_sales') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_type,
platform_address,
platform_name,
platform_exchange_version,
seller_address,
buyer_address,
nft_address,
erc1155_value :: STRING AS erc1155_value,
tokenId,
currency_address,
total_price_raw,
total_fees_raw,
platform_fee_raw,
creator_fee_raw,
tx_fee,
origin_from_address,
origin_to_address,
origin_function_signature,
input_data,
nft_log_id,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__quix_seaport_sales') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_type,
platform_address,
platform_name,
platform_exchange_version,
seller_address,
buyer_address,
nft_address,
erc1155_value :: STRING AS erc1155_value,
tokenId,
currency_address,
total_price_raw,
total_fees_raw,
platform_fee_raw,
creator_fee_raw,
tx_fee,
origin_from_address,
origin_to_address,
origin_function_signature,
input_data,
nft_log_id,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__seaport_1_1_sales') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_type,
platform_address,
platform_name,
platform_exchange_version,
seller_address,
buyer_address,
nft_address,
erc1155_value :: STRING AS erc1155_value,
tokenId,
currency_address,
total_price_raw,
total_fees_raw,
platform_fee_raw,
creator_fee_raw,
tx_fee,
origin_from_address,
origin_to_address,
origin_function_signature,
input_data,
nft_log_id,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__seaport_1_4_sales') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_type,
platform_address,
platform_name,
platform_exchange_version,
seller_address,
buyer_address,
nft_address,
erc1155_value :: STRING AS erc1155_value,
tokenId,
currency_address,
total_price_raw,
total_fees_raw,
platform_fee_raw,
creator_fee_raw,
tx_fee,
origin_from_address,
origin_to_address,
origin_function_signature,
input_data,
nft_log_id,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__seaport_1_5_sales') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
{% endif %}
),
prices_raw AS (
SELECT
HOUR,
symbol,
token_address,
decimals,
price AS hourly_prices
FROM
{{ ref('price__ez_hourly_token_prices') }}
WHERE
token_address IN (
SELECT
DISTINCT currency_address
FROM
nft_base_models
)
AND HOUR :: DATE IN (
SELECT
DISTINCT block_timestamp :: DATE
FROM
nft_base_models
)
),
all_prices AS (
SELECT
HOUR,
symbol,
token_address,
decimals,
hourly_prices
FROM
prices_raw
UNION ALL
SELECT
HOUR,
'ETH' AS symbol,
'ETH' AS token_address,
decimals,
hourly_prices
FROM
prices_raw
WHERE
token_address = '0x4200000000000000000000000000000000000006'
),
eth_price AS (
SELECT
HOUR,
hourly_prices AS eth_price_hourly
FROM
prices_raw
WHERE
token_address = '0x4200000000000000000000000000000000000006'
),
final_base AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_type,
platform_address,
CASE
WHEN origin_to_address IN (
'0xbbbbbbbe843515689f3182b748b5671665541e58'
) THEN 'bluesweep'
ELSE platform_name
END AS platform_name,
platform_exchange_version,
NULL AS aggregator_name,
seller_address,
buyer_address,
nft_address,
C.token_name AS project_name,
erc1155_value,
tokenId,
p.symbol AS currency_symbol,
currency_address,
total_price_raw,
total_fees_raw,
platform_fee_raw,
creator_fee_raw,
CASE
WHEN currency_address IN (
'ETH',
'0x4200000000000000000000000000000000000042',
'0x4200000000000000000000000000000000000006'
) THEN total_price_raw / pow(
10,
18
)
ELSE COALESCE (total_price_raw / pow(10, decimals), total_price_raw)
END AS price,
IFF(
decimals IS NULL,
0,
price * hourly_prices
) AS price_usd,
CASE
WHEN currency_address IN (
'ETH',
'0x4200000000000000000000000000000000000042',
'0x4200000000000000000000000000000000000006'
) THEN total_fees_raw / pow(
10,
18
)
ELSE COALESCE (total_fees_raw / pow(10, decimals), total_fees_raw)
END AS total_fees,
IFF(
decimals IS NULL,
0,
total_fees * hourly_prices
) AS total_fees_usd,
CASE
WHEN currency_address IN (
'ETH',
'0x4200000000000000000000000000000000000042',
'0x4200000000000000000000000000000000000006'
) THEN platform_fee_raw / pow(
10,
18
)
ELSE COALESCE (platform_fee_raw / pow(10, decimals), platform_fee_raw)
END AS platform_fee,
IFF(
decimals IS NULL,
0,
platform_fee * hourly_prices
) AS platform_fee_usd,
CASE
WHEN currency_address IN (
'ETH',
'0x4200000000000000000000000000000000000042',
'0x4200000000000000000000000000000000000006'
) THEN creator_fee_raw / pow(
10,
18
)
ELSE COALESCE (creator_fee_raw / pow(10, decimals), creator_fee_raw)
END AS creator_fee,
IFF(
decimals IS NULL,
0,
creator_fee * hourly_prices
) AS creator_fee_usd,
tx_fee,
tx_fee * eth_price_hourly AS tx_fee_usd,
origin_from_address,
origin_to_address,
origin_function_signature,
nft_log_id,
input_data,
_log_id,
b._inserted_timestamp
FROM
nft_base_models b
LEFT JOIN all_prices p
ON DATE_TRUNC(
'hour',
b.block_timestamp
) = p.hour
AND b.currency_address = p.token_address
LEFT JOIN eth_price e
ON DATE_TRUNC(
'hour',
b.block_timestamp
) = e.hour
LEFT JOIN {{ ref('silver__contracts') }} C
ON b.nft_address = C.contract_address
)
{% if is_incremental() %},
label_fill_sales AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_type,
platform_address,
platform_name,
platform_exchange_version,
NULL AS aggregator_name,
seller_address,
buyer_address,
nft_address,
C.token_name AS project_name,
erc1155_value,
tokenId,
currency_symbol,
currency_address,
total_price_raw,
total_fees_raw,
platform_fee_raw,
creator_fee_raw,
price,
price_usd,
total_fees,
total_fees_usd,
platform_fee,
platform_fee_usd,
creator_fee,
creator_fee_usd,
tx_fee,
tx_fee_usd,
origin_from_address,
origin_to_address,
origin_function_signature,
nft_log_id,
input_data,
_log_id,
GREATEST(
t._inserted_timestamp,
C._inserted_timestamp
) AS _inserted_timestamp
FROM
{{ this }}
t
INNER JOIN {{ ref('silver__contracts') }} C
ON t.nft_address = C.contract_address
WHERE
t.project_name IS NULL
AND C.token_name IS NOT NULL
),
blocks_fill AS (
SELECT
* exclude (
complete_nft_sales_id,
inserted_timestamp,
modified_timestamp,
_invocation_id
)
FROM
{{ this }}
WHERE
block_number IN (
SELECT
block_number
FROM
label_fill_sales
)
AND nft_log_id NOT IN (
SELECT
nft_log_id
FROM
label_fill_sales
)
)
{% endif %},
final_joins AS (
SELECT
*
FROM
final_base
{% if is_incremental() %}
UNION ALL
SELECT
*
FROM
label_fill_sales
UNION ALL
SELECT
*
FROM
blocks_fill
{% endif %}
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
event_type,
platform_address,
platform_name,
platform_exchange_version,
aggregator_name,
seller_address,
buyer_address,
nft_address,
project_name,
erc1155_value,
tokenId,
currency_symbol,
currency_address,
total_price_raw,
total_fees_raw,
platform_fee_raw,
creator_fee_raw,
price,
price_usd,
total_fees,
total_fees_usd,
platform_fee,
platform_fee_usd,
creator_fee,
creator_fee_usd,
tx_fee,
tx_fee_usd,
origin_from_address,
origin_to_address,
origin_function_signature,
nft_log_id,
input_data,
_log_id,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'event_index', 'nft_address','tokenId','platform_exchange_version']
) }} AS complete_nft_sales_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
final_joins qualify(ROW_NUMBER() over(PARTITION BY nft_log_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -4,7 +4,7 @@
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(contract_address, tx_hash)",
tags = ['curated','reorg']
tags = ['curated','reorg', 'heal']
) }}
WITH base AS (
@ -325,61 +325,10 @@ transfer_base AS (
to_address IS NOT NULL
)
{% if is_incremental() %},
fill_transfers AS (
SELECT
t.block_number,
t.block_timestamp,
t.tx_hash,
t.event_index,
t.intra_event_index,
t.contract_address,
C.token_name AS project_name,
t.from_address,
t.to_address,
t.tokenId,
t.erc1155_value,
t.event_type,
t.token_transfer_type,
t._log_id,
GREATEST(
t._inserted_timestamp,
C._inserted_timestamp
) AS _inserted_timestamp
FROM
{{ this }}
t
INNER JOIN {{ ref('silver__contracts') }} C USING (contract_address)
WHERE
t.project_name IS NULL
AND C.token_name IS NOT NULL
),
blocks_fill AS (
SELECT
* exclude (
nft_transfers_id,
inserted_timestamp,
modified_timestamp,
_invocation_id
)
FROM
{{ this }}
WHERE
block_number IN (
SELECT
block_number
FROM
fill_transfers
)
AND _log_id NOT IN (
SELECT
_log_id
FROM
fill_transfers
)
)
{% endif %},
final_base AS (
{% if is_incremental() and var(
'HEAL_MODEL'
) %},
heal_model AS (
SELECT
block_number,
block_timestamp,
@ -387,7 +336,7 @@ final_base AS (
event_index,
intra_event_index,
contract_address,
project_name,
C.token_name AS project_name,
from_address,
to_address,
tokenId,
@ -395,51 +344,73 @@ final_base AS (
event_type,
token_transfer_type,
_log_id,
_inserted_timestamp
t._inserted_timestamp
FROM
transfer_base
{{ this }}
t
LEFT JOIN {{ ref('silver__contracts') }} C USING (contract_address)
WHERE
t.block_number IN (
SELECT
DISTINCT t1.block_number AS block_number
FROM
{{ this }}
t1
WHERE
t1.project_name IS NULL
AND _inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_name IS NOT NULL
AND C.contract_address = t1.contract_address)
)
)
{% endif %}
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
intra_event_index,
contract_address,
A.project_name,
from_address,
to_address,
tokenId,
erc1155_value,
event_type,
token_transfer_type,
_log_id,
A._inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['tx_hash','event_index','intra_event_index']
) }} AS nft_transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
transfer_base A qualify ROW_NUMBER() over (
PARTITION BY _log_id
ORDER BY
A._inserted_timestamp DESC
) = 1
{% if is_incremental() %}
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
intra_event_index,
contract_address,
project_name,
from_address,
to_address,
tokenId,
erc1155_value,
event_type,
token_transfer_type,
_log_id,
_inserted_timestamp
FROM
fill_transfers
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
intra_event_index,
contract_address,
project_name,
from_address,
to_address,
tokenId,
erc1155_value,
event_type,
token_transfer_type,
_log_id,
_inserted_timestamp
FROM
blocks_fill
{% endif %}
)
SELECT
block_number,
block_timestamp,
@ -463,8 +434,5 @@ SELECT
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
final_base qualify ROW_NUMBER() over (
PARTITION BY _log_id
ORDER BY
_inserted_timestamp DESC
) = 1
heal_model
{% endif %}