From dc1aafa116854ae448c8d416ffaa0482cc6f69c5 Mon Sep 17 00:00:00 2001 From: Mike Stepanovic Date: Wed, 5 Mar 2025 12:56:21 -0700 Subject: [PATCH] add streamline views --- models/bronze/core/bronze__blocks_tx.sql | 7 ++ models/bronze/core/bronze__blocks_tx_FR.sql | 7 ++ models/bronze/core/bronze__transactions.sql | 7 ++ .../bronze/core/bronze__transactions_FR.sql | 7 ++ models/sources.yml | 35 +++++++ .../streamline__blocks_tx_complete.sql | 41 ++++++++ .../streamline__transactions_complete.sql | 46 +++++++++ .../streamline/silver/_max_block_by_date.sql | 27 ++++++ .../streamline__blocks_tx_realtime.sql | 49 ++++++++++ .../streamline__transactions_realtime.sql | 93 +++++++++++++++++++ .../streamline/silver/streamline__blocks.sql | 19 ++++ .../silver/streamline__chainhead.sql | 20 ++++ 12 files changed, 358 insertions(+) create mode 100644 models/bronze/core/bronze__blocks_tx.sql create mode 100644 models/bronze/core/bronze__blocks_tx_FR.sql create mode 100644 models/bronze/core/bronze__transactions.sql create mode 100644 models/bronze/core/bronze__transactions_FR.sql create mode 100644 models/sources.yml create mode 100644 models/streamline/complete/streamline__blocks_tx_complete.sql create mode 100644 models/streamline/complete/streamline__transactions_complete.sql create mode 100644 models/streamline/silver/_max_block_by_date.sql create mode 100644 models/streamline/silver/realtime/streamline__blocks_tx_realtime.sql create mode 100644 models/streamline/silver/realtime/streamline__transactions_realtime.sql create mode 100644 models/streamline/silver/streamline__blocks.sql create mode 100644 models/streamline/silver/streamline__chainhead.sql diff --git a/models/bronze/core/bronze__blocks_tx.sql b/models/bronze/core/bronze__blocks_tx.sql new file mode 100644 index 0000000..a3e3432 --- /dev/null +++ b/models/bronze/core/bronze__blocks_tx.sql @@ -0,0 +1,7 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_query_v2( + model = "blocks_tx", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )" +) }} diff --git a/models/bronze/core/bronze__blocks_tx_FR.sql b/models/bronze/core/bronze__blocks_tx_FR.sql new file mode 100644 index 0000000..6541673 --- /dev/null +++ b/models/bronze/core/bronze__blocks_tx_FR.sql @@ -0,0 +1,7 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_FR_query_v2( + model = "blocks_tx", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )" +) }} diff --git a/models/bronze/core/bronze__transactions.sql b/models/bronze/core/bronze__transactions.sql new file mode 100644 index 0000000..a7af4cc --- /dev/null +++ b/models/bronze/core/bronze__transactions.sql @@ -0,0 +1,7 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_query_v2( + model = "transactions", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )" +) }} diff --git a/models/bronze/core/bronze__transactions_FR.sql b/models/bronze/core/bronze__transactions_FR.sql new file mode 100644 index 0000000..7ad9586 --- /dev/null +++ b/models/bronze/core/bronze__transactions_FR.sql @@ -0,0 +1,7 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_FR_query_v2( + model = "transactions", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )" +) }} diff --git a/models/sources.yml b/models/sources.yml new file mode 100644 index 0000000..d66542f --- /dev/null +++ b/models/sources.yml @@ -0,0 +1,35 @@ +version: 2 + +sources: + - name: crosschain + database: "{{ 'crosschain' if target.database == 'MOVEMENT' else 'crosschain_dev' }}" + schema: core + tables: + - name: dim_date_hours + - name: address_tags + - name: dim_dates + - name: crosschain_silver + database: "{{ 'crosschain' if target.database == 'MOVEMENT' else 'crosschain_dev' }}" + schema: silver + tables: + - name: number_sequence + - name: bronze_streamline + database: streamline + schema: | + {{ "MOVEMENT_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "MOVEMENT" }} + tables: + - name: blocks_tx + loaded_at_field: INSERTED_TIMESTAMP + freshness: + warn_after: {count: 2, period: hour} + error_after: {count: 4, period: hour} + - name: transactions + loaded_at_field: INSERTED_TIMESTAMP + freshness: + warn_after: {count: 2, period: hour} + error_after: {count: 4, period: hour} + - name: github_actions + database: movement + schema: github_actions + tables: + - name: workflows diff --git a/models/streamline/complete/streamline__blocks_tx_complete.sql b/models/streamline/complete/streamline__blocks_tx_complete.sql new file mode 100644 index 0000000..d602314 --- /dev/null +++ b/models/streamline/complete/streamline__blocks_tx_complete.sql @@ -0,0 +1,41 @@ +{{ config ( + materialized = "incremental", + incremental_strategy = 'merge', + unique_key = "block_number", + cluster_by = "ROUND(block_number, -3)", + merge_exclude_columns = ["inserted_timestamp"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" +) }} +-- depends_on: {{ ref('bronze__blocks_tx') }} + +SELECT + DATA :block_height :: INT AS block_number, + ARRAY_SIZE( + DATA :transactions + ) AS tx_count_from_transactions_array, + DATA :last_version :: bigint - DATA :first_version :: bigint + 1 AS tx_count_from_versions, + {{ dbt_utils.generate_surrogate_key( + ['block_number'] + ) }} AS blocks_tx_complete_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + file_name, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze__blocks_tx') }} +WHERE + inserted_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__blocks_tx_FR') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY block_number +ORDER BY + inserted_timestamp DESC)) = 1 diff --git a/models/streamline/complete/streamline__transactions_complete.sql b/models/streamline/complete/streamline__transactions_complete.sql new file mode 100644 index 0000000..7e5a5ca --- /dev/null +++ b/models/streamline/complete/streamline__transactions_complete.sql @@ -0,0 +1,46 @@ +{{ config ( + materialized = "incremental", + incremental_strategy = 'merge', + unique_key = "transactions_complete_id", + cluster_by = "ROUND(block_number, -3)", + merge_exclude_columns = ["inserted_timestamp"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" +) }} +-- depends_on: {{ ref('bronze__transactions') }} + +SELECT + A.value :BLOCK_NUMBER :: INT AS block_number, + A.value :MULTIPLIER :: INT AS multiplier_no, + {{ dbt_utils.generate_surrogate_key( + ['block_number','multiplier_no'] + ) }} AS transactions_complete_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + file_name, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze__transactions') }} +{% else %} + {{ ref('bronze__transactions_FR') }} +{% endif %} + +A +JOIN {{ ref('silver__blocks') }} +b +ON DATA [0] :version :: INT BETWEEN b.first_version +AND b.last_version + +{% if is_incremental() %} +WHERE + A.inserted_timestamp >= ( + SELECT + COALESCE(MAX(modified_timestamp), '1970-01-01' :: DATE) + FROM + {{ this }}) + {% endif %} + + qualify(ROW_NUMBER() over (PARTITION BY block_number + ORDER BY + A.inserted_timestamp DESC)) = 1 diff --git a/models/streamline/silver/_max_block_by_date.sql b/models/streamline/silver/_max_block_by_date.sql new file mode 100644 index 0000000..7609b67 --- /dev/null +++ b/models/streamline/silver/_max_block_by_date.sql @@ -0,0 +1,27 @@ +{{ config ( + materialized = "ephemeral", + unique_key = "block_number", +) }} + +WITH base AS ( + + SELECT + block_timestamp :: DATE AS block_date, + MAX(block_number) block_number + FROM + {{ ref("silver__blocks") }} + GROUP BY + block_timestamp :: DATE +) +SELECT + block_date, + block_number +FROM + base +WHERE + block_date <> ( + SELECT + MAX(block_date) + FROM + base + ) diff --git a/models/streamline/silver/realtime/streamline__blocks_tx_realtime.sql b/models/streamline/silver/realtime/streamline__blocks_tx_realtime.sql new file mode 100644 index 0000000..b271a04 --- /dev/null +++ b/models/streamline/silver/realtime/streamline__blocks_tx_realtime.sql @@ -0,0 +1,49 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"blocks_tx", + "sql_limit" :"8000", + "producer_batch_size" :"50", + "worker_batch_size" :"50", + "sql_source" :"{{this.identifier}}" } + ) +) }} + +WITH blocks AS ( + + SELECT + block_number + FROM + {{ ref('streamline__blocks') }} + WHERE + block_number != 0 + EXCEPT + SELECT + block_number + FROM + {{ ref('streamline__blocks_tx_complete') }} +) +SELECT + block_number, + ROUND( + block_number, + -4 + ) :: INT AS partition_key, + {{ target.database }}.live.udf_api( + 'GET', + '{Service}/v1/blocks/by_height/' || block_number || '?with_transactions=true', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json', + 'User-Agent', + 'Flipside_Crypto/0.1' + ), + PARSE_JSON('{}'), + 'Vault/prod/movement/mainnet' + ) AS request +FROM + blocks +ORDER BY + block_number diff --git a/models/streamline/silver/realtime/streamline__transactions_realtime.sql b/models/streamline/silver/realtime/streamline__transactions_realtime.sql new file mode 100644 index 0000000..1a7d6f9 --- /dev/null +++ b/models/streamline/silver/realtime/streamline__transactions_realtime.sql @@ -0,0 +1,93 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"transactions", + "sql_limit" :"1000", + "producer_batch_size" :"50", + "worker_batch_size" :"50", + "sql_source" :"{{this.identifier}}" } + ) +) }} + +WITH blocks AS ( + + SELECT + A.block_number, + tx_count_from_versions AS tx_count, + first_version AS version_start + FROM + {{ ref('silver__blocks') }} A +), +numbers AS ( + SELECT + 1 AS n + UNION ALL + SELECT + n + 1 + FROM + numbers + WHERE + n < ( + SELECT + CEIL(MAX(tx_count) / 100.0) + FROM + blocks) + ), + blocks_with_page_numbers AS ( + SELECT + tt.block_number :: INT AS block_number, + n.n - 1 AS multiplier, + version_start, + tx_count + FROM + blocks tt + JOIN numbers n + ON n.n <= CASE + WHEN tt.tx_count % 100 = 0 THEN tt.tx_count / 100 + ELSE FLOOR( + tt.tx_count / 100 + ) + 1 + END + ), + WORK AS ( + SELECT + A.block_number, + version_start +( + 100 * multiplier + ) AS tx_version, + multiplier + FROM + blocks_with_page_numbers A + LEFT JOIN {{ ref('streamline__transactions_complete') }} + b + ON A.block_number = b.block_number + AND multiplier = b.multiplier_no + WHERE + b.block_number IS NULL + ) + SELECT + tx_version, + ROUND( + tx_version, + -4 + ) :: INT AS partition_key, + {{ target.database }}.live.udf_api( + 'GET', + '{Service}/v1/transactions?start=' || tx_version || '&limit=100', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json', + 'User-Agent', + 'Flipside_Crypto/0.1' + ), + PARSE_JSON('{}'), + 'Vault/prod/movement/mainnet' + ) AS request, + block_number, + multiplier + FROM + WORK + ORDER BY + block_number diff --git a/models/streamline/silver/streamline__blocks.sql b/models/streamline/silver/streamline__blocks.sql new file mode 100644 index 0000000..3923098 --- /dev/null +++ b/models/streamline/silver/streamline__blocks.sql @@ -0,0 +1,19 @@ +{{ config ( + materialized = "view", + tags = ['streamline_view'] +) }} + +SELECT + _id AS block_number +FROM + {{ source( + 'crosschain_silver', + 'number_sequence' + ) }} +WHERE + _id <= ( + SELECT + MAX(block_number) + FROM + {{ ref('streamline__chainhead') }} + ) diff --git a/models/streamline/silver/streamline__chainhead.sql b/models/streamline/silver/streamline__chainhead.sql new file mode 100644 index 0000000..86ea4ac --- /dev/null +++ b/models/streamline/silver/streamline__chainhead.sql @@ -0,0 +1,20 @@ +{{ config ( + materialized = "view", + tags = ['streamline_view'] +) }} + +SELECT + {{ target.database }}.live.udf_api( + 'GET', + '{Service}/v1', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json', + 'fsc-quantum-state', + 'livequery', + 'User-Agent', + 'Flipside_Crypto/0.1' + ), + OBJECT_CONSTRUCT(), + 'Vault/prod/movement/mainnet' + ) :data :block_height :: INT AS block_number