diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index cc04a84..b26bed8 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -5,7 +5,18 @@ {{ create_udf_hex_to_int( schema = "public" ) }} + {{ create_udtf_get_base_table( + schema = "streamline" + ) }} {% endset %} {% do run_query(sql) %} -{% endmacro %} + {% if target.database != "POLYGON_COMMUNITY_DEV" %} + {% set sql %} + {{ create_udf_get_chainhead() }} + {{ create_udf_get_blocks() }} + + {% endset %} + {% do run_query(sql) %} + {% endif %} +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/blocks_history.sql b/macros/streamline/blocks_history.sql new file mode 100644 index 0000000..3c67cbd --- /dev/null +++ b/macros/streamline/blocks_history.sql @@ -0,0 +1,26 @@ +{% macro create_blocks_history() %} + {% set sql %} + CREATE + OR REPLACE PROCEDURE streamline.blocks_history() returns variant LANGUAGE SQL AS $$ +DECLARE + RESULT variant; +row_cnt INTEGER; +BEGIN + row_cnt:= ( + SELECT + COUNT(1) + FROM + {{ ref('streamline__blocks_history') }} + ); +if ( + row_cnt > 0 + ) THEN RESULT:= ( + SELECT + streamline.udf_get_blocks() + ); + ELSE RESULT:= NULL; +END if; +RETURN RESULT; +END;$$ {% endset %} +{% do run_query(sql) %} +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/blocks_realtime.sql b/macros/streamline/blocks_realtime.sql new file mode 100644 index 0000000..eae1adc --- /dev/null +++ b/macros/streamline/blocks_realtime.sql @@ -0,0 +1,26 @@ +{% macro create_blocks_realtime() %} + {% set sql %} + CREATE + OR REPLACE PROCEDURE streamline.blocks_realtime() returns variant LANGUAGE SQL AS $$ +DECLARE + RESULT variant; +row_cnt INTEGER; +BEGIN + row_cnt:= ( + SELECT + COUNT(1) + FROM + {{ ref('streamline__blocks_realtime') }} + ); +if ( + row_cnt > 0 + ) THEN RESULT:= ( + SELECT + streamline.udf_get_blocks() + ); + ELSE RESULT:= NULL; +END if; +RETURN RESULT; +END;$$ {% endset %} +{% do run_query(sql) %} +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/get_base_table_udft.sql b/macros/streamline/get_base_table_udft.sql new file mode 100644 index 0000000..a488d14 --- /dev/null +++ b/macros/streamline/get_base_table_udft.sql @@ -0,0 +1,24 @@ +{% macro create_udtf_get_base_table(schema) %} +create or replace function {{ schema }}.udtf_get_base_table(max_height integer) +returns table (height number) +as +$$ + with base as ( + select + row_number() over ( + order by + seq4() + ) as id + from + table(generator(rowcount => 100000000)) + ) +select + id as height +from + base +where + id <= max_height +$$ +; + +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql new file mode 100644 index 0000000..72fdd19 --- /dev/null +++ b/macros/streamline/streamline_udfs.sql @@ -0,0 +1,18 @@ +{% macro create_udf_get_chainhead() %} + CREATE EXTERNAL FUNCTION IF NOT EXISTS streamline.udf_get_chainhead() returns variant api_integration = aws_polygon_api AS {% if target.name == "prod" %} + 'https://avl1rax159.execute-api.us-east-1.amazonaws.com/prod/get_chainhead' + {% else %} + 'https://jml4wcap5f.execute-api.us-east-1.amazonaws.com/dev/get_chainhead' + {%- endif %}; +{% endmacro %} + +{% macro create_udf_get_blocks() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_get_blocks( + json variant + ) returns text api_integration = aws_polygon_api AS {% if target.name == "prod" %} + 'https://avl1rax159.execute-api.us-east-1.amazonaws.com/prod/bulk_get_blocks' + {% else %} + 'https://jml4wcap5f.execute-api.us-east-1.amazonaws.com/dev/bulk_get_blocks' + {%- endif %}; +{% endmacro %} \ No newline at end of file diff --git a/macros/utils.sql b/macros/utils.sql new file mode 100644 index 0000000..85549f1 --- /dev/null +++ b/macros/utils.sql @@ -0,0 +1,35 @@ +{% macro if_data_call_function( + func, + target + ) %} + {% if var( + "STREAMLINE_INVOKE_STREAMS" + ) %} + {% if execute %} + {{ log( + "Running macro `if_data_call_function`: Calling udf " ~ func ~ " on " ~ target, + True + ) }} + {% endif %} + SELECT + {{ func }} + WHERE + EXISTS( + SELECT + 1 + FROM + {{ target }} + LIMIT + 1 + ) + {% else %} + {% if execute %} + {{ log( + "Running macro `if_data_call_function`: NOOP", + False + ) }} + {% endif %} + SELECT + NULL + {% endif %} +{% endmacro %} diff --git a/models/silver/streamline/_max_block_by_date.sql b/models/silver/streamline/_max_block_by_date.sql new file mode 100644 index 0000000..a56cf82 --- /dev/null +++ b/models/silver/streamline/_max_block_by_date.sql @@ -0,0 +1,27 @@ +{{ config ( + materialized = "ephemeral", + unique_key = "block_number", +) }} + +WITH base AS ( + + SELECT + block_timestamp :: DATE AS block_date, + MAX(block_number) block_number + FROM + {{ ref("silver__blocks") }} + GROUP BY + block_timestamp :: DATE +) +SELECT + block_date, + block_number +FROM + base +WHERE + block_date <> ( + SELECT + MAX(block_date) + FROM + base + ) \ No newline at end of file diff --git a/models/silver/streamline/streamline__blocks.sql b/models/silver/streamline/streamline__blocks.sql new file mode 100644 index 0000000..da743e6 --- /dev/null +++ b/models/silver/streamline/streamline__blocks.sql @@ -0,0 +1,17 @@ +{{ config ( + materialized = "view", + tags = ['streamline_view'] +) }} + + +{% if execute %} +{% set height = run_query('SELECT streamline.udf_get_chainhead()') %} +{% set block_height = height.columns[0].values()[0] %} +{% else %} +{% set block_height = 0 %} +{% endif %} + +SELECT + height as block_number +FROM + TABLE(streamline.udtf_get_base_table({{block_height}})) \ No newline at end of file diff --git a/models/silver/streamline/streamline__blocks_history.sql b/models/silver/streamline/streamline__blocks_history.sql new file mode 100644 index 0000000..a71649f --- /dev/null +++ b/models/silver/streamline/streamline__blocks_history.sql @@ -0,0 +1,40 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_get_blocks(object_construct('sql_source', '{{this.identifier}}'))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +{% for item in range(33) %} + ( + + SELECT + {{ dbt_utils.surrogate_key( + ['block_number'] + ) }} AS id, + block_number + FROM + {{ ref("streamline__blocks") }} + WHERE + block_number BETWEEN {{ item * 1000000 + 1 }} + AND {{( + item + 1 + ) * 1000000 }} + EXCEPT + SELECT + id, + block_number + FROM + {{ ref("streamline__complete_blocks") }} + WHERE + block_number BETWEEN {{ item * 1000000 + 1 }} + AND {{( + item + 1 + ) * 1000000 }} + ORDER BY + block_number + ) {% if not loop.last %} + UNION ALL + {% endif %} +{% endfor %} \ No newline at end of file diff --git a/models/silver/streamline/streamline__blocks_realtime.sql b/models/silver/streamline/streamline__blocks_realtime.sql new file mode 100644 index 0000000..a4c0dca --- /dev/null +++ b/models/silver/streamline/streamline__blocks_realtime.sql @@ -0,0 +1,51 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_get_blocks(object_construct('sql_source', '{{this.identifier}}'))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH last_3_days AS ( + + SELECT + block_number + FROM + {{ ref("_max_block_by_date") }} + qualify ROW_NUMBER() over ( + ORDER BY + block_number DESC + ) = 3 +) +SELECT + {{ dbt_utils.surrogate_key( + ['block_number'] + ) }} AS id, + block_number +FROM + {{ ref("streamline__blocks") }} +WHERE + ( + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) {# TODO: OR can be removed once historical load is complete #} + OR block_number > 33000000 + ) + AND block_number IS NOT NULL +EXCEPT +SELECT + id, + block_number +FROM + {{ ref("streamline__complete_blocks") }} +WHERE + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) {# TODO: OR can be removed once historical load is complete #} + OR block_number > 33000000 \ No newline at end of file diff --git a/models/silver/streamline/streamline__complete_blocks.sql b/models/silver/streamline/streamline__complete_blocks.sql new file mode 100644 index 0000000..1a9c23e --- /dev/null +++ b/models/silver/streamline/streamline__complete_blocks.sql @@ -0,0 +1,54 @@ +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + merge_update_columns = ["id"] +) }} + +WITH meta AS ( + + SELECT + last_modified, + file_name + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "blocks") }}' + ) + ) A +) + +{% if is_incremental() %}, +max_date AS ( + SELECT + COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP + FROM + {{ this }}) + {% endif %} + SELECT + {{ dbt_utils.surrogate_key( + ['block_number'] + ) }} AS id, + block_number, + last_modified AS _inserted_timestamp + FROM + {{ source( + "bronze_streamline", + "blocks" + ) }} + JOIN meta b + ON b.file_name = metadata$filename + +{% if is_incremental() %} +WHERE + b.last_modified > ( + SELECT + max_INSERTED_TIMESTAMP + FROM + max_date + ) +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/sources.yml b/models/sources.yml index a820d4c..1361561 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -29,4 +29,11 @@ sources: tables: - name: fact_hourly_token_prices - name: DIM_CONTRACTS - - name: DIM_DEX_LIQUIDITY_POOLS \ No newline at end of file + - name: DIM_DEX_LIQUIDITY_POOLS + - name: bronze_streamline + database: streamline + schema: | + {{ "POLYGON_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "POLYGON" }} + tables: + - name: blocks + - name: transactions \ No newline at end of file