From 9e69166e4fe93b8691b629f8c7ce7ba86c4dd463 Mon Sep 17 00:00:00 2001 From: stanz Date: Wed, 26 Nov 2025 21:25:28 +0700 Subject: [PATCH 01/18] upd silver balances --- .../silver/core/balances/silver__balances.sql | 190 +++++++++++++++--- .../silver/core/balances/silver__balances.yml | 57 ++++++ 2 files changed, 215 insertions(+), 32 deletions(-) create mode 100644 models/silver/core/balances/silver__balances.yml diff --git a/models/silver/core/balances/silver__balances.sql b/models/silver/core/balances/silver__balances.sql index dc7e783..0259b97 100644 --- a/models/silver/core/balances/silver__balances.sql +++ b/models/silver/core/balances/silver__balances.sql @@ -1,43 +1,169 @@ {{ config( materialized = 'incremental', - unique_key = ['tx_hash','change_index'], - incremental_strategy = 'merge', + unique_key = ['block_date','address','token_address'], + incremental_strategy = 'delete+insert', merge_exclude_columns = ["inserted_timestamp"], - tags = ['core','full_test'], - enabled = false + cluster_by = ['block_date','_inserted_timestamp::DATE'], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(address, token_address);", + tags = ['core','full_test'] ) }} -{# cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'], -post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash, change_type,inner_change_type,change_address,change_module,change_resource);", -#} + +WITH +fungible_asset_balances AS ( + SELECT + c.block_number, + c.block_timestamp, + c.block_timestamp::DATE AS block_date, + c.version, + c.tx_hash, + c.change_data:metadata:inner::STRING AS token_address, + c.change_data:balance::BIGINT AS post_balance, + c.change_data:frozen::BOOLEAN AS frozen, + c.address, + c.modified_timestamp, + c._inserted_timestamp + FROM {{ ref('silver__changes') }} c + INNER JOIN {{ ref('silver__fungible_asset_metadata') }} m + ON c.change_data:metadata:inner::STRING = m.token_address + WHERE c.change_module = 'fungible_asset' + AND c.change_resource = 'FungibleStore' + AND c.change_data:balance IS NOT NULL + AND c.address IS NOT NULL + AND c.change_data:balance::BIGINT > 0 + + {% if is_incremental() %} + AND c.modified_timestamp >= ( + SELECT MAX(modified_timestamp) + FROM {{ this }} + ) + {% endif %} +), + +coin_balances AS ( + SELECT + c.block_number, + c.block_timestamp, + c.block_timestamp::DATE AS block_date, + c.version, + c.tx_hash, + REPLACE(REPLACE(c.change_resource::STRING, 'CoinStore<'), '>') AS token_address, + c.change_data:coin:value::BIGINT AS post_balance, + FALSE AS frozen, -- Coin shouldn't be frozen + COALESCE( + c.change_data:deposit_events:guid:id:addr, + c.change_data:withdraw_events:guid:id:addr, + c.change_data:coin_amount_event:guid:id:addr + )::STRING AS address, + c.modified_timestamp, + c._inserted_timestamp + FROM {{ ref('silver__changes') }} c + INNER JOIN {{ ref('core__dim_tokens') }} t + ON REPLACE(REPLACE(c.change_resource::STRING, 'CoinStore<'), '>') = t.token_address + WHERE c.change_module = 'coin' + AND c.change_resource LIKE 'CoinStore<%' + AND c.change_data:coin:value IS NOT NULL + AND COALESCE( + c.change_data:deposit_events:guid:id:addr, + c.change_data:withdraw_events:guid:id:addr, + c.change_data:coin_amount_event:guid:id:addr + ) IS NOT NULL + AND c.change_data:coin:value::BIGINT > 0 + + {% if is_incremental() %} + AND c.modified_timestamp >= ( + SELECT MAX(modified_timestamp) + FROM {{ this }} + ) + {% endif %} +), + +all_balances AS ( + SELECT * FROM fungible_asset_balances + UNION ALL + SELECT * FROM coin_balances +), + +token_metadata AS ( + SELECT + token_address, + symbol, + decimals, + name + FROM {{ ref('silver__fungible_asset_metadata') }} + + UNION ALL + + SELECT + token_address, + symbol, + decimals, + name + FROM {{ ref('core__dim_tokens') }} +), + +-- Decimal adjustment +balances_with_metadata AS ( + SELECT + b.block_date, + b.block_number, + b.version, + b.tx_hash, + b.address, + b.token_address, + m.symbol, + m.decimals, + m.name, + b.post_balance, + b.frozen, + CASE + WHEN m.decimals IS NOT NULL + THEN b.post_balance / POW(10, m.decimals) + ELSE NULL + END AS balance, + b.modified_timestamp, + b._inserted_timestamp + FROM all_balances b + INNER JOIN token_metadata m + ON b.token_address = m.token_address + WHERE b.post_balance > 0 +), + +daily_balances AS ( + SELECT + block_date, + address, + token_address, + symbol, + decimals, + name, + post_balance, + balance, + frozen, + modified_timestamp, + _inserted_timestamp + FROM balances_with_metadata + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY block_date, address, token_address + ORDER BY block_number DESC, version DESC + ) = 1 +) SELECT - block_number, - block_timestamp, - tx_hash, - version, - REPLACE( - REPLACE( - change_resource :: STRING, - 'CoinStore<' - ), - '>' - ) AS token_address, - change_data :coin :value :: INT AS post_balance, - COALESCE( - change_data :deposit_events :guid :id :addr, - change_data :withdraw_events :guid :id :addr, - change_data :coin_amount_event :guid :id :addr - ) :: STRING AS account_address, + block_date, + address, + token_address, + symbol, + decimals, + name, + post_balance, + balance, + frozen, {{ dbt_utils.generate_surrogate_key( - ['block_number','version','account_address','token_address'] - ) }} AS changes_id, + ['block_date','address','token_address'] + ) }} AS balances_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, _inserted_timestamp, '{{ invocation_id }}' AS _invocation_id -FROM - {{ ref( - 'silver__changes' - ) }} -WHERE - post_balance IS NOT NULL +FROM daily_balances +WHERE balance > 0 diff --git a/models/silver/core/balances/silver__balances.yml b/models/silver/core/balances/silver__balances.yml new file mode 100644 index 0000000..54927ac --- /dev/null +++ b/models/silver/core/balances/silver__balances.yml @@ -0,0 +1,57 @@ +version: 2 + +models: + - name: silver__balances + description: | + Daily balance snapshots for ALL Aptos fungible assets (coins and fungible assets). + + tests: + - dbt_constraints.primary_key: + column_name: BALANCES_ID + + columns: + - name: BLOCK_DATE + tests: + - not_null + - name: ADDRESS + tests: + - not_null + - name: TOKEN_ADDRESS + tests: + - not_null + - name: SYMBOL + tests: + - not_null + - name: DECIMALS + tests: + - not_null + - name: NAME + - name: POST_BALANCE + tests: + - not_null + - name: BALANCE + tests: + - not_null + - name: FROZEN + description: | + Account freeze status (FungibleAsset-specific field). + - TRUE: Account/store is frozen (cannot transfer) + - FALSE: Account is active + - For Coin standard balances: Always FALSE (Coin doesn't support freezing) + - name: BALANCES_ID + tests: + - not_null + - name: INSERTED_TIMESTAMP + tests: + - not_null + - name: MODIFIED_TIMESTAMP + tests: + - not_null + - name: _INSERTED_TIMESTAMP + tests: + - name: not_null_silver__balances_INSERTED_TIMESTAMP_ + test_name: not_null + - name: _INVOCATION_ID + tests: + - name: not_null_silver__balances_INVOCATION_ID + test_name: not_null From 8cbad416a91e5a2ec7583c3a2902709f8a45655c Mon Sep 17 00:00:00 2001 From: stanz Date: Wed, 3 Dec 2025 17:29:16 +0700 Subject: [PATCH 02/18] upd balances logic forward fill --- .../silver/core/balances/silver__balances.sql | 180 ++++++++++-------- 1 file changed, 103 insertions(+), 77 deletions(-) diff --git a/models/silver/core/balances/silver__balances.sql b/models/silver/core/balances/silver__balances.sql index 0259b97..4323b04 100644 --- a/models/silver/core/balances/silver__balances.sql +++ b/models/silver/core/balances/silver__balances.sql @@ -1,21 +1,20 @@ {{ config( materialized = 'incremental', - unique_key = ['block_date','address','token_address'], + unique_key = ['block_date', 'address', 'token_address'], incremental_strategy = 'delete+insert', + incremental_predicates = ["dynamic_range_predicate", "block_date"], merge_exclude_columns = ["inserted_timestamp"], - cluster_by = ['block_date','_inserted_timestamp::DATE'], + cluster_by = ['block_date', '_inserted_timestamp::DATE'], post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(address, token_address);", - tags = ['core','full_test'] + tags = ['core', 'full_test'] ) }} -WITH -fungible_asset_balances AS ( +WITH fungible_asset_balances AS ( SELECT c.block_number, c.block_timestamp, c.block_timestamp::DATE AS block_date, c.version, - c.tx_hash, c.change_data:metadata:inner::STRING AS token_address, c.change_data:balance::BIGINT AS post_balance, c.change_data:frozen::BOOLEAN AS frozen, @@ -23,19 +22,14 @@ fungible_asset_balances AS ( c.modified_timestamp, c._inserted_timestamp FROM {{ ref('silver__changes') }} c - INNER JOIN {{ ref('silver__fungible_asset_metadata') }} m - ON c.change_data:metadata:inner::STRING = m.token_address WHERE c.change_module = 'fungible_asset' AND c.change_resource = 'FungibleStore' AND c.change_data:balance IS NOT NULL AND c.address IS NOT NULL - AND c.change_data:balance::BIGINT > 0 - {% if is_incremental() %} - AND c.modified_timestamp >= ( - SELECT MAX(modified_timestamp) - FROM {{ this }} - ) + AND c.modified_timestamp >= ( + SELECT MAX(modified_timestamp) FROM {{ this }} + ) {% endif %} ), @@ -45,10 +39,9 @@ coin_balances AS ( c.block_timestamp, c.block_timestamp::DATE AS block_date, c.version, - c.tx_hash, - REPLACE(REPLACE(c.change_resource::STRING, 'CoinStore<'), '>') AS token_address, + REPLACE(REPLACE(c.change_resource::STRING, 'CoinStore<', ''), '>', '') AS token_address, c.change_data:coin:value::BIGINT AS post_balance, - FALSE AS frozen, -- Coin shouldn't be frozen + FALSE AS frozen, COALESCE( c.change_data:deposit_events:guid:id:addr, c.change_data:withdraw_events:guid:id:addr, @@ -57,8 +50,6 @@ coin_balances AS ( c.modified_timestamp, c._inserted_timestamp FROM {{ ref('silver__changes') }} c - INNER JOIN {{ ref('core__dim_tokens') }} t - ON REPLACE(REPLACE(c.change_resource::STRING, 'CoinStore<'), '>') = t.token_address WHERE c.change_module = 'coin' AND c.change_resource LIKE 'CoinStore<%' AND c.change_data:coin:value IS NOT NULL @@ -67,13 +58,10 @@ coin_balances AS ( c.change_data:withdraw_events:guid:id:addr, c.change_data:coin_amount_event:guid:id:addr ) IS NOT NULL - AND c.change_data:coin:value::BIGINT > 0 - {% if is_incremental() %} - AND c.modified_timestamp >= ( - SELECT MAX(modified_timestamp) - FROM {{ this }} - ) + AND c.modified_timestamp >= ( + SELECT MAX(modified_timestamp) FROM {{ this }} + ) {% endif %} ), @@ -83,49 +71,30 @@ all_balances AS ( SELECT * FROM coin_balances ), -token_metadata AS ( +address_token_pairs AS ( SELECT + address, token_address, - symbol, - decimals, - name - FROM {{ ref('silver__fungible_asset_metadata') }} - - UNION ALL - - SELECT - token_address, - symbol, - decimals, - name - FROM {{ ref('core__dim_tokens') }} + MIN(block_date) AS min_date + FROM all_balances + GROUP BY address, token_address ), --- Decimal adjustment -balances_with_metadata AS ( +date_spine AS ( + SELECT date_day AS block_date + FROM {{ source('crosschain', 'dim_dates') }} + WHERE date_day >= '2022-10-12' + AND date_day <= CURRENT_DATE +), + +address_token_date_spine AS ( SELECT - b.block_date, - b.block_number, - b.version, - b.tx_hash, - b.address, - b.token_address, - m.symbol, - m.decimals, - m.name, - b.post_balance, - b.frozen, - CASE - WHEN m.decimals IS NOT NULL - THEN b.post_balance / POW(10, m.decimals) - ELSE NULL - END AS balance, - b.modified_timestamp, - b._inserted_timestamp - FROM all_balances b - INNER JOIN token_metadata m - ON b.token_address = m.token_address - WHERE b.post_balance > 0 + d.block_date, + p.address, + p.token_address + FROM address_token_pairs p + CROSS JOIN date_spine d + WHERE d.block_date >= p.min_date ), daily_balances AS ( @@ -133,37 +102,94 @@ daily_balances AS ( block_date, address, token_address, - symbol, - decimals, - name, post_balance, - balance, frozen, modified_timestamp, _inserted_timestamp - FROM balances_with_metadata + FROM all_balances QUALIFY ROW_NUMBER() OVER ( PARTITION BY block_date, address, token_address ORDER BY block_number DESC, version DESC ) = 1 +), + +forward_filled_values AS ( + SELECT + s.block_date, + s.address, + s.token_address, + LAST_VALUE(b.post_balance IGNORE NULLS) OVER ( + PARTITION BY s.address, s.token_address + ORDER BY s.block_date + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS post_balance, + LAST_VALUE(b.frozen IGNORE NULLS) OVER ( + PARTITION BY s.address, s.token_address + ORDER BY s.block_date + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS frozen, + LAST_VALUE(b.modified_timestamp IGNORE NULLS) OVER ( + PARTITION BY s.address, s.token_address + ORDER BY s.block_date + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS modified_timestamp, + LAST_VALUE(b._inserted_timestamp IGNORE NULLS) OVER ( + PARTITION BY s.address, s.token_address + ORDER BY s.block_date + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS _inserted_timestamp + FROM address_token_date_spine s + LEFT JOIN daily_balances b + ON s.block_date = b.block_date + AND s.address = b.address + AND s.token_address = b.token_address +), + +forward_filled_balances AS ( + SELECT + block_date, + address, + token_address, + post_balance, + frozen, + modified_timestamp, + _inserted_timestamp, + LAST_VALUE(CASE WHEN post_balance > 0 THEN block_date END IGNORE NULLS) OVER ( + PARTITION BY address, token_address + ORDER BY block_date + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS last_positive_date + FROM forward_filled_values +), + +filtered_balances AS ( + SELECT + block_date, + address, + token_address, + post_balance, + frozen, + modified_timestamp, + _inserted_timestamp + FROM forward_filled_balances + WHERE post_balance IS NOT NULL + AND ( + post_balance > 0 + OR (post_balance = 0 + AND last_positive_date IS NOT NULL + AND DATEDIFF('day', last_positive_date, block_date) <= 3) + ) ) SELECT block_date, address, token_address, - symbol, - decimals, - name, post_balance, - balance, frozen, - {{ dbt_utils.generate_surrogate_key( - ['block_date','address','token_address'] - ) }} AS balances_id, + {{ dbt_utils.generate_surrogate_key(['block_date', 'address', 'token_address']) }} AS balances_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, _inserted_timestamp, '{{ invocation_id }}' AS _invocation_id -FROM daily_balances -WHERE balance > 0 +FROM filtered_balances From 62c13565ed18ed0e2571daf3f48f1d21d3d04803 Mon Sep 17 00:00:00 2001 From: stanz Date: Thu, 4 Dec 2025 23:36:22 +0700 Subject: [PATCH 03/18] gold model --- models/gold/core/core__ez_balances.sql | 25 ++++++++++++++ models/gold/core/core__fact_balances.sql | 43 ++++++++++++++++++++++++ 2 files changed, 68 insertions(+) create mode 100644 models/gold/core/core__ez_balances.sql create mode 100644 models/gold/core/core__fact_balances.sql diff --git a/models/gold/core/core__ez_balances.sql b/models/gold/core/core__ez_balances.sql new file mode 100644 index 0000000..4298577 --- /dev/null +++ b/models/gold/core/core__ez_balances.sql @@ -0,0 +1,25 @@ +{{ config( + materialized = 'view', + tags = ['core'] +) }} + +SELECT + f.block_date, + f.address, + f.token_address, + f.balance_unadj, + f.balance, + f.balance * p.price AS balance_usd, + f.symbol, + f.name, + f.decimals, + COALESCE(p.is_verified, FALSE) AS token_is_verified, + f.frozen, + f.fact_balances_id AS ez_balances_id, + f.inserted_timestamp, + f.modified_timestamp +FROM + {{ ref('core__fact_balances') }} f + LEFT JOIN {{ ref('price__ez_prices_hourly') }} p + ON LOWER(f.token_address) = LOWER(p.token_address) + AND p.hour = f.block_date::TIMESTAMP diff --git a/models/gold/core/core__fact_balances.sql b/models/gold/core/core__fact_balances.sql new file mode 100644 index 0000000..38a82b5 --- /dev/null +++ b/models/gold/core/core__fact_balances.sql @@ -0,0 +1,43 @@ +{{ config( + materialized = 'view', + tags = ['core'] +) }} + +WITH balances_with_last_positive AS ( + SELECT + block_date, + address, + token_address, + post_balance, + frozen, + balances_id, + inserted_timestamp, + modified_timestamp, + LAST_VALUE(CASE WHEN post_balance > 0 THEN block_date END IGNORE NULLS) OVER ( + PARTITION BY address, token_address + ORDER BY block_date + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS last_positive_date + FROM {{ ref('silver__balances') }} +) + +SELECT + b.block_date, + b.address, + b.token_address, + b.post_balance AS balance_unadj, + b.post_balance / NULLIF(POW(10, COALESCE(t.decimals, 0)), 0) AS balance, + COALESCE(t.symbol, NULL) AS symbol, + t.name, + t.decimals, + b.frozen, + b.balances_id AS fact_balances_id, + b.inserted_timestamp, + b.modified_timestamp +FROM + balances_with_last_positive b + LEFT JOIN {{ ref('core__dim_tokens') }} t + ON LOWER(b.token_address) = LOWER(t.token_address) +WHERE + b.post_balance > 0 + OR (b.post_balance = 0 AND DATEDIFF('day', b.last_positive_date, b.block_date) <= 1) -- reducing 3d grace period for zero balances to only just 1d of 0's From 7ce176d4269dd0fc0999e6422844e01528df2f28 Mon Sep 17 00:00:00 2001 From: stanz Date: Fri, 5 Dec 2025 00:35:24 +0700 Subject: [PATCH 04/18] yml --- models/gold/core/core__ez_balances.yml | 43 ++++++++++++++++++++++++ models/gold/core/core__fact_balances.yml | 39 +++++++++++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 models/gold/core/core__ez_balances.yml create mode 100644 models/gold/core/core__fact_balances.yml diff --git a/models/gold/core/core__ez_balances.yml b/models/gold/core/core__ez_balances.yml new file mode 100644 index 0000000..396b74c --- /dev/null +++ b/models/gold/core/core__ez_balances.yml @@ -0,0 +1,43 @@ +version: 2 +models: + - name: core__ez_balances + + columns: + - name: BLOCK_DATE + + - name: ADDRESS + description: '{{ doc("address") }}' + + - name: TOKEN_ADDRESS + description: '{{ doc("token_address") }}' + + - name: BALANCE_UNADJ + + - name: BALANCE + + - name: BALANCE_USD + + - name: SYMBOL + description: '{{ doc("symbol") }}' + + - name: NAME + description: '{{ doc("name") }}' + + - name: DECIMALS + description: '{{ doc("decimals") }}' + + - name: TOKEN_IS_VERIFIED + + - name: FROZEN + + - name: EZ_BALANCES_ID + description: '{{ doc("pk") }}' + tests: + - unique: + where: block_date > current_date - 3 + + - name: INSERTED_TIMESTAMP + description: '{{ doc("inserted_timestamp") }}' + + - name: MODIFIED_TIMESTAMP + description: '{{ doc("modified_timestamp") }}' diff --git a/models/gold/core/core__fact_balances.yml b/models/gold/core/core__fact_balances.yml new file mode 100644 index 0000000..3a24a8f --- /dev/null +++ b/models/gold/core/core__fact_balances.yml @@ -0,0 +1,39 @@ +version: 2 +models: + - name: core__fact_balances + + columns: + - name: BLOCK_DATE + + - name: ADDRESS + description: '{{ doc("address") }}' + + - name: TOKEN_ADDRESS + description: '{{ doc("token_address") }}' + + - name: BALANCE_UNADJ + + - name: BALANCE + + - name: SYMBOL + description: '{{ doc("symbol") }}' + + - name: NAME + description: '{{ doc("name") }}' + + - name: DECIMALS + description: '{{ doc("decimals") }}' + + - name: FROZEN + + - name: FACT_BALANCES_ID + description: '{{ doc("pk") }}' + tests: + - unique: + where: block_date > current_date - 3 + + - name: INSERTED_TIMESTAMP + description: '{{ doc("inserted_timestamp") }}' + + - name: MODIFIED_TIMESTAMP + description: '{{ doc("modified_timestamp") }}' From 08b41449ec31aff926c4fd23ec6e95c46fcd529b Mon Sep 17 00:00:00 2001 From: stanz Date: Fri, 5 Dec 2025 00:35:39 +0700 Subject: [PATCH 05/18] rm incremental predicates --- models/silver/core/balances/silver__balances.sql | 1 - 1 file changed, 1 deletion(-) diff --git a/models/silver/core/balances/silver__balances.sql b/models/silver/core/balances/silver__balances.sql index 4323b04..249ef55 100644 --- a/models/silver/core/balances/silver__balances.sql +++ b/models/silver/core/balances/silver__balances.sql @@ -2,7 +2,6 @@ materialized = 'incremental', unique_key = ['block_date', 'address', 'token_address'], incremental_strategy = 'delete+insert', - incremental_predicates = ["dynamic_range_predicate", "block_date"], merge_exclude_columns = ["inserted_timestamp"], cluster_by = ['block_date', '_inserted_timestamp::DATE'], post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(address, token_address);", From 20e566f134e8db7b720cecc79fb607122dcdaa2f Mon Sep 17 00:00:00 2001 From: stanz Date: Fri, 5 Dec 2025 01:17:52 +0700 Subject: [PATCH 06/18] add is_verified --- models/gold/core/core__ez_balances.sql | 2 +- models/gold/core/core__fact_balances.sql | 2 ++ .../silver/core/balances/silver__balances.sql | 27 +++++++++++++------ 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/models/gold/core/core__ez_balances.sql b/models/gold/core/core__ez_balances.sql index 4298577..c5e4b11 100644 --- a/models/gold/core/core__ez_balances.sql +++ b/models/gold/core/core__ez_balances.sql @@ -13,7 +13,7 @@ SELECT f.symbol, f.name, f.decimals, - COALESCE(p.is_verified, FALSE) AS token_is_verified, + f.token_is_verified, f.frozen, f.fact_balances_id AS ez_balances_id, f.inserted_timestamp, diff --git a/models/gold/core/core__fact_balances.sql b/models/gold/core/core__fact_balances.sql index 38a82b5..28b8fab 100644 --- a/models/gold/core/core__fact_balances.sql +++ b/models/gold/core/core__fact_balances.sql @@ -10,6 +10,7 @@ WITH balances_with_last_positive AS ( token_address, post_balance, frozen, + is_verified, balances_id, inserted_timestamp, modified_timestamp, @@ -30,6 +31,7 @@ SELECT COALESCE(t.symbol, NULL) AS symbol, t.name, t.decimals, + b.is_verified AS token_is_verified, b.frozen, b.balances_id AS fact_balances_id, b.inserted_timestamp, diff --git a/models/silver/core/balances/silver__balances.sql b/models/silver/core/balances/silver__balances.sql index 249ef55..1ce2893 100644 --- a/models/silver/core/balances/silver__balances.sql +++ b/models/silver/core/balances/silver__balances.sql @@ -178,17 +178,28 @@ filtered_balances AS ( AND last_positive_date IS NOT NULL AND DATEDIFF('day', last_positive_date, block_date) <= 3) ) +), + +verified_tokens AS ( + SELECT DISTINCT + token_address, + is_verified + FROM {{ ref('price__ez_prices_hourly') }} + WHERE is_verified = TRUE ) SELECT - block_date, - address, - token_address, - post_balance, - frozen, - {{ dbt_utils.generate_surrogate_key(['block_date', 'address', 'token_address']) }} AS balances_id, + f.block_date, + f.address, + f.token_address, + f.post_balance, + f.frozen, + COALESCE(v.is_verified, FALSE) AS is_verified, + {{ dbt_utils.generate_surrogate_key(['f.block_date', 'f.address', 'f.token_address']) }} AS balances_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, - _inserted_timestamp, + f._inserted_timestamp, '{{ invocation_id }}' AS _invocation_id -FROM filtered_balances +FROM filtered_balances f +LEFT JOIN verified_tokens v + ON LOWER(f.token_address) = LOWER(v.token_address) From 5bbb3de850e537b3a57f47967109e30446ea68ce Mon Sep 17 00:00:00 2001 From: stanz Date: Fri, 5 Dec 2025 01:18:01 +0700 Subject: [PATCH 07/18] upd yml --- models/gold/core/core__ez_balances.yml | 20 ++++++++ models/gold/core/core__fact_balances.yml | 22 ++++++++ .../silver/core/balances/silver__balances.yml | 51 ++++++++----------- 3 files changed, 64 insertions(+), 29 deletions(-) diff --git a/models/gold/core/core__ez_balances.yml b/models/gold/core/core__ez_balances.yml index 396b74c..3a66805 100644 --- a/models/gold/core/core__ez_balances.yml +++ b/models/gold/core/core__ez_balances.yml @@ -4,14 +4,26 @@ models: columns: - name: BLOCK_DATE + tests: + - not_null: + where: block_date > current_date - 3 - name: ADDRESS description: '{{ doc("address") }}' + tests: + - not_null: + where: block_date > current_date - 3 - name: TOKEN_ADDRESS description: '{{ doc("token_address") }}' + tests: + - not_null: + where: block_date > current_date - 3 - name: BALANCE_UNADJ + tests: + - not_null: + where: block_date > current_date - 3 - name: BALANCE @@ -27,14 +39,22 @@ models: description: '{{ doc("decimals") }}' - name: TOKEN_IS_VERIFIED + tests: + - not_null: + where: block_date > current_date - 3 - name: FROZEN + tests: + - not_null: + where: block_date > current_date - 3 - name: EZ_BALANCES_ID description: '{{ doc("pk") }}' tests: - unique: where: block_date > current_date - 3 + - not_null: + where: block_date > current_date - 3 - name: INSERTED_TIMESTAMP description: '{{ doc("inserted_timestamp") }}' diff --git a/models/gold/core/core__fact_balances.yml b/models/gold/core/core__fact_balances.yml index 3a24a8f..cc3d3a5 100644 --- a/models/gold/core/core__fact_balances.yml +++ b/models/gold/core/core__fact_balances.yml @@ -4,14 +4,26 @@ models: columns: - name: BLOCK_DATE + tests: + - not_null: + where: block_date > current_date - 3 - name: ADDRESS description: '{{ doc("address") }}' + tests: + - not_null: + where: block_date > current_date - 3 - name: TOKEN_ADDRESS description: '{{ doc("token_address") }}' + tests: + - not_null: + where: block_date > current_date - 3 - name: BALANCE_UNADJ + tests: + - not_null: + where: block_date > current_date - 3 - name: BALANCE @@ -24,13 +36,23 @@ models: - name: DECIMALS description: '{{ doc("decimals") }}' + - name: TOKEN_IS_VERIFIED + tests: + - not_null: + where: block_date > current_date - 3 + - name: FROZEN + tests: + - not_null: + where: block_date > current_date - 3 - name: FACT_BALANCES_ID description: '{{ doc("pk") }}' tests: - unique: where: block_date > current_date - 3 + - not_null: + where: block_date > current_date - 3 - name: INSERTED_TIMESTAMP description: '{{ doc("inserted_timestamp") }}' diff --git a/models/silver/core/balances/silver__balances.yml b/models/silver/core/balances/silver__balances.yml index 54927ac..3d8b7cb 100644 --- a/models/silver/core/balances/silver__balances.yml +++ b/models/silver/core/balances/silver__balances.yml @@ -11,47 +11,40 @@ models: columns: - name: BLOCK_DATE - tests: - - not_null + - name: ADDRESS - tests: - - not_null + description: '{{ doc("address") }}' + - name: TOKEN_ADDRESS - tests: - - not_null - - name: SYMBOL - tests: - - not_null - - name: DECIMALS - tests: - - not_null - - name: NAME + description: '{{ doc("token_address") }}' + - name: POST_BALANCE - tests: - - not_null - - name: BALANCE - tests: - - not_null + - name: FROZEN description: | Account freeze status (FungibleAsset-specific field). - TRUE: Account/store is frozen (cannot transfer) - FALSE: Account is active - For Coin standard balances: Always FALSE (Coin doesn't support freezing) + + - name: IS_VERIFIED + description: | + Whether the token has verified price data. + - TRUE: Token exists in price__ez_prices_hourly with is_verified = TRUE + - FALSE: Token has no verified price data + - name: BALANCES_ID + description: '{{ doc("pk") }}' tests: - - not_null + - unique: + where: block_date > current_date - 3 + - name: INSERTED_TIMESTAMP - tests: - - not_null + description: '{{ doc("inserted_timestamp") }}' + - name: MODIFIED_TIMESTAMP - tests: - - not_null + description: '{{ doc("modified_timestamp") }}' + - name: _INSERTED_TIMESTAMP - tests: - - name: not_null_silver__balances_INSERTED_TIMESTAMP_ - test_name: not_null + - name: _INVOCATION_ID - tests: - - name: not_null_silver__balances_INVOCATION_ID - test_name: not_null From 15a4935042f799ae539581b9efa45a3bac4fa057 Mon Sep 17 00:00:00 2001 From: Eric Laurello Date: Fri, 5 Dec 2025 12:29:20 -0500 Subject: [PATCH 08/18] eric's version of bals --- .../tables/balances__ez_balances_daily.md | 29 +++ .../balances/balances__ez_balances_daily.sql | 41 +++ .../balances/balances__ez_balances_daily.yml | 74 ++++++ models/silver/core/balances/silver__bals.sql | 71 +++++ .../core/balances/silver__bals_daily.sql | 242 ++++++++++++++++++ 5 files changed, 457 insertions(+) create mode 100644 models/descriptions/tables/balances__ez_balances_daily.md create mode 100644 models/gold/balances/balances__ez_balances_daily.sql create mode 100644 models/gold/balances/balances__ez_balances_daily.yml create mode 100644 models/silver/core/balances/silver__bals.sql create mode 100644 models/silver/core/balances/silver__bals_daily.sql diff --git a/models/descriptions/tables/balances__ez_balances_daily.md b/models/descriptions/tables/balances__ez_balances_daily.md new file mode 100644 index 0000000..aaef8ef --- /dev/null +++ b/models/descriptions/tables/balances__ez_balances_daily.md @@ -0,0 +1,29 @@ +{% docs balances__ez_balances_daily %} + +## Description +This table provides daily end-of-day token balances for all addresses holding verified fungible assets on the Aptos blockchain. Each row represents a unique address-token combination for a specific date, with balances forward-filled from the last known balance change. The table includes decimal-adjusted balances and USD valuations using end-of-day token prices. + +**Data Retention:** Daily records are retained for the most recent 95 days. For data older than 95 days, only weekly snapshots (Sundays) are preserved to optimize storage while maintaining historical trend analysis capabilities. + +## Key Use Cases +- Portfolio tracking and historical balance analysis +- Wallet wealth distribution and concentration metrics +- Token holder analysis and whale tracking over time +- DeFi TVL calculations and protocol health monitoring +- Time-series analysis of address holdings + +## Important Relationships +- Sources balance data from `silver.bals_daily` which tracks daily balance snapshots +- Joins to `core.dim_tokens` for token metadata (symbol, name, decimals) +- Joins to `price.ez_prices_hourly` for end-of-day USD price valuations +- Can be joined with `core.dim_labels` for address labeling and entity identification + +## Commonly-used Fields +- `balance_date`: Primary field for time-series analysis and point-in-time balance queries +- `address`: Core field for wallet-level analysis and filtering +- `token_address`: Essential for token-specific balance queries and aggregations +- `balance`: Decimal-adjusted balance for human-readable amounts +- `balance_usd`: Critical for portfolio valuation and cross-token comparisons +- `balance_changed_on_date`: Useful for identifying active vs. stale balances + +{% enddocs %} diff --git a/models/gold/balances/balances__ez_balances_daily.sql b/models/gold/balances/balances__ez_balances_daily.sql new file mode 100644 index 0000000..1b5d113 --- /dev/null +++ b/models/gold/balances/balances__ez_balances_daily.sql @@ -0,0 +1,41 @@ +{{ config( + materialized = 'view', + tags = ['daily_balances'] +) }} + +WITH end_of_day_prices AS ( + SELECT + token_address, + hour::DATE AS price_date, + price + FROM + {{ ref('price__ez_prices_hourly') }} + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY token_address, hour::DATE + ORDER BY hour DESC + ) = 1 +) + +SELECT + b.balance_date, + b.address, + b.token_address, + b.balance AS balance_raw, + b.balance / NULLIF(POW(10, COALESCE(t.decimals, 0)), 0) AS balance, + (b.balance / NULLIF(POW(10, COALESCE(t.decimals, 0)), 0)) * p.price AS balance_usd, + t.symbol, + t.name AS token_name, + t.decimals, + b.frozen, + b.last_balance_change, + b.balance_changed_on_date, + b.balances_daily_id AS ez_balances_daily_id, + b.inserted_timestamp, + b.modified_timestamp +FROM + {{ ref('silver__bals_daily') }} b + LEFT JOIN {{ ref('core__dim_tokens') }} t + ON LOWER(b.token_address) = LOWER(t.token_address) + LEFT JOIN end_of_day_prices p + ON LOWER(b.token_address) = LOWER(p.token_address) + AND b.balance_date = p.price_date diff --git a/models/gold/balances/balances__ez_balances_daily.yml b/models/gold/balances/balances__ez_balances_daily.yml new file mode 100644 index 0000000..280226e --- /dev/null +++ b/models/gold/balances/balances__ez_balances_daily.yml @@ -0,0 +1,74 @@ +version: 2 + +models: + - name: balances__ez_balances_daily + description: '{{ doc("balances__ez_balances_daily") }}' + + columns: + - name: BALANCE_DATE + description: The date representing the end-of-day balance snapshot. + tests: + - not_null: + where: balance_date > current_date - 3 + + - name: ADDRESS + description: '{{ doc("address") }}' + tests: + - not_null: + where: balance_date > current_date - 3 + + - name: TOKEN_ADDRESS + description: '{{ doc("token_address") }}' + tests: + - not_null: + where: balance_date > current_date - 3 + + - name: BALANCE_RAW + description: The unadjusted token balance as stored on-chain, before decimal adjustment. + tests: + - not_null: + where: balance_date > current_date - 3 + + - name: BALANCE + description: The decimal-adjusted token balance, representing human-readable token amounts. + + - name: BALANCE_USD + description: The USD value of the balance, calculated using the end-of-day token price. + + - name: SYMBOL + description: '{{ doc("symbol") }}' + + - name: TOKEN_NAME + description: The full name of the token. + + - name: DECIMALS + description: '{{ doc("decimals") }}' + + - name: FROZEN + description: Boolean indicating whether the fungible asset account is frozen and unable to transfer. + tests: + - not_null: + where: balance_date > current_date - 3 + + - name: LAST_BALANCE_CHANGE + description: The date when the balance was last modified for this address-token combination. + + - name: BALANCE_CHANGED_ON_DATE + description: Boolean indicating whether the balance changed on this specific date, useful for distinguishing actual changes from forward-filled values. + tests: + - not_null: + where: balance_date > current_date - 3 + + - name: EZ_BALANCES_DAILY_ID + description: '{{ doc("pk") }}' + tests: + - unique: + where: balance_date > current_date - 3 + - not_null: + where: balance_date > current_date - 3 + + - name: INSERTED_TIMESTAMP + description: '{{ doc("inserted_timestamp") }}' + + - name: MODIFIED_TIMESTAMP + description: '{{ doc("modified_timestamp") }}' diff --git a/models/silver/core/balances/silver__bals.sql b/models/silver/core/balances/silver__bals.sql new file mode 100644 index 0000000..5b285bc --- /dev/null +++ b/models/silver/core/balances/silver__bals.sql @@ -0,0 +1,71 @@ +{{ config( + materialized = 'incremental', + unique_key = [ 'version'], + incremental_strategy = 'merge', + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = ['modified_timestamp'], + incremental_predicates = ["dynamic_range_predicate", "block_timestamp::DATE"], + tags = ['noncore', 'full_test'] +) }} +-- at most one record per (address, token_address) pair per day - we will get the last transaction of the day +WITH verified_tokens AS ( + + SELECT + DISTINCT token_address + FROM + {{ ref('price__ez_prices_hourly') }} + WHERE + is_verified +), +fungible_asset_balances AS ( + SELECT + C.block_number, + C.block_timestamp, + C.version, + C.change_data :metadata :inner :: STRING AS token_address, + C.change_data :balance :: bigint AS post_balance, + C.change_data :frozen :: BOOLEAN AS frozen, + C.address + FROM + {{ ref('silver__changes') }} C + WHERE + block_timestamp :: DATE >= '2023-07-28' + AND C.change_module = 'fungible_asset' + AND C.change_resource = 'FungibleStore' + AND TRY_CAST( + C.change_data :balance :: STRING AS bigint + ) IS NOT NULL + AND C.address IS NOT NULL + +{% if is_incremental() %} +AND C.modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} +) +{% endif %} +) +SELECT + f.block_number, + f.block_timestamp, + f.block_timestamp :: DATE AS block_date, + f.version, + f.address, + f.token_address, + f.post_balance AS balance, + f.frozen, + {{ dbt_utils.generate_surrogate_key(['block_date', 'f.address', 'f.token_address']) }} AS balances_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + fungible_asset_balances f + JOIN verified_tokens v + ON LOWER( + f.token_address + ) = LOWER( + v.token_address + ) qualify(ROW_NUMBER() over (PARTITION BY balances_id +ORDER BY + block_timestamp DESC)) = 1 diff --git a/models/silver/core/balances/silver__bals_daily.sql b/models/silver/core/balances/silver__bals_daily.sql new file mode 100644 index 0000000..3030865 --- /dev/null +++ b/models/silver/core/balances/silver__bals_daily.sql @@ -0,0 +1,242 @@ +{{ config( + materialized = 'incremental', + unique_key = ['balances_daily_id'], + incremental_predicates = ["dynamic_range_predicate", "balance_date"], + cluster_by = ['balance_date'], + merge_exclude_columns = ["inserted_timestamp"], + post_hook = [ + "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(address, token_address);", + "DELETE FROM {{ this }} WHERE balance_date < CURRENT_DATE - 95 AND DAYOFWEEK(balance_date) != 0;" + ], + tags = ['daily_balances'], + full_refresh = false +) }} + +WITH date_spine AS ( + SELECT + date_day AS balance_date + FROM + {{ source( + 'crosschain', + 'dim_dates' + ) }} + WHERE + date_day >= '2023-07-28' + AND date_day < SYSDATE() :: DATE + +{% if is_incremental() %} +AND date_day > ( + SELECT + MAX(balance_date) + FROM + {{ this }} +) + +{% endif %} +), + +{% if is_incremental() %} +latest_balances_from_table AS ( + SELECT + address, + token_address, + balance, + frozen, + last_balance_change, + balance_date + FROM {{ this }} + WHERE balance_date = ( + SELECT MAX(balance_date) + FROM {{ this }} + ) +), +{% endif %} + +todays_balance_changes AS ( + -- Get balance changes for dates in the date spine + SELECT + block_date AS balance_date, + address, + token_address, + balance, + frozen, + block_timestamp, + ROW_NUMBER() OVER ( + PARTITION BY block_date, address, token_address + ORDER BY block_timestamp DESC, block_number DESC, version DESC + ) AS daily_rank + FROM {{ ref('silver__bals') }} tb + WHERE EXISTS ( + SELECT 1 FROM date_spine ds + WHERE ds.balance_date = tb.block_date + ) +), + +todays_final_balances AS ( + -- Get the last balance change per address-token_address for today + SELECT + balance_date, + address, + token_address, + balance, + frozen, + block_timestamp AS last_balance_change_timestamp, + TRUE AS balance_changed_on_date + FROM todays_balance_changes + WHERE daily_rank = 1 +), + +address_token_combinations AS ( + -- Get all unique address-token_address combinations that have ever had a balance + SELECT DISTINCT + address, + token_address + FROM todays_final_balances +), + +source_data AS ( + {% if is_incremental() %} + -- Check if processing multiple days (batch mode) + {% if execute %} + {% set max_date_query %} + SELECT MAX(balance_date) as max_date FROM {{ this }} + {% endset %} + {% set max_date = run_query(max_date_query).columns[0].values()[0] %} + {% set days_to_process = (modules.datetime.date.today() - max_date).days %} + {% set batch_size = days_to_process if days_to_process <= 60 else 60 %} + {% else %} + {% set batch_size = 1 %} + {% endif %} + + {% if batch_size > 1 %} + -- Multi-day batch: Use window functions for proper forward-filling + SELECT + d.balance_date, + COALESCE(c.address, y.address) AS address, + COALESCE(c.token_address, y.token_address) AS token_address, + -- For balance, use the most recent change within batch, or carry forward from yesterday + COALESCE( + LAST_VALUE(t.balance IGNORE NULLS) OVER ( + PARTITION BY COALESCE(c.address, y.address), COALESCE(c.token_address, y.token_address) + ORDER BY d.balance_date + ROWS UNBOUNDED PRECEDING + ), + y.balance + ) AS balance, + -- For frozen, use the most recent change within batch, or carry forward from yesterday + COALESCE( + LAST_VALUE(t.frozen IGNORE NULLS) OVER ( + PARTITION BY COALESCE(c.address, y.address), COALESCE(c.token_address, y.token_address) + ORDER BY d.balance_date + ROWS UNBOUNDED PRECEDING + ), + y.frozen + ) AS frozen, + -- For last_balance_change, we need to track the most recent change date within the batch + CASE + WHEN MAX(CASE WHEN t.balance_date IS NOT NULL THEN d.balance_date END) OVER ( + PARTITION BY COALESCE(c.address, y.address), COALESCE(c.token_address, y.token_address) + ORDER BY d.balance_date + ROWS UNBOUNDED PRECEDING + ) IS NOT NULL THEN + MAX(CASE WHEN t.balance_date IS NOT NULL THEN d.balance_date END) OVER ( + PARTITION BY COALESCE(c.address, y.address), COALESCE(c.token_address, y.token_address) + ORDER BY d.balance_date + ROWS UNBOUNDED PRECEDING + )::TIMESTAMP + ELSE y.last_balance_change::TIMESTAMP + END AS last_balance_change_timestamp, + CASE WHEN t.balance_date IS NOT NULL THEN TRUE ELSE FALSE END AS balance_changed_on_date + FROM date_spine d + CROSS JOIN ( + -- All addresses that should exist (previous + new) + SELECT address, token_address FROM latest_balances_from_table + UNION + SELECT address, token_address FROM address_token_combinations + ) c + LEFT JOIN todays_final_balances t + ON d.balance_date = t.balance_date + AND c.address = t.address + AND c.token_address = t.token_address + LEFT JOIN latest_balances_from_table y + ON c.address = y.address + AND c.token_address = y.token_address + + {% else %} + -- Single day: Use original efficient logic + SELECT + balance_date, + address, + token_address, + balance, + frozen, + last_balance_change_timestamp, + balance_changed_on_date + FROM todays_final_balances + + UNION ALL + + -- Carry forward yesterday's balances for addresses that didn't change today + SELECT + d.balance_date, + y.address, + y.token_address, + y.balance, + y.frozen, + y.last_balance_change::TIMESTAMP AS last_balance_change_timestamp, + FALSE AS balance_changed_on_date + FROM date_spine d + CROSS JOIN latest_balances_from_table y + LEFT JOIN todays_final_balances t + ON y.address = t.address + AND y.token_address = t.token_address + AND d.balance_date = t.balance_date + WHERE t.address IS NULL -- Only addresses with no changes today + {% endif %} + + {% else %} + -- Full refresh: Create complete time series with forward-filling + SELECT + d.balance_date, + c.address, + c.token_address, + LAST_VALUE(t.balance IGNORE NULLS) OVER ( + PARTITION BY c.address, c.token_address + ORDER BY d.balance_date + ROWS UNBOUNDED PRECEDING + ) AS balance, + LAST_VALUE(t.frozen IGNORE NULLS) OVER ( + PARTITION BY c.address, c.token_address + ORDER BY d.balance_date + ROWS UNBOUNDED PRECEDING + ) AS frozen, + LAST_VALUE(t.last_balance_change_timestamp IGNORE NULLS) OVER ( + PARTITION BY c.address, c.token_address + ORDER BY d.balance_date + ROWS UNBOUNDED PRECEDING + ) AS last_balance_change_timestamp, + CASE WHEN t.balance_date IS NOT NULL THEN TRUE ELSE FALSE END AS balance_changed_on_date + FROM date_spine d + CROSS JOIN address_token_combinations c + LEFT JOIN todays_final_balances t + ON d.balance_date = t.balance_date + AND c.address = t.address + AND c.token_address = t.token_address + {% endif %} +) + +SELECT + balance_date, + address, + token_address, + balance, + frozen, + last_balance_change_timestamp::DATE AS last_balance_change, + balance_changed_on_date, + {{ dbt_utils.generate_surrogate_key(['balance_date', 'address', 'token_address']) }} AS balances_daily_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM source_data +WHERE balance IS NOT NULL -- Only include addresses that have had at least one balance + AND balance > 0 -- Only include addresses with positive balances From 6500eeee3a8c1c3127129e44ed7db554957bf18c Mon Sep 17 00:00:00 2001 From: stanz Date: Mon, 8 Dec 2025 16:43:58 +0700 Subject: [PATCH 09/18] deprecate silver__balances --- models/silver/core/balances/silver__balances.sql | 12 +++++++++++- models/silver/core/balances/silver__balances.yml | 5 ++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/models/silver/core/balances/silver__balances.sql b/models/silver/core/balances/silver__balances.sql index 1ce2893..f73c4c3 100644 --- a/models/silver/core/balances/silver__balances.sql +++ b/models/silver/core/balances/silver__balances.sql @@ -1,3 +1,12 @@ +{# + DEPRECATED: This model is superseded by silver__bals + silver__bals_daily. + Please use the new two-layer architecture instead: + - silver__bals: Raw balance changes (activity days only) + - silver__bals_daily: Daily aggregated balances with forward-fill + + This model is kept for reference but should not be run. +#} + {{ config( materialized = 'incremental', unique_key = ['block_date', 'address', 'token_address'], @@ -5,7 +14,8 @@ merge_exclude_columns = ["inserted_timestamp"], cluster_by = ['block_date', '_inserted_timestamp::DATE'], post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(address, token_address);", - tags = ['core', 'full_test'] + tags = ['deprecated'], + enabled = false ) }} WITH fungible_asset_balances AS ( diff --git a/models/silver/core/balances/silver__balances.yml b/models/silver/core/balances/silver__balances.yml index 3d8b7cb..9db9133 100644 --- a/models/silver/core/balances/silver__balances.yml +++ b/models/silver/core/balances/silver__balances.yml @@ -3,7 +3,10 @@ version: 2 models: - name: silver__balances description: | - Daily balance snapshots for ALL Aptos fungible assets (coins and fungible assets). + DEPRECATED: This model is superseded by silver__bals + silver__bals_daily. + Please use the new two-layer architecture instead. + + Original description: Daily balance snapshots for ALL Aptos fungible assets (coins and fungible assets). tests: - dbt_constraints.primary_key: From bc9603b4f977dd792262d20d5b589d2b6ab0329d Mon Sep 17 00:00:00 2001 From: stanz Date: Mon, 8 Dec 2025 18:12:15 +0700 Subject: [PATCH 10/18] rm gold --- models/gold/core/core__ez_balances.sql | 25 ---------- models/gold/core/core__ez_balances.yml | 63 ------------------------ models/gold/core/core__fact_balances.sql | 45 ----------------- models/gold/core/core__fact_balances.yml | 61 ----------------------- 4 files changed, 194 deletions(-) delete mode 100644 models/gold/core/core__ez_balances.sql delete mode 100644 models/gold/core/core__ez_balances.yml delete mode 100644 models/gold/core/core__fact_balances.sql delete mode 100644 models/gold/core/core__fact_balances.yml diff --git a/models/gold/core/core__ez_balances.sql b/models/gold/core/core__ez_balances.sql deleted file mode 100644 index c5e4b11..0000000 --- a/models/gold/core/core__ez_balances.sql +++ /dev/null @@ -1,25 +0,0 @@ -{{ config( - materialized = 'view', - tags = ['core'] -) }} - -SELECT - f.block_date, - f.address, - f.token_address, - f.balance_unadj, - f.balance, - f.balance * p.price AS balance_usd, - f.symbol, - f.name, - f.decimals, - f.token_is_verified, - f.frozen, - f.fact_balances_id AS ez_balances_id, - f.inserted_timestamp, - f.modified_timestamp -FROM - {{ ref('core__fact_balances') }} f - LEFT JOIN {{ ref('price__ez_prices_hourly') }} p - ON LOWER(f.token_address) = LOWER(p.token_address) - AND p.hour = f.block_date::TIMESTAMP diff --git a/models/gold/core/core__ez_balances.yml b/models/gold/core/core__ez_balances.yml deleted file mode 100644 index 3a66805..0000000 --- a/models/gold/core/core__ez_balances.yml +++ /dev/null @@ -1,63 +0,0 @@ -version: 2 -models: - - name: core__ez_balances - - columns: - - name: BLOCK_DATE - tests: - - not_null: - where: block_date > current_date - 3 - - - name: ADDRESS - description: '{{ doc("address") }}' - tests: - - not_null: - where: block_date > current_date - 3 - - - name: TOKEN_ADDRESS - description: '{{ doc("token_address") }}' - tests: - - not_null: - where: block_date > current_date - 3 - - - name: BALANCE_UNADJ - tests: - - not_null: - where: block_date > current_date - 3 - - - name: BALANCE - - - name: BALANCE_USD - - - name: SYMBOL - description: '{{ doc("symbol") }}' - - - name: NAME - description: '{{ doc("name") }}' - - - name: DECIMALS - description: '{{ doc("decimals") }}' - - - name: TOKEN_IS_VERIFIED - tests: - - not_null: - where: block_date > current_date - 3 - - - name: FROZEN - tests: - - not_null: - where: block_date > current_date - 3 - - - name: EZ_BALANCES_ID - description: '{{ doc("pk") }}' - tests: - - unique: - where: block_date > current_date - 3 - - not_null: - where: block_date > current_date - 3 - - - name: INSERTED_TIMESTAMP - description: '{{ doc("inserted_timestamp") }}' - - - name: MODIFIED_TIMESTAMP - description: '{{ doc("modified_timestamp") }}' diff --git a/models/gold/core/core__fact_balances.sql b/models/gold/core/core__fact_balances.sql deleted file mode 100644 index 28b8fab..0000000 --- a/models/gold/core/core__fact_balances.sql +++ /dev/null @@ -1,45 +0,0 @@ -{{ config( - materialized = 'view', - tags = ['core'] -) }} - -WITH balances_with_last_positive AS ( - SELECT - block_date, - address, - token_address, - post_balance, - frozen, - is_verified, - balances_id, - inserted_timestamp, - modified_timestamp, - LAST_VALUE(CASE WHEN post_balance > 0 THEN block_date END IGNORE NULLS) OVER ( - PARTITION BY address, token_address - ORDER BY block_date - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS last_positive_date - FROM {{ ref('silver__balances') }} -) - -SELECT - b.block_date, - b.address, - b.token_address, - b.post_balance AS balance_unadj, - b.post_balance / NULLIF(POW(10, COALESCE(t.decimals, 0)), 0) AS balance, - COALESCE(t.symbol, NULL) AS symbol, - t.name, - t.decimals, - b.is_verified AS token_is_verified, - b.frozen, - b.balances_id AS fact_balances_id, - b.inserted_timestamp, - b.modified_timestamp -FROM - balances_with_last_positive b - LEFT JOIN {{ ref('core__dim_tokens') }} t - ON LOWER(b.token_address) = LOWER(t.token_address) -WHERE - b.post_balance > 0 - OR (b.post_balance = 0 AND DATEDIFF('day', b.last_positive_date, b.block_date) <= 1) -- reducing 3d grace period for zero balances to only just 1d of 0's diff --git a/models/gold/core/core__fact_balances.yml b/models/gold/core/core__fact_balances.yml deleted file mode 100644 index cc3d3a5..0000000 --- a/models/gold/core/core__fact_balances.yml +++ /dev/null @@ -1,61 +0,0 @@ -version: 2 -models: - - name: core__fact_balances - - columns: - - name: BLOCK_DATE - tests: - - not_null: - where: block_date > current_date - 3 - - - name: ADDRESS - description: '{{ doc("address") }}' - tests: - - not_null: - where: block_date > current_date - 3 - - - name: TOKEN_ADDRESS - description: '{{ doc("token_address") }}' - tests: - - not_null: - where: block_date > current_date - 3 - - - name: BALANCE_UNADJ - tests: - - not_null: - where: block_date > current_date - 3 - - - name: BALANCE - - - name: SYMBOL - description: '{{ doc("symbol") }}' - - - name: NAME - description: '{{ doc("name") }}' - - - name: DECIMALS - description: '{{ doc("decimals") }}' - - - name: TOKEN_IS_VERIFIED - tests: - - not_null: - where: block_date > current_date - 3 - - - name: FROZEN - tests: - - not_null: - where: block_date > current_date - 3 - - - name: FACT_BALANCES_ID - description: '{{ doc("pk") }}' - tests: - - unique: - where: block_date > current_date - 3 - - not_null: - where: block_date > current_date - 3 - - - name: INSERTED_TIMESTAMP - description: '{{ doc("inserted_timestamp") }}' - - - name: MODIFIED_TIMESTAMP - description: '{{ doc("modified_timestamp") }}' From 893067c31b181ab7c9907f607f73de85c7261bca Mon Sep 17 00:00:00 2001 From: stanz Date: Mon, 8 Dec 2025 19:15:48 +0700 Subject: [PATCH 11/18] yml --- .../balances/balances__ez_balances_daily.yml | 3 + models/silver/core/balances/silver__bals.yml | 54 +++++++++++++++++ .../core/balances/silver__bals_daily.yml | 59 +++++++++++++++++++ 3 files changed, 116 insertions(+) create mode 100644 models/silver/core/balances/silver__bals.yml create mode 100644 models/silver/core/balances/silver__bals_daily.yml diff --git a/models/gold/balances/balances__ez_balances_daily.yml b/models/gold/balances/balances__ez_balances_daily.yml index 280226e..c2ae6cc 100644 --- a/models/gold/balances/balances__ez_balances_daily.yml +++ b/models/gold/balances/balances__ez_balances_daily.yml @@ -59,6 +59,9 @@ models: - not_null: where: balance_date > current_date - 3 + - name: TOKEN_IS_VERIFIED + description: Boolean indicating whether the token has been verified with a reliable price source. + - name: EZ_BALANCES_DAILY_ID description: '{{ doc("pk") }}' tests: diff --git a/models/silver/core/balances/silver__bals.yml b/models/silver/core/balances/silver__bals.yml new file mode 100644 index 0000000..9a54089 --- /dev/null +++ b/models/silver/core/balances/silver__bals.yml @@ -0,0 +1,54 @@ +version: 2 + +models: + - name: silver__bals + description: | + Raw balance changes for verified tokens only. Captures the last balance state + per (address, token_address) per day from fungible asset stores. + + columns: + - name: BLOCK_NUMBER + + - name: BLOCK_TIMESTAMP + + - name: BLOCK_DATE + + - name: VERSION + tests: + - not_null: + where: block_date > current_date - 3 + + - name: ADDRESS + description: '{{ doc("address") }}' + tests: + - not_null: + where: block_date > current_date - 3 + + - name: TOKEN_ADDRESS + description: '{{ doc("token_address") }}' + tests: + - not_null: + where: block_date > current_date - 3 + + - name: BALANCE + tests: + - not_null: + where: block_date > current_date - 3 + + - name: FROZEN + + - name: BALANCES_ID + description: '{{ doc("pk") }}' + tests: + - unique: + where: block_date > current_date - 3 + - not_null: + where: block_date > current_date - 3 + + - name: INSERTED_TIMESTAMP + description: '{{ doc("inserted_timestamp") }}' + + - name: MODIFIED_TIMESTAMP + description: '{{ doc("modified_timestamp") }}' + + - name: _INVOCATION_ID diff --git a/models/silver/core/balances/silver__bals_daily.yml b/models/silver/core/balances/silver__bals_daily.yml new file mode 100644 index 0000000..0f5956b --- /dev/null +++ b/models/silver/core/balances/silver__bals_daily.yml @@ -0,0 +1,59 @@ +version: 2 + +models: + - name: silver__bals_daily + description: | + Daily balance snapshots with forward-filling. Contains one row per (address, token_address) + per day for all addresses with positive balances. + + columns: + - name: BALANCE_DATE + tests: + - not_null: + where: balance_date > current_date - 3 + + - name: ADDRESS + description: '{{ doc("address") }}' + tests: + - not_null: + where: balance_date > current_date - 3 + + - name: TOKEN_ADDRESS + description: '{{ doc("token_address") }}' + tests: + - not_null: + where: balance_date > current_date - 3 + + - name: BALANCE + tests: + - not_null: + where: balance_date > current_date - 3 + + - name: FROZEN + + - name: LAST_BALANCE_CHANGE + description: The date when the balance was last modified for this address-token combination. + + - name: BALANCE_CHANGED_ON_DATE + description: | + Boolean indicating whether the balance changed on this specific date. + TRUE = actual transaction occurred, FALSE = forward-filled from previous day. + tests: + - not_null: + where: balance_date > current_date - 3 + + - name: BALANCES_DAILY_ID + description: '{{ doc("pk") }}' + tests: + - unique: + where: balance_date > current_date - 3 + - not_null: + where: balance_date > current_date - 3 + + - name: INSERTED_TIMESTAMP + description: '{{ doc("inserted_timestamp") }}' + + - name: MODIFIED_TIMESTAMP + description: '{{ doc("modified_timestamp") }}' + + - name: _INVOCATION_ID From 0c0d2064460ef23609cd99b71caa15fa6dce20cd Mon Sep 17 00:00:00 2001 From: stanz Date: Mon, 8 Dec 2025 19:16:01 +0700 Subject: [PATCH 12/18] add is_verified col for metadata visibility --- models/gold/balances/balances__ez_balances_daily.sql | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/models/gold/balances/balances__ez_balances_daily.sql b/models/gold/balances/balances__ez_balances_daily.sql index 1b5d113..6ae0ec1 100644 --- a/models/gold/balances/balances__ez_balances_daily.sql +++ b/models/gold/balances/balances__ez_balances_daily.sql @@ -7,7 +7,8 @@ WITH end_of_day_prices AS ( SELECT token_address, hour::DATE AS price_date, - price + price, + is_verified FROM {{ ref('price__ez_prices_hourly') }} QUALIFY ROW_NUMBER() OVER ( @@ -29,6 +30,7 @@ SELECT b.frozen, b.last_balance_change, b.balance_changed_on_date, + p.is_verified AS token_is_verified, b.balances_daily_id AS ez_balances_daily_id, b.inserted_timestamp, b.modified_timestamp From 32e02b158be52f82838d0d4d5bff6d9e92cb7aef Mon Sep 17 00:00:00 2001 From: stanz Date: Mon, 8 Dec 2025 20:20:01 +0700 Subject: [PATCH 13/18] deprecate balances --- .../silver/core/balances/silver__balances.sql | 215 ------------------ .../silver/core/balances/silver__balances.yml | 53 ----- 2 files changed, 268 deletions(-) delete mode 100644 models/silver/core/balances/silver__balances.sql delete mode 100644 models/silver/core/balances/silver__balances.yml diff --git a/models/silver/core/balances/silver__balances.sql b/models/silver/core/balances/silver__balances.sql deleted file mode 100644 index f73c4c3..0000000 --- a/models/silver/core/balances/silver__balances.sql +++ /dev/null @@ -1,215 +0,0 @@ -{# - DEPRECATED: This model is superseded by silver__bals + silver__bals_daily. - Please use the new two-layer architecture instead: - - silver__bals: Raw balance changes (activity days only) - - silver__bals_daily: Daily aggregated balances with forward-fill - - This model is kept for reference but should not be run. -#} - -{{ config( - materialized = 'incremental', - unique_key = ['block_date', 'address', 'token_address'], - incremental_strategy = 'delete+insert', - merge_exclude_columns = ["inserted_timestamp"], - cluster_by = ['block_date', '_inserted_timestamp::DATE'], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(address, token_address);", - tags = ['deprecated'], - enabled = false -) }} - -WITH fungible_asset_balances AS ( - SELECT - c.block_number, - c.block_timestamp, - c.block_timestamp::DATE AS block_date, - c.version, - c.change_data:metadata:inner::STRING AS token_address, - c.change_data:balance::BIGINT AS post_balance, - c.change_data:frozen::BOOLEAN AS frozen, - c.address, - c.modified_timestamp, - c._inserted_timestamp - FROM {{ ref('silver__changes') }} c - WHERE c.change_module = 'fungible_asset' - AND c.change_resource = 'FungibleStore' - AND c.change_data:balance IS NOT NULL - AND c.address IS NOT NULL - {% if is_incremental() %} - AND c.modified_timestamp >= ( - SELECT MAX(modified_timestamp) FROM {{ this }} - ) - {% endif %} -), - -coin_balances AS ( - SELECT - c.block_number, - c.block_timestamp, - c.block_timestamp::DATE AS block_date, - c.version, - REPLACE(REPLACE(c.change_resource::STRING, 'CoinStore<', ''), '>', '') AS token_address, - c.change_data:coin:value::BIGINT AS post_balance, - FALSE AS frozen, - COALESCE( - c.change_data:deposit_events:guid:id:addr, - c.change_data:withdraw_events:guid:id:addr, - c.change_data:coin_amount_event:guid:id:addr - )::STRING AS address, - c.modified_timestamp, - c._inserted_timestamp - FROM {{ ref('silver__changes') }} c - WHERE c.change_module = 'coin' - AND c.change_resource LIKE 'CoinStore<%' - AND c.change_data:coin:value IS NOT NULL - AND COALESCE( - c.change_data:deposit_events:guid:id:addr, - c.change_data:withdraw_events:guid:id:addr, - c.change_data:coin_amount_event:guid:id:addr - ) IS NOT NULL - {% if is_incremental() %} - AND c.modified_timestamp >= ( - SELECT MAX(modified_timestamp) FROM {{ this }} - ) - {% endif %} -), - -all_balances AS ( - SELECT * FROM fungible_asset_balances - UNION ALL - SELECT * FROM coin_balances -), - -address_token_pairs AS ( - SELECT - address, - token_address, - MIN(block_date) AS min_date - FROM all_balances - GROUP BY address, token_address -), - -date_spine AS ( - SELECT date_day AS block_date - FROM {{ source('crosschain', 'dim_dates') }} - WHERE date_day >= '2022-10-12' - AND date_day <= CURRENT_DATE -), - -address_token_date_spine AS ( - SELECT - d.block_date, - p.address, - p.token_address - FROM address_token_pairs p - CROSS JOIN date_spine d - WHERE d.block_date >= p.min_date -), - -daily_balances AS ( - SELECT - block_date, - address, - token_address, - post_balance, - frozen, - modified_timestamp, - _inserted_timestamp - FROM all_balances - QUALIFY ROW_NUMBER() OVER ( - PARTITION BY block_date, address, token_address - ORDER BY block_number DESC, version DESC - ) = 1 -), - -forward_filled_values AS ( - SELECT - s.block_date, - s.address, - s.token_address, - LAST_VALUE(b.post_balance IGNORE NULLS) OVER ( - PARTITION BY s.address, s.token_address - ORDER BY s.block_date - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS post_balance, - LAST_VALUE(b.frozen IGNORE NULLS) OVER ( - PARTITION BY s.address, s.token_address - ORDER BY s.block_date - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS frozen, - LAST_VALUE(b.modified_timestamp IGNORE NULLS) OVER ( - PARTITION BY s.address, s.token_address - ORDER BY s.block_date - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS modified_timestamp, - LAST_VALUE(b._inserted_timestamp IGNORE NULLS) OVER ( - PARTITION BY s.address, s.token_address - ORDER BY s.block_date - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS _inserted_timestamp - FROM address_token_date_spine s - LEFT JOIN daily_balances b - ON s.block_date = b.block_date - AND s.address = b.address - AND s.token_address = b.token_address -), - -forward_filled_balances AS ( - SELECT - block_date, - address, - token_address, - post_balance, - frozen, - modified_timestamp, - _inserted_timestamp, - LAST_VALUE(CASE WHEN post_balance > 0 THEN block_date END IGNORE NULLS) OVER ( - PARTITION BY address, token_address - ORDER BY block_date - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS last_positive_date - FROM forward_filled_values -), - -filtered_balances AS ( - SELECT - block_date, - address, - token_address, - post_balance, - frozen, - modified_timestamp, - _inserted_timestamp - FROM forward_filled_balances - WHERE post_balance IS NOT NULL - AND ( - post_balance > 0 - OR (post_balance = 0 - AND last_positive_date IS NOT NULL - AND DATEDIFF('day', last_positive_date, block_date) <= 3) - ) -), - -verified_tokens AS ( - SELECT DISTINCT - token_address, - is_verified - FROM {{ ref('price__ez_prices_hourly') }} - WHERE is_verified = TRUE -) - -SELECT - f.block_date, - f.address, - f.token_address, - f.post_balance, - f.frozen, - COALESCE(v.is_verified, FALSE) AS is_verified, - {{ dbt_utils.generate_surrogate_key(['f.block_date', 'f.address', 'f.token_address']) }} AS balances_id, - SYSDATE() AS inserted_timestamp, - SYSDATE() AS modified_timestamp, - f._inserted_timestamp, - '{{ invocation_id }}' AS _invocation_id -FROM filtered_balances f -LEFT JOIN verified_tokens v - ON LOWER(f.token_address) = LOWER(v.token_address) diff --git a/models/silver/core/balances/silver__balances.yml b/models/silver/core/balances/silver__balances.yml deleted file mode 100644 index 9db9133..0000000 --- a/models/silver/core/balances/silver__balances.yml +++ /dev/null @@ -1,53 +0,0 @@ -version: 2 - -models: - - name: silver__balances - description: | - DEPRECATED: This model is superseded by silver__bals + silver__bals_daily. - Please use the new two-layer architecture instead. - - Original description: Daily balance snapshots for ALL Aptos fungible assets (coins and fungible assets). - - tests: - - dbt_constraints.primary_key: - column_name: BALANCES_ID - - columns: - - name: BLOCK_DATE - - - name: ADDRESS - description: '{{ doc("address") }}' - - - name: TOKEN_ADDRESS - description: '{{ doc("token_address") }}' - - - name: POST_BALANCE - - - name: FROZEN - description: | - Account freeze status (FungibleAsset-specific field). - - TRUE: Account/store is frozen (cannot transfer) - - FALSE: Account is active - - For Coin standard balances: Always FALSE (Coin doesn't support freezing) - - - name: IS_VERIFIED - description: | - Whether the token has verified price data. - - TRUE: Token exists in price__ez_prices_hourly with is_verified = TRUE - - FALSE: Token has no verified price data - - - name: BALANCES_ID - description: '{{ doc("pk") }}' - tests: - - unique: - where: block_date > current_date - 3 - - - name: INSERTED_TIMESTAMP - description: '{{ doc("inserted_timestamp") }}' - - - name: MODIFIED_TIMESTAMP - description: '{{ doc("modified_timestamp") }}' - - - name: _INSERTED_TIMESTAMP - - - name: _INVOCATION_ID From 2e745beff5b159991598ff3a02fcd0e846e79b91 Mon Sep 17 00:00:00 2001 From: stanz Date: Mon, 8 Dec 2025 22:13:04 +0700 Subject: [PATCH 14/18] modify uk --- models/silver/core/balances/silver__bals.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/silver/core/balances/silver__bals.sql b/models/silver/core/balances/silver__bals.sql index 5b285bc..f96475f 100644 --- a/models/silver/core/balances/silver__bals.sql +++ b/models/silver/core/balances/silver__bals.sql @@ -1,6 +1,6 @@ {{ config( materialized = 'incremental', - unique_key = [ 'version'], + unique_key = ['balances_id'], incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], cluster_by = ['modified_timestamp'], From dc8d23bd5ef242605b3d10df3ba91c55d38ce0a7 Mon Sep 17 00:00:00 2001 From: stanz Date: Sun, 14 Dec 2025 22:16:28 +0100 Subject: [PATCH 15/18] add heal model support --- models/silver/core/balances/silver__bals.sql | 102 +++++++++++++----- .../core/balances/silver__bals_daily.sql | 92 +++++++++++++++- 2 files changed, 164 insertions(+), 30 deletions(-) diff --git a/models/silver/core/balances/silver__bals.sql b/models/silver/core/balances/silver__bals.sql index f96475f..88c256a 100644 --- a/models/silver/core/balances/silver__bals.sql +++ b/models/silver/core/balances/silver__bals.sql @@ -5,18 +5,42 @@ merge_exclude_columns = ["inserted_timestamp"], cluster_by = ['modified_timestamp'], incremental_predicates = ["dynamic_range_predicate", "block_timestamp::DATE"], - tags = ['noncore', 'full_test'] + post_hook = '{{ unverify_tokens() }}', + tags = ['noncore', 'full_test', 'heal'] ) }} -- at most one record per (address, token_address) pair per day - we will get the last transaction of the day WITH verified_tokens AS ( - - SELECT - DISTINCT token_address - FROM - {{ ref('price__ez_prices_hourly') }} - WHERE - is_verified + SELECT DISTINCT token_address + FROM {{ ref('price__ez_prices_hourly') }} + WHERE is_verified ), + +{% if is_incremental() and var('HEAL_MODEL', false) %} +newly_verified_tokens AS ( + {{ get_missing_verified_tokens() }} +), +heal_balances AS ( + SELECT + C.block_number, + C.block_timestamp, + C.version, + C.change_data :metadata :inner :: STRING AS token_address, + C.change_data :balance :: bigint AS post_balance, + C.change_data :frozen :: BOOLEAN AS frozen, + C.address + FROM {{ ref('silver__changes') }} C + WHERE + block_timestamp :: DATE >= '2023-07-28' + AND C.change_module = 'fungible_asset' + AND C.change_resource = 'FungibleStore' + AND TRY_CAST(C.change_data :balance :: STRING AS bigint) IS NOT NULL + AND C.address IS NOT NULL + AND LOWER(C.change_data :metadata :inner :: STRING) IN ( + SELECT token_address FROM newly_verified_tokens + ) +), +{% endif %} + fungible_asset_balances AS ( SELECT C.block_number, @@ -45,27 +69,51 @@ AND C.modified_timestamp >= ( {{ this }} ) {% endif %} +), + +all_balances AS ( + SELECT + block_number, + block_timestamp, + version, + token_address, + post_balance, + frozen, + address + FROM fungible_asset_balances + WHERE LOWER(token_address) IN ( + SELECT LOWER(token_address) FROM verified_tokens + ) + + {% if is_incremental() and var('HEAL_MODEL', false) %} + UNION ALL + SELECT + block_number, + block_timestamp, + version, + token_address, + post_balance, + frozen, + address + FROM heal_balances + {% endif %} ) + SELECT - f.block_number, - f.block_timestamp, - f.block_timestamp :: DATE AS block_date, - f.version, - f.address, - f.token_address, - f.post_balance AS balance, - f.frozen, - {{ dbt_utils.generate_surrogate_key(['block_date', 'f.address', 'f.token_address']) }} AS balances_id, + block_number, + block_timestamp, + block_timestamp :: DATE AS block_date, + version, + address, + token_address, + post_balance AS balance, + frozen, + {{ dbt_utils.generate_surrogate_key(['block_date', 'address', 'token_address']) }} AS balances_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id -FROM - fungible_asset_balances f - JOIN verified_tokens v - ON LOWER( - f.token_address - ) = LOWER( - v.token_address - ) qualify(ROW_NUMBER() over (PARTITION BY balances_id -ORDER BY - block_timestamp DESC)) = 1 +FROM all_balances +QUALIFY ROW_NUMBER() OVER ( + PARTITION BY block_timestamp :: DATE, address, token_address + ORDER BY block_timestamp DESC +) = 1 diff --git a/models/silver/core/balances/silver__bals_daily.sql b/models/silver/core/balances/silver__bals_daily.sql index 3030865..4eef0b2 100644 --- a/models/silver/core/balances/silver__bals_daily.sql +++ b/models/silver/core/balances/silver__bals_daily.sql @@ -6,9 +6,10 @@ merge_exclude_columns = ["inserted_timestamp"], post_hook = [ "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(address, token_address);", - "DELETE FROM {{ this }} WHERE balance_date < CURRENT_DATE - 95 AND DAYOFWEEK(balance_date) != 0;" + "DELETE FROM {{ this }} WHERE balance_date < CURRENT_DATE - 95 AND DAYOFWEEK(balance_date) != 0;", + "{{ unverify_tokens() }}" ], - tags = ['daily_balances'], + tags = ['daily_balances', 'heal'], full_refresh = false ) }} @@ -52,6 +53,66 @@ latest_balances_from_table AS ( ), {% endif %} +{% if is_incremental() and var('HEAL_MODEL', false) %} +newly_verified_tokens AS ( + {{ get_missing_verified_tokens() }} +), +heal_date_spine AS ( + SELECT date_day AS balance_date + FROM {{ source('crosschain', 'dim_dates') }} + WHERE date_day >= '2023-07-28' + AND date_day < SYSDATE() :: DATE +), +heal_source_balances AS ( + SELECT + block_date AS balance_date, + address, + token_address, + balance, + frozen, + block_timestamp AS last_balance_change_timestamp, + TRUE AS balance_changed_on_date + FROM {{ ref('silver__bals') }} + WHERE LOWER(token_address) IN (SELECT token_address FROM newly_verified_tokens) + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY block_date, address, token_address + ORDER BY block_timestamp DESC, block_number DESC, version DESC + ) = 1 +), +heal_address_token_combinations AS ( + SELECT DISTINCT address, token_address + FROM heal_source_balances +), +heal_daily_balances AS ( + SELECT + d.balance_date, + c.address, + c.token_address, + LAST_VALUE(t.balance IGNORE NULLS) OVER ( + PARTITION BY c.address, c.token_address + ORDER BY d.balance_date + ROWS UNBOUNDED PRECEDING + ) AS balance, + LAST_VALUE(t.frozen IGNORE NULLS) OVER ( + PARTITION BY c.address, c.token_address + ORDER BY d.balance_date + ROWS UNBOUNDED PRECEDING + ) AS frozen, + LAST_VALUE(t.last_balance_change_timestamp IGNORE NULLS) OVER ( + PARTITION BY c.address, c.token_address + ORDER BY d.balance_date + ROWS UNBOUNDED PRECEDING + ) AS last_balance_change_timestamp, + CASE WHEN t.balance_date IS NOT NULL THEN TRUE ELSE FALSE END AS balance_changed_on_date + FROM heal_date_spine d + CROSS JOIN heal_address_token_combinations c + LEFT JOIN heal_source_balances t + ON d.balance_date = t.balance_date + AND c.address = t.address + AND c.token_address = t.token_address +), +{% endif %} + todays_balance_changes AS ( -- Get balance changes for dates in the date spine SELECT @@ -225,6 +286,31 @@ source_data AS ( {% endif %} ) +final_data AS ( + SELECT + balance_date, + address, + token_address, + balance, + frozen, + last_balance_change_timestamp, + balance_changed_on_date + FROM source_data + + {% if is_incremental() and var('HEAL_MODEL', false) %} + UNION ALL + SELECT + balance_date, + address, + token_address, + balance, + frozen, + last_balance_change_timestamp, + balance_changed_on_date + FROM heal_daily_balances + {% endif %} +) + SELECT balance_date, address, @@ -237,6 +323,6 @@ SELECT SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id -FROM source_data +FROM final_data WHERE balance IS NOT NULL -- Only include addresses that have had at least one balance AND balance > 0 -- Only include addresses with positive balances From 1a163ea3bf9bd05c3e6900384df2216b4d88bd05 Mon Sep 17 00:00:00 2001 From: Eric Laurello Date: Wed, 17 Dec 2025 15:22:23 -0500 Subject: [PATCH 16/18] rm the daily balance leave the deltas --- .../tables/balances__ez_balances.md | 32 ++ .../tables/balances__ez_balances_daily.md | 29 -- .../gold/balances/balances__ez_balances.sql | 85 +++++ ...es_daily.yml => balances__ez_balances.yml} | 19 +- .../balances/balances__ez_balances_daily.sql | 43 --- ...{silver__bals.sql => silver__balances.sql} | 90 +++-- ...{silver__bals.yml => silver__balances.yml} | 9 +- .../balances/silver__balances_snapshot.sql | 30 ++ .../balances/silver__balances_snapshot.yml | 48 +++ .../core/balances/silver__bals_daily.sql | 328 ------------------ .../core/balances/silver__bals_daily.yml | 59 ---- 11 files changed, 267 insertions(+), 505 deletions(-) create mode 100644 models/descriptions/tables/balances__ez_balances.md delete mode 100644 models/descriptions/tables/balances__ez_balances_daily.md create mode 100644 models/gold/balances/balances__ez_balances.sql rename models/gold/balances/{balances__ez_balances_daily.yml => balances__ez_balances.yml} (76%) delete mode 100644 models/gold/balances/balances__ez_balances_daily.sql rename models/silver/core/balances/{silver__bals.sql => silver__balances.sql} (66%) rename models/silver/core/balances/{silver__bals.yml => silver__balances.yml} (68%) create mode 100644 models/silver/core/balances/silver__balances_snapshot.sql create mode 100644 models/silver/core/balances/silver__balances_snapshot.yml delete mode 100644 models/silver/core/balances/silver__bals_daily.sql delete mode 100644 models/silver/core/balances/silver__bals_daily.yml diff --git a/models/descriptions/tables/balances__ez_balances.md b/models/descriptions/tables/balances__ez_balances.md new file mode 100644 index 0000000..6508113 --- /dev/null +++ b/models/descriptions/tables/balances__ez_balances.md @@ -0,0 +1,32 @@ +{% docs balances__ez_balances %} + +## Description +This table provides token balances for all addresses holding verified fungible assets on the Aptos blockchain. It combines a historical snapshot with recent balance changes to provide comprehensive balance data. Each row represents a unique address-token combination, with decimal-adjusted balances and USD valuations using end-of-day token prices. + +**Data Structure:** The table unions: +- Historical balances from a point-in-time snapshot (configurable via `SNAPSHOT_DATE` variable, default: 2025-09-01) +- Recent balance changes that occurred after the snapshot date + +## Key Use Cases +- Portfolio tracking and balance analysis +- Wallet wealth distribution and concentration metrics +- Token holder analysis and whale tracking +- DeFi TVL calculations and protocol health monitoring +- Point-in-time balance queries + +## Important Relationships +- Sources historical data from `silver.balances_snapshot` for the snapshot date +- Sources recent data from `silver.balances` for post-snapshot changes +- Joins to `core.dim_tokens` for token metadata (symbol, name, decimals) +- Joins to `price.ez_prices_hourly` for end-of-day USD price valuations +- Can be joined with `core.dim_labels` for address labeling and entity identification + +## Commonly-used Fields +- `balance_date`: The date of the balance record (snapshot date or block date) +- `address`: Core field for wallet-level analysis and filtering +- `token_address`: Essential for token-specific balance queries and aggregations +- `balance`: Decimal-adjusted balance for human-readable amounts +- `balance_usd`: Critical for portfolio valuation and cross-token comparisons +- `last_balance_change`: Timestamp of the most recent balance modification + +{% enddocs %} diff --git a/models/descriptions/tables/balances__ez_balances_daily.md b/models/descriptions/tables/balances__ez_balances_daily.md deleted file mode 100644 index aaef8ef..0000000 --- a/models/descriptions/tables/balances__ez_balances_daily.md +++ /dev/null @@ -1,29 +0,0 @@ -{% docs balances__ez_balances_daily %} - -## Description -This table provides daily end-of-day token balances for all addresses holding verified fungible assets on the Aptos blockchain. Each row represents a unique address-token combination for a specific date, with balances forward-filled from the last known balance change. The table includes decimal-adjusted balances and USD valuations using end-of-day token prices. - -**Data Retention:** Daily records are retained for the most recent 95 days. For data older than 95 days, only weekly snapshots (Sundays) are preserved to optimize storage while maintaining historical trend analysis capabilities. - -## Key Use Cases -- Portfolio tracking and historical balance analysis -- Wallet wealth distribution and concentration metrics -- Token holder analysis and whale tracking over time -- DeFi TVL calculations and protocol health monitoring -- Time-series analysis of address holdings - -## Important Relationships -- Sources balance data from `silver.bals_daily` which tracks daily balance snapshots -- Joins to `core.dim_tokens` for token metadata (symbol, name, decimals) -- Joins to `price.ez_prices_hourly` for end-of-day USD price valuations -- Can be joined with `core.dim_labels` for address labeling and entity identification - -## Commonly-used Fields -- `balance_date`: Primary field for time-series analysis and point-in-time balance queries -- `address`: Core field for wallet-level analysis and filtering -- `token_address`: Essential for token-specific balance queries and aggregations -- `balance`: Decimal-adjusted balance for human-readable amounts -- `balance_usd`: Critical for portfolio valuation and cross-token comparisons -- `balance_changed_on_date`: Useful for identifying active vs. stale balances - -{% enddocs %} diff --git a/models/gold/balances/balances__ez_balances.sql b/models/gold/balances/balances__ez_balances.sql new file mode 100644 index 0000000..92ec8f5 --- /dev/null +++ b/models/gold/balances/balances__ez_balances.sql @@ -0,0 +1,85 @@ +{{ config( + materialized = 'view', + tags = ['balances'] +) }} + +{# Use the same snapshot date as the silver snapshot model #} +{% set snapshot_date = var('SNAPSHOT_DATE', '2025-09-02') %} + +WITH snapshot_balances AS ( + -- Historical balances from snapshot (state as of snapshot date) + SELECT + snapshot_date AS balance_date, + address, + token_address, + balance, + frozen, + block_timestamp AS last_balance_change, + balances_snapshot_id AS ez_balances_id, + inserted_timestamp, + modified_timestamp + FROM + {{ ref('silver__balances_snapshot') }} +), + +recent_balances AS ( + -- Balances that occurred after the snapshot date + SELECT + block_date AS balance_date, + address, + token_address, + balance, + frozen, + block_timestamp AS last_balance_change, + balances_id AS ez_balances_id, + inserted_timestamp, + modified_timestamp + FROM + {{ ref('silver__balances') }} + WHERE + block_date >= '{{ snapshot_date }}'::DATE +), + +combined_balances AS ( + SELECT * FROM snapshot_balances + UNION ALL + SELECT * FROM recent_balances +), + +prices AS ( + SELECT + token_address, + hour::DATE AS price_date, + price, + is_verified + FROM + {{ ref('price__ez_prices_hourly') }} + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY token_address, hour::DATE + ORDER BY hour DESC + ) = 1 +) + +SELECT + b.balance_date, + b.address, + b.token_address, + b.balance AS balance_raw, + b.balance / NULLIF(POW(10, COALESCE(t.decimals, 0)), 0) AS balance, + (b.balance / NULLIF(POW(10, COALESCE(t.decimals, 0)), 0)) * p.price AS balance_usd, + t.symbol, + t.name AS token_name, + t.decimals, + b.frozen, + b.last_balance_change, + p.is_verified AS token_is_verified, + b.ez_balances_id, + b.inserted_timestamp, + b.modified_timestamp +FROM + combined_balances b + LEFT JOIN {{ ref('core__dim_tokens') }} t + ON LOWER(b.token_address) = LOWER(t.token_address) + LEFT JOIN prices p + ON LOWER(b.token_address) = LOWER(p.token_address) + AND b.balance_date = p.price_date diff --git a/models/gold/balances/balances__ez_balances_daily.yml b/models/gold/balances/balances__ez_balances.yml similarity index 76% rename from models/gold/balances/balances__ez_balances_daily.yml rename to models/gold/balances/balances__ez_balances.yml index c2ae6cc..04f3b80 100644 --- a/models/gold/balances/balances__ez_balances_daily.yml +++ b/models/gold/balances/balances__ez_balances.yml @@ -1,15 +1,18 @@ version: 2 models: - - name: balances__ez_balances_daily - description: '{{ doc("balances__ez_balances_daily") }}' + - name: balances__ez_balances + description: '{{ doc("balances__ez_balances") }}' columns: - name: BALANCE_DATE - description: The date representing the end-of-day balance snapshot. + description: The date of the balance record. For snapshot data this is the snapshot date; for recent data this is the block date. tests: - not_null: where: balance_date > current_date - 3 + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 2 - name: ADDRESS description: '{{ doc("address") }}' @@ -51,18 +54,12 @@ models: where: balance_date > current_date - 3 - name: LAST_BALANCE_CHANGE - description: The date when the balance was last modified for this address-token combination. - - - name: BALANCE_CHANGED_ON_DATE - description: Boolean indicating whether the balance changed on this specific date, useful for distinguishing actual changes from forward-filled values. - tests: - - not_null: - where: balance_date > current_date - 3 + description: The timestamp when the balance was last modified for this address-token combination. - name: TOKEN_IS_VERIFIED description: Boolean indicating whether the token has been verified with a reliable price source. - - name: EZ_BALANCES_DAILY_ID + - name: EZ_BALANCES_ID description: '{{ doc("pk") }}' tests: - unique: diff --git a/models/gold/balances/balances__ez_balances_daily.sql b/models/gold/balances/balances__ez_balances_daily.sql deleted file mode 100644 index 6ae0ec1..0000000 --- a/models/gold/balances/balances__ez_balances_daily.sql +++ /dev/null @@ -1,43 +0,0 @@ -{{ config( - materialized = 'view', - tags = ['daily_balances'] -) }} - -WITH end_of_day_prices AS ( - SELECT - token_address, - hour::DATE AS price_date, - price, - is_verified - FROM - {{ ref('price__ez_prices_hourly') }} - QUALIFY ROW_NUMBER() OVER ( - PARTITION BY token_address, hour::DATE - ORDER BY hour DESC - ) = 1 -) - -SELECT - b.balance_date, - b.address, - b.token_address, - b.balance AS balance_raw, - b.balance / NULLIF(POW(10, COALESCE(t.decimals, 0)), 0) AS balance, - (b.balance / NULLIF(POW(10, COALESCE(t.decimals, 0)), 0)) * p.price AS balance_usd, - t.symbol, - t.name AS token_name, - t.decimals, - b.frozen, - b.last_balance_change, - b.balance_changed_on_date, - p.is_verified AS token_is_verified, - b.balances_daily_id AS ez_balances_daily_id, - b.inserted_timestamp, - b.modified_timestamp -FROM - {{ ref('silver__bals_daily') }} b - LEFT JOIN {{ ref('core__dim_tokens') }} t - ON LOWER(b.token_address) = LOWER(t.token_address) - LEFT JOIN end_of_day_prices p - ON LOWER(b.token_address) = LOWER(p.token_address) - AND b.balance_date = p.price_date diff --git a/models/silver/core/balances/silver__bals.sql b/models/silver/core/balances/silver__balances.sql similarity index 66% rename from models/silver/core/balances/silver__bals.sql rename to models/silver/core/balances/silver__balances.sql index 88c256a..214c636 100644 --- a/models/silver/core/balances/silver__bals.sql +++ b/models/silver/core/balances/silver__balances.sql @@ -10,15 +10,20 @@ ) }} -- at most one record per (address, token_address) pair per day - we will get the last transaction of the day WITH verified_tokens AS ( - SELECT DISTINCT token_address - FROM {{ ref('price__ez_prices_hourly') }} - WHERE is_verified + + SELECT + DISTINCT token_address + FROM + {{ ref('price__ez_prices_hourly') }} + WHERE + is_verified ), -{% if is_incremental() and var('HEAL_MODEL', false) %} -newly_verified_tokens AS ( - {{ get_missing_verified_tokens() }} -), +{% if is_incremental() and var( + 'HEAL_MODEL', + false +) %} +newly_verified_tokens AS ({{ get_missing_verified_tokens() }}), heal_balances AS ( SELECT C.block_number, @@ -28,15 +33,23 @@ heal_balances AS ( C.change_data :balance :: bigint AS post_balance, C.change_data :frozen :: BOOLEAN AS frozen, C.address - FROM {{ ref('silver__changes') }} C + FROM + {{ ref('silver__changes') }} C WHERE block_timestamp :: DATE >= '2023-07-28' AND C.change_module = 'fungible_asset' AND C.change_resource = 'FungibleStore' - AND TRY_CAST(C.change_data :balance :: STRING AS bigint) IS NOT NULL + AND TRY_CAST( + C.change_data :balance :: STRING AS bigint + ) IS NOT NULL AND C.address IS NOT NULL - AND LOWER(C.change_data :metadata :inner :: STRING) IN ( - SELECT token_address FROM newly_verified_tokens + AND LOWER( + C.change_data :metadata :inner :: STRING + ) IN ( + SELECT + token_address + FROM + newly_verified_tokens ) ), {% endif %} @@ -60,6 +73,12 @@ fungible_asset_balances AS ( C.change_data :balance :: STRING AS bigint ) IS NOT NULL AND C.address IS NOT NULL + AND LOWER(token_address) IN ( + SELECT + LOWER(token_address) + FROM + verified_tokens + ) {% if is_incremental() %} AND C.modified_timestamp >= ( @@ -70,7 +89,6 @@ AND C.modified_timestamp >= ( ) {% endif %} ), - all_balances AS ( SELECT block_number, @@ -80,25 +98,26 @@ all_balances AS ( post_balance, frozen, address - FROM fungible_asset_balances - WHERE LOWER(token_address) IN ( - SELECT LOWER(token_address) FROM verified_tokens - ) + FROM + fungible_asset_balances - {% if is_incremental() and var('HEAL_MODEL', false) %} - UNION ALL - SELECT - block_number, - block_timestamp, - version, - token_address, - post_balance, - frozen, - address - FROM heal_balances - {% endif %} +{% if is_incremental() and var( + 'HEAL_MODEL', + false +) %} +UNION ALL +SELECT + block_number, + block_timestamp, + version, + token_address, + post_balance, + frozen, + address +FROM + heal_balances +{% endif %} ) - SELECT block_number, block_timestamp, @@ -112,8 +131,11 @@ SELECT SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id -FROM all_balances -QUALIFY ROW_NUMBER() OVER ( - PARTITION BY block_timestamp :: DATE, address, token_address - ORDER BY block_timestamp DESC -) = 1 +FROM + all_balances qualify ROW_NUMBER() over ( + PARTITION BY block_timestamp :: DATE, + address, + token_address + ORDER BY + block_timestamp DESC + ) = 1 diff --git a/models/silver/core/balances/silver__bals.yml b/models/silver/core/balances/silver__balances.yml similarity index 68% rename from models/silver/core/balances/silver__bals.yml rename to models/silver/core/balances/silver__balances.yml index 9a54089..b52aced 100644 --- a/models/silver/core/balances/silver__bals.yml +++ b/models/silver/core/balances/silver__balances.yml @@ -1,19 +1,23 @@ version: 2 models: - - name: silver__bals + - name: silver__balances description: | Raw balance changes for verified tokens only. Captures the last balance state per (address, token_address) per day from fungible asset stores. columns: - name: BLOCK_NUMBER + description: The block number of the transaction that modified this balance. - name: BLOCK_TIMESTAMP + description: The timestamp of the block containing the balance change. - name: BLOCK_DATE + description: The date portion of the block timestamp, used for daily partitioning. - name: VERSION + description: '{{ doc("version") }}' tests: - not_null: where: block_date > current_date - 3 @@ -31,11 +35,13 @@ models: where: block_date > current_date - 3 - name: BALANCE + description: The raw token balance amount before decimal adjustment. tests: - not_null: where: block_date > current_date - 3 - name: FROZEN + description: Boolean indicating whether the fungible asset account is frozen and unable to transfer. - name: BALANCES_ID description: '{{ doc("pk") }}' @@ -52,3 +58,4 @@ models: description: '{{ doc("modified_timestamp") }}' - name: _INVOCATION_ID + description: The dbt invocation ID for the run that produced this record. diff --git a/models/silver/core/balances/silver__balances_snapshot.sql b/models/silver/core/balances/silver__balances_snapshot.sql new file mode 100644 index 0000000..419a548 --- /dev/null +++ b/models/silver/core/balances/silver__balances_snapshot.sql @@ -0,0 +1,30 @@ +{{ config( + materialized = 'table', + tags = ['balances_snapshot'] +) }} + +{# Set snapshot date - override with --var 'SNAPSHOT_DATE:2025-09-02' #} +{% set snapshot_date = var('SNAPSHOT_DATE', '2025-09-02') %} + +SELECT + block_number, + block_timestamp, + block_date, + version, + address, + token_address, + balance, + frozen, + '{{ snapshot_date }}'::DATE AS snapshot_date, + {{ dbt_utils.generate_surrogate_key(['address', 'token_address', "'" ~ snapshot_date ~ "'"]) }} AS balances_snapshot_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + {{ ref('silver__balances') }} +WHERE + block_timestamp < '{{ snapshot_date }}'::TIMESTAMP +QUALIFY ROW_NUMBER() OVER ( + PARTITION BY address, token_address + ORDER BY version DESC +) = 1 diff --git a/models/silver/core/balances/silver__balances_snapshot.yml b/models/silver/core/balances/silver__balances_snapshot.yml new file mode 100644 index 0000000..3ed13ca --- /dev/null +++ b/models/silver/core/balances/silver__balances_snapshot.yml @@ -0,0 +1,48 @@ +version: 2 + +models: + - name: silver__balances_snapshot + description: | + Point-in-time snapshot of token balances for verified tokens. Contains one row per + (address, token_address) combination representing the most recent balance state + before the configured SNAPSHOT_DATE variable (default: 2025-09-02). + + columns: + - name: BLOCK_NUMBER + description: The block number of the transaction that last modified this balance. + + - name: BLOCK_TIMESTAMP + description: The timestamp of the block containing the balance change. + + - name: BLOCK_DATE + description: The date portion of the block timestamp. + + - name: VERSION + description: '{{ doc("version") }}' + + - name: ADDRESS + description: '{{ doc("address") }}' + + - name: TOKEN_ADDRESS + description: '{{ doc("token_address") }}' + + - name: BALANCE + description: The raw token balance amount before decimal adjustment. + + - name: FROZEN + description: Boolean indicating whether the fungible asset account is frozen and unable to transfer. + + - name: SNAPSHOT_DATE + description: The configured snapshot date used to filter balance records. + + - name: BALANCES_SNAPSHOT_ID + description: '{{ doc("pk") }}' + + - name: INSERTED_TIMESTAMP + description: '{{ doc("inserted_timestamp") }}' + + - name: MODIFIED_TIMESTAMP + description: '{{ doc("modified_timestamp") }}' + + - name: _INVOCATION_ID + description: The dbt invocation ID for the run that produced this record. diff --git a/models/silver/core/balances/silver__bals_daily.sql b/models/silver/core/balances/silver__bals_daily.sql deleted file mode 100644 index 4eef0b2..0000000 --- a/models/silver/core/balances/silver__bals_daily.sql +++ /dev/null @@ -1,328 +0,0 @@ -{{ config( - materialized = 'incremental', - unique_key = ['balances_daily_id'], - incremental_predicates = ["dynamic_range_predicate", "balance_date"], - cluster_by = ['balance_date'], - merge_exclude_columns = ["inserted_timestamp"], - post_hook = [ - "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(address, token_address);", - "DELETE FROM {{ this }} WHERE balance_date < CURRENT_DATE - 95 AND DAYOFWEEK(balance_date) != 0;", - "{{ unverify_tokens() }}" - ], - tags = ['daily_balances', 'heal'], - full_refresh = false -) }} - -WITH date_spine AS ( - SELECT - date_day AS balance_date - FROM - {{ source( - 'crosschain', - 'dim_dates' - ) }} - WHERE - date_day >= '2023-07-28' - AND date_day < SYSDATE() :: DATE - -{% if is_incremental() %} -AND date_day > ( - SELECT - MAX(balance_date) - FROM - {{ this }} -) - -{% endif %} -), - -{% if is_incremental() %} -latest_balances_from_table AS ( - SELECT - address, - token_address, - balance, - frozen, - last_balance_change, - balance_date - FROM {{ this }} - WHERE balance_date = ( - SELECT MAX(balance_date) - FROM {{ this }} - ) -), -{% endif %} - -{% if is_incremental() and var('HEAL_MODEL', false) %} -newly_verified_tokens AS ( - {{ get_missing_verified_tokens() }} -), -heal_date_spine AS ( - SELECT date_day AS balance_date - FROM {{ source('crosschain', 'dim_dates') }} - WHERE date_day >= '2023-07-28' - AND date_day < SYSDATE() :: DATE -), -heal_source_balances AS ( - SELECT - block_date AS balance_date, - address, - token_address, - balance, - frozen, - block_timestamp AS last_balance_change_timestamp, - TRUE AS balance_changed_on_date - FROM {{ ref('silver__bals') }} - WHERE LOWER(token_address) IN (SELECT token_address FROM newly_verified_tokens) - QUALIFY ROW_NUMBER() OVER ( - PARTITION BY block_date, address, token_address - ORDER BY block_timestamp DESC, block_number DESC, version DESC - ) = 1 -), -heal_address_token_combinations AS ( - SELECT DISTINCT address, token_address - FROM heal_source_balances -), -heal_daily_balances AS ( - SELECT - d.balance_date, - c.address, - c.token_address, - LAST_VALUE(t.balance IGNORE NULLS) OVER ( - PARTITION BY c.address, c.token_address - ORDER BY d.balance_date - ROWS UNBOUNDED PRECEDING - ) AS balance, - LAST_VALUE(t.frozen IGNORE NULLS) OVER ( - PARTITION BY c.address, c.token_address - ORDER BY d.balance_date - ROWS UNBOUNDED PRECEDING - ) AS frozen, - LAST_VALUE(t.last_balance_change_timestamp IGNORE NULLS) OVER ( - PARTITION BY c.address, c.token_address - ORDER BY d.balance_date - ROWS UNBOUNDED PRECEDING - ) AS last_balance_change_timestamp, - CASE WHEN t.balance_date IS NOT NULL THEN TRUE ELSE FALSE END AS balance_changed_on_date - FROM heal_date_spine d - CROSS JOIN heal_address_token_combinations c - LEFT JOIN heal_source_balances t - ON d.balance_date = t.balance_date - AND c.address = t.address - AND c.token_address = t.token_address -), -{% endif %} - -todays_balance_changes AS ( - -- Get balance changes for dates in the date spine - SELECT - block_date AS balance_date, - address, - token_address, - balance, - frozen, - block_timestamp, - ROW_NUMBER() OVER ( - PARTITION BY block_date, address, token_address - ORDER BY block_timestamp DESC, block_number DESC, version DESC - ) AS daily_rank - FROM {{ ref('silver__bals') }} tb - WHERE EXISTS ( - SELECT 1 FROM date_spine ds - WHERE ds.balance_date = tb.block_date - ) -), - -todays_final_balances AS ( - -- Get the last balance change per address-token_address for today - SELECT - balance_date, - address, - token_address, - balance, - frozen, - block_timestamp AS last_balance_change_timestamp, - TRUE AS balance_changed_on_date - FROM todays_balance_changes - WHERE daily_rank = 1 -), - -address_token_combinations AS ( - -- Get all unique address-token_address combinations that have ever had a balance - SELECT DISTINCT - address, - token_address - FROM todays_final_balances -), - -source_data AS ( - {% if is_incremental() %} - -- Check if processing multiple days (batch mode) - {% if execute %} - {% set max_date_query %} - SELECT MAX(balance_date) as max_date FROM {{ this }} - {% endset %} - {% set max_date = run_query(max_date_query).columns[0].values()[0] %} - {% set days_to_process = (modules.datetime.date.today() - max_date).days %} - {% set batch_size = days_to_process if days_to_process <= 60 else 60 %} - {% else %} - {% set batch_size = 1 %} - {% endif %} - - {% if batch_size > 1 %} - -- Multi-day batch: Use window functions for proper forward-filling - SELECT - d.balance_date, - COALESCE(c.address, y.address) AS address, - COALESCE(c.token_address, y.token_address) AS token_address, - -- For balance, use the most recent change within batch, or carry forward from yesterday - COALESCE( - LAST_VALUE(t.balance IGNORE NULLS) OVER ( - PARTITION BY COALESCE(c.address, y.address), COALESCE(c.token_address, y.token_address) - ORDER BY d.balance_date - ROWS UNBOUNDED PRECEDING - ), - y.balance - ) AS balance, - -- For frozen, use the most recent change within batch, or carry forward from yesterday - COALESCE( - LAST_VALUE(t.frozen IGNORE NULLS) OVER ( - PARTITION BY COALESCE(c.address, y.address), COALESCE(c.token_address, y.token_address) - ORDER BY d.balance_date - ROWS UNBOUNDED PRECEDING - ), - y.frozen - ) AS frozen, - -- For last_balance_change, we need to track the most recent change date within the batch - CASE - WHEN MAX(CASE WHEN t.balance_date IS NOT NULL THEN d.balance_date END) OVER ( - PARTITION BY COALESCE(c.address, y.address), COALESCE(c.token_address, y.token_address) - ORDER BY d.balance_date - ROWS UNBOUNDED PRECEDING - ) IS NOT NULL THEN - MAX(CASE WHEN t.balance_date IS NOT NULL THEN d.balance_date END) OVER ( - PARTITION BY COALESCE(c.address, y.address), COALESCE(c.token_address, y.token_address) - ORDER BY d.balance_date - ROWS UNBOUNDED PRECEDING - )::TIMESTAMP - ELSE y.last_balance_change::TIMESTAMP - END AS last_balance_change_timestamp, - CASE WHEN t.balance_date IS NOT NULL THEN TRUE ELSE FALSE END AS balance_changed_on_date - FROM date_spine d - CROSS JOIN ( - -- All addresses that should exist (previous + new) - SELECT address, token_address FROM latest_balances_from_table - UNION - SELECT address, token_address FROM address_token_combinations - ) c - LEFT JOIN todays_final_balances t - ON d.balance_date = t.balance_date - AND c.address = t.address - AND c.token_address = t.token_address - LEFT JOIN latest_balances_from_table y - ON c.address = y.address - AND c.token_address = y.token_address - - {% else %} - -- Single day: Use original efficient logic - SELECT - balance_date, - address, - token_address, - balance, - frozen, - last_balance_change_timestamp, - balance_changed_on_date - FROM todays_final_balances - - UNION ALL - - -- Carry forward yesterday's balances for addresses that didn't change today - SELECT - d.balance_date, - y.address, - y.token_address, - y.balance, - y.frozen, - y.last_balance_change::TIMESTAMP AS last_balance_change_timestamp, - FALSE AS balance_changed_on_date - FROM date_spine d - CROSS JOIN latest_balances_from_table y - LEFT JOIN todays_final_balances t - ON y.address = t.address - AND y.token_address = t.token_address - AND d.balance_date = t.balance_date - WHERE t.address IS NULL -- Only addresses with no changes today - {% endif %} - - {% else %} - -- Full refresh: Create complete time series with forward-filling - SELECT - d.balance_date, - c.address, - c.token_address, - LAST_VALUE(t.balance IGNORE NULLS) OVER ( - PARTITION BY c.address, c.token_address - ORDER BY d.balance_date - ROWS UNBOUNDED PRECEDING - ) AS balance, - LAST_VALUE(t.frozen IGNORE NULLS) OVER ( - PARTITION BY c.address, c.token_address - ORDER BY d.balance_date - ROWS UNBOUNDED PRECEDING - ) AS frozen, - LAST_VALUE(t.last_balance_change_timestamp IGNORE NULLS) OVER ( - PARTITION BY c.address, c.token_address - ORDER BY d.balance_date - ROWS UNBOUNDED PRECEDING - ) AS last_balance_change_timestamp, - CASE WHEN t.balance_date IS NOT NULL THEN TRUE ELSE FALSE END AS balance_changed_on_date - FROM date_spine d - CROSS JOIN address_token_combinations c - LEFT JOIN todays_final_balances t - ON d.balance_date = t.balance_date - AND c.address = t.address - AND c.token_address = t.token_address - {% endif %} -) - -final_data AS ( - SELECT - balance_date, - address, - token_address, - balance, - frozen, - last_balance_change_timestamp, - balance_changed_on_date - FROM source_data - - {% if is_incremental() and var('HEAL_MODEL', false) %} - UNION ALL - SELECT - balance_date, - address, - token_address, - balance, - frozen, - last_balance_change_timestamp, - balance_changed_on_date - FROM heal_daily_balances - {% endif %} -) - -SELECT - balance_date, - address, - token_address, - balance, - frozen, - last_balance_change_timestamp::DATE AS last_balance_change, - balance_changed_on_date, - {{ dbt_utils.generate_surrogate_key(['balance_date', 'address', 'token_address']) }} AS balances_daily_id, - SYSDATE() AS inserted_timestamp, - SYSDATE() AS modified_timestamp, - '{{ invocation_id }}' AS _invocation_id -FROM final_data -WHERE balance IS NOT NULL -- Only include addresses that have had at least one balance - AND balance > 0 -- Only include addresses with positive balances diff --git a/models/silver/core/balances/silver__bals_daily.yml b/models/silver/core/balances/silver__bals_daily.yml deleted file mode 100644 index 0f5956b..0000000 --- a/models/silver/core/balances/silver__bals_daily.yml +++ /dev/null @@ -1,59 +0,0 @@ -version: 2 - -models: - - name: silver__bals_daily - description: | - Daily balance snapshots with forward-filling. Contains one row per (address, token_address) - per day for all addresses with positive balances. - - columns: - - name: BALANCE_DATE - tests: - - not_null: - where: balance_date > current_date - 3 - - - name: ADDRESS - description: '{{ doc("address") }}' - tests: - - not_null: - where: balance_date > current_date - 3 - - - name: TOKEN_ADDRESS - description: '{{ doc("token_address") }}' - tests: - - not_null: - where: balance_date > current_date - 3 - - - name: BALANCE - tests: - - not_null: - where: balance_date > current_date - 3 - - - name: FROZEN - - - name: LAST_BALANCE_CHANGE - description: The date when the balance was last modified for this address-token combination. - - - name: BALANCE_CHANGED_ON_DATE - description: | - Boolean indicating whether the balance changed on this specific date. - TRUE = actual transaction occurred, FALSE = forward-filled from previous day. - tests: - - not_null: - where: balance_date > current_date - 3 - - - name: BALANCES_DAILY_ID - description: '{{ doc("pk") }}' - tests: - - unique: - where: balance_date > current_date - 3 - - not_null: - where: balance_date > current_date - 3 - - - name: INSERTED_TIMESTAMP - description: '{{ doc("inserted_timestamp") }}' - - - name: MODIFIED_TIMESTAMP - description: '{{ doc("modified_timestamp") }}' - - - name: _INVOCATION_ID From 07df5202231eb2b9a3454d56ef2d664360dd654f Mon Sep 17 00:00:00 2001 From: stanz Date: Tue, 6 Jan 2026 15:17:38 +0700 Subject: [PATCH 17/18] modify >= to > rm dupes --- models/gold/balances/balances__ez_balances.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/gold/balances/balances__ez_balances.sql b/models/gold/balances/balances__ez_balances.sql index 92ec8f5..ba99172 100644 --- a/models/gold/balances/balances__ez_balances.sql +++ b/models/gold/balances/balances__ez_balances.sql @@ -37,7 +37,7 @@ recent_balances AS ( FROM {{ ref('silver__balances') }} WHERE - block_date >= '{{ snapshot_date }}'::DATE + block_date > '{{ snapshot_date }}'::DATE ), combined_balances AS ( From 575161c63c2ba873593bd20c8ffd25162cd821d1 Mon Sep 17 00:00:00 2001 From: stanz Date: Wed, 7 Jan 2026 21:50:31 +0700 Subject: [PATCH 18/18] switch to daily runs --- models/silver/core/balances/silver__balances.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/silver/core/balances/silver__balances.sql b/models/silver/core/balances/silver__balances.sql index 214c636..e6e9b9b 100644 --- a/models/silver/core/balances/silver__balances.sql +++ b/models/silver/core/balances/silver__balances.sql @@ -6,7 +6,7 @@ cluster_by = ['modified_timestamp'], incremental_predicates = ["dynamic_range_predicate", "block_timestamp::DATE"], post_hook = '{{ unverify_tokens() }}', - tags = ['noncore', 'full_test', 'heal'] + tags = ['daily', 'full_test', 'heal'] ) }} -- at most one record per (address, token_address) pair per day - we will get the last transaction of the day WITH verified_tokens AS (