udf_bulk_grpc success

This commit is contained in:
shah 2023-07-20 15:07:07 -07:00
parent cecced9db1
commit 43c77b39ec
10 changed files with 238 additions and 32 deletions

View File

@ -15,7 +15,8 @@ sl-flow-api:
--profiles-dir ~/.dbt/
udfs:
dbt run-operation create_udf_get_chainhead \
dbt run-operation create_udfs \
--vars '{"UPDATE_UDFS_AND_SPS":True}' \
--profile flow \
--target $(DBT_TARGET) \
--profiles-dir ~/.dbt/

1
analysis/test.sql Normal file
View File

@ -0,0 +1 @@
{{ create_udf_bulk_grpc() }}

View File

@ -2,11 +2,18 @@
{% if var("UPDATE_UDFS_AND_SPS") %}
{% if target.database != "FLOW_COMMUNITY_DEV" %}
{% set sql %}
{{ udf_bulk_get_topshot_moments_minted_metadata() }};
{{ udf_bulk_get_nfl_allday_moments_metadata() }};
{{ create_udtf_get_base_table(
schema = "streamline"
) }}
{{ create_udf_get_chainhead() }}
{{ create_udf_bulk_grpc() }}
{% endset %}
{% do run_query(sql) %}
{{- fsc_utils.create_udfs() -}}
{% endif %}
{% endif %}

View File

@ -26,33 +26,3 @@
{% set query_result = run_and_log_sql(sql) %}
{% endif %}
{% endmacro %}
-- macro used to run a sql query and log the results
-- TODO: Move this to fsc-utils package
{% macro run_and_log_sql(sql_query, log_level='info') %}
{# Executes sql_query via run_query, logs the statement and its first scalar
   result, and renders that result so callers can capture it with {% set %}.
   NOTE(review): log_level is currently unused; everything logs at info. #}
{% set log_message = 'Executing SQL query: ' ~ sql_query %}
{% do log(log_message,info=True) %}
{% set query_result = run_query(sql_query) %}
{# First column / first row of the result, or None for an empty result set. #}
{% set result_str = query_result.columns[0].values()[0] if query_result.columns else None %}
{% set log_message = 'SQL query result: ' ~ result_str %}
{% do log(log_message, info=True) %}
{# Render the actual query result. The original rendered a constructed
   variable-name string ('result_' ~ sql_query[:8]) instead of the result,
   which is what the caller at the `run_and_log_sql(sql)` call site expects. #}
{{ result_str }}
{% endmacro %}
-- macro used to grant select privileges on all views/tables in the target schema to a role
{% macro grant_select(role) %}
{# Grants USAGE on the current target schema, plus SELECT on all of its
   tables and views, to `role`. Assumes the role running dbt has grant
   authority on that schema — TODO confirm. #}
{{ log("Granting privileges to role: " ~ role, info=True) }}
{% set sql %}
grant usage on schema {{ target.schema }} to role {{ role }};
grant select on all tables in schema {{ target.schema }} to role {{ role }};
grant select on all views in schema {{ target.schema }} to role {{ role }};
{% endset %}
{# Execute all three grant statements in a single run_query call. #}
{% do run_query(sql) %}
{% do log("Privileges granted", info=True) %}
{% endmacro %}

View File

@ -12,3 +12,18 @@
aws_flow_api_sbx AS 'https://bc5ejedoq8.execute-api.us-east-1.amazonaws.com/sbx/get_chainhead'
{%- endif %};
{% endmacro %}
{% macro create_udf_bulk_grpc() %}
{# Creates (or replaces) the streamline.udf_bulk_grpc external function,
   routing it to the Chalice endpoint that matches the current dbt target. #}
{{ log("Creating udf udf_bulk_grpc for target:" ~ target.name ~ ", schema: " ~ target.schema, info=True) }}
{{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_grpc(json variant) returns variant api_integration =
{% if target.name == "prod" %}
{# Fixed copy-paste from create_udf_get_chainhead: prod/dev must route to the
   udf_bulk_grpc endpoint (as sbx already does), not get_chainhead. #}
aws_flow_api AS '<PROD_CHALICE_URL>/prod/udf_bulk_grpc'
{% elif target.name == "dev" %}
aws_flow_api_dev AS '<DEV_CHALICE_URL>/dev/udf_bulk_grpc'
{% elif target.name == "sbx" %}
{{ log("Creating sbx udf_bulk_grpc", info=True) }}
aws_flow_api_sbx AS 'https://bc5ejedoq8.execute-api.us-east-1.amazonaws.com/sbx/udf_bulk_grpc'
{%- endif %};
{% endmacro %}

108
macros/utils.sql Normal file
View File

@ -0,0 +1,108 @@
{# Renders SQL that invokes `func` (a UDF call expression string) only when
   var STREAMLINE_INVOKE_STREAMS is truthy AND `target` (a relation name)
   contains at least one row; otherwise renders a harmless SELECT NULL.
   Intended for use as a model post_hook (see caller in the view config). #}
{% macro if_data_call_function(
func,
target
) %}
{% if var(
"STREAMLINE_INVOKE_STREAMS"
) %}
{% if execute %}
{{ log(
"Running macro `if_data_call_function`: Calling udf " ~ func ~ " on " ~ target,
True
) }}
{% endif %}
{# The WHERE EXISTS guard makes the UDF call a no-op when `target` is empty. #}
SELECT
{{ func }}
WHERE
EXISTS(
SELECT
1
FROM
{{ target }}
LIMIT
1
)
{% else %}
{% if execute %}
{{ log(
"Running macro `if_data_call_function`: NOOP",
False
) }}
{% endif %}
{# Streams disabled: render a valid statement that does nothing. #}
SELECT
NULL
{% endif %}
{% endmacro %}
{% macro if_data_call_wait() %}
{# When streams are being invoked AND the current model already has at least
   one row, blocks the run for var("WAIT", 600) seconds via Snowflake's
   system$wait; otherwise renders SELECT NULL (or nothing when streams are
   disabled). Intended as a post-hook style pause for async ingestion. #}
{% if var(
"STREAMLINE_INVOKE_STREAMS"
) %}
{% set query %}
SELECT
1
WHERE
EXISTS(
SELECT
1
FROM
{{ model.schema ~ "." ~ model.alias }}
LIMIT
1
) {% endset %}
{% if execute %}
{% set results = run_query(
query
) %}
{# run_query returns an agate.Table, which is truthy even with zero rows, so
   `if results` always took the wait branch. Check the row values instead,
   matching how results are inspected elsewhere in this project. #}
{% if results.columns[0].values() | length > 0 %}
{{ log(
"Waiting...",
info = True
) }}
{% set wait_query %}
SELECT
system$wait(
{{ var(
"WAIT",
600
) }}
) {% endset %}
{% do run_query(wait_query) %}
{% else %}
SELECT
NULL;
{% endif %}
{% endif %}
{% endif %}
{% endmacro %}
-- macro used to run a sql query and log the results
-- TODO: Move this to fsc-utils package
{% macro run_and_log_sql(sql_query, log_level='info') %}
{# Executes sql_query via run_query, logs the statement and its first scalar
   result, and renders that result so callers can capture it with {% set %}.
   NOTE(review): log_level is currently unused; everything logs at info. #}
{% set log_message = 'Executing SQL query: ' ~ sql_query %}
{% do log(log_message,info=True) %}
{% set query_result = run_query(sql_query) %}
{# First column / first row of the result, or None for an empty result set. #}
{% set result_str = query_result.columns[0].values()[0] if query_result.columns else None %}
{% set log_message = 'SQL query result: ' ~ result_str %}
{% do log(log_message, info=True) %}
{# Render the actual query result. The original rendered a constructed
   variable-name string ('result_' ~ sql_query[:8]) instead of the result,
   which is what the caller at the `run_and_log_sql(sql)` call site expects. #}
{{ result_str }}
{% endmacro %}
-- macro used to grant select privileges on all views/tables in the target schema to a role
{% macro grant_select(role) %}
{# Grants USAGE on the current target schema, plus SELECT on all of its
   tables and views, to `role`. Assumes the role running dbt has grant
   authority on that schema — TODO confirm. #}
{{ log("Granting privileges to role: " ~ role, info=True) }}
{% set sql %}
grant usage on schema {{ target.schema }} to role {{ role }};
grant select on all tables in schema {{ target.schema }} to role {{ role }};
grant select on all views in schema {{ target.schema }} to role {{ role }};
{% endset %}
{# Execute all three grant statements in a single run_query call. #}
{% do run_query(sql) %}
{% do log("Privileges granted", info=True) %}
{% endmacro %}

View File

@ -0,0 +1,22 @@
# Setup Snowflake Api Integration & UDFS
## Setup Snowflake Api Integration
Use the [create_aws_flow_api()](../../macros/streamline/api_integrations.sql#2) macro to create the `streamline-flow` Snowflake API integration.
The following command creates the API integration for the sandbox target:
```zsh
DBT_TARGET=sbx make sl-flow-api
# This runs:
# dbt run-operation create_aws_flow_api \
# --profile flow \
# --target $(DBT_TARGET) \
# --profiles-dir ~/.dbt/
```
```zsh
# call sbx udf_bulk_grpc() to test the API integration
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/history/streamline__get_blocks_history.sql --profile flow --target sbx --profiles-dir ~/.dbt
```

View File

@ -0,0 +1,31 @@
{{ config (
materialized = "incremental",
unique_key = "record_id",
cluster_by = "ROUND(block_id, -3)",
merge_update_columns = ["record_id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(record_id)"
) }}
-- Incremental record of blocks already landed in bronze, keyed by record_id.
-- Presumably consumed to compute "blocks still to fetch" — see the EXCEPT in
-- the request-generating view; TODO confirm.
SELECT
record_id,
block_id,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__blocks') }} -- TODO: change to bronze__streamline_blocks
WHERE
-- Only pull rows newer than the latest timestamp already in this table
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__blocks') }} -- TODO: change to bronze__streamline_FR_blocks
{% endif %}
-- Deduplicate: keep only the most recently inserted row per record_id
qualify(ROW_NUMBER() over (PARTITION BY record_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,34 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_block_by_height', 'external_table', 'streamline_blocks', 'sql_limit', {{var('sql_limit','1000')}}, 'producer_batch_size', {{var('producer_batch_size','1000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
-- Heights not yet ingested: all heights up to the chainhead minus those
-- already present in the completed-blocks table.
WITH blocks AS (
SELECT
block_height
FROM
{{ ref("streamline__blocks") }}
EXCEPT
SELECT
block_id as block_height
FROM
{{ ref("streamline__complete_get_blocks") }}
)
-- One JSON gRPC request object per missing block height.
SELECT
PARSE_JSON(
CONCAT(
'{"grpc": "proto3",',
-- Close the "method" value and separate it from the next key; without
-- the trailing '",' the concatenated JSON was malformed
-- ("get_block_by_height"block_height":...) and PARSE_JSON would fail.
'"method": "get_block_by_height",',
'"block_height":"',
block_height :: INTEGER,
'"}'
)
) AS request
FROM
blocks
ORDER BY
block_height ASC

View File

@ -0,0 +1,17 @@
{{ config (
materialized = "view",
tags = ['streamline_view']
) }}
{% if execute %}
-- At execution time, ask the chainhead UDF for the current latest height.
{% set height = run_query('SELECT streamline.udf_get_chainhead()') %}
{% set block_height = height.columns[0].values()[0] %}
{% else %}
-- During parsing run_query is unavailable; compile against height 0.
{% set block_height = 0 %}
{% endif %}
-- Enumerates block heights via the udtf_get_base_table generator UDTF,
-- presumably from genesis up to the chainhead — TODO confirm UDTF semantics.
SELECT
height as block_height
FROM
TABLE(streamline.udtf_get_base_table({{block_height}}))