Jason/blocks (#36)

* Models for blocks stream

* add the utils for post_hook

* add the bronze streamline schema

* Update

* Update

* Duplicate bug fix

* Chainhead route fix

Co-authored-by: yulike <xiuyangguan@gmail.com>
This commit is contained in:
cantjaso 2022-10-31 13:03:52 -04:00 committed by GitHub
parent f356f44872
commit 9565d67c4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 338 additions and 2 deletions

View File

@ -5,7 +5,18 @@
{{ create_udf_hex_to_int(
schema = "public"
) }}
{{ create_udtf_get_base_table(
schema = "streamline"
) }}
{% endset %}
{% do run_query(sql) %}
{% endmacro %}
{% if target.database != "POLYGON_COMMUNITY_DEV" %}
{% set sql %}
{{ create_udf_get_chainhead() }}
{{ create_udf_get_blocks() }}
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}

View File

@ -0,0 +1,26 @@
{% macro create_blocks_history() %}
    {#
        Creates (or replaces) the stored procedure streamline.blocks_history().

        The procedure counts the rows of the streamline__blocks_history model and,
        when at least one row exists, invokes the external function
        streamline.udf_get_blocks; otherwise it returns NULL.

        udf_get_blocks is declared with a single variant argument, so it cannot be
        called with an empty argument list. The call below passes an object with a
        sql_source key holding the identifier of the relation that was counted,
        which is the same payload shape the streamline model post-hooks pass.

        Takes no arguments. Runs the DDL immediately through run_query.
    #}
    {% set source_relation = ref('streamline__blocks_history') %}
    {% set sql %}
    CREATE OR REPLACE PROCEDURE streamline.blocks_history()
    RETURNS variant
    LANGUAGE SQL
    AS $$
    DECLARE
        RESULT variant;
        row_cnt INTEGER;
    BEGIN
        row_cnt := (
            SELECT
                COUNT(1)
            FROM
                {{ source_relation }}
        );
        IF (row_cnt > 0) THEN
            RESULT := (
                SELECT
                    streamline.udf_get_blocks(
                        object_construct('sql_source', '{{ source_relation.identifier }}')
                    )
            );
        ELSE
            RESULT := NULL;
        END IF;
        RETURN RESULT;
    END;
    $$
    {% endset %}
    {% do run_query(sql) %}
{% endmacro %}

View File

@ -0,0 +1,26 @@
{% macro create_blocks_realtime() %}
    {#
        Creates (or replaces) the stored procedure streamline.blocks_realtime().

        The procedure counts the rows of the streamline__blocks_realtime model and,
        when at least one row exists, invokes the external function
        streamline.udf_get_blocks; otherwise it returns NULL.

        udf_get_blocks is declared with a single variant argument, so it cannot be
        called with an empty argument list. The call below passes an object with a
        sql_source key holding the identifier of the relation that was counted,
        which is the same payload shape the streamline model post-hooks pass.

        Takes no arguments. Runs the DDL immediately through run_query.
    #}
    {% set source_relation = ref('streamline__blocks_realtime') %}
    {% set sql %}
    CREATE OR REPLACE PROCEDURE streamline.blocks_realtime()
    RETURNS variant
    LANGUAGE SQL
    AS $$
    DECLARE
        RESULT variant;
        row_cnt INTEGER;
    BEGIN
        row_cnt := (
            SELECT
                COUNT(1)
            FROM
                {{ source_relation }}
        );
        IF (row_cnt > 0) THEN
            RESULT := (
                SELECT
                    streamline.udf_get_blocks(
                        object_construct('sql_source', '{{ source_relation.identifier }}')
                    )
            );
        ELSE
            RESULT := NULL;
        END IF;
        RETURN RESULT;
    END;
    $$
    {% endset %}
    {% do run_query(sql) %}
{% endmacro %}

View File

@ -0,0 +1,24 @@
{% macro create_udtf_get_base_table(schema) %}
{#
    Emits DDL for the table function <schema>.udtf_get_base_table(max_height).

    The function numbers a fixed pool of 100,000,000 generated rows starting at 1
    and returns, as column height, every number that is <= max_height. Because the
    pool size is a constant, heights above 100,000,000 are never returned.

    Args:
        schema: schema in which the function is created.
#}
CREATE OR REPLACE FUNCTION {{ schema }}.udtf_get_base_table(max_height INTEGER)
RETURNS TABLE (height NUMBER)
AS
$$
    WITH numbered AS (
        SELECT
            -- seq4() only supplies an ordering; row_number() yields the final values
            ROW_NUMBER() OVER (ORDER BY SEQ4()) AS row_id
        FROM
            TABLE(GENERATOR(ROWCOUNT => 100000000))
    )
    SELECT
        row_id AS height
    FROM
        numbered
    WHERE
        row_id <= max_height
$$
;
{% endmacro %}

View File

@ -0,0 +1,18 @@
{% macro create_udf_get_chainhead() %}
    {#
        Emits DDL that creates the external function streamline.udf_get_chainhead()
        if it does not already exist. The function takes no arguments, returns a
        variant and is bound to the aws_polygon_api integration.

        The endpoint is chosen from the dbt target name: the prod URL when
        target.name is "prod", the dev URL for every other target.
    #}
    {% if target.name == "prod" %}
        {% set chainhead_url = 'https://avl1rax159.execute-api.us-east-1.amazonaws.com/prod/get_chainhead' %}
    {% else %}
        {% set chainhead_url = 'https://jml4wcap5f.execute-api.us-east-1.amazonaws.com/dev/get_chainhead' %}
    {% endif %}
    CREATE EXTERNAL FUNCTION IF NOT EXISTS streamline.udf_get_chainhead()
    RETURNS variant
    api_integration = aws_polygon_api
    AS '{{ chainhead_url }}';
{% endmacro %}
{% macro create_udf_get_blocks() %}
    {#
        Emits DDL that creates or replaces the external function
        streamline.udf_get_blocks(json variant). The function returns text and is
        bound to the aws_polygon_api integration.

        The endpoint is chosen from the dbt target name: the prod URL when
        target.name is "prod", the dev URL for every other target.
    #}
    {% if target.name == "prod" %}
        {% set bulk_blocks_url = 'https://avl1rax159.execute-api.us-east-1.amazonaws.com/prod/bulk_get_blocks' %}
    {% else %}
        {% set bulk_blocks_url = 'https://jml4wcap5f.execute-api.us-east-1.amazonaws.com/dev/bulk_get_blocks' %}
    {% endif %}
    CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_get_blocks(
        json variant
    )
    RETURNS text
    api_integration = aws_polygon_api
    AS '{{ bulk_blocks_url }}';
{% endmacro %}

35
macros/utils.sql Normal file
View File

@ -0,0 +1,35 @@
{% macro if_data_call_function(
        func,
        target
    ) %}
    {#
        Renders a statement that calls a function only when a relation has rows.

        When the project variable STREAMLINE_INVOKE_STREAMS is truthy, the rendered
        SQL selects func guarded by an EXISTS check against target, so func is only
        evaluated if target returns at least one row. Otherwise the rendered SQL is
        a plain SELECT NULL.

        Args:
            func:   SQL expression to evaluate, e.g. a UDF call.
            target: relation name probed for rows. Inside this macro the parameter
                    shadows dbt's global target variable.
    #}
    {% set invoke_streams = var("STREAMLINE_INVOKE_STREAMS") %}
    {% if not invoke_streams %}
        {% if execute %}
            {{ log("Running macro `if_data_call_function`: NOOP", False) }}
        {% endif %}
        SELECT
            NULL
    {% else %}
        {% if execute %}
            {{ log("Running macro `if_data_call_function`: Calling udf " ~ func ~ " on " ~ target, True) }}
        {% endif %}
        SELECT
            {{ func }}
        WHERE
            EXISTS(
                SELECT
                    1
                FROM
                    {{ target }}
                LIMIT
                    1
            )
    {% endif %}
{% endmacro %}

View File

@ -0,0 +1,27 @@
{{ config(
    materialized = "ephemeral",
    unique_key = "block_number"
) }}
{#
    One row per calendar date of silver__blocks holding that date's highest
    block_number. The most recent date present is excluded from the output.
#}
WITH daily_max AS (
    -- highest block number seen on each calendar date
    SELECT
        block_timestamp :: DATE AS block_date,
        MAX(block_number) AS block_number
    FROM
        {{ ref("silver__blocks") }}
    GROUP BY
        block_timestamp :: DATE
),
latest_day AS (
    -- the most recent date, which the final select leaves out
    SELECT
        MAX(block_date) AS block_date
    FROM
        daily_max
)
SELECT
    d.block_date,
    d.block_number
FROM
    daily_max d
WHERE
    d.block_date <> (
        SELECT
            block_date
        FROM
            latest_day
    )

View File

@ -0,0 +1,17 @@
{{ config(
    materialized = "view",
    tags = ['streamline_view']
) }}
{#
    View listing block numbers from 1 up to the chain head.

    At execution time the chain head is fetched once through
    streamline.udf_get_chainhead() and its value is rendered into the view
    definition as a literal. During parsing (execute is false) the height is 0.
#}
{% set block_height = 0 %}
{% if execute %}
    {% set chainhead_result = run_query('SELECT streamline.udf_get_chainhead()') %}
    {% set block_height = chainhead_result.columns[0].values()[0] %}
{% endif %}
SELECT
    height AS block_number
FROM
    TABLE(streamline.udtf_get_base_table({{ block_height }}))

View File

@ -0,0 +1,40 @@
{{ config (
    materialized = "view",
    post_hook = if_data_call_function(
        func = "{{this.schema}}.udf_get_blocks(object_construct('sql_source', '{{this.identifier}}'))",
        target = "{{this.schema}}.{{this.identifier}}"
    )
) }}
{#
    Blocks present in streamline__blocks but absent from
    streamline__complete_blocks, computed per range of 1,000,000 block numbers
    for 33 consecutive ranges (1 through 33,000,000) and concatenated with
    UNION ALL. The post-hook passes this view's identifier to udf_get_blocks.
#}
{% set chunk_size = 1000000 %}
{% for chunk in range(33) %}
    {% set lower_bound = chunk * chunk_size + 1 %}
    {% set upper_bound = (chunk + 1) * chunk_size %}
    (
        SELECT
            {{ dbt_utils.surrogate_key(['block_number']) }} AS id,
            block_number
        FROM
            {{ ref("streamline__blocks") }}
        WHERE
            block_number BETWEEN {{ lower_bound }} AND {{ upper_bound }}
        EXCEPT
        SELECT
            id,
            block_number
        FROM
            {{ ref("streamline__complete_blocks") }}
        WHERE
            block_number BETWEEN {{ lower_bound }} AND {{ upper_bound }}
        ORDER BY
            block_number
    )
    {% if not loop.last %}
    UNION ALL
    {% endif %}
{% endfor %}

View File

@ -0,0 +1,51 @@
{{ config (
    materialized = "view",
    post_hook = if_data_call_function(
        func = "{{this.schema}}.udf_get_blocks(object_construct('sql_source', '{{this.identifier}}'))",
        target = "{{this.schema}}.{{this.identifier}}"
    )
) }}
{#
    Recent blocks present in streamline__blocks but absent from
    streamline__complete_blocks. A block counts as recent when its number is at
    or above the third-highest block_number in _max_block_by_date, or above
    33,000,000. The post-hook passes this view's identifier to udf_get_blocks.
#}
WITH lookback_start AS (
    -- third row of _max_block_by_date when ranked by block_number descending
    SELECT
        block_number
    FROM
        {{ ref("_max_block_by_date") }}
    QUALIFY ROW_NUMBER() OVER (ORDER BY block_number DESC) = 3
)
SELECT
    {{ dbt_utils.surrogate_key(['block_number']) }} AS id,
    block_number
FROM
    {{ ref("streamline__blocks") }}
WHERE
    block_number IS NOT NULL
    AND (
        block_number >= (
            SELECT
                block_number
            FROM
                lookback_start
        ) {# TODO: OR can be removed once historical load is complete #}
        OR block_number > 33000000
    )
EXCEPT
SELECT
    id,
    block_number
FROM
    {{ ref("streamline__complete_blocks") }}
WHERE
    block_number >= (
        SELECT
            block_number
        FROM
            lookback_start
    ) {# TODO: OR can be removed once historical load is complete #}
    OR block_number > 33000000

View File

@ -0,0 +1,54 @@
{{ config(
    materialized = "incremental",
    unique_key = "id",
    cluster_by = "ROUND(block_number, -3)",
    merge_update_columns = ["id"]
) }}
{#
    Incremental table of block numbers found in the bronze_streamline.blocks
    external table. Each row is stamped with the last_modified time of the file
    it came from (exposed as _inserted_timestamp). On incremental runs only
    files modified after the newest _inserted_timestamp already stored are read.
    One row is kept per id, preferring the latest _inserted_timestamp.
#}
WITH file_meta AS (
    -- file name and modification time for every file behind the external table
    SELECT
        last_modified,
        file_name
    FROM
        TABLE(
            information_schema.external_table_files(
                table_name => '{{ source( "bronze_streamline", "blocks") }}'
            )
        ) A
)
{% if is_incremental() %},
high_water_mark AS (
    -- newest timestamp already loaded; falls back to 1970-01-01 when none exists
    SELECT
        COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) AS max_INSERTED_TIMESTAMP
    FROM
        {{ this }}
)
{% endif %}
SELECT
    {{ dbt_utils.surrogate_key(['block_number']) }} AS id,
    block_number,
    last_modified AS _inserted_timestamp
FROM
    {{ source("bronze_streamline", "blocks") }}
    JOIN file_meta fm
    ON fm.file_name = metadata$filename
{% if is_incremental() %}
WHERE
    fm.last_modified > (
        SELECT
            max_INSERTED_TIMESTAMP
        FROM
            high_water_mark
    )
{% endif %}
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY id
    ORDER BY _inserted_timestamp DESC
) = 1

View File

@ -29,4 +29,11 @@ sources:
tables:
- name: fact_hourly_token_prices
- name: DIM_CONTRACTS
- name: DIM_DEX_LIQUIDITY_POOLS
- name: DIM_DEX_LIQUIDITY_POOLS
- name: bronze_streamline
database: streamline
schema: |
{{ "POLYGON_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "POLYGON" }}
tables:
- name: blocks
- name: transactions