add phase 1 models

This commit is contained in:
Mike Stepanovic 2025-05-01 13:42:10 -06:00
parent 7d6778ff4f
commit a0a2f1925d
16 changed files with 494 additions and 0 deletions

View File

@ -0,0 +1,8 @@
{{ config (
materialized = 'view',
tags = ['bronze','core','phase_1']
) }}
{{ streamline_external_table_query(
source_name = 'blocks'
) }}

View File

@ -0,0 +1,8 @@
{{ config (
materialized = 'view',
tags = ['bronze','core','phase_1']
) }}
{{ streamline_external_table_query(
source_name = 'transactions'
) }}

View File

@ -0,0 +1,8 @@
{{ config (
materialized = 'view',
tags = ['bronze','core','phase_1']
) }}
{{ streamline_external_table_query(
source_name = 'tx_counts'
) }}

View File

@ -0,0 +1,8 @@
{{ config (
materialized = 'view',
tags = ['bronze','core','phase_1']
) }}
{{ streamline_external_table_query_fr(
source_name = 'blocks'
) }}

View File

@ -0,0 +1,8 @@
{{ config (
materialized = 'view',
tags = ['bronze','core','phase_1']
) }}
{{ streamline_external_table_query_fr(
source_name = 'transactions'
) }}

View File

@ -0,0 +1,8 @@
{{ config (
materialized = 'view',
tags = ['bronze','core','phase_1']
) }}
{{ streamline_external_table_query_fr(
source_name = 'tx_counts'
) }}

View File

@ -0,0 +1,40 @@
{# Get variables #}
{% set vars = return_vars() %}
-- depends_on: {{ ref('bronze__streamline_blocks') }}
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)"
) }}
SELECT
DATA :result :block :header :height :: INT AS block_number,
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS complete_blocks_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
file_name,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_blocks') }}
WHERE
inserted_timestamp >= (
SELECT
MAX(modified_timestamp) modified_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_blocks_fr') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,53 @@
{# Get variables #}
{% set vars = return_vars() %}
-- depends_on: {{ ref('bronze__streamline_transactions') }}
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
unique_key = "complete_transactions_id",
cluster_by = "ROUND(block_number, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)"
) }}
SELECT
COALESCE(
VALUE :BLOCK_NUMBER_REQUESTED,
DATA :height,
VALUE :data :result :txs [0] :height
) :: INT AS block_number,
COALESCE(
VALUE :PAGE_NUMBER,
metadata :request :params [2]
) :: INT AS page_number,
{{ dbt_utils.generate_surrogate_key(
['block_number','page_number']
) }} AS complete_transactions_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
file_name,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_transactions') }}
{% else %}
{{ ref('bronze__streamline_transactions_fr') }}
{% endif %}
WHERE
DATA <> '[]'
{% if is_incremental() %}
AND inserted_timestamp >= (
SELECT
MAX(modified_timestamp) modified_timestamp
FROM
{{ this }}
)
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY complete_transactions_id
ORDER BY
inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,43 @@
{# Get variables #}
{% set vars = return_vars() %}
-- depends_on: {{ ref('bronze__streamline_tx_counts') }}
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)"
) }}
SELECT
VALUE :BLOCK_NUMBER :: INT AS block_number,
DATA :result :total_count :: INT AS tx_count,
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS complete_tx_counts_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
file_name,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_tx_counts') }}
WHERE
inserted_timestamp >= (
SELECT
MAX(modified_timestamp) modified_timestamp
FROM
{{ this }}
)
AND block_number NOT IN (21208991)
{% else %}
{{ ref('bronze__streamline_tx_counts_fr') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,65 @@
{% set vars = return_vars() %}
{{ config (
materialized = 'view',
tags = ['streamline','core','realtime','phase_1']
) }}
WITH blocks AS (
SELECT
block_number
FROM
{{ ref('streamline__blocks') }}
EXCEPT
SELECT
block_number
FROM
{{ ref('streamline__blocks_complete') }}
)
SELECT
ROUND(block_number, -4) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'{{ vars.GLOBAL_NODE_URL }}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', 'streamline'
),
OBJECT_CONSTRUCT(
'id', block_number,
'jsonrpc', '2.0',
'method', 'block',
'params', ARRAY_CONSTRUCT(block_number :: STRING)
),
'{{ vars.GLOBAL_NODE_VAULT_PATH }}'
) AS request
FROM
blocks
ORDER BY
block_number
LIMIT {{ vars.MAIN_SL_BLOCKS_REALTIME_SQL_LIMIT }}
{# Streamline Function Call #}
{% if execute %}
{% set params = {
'external_table': 'blocks',
'sql_limit': vars.MAIN_SL_BLOCKS_REALTIME_SQL_LIMIT,
'producer_batch_size': vars.MAIN_SL_BLOCKS_REALTIME_PRODUCER_BATCH_SIZE,
'worker_batch_size': vars.MAIN_SL_BLOCKS_REALTIME_WORKER_BATCH_SIZE,
'async_concurrent_requests': vars.MAIN_SL_BLOCKS_REALTIME_ASYNC_CONCURRENT_REQUESTS,
'sql_source': "{{this.identifier}}"
} %}
{% set function_call_sql %}
{{ fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = '{{this.schema}}.{{this.identifier}}',
params = params
) }}
{% endset %}
{% do run_query(function_call_sql) %}
{{ log("Streamline function call: " ~ function_call_sql, info=true) }}
{% endif %}

View File

@ -0,0 +1,104 @@
WITH blocks AS (
SELECT
A.block_number,
tx_count
FROM
{{ ref('streamline__tx_counts_complete') }} A
WHERE
tx_count > 0
),
numbers AS (
-- Recursive CTE to generate numbers. We'll use the maximum txcount value to limit our recursion.
SELECT
1 AS n
UNION ALL
SELECT
n + 1
FROM
numbers
WHERE
n < (
SELECT
CEIL(MAX(tx_count) / 100.0)
FROM
blocks)
),
blocks_with_page_numbers AS (
SELECT
tt.block_number :: INT AS block_number,
n.n AS page_number
FROM
blocks tt
JOIN numbers n
ON n.n <= CASE
WHEN tt.tx_count % 100 = 0 THEN tt.tx_count / 100
ELSE FLOOR(
tt.tx_count / 100
) + 1
END
EXCEPT
SELECT
block_number,
page_number
FROM
{{ ref('streamline__transactions_complete') }}
)
SELECT
ROUND(
block_number,
-3
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'{{ vars.GLOBAL_NODE_URL }}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', 'streamline'
),
OBJECT_CONSTRUCT(
'id', block_number,
'jsonrpc', '2.0',
'method', 'tx_search',
'params', ARRAY_CONSTRUCT(
'tx.height=' || block_number :: STRING,
TRUE,
page_number :: STRING,
'100',
'asc'
)
),
'{{ vars.GLOBAL_NODE_VAULT_PATH }}'
) AS request,
page_number,
block_number AS block_number_requested
FROM
blocks_with_page_numbers
ORDER BY
block_number
LIMIT {{ vars.MAIN_SL_TRANSACTIONS_REALTIME_SQL_LIMIT }}
{# Streamline Function Call #}
{% if execute %}
{% set params = {
'external_table': 'transactions',
'sql_limit': vars.MAIN_SL_TRANSACTIONS_REALTIME_SQL_LIMIT,
'producer_batch_size': vars.MAIN_SL_TRANSACTIONS_REALTIME_PRODUCER_BATCH_SIZE,
'worker_batch_size': vars.MAIN_SL_TRANSACTIONS_REALTIME_WORKER_BATCH_SIZE,
'async_concurrent_requests': vars.MAIN_SL_TRANSACTIONS_REALTIME_ASYNC_CONCURRENT_REQUESTS,
'sql_source': "{{this.identifier}}",
'exploded_key': '["result.txs"]'
} %}
{% set function_call_sql %}
{{ fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = '{{this.schema}}.{{this.identifier}}',
params = params
) }}
{% endset %}
{% do run_query(function_call_sql) %}
{{ log("Streamline function call: " ~ function_call_sql, info=true) }}
{% endif %}

View File

@ -0,0 +1,94 @@
{# Get variables #}
{% set vars = return_vars() %}
-- depends_on: {{ ref('streamline__complete_tx_counts') }}
{{ config (
materialized = 'view',
tags = ['streamline','core','realtime','phase_1']
) }}
WITH blocks AS (
SELECT
block_number
FROM
{{ ref('streamline__blocks') }}
EXCEPT
SELECT
block_number
FROM
{{ ref('streamline__tx_counts_complete') }}
),
{# retry AS (
SELECT
NULL AS A.block_number
FROM
{{ ref('streamline__complete_tx_counts') }} A
JOIN {{ ref('silver__blockchain') }}
b
ON A.block_number = b.block_id
WHERE
A.tx_count <> b.num_txs
),
#}
combo AS (
SELECT
block_number
FROM
blocks {# UNION
SELECT
block_number
FROM
retry #}
)
SELECT
ROUND(block_number, -3) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'{{ vars.GLOBAL_NODE_URL }}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', 'streamline'
),
OBJECT_CONSTRUCT(
'id', block_number,
'jsonrpc', '2.0',
'method', 'tx_search',
'params', ARRAY_CONSTRUCT(
'tx.height=' || block_number :: STRING, TRUE,
'1',
'1',
'asc'
)
),
'{{ vars.GLOBAL_NODE_VAULT_PATH }}'
) AS request,
block_number
FROM
combo
ORDER BY
block_number
{# Streamline Function Call #}
{% if execute %}
{% set params = {
'external_table': 'txcount',
'sql_limit': vars.MAIN_SL_TX_COUNTS_REALTIME_SQL_LIMIT,
'producer_batch_size': vars.MAIN_SL_TX_COUNTS_REALTIME_PRODUCER_BATCH_SIZE,
'worker_batch_size': vars.MAIN_SL_TX_COUNTS_REALTIME_WORKER_BATCH_SIZE,
'async_concurrent_requests': vars.MAIN_SL_TX_COUNTS_REALTIME_ASYNC_CONCURRENT_REQUESTS,
'sql_source' :"{{this.identifier}}"
} %}
{% set function_call_sql %}
{{ fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = '{{this.schema}}.{{this.identifier}}',
params = params
) }}
{% endset %}
{% do run_query(function_call_sql) %}
{{ log("Streamline function call: " ~ function_call_sql, info=true) }}
{% endif %}

View File

@ -0,0 +1,20 @@
{{ config (
materialized = "view",
tags = ['streamline_view']
) }}
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id >= 5200791
AND _id <= (
SELECT
MAX(block_number)
FROM
{{ ref('streamline__chainhead') }}
)

View File

@ -0,0 +1,27 @@
{{ config (
materialized = "view",
tags = ['streamline_view']
) }}
SELECT
{{ target.database }}.live.udf_api(
'POST',
'{Service}/{Authentication}',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json',
'fsc-quantum-state',
'livequery'
),
OBJECT_CONSTRUCT(
'id',
0,
'jsonrpc',
'2.0',
'method',
'status',
'params',
[]
),
'<VAULT_SECRET_PATH_HERE>'
) :data :result :sync_info :latest_block_height :: INT AS block_number