revert balances

This commit is contained in:
stanz 2025-11-26 21:09:52 +07:00
parent 58017965a1
commit 4b3c8d2a45

View File

@ -1,170 +1,43 @@
{{ config(
materialized = 'incremental',
unique_key = ['block_date','address','token_address'],
incremental_strategy = 'delete+insert',
unique_key = ['tx_hash','change_index'],
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_date','_inserted_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(address, token_address);",
tags = ['core','balances','daily'],
enabled = true
tags = ['core','full_test'],
enabled = false
) }}
WITH
fungible_asset_balances AS (
SELECT
c.block_number,
c.block_timestamp,
c.block_timestamp::DATE AS block_date,
c.version,
c.tx_hash,
c.change_data:metadata:inner::STRING AS token_address,
c.change_data:balance::BIGINT AS post_balance,
c.change_data:frozen::BOOLEAN AS frozen,
c.address,
c.modified_timestamp,
c._inserted_timestamp
FROM {{ ref('silver__changes') }} c
INNER JOIN {{ ref('silver__fungible_asset_metadata') }} m
ON c.change_data:metadata:inner::STRING = m.token_address
WHERE c.change_module = 'fungible_asset'
AND c.change_resource = 'FungibleStore'
AND c.change_data:balance IS NOT NULL
AND c.address IS NOT NULL
AND c.change_data:balance::BIGINT > 0
{% if is_incremental() %}
AND c.modified_timestamp >= (
SELECT MAX(modified_timestamp)
FROM {{ this }}
)
{% endif %}
),
coin_balances AS (
SELECT
c.block_number,
c.block_timestamp,
c.block_timestamp::DATE AS block_date,
c.version,
c.tx_hash,
REPLACE(REPLACE(c.change_resource::STRING, 'CoinStore<'), '>') AS token_address,
c.change_data:coin:value::BIGINT AS post_balance,
FALSE AS frozen, -- Coin shouldn't be frozen
COALESCE(
c.change_data:deposit_events:guid:id:addr,
c.change_data:withdraw_events:guid:id:addr,
c.change_data:coin_amount_event:guid:id:addr
)::STRING AS address,
c.modified_timestamp,
c._inserted_timestamp
FROM {{ ref('silver__changes') }} c
INNER JOIN {{ ref('core__dim_tokens') }} t
ON REPLACE(REPLACE(c.change_resource::STRING, 'CoinStore<'), '>') = t.token_address
WHERE c.change_module = 'coin'
AND c.change_resource LIKE 'CoinStore<%'
AND c.change_data:coin:value IS NOT NULL
AND COALESCE(
c.change_data:deposit_events:guid:id:addr,
c.change_data:withdraw_events:guid:id:addr,
c.change_data:coin_amount_event:guid:id:addr
) IS NOT NULL
AND c.change_data:coin:value::BIGINT > 0
{% if is_incremental() %}
AND c.modified_timestamp >= (
SELECT MAX(modified_timestamp)
FROM {{ this }}
)
{% endif %}
),
all_balances AS (
SELECT * FROM fungible_asset_balances
UNION ALL
SELECT * FROM coin_balances
),
token_metadata AS (
SELECT
token_address,
symbol,
decimals,
name
FROM {{ ref('silver__fungible_asset_metadata') }}
UNION ALL
SELECT
token_address,
symbol,
decimals,
name
FROM {{ ref('core__dim_tokens') }}
),
-- Decimal adjustment
balances_with_metadata AS (
SELECT
b.block_date,
b.block_number,
b.version,
b.tx_hash,
b.address,
b.token_address,
m.symbol,
m.decimals,
m.name,
b.post_balance,
b.frozen,
CASE
WHEN m.decimals IS NOT NULL
THEN b.post_balance / POW(10, m.decimals)
ELSE NULL
END AS balance,
b.modified_timestamp,
b._inserted_timestamp
FROM all_balances b
INNER JOIN token_metadata m
ON b.token_address = m.token_address
WHERE b.post_balance > 0
),
daily_balances AS (
SELECT
block_date,
address,
token_address,
symbol,
decimals,
name,
post_balance,
balance,
frozen,
modified_timestamp,
_inserted_timestamp
FROM balances_with_metadata
QUALIFY ROW_NUMBER() OVER (
PARTITION BY block_date, address, token_address
ORDER BY block_number DESC, version DESC
) = 1
)
{# cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash, change_type,inner_change_type,change_address,change_module,change_resource);",
#}
SELECT
block_date,
address,
token_address,
symbol,
decimals,
name,
post_balance,
balance,
frozen,
block_number,
block_timestamp,
tx_hash,
version,
REPLACE(
REPLACE(
change_resource :: STRING,
'CoinStore<'
),
'>'
) AS token_address,
change_data :coin :value :: INT AS post_balance,
COALESCE(
change_data :deposit_events :guid :id :addr,
change_data :withdraw_events :guid :id :addr,
change_data :coin_amount_event :guid :id :addr
) :: STRING AS account_address,
{{ dbt_utils.generate_surrogate_key(
['block_date','address','token_address']
) }} AS balances_id,
['block_number','version','account_address','token_address']
) }} AS changes_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM daily_balances
WHERE balance > 0
FROM
{{ ref(
'silver__changes'
) }}
WHERE
post_balance IS NOT NULL