From 43c77b39ece327ff329be0fce434308b2f6ebd90 Mon Sep 17 00:00:00 2001 From: shah Date: Thu, 20 Jul 2023 15:07:07 -0700 Subject: [PATCH] udf_bulk_grpc success --- Makefile | 3 +- analysis/test.sql | 1 + macros/create_udfs.sql | 9 +- macros/streamline/api_integrations.sql | 30 ----- macros/streamline/streamline_udfs.sql | 15 +++ macros/utils.sql | 108 ++++++++++++++++++ models/silver/streamline/README.md | 22 ++++ .../streamline__complete_get_blocks.sql | 31 +++++ .../streamline__get_blocks_history.sql | 34 ++++++ .../streamline/core/streamline__blocks.sql | 17 +++ 10 files changed, 238 insertions(+), 32 deletions(-) create mode 100644 analysis/test.sql create mode 100644 macros/utils.sql create mode 100644 models/silver/streamline/README.md create mode 100644 models/silver/streamline/core/complete/streamline__complete_get_blocks.sql create mode 100644 models/silver/streamline/core/history/streamline__get_blocks_history.sql create mode 100644 models/silver/streamline/core/streamline__blocks.sql diff --git a/Makefile b/Makefile index e68f6e5..82c5eae 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,8 @@ sl-flow-api: --profiles-dir ~/.dbt/ udfs: - dbt run-operation create_udf_get_chainhead \ + dbt run-operation create_udfs \ + --vars '{"UPDATE_UDFS_AND_SPS":True}' \ --profile flow \ --target $(DBT_TARGET) \ --profiles-dir ~/.dbt/ \ No newline at end of file diff --git a/analysis/test.sql b/analysis/test.sql new file mode 100644 index 0000000..99f7b86 --- /dev/null +++ b/analysis/test.sql @@ -0,0 +1 @@ +{{ create_udf_bulk_grpc() }} \ No newline at end of file diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index d0b4845..ff1d212 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -2,11 +2,18 @@ {% if var("UPDATE_UDFS_AND_SPS") %} {% if target.database != "FLOW_COMMUNITY_DEV" %} {% set sql %} + {{ udf_bulk_get_topshot_moments_minted_metadata() }}; {{ udf_bulk_get_nfl_allday_moments_metadata() }}; + + {{ create_udtf_get_base_table( + schema = "streamline" + ) }} + {{ create_udf_get_chainhead() }} + {{ create_udf_bulk_grpc() }} + {% endset %} {% do run_query(sql) %} - {{- fsc_utils.create_udfs() -}} {% endif %} {% endif %} diff --git a/macros/streamline/api_integrations.sql b/macros/streamline/api_integrations.sql index 37d8a16..0a8b913 100644 --- a/macros/streamline/api_integrations.sql +++ b/macros/streamline/api_integrations.sql @@ -26,33 +26,3 @@ {% set query_result = run_and_log_sql(sql) %} {% endif %} {% endmacro %} - --- macro used to run a sql query and log the results --- TODO: Move this to fsc-utils package -{% macro run_and_log_sql(sql_query, log_level='info') %} - {% set result_var = 'result_' ~ sql_query[:8] %} - - {% set log_message = 'Executing SQL query: ' ~ sql_query %} - {% do log(log_message,info=True) %} - - {% set query_result = run_query(sql_query) %} - {% set result_str = query_result.columns[0].values()[0] if query_result.columns else None %} - - {% set log_message = 'SQL query result: ' ~ result_str %} - {% do log(log_message, info=True) %} - - {{ result_var }} -{% endmacro %} - --- macro used to select priveleges on all views/tables in a target chema to a role -{% macro grant_select(role) %} - {{ log("Granting privileges to role: " ~ role, info=True) }} - {% set sql %} - grant usage on schema {{ target.schema }} to role {{ role }}; - grant select on all tables in schema {{ target.schema }} to role {{ role }}; - grant select on all views in schema {{ target.schema }} to role {{ role }}; - {% endset %} - - {% do run_query(sql) %} - {% do log("Privileges granted", info=True) %} -{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql index c8563dd..eeb9be8 100644 --- a/macros/streamline/streamline_udfs.sql +++ b/macros/streamline/streamline_udfs.sql @@ -12,3 +12,18 @@ aws_flow_api_sbx AS 'https://bc5ejedoq8.execute-api.us-east-1.amazonaws.com/sbx/get_chainhead' {%- endif %}; {% endmacro %} + +{% macro create_udf_bulk_grpc() %} + {{ log("Creating udf udf_bulk_grpc for target:" ~ target.name ~ ", schema: " ~ target.schema, info=True) }} + {{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_grpc(json variant) returns variant api_integration = + {% if target.name == "prod" %} + aws_flow_api AS '/prod/get_chainhead' + {% elif target.name == "dev" %} + aws_flow_api_dev AS '/dev/get_chainhead' + {% elif target.name == "sbx" %} + {{ log("Creating sbx udf_bulk_grpc", info=True) }} + aws_flow_api_sbx AS 'https://bc5ejedoq8.execute-api.us-east-1.amazonaws.com/sbx/udf_bulk_grpc' + {%- endif %}; +{% endmacro %} diff --git a/macros/utils.sql b/macros/utils.sql new file mode 100644 index 0000000..dca82a7 --- /dev/null +++ b/macros/utils.sql @@ -0,0 +1,108 @@ +{% macro if_data_call_function( + func, + target + ) %} + {% if var( + "STREAMLINE_INVOKE_STREAMS" + ) %} + {% if execute %} + {{ log( + "Running macro `if_data_call_function`: Calling udf " ~ func ~ " on " ~ target, + True + ) }} + {% endif %} + SELECT + {{ func }} + WHERE + EXISTS( + SELECT + 1 + FROM + {{ target }} + LIMIT + 1 + ) + {% else %} + {% if execute %} + {{ log( + "Running macro `if_data_call_function`: NOOP", + False + ) }} + {% endif %} + SELECT + NULL + {% endif %} +{% endmacro %} + +{% macro if_data_call_wait() %} + {% if var( + "STREAMLINE_INVOKE_STREAMS" + ) %} + {% set query %} + SELECT + 1 + WHERE + EXISTS( + SELECT + 1 + FROM + {{ model.schema ~ "." ~ model.alias }} + LIMIT + 1 + ) {% endset %} + {% if execute %} + {% set results = run_query( + query + ) %} + {% if results %} + {{ log( + "Waiting...", + info = True + ) }} + + {% set wait_query %} + SELECT + system$wait( + {{ var( + "WAIT", + 600 + ) }} + ) {% endset %} + {% do run_query(wait_query) %} + {% else %} + SELECT + NULL; + {% endif %} + {% endif %} + {% endif %} +{% endmacro %} + +-- macro used to run a sql query and log the results +-- TODO: Move this to fsc-utils package +{% macro run_and_log_sql(sql_query, log_level='info') %} + {% set result_var = 'result_' ~ sql_query[:8] %} + + {% set log_message = 'Executing SQL query: ' ~ sql_query %} + {% do log(log_message,info=True) %} + + {% set query_result = run_query(sql_query) %} + {% set result_str = query_result.columns[0].values()[0] if query_result.columns else None %} + + {% set log_message = 'SQL query result: ' ~ result_str %} + {% do log(log_message, info=True) %} + + {{ result_var }} +{% endmacro %} + +-- macro used to select priveleges on all views/tables in a target chema to a role +{% macro grant_select(role) %} + {{ log("Granting privileges to role: " ~ role, info=True) }} + {% set sql %} + grant usage on schema {{ target.schema }} to role {{ role }}; + grant select on all tables in schema {{ target.schema }} to role {{ role }}; + grant select on all views in schema {{ target.schema }} to role {{ role }}; + {% endset %} + + {% do run_query(sql) %} + {% do log("Privileges granted", info=True) %} +{% endmacro %} \ No newline at end of file diff --git a/models/silver/streamline/README.md b/models/silver/streamline/README.md new file mode 100644 index 0000000..f4f7525 --- /dev/null +++ b/models/silver/streamline/README.md @@ -0,0 +1,22 @@ +# Setup Snowflake Api Integration & UDFS + +## Setup Snowflake Api Integration + +Use the [create_aws_flow_api()](../../macros/streamline/api_integrations.sql#2) macro to create the `streamline-flow` Snowflake API integration. + +The + +```zsh +DBT_TARGET=sbx make sl-flow-api + +# This runs: +# dbt run-operation create_aws_flow_api \ +# --profile flow \ +# --target $(DBT_TARGET) \ +# --profiles-dir ~/.dbt/ +``` + +```zsh +# call sbx udf_bulk_grpc() to test the API integration +dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/history/streamline__get_blocks_history.sql --profile flow --target sbx --profiles-dir ~/.dbt +``` \ No newline at end of file diff --git a/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql b/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql new file mode 100644 index 0000000..cea3dbd --- /dev/null +++ b/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql @@ -0,0 +1,31 @@ +{{ config ( + materialized = "incremental", + unique_key = "record_id", + cluster_by = "ROUND(block_id, -3)", + merge_update_columns = ["record_id"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(record_id)" +) }} + +SELECT + record_id, + block_id, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__blocks') }} -- TODO: change to bronze__streamline_blocks +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) + +{% else %} + {{ ref('bronze__blocks') }} -- TODO: change to bronze__streamline_FR_blocks +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY record_id +ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/silver/streamline/core/history/streamline__get_blocks_history.sql b/models/silver/streamline/core/history/streamline__get_blocks_history.sql new file mode 100644 index 0000000..246cd36 --- /dev/null +++ b/models/silver/streamline/core/history/streamline__get_blocks_history.sql @@ -0,0 +1,34 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_block_by_height', 'external_table', 'streamline_blocks', 'sql_limit', {{var('sql_limit','1000')}}, 'producer_batch_size', {{var('producer_batch_size','1000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH blocks AS ( + + SELECT + block_height + FROM + {{ ref("streamline__blocks") }} + EXCEPT + SELECT + block_id as block_height + FROM + {{ ref("streamline__complete_get_blocks") }} +) +SELECT + PARSE_JSON( + CONCAT( + '{"grpc": "proto3",', + '"method": "get_block_by_height', + '"block_height":"', + block_height :: INTEGER, + '"}' + ) + ) AS request +FROM + blocks +ORDER BY + block_height ASC diff --git a/models/silver/streamline/core/streamline__blocks.sql b/models/silver/streamline/core/streamline__blocks.sql new file mode 100644 index 0000000..ae6884b --- /dev/null +++ b/models/silver/streamline/core/streamline__blocks.sql @@ -0,0 +1,17 @@ +{{ config ( + materialized = "view", + tags = ['streamline_view'] +) }} + + +{% if execute %} +{% set height = run_query('SELECT streamline.udf_get_chainhead()') %} +{% set block_height = height.columns[0].values()[0] %} +{% else %} +{% set block_height = 0 %} +{% endif %} + +SELECT + height as block_height +FROM + TABLE(streamline.udtf_get_base_table({{block_height}})) \ No newline at end of file