From a0a2f1925df62cd89641defced42487d9f9c2674 Mon Sep 17 00:00:00 2001 From: Mike Stepanovic Date: Thu, 1 May 2025 13:42:10 -0600 Subject: [PATCH] add phase 1 models --- .../streamline/bronze__streamline_blocks.sql | 8 ++ .../bronze__streamline_transactions.sql | 8 ++ .../bronze__streamline_tx_counts.sql | 8 ++ .../bronze__streamline_blocks_fr.sql | 8 ++ .../bronze__streamline_transactions_fr.sql | 8 ++ .../bronze__streamline_tx_counts_fr.sql | 8 ++ .../core/streamline/_block_lookback.sql | 0 .../core/streamline/_max_block_by_date.sql | 0 .../complete/streamline__blocks_complete.sql | 40 +++++++ .../streamline__transactions_complete.sql | 53 +++++++++ .../streamline__tx_counts_complete.sql | 43 ++++++++ .../realtime/streamline__blocks_realtime.sql | 65 +++++++++++ .../streamline__transactions_realtime.sql | 104 ++++++++++++++++++ .../streamline__tx_counts_realtime.sql | 94 ++++++++++++++++ .../core/streamline/streamline_blocks.sql | 20 ++++ .../core/streamline/streamline_chainhead.sql | 27 +++++ 16 files changed, 494 insertions(+) create mode 100644 models/main_package/core/bronze/streamline/bronze__streamline_blocks.sql create mode 100644 models/main_package/core/bronze/streamline/bronze__streamline_transactions.sql create mode 100644 models/main_package/core/bronze/streamline/bronze__streamline_tx_counts.sql create mode 100644 models/main_package/core/bronze/streamline/full_refresh/bronze__streamline_blocks_fr.sql create mode 100644 models/main_package/core/bronze/streamline/full_refresh/bronze__streamline_transactions_fr.sql create mode 100644 models/main_package/core/bronze/streamline/full_refresh/bronze__streamline_tx_counts_fr.sql create mode 100644 models/main_package/core/streamline/_block_lookback.sql create mode 100644 models/main_package/core/streamline/_max_block_by_date.sql create mode 100644 models/main_package/core/streamline/complete/streamline__blocks_complete.sql create mode 100644 models/main_package/core/streamline/complete/streamline__transactions_complete.sql create mode 100644 models/main_package/core/streamline/complete/streamline__tx_counts_complete.sql create mode 100644 models/main_package/core/streamline/realtime/streamline__blocks_realtime.sql create mode 100644 models/main_package/core/streamline/realtime/streamline__transactions_realtime.sql create mode 100644 models/main_package/core/streamline/realtime/streamline__tx_counts_realtime.sql create mode 100644 models/main_package/core/streamline/streamline_blocks.sql create mode 100644 models/main_package/core/streamline/streamline_chainhead.sql diff --git a/models/main_package/core/bronze/streamline/bronze__streamline_blocks.sql b/models/main_package/core/bronze/streamline/bronze__streamline_blocks.sql new file mode 100644 index 0000000..207db84 --- /dev/null +++ b/models/main_package/core/bronze/streamline/bronze__streamline_blocks.sql @@ -0,0 +1,8 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','phase_1'] +) }} + +{{ streamline_external_table_query( + source_name = 'blocks' +) }} \ No newline at end of file diff --git a/models/main_package/core/bronze/streamline/bronze__streamline_transactions.sql b/models/main_package/core/bronze/streamline/bronze__streamline_transactions.sql new file mode 100644 index 0000000..3773a27 --- /dev/null +++ b/models/main_package/core/bronze/streamline/bronze__streamline_transactions.sql @@ -0,0 +1,8 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','phase_1'] +) }} + +{{ streamline_external_table_query( + source_name = 'transactions' +) }} \ No newline at end of file diff --git a/models/main_package/core/bronze/streamline/bronze__streamline_tx_counts.sql b/models/main_package/core/bronze/streamline/bronze__streamline_tx_counts.sql new file mode 100644 index 0000000..fd8d9ba --- /dev/null +++ b/models/main_package/core/bronze/streamline/bronze__streamline_tx_counts.sql @@ -0,0 +1,8 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','phase_1'] +) }} + +{{ streamline_external_table_query( + source_name = 'tx_counts' +) }} \ No newline at end of file diff --git a/models/main_package/core/bronze/streamline/full_refresh/bronze__streamline_blocks_fr.sql b/models/main_package/core/bronze/streamline/full_refresh/bronze__streamline_blocks_fr.sql new file mode 100644 index 0000000..0c742bc --- /dev/null +++ b/models/main_package/core/bronze/streamline/full_refresh/bronze__streamline_blocks_fr.sql @@ -0,0 +1,8 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','phase_1'] +) }} + +{{ streamline_external_table_query_fr( + source_name = 'blocks' +) }} \ No newline at end of file diff --git a/models/main_package/core/bronze/streamline/full_refresh/bronze__streamline_transactions_fr.sql b/models/main_package/core/bronze/streamline/full_refresh/bronze__streamline_transactions_fr.sql new file mode 100644 index 0000000..12b5401 --- /dev/null +++ b/models/main_package/core/bronze/streamline/full_refresh/bronze__streamline_transactions_fr.sql @@ -0,0 +1,8 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','phase_1'] +) }} + +{{ streamline_external_table_query_fr( + source_name = 'transactions' +) }} \ No newline at end of file diff --git a/models/main_package/core/bronze/streamline/full_refresh/bronze__streamline_tx_counts_fr.sql b/models/main_package/core/bronze/streamline/full_refresh/bronze__streamline_tx_counts_fr.sql new file mode 100644 index 0000000..e1ce8d9 --- /dev/null +++ b/models/main_package/core/bronze/streamline/full_refresh/bronze__streamline_tx_counts_fr.sql @@ -0,0 +1,8 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','phase_1'] +) }} + +{{ streamline_external_table_query_fr( + source_name = 'tx_counts' +) }} \ No newline at end of file diff --git a/models/main_package/core/streamline/_block_lookback.sql b/models/main_package/core/streamline/_block_lookback.sql new file mode 100644 index 0000000..e69de29 diff --git a/models/main_package/core/streamline/_max_block_by_date.sql b/models/main_package/core/streamline/_max_block_by_date.sql new file mode 100644 index 0000000..e69de29 diff --git a/models/main_package/core/streamline/complete/streamline__blocks_complete.sql b/models/main_package/core/streamline/complete/streamline__blocks_complete.sql new file mode 100644 index 0000000..d31f110 --- /dev/null +++ b/models/main_package/core/streamline/complete/streamline__blocks_complete.sql @@ -0,0 +1,40 @@ +{# Get variables #} +{% set vars = return_vars() %} + +-- depends_on: {{ ref('bronze__streamline_blocks') }} +{{ config ( + materialized = "incremental", + incremental_strategy = 'merge', + unique_key = "block_number", + cluster_by = "ROUND(block_number, -3)", + merge_exclude_columns = ["inserted_timestamp"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" +) }} + +SELECT + DATA :result :block :header :height :: INT AS block_number, + {{ dbt_utils.generate_surrogate_key( + ['block_number'] + ) }} AS complete_blocks_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + file_name, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_blocks') }} +WHERE + inserted_timestamp >= ( + SELECT + MAX(modified_timestamp) modified_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_blocks_fr') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY block_number +ORDER BY + inserted_timestamp DESC)) = 1 diff --git a/models/main_package/core/streamline/complete/streamline__transactions_complete.sql b/models/main_package/core/streamline/complete/streamline__transactions_complete.sql new file mode 100644 index 0000000..0b6d897 --- /dev/null +++ b/models/main_package/core/streamline/complete/streamline__transactions_complete.sql @@ -0,0 +1,53 @@ +{# Get variables #} +{% set vars = return_vars() %} + +-- depends_on: {{ ref('bronze__streamline_transactions') }} + +{{ config ( + materialized = "incremental", + incremental_strategy = 'merge', + unique_key = "complete_transactions_id", + cluster_by = "ROUND(block_number, -3)", + merge_exclude_columns = ["inserted_timestamp"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" +) }} + +SELECT + COALESCE( + VALUE :BLOCK_NUMBER_REQUESTED, + DATA :height, + VALUE :data :result :txs [0] :height + ) :: INT AS block_number, + COALESCE( + VALUE :PAGE_NUMBER, + metadata :request :params [2] + ) :: INT AS page_number, + {{ dbt_utils.generate_surrogate_key( + ['block_number','page_number'] + ) }} AS complete_transactions_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + file_name, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_transactions') }} +{% else %} + {{ ref('bronze__streamline_transactions_fr') }} +{% endif %} +WHERE + DATA <> '[]' + +{% if is_incremental() %} +AND inserted_timestamp >= ( + SELECT + MAX(modified_timestamp) modified_timestamp + FROM + {{ this }} +) +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY complete_transactions_id +ORDER BY + inserted_timestamp DESC)) = 1 diff --git a/models/main_package/core/streamline/complete/streamline__tx_counts_complete.sql b/models/main_package/core/streamline/complete/streamline__tx_counts_complete.sql new file mode 100644 index 0000000..17d83ab --- /dev/null +++ b/models/main_package/core/streamline/complete/streamline__tx_counts_complete.sql @@ -0,0 +1,43 @@ +{# Get variables #} +{% set vars = return_vars() %} + +-- depends_on: {{ ref('bronze__streamline_tx_counts') }} + +{{ config ( + materialized = "incremental", + incremental_strategy = 'merge', + unique_key = "block_number", + cluster_by = "ROUND(block_number, -3)", + merge_exclude_columns = ["inserted_timestamp"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" +) }} + +SELECT + VALUE :BLOCK_NUMBER :: INT AS block_number, + DATA :result :total_count :: INT AS tx_count, + {{ dbt_utils.generate_surrogate_key( + ['block_number'] + ) }} AS complete_tx_counts_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + file_name, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_tx_counts') }} +WHERE + inserted_timestamp >= ( + SELECT + MAX(modified_timestamp) modified_timestamp + FROM + {{ this }} + ) + AND block_number NOT IN (21208991) +{% else %} + {{ ref('bronze__streamline_tx_counts_fr') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY block_number +ORDER BY + inserted_timestamp DESC)) = 1 diff --git a/models/main_package/core/streamline/realtime/streamline__blocks_realtime.sql b/models/main_package/core/streamline/realtime/streamline__blocks_realtime.sql new file mode 100644 index 0000000..deb65b2 --- /dev/null +++ b/models/main_package/core/streamline/realtime/streamline__blocks_realtime.sql @@ -0,0 +1,65 @@ +{% set vars = return_vars() %} + +{{ config ( + materialized = 'view', + tags = ['streamline','core','realtime','phase_1'] +) }} + +WITH blocks AS ( + + SELECT + block_number + FROM + {{ ref('streamline__blocks') }} + EXCEPT + SELECT + block_number + FROM + {{ ref('streamline__blocks_complete') }} +) +SELECT + ROUND(block_number, -4) :: INT AS partition_key, + {{ target.database }}.live.udf_api( + 'POST', + '{{ vars.GLOBAL_NODE_URL }}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + OBJECT_CONSTRUCT( + 'id', block_number, + 'jsonrpc', '2.0', + 'method', 'block', + 'params', ARRAY_CONSTRUCT(block_number :: STRING) + ), + '{{ vars.GLOBAL_NODE_VAULT_PATH }}' + ) AS request +FROM + blocks +ORDER BY + block_number + +LIMIT {{ vars.MAIN_SL_BLOCKS_REALTIME_SQL_LIMIT }} + +{# Streamline Function Call #} +{% if execute %} + {% set params = { + 'external_table': 'blocks', + 'sql_limit': vars.MAIN_SL_BLOCKS_REALTIME_SQL_LIMIT, + 'producer_batch_size': vars.MAIN_SL_BLOCKS_REALTIME_PRODUCER_BATCH_SIZE, + 'worker_batch_size': vars.MAIN_SL_BLOCKS_REALTIME_WORKER_BATCH_SIZE, + 'async_concurrent_requests': vars.MAIN_SL_BLOCKS_REALTIME_ASYNC_CONCURRENT_REQUESTS, + 'sql_source': "{{this.identifier}}" + } %} + + {% set function_call_sql %} + {{ fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = '{{this.schema}}.{{this.identifier}}', + params = params + ) }} + {% endset %} + + {% do run_query(function_call_sql) %} + {{ log("Streamline function call: " ~ function_call_sql, info=true) }} +{% endif %} diff --git a/models/main_package/core/streamline/realtime/streamline__transactions_realtime.sql b/models/main_package/core/streamline/realtime/streamline__transactions_realtime.sql new file mode 100644 index 0000000..7bbfa00 --- /dev/null +++ b/models/main_package/core/streamline/realtime/streamline__transactions_realtime.sql @@ -0,0 +1,104 @@ +WITH blocks AS ( + + SELECT + A.block_number, + tx_count + FROM + {{ ref('streamline__tx_counts_complete') }} A + WHERE + tx_count > 0 +), +numbers AS ( + -- Recursive CTE to generate numbers. We'll use the maximum txcount value to limit our recursion. + SELECT + 1 AS n + UNION ALL + SELECT + n + 1 + FROM + numbers + WHERE + n < ( + SELECT + CEIL(MAX(tx_count) / 100.0) + FROM + blocks) + ), + blocks_with_page_numbers AS ( + SELECT + tt.block_number :: INT AS block_number, + n.n AS page_number + FROM + blocks tt + JOIN numbers n + ON n.n <= CASE + WHEN tt.tx_count % 100 = 0 THEN tt.tx_count / 100 + ELSE FLOOR( + tt.tx_count / 100 + ) + 1 + END + EXCEPT + SELECT + block_number, + page_number + FROM + {{ ref('streamline__transactions_complete') }} + ) + SELECT + ROUND( + block_number, + -3 + ) :: INT AS partition_key, + {{ target.database }}.live.udf_api( + 'POST', + '{{ vars.GLOBAL_NODE_URL }}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + OBJECT_CONSTRUCT( + 'id', block_number, + 'jsonrpc', '2.0', + 'method', 'tx_search', + 'params', ARRAY_CONSTRUCT( + 'tx.height=' || block_number :: STRING, + TRUE, + page_number :: STRING, + '100', + 'asc' + ) + ), + '{{ vars.GLOBAL_NODE_VAULT_PATH }}' + ) AS request, + page_number, + block_number AS block_number_requested + FROM + blocks_with_page_numbers + ORDER BY + block_number + +LIMIT {{ vars.MAIN_SL_TRANSACTIONS_REALTIME_SQL_LIMIT }} + +{# Streamline Function Call #} +{% if execute %} + {% set params = { + 'external_table': 'transactions', + 'sql_limit': vars.MAIN_SL_TRANSACTIONS_REALTIME_SQL_LIMIT, + 'producer_batch_size': vars.MAIN_SL_TRANSACTIONS_REALTIME_PRODUCER_BATCH_SIZE, + 'worker_batch_size': vars.MAIN_SL_TRANSACTIONS_REALTIME_WORKER_BATCH_SIZE, + 'async_concurrent_requests': vars.MAIN_SL_TRANSACTIONS_REALTIME_ASYNC_CONCURRENT_REQUESTS, + 'sql_source': "{{this.identifier}}", + 'exploded_key': '["result.txs"]' + } %} + + {% set function_call_sql %} + {{ fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = '{{this.schema}}.{{this.identifier}}', + params = params + ) }} + {% endset %} + + {% do run_query(function_call_sql) %} + {{ log("Streamline function call: " ~ function_call_sql, info=true) }} +{% endif %} \ No newline at end of file diff --git a/models/main_package/core/streamline/realtime/streamline__tx_counts_realtime.sql b/models/main_package/core/streamline/realtime/streamline__tx_counts_realtime.sql new file mode 100644 index 0000000..57b72ef --- /dev/null +++ b/models/main_package/core/streamline/realtime/streamline__tx_counts_realtime.sql @@ -0,0 +1,94 @@ +{# Get variables #} +{% set vars = return_vars() %} + +-- depends_on: {{ ref('streamline__complete_tx_counts') }} + +{{ config ( + materialized = 'view', + tags = ['streamline','core','realtime','phase_1'] +) }} + +WITH blocks AS ( + + SELECT + block_number + FROM + {{ ref('streamline__blocks') }} + EXCEPT + SELECT + block_number + FROM + {{ ref('streamline__tx_counts_complete') }} +), +{# retry AS ( +SELECT + NULL AS A.block_number +FROM + {{ ref('streamline__complete_tx_counts') }} A + JOIN {{ ref('silver__blockchain') }} + b + ON A.block_number = b.block_id +WHERE + A.tx_count <> b.num_txs +), +#} +combo AS ( + SELECT + block_number + FROM + blocks {# UNION + SELECT + block_number + FROM + retry #} +) +SELECT + ROUND(block_number, -3) :: INT AS partition_key, + {{ target.database }}.live.udf_api( + 'POST', + '{{ vars.GLOBAL_NODE_URL }}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + OBJECT_CONSTRUCT( + 'id', block_number, + 'jsonrpc', '2.0', + 'method', 'tx_search', + 'params', ARRAY_CONSTRUCT( + 'tx.height=' || block_number :: STRING, TRUE, + '1', + '1', + 'asc' + ) + ), + '{{ vars.GLOBAL_NODE_VAULT_PATH }}' + ) AS request, + block_number +FROM + combo +ORDER BY + block_number + +{# Streamline Function Call #} +{% if execute %} + {% set params = { + 'external_table': 'txcount', + 'sql_limit': vars.MAIN_SL_TX_COUNTS_REALTIME_SQL_LIMIT, + 'producer_batch_size': vars.MAIN_SL_TX_COUNTS_REALTIME_PRODUCER_BATCH_SIZE, + 'worker_batch_size': vars.MAIN_SL_TX_COUNTS_REALTIME_WORKER_BATCH_SIZE, + 'async_concurrent_requests': vars.MAIN_SL_TX_COUNTS_REALTIME_ASYNC_CONCURRENT_REQUESTS, + 'sql_source' :"{{this.identifier}}" + } %} + + {% set function_call_sql %} + {{ fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = '{{this.schema}}.{{this.identifier}}', + params = params + ) }} + {% endset %} + + {% do run_query(function_call_sql) %} + {{ log("Streamline function call: " ~ function_call_sql, info=true) }} +{% endif %} \ No newline at end of file diff --git a/models/main_package/core/streamline/streamline_blocks.sql b/models/main_package/core/streamline/streamline_blocks.sql new file mode 100644 index 0000000..edf6bd5 --- /dev/null +++ b/models/main_package/core/streamline/streamline_blocks.sql @@ -0,0 +1,20 @@ +{{ config ( + materialized = "view", + tags = ['streamline_view'] +) }} + +SELECT + _id AS block_number +FROM + {{ source( + 'crosschain_silver', + 'number_sequence' + ) }} +WHERE + _id >= 5200791 + AND _id <= ( + SELECT + MAX(block_number) + FROM + {{ ref('streamline__chainhead') }} + ) \ No newline at end of file diff --git a/models/main_package/core/streamline/streamline_chainhead.sql b/models/main_package/core/streamline/streamline_chainhead.sql new file mode 100644 index 0000000..3025400 --- /dev/null +++ b/models/main_package/core/streamline/streamline_chainhead.sql @@ -0,0 +1,27 @@ +{{ config ( + materialized = "view", + tags = ['streamline_view'] +) }} + +SELECT + {{ target.database }}.live.udf_api( + 'POST', + '{Service}/{Authentication}', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json', + 'fsc-quantum-state', + 'livequery' + ), + OBJECT_CONSTRUCT( + 'id', + 0, + 'jsonrpc', + '2.0', + 'method', + 'status', + 'params', + [] + ), + '' + ) :data :result :sync_info :latest_block_height :: INT AS block_number \ No newline at end of file