Add confirmed deposits logic to beacon chain ez deposits table (#1150)
Some checks failed
docs_update / called_workflow_template (push) Has been cancelled

* add confirmed logic

* config

* docs

* add recency test

* remove origin to address not null test

* fix incremental logic

* search op
This commit is contained in:
Sam 2025-10-14 22:37:45 +08:00 committed by GitHub
parent f5287d0576
commit 5573eefa7f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 316 additions and 13 deletions

View File

@ -1,12 +1,176 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true },
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'DEFI' } } },
tags = ['gold','beacon','ez']
materialized = "incremental",
unique_key = "ez_deposits_id",
cluster_by = "block_timestamp::date",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(pubkey, depositor)",
tags = ['gold','beacon']
) }}
WITH blocks AS (
SELECT
slot_number,
block_number
FROM
{{ ref('beacon_chain__fact_blocks') }}
{% if is_incremental() %}
WHERE
slot_number >= 11649077
AND slot_number >= (
SELECT
MIN(slot_number)
FROM
{{ this }}
WHERE
slot_number >= 11649077
AND (
NOT processed
OR block_number IS NULL
)
)
{% endif %}
),
confirmed_deposits AS (
SELECT
processed_slot,
submit_slot_number AS slot_number,
pubkey
FROM
{{ ref('silver__confirmed_deposits') }}
{# {% if is_incremental() %}
WHERE
processed_slot >= (
SELECT
MAX(processed_slot) - 7200
FROM
{{ this }}
)
{% endif %}
#}
qualify ROW_NUMBER() over (
PARTITION BY submit_slot_number,
pubkey
ORDER BY
processed_slot DESC
) = 1
),
eth_deposits AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
deposit_amount,
depositor,
deposit_address,
platform_address,
contract_address,
pubkey,
withdrawal_credentials,
withdrawal_type,
withdrawal_address,
signature,
deposit_index,
eth_staking_deposits_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__eth_staking_deposits') }}
{% if is_incremental() %}
WHERE
modified_timestamp > (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
)
{% if is_incremental() %},
processed_heal AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
deposit_amount,
depositor,
deposit_address,
platform_address,
contract_address,
pubkey,
withdrawal_credentials,
withdrawal_type,
withdrawal_address,
signature,
deposit_index,
ez_deposits_id AS eth_staking_deposits_id,
inserted_timestamp,
modified_timestamp
FROM
{{ this }}
WHERE
block_number >= 22431132 -- first block for pending deposits
AND (
NOT processed
OR slot_number IS NULL
)
)
{% endif %},
combined_deposits AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
deposit_amount,
depositor,
deposit_address,
platform_address,
contract_address,
pubkey,
withdrawal_credentials,
withdrawal_type,
withdrawal_address,
signature,
deposit_index,
eth_staking_deposits_id,
inserted_timestamp,
modified_timestamp
FROM
eth_deposits
{% if is_incremental() %}
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
deposit_amount,
depositor,
deposit_address,
platform_address,
contract_address,
pubkey,
withdrawal_credentials,
withdrawal_type,
withdrawal_address,
signature,
deposit_index,
eth_staking_deposits_id,
inserted_timestamp,
modified_timestamp
FROM
processed_heal
{% endif %}
)
SELECT
slot_number,
block_number,
block_timestamp,
tx_hash,
@ -34,6 +198,19 @@ SELECT
withdrawal_address,
signature,
deposit_index,
IFF(
processed_slot IS NULL,
NULL,
processed_slot
) AS processed_slot,
CASE
-- min slot number for pending slot deposits
WHEN slot_number < 11649077
OR slot_number IS NULL THEN TRUE
WHEN slot_number >= 11649077
AND processed_slot IS NOT NULL THEN TRUE
ELSE FALSE
END AS processed,
COALESCE (
d.eth_staking_deposits_id,
{{ dbt_utils.generate_surrogate_key(
@ -61,8 +238,16 @@ SELECT
)
) AS modified_timestamp
FROM
{{ ref('silver__eth_staking_deposits') }}
d
combined_deposits d
LEFT JOIN blocks USING (block_number)
LEFT JOIN confirmed_deposits USING (
slot_number,
pubkey
)
LEFT JOIN {{ ref('core__dim_labels') }}
l
ON d.platform_address = l.address
ON d.platform_address = l.address qualify ROW_NUMBER() over (
PARTITION BY ez_deposits_id
ORDER BY
d.modified_timestamp DESC
) = 1

View File

@ -3,6 +3,8 @@ models:
- name: beacon_chain__ez_deposits
description: This convenience table contains information about the deposits made to the beacon chain, alongside address labels for analysis purposes. Deposit activity in this table is derived from the `DepositEvent` on the `BeaconDepositContract - 0x00000000219ab540356cbb839cbe05303d7705fa` in `ethereum.core.fact_event_logs`. For more info, please visit [The Ethereum Organization](https://ethereum.org/en/developers/docs/consensus-mechanisms/pos/).
columns:
- name: SLOT_NUMBER
description: The slot number that the deposit was made in.
- name: BLOCK_NUMBER
description: '{{ doc("general_block_number") }}'
- name: BLOCK_TIMESTAMP
@ -39,9 +41,22 @@ models:
description: The signature associated with the Validator deposit.
- name: DEPOSIT_INDEX
description: The index of the deposit.
- name: PROCESSED_SLOT
description: The slot number that the deposit was processed in.
- name: PROCESSED
description: Whether the deposit has been processed.
- name: EZ_DEPOSITS_ID
description: '{{ doc("general_pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("general_inserted_timestamp") }}'
description: '{{ doc("general_inserted_timestamp") }}'
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: MODIFIED_TIMESTAMP
description: '{{ doc("general_modified_timestamp") }}'

View File

@ -6,6 +6,7 @@
) }}
SELECT
execution_payload :block_number :: INT AS block_number,
slot_number,
epoch_number,
IFF(

View File

@ -4,6 +4,8 @@ models:
description: '{{ doc("beacon_blocks_table_doc") }}'
columns:
- name: BLOCK_NUMBER
description: '{{ doc("general_block_number") }}'
- name: SLOT_NUMBER
description: '{{ doc("slot_number") }}'
- name: EPOCH_NUMBER

View File

@ -0,0 +1,90 @@
{{ config (
materialized = "incremental",
unique_key = "confirmed_deposits_id",
cluster_by = "ROUND(processed_slot, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(pubkey)",
incremental_predicates = ["dynamic_range", "processed_slot"],
tags = ['silver','beacon']
) }}
WITH latest_pending AS (
SELECT
request_slot_number,
submit_slot_number,
pubkey,
signature,
withdrawal_credentials,
deposit_id,
amount
FROM
{{ ref('silver__pending_deposits') }}
WHERE
request_slot_number = (
SELECT
MAX(request_slot_number)
FROM
{{ ref('silver__pending_deposits') }}
)
AND submit_slot_number > 0 qualify ROW_NUMBER() over (
PARTITION BY deposit_id
ORDER BY
request_slot_number DESC
) = 1
),
new_pending_deposits AS (
SELECT
request_slot_number,
submit_slot_number,
pubkey,
signature,
withdrawal_credentials,
deposit_id,
amount
FROM
{{ ref('silver__pending_deposits') }}
WHERE
submit_slot_number > 0
{% if is_incremental() %}
AND request_slot_number >= (
SELECT
MAX(processed_slot)
FROM
{{ this }}
)
{% endif %}
),
processed_deposits AS (
SELECT
MAX(request_slot_number) AS processed_slot,
submit_slot_number,
pubkey,
signature,
withdrawal_credentials,
deposit_id,
amount
FROM
new_pending_deposits A
LEFT JOIN latest_pending b USING (deposit_id)
WHERE
b.deposit_id IS NULL
GROUP BY
ALL
)
SELECT
processed_slot,
submit_slot_number,
pubkey,
signature,
withdrawal_credentials,
deposit_id,
amount,
{{ dbt_utils.generate_surrogate_key(
['processed_slot', 'submit_slot_number', 'pubkey', 'signature', 'amount']
) }} AS confirmed_deposits_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
processed_deposits

View File

@ -0,0 +1,14 @@
version: 2
models:
- name: silver__confirmed_deposits
columns:
- name: INSERTED_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ

View File

@ -105,10 +105,6 @@ models:
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: ORIGIN_TO_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: ORIGIN_FUNCTION_SIGNATURE
tests:
- not_null