mirror of
https://github.com/FlipsideCrypto/flow-models.git
synced 2026-02-06 13:26:44 +00:00
Stream 97 streamline blocks (#129)
* udf_bulk_grpc success * removed test analysis sql --------- Co-authored-by: shah <info@shahnewazkhan.ca>
This commit is contained in:
parent
a06fac90ba
commit
e39845bc3d
3
Makefile
3
Makefile
@ -15,7 +15,8 @@ sl-flow-api:
|
||||
--profiles-dir ~/.dbt/
|
||||
|
||||
udfs:
|
||||
dbt run-operation create_udf_get_chainhead \
|
||||
dbt run-operation create_udfs \
|
||||
--vars '{"UPDATE_UDFS_AND_SPS":True}' \
|
||||
--profile flow \
|
||||
--target $(DBT_TARGET) \
|
||||
--profiles-dir ~/.dbt/
|
||||
@ -2,11 +2,18 @@
|
||||
{% if var("UPDATE_UDFS_AND_SPS") %}
|
||||
{% if target.database != "FLOW_COMMUNITY_DEV" %}
|
||||
{% set sql %}
|
||||
|
||||
{{ udf_bulk_get_topshot_moments_minted_metadata() }};
|
||||
{{ udf_bulk_get_nfl_allday_moments_metadata() }};
|
||||
|
||||
{{ create_udtf_get_base_table(
|
||||
schema = "streamline"
|
||||
) }}
|
||||
{{ create_udf_get_chainhead() }}
|
||||
{{ create_udf_bulk_grpc() }}
|
||||
|
||||
{% endset %}
|
||||
{% do run_query(sql) %}
|
||||
|
||||
{{- fsc_utils.create_udfs() -}}
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
|
||||
@ -26,33 +26,3 @@
|
||||
{% set query_result = run_and_log_sql(sql) %}
|
||||
{% endif %}
|
||||
{% endmacro %}
|
||||
|
||||
-- macro used to run a sql query and log the results
|
||||
-- TODO: Move this to fsc-utils package
|
||||
{% macro run_and_log_sql(sql_query, log_level='info') %}
|
||||
{% set result_var = 'result_' ~ sql_query[:8] %}
|
||||
|
||||
{% set log_message = 'Executing SQL query: ' ~ sql_query %}
|
||||
{% do log(log_message,info=True) %}
|
||||
|
||||
{% set query_result = run_query(sql_query) %}
|
||||
{% set result_str = query_result.columns[0].values()[0] if query_result.columns else None %}
|
||||
|
||||
{% set log_message = 'SQL query result: ' ~ result_str %}
|
||||
{% do log(log_message, info=True) %}
|
||||
|
||||
{{ result_var }}
|
||||
{% endmacro %}
|
||||
|
||||
-- macro used to select priveleges on all views/tables in a target chema to a role
|
||||
{% macro grant_select(role) %}
|
||||
{{ log("Granting privileges to role: " ~ role, info=True) }}
|
||||
{% set sql %}
|
||||
grant usage on schema {{ target.schema }} to role {{ role }};
|
||||
grant select on all tables in schema {{ target.schema }} to role {{ role }};
|
||||
grant select on all views in schema {{ target.schema }} to role {{ role }};
|
||||
{% endset %}
|
||||
|
||||
{% do run_query(sql) %}
|
||||
{% do log("Privileges granted", info=True) %}
|
||||
{% endmacro %}
|
||||
@ -12,3 +12,18 @@
|
||||
aws_flow_api_sbx AS 'https://bc5ejedoq8.execute-api.us-east-1.amazonaws.com/sbx/get_chainhead'
|
||||
{%- endif %};
|
||||
{% endmacro %}
|
||||
|
||||
{% macro create_udf_bulk_grpc() %}
|
||||
{{ log("Creating udf udf_bulk_grpc for target:" ~ target.name ~ ", schema: " ~ target.schema, info=True) }}
|
||||
{{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }}
|
||||
CREATE
|
||||
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_grpc(json variant) returns variant api_integration =
|
||||
{% if target.name == "prod" %}
|
||||
aws_flow_api AS '<PROD_CHALICE_URL>/prod/get_chainhead'
|
||||
{% elif target.name == "dev" %}
|
||||
aws_flow_api_dev AS '<DEV_CHALICE_URL>/dev/get_chainhead'
|
||||
{% elif target.name == "sbx" %}
|
||||
{{ log("Creating sbx udf_bulk_grpc", info=True) }}
|
||||
aws_flow_api_sbx AS 'https://bc5ejedoq8.execute-api.us-east-1.amazonaws.com/sbx/udf_bulk_grpc'
|
||||
{%- endif %};
|
||||
{% endmacro %}
|
||||
|
||||
108
macros/utils.sql
Normal file
108
macros/utils.sql
Normal file
@ -0,0 +1,108 @@
|
||||
{% macro if_data_call_function(
|
||||
func,
|
||||
target
|
||||
) %}
|
||||
{% if var(
|
||||
"STREAMLINE_INVOKE_STREAMS"
|
||||
) %}
|
||||
{% if execute %}
|
||||
{{ log(
|
||||
"Running macro `if_data_call_function`: Calling udf " ~ func ~ " on " ~ target,
|
||||
True
|
||||
) }}
|
||||
{% endif %}
|
||||
SELECT
|
||||
{{ func }}
|
||||
WHERE
|
||||
EXISTS(
|
||||
SELECT
|
||||
1
|
||||
FROM
|
||||
{{ target }}
|
||||
LIMIT
|
||||
1
|
||||
)
|
||||
{% else %}
|
||||
{% if execute %}
|
||||
{{ log(
|
||||
"Running macro `if_data_call_function`: NOOP",
|
||||
False
|
||||
) }}
|
||||
{% endif %}
|
||||
SELECT
|
||||
NULL
|
||||
{% endif %}
|
||||
{% endmacro %}
|
||||
|
||||
{% macro if_data_call_wait() %}
|
||||
{% if var(
|
||||
"STREAMLINE_INVOKE_STREAMS"
|
||||
) %}
|
||||
{% set query %}
|
||||
SELECT
|
||||
1
|
||||
WHERE
|
||||
EXISTS(
|
||||
SELECT
|
||||
1
|
||||
FROM
|
||||
{{ model.schema ~ "." ~ model.alias }}
|
||||
LIMIT
|
||||
1
|
||||
) {% endset %}
|
||||
{% if execute %}
|
||||
{% set results = run_query(
|
||||
query
|
||||
) %}
|
||||
{% if results %}
|
||||
{{ log(
|
||||
"Waiting...",
|
||||
info = True
|
||||
) }}
|
||||
|
||||
{% set wait_query %}
|
||||
SELECT
|
||||
system$wait(
|
||||
{{ var(
|
||||
"WAIT",
|
||||
600
|
||||
) }}
|
||||
) {% endset %}
|
||||
{% do run_query(wait_query) %}
|
||||
{% else %}
|
||||
SELECT
|
||||
NULL;
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
{% endmacro %}
|
||||
|
||||
-- macro used to run a sql query and log the results
|
||||
-- TODO: Move this to fsc-utils package
|
||||
{% macro run_and_log_sql(sql_query, log_level='info') %}
|
||||
{% set result_var = 'result_' ~ sql_query[:8] %}
|
||||
|
||||
{% set log_message = 'Executing SQL query: ' ~ sql_query %}
|
||||
{% do log(log_message,info=True) %}
|
||||
|
||||
{% set query_result = run_query(sql_query) %}
|
||||
{% set result_str = query_result.columns[0].values()[0] if query_result.columns else None %}
|
||||
|
||||
{% set log_message = 'SQL query result: ' ~ result_str %}
|
||||
{% do log(log_message, info=True) %}
|
||||
|
||||
{{ result_var }}
|
||||
{% endmacro %}
|
||||
|
||||
-- macro used to select priveleges on all views/tables in a target chema to a role
|
||||
{% macro grant_select(role) %}
|
||||
{{ log("Granting privileges to role: " ~ role, info=True) }}
|
||||
{% set sql %}
|
||||
grant usage on schema {{ target.schema }} to role {{ role }};
|
||||
grant select on all tables in schema {{ target.schema }} to role {{ role }};
|
||||
grant select on all views in schema {{ target.schema }} to role {{ role }};
|
||||
{% endset %}
|
||||
|
||||
{% do run_query(sql) %}
|
||||
{% do log("Privileges granted", info=True) %}
|
||||
{% endmacro %}
|
||||
22
models/silver/streamline/README.md
Normal file
22
models/silver/streamline/README.md
Normal file
@ -0,0 +1,22 @@
|
||||
# Setup Snowflake Api Integration & UDFS
|
||||
|
||||
## Setup Snowflake Api Integration
|
||||
|
||||
Use the [create_aws_flow_api()](../../macros/streamline/api_integrations.sql#2) macro to create the `streamline-flow` Snowflake API integration.
|
||||
|
||||
The
|
||||
|
||||
```zsh
|
||||
DBT_TARGET=sbx make sl-flow-api
|
||||
|
||||
# This runs:
|
||||
# dbt run-operation create_aws_flow_api \
|
||||
# --profile flow \
|
||||
# --target $(DBT_TARGET) \
|
||||
# --profiles-dir ~/.dbt/
|
||||
```
|
||||
|
||||
```zsh
|
||||
# call sbx udf_bulk_grpc() to test the API integration
|
||||
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/history/streamline__get_blocks_history.sql --profile flow --target sbx --profiles-dir ~/.dbt
|
||||
```
|
||||
@ -0,0 +1,31 @@
|
||||
{{ config (
|
||||
materialized = "incremental",
|
||||
unique_key = "record_id",
|
||||
cluster_by = "ROUND(block_id, -3)",
|
||||
merge_update_columns = ["record_id"],
|
||||
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(record_id)"
|
||||
) }}
|
||||
|
||||
SELECT
|
||||
record_id,
|
||||
block_id,
|
||||
_inserted_timestamp
|
||||
FROM
|
||||
|
||||
{% if is_incremental() %}
|
||||
{{ ref('bronze__blocks') }} -- TODO: change to bronze__streamline_blocks
|
||||
WHERE
|
||||
_inserted_timestamp >= (
|
||||
SELECT
|
||||
MAX(_inserted_timestamp) _inserted_timestamp
|
||||
FROM
|
||||
{{ this }}
|
||||
)
|
||||
|
||||
{% else %}
|
||||
{{ ref('bronze__blocks') }} -- TODO: change to bronze__streamline_FR_blocks
|
||||
{% endif %}
|
||||
|
||||
qualify(ROW_NUMBER() over (PARTITION BY record_id
|
||||
ORDER BY
|
||||
_inserted_timestamp DESC)) = 1
|
||||
@ -0,0 +1,34 @@
|
||||
{{ config (
|
||||
materialized = "view",
|
||||
post_hook = if_data_call_function(
|
||||
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_block_by_height', 'external_table', 'streamline_blocks', 'sql_limit', {{var('sql_limit','1000')}}, 'producer_batch_size', {{var('producer_batch_size','1000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
|
||||
target = "{{this.schema}}.{{this.identifier}}"
|
||||
)
|
||||
) }}
|
||||
|
||||
WITH blocks AS (
|
||||
|
||||
SELECT
|
||||
block_height
|
||||
FROM
|
||||
{{ ref("streamline__blocks") }}
|
||||
EXCEPT
|
||||
SELECT
|
||||
block_id as block_height
|
||||
FROM
|
||||
{{ ref("streamline__complete_get_blocks") }}
|
||||
)
|
||||
SELECT
|
||||
PARSE_JSON(
|
||||
CONCAT(
|
||||
'{"grpc": "proto3",',
|
||||
'"method": "get_block_by_height',
|
||||
'"block_height":"',
|
||||
block_height :: INTEGER,
|
||||
'"}'
|
||||
)
|
||||
) AS request
|
||||
FROM
|
||||
blocks
|
||||
ORDER BY
|
||||
block_height ASC
|
||||
17
models/silver/streamline/core/streamline__blocks.sql
Normal file
17
models/silver/streamline/core/streamline__blocks.sql
Normal file
@ -0,0 +1,17 @@
|
||||
{{ config (
|
||||
materialized = "view",
|
||||
tags = ['streamline_view']
|
||||
) }}
|
||||
|
||||
|
||||
{% if execute %}
|
||||
{% set height = run_query('SELECT streamline.udf_get_chainhead()') %}
|
||||
{% set block_height = height.columns[0].values()[0] %}
|
||||
{% else %}
|
||||
{% set block_height = 0 %}
|
||||
{% endif %}
|
||||
|
||||
SELECT
|
||||
height as block_height
|
||||
FROM
|
||||
TABLE(streamline.udtf_get_base_table({{block_height}}))
|
||||
Loading…
Reference in New Issue
Block a user