diff --git a/models/descriptions/tables/balances__ez_balances.md b/models/descriptions/tables/balances__ez_balances.md new file mode 100644 index 0000000..6508113 --- /dev/null +++ b/models/descriptions/tables/balances__ez_balances.md @@ -0,0 +1,32 @@ +{% docs balances__ez_balances %} + +## Description +This table provides token balances for all addresses holding verified fungible assets on the Aptos blockchain. It combines a historical snapshot with recent balance changes to provide comprehensive balance data. Each row represents a unique address-token combination, with decimal-adjusted balances and USD valuations using end-of-day token prices. + +**Data Structure:** The table unions: +- Historical balances from a point-in-time snapshot (configurable via `SNAPSHOT_DATE` variable, default: 2025-09-01) +- Recent balance changes that occurred after the snapshot date + +## Key Use Cases +- Portfolio tracking and balance analysis +- Wallet wealth distribution and concentration metrics +- Token holder analysis and whale tracking +- DeFi TVL calculations and protocol health monitoring +- Point-in-time balance queries + +## Important Relationships +- Sources historical data from `silver.balances_snapshot` for the snapshot date +- Sources recent data from `silver.balances` for post-snapshot changes +- Joins to `core.dim_tokens` for token metadata (symbol, name, decimals) +- Joins to `price.ez_prices_hourly` for end-of-day USD price valuations +- Can be joined with `core.dim_labels` for address labeling and entity identification + +## Commonly-used Fields +- `balance_date`: The date of the balance record (snapshot date or block date) +- `address`: Core field for wallet-level analysis and filtering +- `token_address`: Essential for token-specific balance queries and aggregations +- `balance`: Decimal-adjusted balance for human-readable amounts +- `balance_usd`: Critical for portfolio valuation and cross-token comparisons +- `last_balance_change`: Timestamp of the most recent balance modification + +{% enddocs %} diff --git a/models/gold/balances/balances__ez_balances.sql b/models/gold/balances/balances__ez_balances.sql new file mode 100644 index 0000000..ba99172 --- /dev/null +++ b/models/gold/balances/balances__ez_balances.sql @@ -0,0 +1,85 @@ +{{ config( + materialized = 'view', + tags = ['balances'] +) }} + +{# Use the same snapshot date as the silver snapshot model #} +{% set snapshot_date = var('SNAPSHOT_DATE', '2025-09-02') %} + +WITH snapshot_balances AS ( + -- Historical balances from snapshot (state as of snapshot date) + SELECT + snapshot_date AS balance_date, + address, + token_address, + balance, + frozen, + block_timestamp AS last_balance_change, + balances_snapshot_id AS ez_balances_id, + inserted_timestamp, + modified_timestamp + FROM + {{ ref('silver__balances_snapshot') }} +), + +recent_balances AS ( + -- Balances that occurred after the snapshot date + SELECT + block_date AS balance_date, + address, + token_address, + balance, + frozen, + block_timestamp AS last_balance_change, + balances_id AS ez_balances_id, + inserted_timestamp, + modified_timestamp + FROM + {{ ref('silver__balances') }} + WHERE + block_date > '{{ snapshot_date }}'::DATE +), + +combined_balances AS ( + SELECT * FROM snapshot_balances + UNION ALL + SELECT * FROM recent_balances +), + +prices AS ( + SELECT + token_address, + hour::DATE AS price_date, + price, + is_verified + FROM + {{ ref('price__ez_prices_hourly') }} + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY token_address, hour::DATE + ORDER BY hour DESC + ) = 1 +) + +SELECT + b.balance_date, + b.address, + b.token_address, + b.balance AS balance_raw, + b.balance / NULLIF(POW(10, COALESCE(t.decimals, 0)), 0) AS balance, + (b.balance / NULLIF(POW(10, COALESCE(t.decimals, 0)), 0)) * p.price AS balance_usd, + t.symbol, + t.name AS token_name, + t.decimals, + b.frozen, + b.last_balance_change, + p.is_verified AS token_is_verified, + b.ez_balances_id, + b.inserted_timestamp, + b.modified_timestamp +FROM + combined_balances b + LEFT JOIN {{ ref('core__dim_tokens') }} t + ON LOWER(b.token_address) = LOWER(t.token_address) + LEFT JOIN prices p + ON LOWER(b.token_address) = LOWER(p.token_address) + AND b.balance_date = p.price_date diff --git a/models/gold/balances/balances__ez_balances.yml b/models/gold/balances/balances__ez_balances.yml new file mode 100644 index 0000000..04f3b80 --- /dev/null +++ b/models/gold/balances/balances__ez_balances.yml @@ -0,0 +1,74 @@ +version: 2 + +models: + - name: balances__ez_balances + description: '{{ doc("balances__ez_balances") }}' + + columns: + - name: BALANCE_DATE + description: The date of the balance record. For snapshot data this is the snapshot date; for recent data this is the block date. + tests: + - not_null: + where: balance_date > current_date - 3 + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 2 + + - name: ADDRESS + description: '{{ doc("address") }}' + tests: + - not_null: + where: balance_date > current_date - 3 + + - name: TOKEN_ADDRESS + description: '{{ doc("token_address") }}' + tests: + - not_null: + where: balance_date > current_date - 3 + + - name: BALANCE_RAW + description: The unadjusted token balance as stored on-chain, before decimal adjustment. + tests: + - not_null: + where: balance_date > current_date - 3 + + - name: BALANCE + description: The decimal-adjusted token balance, representing human-readable token amounts. + + - name: BALANCE_USD + description: The USD value of the balance, calculated using the end-of-day token price. + + - name: SYMBOL + description: '{{ doc("symbol") }}' + + - name: TOKEN_NAME + description: The full name of the token. + + - name: DECIMALS + description: '{{ doc("decimals") }}' + + - name: FROZEN + description: Boolean indicating whether the fungible asset account is frozen and unable to transfer. + tests: + - not_null: + where: balance_date > current_date - 3 + + - name: LAST_BALANCE_CHANGE + description: The timestamp when the balance was last modified for this address-token combination. + + - name: TOKEN_IS_VERIFIED + description: Boolean indicating whether the token has been verified with a reliable price source. + + - name: EZ_BALANCES_ID + description: '{{ doc("pk") }}' + tests: + - unique: + where: balance_date > current_date - 3 + - not_null: + where: balance_date > current_date - 3 + + - name: INSERTED_TIMESTAMP + description: '{{ doc("inserted_timestamp") }}' + + - name: MODIFIED_TIMESTAMP + description: '{{ doc("modified_timestamp") }}' diff --git a/models/silver/core/balances/silver__balances.sql b/models/silver/core/balances/silver__balances.sql index dc7e783..e6e9b9b 100644 --- a/models/silver/core/balances/silver__balances.sql +++ b/models/silver/core/balances/silver__balances.sql @@ -1,43 +1,141 @@ {{ config( materialized = 'incremental', - unique_key = ['tx_hash','change_index'], + unique_key = ['balances_id'], incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], - tags = ['core','full_test'], - enabled = false + cluster_by = ['modified_timestamp'], + incremental_predicates = ["dynamic_range_predicate", "block_timestamp::DATE"], + post_hook = '{{ unverify_tokens() }}', + tags = ['daily', 'full_test', 'heal'] ) }} -{# cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'], -post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash, change_type,inner_change_type,change_address,change_module,change_resource);", -#} +-- at most one record per (address, token_address) pair per day - we will get the last transaction of the day +WITH verified_tokens AS ( + SELECT + DISTINCT token_address + FROM + {{ ref('price__ez_prices_hourly') }} + WHERE + is_verified +), + +{% if is_incremental() and var( + 'HEAL_MODEL', + false +) %} +newly_verified_tokens AS ({{ get_missing_verified_tokens() }}), +heal_balances AS ( + SELECT + C.block_number, + C.block_timestamp, + C.version, + C.change_data :metadata :inner :: STRING AS token_address, + C.change_data :balance :: bigint AS post_balance, + C.change_data :frozen :: BOOLEAN AS frozen, + C.address + FROM + {{ ref('silver__changes') }} C + WHERE + block_timestamp :: DATE >= '2023-07-28' + AND C.change_module = 'fungible_asset' + AND C.change_resource = 'FungibleStore' + AND TRY_CAST( + C.change_data :balance :: STRING AS bigint + ) IS NOT NULL + AND C.address IS NOT NULL + AND LOWER( + C.change_data :metadata :inner :: STRING + ) IN ( + SELECT + token_address + FROM + newly_verified_tokens + ) +), +{% endif %} + +fungible_asset_balances AS ( + SELECT + C.block_number, + C.block_timestamp, + C.version, + C.change_data :metadata :inner :: STRING AS token_address, + C.change_data :balance :: bigint AS post_balance, + C.change_data :frozen :: BOOLEAN AS frozen, + C.address + FROM + {{ ref('silver__changes') }} C + WHERE + block_timestamp :: DATE >= '2023-07-28' + AND C.change_module = 'fungible_asset' + AND C.change_resource = 'FungibleStore' + AND TRY_CAST( + C.change_data :balance :: STRING AS bigint + ) IS NOT NULL + AND C.address IS NOT NULL + AND LOWER(token_address) IN ( + SELECT + LOWER(token_address) + FROM + verified_tokens + ) + +{% if is_incremental() %} +AND C.modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} +) +{% endif %} +), +all_balances AS ( + SELECT + block_number, + block_timestamp, + version, + token_address, + post_balance, + frozen, + address + FROM + fungible_asset_balances + +{% if is_incremental() and var( + 'HEAL_MODEL', + false +) %} +UNION ALL SELECT block_number, block_timestamp, - tx_hash, version, - REPLACE( - REPLACE( - change_resource :: STRING, - 'CoinStore<' - ), - '>' - ) AS token_address, - change_data :coin :value :: INT AS post_balance, - COALESCE( - change_data :deposit_events :guid :id :addr, - change_data :withdraw_events :guid :id :addr, - change_data :coin_amount_event :guid :id :addr - ) :: STRING AS account_address, - {{ dbt_utils.generate_surrogate_key( - ['block_number','version','account_address','token_address'] - ) }} AS changes_id, + token_address, + post_balance, + frozen, + address +FROM + heal_balances +{% endif %} +) +SELECT + block_number, + block_timestamp, + block_timestamp :: DATE AS block_date, + version, + address, + token_address, + post_balance AS balance, + frozen, + {{ dbt_utils.generate_surrogate_key(['block_date', 'address', 'token_address']) }} AS balances_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, - _inserted_timestamp, '{{ invocation_id }}' AS _invocation_id FROM - {{ ref( - 'silver__changes' - ) }} -WHERE - post_balance IS NOT NULL + all_balances qualify ROW_NUMBER() over ( + PARTITION BY block_timestamp :: DATE, + address, + token_address + ORDER BY + block_timestamp DESC + ) = 1 diff --git a/models/silver/core/balances/silver__balances.yml b/models/silver/core/balances/silver__balances.yml new file mode 100644 index 0000000..b52aced --- /dev/null +++ b/models/silver/core/balances/silver__balances.yml @@ -0,0 +1,61 @@ +version: 2 + +models: + - name: silver__balances + description: | + Raw balance changes for verified tokens only. Captures the last balance state + per (address, token_address) per day from fungible asset stores. + + columns: + - name: BLOCK_NUMBER + description: The block number of the transaction that modified this balance. + + - name: BLOCK_TIMESTAMP + description: The timestamp of the block containing the balance change. + + - name: BLOCK_DATE + description: The date portion of the block timestamp, used for daily partitioning. + + - name: VERSION + description: '{{ doc("version") }}' + tests: + - not_null: + where: block_date > current_date - 3 + + - name: ADDRESS + description: '{{ doc("address") }}' + tests: + - not_null: + where: block_date > current_date - 3 + + - name: TOKEN_ADDRESS + description: '{{ doc("token_address") }}' + tests: + - not_null: + where: block_date > current_date - 3 + + - name: BALANCE + description: The raw token balance amount before decimal adjustment. + tests: + - not_null: + where: block_date > current_date - 3 + + - name: FROZEN + description: Boolean indicating whether the fungible asset account is frozen and unable to transfer. + + - name: BALANCES_ID + description: '{{ doc("pk") }}' + tests: + - unique: + where: block_date > current_date - 3 + - not_null: + where: block_date > current_date - 3 + + - name: INSERTED_TIMESTAMP + description: '{{ doc("inserted_timestamp") }}' + + - name: MODIFIED_TIMESTAMP + description: '{{ doc("modified_timestamp") }}' + + - name: _INVOCATION_ID + description: The dbt invocation ID for the run that produced this record. diff --git a/models/silver/core/balances/silver__balances_snapshot.sql b/models/silver/core/balances/silver__balances_snapshot.sql new file mode 100644 index 0000000..419a548 --- /dev/null +++ b/models/silver/core/balances/silver__balances_snapshot.sql @@ -0,0 +1,30 @@ +{{ config( + materialized = 'table', + tags = ['balances_snapshot'] +) }} + +{# Set snapshot date - override with --var 'SNAPSHOT_DATE:2025-09-02' #} +{% set snapshot_date = var('SNAPSHOT_DATE', '2025-09-02') %} + +SELECT + block_number, + block_timestamp, + block_date, + version, + address, + token_address, + balance, + frozen, + '{{ snapshot_date }}'::DATE AS snapshot_date, + {{ dbt_utils.generate_surrogate_key(['address', 'token_address', "'" ~ snapshot_date ~ "'"]) }} AS balances_snapshot_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + {{ ref('silver__balances') }} +WHERE + block_timestamp < '{{ snapshot_date }}'::TIMESTAMP +QUALIFY ROW_NUMBER() OVER ( + PARTITION BY address, token_address + ORDER BY version DESC +) = 1 diff --git a/models/silver/core/balances/silver__balances_snapshot.yml b/models/silver/core/balances/silver__balances_snapshot.yml new file mode 100644 index 0000000..3ed13ca --- /dev/null +++ b/models/silver/core/balances/silver__balances_snapshot.yml @@ -0,0 +1,48 @@ +version: 2 + +models: + - name: silver__balances_snapshot + description: | + Point-in-time snapshot of token balances for verified tokens. Contains one row per + (address, token_address) combination representing the most recent balance state + before the configured SNAPSHOT_DATE variable (default: 2025-09-02). + + columns: + - name: BLOCK_NUMBER + description: The block number of the transaction that last modified this balance. + + - name: BLOCK_TIMESTAMP + description: The timestamp of the block containing the balance change. + + - name: BLOCK_DATE + description: The date portion of the block timestamp. + + - name: VERSION + description: '{{ doc("version") }}' + + - name: ADDRESS + description: '{{ doc("address") }}' + + - name: TOKEN_ADDRESS + description: '{{ doc("token_address") }}' + + - name: BALANCE + description: The raw token balance amount before decimal adjustment. + + - name: FROZEN + description: Boolean indicating whether the fungible asset account is frozen and unable to transfer. + + - name: SNAPSHOT_DATE + description: The configured snapshot date used to filter balance records. + + - name: BALANCES_SNAPSHOT_ID + description: '{{ doc("pk") }}' + + - name: INSERTED_TIMESTAMP + description: '{{ doc("inserted_timestamp") }}' + + - name: MODIFIED_TIMESTAMP + description: '{{ doc("modified_timestamp") }}' + + - name: _INVOCATION_ID + description: The dbt invocation ID for the run that produced this record.