diff --git a/dbt_project.yml b/dbt_project.yml index fc89aa2..2a505bb 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -39,19 +39,6 @@ models: columns: true +on_schema_change: "append_new_columns" -vars: - "dbt_date:time_zone": GMT - OBSERV_FULL_TEST: FALSE - UPDATE_SNOWFLAKE_TAGS: TRUE - STREAMLINE_INVOKE_STREAMS: FALSE - STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: FALSE - UPDATE_UDFS_AND_SPS: FALSE - START_GHA_TASKS: False - API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}' - EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}' - ROLES: | - ["INTERNAL_DEV"] - dbt_constraints_sources_nn_enabled: false tests: @@ -73,4 +60,37 @@ dispatch: query-comment: comment: '{{ dbt_snowflake_query_tags.get_query_comment(node) }}' - append: true # Snowflake removes prefixed comments. \ No newline at end of file + append: true # Snowflake removes prefixed comments. + +vars: + "dbt_date:time_zone": GMT + STREAMLINE_INVOKE_STREAMS: FALSE + STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: TRUE + UPDATE_SNOWFLAKE_TAGS: TRUE + OBSERV_FULL_TEST: FALSE + START_GHA_TASKS: FALSE + UPDATE_UDFS_AND_SPS: FALSE + + +#### STREAMLINE 2.0 BEGIN #### + + API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}' + EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}' + ROLES: '{{ var("config")[target.name]["ROLES"] }}' + + config: + # The keys correspond to dbt profiles and are case sensitive + dev: + API_INTEGRATION: aws_aptos_api_dev + EXTERNAL_FUNCTION_URI: 9v6g64rv1e.execute-api.us-east-1.amazonaws.com/stg/ + API_AWS_ROLE_ARN: arn:aws:iam::704693948482:role/aptos-api-stg-rolesnowflakeudfsAF733095-k23uBmqxZRsN + ROLES: + - AWS_LAMBDA_APTOS_API + - INTERNAL_DEV + prod: + API_INTEGRATION: aws_aptos_api + EXTERNAL_FUNCTION_URI: sfl36j9j2c.execute-api.us-east-1.amazonaws.com/prod/ + ROLES: + - AWS_LAMBDA_APTOS_API + - DBT_CLOUD_APTOS + - INTERNAL_DEV \ No newline at end of file diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index ecd02b5..b8c2229 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -9,6 +9,7 @@ {% set sql %} {{ create_udf_bulk_json_rpc() }} {{ create_udf_bulk_rest_api() }} + {{ create_udf_bulk_rest_api_v2() }} {% endset %} diff --git a/macros/streamline/api_integrations.sql b/macros/streamline/api_integrations.sql index ab4d19a..44d057c 100644 --- a/macros/streamline/api_integrations.sql +++ b/macros/streamline/api_integrations.sql @@ -12,8 +12,8 @@ {% do run_query(sql) %} {% elif target.name == "dev" %} {% set sql %} - CREATE api integration IF NOT EXISTS aws_aptos_api_dev api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/aptos-api-dev-rolesnowflakeudfsAF733095-sLREQ0qf4XVH' api_allowed_prefixes = ( - 'https://jx15f84555.execute-api.us-east-1.amazonaws.com/dev/' + CREATE api integration IF NOT EXISTS aws_aptos_api_dev api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::704693948482:role/aptos-api-stg-rolesnowflakeudfsAF733095-k23uBmqxZRsN' api_allowed_prefixes = ( + 'https://9v6g64rv1e.execute-api.us-east-1.amazonaws.com/stg/' ) enabled = TRUE; {% endset %} {% do run_query(sql) %} diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql index 695c70f..6a672a0 100644 --- a/macros/streamline/models.sql +++ b/macros/streamline/models.sql @@ -76,21 +76,19 @@ WHERE FROM TABLE( information_schema.external_table_file_registration_history( - start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + start_time => DATEADD('day', -7, CURRENT_TIMESTAMP()), table_name => '{{ source( "bronze_streamline", model) }}') ) A ) SELECT - {{ unique_key }}, - DATA, - _inserted_timestamp, MD5( CAST( COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text ) ) AS id, - s.{{ partition_name }}, - s.value AS VALUE + s.*, + b.file_name, + _inserted_timestamp FROM {{ source( "bronze_streamline", @@ -115,7 +113,9 @@ WHERE '-32007', '-32008', '-32009', - '-32010' + '-32010', + '-32602', + '-32603' ) ) {% endmacro %} @@ -139,16 +139,14 @@ WHERE ) A ) SELECT - {{ unique_key }}, - DATA, - _inserted_timestamp, MD5( CAST( COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text ) ) AS id, - s.{{ partition_name }}, - s.value AS VALUE + s.*, + b.file_name, + _inserted_timestamp FROM {{ source( "bronze_streamline", @@ -173,7 +171,78 @@ WHERE '-32007', '-32008', '-32009', - '-32010' + '-32010', + '-32602', + '-32603' ) ) {% endmacro %} + +{% macro streamline_external_table_query_v2( + model, + partition_function + ) %} + WITH meta AS ( + SELECT + job_created_time AS _inserted_timestamp, + file_name, + {{ partition_function }} AS partition_key + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", model) }}') + ) A + ) + SELECT + s.*, + b.file_name, + _inserted_timestamp + FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key + WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + +{% endmacro %} + +{% macro streamline_external_table_FR_query_v2( + model, + partition_function + ) %} + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + {{ partition_function }} AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", model) }}' + ) + ) A + ) +SELECT + s.*, + b.file_name, + _inserted_timestamp +FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key +WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL +{% endmacro %} diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql index d917f54..a0e0e69 100644 --- a/macros/streamline/streamline_udfs.sql +++ b/macros/streamline/streamline_udfs.sql @@ -22,4 +22,18 @@ ) returns ARRAY api_integration = aws_aptos_api_dev AS 'https://jx15f84555.execute-api.us-east-1.amazonaws.com/dev/udf_bulk_rest_api' {%- endif %}; +{% endmacro %} + +{% macro create_udf_bulk_rest_api_v2() %} + {% if target.name == "prod" %} + CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2( + json OBJECT + ) returns ARRAY api_integration = aws_aptos_api AS + 'https://dedvhh9fi1.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_rest_api' + {% else %} + CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api( + json OBJECT + ) returns ARRAY api_integration = aws_aptos_api_dev AS + 'https://9v6g64rv1e.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api' + {%- endif %}; {% endmacro %} \ No newline at end of file diff --git a/models/bronze/core/bronze__streamline_FR_blocks_tx.sql b/models/bronze/core/bronze__streamline_FR_blocks_tx.sql index a24dd31..eb518b1 100644 --- a/models/bronze/core/bronze__streamline_FR_blocks_tx.sql +++ b/models/bronze/core/bronze__streamline_FR_blocks_tx.sql @@ -1,12 +1,23 @@ {{ config ( - materialized = 'view', - tags = ['core'] + materialized = 'view' ) }} -{% set model = this.identifier.split("_") [-1] %} -{{ streamline_external_table_FR_query( - model = "blocks_tx", - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "partition_key" -) }} +SELECT + VALUE, + partition_key, + metadata, + DATA, + file_name, + _INSERTED_TIMESTAMP +FROM + {{ ref('bronze__streamline_FR_blocks_tx_v2') }} +UNION ALL +SELECT + VALUE, + _partition_by_block_id AS partition_key, + metadata, + DATA, + file_name, + _INSERTED_TIMESTAMP +FROM + {{ ref('bronze__streamline_FR_blocks_tx_v1') }} \ No newline at end of file diff --git a/models/bronze/core/bronze__streamline_FR_blocks_tx_v1.sql b/models/bronze/core/bronze__streamline_FR_blocks_tx_v1.sql new file mode 100644 index 0000000..a24dd31 --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_blocks_tx_v1.sql @@ -0,0 +1,12 @@ +{{ config ( + materialized = 'view', + tags = ['core'] +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_FR_query( + model = "blocks_tx", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "partition_key" +) }} diff --git a/models/bronze/core/bronze__streamline_FR_blocks_tx_v2.sql b/models/bronze/core/bronze__streamline_FR_blocks_tx_v2.sql new file mode 100644 index 0000000..54ba07d --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_blocks_tx_v2.sql @@ -0,0 +1,7 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_FR_query_v2( + "blocks_tx_v2", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )" +) }} diff --git a/models/bronze/core/bronze__streamline_FR_transaction_batch.sql b/models/bronze/core/bronze__streamline_FR_transaction_batch.sql index 4b3b546..bb088c4 100644 --- a/models/bronze/core/bronze__streamline_FR_transaction_batch.sql +++ b/models/bronze/core/bronze__streamline_FR_transaction_batch.sql @@ -1,10 +1,23 @@ {{ config ( - materialized = 'view', - tags = ['core'] -) }} -{{ streamline_external_table_FR_query( - model = "transaction_batch", - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "partition_key" + materialized = 'view' ) }} + +SELECT + VALUE, + partition_key, + metadata, + DATA, + file_name, + _INSERTED_TIMESTAMP +FROM + {{ ref('bronze__streamline_FR_transaction_batch_v2') }} +UNION ALL +SELECT + VALUE, + _partition_by_block_id AS partition_key, + metadata, + DATA, + file_name, + _INSERTED_TIMESTAMP +FROM + {{ ref('bronze__streamline_FR_transaction_batch_v1') }} diff --git a/models/bronze/core/bronze__streamline_FR_transaction_batch_v1.sql b/models/bronze/core/bronze__streamline_FR_transaction_batch_v1.sql new file mode 100644 index 0000000..4b3b546 --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_transaction_batch_v1.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view', + tags = ['core'] +) }} +{{ streamline_external_table_FR_query( + model = "transaction_batch", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "partition_key" +) }} diff --git a/models/bronze/core/bronze__streamline_FR_transaction_batch_v2.sql b/models/bronze/core/bronze__streamline_FR_transaction_batch_v2.sql new file mode 100644 index 0000000..2d8a24d --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_transaction_batch_v2.sql @@ -0,0 +1,7 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_FR_query_v2( + "transaction_batch_v2", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )" +) }} diff --git a/models/bronze/core/bronze__streamline_blocks_tx.sql b/models/bronze/core/bronze__streamline_blocks_tx.sql index 8f92e28..a01a862 100644 --- a/models/bronze/core/bronze__streamline_blocks_tx.sql +++ b/models/bronze/core/bronze__streamline_blocks_tx.sql @@ -3,10 +3,7 @@ tags = ['core'] ) }} -{% set model = this.identifier.split("_") [-1] %} -{{ streamline_external_table_query( - model = "blocks_tx", - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "partition_key" +{{ streamline_external_table_query_v2( + model = "blocks_tx_v2", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)" ) }} diff --git a/models/bronze/core/bronze__streamline_transaction_batch.sql b/models/bronze/core/bronze__streamline_transaction_batch.sql index 40803cf..27cc688 100644 --- a/models/bronze/core/bronze__streamline_transaction_batch.sql +++ b/models/bronze/core/bronze__streamline_transaction_batch.sql @@ -2,9 +2,7 @@ materialized = 'view', tags = ['core'] ) }} -{{ streamline_external_table_query( - model = "transaction_batch", - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "partition_key" +{{ streamline_external_table_query_v2( + model = "transaction_batch_v2", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)" ) }} diff --git a/models/sources.yml b/models/sources.yml index 28948b8..ee89eee 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -38,4 +38,9 @@ sources: - name: blocks_tx - name: transactions - name: transaction_batch + - name: blocks_tx_v2 + - name: transactions_v2 + - name: transaction_batch_v2 + + \ No newline at end of file diff --git a/models/streamline/core/complete/streamline__complete_blocks_tx_v2.sql b/models/streamline/core/complete/streamline__complete_blocks_tx_v2.sql new file mode 100644 index 0000000..cd6eedb --- /dev/null +++ b/models/streamline/core/complete/streamline__complete_blocks_tx_v2.sql @@ -0,0 +1,36 @@ +-- depends_on: {{ ref('bronze__streamline_blocks_tx') }} +-- depends_on: {{ ref('bronze__streamline_FR_blocks_tx') }} +{{ config ( + materialized = "incremental", + unique_key = "block_number", + cluster_by = "ROUND(block_number, -3)", + merge_update_columns = ["block_number"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" +) }} + +SELECT + DATA :result :block :header :height :: INT AS block_number, + {{ dbt_utils.generate_surrogate_key( + ['block_number'] + ) }} AS complete_blocks_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + _inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_blocks_tx') }} +WHERE + _inserted_timestamp >= ( + SELECT + COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP + FROM + {{ this }}) + {% else %} + {{ ref('bronze__streamline_FR_blocks_tx') }} + {% endif %} + + qualify(ROW_NUMBER() over (PARTITION BY block_number + ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/core/complete/streamline__complete_transaction_batch_v2.sql b/models/streamline/core/complete/streamline__complete_transaction_batch_v2.sql new file mode 100644 index 0000000..86c9b8a --- /dev/null +++ b/models/streamline/core/complete/streamline__complete_transaction_batch_v2.sql @@ -0,0 +1,39 @@ +-- depends_on: {{ ref('bronze__streamline_blocks_tx') }} +{{ config ( + materialized = "incremental", + unique_key = "block_number", + cluster_by = "ROUND(_partition_by_block_id, -3)", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" +) }} +-- depends_on: {{ ref('bronze__streamline_transaction_batch') }} + +SELECT + block_number, + _partition_by_block_id, + A._inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_transaction_batch') }} +{% else %} + {{ ref('bronze__streamline_FR_transaction_batch') }} +{% endif %} + +A +JOIN {{ ref('silver__blocks') }} +b +ON DATA [0] :version :: INT BETWEEN b.first_version +AND b.last_version + +{% if is_incremental() %} +WHERE + A._inserted_timestamp >= ( + SELECT + COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP + FROM + {{ this }}) + {% endif %} + + qualify(ROW_NUMBER() over (PARTITION BY block_number + ORDER BY + A._inserted_timestamp DESC)) = 1 diff --git a/models/streamline/core/realtime/streamline__blocks_tx_realtime_v2.sql b/models/streamline/core/realtime/streamline__blocks_tx_realtime_v2.sql new file mode 100644 index 0000000..b7572f4 --- /dev/null +++ b/models/streamline/core/realtime/streamline__blocks_tx_realtime_v2.sql @@ -0,0 +1,62 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"blocks_tx_v2", + "sql_limit" :"1200000", + "producer_batch_size" :"300000", + "worker_batch_size" :"50000", + "sql_source" :"{{this.identifier}}" } + ), + tags = ['streamline_core_realtime'] +) }} + +{% if execute %} + {% set next_batch_num_query %} + SELECT + greatest( + 251452492, + (SELECT coalesce(max(block_number),0) FROM {{ ref('streamline__complete_blocks_tx_v2') }}) + )+1 + {% endset %} + {% set next_batch_num = run_query(next_batch_num_query)[0][0] %} +{% endif %} + +WITH blocks AS ( + SELECT + block_number + FROM + {{ ref("streamline__blocks") }} + EXCEPT + SELECT + block_number + FROM + {{ ref("streamline__complete_blocks_tx") }} + WHERE + block_number <= 251452492 + EXCEPT + SELECT + block_number + FROM + {{ ref('streamline__complete_blocks_tx_v2') }} +) +SELECT + ROUND( + block_number, + -3 + ) :: INT AS partition_key, + aptos.live.udf_api( + 'GET', + '{service}/{Authentication}/v1/blocks/by_height/' || block_number || '?with_transactions=true', + object_construct( + 'Content-Type', + 'application/json', + 'token', + '{Authentication}' + ), + {}, + 'Vault/prod/aptos/node/mainnet' + ) AS request +FROM + blocks \ No newline at end of file diff --git a/models/streamline/core/realtime/streamline__transaction_batch_realtime_v2.sql b/models/streamline/core/realtime/streamline__transaction_batch_realtime_v2.sql new file mode 100644 index 0000000..9297920 --- /dev/null +++ b/models/streamline/core/realtime/streamline__transaction_batch_realtime_v2.sql @@ -0,0 +1,107 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"transactions_batch_v2", + "sql_limit" :"1200000", + "producer_batch_size" :"300000", + "worker_batch_size" :"50000", + "sql_source" :"{{this.identifier}}" } + ) +) }} +-- depends_on: {{ ref('bronze__streamline_transaction_batch') }} + +--- to do:nothing done here yet +WITH blocks AS ( + + SELECT + A.block_number, + tx_count_from_versions -100 AS tx_count, + first_version + 100 version_start + FROM + {{ ref('silver__blocks') }} A + WHERE + tx_count_from_versions > 100 + AND ( + block_number >= 184014560 + OR block_number IN ( + 183537590, + 183564192, + 183587755, + 183587754, + 183666216 + ) + ) +), +numbers AS ( + -- Recursive CTE to generate numbers. We'll use the maximum txcount value to limit our recursion. + SELECT + 1 AS n + UNION ALL + SELECT + n + 1 + FROM + numbers + WHERE + n < ( + SELECT + CEIL(MAX(tx_count) / 100.0) + FROM + blocks) + ), + blocks_with_page_numbers AS ( + SELECT + tt.block_number :: INT AS block_number, + n.n - 1 AS multiplier, + version_start, + tx_count + FROM + blocks tt + JOIN numbers n + ON n.n <= CASE + WHEN tt.tx_count % 100 = 0 THEN tt.tx_count / 100 + ELSE FLOOR( + tt.tx_count / 100 + ) + 1 + END + ), + WORK AS ( + SELECT + A.block_number, + version_start +( + 100 * multiplier + ) AS tx_version + FROM + blocks_with_page_numbers A + LEFT JOIN {{ ref('streamline__complete_transaction_batch') }} + b + ON A.block_number = b.block_number + WHERE + b.block_number IS NULL + ), + calls AS ( + SELECT + '{service}/{Authentication}/v1/transactions?start=' || tx_version || '&limit=100' calls, + block_number + FROM + WORK + ) + SELECT + ARRAY_CONSTRUCT( + ROUND( + block_number, + -3 + ) :: INT, + ARRAY_CONSTRUCT( + 'GET', + calls, + PARSE_JSON('{}'), + PARSE_JSON('{}'), + '' + ) + ) AS request + FROM + calls + ORDER BY + block_number diff --git a/models/streamline/core/streamline__blocks.sql b/models/streamline/core/streamline__blocks.sql index b3c34c3..4be223a 100644 --- a/models/streamline/core/streamline__blocks.sql +++ b/models/streamline/core/streamline__blocks.sql @@ -3,13 +3,6 @@ tags = ['streamline_view'] ) }} -{% if execute %} - {% set height = run_query("SELECT live.udf_api( 'GET', 'https://fullnode.mainnet.aptoslabs.com/v1', OBJECT_CONSTRUCT( 'Content-Type', 'application/json' ),{} ):data:block_height::INT") %} - {% set block_height = height.columns [0].values() [0] %} -{% else %} - {% set block_height = 0 %} -{% endif %} - SELECT _id AS block_number FROM @@ -18,7 +11,9 @@ FROM 'number_sequence' ) }} WHERE - _id <= {{ block_height }} -UNION ALL -SELECT - 0 AS block_number + _id <= ( + SELECT + MAX(block_number) + FROM + {{ ref('streamline__chainhead') }} + ) \ No newline at end of file diff --git a/models/streamline/core/streamline__chainhead.sql b/models/streamline/core/streamline__chainhead.sql new file mode 100644 index 0000000..0e4d8d9 --- /dev/null +++ b/models/streamline/core/streamline__chainhead.sql @@ -0,0 +1,18 @@ +{{ config ( + materialized = "view", + tags = ['streamline_view'] +) }} + + SELECT + aptos.live.udf_api( + 'GET', + '{service}/{Authentication}/v1', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json', + 'fsc-quantum-state', + 'livequery' + ), + OBJECT_CONSTRUCT(), + 'Vault/prod/aptos/node/mainnet' + ) :data:block_height :: INT AS block_number