near-models/models/silver/transfers/silver__token_transfers_complete.sql
2025-05-02 10:54:47 -06:00

267 lines
7.8 KiB
SQL

{# Model configuration: incremental merge keyed on transfers_complete_id,
   clustered by block date and modified date, tagged for the
   scheduled_non_core job. The incremental_predicates values are passed
   through as-is; presumably a project-level macro interprets
   "dynamic_range_predicate_custom" -- confirm against the project macros. #}
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    unique_key = 'transfers_complete_id',
    incremental_predicates = [
        "dynamic_range_predicate_custom",
        "block_timestamp::date"
    ],
    cluster_by = [
        'block_timestamp::DATE',
        'modified_timestamp::DATE'
    ],
    tags = ['scheduled_non_core']
) }}
{# Incremental watermarks, computed only at execution time, on incremental
   runs, and when the MANUAL_FIX var is falsy:
     max_mod - MAX(modified_timestamp) currently stored in this model
     min_bd  - earliest block date among upstream rows whose
               modified_timestamp is >= max_mod
   Each value falls back to '2099-01-01' when its query yields nothing.
   NOTE(review): with that fallback, an existing-but-empty target table makes
   every upstream filter select no rows -- confirm this is intended. #}
{% if execute %}
    {% if is_incremental() and not var("MANUAL_FIX") %}

        {# --- max_mod: high-water mark of this model --- #}
        {% set target_watermark_sql %}
            SELECT
                MAX(modified_timestamp) AS modified_timestamp
            FROM
                {{ this }}
        {% endset %}
        {% set max_mod = run_query(target_watermark_sql)[0][0] %}
        {% if not max_mod or max_mod == 'None' %}
            {% set max_mod = '2099-01-01' %}
        {% endif %}
        {% do log("max_mod: " ~ max_mod, info=True) %}

        {# --- min_bd: earliest block date touched by newly modified upstream rows --- #}
        {% set upstream_min_date_sql %}
            SELECT
                MIN(block_timestamp::DATE)
            FROM (
                SELECT MIN(block_timestamp) AS block_timestamp
                FROM {{ ref('silver__token_transfer_native') }}
                WHERE modified_timestamp >= '{{max_mod}}'
                UNION ALL
                SELECT MIN(block_timestamp) AS block_timestamp
                FROM {{ ref('silver__token_transfer_deposit') }}
                WHERE modified_timestamp >= '{{max_mod}}'
                UNION ALL
                SELECT MIN(block_timestamp) AS block_timestamp
                FROM {{ ref('silver__token_transfer_ft_transfers_method') }}
                WHERE modified_timestamp >= '{{max_mod}}'
                UNION ALL
                SELECT MIN(block_timestamp) AS block_timestamp
                FROM {{ ref('silver__token_transfer_ft_transfers_event') }}
                WHERE modified_timestamp >= '{{max_mod}}'
                UNION ALL
                SELECT MIN(block_timestamp) AS block_timestamp
                FROM {{ ref('silver__token_transfer_mints') }}
                WHERE modified_timestamp >= '{{max_mod}}'
                UNION ALL
                SELECT MIN(block_timestamp) AS block_timestamp
                FROM {{ ref('silver__token_transfer_orders') }}
                WHERE modified_timestamp >= '{{max_mod}}'
                UNION ALL
                SELECT MIN(block_timestamp) AS block_timestamp
                FROM {{ ref('silver__token_transfer_liquidity') }}
                WHERE modified_timestamp >= '{{max_mod}}'
            )
        {% endset %}
        {% set min_bd = run_query(upstream_min_date_sql)[0][0] %}
        {% if not min_bd or min_bd == 'None' %}
            {% set min_bd = '2099-01-01' %}
        {% endif %}
        {% do log("min_bd: " ~ min_bd, info=True) %}

    {% endif %}
{% endif %}
-- Rows from silver__token_transfer_native whose receipt succeeded, reshaped to
-- the common transfer layout: action_index is exposed as rn, predecessor_id /
-- receiver_id become from_address / to_address, contract_address is the
-- constant 'wrap.near', memo is NULL and transfer_type is 'native'.
-- Load window: the partition_load_manual('no_buffer') predicate when
-- MANUAL_FIX is set, otherwise (incremental runs) block dates from min_bd on.
WITH native_transfers AS (
    SELECT
        block_id,
        block_timestamp,
        tx_hash,
        action_id,
        action_index AS rn,
        'wrap.near' AS contract_address,
        predecessor_id AS from_address,
        receiver_id AS to_address,
        NULL AS memo,
        amount_unadj,
        'native' AS transfer_type,
        _partition_by_block_number,
        modified_timestamp
    FROM
        {{ ref('silver__token_transfer_native') }}
    WHERE
        receipt_succeeded
        {% if var("MANUAL_FIX") %}
            AND {{ partition_load_manual('no_buffer') }}
        {% elif is_incremental() %}
            AND block_timestamp::DATE >= '{{min_bd}}'
        {% endif %}
),
-- Rows from silver__token_transfer_deposit whose receipt succeeded, mapped to
-- the same layout as native_transfers (rn from action_index, constant
-- 'wrap.near' contract_address, NULL memo, transfer_type 'native').
-- Load window: manual partition predicate under MANUAL_FIX, otherwise
-- (incremental runs) block dates from min_bd on.
native_deposits AS (
    SELECT
        block_id,
        block_timestamp,
        tx_hash,
        action_id,
        action_index AS rn,
        'wrap.near' AS contract_address,
        predecessor_id AS from_address,
        receiver_id AS to_address,
        NULL AS memo,
        amount_unadj,
        'native' AS transfer_type,
        _partition_by_block_number,
        modified_timestamp
    FROM
        {{ ref('silver__token_transfer_deposit') }}
    WHERE
        receipt_succeeded
        {% if var("MANUAL_FIX") %}
            AND {{ partition_load_manual('no_buffer') }}
        {% elif is_incremental() %}
            AND block_timestamp::DATE >= '{{min_bd}}'
        {% endif %}
),
-- Rows from silver__token_transfer_ft_transfers_method, passed through in the
-- common transfer layout and labelled transfer_type 'nep141'.
-- Load window: manual partition predicate under MANUAL_FIX, otherwise
-- (incremental runs) block dates from min_bd on; unfiltered on a full build.
ft_transfers_method AS (
    SELECT
        block_id,
        block_timestamp,
        tx_hash,
        action_id,
        rn,
        contract_address,
        from_address,
        to_address,
        memo,
        amount_unadj,
        'nep141' AS transfer_type,
        _partition_by_block_number,
        modified_timestamp
    FROM
        {{ ref('silver__token_transfer_ft_transfers_method') }}
    WHERE
        1 = 1
        {% if var("MANUAL_FIX") %}
            AND {{ partition_load_manual('no_buffer') }}
        {% elif is_incremental() %}
            AND block_timestamp::DATE >= '{{min_bd}}'
        {% endif %}
),
-- Rows from silver__token_transfer_ft_transfers_event, passed through in the
-- common transfer layout and labelled transfer_type 'nep141'.
-- Load window: manual partition predicate under MANUAL_FIX, otherwise
-- (incremental runs) block dates from min_bd on; unfiltered on a full build.
ft_transfers_event AS (
    SELECT
        block_id,
        block_timestamp,
        tx_hash,
        action_id,
        rn,
        contract_address,
        from_address,
        to_address,
        memo,
        amount_unadj,
        'nep141' AS transfer_type,
        _partition_by_block_number,
        modified_timestamp
    FROM
        {{ ref('silver__token_transfer_ft_transfers_event') }}
    WHERE
        1 = 1
        {% if var("MANUAL_FIX") %}
            AND {{ partition_load_manual('no_buffer') }}
        {% elif is_incremental() %}
            AND block_timestamp::DATE >= '{{min_bd}}'
        {% endif %}
),
-- Rows from silver__token_transfer_mints, passed through in the common
-- transfer layout and labelled transfer_type 'nep141'.
-- Load window: manual partition predicate under MANUAL_FIX, otherwise
-- (incremental runs) block dates from min_bd on; unfiltered on a full build.
mints AS (
    SELECT
        block_id,
        block_timestamp,
        tx_hash,
        action_id,
        rn,
        contract_address,
        from_address,
        to_address,
        memo,
        amount_unadj,
        'nep141' AS transfer_type,
        _partition_by_block_number,
        modified_timestamp
    FROM
        {{ ref('silver__token_transfer_mints') }}
    WHERE
        1 = 1
        {% if var("MANUAL_FIX") %}
            AND {{ partition_load_manual('no_buffer') }}
        {% elif is_incremental() %}
            AND block_timestamp::DATE >= '{{min_bd}}'
        {% endif %}
),
-- Rows from silver__token_transfer_orders, passed through in the common
-- transfer layout and labelled transfer_type 'nep141'.
-- Load window: manual partition predicate under MANUAL_FIX, otherwise
-- (incremental runs) block dates from min_bd on; unfiltered on a full build.
orders AS (
    SELECT
        block_id,
        block_timestamp,
        tx_hash,
        action_id,
        rn,
        contract_address,
        from_address,
        to_address,
        memo,
        amount_unadj,
        'nep141' AS transfer_type,
        _partition_by_block_number,
        modified_timestamp
    FROM
        {{ ref('silver__token_transfer_orders') }}
    WHERE
        1 = 1
        {% if var("MANUAL_FIX") %}
            AND {{ partition_load_manual('no_buffer') }}
        {% elif is_incremental() %}
            AND block_timestamp::DATE >= '{{min_bd}}'
        {% endif %}
),
-- Rows from silver__token_transfer_liquidity, passed through in the common
-- transfer layout and labelled transfer_type 'nep141'.
-- Load window: manual partition predicate under MANUAL_FIX, otherwise
-- (incremental runs) block dates from min_bd on; unfiltered on a full build.
liquidity AS (
    SELECT
        block_id,
        block_timestamp,
        tx_hash,
        action_id,
        rn,
        contract_address,
        from_address,
        to_address,
        memo,
        amount_unadj,
        'nep141' AS transfer_type,
        _partition_by_block_number,
        modified_timestamp
    FROM
        {{ ref('silver__token_transfer_liquidity') }}
    WHERE
        1 = 1
        {% if var("MANUAL_FIX") %}
            AND {{ partition_load_manual('no_buffer') }}
        {% elif is_incremental() %}
            AND block_timestamp::DATE >= '{{min_bd}}'
        {% endif %}
),
-- Stacks the seven per-source transfer CTEs into one relation.
-- UNION ALL matches columns by position, so every branch selects the same
-- explicit column list instead of relying on SELECT * and on each CTE
-- happening to declare its columns in the same order.
all_transfers AS (
    {% set transfer_columns = [
        'block_id',
        'block_timestamp',
        'tx_hash',
        'action_id',
        'rn',
        'contract_address',
        'from_address',
        'to_address',
        'memo',
        'amount_unadj',
        'transfer_type',
        '_partition_by_block_number',
        'modified_timestamp'
    ] %}
    {% set transfer_ctes = [
        'native_transfers',
        'native_deposits',
        'ft_transfers_method',
        'ft_transfers_event',
        'mints',
        'orders',
        'liquidity'
    ] %}
    {% for transfer_cte in transfer_ctes %}
    SELECT
        {{ transfer_columns | join(', ') }}
    FROM
        {{ transfer_cte }}
    {% if not loop.last %}
    UNION ALL
    {% endif %}
    {% endfor %}
),
-- Shapes the output rows: the transfer columns, a surrogate key
-- (transfers_complete_id) built from action_id, contract_address,
-- amount_unadj, from_address, to_address and rn, fresh audit timestamps and
-- the dbt invocation id.
-- _source_modified_timestamp is a helper column holding the upstream
-- modified_timestamp; it is used only to pick a winner among duplicate keys
-- and is removed from the final projection.
final_transfers AS (
    SELECT
        block_timestamp,
        block_id,
        tx_hash,
        action_id,
        rn,
        contract_address,
        from_address,
        to_address,
        memo,
        amount_unadj,
        transfer_type,
        _partition_by_block_number,
        {{ dbt_utils.generate_surrogate_key(
            ['action_id', 'contract_address', 'amount_unadj', 'from_address', 'to_address', 'rn']
        ) }} AS transfers_complete_id,
        SYSDATE() AS inserted_timestamp,
        SYSDATE() AS modified_timestamp,
        '{{ invocation_id }}' AS _invocation_id,
        all_transfers.modified_timestamp AS _source_modified_timestamp
    FROM
        all_transfers
    {% if is_incremental() and not var("MANUAL_FIX") %}
    -- Qualified on purpose: this must compare the upstream modified_timestamp,
    -- not the SYSDATE() alias of the same name declared in the SELECT list.
    WHERE
        all_transfers.modified_timestamp >= '{{max_mod}}'
    {% endif %}
)
-- Keep one row per transfers_complete_id, preferring the most recently
-- modified upstream row. Ordering by the output modified_timestamp would be
-- meaningless here because it is stamped with SYSDATE() for every row.
SELECT
    * EXCLUDE (_source_modified_timestamp)
FROM
    final_transfers
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY transfers_complete_id
    ORDER BY _source_modified_timestamp DESC
) = 1