streamline models (#1144)
Some checks failed
docs_update / called_workflow_template (push) Has been cancelled

* streamline models

* typo

* param size and start block

* start slot

* params

* state id
This commit is contained in:
Austin 2025-09-30 10:52:51 -04:00 committed by GitHub
parent ffdcf2262c
commit 3ff04354ff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 219 additions and 1 deletions

View File

@ -16,7 +16,6 @@ models:
- FLOAT
- name: STATE_ID
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING

View File

@ -45,6 +45,7 @@ sources:
- name: confirm_blocks_v3
- name: contract_abis_v3
- name: token_reads
- name: pending_deposits
- name: crosschain_silver
database: >-
{{ 'CROSSCHAIN_DEV' if '_DEV' in target.database.upper() else 'CROSSCHAIN' }}

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['bronze_beacon_pending_deposits']
) }}
{{ v0_streamline_external_table_query(
model = "pending_deposits",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
block_number = false
) }}

View File

@ -0,0 +1,15 @@
{{ config (
materialized = 'view',
tags = ['bronze_beacon_pending_deposits']
) }}
SELECT
partition_key,
VALUE :"SLOT_NUMBER" :: INT AS slot_number,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__streamline_fr_beacon_pending_deposits_v2') }}

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['bronze_beacon_pending_deposits']
) }}
{{ v0_streamline_external_table_fr_query(
model = "pending_deposits",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
block_number = false
) }}

View File

@ -0,0 +1,58 @@
-- depends on: {{ ref('bronze__streamline_beacon_pending_deposits') }}
{{ config (
materialized = "incremental",
unique_key = "slot_number",
cluster_by = "ROUND(slot_number, -3)",
merge_update_columns = ["slot_number"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(slot_number)",
incremental_predicates = ["dynamic_range", "slot_number"],
tags = ['streamline_beacon_complete']
) }}
SELECT
COALESCE(
VALUE :"SLOT_NUMBER" :: INT,
metadata :request :"slot_number" :: INT,
PARSE_JSON(
metadata :request :"slot_number"
) :: INT
) AS slot_number,
file_name,
--parse slot_number from metadata for FR because it's not properly accessible in VALUE column from v1 requests
{{ dbt_utils.generate_surrogate_key(
['slot_number']
) }} AS complete_beacon_pending_deposits_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_beacon_pending_deposits') }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP) _inserted_timestamp
FROM
{{ this }})
AND (LEFT(
DATA :error :: STRING,
1
) <> 'F'
OR DATA :error IS NULL
)
{% else %}
{{ ref('bronze__streamline_fr_beacon_pending_deposits') }}
WHERE
(LEFT(
DATA :error :: STRING,
1
) <> 'F'
OR DATA :error IS NULL
)
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY slot_number
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,64 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"pending_deposits",
"sql_limit" :"1000000",
"producer_batch_size" :"1000",
"worker_batch_size" :"100",
"async_concurrent_requests" :"10",
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(["data"]) }
),
tags = ['streamline_beacon_history']
) }}
WITH to_do AS (
SELECT
slot_number
FROM
{{ ref("streamline__beacon_blocks") }}
WHERE
slot_number <= (
SELECT
MIN(slot_number)
FROM
{{ ref("beacon_chain__fact_blocks") }}
WHERE
slot_timestamp >= DATEADD(hour, -12, SYSDATE())
)
and slot_number >= 11649025
EXCEPT
SELECT
slot_number
FROM
{{ ref("streamline__complete_beacon_pending_deposits") }}
WHERE
slot_number <= (
SELECT
MIN(slot_number)
FROM
{{ ref("beacon_chain__fact_blocks") }}
WHERE
slot_timestamp >= DATEADD(hour, -12, SYSDATE())
)
)
SELECT
slot_number,
ROUND(slot_number, -3) AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'{service}/{Authentication}/eth/v1/beacon/states/' || slot_number || '/pending_deposits',
OBJECT_CONSTRUCT(
'accept', 'application/json',
'fsc-quantum-state', 'streamline'
),
{},
'vault/prod/ethereum/quicknode/mainnet'
) AS request
FROM
to_do
ORDER BY
slot_number DESC
LIMIT 1000000

View File

@ -0,0 +1,63 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"pending_deposits",
"sql_limit" :"5000",
"producer_batch_size" :"2480",
"worker_batch_size" :"1240",
"async_concurrent_requests" :"10",
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(["data"]) }
),
tags = ['streamline_beacon_realtime']
) }}
WITH to_do AS (
SELECT
slot_number
FROM
{{ ref("streamline__beacon_blocks") }}
WHERE
slot_number > (
SELECT
MIN(slot_number)
FROM
{{ ref("beacon_chain__fact_blocks") }}
WHERE
slot_timestamp >= DATEADD(day, -3, SYSDATE())
)
and slot_number >= 11649025
EXCEPT
SELECT
slot_number
FROM
{{ ref("streamline__complete_beacon_pending_deposits") }}
WHERE
slot_number > (
SELECT
MIN(slot_number)
FROM
{{ ref("beacon_chain__fact_blocks") }}
WHERE
slot_timestamp >= DATEADD(day, -3, SYSDATE())
)
)
SELECT
slot_number,
ROUND(slot_number, -3) AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'{service}/{Authentication}/eth/v1/beacon/states/' || slot_number || '/pending_deposits',
OBJECT_CONSTRUCT(
'accept', 'application/json',
'fsc-quantum-state', 'streamline'
),
{},
'vault/prod/ethereum/quicknode/mainnet'
) AS request
FROM
to_do
ORDER BY
slot_number DESC