From 14e471fbacf90b5942836ee9fcafa51042d64695 Mon Sep 17 00:00:00 2001 From: shah Date: Fri, 5 May 2023 08:21:08 -0700 Subject: [PATCH] added terra_udf_bulk_json_rpc analyses --- analyses/terra_udf_bulk_json_rpc.sql | 23 ++++++ dbt_project.yml | 6 +- macros/utils.sql | 78 +++++++++++++++++++ .../streamline__pc_getBlock_realtime.sql | 74 ------------------ 4 files changed, 105 insertions(+), 76 deletions(-) create mode 100644 analyses/terra_udf_bulk_json_rpc.sql create mode 100644 macros/utils.sql delete mode 100644 models/silver/streamline/core/realtime/streamline__pc_getBlock_realtime.sql diff --git a/analyses/terra_udf_bulk_json_rpc.sql b/analyses/terra_udf_bulk_json_rpc.sql new file mode 100644 index 0000000..fe2e8aa --- /dev/null +++ b/analyses/terra_udf_bulk_json_rpc.sql @@ -0,0 +1,23 @@ +{{if_data_call_function( + func = "streamline.udf_bulk_json_rpc_sbx_shah(object_construct('sql_source', 'view_name', 'external_table', 'qn_getBlockWithReceipts', 'sql_limit', 4000, 'producer_batch_size', 10000s, 'worker_batch_size', 1000, 'batch_call_limit', 10))", + target = "streamline.view_name" + ) +}} + +SELECT + streamline.udf_bulk_json_rpc( + object_construct( + 'sql_source', + 'view_name', + 'external_table', + 'qn_getBlockWithReceipts', + 'sql_limit', + 4000, + 'producer_batch_size', + 10000, + 'worker_batch_size', + 1000, + 'batch_call_limit', + 10 + ) + ) \ No newline at end of file diff --git a/dbt_project.yml b/dbt_project.yml index e6707dd..32a0a00 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -51,5 +51,7 @@ tests: vars: "dbt_date:time_zone": GMT "UPDATE_SNOWFLAKE_TAGS": TRUE - - + STREAMLINE_INVOKE_STREAMS: True + STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False + UPDATE_UDFS_AND_SPS: False + UPDATE_SNOWFLAKE_TAGS: True diff --git a/macros/utils.sql b/macros/utils.sql new file mode 100644 index 0000000..0b800b8 --- /dev/null +++ b/macros/utils.sql @@ -0,0 +1,78 @@ +{% macro if_data_call_function( + func, + target + ) %} + {% if var( + "STREAMLINE_INVOKE_STREAMS" + ) %} + {% if execute %} + {{ log( + "Running macro `if_data_call_function`: Calling udf " ~ func ~ " on " ~ target, + True + ) }} + {% endif %} + SELECT + {{ func }} + WHERE + EXISTS( + SELECT + 1 + FROM + {{ target }} + LIMIT + 1 + ) + {% else %} + {% if execute %} + {{ log( + "Running macro `if_data_call_function`: NOOP", + False + ) }} + {% endif %} + SELECT + NULL + {% endif %} +{% endmacro %} + +{% macro if_data_call_wait() %} + {% if var( + "STREAMLINE_INVOKE_STREAMS" + ) %} + {% set query %} + SELECT + 1 + WHERE + EXISTS( + SELECT + 1 + FROM + {{ model.schema ~ "." ~ model.alias }} + LIMIT + 1 + ) {% endset %} + {% if execute %} + {% set results = run_query( + query + ) %} + {% if results %} + {{ log( + "Waiting...", + info = True + ) }} + + {% set wait_query %} + SELECT + system$wait( + {{ var( + "WAIT", + 600 + ) }} + ) {% endset %} + {% do run_query(wait_query) %} + {% else %} + SELECT + NULL; + {% endif %} + {% endif %} + {% endif %} +{% endmacro %} diff --git a/models/silver/streamline/core/realtime/streamline__pc_getBlock_realtime.sql b/models/silver/streamline/core/realtime/streamline__pc_getBlock_realtime.sql deleted file mode 100644 index f40da92..0000000 --- a/models/silver/streamline/core/realtime/streamline__pc_getBlock_realtime.sql +++ /dev/null @@ -1,74 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'pc_getBlock', 'sql_limit', {{var('sql_limit','40000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','10')}}))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - --- WITH last_3_days AS ( - --- SELECT --- block_number --- FROM --- {{ ref("_max_block_by_date") }} --- qualify ROW_NUMBER() over ( --- ORDER BY --- block_number DESC --- ) = 3 --- ), --- blocks AS ( --- SELECT --- block_number :: STRING AS block_number --- FROM --- {{ ref("streamline__blocks") }} --- WHERE --- ( --- block_number >= ( --- SELECT --- block_number --- FROM --- last_3_days --- ) --- ) --- -- EXCEPT --- -- SELECT --- -- block_number :: STRING --- -- FROM --- -- {{ ref("streamline__complete_qn_getBlockWithReceipts") }} --- -- WHERE --- -- ( --- -- block_number >= ( --- -- SELECT --- -- block_number --- -- FROM --- -- last_3_days --- -- ) --- -- ) --- ) --- SELECT --- PARSE_JSON( --- CONCAT( --- '{"jsonrpc": "2.0",', --- '"method": "block", "params":["', --- REPLACE( --- concat_ws( --- '', --- '0x', --- to_char( --- block_number :: INTEGER, --- 'XXXXXXXX' --- ) --- ), --- ' ', --- '' --- ), --- '"],"id":"', --- block_number :: STRING, --- '"}' --- ) --- ) AS request --- FROM --- blocks --- ORDER BY --- block_number ASC