From fe7f688cc2cdf60bb1842ab14192368522000f49 Mon Sep 17 00:00:00 2001 From: Shah Newaz Khan Date: Thu, 10 Aug 2023 14:23:29 -0700 Subject: [PATCH] Stream 98 collection backfill (#145) * node url join optimized | warehouse privilege grants * added string interpolation for env based target.database * revert to candidate node 7 root height * collections history success * removed end height from network_version * removed EOF * added prod udf --------- Co-authored-by: shah --- macros/streamline/api_integrations.sql | 10 ++-- macros/streamline/get_base_table_udft.sql | 37 +++++--------- macros/streamline/models.sql | 43 ++++++++++++++++ macros/streamline/streamline_udfs.sql | 8 +-- macros/streamline/utils.sql | 29 +++++++++++ macros/utils.sql | 11 ++++- models/silver/streamline/README.md | 28 +++++++++++ .../bronze/core/bronze__streamline_blocks.sql | 11 +++++ .../core/bronze__streamline_collections.sql | 11 +++++ .../core/bronze__streamline_transactions.sql | 11 +++++ .../streamline__complete_get_blocks.sql | 24 ++++----- .../streamline__complete_get_collections.sql | 34 +++++++++++++ .../streamline__complete_get_transactions.sql | 34 +++++++++++++ ...reamline__get_blocks_history_mainnet22.sql | 28 +++++++++++ ...ine__get_collections_history_mainnet22.sql | 39 +++++++++++++++ .../streamline__get_blocks_history.sql | 39 --------------- ...ne__get_transactions_history_mainnet22.sql | 49 +++++++++++++++++++ .../streamline/core/streamline__blocks.sql | 5 +- models/sources.yml | 7 ++- 19 files changed, 368 insertions(+), 90 deletions(-) create mode 100644 macros/streamline/models.sql create mode 100644 macros/streamline/utils.sql create mode 100644 models/silver/streamline/bronze/core/bronze__streamline_blocks.sql create mode 100644 models/silver/streamline/bronze/core/bronze__streamline_collections.sql create mode 100644 models/silver/streamline/bronze/core/bronze__streamline_transactions.sql create mode 100644 
models/silver/streamline/core/complete/streamline__complete_get_collections.sql create mode 100644 models/silver/streamline/core/complete/streamline__complete_get_transactions.sql create mode 100644 models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql create mode 100644 models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql delete mode 100644 models/silver/streamline/core/history/streamline__get_blocks_history.sql create mode 100644 models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql diff --git a/macros/streamline/api_integrations.sql b/macros/streamline/api_integrations.sql index 0a8b913..b5382f7 100644 --- a/macros/streamline/api_integrations.sql +++ b/macros/streamline/api_integrations.sql @@ -1,17 +1,17 @@ -- macro used to create flow api integrations {% macro create_aws_flow_api() %} - {{ log("Creating integration for target:" ~ target, info=True) }} {% if target.name == "prod" %} {% set sql %} - CREATE api integration IF NOT EXISTS aws_flow_api_prod api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/snowflake-api-flow' api_allowed_prefixes = ( - 'https:///prod/' + CREATE api integration IF NOT EXISTS aws_flow_api_prod api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/flow-api-prod-rolesnowflakeudfsAF733095-FNY67ODG1RFG' api_allowed_prefixes = ( + 'https://quxfxtl934.execute-api.us-east-1.amazonaws.com/prod/' ) enabled = TRUE; {% endset %} {% do run_query(sql) %} {% elif target.name == "dev" %} + {{ log("Generating api integration for target:" ~ target.name, info=True) }} {% set sql %} - CREATE api integration IF NOT EXISTS aws_flow_api_dev api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/snowflake-api-flow' api_allowed_prefixes = ( - 'https:///dev/' + CREATE api integration IF NOT EXISTS aws_flow_api_dev_2 api_provider = 
aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/flow-api-dev-rolesnowflakeudfsAF733095-1D0U05G1EDT3' api_allowed_prefixes = ( + 'https://8jjulyhxhj.execute-api.us-east-1.amazonaws.com/dev/' ) enabled = TRUE; {% endset %} {% do run_query(sql) %} diff --git a/macros/streamline/get_base_table_udft.sql b/macros/streamline/get_base_table_udft.sql index 0b96ad7..57cc6f6 100644 --- a/macros/streamline/get_base_table_udft.sql +++ b/macros/streamline/get_base_table_udft.sql @@ -1,34 +1,21 @@ {% macro create_udtf_get_base_table(schema) %} create or replace function {{ schema }}.udtf_get_base_table(max_height integer) -returns table (height number, node_url varchar(16777216)) +returns table (height number) as $$ - with base as ( - select - row_number() over ( - order by - seq4() - ) as id - from - table(generator(rowcount => 100000000)) -- July 2023 Flow Chain head is at 57M - ), - node_mapping as ( - select - base.id as height, - first_value(nv.node_url) over (partition by base.id order by nv.root_height desc) as node_url - from - base - left join {{ target.database }}.seeds.network_version nv - on - base.id >= nv.root_height - and - base.id <= max_height - ) +with base as ( + select + row_number() over ( + order by + seq4() + ) as height + from + table(generator(rowcount => 100000000)) +) select - height, - coalesce(node_url, 'access.mainnet.nodes.onflow.org:9000') as node_url + height from - node_mapping + base where height <= max_height $$ diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql new file mode 100644 index 0000000..59e80ea --- /dev/null +++ b/macros/streamline/models.sql @@ -0,0 +1,43 @@ +{% macro streamline_external_table_query( + model, + partition_function, + partition_name, + unique_key + ) %} + WITH meta AS ( + SELECT + last_modified AS _inserted_timestamp, + file_name, + {{ partition_function }} AS {{ partition_name }} + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => 
DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", model) }}') + ) A + ) + SELECT + {{ unique_key }}, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text + ) + ) AS id, + s.{{ partition_name }}, + s.value AS VALUE + FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.{{ partition_name }} = s.{{ partition_name }} + WHERE + b.{{ partition_name }} = s.{{ partition_name }} +{% endmacro %} + + diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql index 890aa5c..dfba6ae 100644 --- a/macros/streamline/streamline_udfs.sql +++ b/macros/streamline/streamline_udfs.sql @@ -4,9 +4,9 @@ CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_get_chainhead() returns variant api_integration = {% if target.name == "prod" %} - aws_flow_api AS '/prod/get_chainhead' + aws_flow_api AS 'https://quxfxtl934.execute-api.us-east-1.amazonaws.com/prod/get_chainhead' {% elif target.name == "dev" %} - aws_flow_api_dev AS '/dev/get_chainhead' + aws_flow_api_dev_2 AS 'https://8jjulyhxhj.execute-api.us-east-1.amazonaws.com/dev/get_chainhead' {% elif target.name == "sbx" %} {{ log("Creating sbx get_chainhead", info=True) }} aws_flow_api_sbx AS 'https://bc5ejedoq8.execute-api.us-east-1.amazonaws.com/sbx/get_chainhead' @@ -19,9 +19,9 @@ CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_grpc(json variant) returns variant api_integration = {% if target.name == "prod" %} - aws_flow_api AS '/prod/udf_bulk_grpc' + aws_flow_api AS 'https://quxfxtl934.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_grpc' {% elif target.name == "dev" %} - aws_flow_api_dev AS '/dev/udf_bulk_grpc' + aws_flow_api_dev_2 AS 'https://8jjulyhxhj.execute-api.us-east-1.amazonaws.com/dev/udf_bulk_grpc' {% elif target.name == "sbx" %} {{ log("Creating sbx udf_bulk_grpc", info=True) }} aws_flow_api_sbx AS 
'https://bc5ejedoq8.execute-api.us-east-1.amazonaws.com/sbx/udf_bulk_grpc' diff --git a/macros/streamline/utils.sql b/macros/streamline/utils.sql new file mode 100644 index 0000000..b66b028 --- /dev/null +++ b/macros/streamline/utils.sql @@ -0,0 +1,29 @@ +{% macro generate_blocks_grpc_request(block_height) %} + PARSE_JSON( + CONCAT( + '{"grpc": "proto3",', + '"method": "get_block_by_height",', + '"block_height":"', + block_height :: INTEGER, + '",', + '"method_params": {"height":', + block_height :: INTEGER, + '}}' + ) + ) +{% endmacro %} + +{% macro generate_collections_grpc_request(block_height, collection_guarantee) %} + PARSE_JSON( + CONCAT( + '{"grpc": "proto3",', + '"method": "get_collection_by_i_d",', + '"block_height":"', + block_height :: INTEGER, + '",', + '"method_params": {"id":"', + collection_guarantee.value:collection_id, + '"}}' + ) + ) +{% endmacro %} \ No newline at end of file diff --git a/macros/utils.sql b/macros/utils.sql index 468236f..c23c75a 100644 --- a/macros/utils.sql +++ b/macros/utils.sql @@ -100,11 +100,18 @@ {% set sql %} grant usage on database {{ target.database }} to role {{ role }}; grant usage on schema {{ target.schema }} to role {{ role }}; + grant usage on warehouse {{ target.warehouse }} to role {{ role }}; grant select on all tables in schema {{ target.schema }} to role {{ role }}; grant select on all views in schema {{ target.schema }} to role {{ role }}; - grant select on future views in schema {{ target.schema }} to role {{ role }} + grant select on future views in schema {{ target.schema }} to role {{ role }}; + + grant usage on database streamline to role {{ role }}; + grant usage on schema streamline.{{ target.schema }} to role {{ role }}; + grant select on all tables in schema streamline.{{ target.schema }} to role {{ role }}; + + grant usage on schema {{target.database}}.bronze to role {{ role }}; {% endset %} - {% do run_query(sql) %} + {% do run_and_log_sql(sql) %} {% do log("Privileges granted", info=True) %} {% 
endmacro %} \ No newline at end of file diff --git a/models/silver/streamline/README.md b/models/silver/streamline/README.md index 48ebe4a..6f7ce85 100644 --- a/models/silver/streamline/README.md +++ b/models/silver/streamline/README.md @@ -21,3 +21,31 @@ DBT_TARGET=sbx make sl-flow-api dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/history/streamline__get_blocks_history.sql --profile flow --target sbx --profiles-dir ~/.dbt ``` + +```zsh +# dev bronze__streamline_blocks.sql +dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/bronze/core/bronze__streamline_blocks.sql --profile flow --target dev --profiles-dir ~/.dbt +``` + +```zsh +# dev complete_get_blocks +dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/complete/streamline__complete_get_blocks.sql --profile flow --target dev --profiles-dir ~/.dbt + +# dev complete_get_collections +dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/complete/streamline__complete_get_collections.sql --profile flow --target dev --profiles-dir ~/.dbt + +# dev complete_get_transactions +dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/complete/streamline__complete_get_transactions.sql --profile flow --target dev --profiles-dir ~/.dbt + +-- + + +# dev get_blocks_history +dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql --profile flow --target dev --profiles-dir ~/.dbt + +# dev get_transactions_history +dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, 
"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql --profile flow --target dev --profiles-dir ~/.dbt + +# dev complete_get_collections +dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/complete/streamline__complete_get_collections.sql --profile flow --target dev --profiles-dir ~/.dbt +``` \ No newline at end of file diff --git a/models/silver/streamline/bronze/core/bronze__streamline_blocks.sql b/models/silver/streamline/bronze/core/bronze__streamline_blocks.sql new file mode 100644 index 0000000..22b0c51 --- /dev/null +++ b/models/silver/streamline/bronze/core/bronze__streamline_blocks.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/silver/streamline/bronze/core/bronze__streamline_collections.sql b/models/silver/streamline/bronze/core/bronze__streamline_collections.sql new file mode 100644 index 0000000..22b0c51 --- /dev/null +++ b/models/silver/streamline/bronze/core/bronze__streamline_collections.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/silver/streamline/bronze/core/bronze__streamline_transactions.sql b/models/silver/streamline/bronze/core/bronze__streamline_transactions.sql new file mode 100644 index 0000000..22b0c51 --- /dev/null +++ 
b/models/silver/streamline/bronze/core/bronze__streamline_transactions.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql b/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql index 8215ab8..868c558 100644 --- a/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql +++ b/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql @@ -1,20 +1,22 @@ +-- depends_on: {{ ref('bronze__streamline_blocks') }} {{ config ( materialized = "incremental", - unique_key = "record_id", - cluster_by = "ROUND(block_id, -3)", - merge_update_columns = ["record_id"], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(record_id)" + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + merge_update_columns = ["id"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" ) }} SELECT - record_id, - block_id, - _inserted_timestamp, - network AS node_url + id, + data, + block_number, + _partition_by_block_id, + _inserted_timestamp FROM {% if is_incremental() %} -{{ ref('bronze__blocks') }} -- TODO: change to bronze__streamline_blocks +{{ ref('bronze__streamline_blocks') }} WHERE _inserted_timestamp >= ( SELECT @@ -24,9 +26,9 @@ WHERE ) {% else %} - {{ ref('bronze__blocks') }} -- TODO: change to bronze__streamline_FR_blocks + {{ ref('bronze__streamline_blocks') }} -- TODO: change to bronze__streamline_FR_blocks {% endif %} -qualify(ROW_NUMBER() over (PARTITION BY record_id +qualify(ROW_NUMBER() over (PARTITION BY id ORDER BY _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git 
a/models/silver/streamline/core/complete/streamline__complete_get_collections.sql b/models/silver/streamline/core/complete/streamline__complete_get_collections.sql new file mode 100644 index 0000000..dd3d86a --- /dev/null +++ b/models/silver/streamline/core/complete/streamline__complete_get_collections.sql @@ -0,0 +1,34 @@ +-- depends_on: {{ ref('bronze__streamline_collections') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + merge_update_columns = ["id"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" +) }} + +SELECT + id, + data, + block_number, + _partition_by_block_id, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_collections') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) + +{% else %} + {{ ref('bronze__streamline_collections') }} -- TODO: change to bronze__streamline_FR_collections +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/silver/streamline/core/complete/streamline__complete_get_transactions.sql b/models/silver/streamline/core/complete/streamline__complete_get_transactions.sql new file mode 100644 index 0000000..9ee00ad --- /dev/null +++ b/models/silver/streamline/core/complete/streamline__complete_get_transactions.sql @@ -0,0 +1,34 @@ +-- depends_on: {{ ref('bronze__streamline_transactions') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + merge_update_columns = ["id"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" +) }} + +SELECT + id, + data, + block_number, + _partition_by_block_id, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_transactions') }} +WHERE + _inserted_timestamp >= ( + SELECT + 
MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) + +{% else %} + {{ ref('bronze__streamline_transactions') }} -- TODO: change to bronze__streamline_FR_transactions +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql b/models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql new file mode 100644 index 0000000..ea1704e --- /dev/null +++ b/models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql @@ -0,0 +1,28 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_block_by_height','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'blocks', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH blocks AS ( + + SELECT + block_height + FROM + {{ ref("streamline__blocks") }} + EXCEPT + SELECT + block_number as block_height + FROM + {{ ref("streamline__complete_get_blocks") }} +) +SELECT + {{ generate_blocks_grpc_request(block_height) }} AS request +FROM + blocks +WHERE + block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range +ORDER BY + block_height ASC diff --git a/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql b/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql new file mode 100644 index 0000000..baf883b --- /dev/null +++ 
b/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql @@ -0,0 +1,39 @@ +-- depends_on: {{ ref('streamline__get_blocks_history_mainnet22') }} +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_collection_by_i_d','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'collections', 'sql_limit', {{var('sql_limit','10000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','10000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH blocks AS ( + + SELECT + block_height + FROM + {{ ref("streamline__blocks") }} + EXCEPT + SELECT + block_number as block_height + FROM + {{ ref("streamline__complete_get_collections") }} +), +collections AS ( + + SELECT + block_number as block_height, + data + FROM + {{ ref('streamline__complete_get_blocks') }} + JOIN blocks ON blocks.block_height = block_number +) +SELECT + {{ generate_collections_grpc_request(block_height, collection_guarantee) }} AS request +FROM + collections, + LATERAL FLATTEN(input => data:collection_guarantees) AS collection_guarantee +WHERE + block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range +ORDER BY + block_height ASC diff --git a/models/silver/streamline/core/history/streamline__get_blocks_history.sql b/models/silver/streamline/core/history/streamline__get_blocks_history.sql deleted file mode 100644 index 3f2869c..0000000 --- a/models/silver/streamline/core/history/streamline__get_blocks_history.sql +++ /dev/null @@ -1,39 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_block_by_height', 'external_table', 'streamline_blocks', 
'sql_limit', {{var('sql_limit','1000')}}, 'producer_batch_size', {{var('producer_batch_size','1000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - -WITH blocks AS ( - - SELECT - block_height, - node_url - FROM - {{ ref("streamline__blocks") }} - EXCEPT - SELECT - block_id as block_height, - node_url - FROM - {{ ref("streamline__complete_get_blocks") }} -) -SELECT - PARSE_JSON( - CONCAT( - '{"grpc": "proto3",', - '"method": "get_block_by_height",', - '"block_height":"', - block_height :: INTEGER, - '",', - '"node_url":"', - node_url, - '"}' - ) - ) AS request -FROM - blocks -ORDER BY - block_height ASC diff --git a/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql b/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql new file mode 100644 index 0000000..c4a3b37 --- /dev/null +++ b/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql @@ -0,0 +1,49 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_transaction','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'transactions', 'sql_limit', {{var('sql_limit','10000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','10000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH blocks AS ( + + SELECT + block_height + FROM + {{ ref("streamline__blocks") }} + EXCEPT + SELECT + block_number as block_height + FROM + {{ ref("streamline__complete_get_transactions") }} +), +tx AS ( + + SELECT + block_number as block_height, + data + FROM + {{ 
ref('bronze__streamline_collections') }} + JOIN blocks ON blocks.block_height = block_number +) +SELECT + PARSE_JSON( + CONCAT( + '{"grpc": "proto3",', + '"method": "get_transaction",', + '"block_height":"', + block_height :: INTEGER, + '",', + '"method_params": {"id":"', + transaction_ids.value :: STRING, + '"}}' + ) + ) AS request +FROM + tx, + LATERAL FLATTEN(input => data:transaction_ids) AS transaction_ids +WHERE + block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range +ORDER BY + block_height ASC diff --git a/models/silver/streamline/core/streamline__blocks.sql b/models/silver/streamline/core/streamline__blocks.sql index 71dc03a..971bfad 100644 --- a/models/silver/streamline/core/streamline__blocks.sql +++ b/models/silver/streamline/core/streamline__blocks.sql @@ -12,12 +12,11 @@ {% endif %} SELECT - height as block_height, - node_url + height as block_height FROM TABLE(streamline.udtf_get_base_table({{block_height}})) WHERE block_height > 4132133 -- Root Height for Candidate node 7 - -- the earliest available block we can ingest since earlier candidat nodes + -- the earliest available block we can ingest since earlier candidate nodes -- do not have the get_block_by_height grpc method -- https://developers.flow.com/concepts/nodes/node-operation/past-sporks#candidate-4 diff --git a/models/sources.yml b/models/sources.yml index df49320..29a6262 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -23,9 +23,13 @@ sources: - name: bronze_streamline database: streamline - schema: flow + schema: | + {{ "FLOW_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "FLOW" }} tables: - name: moments_minted_metadata_api + - name: blocks + - name: collections + - name: transactions - name: crosschain_v2 database: crosschain @@ -33,3 +37,4 @@ sources: tables: - name: hourly_prices_coin_gecko - name: hourly_prices_coin_market_cap +