Merge pull request #79 from FlipsideCrypto/silver-balances-logic
Some checks failed
docs_update / docs_update (push) Has been cancelled
docs_update / notify-failure (push) Has been cancelled
dbt_test_monthly / run_dbt_jobs (push) Has been cancelled
dbt_test_monthly / notify-failure (push) Has been cancelled

silver-balances-logic
This commit is contained in:
stanz 2026-01-07 16:17:23 +01:00 committed by GitHub
commit a0c6ea07b0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 457 additions and 29 deletions

View File

@ -0,0 +1,32 @@
{% docs balances__ez_balances %}
## Description
This table provides token balances for all addresses holding verified fungible assets on the Aptos blockchain. It combines a historical snapshot with recent balance changes to provide comprehensive balance data. Each row represents a unique address-token combination, with decimal-adjusted balances and USD valuations using end-of-day token prices.
**Data Structure:** The table unions:
- Historical balances from a point-in-time snapshot (configurable via `SNAPSHOT_DATE` variable, default: 2025-09-01)
- Recent balance changes that occurred after the snapshot date
## Key Use Cases
- Portfolio tracking and balance analysis
- Wallet wealth distribution and concentration metrics
- Token holder analysis and whale tracking
- DeFi TVL calculations and protocol health monitoring
- Point-in-time balance queries
## Important Relationships
- Sources historical data from `silver.balances_snapshot` for the snapshot date
- Sources recent data from `silver.balances` for post-snapshot changes
- Joins to `core.dim_tokens` for token metadata (symbol, name, decimals)
- Joins to `price.ez_prices_hourly` for end-of-day USD price valuations
- Can be joined with `core.dim_labels` for address labeling and entity identification
## Commonly-used Fields
- `balance_date`: The date of the balance record (snapshot date or block date)
- `address`: Core field for wallet-level analysis and filtering
- `token_address`: Essential for token-specific balance queries and aggregations
- `balance`: Decimal-adjusted balance for human-readable amounts
- `balance_usd`: Critical for portfolio valuation and cross-token comparisons
- `last_balance_change`: Timestamp of the most recent balance modification
{% enddocs %}

View File

@ -0,0 +1,85 @@
{{ config(
materialized = 'view',
tags = ['balances']
) }}
{# Use the same snapshot date as the silver snapshot model #}
{% set snapshot_date = var('SNAPSHOT_DATE', '2025-09-02') %}
WITH snapshot_balances AS (
-- Historical balances from snapshot (state as of snapshot date)
SELECT
snapshot_date AS balance_date,
address,
token_address,
balance,
frozen,
block_timestamp AS last_balance_change,
balances_snapshot_id AS ez_balances_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__balances_snapshot') }}
),
recent_balances AS (
-- Balances that occurred after the snapshot date
SELECT
block_date AS balance_date,
address,
token_address,
balance,
frozen,
block_timestamp AS last_balance_change,
balances_id AS ez_balances_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__balances') }}
WHERE
block_date > '{{ snapshot_date }}'::DATE
),
combined_balances AS (
SELECT * FROM snapshot_balances
UNION ALL
SELECT * FROM recent_balances
),
prices AS (
SELECT
token_address,
hour::DATE AS price_date,
price,
is_verified
FROM
{{ ref('price__ez_prices_hourly') }}
QUALIFY ROW_NUMBER() OVER (
PARTITION BY token_address, hour::DATE
ORDER BY hour DESC
) = 1
)
SELECT
b.balance_date,
b.address,
b.token_address,
b.balance AS balance_raw,
b.balance / NULLIF(POW(10, COALESCE(t.decimals, 0)), 0) AS balance,
(b.balance / NULLIF(POW(10, COALESCE(t.decimals, 0)), 0)) * p.price AS balance_usd,
t.symbol,
t.name AS token_name,
t.decimals,
b.frozen,
b.last_balance_change,
p.is_verified AS token_is_verified,
b.ez_balances_id,
b.inserted_timestamp,
b.modified_timestamp
FROM
combined_balances b
LEFT JOIN {{ ref('core__dim_tokens') }} t
ON LOWER(b.token_address) = LOWER(t.token_address)
LEFT JOIN prices p
ON LOWER(b.token_address) = LOWER(p.token_address)
AND b.balance_date = p.price_date

View File

@ -0,0 +1,74 @@
version: 2
models:
- name: balances__ez_balances
description: '{{ doc("balances__ez_balances") }}'
columns:
- name: BALANCE_DATE
description: The date of the balance record. For snapshot data this is the snapshot date; for recent data this is the block date.
tests:
- not_null:
where: balance_date > current_date - 3
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- name: ADDRESS
description: '{{ doc("address") }}'
tests:
- not_null:
where: balance_date > current_date - 3
- name: TOKEN_ADDRESS
description: '{{ doc("token_address") }}'
tests:
- not_null:
where: balance_date > current_date - 3
- name: BALANCE_RAW
description: The unadjusted token balance as stored on-chain, before decimal adjustment.
tests:
- not_null:
where: balance_date > current_date - 3
- name: BALANCE
description: The decimal-adjusted token balance, representing human-readable token amounts.
- name: BALANCE_USD
description: The USD value of the balance, calculated using the end-of-day token price.
- name: SYMBOL
description: '{{ doc("symbol") }}'
- name: TOKEN_NAME
description: The full name of the token.
- name: DECIMALS
description: '{{ doc("decimals") }}'
- name: FROZEN
description: Boolean indicating whether the fungible asset account is frozen and unable to transfer.
tests:
- not_null:
where: balance_date > current_date - 3
- name: LAST_BALANCE_CHANGE
description: The timestamp when the balance was last modified for this address-token combination.
- name: TOKEN_IS_VERIFIED
description: Boolean indicating whether the token has been verified with a reliable price source.
- name: EZ_BALANCES_ID
description: '{{ doc("pk") }}'
tests:
- unique:
where: balance_date > current_date - 3
- not_null:
where: balance_date > current_date - 3
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'

View File

@ -1,43 +1,141 @@
{{ config(
materialized = 'incremental',
unique_key = ['tx_hash','change_index'],
unique_key = ['balances_id'],
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
tags = ['core','full_test'],
enabled = false
cluster_by = ['modified_timestamp'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::DATE"],
post_hook = '{{ unverify_tokens() }}',
tags = ['daily', 'full_test', 'heal']
) }}
{# cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash, change_type,inner_change_type,change_address,change_module,change_resource);",
#}
-- at most one record per (address, token_address) pair per day - we will get the last transaction of the day
WITH verified_tokens AS (
SELECT
DISTINCT token_address
FROM
{{ ref('price__ez_prices_hourly') }}
WHERE
is_verified
),
{% if is_incremental() and var(
'HEAL_MODEL',
false
) %}
newly_verified_tokens AS ({{ get_missing_verified_tokens() }}),
heal_balances AS (
SELECT
C.block_number,
C.block_timestamp,
C.version,
C.change_data :metadata :inner :: STRING AS token_address,
C.change_data :balance :: bigint AS post_balance,
C.change_data :frozen :: BOOLEAN AS frozen,
C.address
FROM
{{ ref('silver__changes') }} C
WHERE
block_timestamp :: DATE >= '2023-07-28'
AND C.change_module = 'fungible_asset'
AND C.change_resource = 'FungibleStore'
AND TRY_CAST(
C.change_data :balance :: STRING AS bigint
) IS NOT NULL
AND C.address IS NOT NULL
AND LOWER(
C.change_data :metadata :inner :: STRING
) IN (
SELECT
token_address
FROM
newly_verified_tokens
)
),
{% endif %}
fungible_asset_balances AS (
SELECT
C.block_number,
C.block_timestamp,
C.version,
C.change_data :metadata :inner :: STRING AS token_address,
C.change_data :balance :: bigint AS post_balance,
C.change_data :frozen :: BOOLEAN AS frozen,
C.address
FROM
{{ ref('silver__changes') }} C
WHERE
block_timestamp :: DATE >= '2023-07-28'
AND C.change_module = 'fungible_asset'
AND C.change_resource = 'FungibleStore'
AND TRY_CAST(
C.change_data :balance :: STRING AS bigint
) IS NOT NULL
AND C.address IS NOT NULL
AND LOWER(token_address) IN (
SELECT
LOWER(token_address)
FROM
verified_tokens
)
{% if is_incremental() %}
AND C.modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
all_balances AS (
SELECT
block_number,
block_timestamp,
version,
token_address,
post_balance,
frozen,
address
FROM
fungible_asset_balances
{% if is_incremental() and var(
'HEAL_MODEL',
false
) %}
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
version,
REPLACE(
REPLACE(
change_resource :: STRING,
'CoinStore<'
),
'>'
) AS token_address,
change_data :coin :value :: INT AS post_balance,
COALESCE(
change_data :deposit_events :guid :id :addr,
change_data :withdraw_events :guid :id :addr,
change_data :coin_amount_event :guid :id :addr
) :: STRING AS account_address,
{{ dbt_utils.generate_surrogate_key(
['block_number','version','account_address','token_address']
) }} AS changes_id,
token_address,
post_balance,
frozen,
address
FROM
heal_balances
{% endif %}
)
SELECT
block_number,
block_timestamp,
block_timestamp :: DATE AS block_date,
version,
address,
token_address,
post_balance AS balance,
frozen,
{{ dbt_utils.generate_surrogate_key(['block_date', 'address', 'token_address']) }} AS balances_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref(
'silver__changes'
) }}
WHERE
post_balance IS NOT NULL
all_balances qualify ROW_NUMBER() over (
PARTITION BY block_timestamp :: DATE,
address,
token_address
ORDER BY
block_timestamp DESC
) = 1

View File

@ -0,0 +1,61 @@
version: 2
models:
- name: silver__balances
description: |
Raw balance changes for verified tokens only. Captures the last balance state
per (address, token_address) per day from fungible asset stores.
columns:
- name: BLOCK_NUMBER
description: The block number of the transaction that modified this balance.
- name: BLOCK_TIMESTAMP
description: The timestamp of the block containing the balance change.
- name: BLOCK_DATE
description: The date portion of the block timestamp, used for daily partitioning.
- name: VERSION
description: '{{ doc("version") }}'
tests:
- not_null:
where: block_date > current_date - 3
- name: ADDRESS
description: '{{ doc("address") }}'
tests:
- not_null:
where: block_date > current_date - 3
- name: TOKEN_ADDRESS
description: '{{ doc("token_address") }}'
tests:
- not_null:
where: block_date > current_date - 3
- name: BALANCE
description: The raw token balance amount before decimal adjustment.
tests:
- not_null:
where: block_date > current_date - 3
- name: FROZEN
description: Boolean indicating whether the fungible asset account is frozen and unable to transfer.
- name: BALANCES_ID
description: '{{ doc("pk") }}'
tests:
- unique:
where: block_date > current_date - 3
- not_null:
where: block_date > current_date - 3
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'
- name: _INVOCATION_ID
description: The dbt invocation ID for the run that produced this record.

View File

@ -0,0 +1,30 @@
{{ config(
materialized = 'table',
tags = ['balances_snapshot']
) }}
{# Set snapshot date - override with --var 'SNAPSHOT_DATE:2025-09-02' #}
{% set snapshot_date = var('SNAPSHOT_DATE', '2025-09-02') %}
SELECT
block_number,
block_timestamp,
block_date,
version,
address,
token_address,
balance,
frozen,
'{{ snapshot_date }}'::DATE AS snapshot_date,
{{ dbt_utils.generate_surrogate_key(['address', 'token_address', "'" ~ snapshot_date ~ "'"]) }} AS balances_snapshot_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('silver__balances') }}
WHERE
block_timestamp < '{{ snapshot_date }}'::TIMESTAMP
QUALIFY ROW_NUMBER() OVER (
PARTITION BY address, token_address
ORDER BY version DESC
) = 1

View File

@ -0,0 +1,48 @@
version: 2
models:
- name: silver__balances_snapshot
description: |
Point-in-time snapshot of token balances for verified tokens. Contains one row per
(address, token_address) combination representing the most recent balance state
before the configured SNAPSHOT_DATE variable (default: 2025-09-02).
columns:
- name: BLOCK_NUMBER
description: The block number of the transaction that last modified this balance.
- name: BLOCK_TIMESTAMP
description: The timestamp of the block containing the balance change.
- name: BLOCK_DATE
description: The date portion of the block timestamp.
- name: VERSION
description: '{{ doc("version") }}'
- name: ADDRESS
description: '{{ doc("address") }}'
- name: TOKEN_ADDRESS
description: '{{ doc("token_address") }}'
- name: BALANCE
description: The raw token balance amount before decimal adjustment.
- name: FROZEN
description: Boolean indicating whether the fungible asset account is frozen and unable to transfer.
- name: SNAPSHOT_DATE
description: The configured snapshot date used to filter balance records.
- name: BALANCES_SNAPSHOT_ID
description: '{{ doc("pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'
- name: _INVOCATION_ID
description: The dbt invocation ID for the run that produced this record.