added streamline udtf definitions

This commit is contained in:
shah 2023-05-08 13:30:36 -07:00
parent 5683091151
commit 1a914d6321
7 changed files with 148 additions and 2 deletions

2
.gitignore vendored
View File

@ -18,3 +18,5 @@ dbt_docs.sh
.github/workflows/.dbt/.user.yml
dbt-env/
*.user.yml

View File

@ -8,9 +8,9 @@ SELECT
streamline.udf_bulk_json_rpc(
object_construct(
'sql_source',
'view_name',
'streamline__pc_getBlock_realtime',
'external_table',
'qn_getBlockWithReceipts',
'pc_getBlock',
'sql_limit',
4000,
'producer_batch_size',

16
macros/create_udfs.sql Normal file
View File

@ -0,0 +1,16 @@
{% macro create_udfs() %}
{% if var("UPDATE_UDFS_AND_SPS") %}
{% set sql %}
CREATE schema if NOT EXISTS silver;
{{ create_udtf_get_base_table(
schema = "streamline"
) }}
{{ create_js_hex_to_int() }};
{{ create_udf_hex_to_int(
schema = "public"
) }}
{{ create_udf_get_chainhead() }}
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}

View File

@ -0,0 +1,24 @@
{% macro create_udtf_get_base_table(schema) %}
create or replace function {{ schema }}.udtf_get_base_table(max_height integer)
returns table (height number)
as
$$
with base as (
select
row_number() over (
order by
seq4()
) as id
from
table(generator(rowcount => 100000000))
)
select
id as height
from
base
where
id <= max_height
$$
;
{% endmacro %}

View File

@ -0,0 +1,27 @@
{{ config (
materialized = "ephemeral",
unique_key = "block_id",
) }}
WITH base AS (
SELECT
block_timestamp :: DATE AS block_date,
MAX(block_id) block_id
FROM
{{ ref("silver__blocks") }}
GROUP BY
block_timestamp :: DATE
)
SELECT
block_date,
block_id
FROM
base
WHERE
block_date <> (
SELECT
MAX(block_date)
FROM
base
)

View File

@ -0,0 +1,17 @@
{{ config (
materialized = "view",
tags = ['streamline_view']
) }}
{% if execute %}
{% set height = run_query('SELECT streamline.udf_get_chainhead()') %}
{% set block_height = height.columns[0].values()[0] %}
{% else %}
{% set block_height = 0 %}
{% endif %}
SELECT
height as block_id
FROM
TABLE(streamline.udtf_get_base_table({{block_height}}))

View File

@ -0,0 +1,60 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'qn_getBlockWithReceipts', 'sql_limit', {{var('sql_limit','40000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','10')}}))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
WITH last_3_days AS (
SELECT
block_id
FROM
{{ ref("_max_block_by_date") }}
qualify ROW_NUMBER() over (
ORDER BY
block_id DESC
) = 3
),
blocks AS (
SELECT
block_id :: STRING AS block_id
FROM
{{ ref("streamline__blocks") }}
WHERE
(
block_id >= (
SELECT
block_id
FROM
last_3_days
)
)
)
SELECT
PARSE_JSON(
CONCAT(
'{"jsonrpc": "2.0",',
'"method": "qn_getBlockWithReceipts", "params":["',
REPLACE(
concat_ws(
'',
'0x',
to_char(
block_id :: INTEGER,
'XXXXXXXX'
)
),
' ',
''
),
'"],"id":"',
block_id :: STRING,
'"}'
)
) AS request
FROM
blocks
ORDER BY
block_id ASC