AN-5258/eth-balance-diffs-optimize-and-bug (#1009)

* freq table + modified bal diffs

* research docs

* add block number filter

* delete test models add eth diff

* no cache hints

* merge optimizations

* fix incremental to account for non-chron block loads

* config updates, move to 25 hour look back

* update to final

* update sources

* remove no cache

* add no filter

* add back <>

* add late arriving balance test

* add test to recent balances

* remove comments

* fix war

* format

* change test

* add group by

* remove test

* v2 models

* format and pause tests

* re enable tests

* format

* format

* format final

* update to final

* update test ref

* format macro
This commit is contained in:
Matt Romano 2025-02-18 12:15:42 -07:00 committed by GitHub
parent cbcafa3a63
commit 50813e29f3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 491 additions and 1 deletions

View File

@ -0,0 +1,78 @@
{% test out_of_order_balance_diffs(model) %}
WITH ordered_balances AS (
SELECT
t.block_number,
t.block_timestamp,
t.address,
{% if model.identifier == 'token_balance_diffs_recent' %}
t.contract_address,
{% endif %}
t.prev_bal_unadj,
t.current_bal_unadj,
t._inserted_timestamp,
t.id,
COALESCE(LAG(t.current_bal_unadj) ignore nulls over(PARTITION BY t.address
{% if model.identifier == 'token_balance_diffs_recent' %}
, t.contract_address
{% endif %}
ORDER BY
t.block_number ASC), 0) AS actual_previous_balance,
CASE
WHEN LAG(
t.current_bal_unadj
) over(
PARTITION BY t.address
{% if model.identifier == 'token_balance_diffs_recent' %},
t.contract_address
{% endif %}
ORDER BY
t.block_number ASC
) IS NULL THEN TRUE
ELSE FALSE
END AS is_first_record
FROM
{{ model }}
t
)
SELECT
block_number,
block_timestamp,
address,
{% if model.identifier == 'token_balance_diffs_recent' %}
contract_address,
{% endif %}
prev_bal_unadj,
actual_previous_balance,
current_bal_unadj,
_inserted_timestamp,
id,
ABS(
prev_bal_unadj - actual_previous_balance
) AS difference,
is_first_record
FROM
ordered_balances
WHERE
prev_bal_unadj != actual_previous_balance
AND actual_previous_balance IS NOT NULL
AND is_first_record = FALSE
GROUP BY
block_number,
block_timestamp,
address,
{% if model.identifier == 'token_balance_diffs_recent' %}
contract_address,
{% endif %}
prev_bal_unadj,
actual_previous_balance,
current_bal_unadj,
_inserted_timestamp,
id,
difference,
is_first_record
HAVING
COUNT(*) > 0 {% endtest %}

View File

@ -0,0 +1,31 @@
{{ config(
materialized = 'incremental',
unique_key = ['address'],
cluster_by = ['block_timestamp::date','address'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(address)",
tags = ['curated']
) }}
SELECT
block_number,
block_timestamp,
address,
current_bal_unadj,
_inserted_timestamp
FROM
{{ source('ethereum_silver', 'eth_balance_diffs') }}
WHERE
_inserted_timestamp <= SYSDATE() - INTERVAL '1 day'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
QUALIFY MAX(block_number) OVER (
PARTITION BY address
) = block_number

View File

@ -133,4 +133,4 @@ ON address = min_address
AND block_number >= min_block
{% endif %}
WHERE
prev_bal_unadj <> current_bal_unadj
prev_bal_unadj <> current_bal_unadj

View File

@ -0,0 +1,164 @@
-- depends_on: {{ ref('silver__eth_balance_address_blocks') }}
{{ config(
materialized = 'incremental',
unique_key = 'id',
cluster_by = ['block_timestamp::date'],
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
tags = ['curated']
) }}
WITH base_table AS (
SELECT
block_number,
block_timestamp,
address,
balance,
_inserted_timestamp
FROM
{{ ref('silver__eth_balances') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
{{ this }}
)
{% endif %}
)
{% if is_incremental() %},
all_records AS (
-- pulls older record table
SELECT
block_number,
block_timestamp,
address,
current_bal_unadj AS balance,
_inserted_timestamp
FROM
{{ ref('silver__eth_balance_address_blocks') }}
WHERE
address IN (
SELECT
DISTINCT address
FROM
base_table
)
UNION ALL
-- pulls balances as usual but with only 25 hour look back to account for non-chronological blocks
SELECT
A.block_number,
A.block_timestamp,
A.address,
A.balance,
A._inserted_timestamp
FROM
{{ ref('silver__eth_balances') }} A
WHERE
address IN (
SELECT
DISTINCT address
FROM
base_table
)
AND _inserted_timestamp >= SYSDATE() - INTERVAL '25 hours'
UNION ALL
SELECT
block_number,
block_timestamp,
address,
balance,
_inserted_timestamp
FROM
base_table
),
min_record AS (
SELECT
address AS min_address,
MIN(block_number) AS min_block
FROM
base_table
GROUP BY
1
),
update_records AS (
SELECT
block_number,
block_timestamp,
address,
balance,
_inserted_timestamp
FROM
all_records
INNER JOIN min_record
ON address = min_address
AND block_number >= min_block
UNION ALL
SELECT
block_number,
block_timestamp,
address,
balance,
_inserted_timestamp
FROM
all_records
INNER JOIN min_record
ON address = min_address
AND block_number < min_block
),
incremental AS (
SELECT
block_number,
block_timestamp,
address,
balance,
_inserted_timestamp
FROM
update_records qualify(ROW_NUMBER() over (PARTITION BY address, block_number
ORDER BY
_inserted_timestamp DESC)) = 1
)
{% endif %},
FINAL AS (
SELECT
block_number,
block_timestamp,
address,
COALESCE(LAG(balance) ignore nulls over(PARTITION BY address
ORDER BY
block_number ASC), 0) AS prev_bal_unadj,
balance AS current_bal_unadj,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_number', 'address']
) }} AS id
{% if is_incremental() %}
FROM
incremental
{% else %}
FROM
base_table
{% endif %}
)
SELECT
f.*,
id AS eth_balance_diffs_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL f
{% if is_incremental() %}
INNER JOIN min_record
ON address = min_address
AND block_number >= min_block
{% endif %}
WHERE prev_bal_unadj <> current_bal_unadj -- this inner join filters out any records that are not in the incremental

View File

@ -0,0 +1,33 @@
{{ config(
materialized = 'incremental',
unique_key = ['address','contract_address'],
cluster_by = ['block_timestamp::date','contract_address'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(address,contract_address)",
tags = ['curated']
) }}
SELECT
block_number,
block_timestamp,
address,
contract_address,
current_bal_unadj,
_inserted_timestamp
FROM
{{ source('ethereum_silver', 'token_balance_diffs') }}
WHERE
_inserted_timestamp <= SYSDATE() - INTERVAL '1 day'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
QUALIFY MAX(block_number) OVER (
PARTITION BY address, contract_address
) = block_number

View File

@ -0,0 +1,180 @@
-- depends_on: {{ ref('silver__token_balance_address_blocks') }}
{{ config(
materialized = 'incremental',
unique_key = 'id',
cluster_by = ['block_timestamp::date'],
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
tags = ['curated']
) }}
WITH base_table AS (
SELECT
block_number,
block_timestamp,
address,
contract_address,
balance,
_inserted_timestamp
FROM
{{ ref('silver__token_balances') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
{{ this }}
)
{% endif %}
)
{% if is_incremental() %},
all_records AS (
-- pulls older record table
SELECT
block_number,
block_timestamp,
address,
contract_address,
current_bal_unadj AS balance,
_inserted_timestamp
FROM
{{ ref('silver__token_balance_address_blocks') }} A
WHERE
address IN (
SELECT
DISTINCT address
FROM
base_table
)
UNION ALL
-- pulls balances as usual but with only 25 hour look back to account for non-chronological blocks
SELECT
A.block_number,
A.block_timestamp,
A.address,
A.contract_address,
A.balance,
A._inserted_timestamp
FROM
{{ ref('silver__token_balances') }} A
WHERE
address IN (
SELECT
DISTINCT address
FROM
base_table
)
AND _inserted_timestamp >= SYSDATE() - INTERVAL '25 hours'
UNION ALL
SELECT
block_number,
block_timestamp,
address,
contract_address,
balance,
_inserted_timestamp
FROM
base_table
),
min_record AS (
SELECT
address AS min_address,
contract_address AS min_contract,
MIN(block_number) AS min_block
FROM
base_table
GROUP BY
1,
2
),
update_records AS (
-- this gets anything in the incremental or anything newer than records in the
-- incremental from that address already in the table
SELECT
block_number,
block_timestamp,
address,
contract_address,
balance,
_inserted_timestamp
FROM
all_records
INNER JOIN min_record
ON address = min_address
AND contract_address = min_contract
AND block_number >= min_block
UNION ALL
-- old records that are not in the incremental
SELECT
block_number,
block_timestamp,
address,
contract_address,
balance,
_inserted_timestamp
FROM
all_records
INNER JOIN min_record
ON address = min_address
AND contract_address = min_contract
AND block_number < min_block
),
incremental AS (
SELECT
block_number,
block_timestamp,
address,
contract_address,
balance,
_inserted_timestamp
FROM
update_records qualify(ROW_NUMBER() over (PARTITION BY address, contract_address, block_number
ORDER BY
_inserted_timestamp DESC)) = 1
)
{% endif %},
FINAL AS (
SELECT
block_number,
block_timestamp,
address,
contract_address,
COALESCE(LAG(balance) ignore nulls over(PARTITION BY address, contract_address
ORDER BY
block_number ASC), 0) AS prev_bal_unadj,
balance AS current_bal_unadj,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_number', 'contract_address', 'address']
) }} AS id
{% if is_incremental() %}
FROM
incremental
{% else %}
FROM
base_table
{% endif %}
)
SELECT
f.*,
id AS token_balance_diffs_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL f
{% if is_incremental() %}
INNER JOIN min_record
ON address = min_address
AND contract_address = min_contract
AND block_number >= min_block
{% endif %}
WHERE current_bal_unadj <> prev_bal_unadj -- this inner join filters out any records that are not in the incremental

View File

@ -5,6 +5,7 @@ models:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ID
- out_of_order_balance_diffs
columns:
- name: BLOCK_NUMBER
tests:

View File

@ -5,6 +5,7 @@ models:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ID
- out_of_order_balance_diffs
columns:
- name: BLOCK_NUMBER
tests:

View File

@ -28,6 +28,8 @@ sources:
- name: evm_chains_20221212
- name: nft_metadata_legacy
- name: nft_collection_metadata
- name: eth_balance_diffs
- name: token_balance_diffs
- name: ethereum_share
database: "{{target.database}}"
schema: silver