pending consolidations and withdrawals (#1169)

* pending consolidations and withdrawals

* remove limits

* silver + gold

* docs
This commit is contained in:
Austin 2025-11-19 12:22:37 -05:00 committed by GitHub
parent eff4a3cfb2
commit f2d619fc49
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 680 additions and 0 deletions

View File

@ -90,6 +90,8 @@ There is more information on how to use dbt docs in the last section of this doc
- [fact_attestations](https://flipsidecrypto.github.io/ethereum-models/#!/model/model.ethereum_models.beacon_chain__fact_attestations)
- [fact_deposits](https://flipsidecrypto.github.io/ethereum-models/#!/model/model.ethereum_models.beacon_chain__fact_deposits)
- [fact_pending_deposits](https://flipsidecrypto.github.io/ethereum-models/#!/model/model.ethereum_models.beacon_chain__fact_pending_deposits)
- [fact_pending_partial_withdrawals](https://flipsidecrypto.github.io/ethereum-models/#!/model/model.ethereum_models.beacon_chain__fact_pending_partial_withdrawals)
- [fact_pending_consolidations](https://flipsidecrypto.github.io/ethereum-models/#!/model/model.ethereum_models.beacon_chain__fact_pending_consolidations)
- [fact_withdrawals](https://flipsidecrypto.github.io/ethereum-models/#!/model/model.ethereum_models.beacon_chain__fact_withdrawals)
- [fact_validators](https://flipsidecrypto.github.io/ethereum-models/#!/model/model.ethereum_models.beacon_chain__fact_validators)
- [fact_validator_balances](https://flipsidecrypto.github.io/ethereum-models/#!/model/model.ethereum_models.beacon_chain__fact_validator_balances)

View File

@ -0,0 +1,18 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true },
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'BEACON' } } },
tags = ['gold','beacon']
) }}
SELECT
request_slot_number,
source_index,
target_index,
pending_consolidations_id as fact_pending_consolidations_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__pending_consolidations') }}

View File

@ -0,0 +1,19 @@
version: 2
models:
- name: beacon_chain__fact_pending_consolidations
description: This model details the pending consolidations that are queued in the beacon chain of Ethereum.
columns:
- name: REQUEST_SLOT_NUMBER
description: The slot number that the queue was read at.
- name: SOURCE_INDEX
description: The source validator index for the consolidation.
- name: TARGET_INDEX
description: The target validator index for the consolidation.
- name: FACT_PENDING_CONSOLIDATIONS_ID
description: The unique identifier for a row.
- name: INSERTED_TIMESTAMP
description: The timestamp when the pending consolidation was inserted.
- name: MODIFIED_TIMESTAMP
description: The timestamp when the pending consolidation was modified.

View File

@ -0,0 +1,20 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true },
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'BEACON' } } },
tags = ['gold','beacon']
) }}
SELECT
request_slot_number,
validator_index,
withdrawable_epoch,
withdrawable_epoch_timestamp,
amount / pow(10, 9) AS amount,
pending_partial_withdrawals_id as fact_pending_partial_withdrawals_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__pending_partial_withdrawals') }}

View File

@ -0,0 +1,23 @@
version: 2
models:
- name: beacon_chain__fact_pending_partial_withdrawals
description: This model details the pending partial withdrawals that are queued in the beacon chain of Ethereum.
columns:
- name: REQUEST_SLOT_NUMBER
description: The slot number that the queue was read at.
- name: VALIDATOR_INDEX
description: The validator index for the pending partial withdrawal.
- name: WITHDRAWABLE_EPOCH
description: The epoch when the withdrawal becomes withdrawable.
- name: WITHDRAWABLE_EPOCH_TIMESTAMP
description: The estimated timestamp when the withdrawal becomes withdrawable (calculated from epoch number using genesis timestamp).
- name: AMOUNT
description: The amount of the pending partial withdrawal, adjusted for the 9 decimal places (in ETH).
- name: FACT_PENDING_PARTIAL_WITHDRAWALS_ID
description: The unique identifier for a row.
- name: INSERTED_TIMESTAMP
description: The timestamp when the pending partial withdrawal was inserted.
- name: MODIFIED_TIMESTAMP
description: The timestamp when the pending partial withdrawal was modified.

View File

@ -0,0 +1,59 @@
-- depends on: {{ ref('bronze__streamline_beacon_pending_consolidations') }}
{{ config (
materialized = "incremental",
unique_key = "pending_consolidations_id",
cluster_by = "ROUND(request_slot_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(pending_consolidations_id)",
incremental_predicates = ["dynamic_range", "request_slot_number"],
tags = ['silver','beacon']
) }}
SELECT
COALESCE(
VALUE :"SLOT_NUMBER" :: INT,
metadata :request :"slot_number" :: INT,
PARSE_JSON(
metadata :request :"slot_number"
) :: INT
) AS request_slot_number,
try_to_number(data:source_index::STRING) AS source_index,
try_to_number(data:target_index::STRING) AS target_index,
data,
{{ dbt_utils.generate_surrogate_key(
['request_slot_number', 'source_index', 'target_index']
) }} AS pending_consolidations_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_beacon_pending_consolidations') }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP) _inserted_timestamp
FROM
{{ this }})
AND (LEFT(
DATA :error :: STRING,
1
) <> 'F'
OR DATA :error IS NULL
)
{% else %}
{{ ref('bronze__streamline_fr_beacon_pending_consolidations') }}
WHERE
(LEFT(
DATA :error :: STRING,
1
) <> 'F'
OR DATA :error IS NULL
)
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY pending_consolidations_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,15 @@
version: 2
models:
- name: silver__pending_consolidations
columns:
- name: INSERTED_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ

View File

@ -0,0 +1,65 @@
-- depends on: {{ ref('bronze__streamline_beacon_pending_partial_withdrawals') }}
{{ config (
materialized = "incremental",
unique_key = "pending_partial_withdrawals_id",
cluster_by = "ROUND(request_slot_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(pending_partial_withdrawals_id)",
incremental_predicates = ["dynamic_range", "request_slot_number"],
tags = ['silver','beacon']
) }}
SELECT
COALESCE(
VALUE :"SLOT_NUMBER" :: INT,
metadata :request :"slot_number" :: INT,
PARSE_JSON(
metadata :request :"slot_number"
) :: INT
) AS request_slot_number,
try_to_number(data:amount::STRING) AS amount,
try_to_number(data:validator_index::STRING) AS validator_index,
try_to_number(data:withdrawable_epoch::STRING) AS withdrawable_epoch,
DATEADD(
'seconds',
try_to_number(data:withdrawable_epoch::STRING) * 32 * 12,
'2020-12-01T12:00:23Z' :: timestamp_ntz
) AS withdrawable_epoch_timestamp,
data,
{{ dbt_utils.generate_surrogate_key(
['request_slot_number', 'validator_index', 'withdrawable_epoch', 'amount']
) }} AS pending_partial_withdrawals_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_beacon_pending_partial_withdrawals') }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP) _inserted_timestamp
FROM
{{ this }})
AND (LEFT(
DATA :error :: STRING,
1
) <> 'F'
OR DATA :error IS NULL
)
{% else %}
{{ ref('bronze__streamline_fr_beacon_pending_partial_withdrawals') }}
WHERE
(LEFT(
DATA :error :: STRING,
1
) <> 'F'
OR DATA :error IS NULL
)
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY pending_partial_withdrawals_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,15 @@
version: 2
models:
- name: silver__pending_partial_withdrawals
columns:
- name: INSERTED_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ

View File

@ -46,6 +46,8 @@ sources:
- name: contract_abis_v3
- name: token_reads
- name: pending_deposits
- name: pending_partial_withdrawals
- name: pending_consolidations
- name: balances_erc20
- name: balances_native
- name: crosschain_silver

View File

@ -0,0 +1,10 @@
{{ config (
materialized = 'view',
tags = ['bronze_beacon_pending_consolidations']
) }}
{{ v0_streamline_external_table_query(
model = "pending_consolidations",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
block_number = false
) }}

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['bronze_beacon_pending_partial_withdrawals']
) }}
{{ v0_streamline_external_table_query(
model = "pending_partial_withdrawals",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
block_number = false
) }}

View File

@ -0,0 +1,16 @@
{{ config (
materialized = 'view',
tags = ['bronze_beacon_pending_consolidations']
) }}
SELECT
partition_key,
VALUE :"SLOT_NUMBER" :: INT AS slot_number,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__streamline_fr_beacon_pending_consolidations_v2') }}

View File

@ -0,0 +1,10 @@
{{ config (
materialized = 'view',
tags = ['bronze_beacon_pending_consolidations']
) }}
{{ v0_streamline_external_table_fr_query(
model = "pending_consolidations",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
block_number = false
) }}

View File

@ -0,0 +1,15 @@
{{ config (
materialized = 'view',
tags = ['bronze_beacon_pending_partial_withdrawals']
) }}
SELECT
partition_key,
VALUE :"SLOT_NUMBER" :: INT AS slot_number,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__streamline_fr_beacon_pending_partial_withdrawals_v2') }}

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['bronze_beacon_pending_partial_withdrawals']
) }}
{{ v0_streamline_external_table_fr_query(
model = "pending_partial_withdrawals",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
block_number = false
) }}

View File

@ -0,0 +1,59 @@
-- depends on: {{ ref('bronze__streamline_beacon_pending_consolidations') }}
{{ config (
materialized = "incremental",
unique_key = "slot_number",
cluster_by = "ROUND(slot_number, -3)",
merge_update_columns = ["slot_number"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(slot_number)",
incremental_predicates = ["dynamic_range", "slot_number"],
tags = ['streamline_beacon_complete']
) }}
SELECT
COALESCE(
VALUE :"SLOT_NUMBER" :: INT,
metadata :request :"slot_number" :: INT,
PARSE_JSON(
metadata :request :"slot_number"
) :: INT
) AS slot_number,
file_name,
--parse slot_number from metadata for FR because it's not properly accessible in VALUE column from v1 requests
{{ dbt_utils.generate_surrogate_key(
['slot_number']
) }} AS complete_beacon_pending_consolidations_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_beacon_pending_consolidations') }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP) _inserted_timestamp
FROM
{{ this }})
AND (LEFT(
DATA :error :: STRING,
1
) <> 'F'
OR DATA :error IS NULL
)
{% else %}
{{ ref('bronze__streamline_fr_beacon_pending_consolidations') }}
WHERE
(LEFT(
DATA :error :: STRING,
1
) <> 'F'
OR DATA :error IS NULL
)
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY slot_number
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,58 @@
-- depends on: {{ ref('bronze__streamline_beacon_pending_partial_withdrawals') }}
{{ config (
materialized = "incremental",
unique_key = "slot_number",
cluster_by = "ROUND(slot_number, -3)",
merge_update_columns = ["slot_number"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(slot_number)",
incremental_predicates = ["dynamic_range", "slot_number"],
tags = ['streamline_beacon_complete']
) }}
SELECT
COALESCE(
VALUE :"SLOT_NUMBER" :: INT,
metadata :request :"slot_number" :: INT,
PARSE_JSON(
metadata :request :"slot_number"
) :: INT
) AS slot_number,
file_name,
--parse slot_number from metadata for FR because it's not properly accessible in VALUE column from v1 requests
{{ dbt_utils.generate_surrogate_key(
['slot_number']
) }} AS complete_beacon_pending_partial_withdrawals_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_beacon_pending_partial_withdrawals') }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP) _inserted_timestamp
FROM
{{ this }})
AND (LEFT(
DATA :error :: STRING,
1
) <> 'F'
OR DATA :error IS NULL
)
{% else %}
{{ ref('bronze__streamline_fr_beacon_pending_partial_withdrawals') }}
WHERE
(LEFT(
DATA :error :: STRING,
1
) <> 'F'
OR DATA :error IS NULL
)
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY slot_number
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,65 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"pending_consolidations",
"sql_limit" :"1000000",
"producer_batch_size" :"1000",
"worker_batch_size" :"100",
"async_concurrent_requests" :"10",
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(["data"]) }
),
tags = ['streamline_beacon_history']
) }}
WITH to_do AS (
SELECT
slot_number
FROM
{{ ref("streamline__beacon_blocks") }}
WHERE
slot_number <= (
SELECT
MIN(slot_number)
FROM
{{ ref("beacon_chain__fact_blocks") }}
WHERE
slot_timestamp >= DATEADD(hour, -12, SYSDATE())
)
and slot_number >= 11649025
EXCEPT
SELECT
slot_number
FROM
{{ ref("streamline__complete_beacon_pending_consolidations") }}
WHERE
slot_number <= (
SELECT
MIN(slot_number)
FROM
{{ ref("beacon_chain__fact_blocks") }}
WHERE
slot_timestamp >= DATEADD(hour, -12, SYSDATE())
)
)
SELECT
slot_number,
ROUND(slot_number, -3) AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'{service}/{Authentication}/eth/v1/beacon/states/' || slot_number || '/pending_consolidations',
OBJECT_CONSTRUCT(
'accept', 'application/json',
'fsc-quantum-state', 'streamline'
),
{},
'vault/prod/ethereum/quicknode/mainnet'
) AS request
FROM
to_do
ORDER BY
slot_number DESC
LIMIT 1000000

View File

@ -0,0 +1,65 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"pending_partial_withdrawals",
"sql_limit" :"1000000",
"producer_batch_size" :"1000",
"worker_batch_size" :"100",
"async_concurrent_requests" :"10",
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(["data"]) }
),
tags = ['streamline_beacon_history']
) }}
WITH to_do AS (
SELECT
slot_number
FROM
{{ ref("streamline__beacon_blocks") }}
WHERE
slot_number <= (
SELECT
MIN(slot_number)
FROM
{{ ref("beacon_chain__fact_blocks") }}
WHERE
slot_timestamp >= DATEADD(hour, -12, SYSDATE())
)
and slot_number >= 11649025
EXCEPT
SELECT
slot_number
FROM
{{ ref("streamline__complete_beacon_pending_partial_withdrawals") }}
WHERE
slot_number <= (
SELECT
MIN(slot_number)
FROM
{{ ref("beacon_chain__fact_blocks") }}
WHERE
slot_timestamp >= DATEADD(hour, -12, SYSDATE())
)
)
SELECT
slot_number,
ROUND(slot_number, -3) AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'{service}/{Authentication}/eth/v1/beacon/states/' || slot_number || '/pending_partial_withdrawals',
OBJECT_CONSTRUCT(
'accept', 'application/json',
'fsc-quantum-state', 'streamline'
),
{},
'vault/prod/ethereum/quicknode/mainnet'
) AS request
FROM
to_do
ORDER BY
slot_number DESC
LIMIT 1000000

View File

@ -0,0 +1,63 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"pending_consolidations",
"sql_limit" :"5000",
"producer_batch_size" :"2480",
"worker_batch_size" :"1240",
"async_concurrent_requests" :"10",
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(["data"]) }
),
tags = ['streamline_beacon_realtime']
) }}
WITH to_do AS (
SELECT
slot_number
FROM
{{ ref("streamline__beacon_blocks") }}
WHERE
slot_number > (
SELECT
MIN(slot_number)
FROM
{{ ref("beacon_chain__fact_blocks") }}
WHERE
slot_timestamp >= DATEADD(day, -3, SYSDATE())
)
and slot_number >= 11649025
EXCEPT
SELECT
slot_number
FROM
{{ ref("streamline__complete_beacon_pending_consolidations") }}
WHERE
slot_number > (
SELECT
MIN(slot_number)
FROM
{{ ref("beacon_chain__fact_blocks") }}
WHERE
slot_timestamp >= DATEADD(day, -3, SYSDATE())
)
)
SELECT
slot_number,
ROUND(slot_number, -3) AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'{service}/{Authentication}/eth/v1/beacon/states/' || slot_number || '/pending_consolidations',
OBJECT_CONSTRUCT(
'accept', 'application/json',
'fsc-quantum-state', 'streamline'
),
{},
'vault/prod/ethereum/quicknode/mainnet'
) AS request
FROM
to_do
ORDER BY
slot_number DESC

View File

@ -0,0 +1,63 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"pending_partial_withdrawals",
"sql_limit" :"5000",
"producer_batch_size" :"2480",
"worker_batch_size" :"1240",
"async_concurrent_requests" :"10",
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(["data"]) }
),
tags = ['streamline_beacon_realtime']
) }}
WITH to_do AS (
SELECT
slot_number
FROM
{{ ref("streamline__beacon_blocks") }}
WHERE
slot_number > (
SELECT
MIN(slot_number)
FROM
{{ ref("beacon_chain__fact_blocks") }}
WHERE
slot_timestamp >= DATEADD(day, -3, SYSDATE())
)
and slot_number >= 11649025
EXCEPT
SELECT
slot_number
FROM
{{ ref("streamline__complete_beacon_pending_partial_withdrawals") }}
WHERE
slot_number > (
SELECT
MIN(slot_number)
FROM
{{ ref("beacon_chain__fact_blocks") }}
WHERE
slot_timestamp >= DATEADD(day, -3, SYSDATE())
)
)
SELECT
slot_number,
ROUND(slot_number, -3) AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'{service}/{Authentication}/eth/v1/beacon/states/' || slot_number || '/pending_partial_withdrawals',
OBJECT_CONSTRUCT(
'accept', 'application/json',
'fsc-quantum-state', 'streamline'
),
{},
'vault/prod/ethereum/quicknode/mainnet'
) AS request
FROM
to_do
ORDER BY
slot_number DESC