diff --git a/macros/api_udf/create_get_nearblocks_fts.sql b/macros/api_udf/create_get_nearblocks_fts.sql index 54a741c..b5bb530 100644 --- a/macros/api_udf/create_get_nearblocks_fts.sql +++ b/macros/api_udf/create_get_nearblocks_fts.sql @@ -1,3 +1,5 @@ +-- TODO slated for deprecation and drop + {# Deprecated 9/25/2023 #} {% macro create_get_nearblocks_fts() %} {% set create_table %} diff --git a/macros/api_udf/get_nearblocks_fts.sql b/macros/api_udf/get_nearblocks_fts.sql index 2913601..acaf4cb 100644 --- a/macros/api_udf/get_nearblocks_fts.sql +++ b/macros/api_udf/get_nearblocks_fts.sql @@ -1,3 +1,5 @@ +-- TODO slated for deprecation and drop + {# Deprecated 9/25/2023 #} {% macro get_nearblocks_fts() %} diff --git a/macros/create_procedure_get_chainhead.sql b/macros/create_procedure_get_chainhead.sql index da74100..1c84886 100644 --- a/macros/create_procedure_get_chainhead.sql +++ b/macros/create_procedure_get_chainhead.sql @@ -1,3 +1,5 @@ +-- TODO slated for deprecation and drop + {% macro create_PROCEDURE_GET_CHAINHEAD() %} {% set sql %} CREATE OR REPLACE PROCEDURE {{ target.database }}.STREAMLINE.GET_CHAINHEAD( diff --git a/macros/incremental_utils.sql b/macros/incremental_utils.sql index 2917f4a..707abe5 100644 --- a/macros/incremental_utils.sql +++ b/macros/incremental_utils.sql @@ -1,3 +1,4 @@ +-- TODO slated for deprecation and drop {% macro incremental_load_filter(time_col) -%} {% if is_incremental() %} diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql new file mode 100644 index 0000000..e9096c9 --- /dev/null +++ b/macros/streamline/models.sql @@ -0,0 +1,68 @@ +{% macro streamline_external_table_query_v2( + model, + partition_function + ) %} + WITH meta AS ( + SELECT + job_created_time AS _inserted_timestamp, + file_name, + {{ partition_function }} AS partition_key + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, SYSDATE()), + table_name => '{{ source( "bronze_streamline", model) }}') + ) A + ) + SELECT + s.*, + b.file_name, + _inserted_timestamp + FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key + WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + +{% endmacro %} + +{% macro streamline_external_table_FR_query_v2( + model, + partition_function + ) %} + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + {{ partition_function }} AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", model) }}' + ) + ) A + ) +SELECT + s.*, + b.file_name, + _inserted_timestamp +FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key +WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL +{% endmacro %} diff --git a/macros/udfs/create_udf_get_chainhead.sql b/macros/udfs/create_udf_get_chainhead.sql index 0e5c6cf..ea4c9fc 100644 --- a/macros/udfs/create_udf_get_chainhead.sql +++ b/macros/udfs/create_udf_get_chainhead.sql @@ -7,21 +7,22 @@ SELECT {{ target.database }}.live.udf_api( 'POST', - 'https://rpc.mainnet.near.org', - { - 'Content-Type': 'application/json' - }, - { - 'jsonrpc': '2.0', - 'id': 'dontcare', - 'method' :'status', - 'params':{ - 'finality': 'final' - } - } + '{Service}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json' + ), + OBJECT_CONSTRUCT( + 'jsonrpc', '2.0', + 'id', 'Flipside/getChainhead/0.1', + 'method', 'status', + 'params', OBJECT_CONSTRUCT( + 'finality', 'final' + ) + ), + 'Vault/prod/near/quicknode/mainnet' ) :data :result :sync_info :latest_block_height :: INT AS block_id $$ {% endset %} {% do run_query(sql) %} -{% endmacro %} \ No newline at end of file +{% endmacro %} diff --git a/models/bronze/bronze__FR_blocks.sql b/models/bronze/bronze__FR_blocks.sql new file mode 100644 index 0000000..a03bc49 --- /dev/null +++ b/models/bronze/bronze__FR_blocks.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view', + tags = ['streamline_helper'] +) }} + +{{ streamline_external_table_FR_query_v2( + model = "blocks_v2", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" +) }} diff --git a/models/bronze/bronze__blocks.sql b/models/bronze/bronze__blocks.sql new file mode 100644 index 0000000..b11440a --- /dev/null +++ b/models/bronze/bronze__blocks.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view', + tags = ['streamline_helper'] + +) }} + +{{ streamline_external_table_query_v2( + model = "blocks_v2", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" +) }} diff --git a/models/sources.yml b/models/sources.yml index 11c789e..37c6a29 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -32,4 +32,13 @@ sources: schema: near database: hevo tables: - - name: flipsidecrypto_near_ft_balances_daily \ No newline at end of file + - name: flipsidecrypto_near_ft_balances_daily + + - name: bronze_streamline + database: streamline + schema: | + {{ "NEAR_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "NEAR" }} + tables: + - name: blocks_v2 + - name: chunks_v2 + - name: tx_status_v2 diff --git a/models/streamline/_block_lookback.sql b/models/streamline/_block_lookback.sql new file mode 100644 index 0000000..2d3b5f0 --- /dev/null +++ b/models/streamline/_block_lookback.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = "ephemeral" +) }} + +SELECT + MIN(block_id) AS block_number +FROM + {{ ref("core__fact_blocks") }} +WHERE + block_timestamp >= DATEADD('hour', -72, SYSDATE()) + AND block_timestamp < DATEADD('hour', -71, SYSDATE()) diff --git a/models/streamline/core/complete/streamline__blocks_complete.sql b/models/streamline/core/complete/streamline__blocks_complete.sql index e69de29..46adf6d 100644 --- a/models/streamline/core/complete/streamline__blocks_complete.sql +++ b/models/streamline/core/complete/streamline__blocks_complete.sql @@ -0,0 +1,41 @@ +-- depends_on: {{ ref('bronze__blocks') }} +-- depends_on: {{ ref('bronze__FR_blocks') }} +{{ config ( + materialized = "incremental", + unique_key = "block_number", + cluster_by = "ROUND(block_number, -3)", + merge_exclude_columns = ["inserted_timestamp"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)", + tags = ['streamline_complete'] +) }} + +SELECT + VALUE :block_number :: INT AS block_number, + DATA :header :hash :: STRING AS block_hash, + partition_key, + _inserted_timestamp, + DATA :header :hash :: STRING AS complete_blocks_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze__blocks') }} +WHERE + _inserted_timestamp >= COALESCE( + ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ), + '1900-01-01' :: timestamp_ntz + ) +{% else %} + {{ ref('bronze__FR_blocks') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY block_number +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/core/realtime/streamline__blocks_realtime.sql b/models/streamline/core/realtime/streamline__blocks_realtime.sql index bb52edc..2c92ed4 100644 --- a/models/streamline/core/realtime/streamline__blocks_realtime.sql +++ b/models/streamline/core/realtime/streamline__blocks_realtime.sql @@ -1,7 +1,7 @@ {{ config ( materialized = "view", post_hook = fsc_utils.if_data_call_function_v2( - func = '{{this.schema}}.udf_bulk_rest_api_v2', + func = 'streamline.udf_bulk_rest_api_v2', target = "{{this.schema}}.{{this.identifier}}", params = { "external_table": "blocks_v2", @@ -10,12 +10,53 @@ "worker_batch_size": "100", "sql_source": "{{this.identifier}}" } - ) + ), + tags = ['streamline_realtime'] ) }} --- single block for testing +WITH last_3_days AS ( + + SELECT + ZEROIFNULL(block_number) AS block_number + FROM + {{ ref("_block_lookback") }} +), +tbl AS ( + SELECT + block_number + FROM + {{ ref('streamline__blocks') }} + WHERE + ( + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) + ) + AND block_number IS NOT NULL + EXCEPT + SELECT + block_number + FROM + {{ ref('streamline__blocks_complete') }} + WHERE + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) + AND _inserted_timestamp >= DATEADD( + 'day', + -4, + SYSDATE() + ) + -- AND {} IS NOT NULL -- TODO, determine identifier for bad response +) SELECT - 138515000 AS block_number, + block_number, DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key, {{ target.database }}.live.udf_api( 'POST', @@ -26,11 +67,13 @@ SELECT OBJECT_CONSTRUCT( 'jsonrpc', '2.0', 'method', 'block', - 'id', 'dontcare', + 'id', 'Flipside/getBlock/0.1', 'params', OBJECT_CONSTRUCT( 'block_id', block_number ) ), 'Vault/prod/near/quicknode/mainnet' ) AS request - +FROM tbl +ORDER BY + block_number DESC diff --git a/models/streamline/streamline__blocks.sql b/models/streamline/streamline__blocks.sql new file mode 100644 index 0000000..6f1229d --- /dev/null +++ b/models/streamline/streamline__blocks.sql @@ -0,0 +1,21 @@ +{{ config( + materialized = "view", + tags = ['streamline_realtime', 'streamline_history', 'streamline_helper'] +) }} + +{% if execute %} + {% set height = run_query("SELECT streamline.udf_get_chainhead()") %} + {% set block_number = height.columns [0].values() [0] %} +{% else %} + {% set block_number = 0 %} +{% endif %} + +SELECT + _id AS block_number +FROM + {{ source( + 'crosschain_silver', + 'number_sequence' + ) }} +WHERE + _id <= {{ block_number }}