Add generic stream (#2)

* Add generic stream
Ryan-Loofy authored 2023-02-24 11:47:49 -05:00 · committed by GitHub
parent 3cad429943
commit e7f56a6d83
17 changed files with 570 additions and 5 deletions


@@ -29,9 +29,10 @@ tests:
on-run-start:
- "{{ create_sps() }}"
- "{{ create_udfs() }}"
on-run-end:
- '{{ apply_meta_as_tags(results) }}'
# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
@@ -42,5 +43,7 @@ on-run-end:
vars:
"dbt_date:time_zone": GMT
STREAMLINE_INVOKE_STREAMS: False
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False
UPDATE_UDFS_AND_SPS: False
UPDATE_SNOWFLAKE_TAGS: True
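
These vars gate the streamline behavior at runtime rather than at deploy time, so any of them can be overridden per invocation. A minimal sketch (the selector names one of the models added below):

dbt run -s streamline__blocks --vars '{"STREAMLINE_INVOKE_STREAMS": true, "UPDATE_UDFS_AND_SPS": true}'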

macros/create_udfs.sql (new file)

@@ -0,0 +1,20 @@
{% macro create_udfs() %}
    {% if var("UPDATE_UDFS_AND_SPS") %}
        {# Ensure the silver schema exists, then (re)create the base-table UDTF #}
        {% set sql %}
            CREATE SCHEMA IF NOT EXISTS silver;
            {{ create_udtf_get_base_table(
                schema = "streamline"
            ) }}
        {% endset %}
        {% do run_query(sql) %}
        {# Community dev databases do not get the external functions #}
        {% if target.database != "BASE_COMMUNITY_DEV" %}
            {% set sql %}
                {# {{ create_udf_get_chainhead() }} #}
                {{ create_udf_bulk_json_rpc() }}
            {% endset %}
            {% do run_query(sql) %}
        {% endif %}
    {% endif %}
{% endmacro %}
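
Because the whole macro is guarded by UPDATE_UDFS_AND_SPS, it can also be exercised ad hoc outside of on-run-start; a sketch, assuming the target profile can create schemas and functions:

dbt run-operation create_udfs --vars '{"UPDATE_UDFS_AND_SPS": true}'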


@@ -0,0 +1,11 @@
{% macro create_aws_base_api() %}
    {% if target.name == "prod" %}
        {% set sql %}
        CREATE API INTEGRATION IF NOT EXISTS aws_base_api
            api_provider = aws_api_gateway
            api_aws_role_arn = 'arn:aws:iam::490041342817:role/snowflake-api-base'
            api_allowed_prefixes = (
                'https://avaxk4phkl.execute-api.us-east-1.amazonaws.com/prod/',
                'https://k9b03inxm4.execute-api.us-east-1.amazonaws.com/dev/'
            )
            enabled = TRUE;
        {% endset %}
        {% do run_query(sql) %}
    {% endif %}
{% endmacro %}
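
Once created, the integration can be verified with the standard Snowflake commands:

SHOW API INTEGRATIONS LIKE 'aws_base_api';
DESCRIBE INTEGRATION aws_base_api;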


@@ -0,0 +1,24 @@
{% macro create_udtf_get_base_table(schema) %}
CREATE OR REPLACE FUNCTION {{ schema }}.udtf_get_base_table(max_height INTEGER)
RETURNS TABLE (height NUMBER)
AS
$$
    -- generate a contiguous sequence 1..max_height (generator caps the range at 100M rows)
    WITH base AS (
        SELECT
            ROW_NUMBER() OVER (
                ORDER BY
                    seq4()
            ) AS id
        FROM
            TABLE(GENERATOR(rowcount => 100000000))
    )
    SELECT
        id AS height
    FROM
        base
    WHERE
        id <= max_height
$$;
{% endmacro %}
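
A quick smoke test of the deployed UDTF (schema name matches the create_udfs call above):

SELECT height
FROM TABLE(streamline.udtf_get_base_table(5));
-- returns heights 1 through 5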


@@ -0,0 +1,19 @@
{# {% macro create_udf_get_chainhead() %}
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_get_chainhead(
) returns variant api_integration = aws_base_api AS {% if target.name == "prod" %}
'https://avaxk4phkl.execute-api.us-east-1.amazonaws.com/prod/get_chainhead'
{% else %}
'https://k9b03inxm4.execute-api.us-east-1.amazonaws.com/dev/get_chainhead'
{%- endif %};
{% endmacro %} #}
{% macro create_udf_bulk_json_rpc() %}
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_json_rpc(
    json VARIANT
) RETURNS TEXT api_integration = aws_base_api AS {% if target.name == "prod" %}
    'https://avaxk4phkl.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_json_rpc'
{% else %}
    'https://k9b03inxm4.execute-api.us-east-1.amazonaws.com/dev/udf_bulk_json_rpc'
{%- endif %};
{% endmacro %}
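
The models below call this external function from a post_hook via if_data_call_function; a hand-rolled invocation with the same payload shape (values mirror the var defaults used later in this diff) looks like:

SELECT streamline.udf_bulk_json_rpc(
    OBJECT_CONSTRUCT(
        'sql_source', 'streamline__blocks',
        'external_table', 'blocks',
        'sql_limit', 900,
        'producer_batch_size', 300,
        'worker_batch_size', 300,
        'batch_call_limit', 30
    )
);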

macros/utils.sql (new file)

@@ -0,0 +1,78 @@
{% macro if_data_call_function(func, target) %}
    {% if var("STREAMLINE_INVOKE_STREAMS") %}
        {% if execute %}
            {{ log("Running macro `if_data_call_function`: Calling udf " ~ func ~ " on " ~ target, True) }}
        {% endif %}
        SELECT
            {{ func }}
        WHERE
            EXISTS(
                SELECT 1
                FROM {{ target }}
                LIMIT 1
            )
    {% else %}
        {% if execute %}
            {{ log("Running macro `if_data_call_function`: NOOP", False) }}
        {% endif %}
        SELECT NULL
    {% endif %}
{% endmacro %}
{% macro if_data_call_wait() %}
    {% if var("STREAMLINE_INVOKE_STREAMS") %}
        {% set query %}
        SELECT 1
        WHERE EXISTS(
            SELECT 1
            FROM {{ model.schema ~ "." ~ model.alias }}
            LIMIT 1
        )
        {% endset %}
        {% if execute %}
            {% set results = run_query(query) %}
            {% if results %}
                {{ log("Waiting...", info = True) }}
                {% set wait_query %}
                SELECT system$wait({{ var("WAIT", 600) }})
                {% endset %}
                {% do run_query(wait_query) %}
            {% else %}
                SELECT NULL;
            {% endif %}
        {% endif %}
    {% endif %}
{% endmacro %}
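
When STREAMLINE_INVOKE_STREAMS is true, if_data_call_function compiles a post_hook like the one the blocks model below registers. A sketch of the rendered SQL, with a hypothetical my_schema standing in for {{this.schema}} and the var defaults filled in:

SELECT
    my_schema.udf_bulk_json_rpc(OBJECT_CONSTRUCT('sql_source', 'streamline__blocks', 'external_table', 'blocks', 'sql_limit', 900, 'producer_batch_size', 300, 'worker_batch_size', 300, 'batch_call_limit', 30))
WHERE
    EXISTS(
        SELECT 1 FROM my_schema.streamline__blocks LIMIT 1
    )

The EXISTS guard means the UDF only fires when the target view actually has rows to work through.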


@@ -0,0 +1,6 @@
{{ config (
materialized = 'view'
) }}
SELECT
1 AS test


@@ -0,0 +1,19 @@
{{ config (
    materialized = "view",
    tags = ['streamline_view']
) }}

{% if execute %}
    {# {% set height = run_query('SELECT streamline.udf_get_chainhead()') %} #}
    {# {% set block_height = height.columns[0].values()[0] %} #}
    {# Hardcoded ceiling until udf_get_chainhead is enabled above #}
    {% set block_height = 1000000 %}
{% else %}
    {% set block_height = 0 %}
{% endif %}

SELECT
    height AS block_number
FROM
    TABLE(streamline.udtf_get_base_table({{ block_height }}))
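
Querying the view yields one candidate height per row; a quick check, assuming the model builds into a streamline schema:

SELECT block_number
FROM streamline.streamline__blocks
ORDER BY block_number DESC
LIMIT 3;
-- 1000000, 999999, 999998 under the hardcoded ceiling above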


@@ -0,0 +1,32 @@
{{ config (
    materialized = "view",
    post_hook = if_data_call_function(
        func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'blocks', 'sql_limit', {{var('sql_limit','900')}}, 'producer_batch_size', {{var('producer_batch_size','300')}}, 'worker_batch_size', {{var('worker_batch_size','300')}}, 'batch_call_limit', {{var('batch_call_limit','30')}}))",
        target = "{{this.schema}}.{{this.identifier}}"
    )
) }}

WITH blocks_to_request AS (
    {# Set difference on block_number: every known block minus blocks already ingested #}
    SELECT
        block_number
    FROM
        {{ ref("streamline__blocks") }}
    WHERE
        block_number > 1000000
        AND block_number IS NOT NULL
    EXCEPT
    SELECT
        block_number
    FROM
        {{ ref("streamline__complete_blocks") }}
    WHERE
        block_number > 1000000
)
SELECT
    PARSE_JSON(
        CONCAT(
            '{"method": "eth_getBlockByNumber", "params":[',
            block_number :: STRING,
            ',',
            FALSE :: BOOLEAN,
            '],"id":',
            block_number :: STRING,
            '}'
        )
    ) AS request
FROM
    blocks_to_request
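
Each row compiles to one JSON-RPC call. For block 1000001, the request variant is:

{"method": "eth_getBlockByNumber", "params": [1000001, false], "id": 1000001}

The false flag asks the node for transaction hashes only; the transactions model at the end of this diff flips it to true.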


@@ -0,0 +1,63 @@
{{ config (
    materialized = "incremental",
    unique_key = "id",
    cluster_by = "ROUND(block_number, -3)",
    merge_update_columns = ["id"],
    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(id)"
) }}

WITH {% if is_incremental() %}
max_date AS (
    {# High-water mark; {{ this }} only exists on incremental runs #}
    SELECT
        COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: DATE) AS max_inserted_timestamp
    FROM
        {{ this }}
),
{% endif %}
meta AS (
    {# One row per staged file; the starting block is encoded in the file name #}
    SELECT
        CAST(
            SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER
        ) AS _partition_by_block_number
    FROM
        TABLE(
            information_schema.external_table_files(
                table_name => '{{ source("bronze_streamline", "blocks") }}'
            )
        ) A
    {% if is_incremental() %}
    WHERE
        last_modified >= (
            SELECT
                max_inserted_timestamp
            FROM
                max_date
        )
    {% endif %}
)
SELECT
    MD5(
        CAST(COALESCE(CAST(block_number AS TEXT), '') AS TEXT)
    ) AS id,
    block_number,
    {% if is_incremental() %}
    (
        SELECT
            max_inserted_timestamp
        FROM
            max_date
    )
    {% else %}
    SYSDATE()
    {% endif %} AS _inserted_timestamp
FROM
    {{ source(
        "bronze_streamline",
        "blocks"
    ) }} t
    JOIN meta b
    ON b._partition_by_block_number = t._partition_by_block_id
QUALIFY ROW_NUMBER() over (
    PARTITION BY id
    ORDER BY
        _inserted_timestamp DESC
) = 1
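
The meta CTE leans on Snowflake's external table metadata, which can also be inspected directly. A sketch against the prod source (database and schema per the sources.yml at the end of this diff):

SELECT file_name, last_modified
FROM TABLE(
    information_schema.external_table_files(table_name => 'streamline.BASE.blocks')
)
ORDER BY last_modified DESC
LIMIT 10;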


@@ -0,0 +1,63 @@
{{ config (
    materialized = "incremental",
    unique_key = "id",
    cluster_by = "ROUND(block_number, -3)",
    merge_update_columns = ["id"],
    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(id)"
) }}

WITH {% if is_incremental() %}
max_date AS (
    {# High-water mark; {{ this }} only exists on incremental runs #}
    SELECT
        COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: DATE) AS max_inserted_timestamp
    FROM
        {{ this }}
),
{% endif %}
meta AS (
    {# One row per staged file; the starting block is encoded in the file name #}
    SELECT
        CAST(
            SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER
        ) AS _partition_by_block_number
    FROM
        TABLE(
            information_schema.external_table_files(
                table_name => '{{ source("bronze_streamline", "eth_getBlockReceipts") }}'
            )
        ) A
    {% if is_incremental() %}
    WHERE
        last_modified >= (
            SELECT
                max_inserted_timestamp
            FROM
                max_date
        )
    {% endif %}
)
SELECT
    MD5(
        CAST(COALESCE(CAST(block_number AS TEXT), '') AS TEXT)
    ) AS id,
    block_number,
    {% if is_incremental() %}
    (
        SELECT
            max_inserted_timestamp
        FROM
            max_date
    )
    {% else %}
    SYSDATE()
    {% endif %} AS _inserted_timestamp
FROM
    {{ source(
        "bronze_streamline",
        "eth_getBlockReceipts"
    ) }} t
    JOIN meta b
    ON b._partition_by_block_number = t._partition_by_block_id
QUALIFY ROW_NUMBER() over (
    PARTITION BY id
    ORDER BY
        _inserted_timestamp DESC
) = 1


@@ -0,0 +1,63 @@
{{ config (
    materialized = "incremental",
    unique_key = "id",
    cluster_by = "ROUND(block_number, -3)",
    merge_update_columns = ["id"],
    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(id)"
) }}

WITH {% if is_incremental() %}
max_date AS (
    {# High-water mark; {{ this }} only exists on incremental runs #}
    SELECT
        COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: DATE) AS max_inserted_timestamp
    FROM
        {{ this }}
),
{% endif %}
meta AS (
    {# One row per staged file; the starting block is encoded in the file name #}
    SELECT
        CAST(
            SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER
        ) AS _partition_by_block_number
    FROM
        TABLE(
            information_schema.external_table_files(
                table_name => '{{ source("bronze_streamline", "trace_blocks") }}'
            )
        ) A
    {% if is_incremental() %}
    WHERE
        last_modified >= (
            SELECT
                max_inserted_timestamp
            FROM
                max_date
        )
    {% endif %}
)
SELECT
    MD5(
        CAST(COALESCE(CAST(block_number AS TEXT), '') AS TEXT)
    ) AS id,
    block_number,
    {% if is_incremental() %}
    (
        SELECT
            max_inserted_timestamp
        FROM
            max_date
    )
    {% else %}
    SYSDATE()
    {% endif %} AS _inserted_timestamp
FROM
    {{ source(
        "bronze_streamline",
        "trace_blocks"
    ) }} t
    JOIN meta b
    ON b._partition_by_block_number = t._partition_by_block_id
QUALIFY ROW_NUMBER() over (
    PARTITION BY id
    ORDER BY
        _inserted_timestamp DESC
) = 1


@@ -0,0 +1,63 @@
{{ config (
    materialized = "incremental",
    unique_key = "id",
    cluster_by = "ROUND(block_number, -3)",
    merge_update_columns = ["id"],
    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(id)"
) }}

WITH {% if is_incremental() %}
max_date AS (
    {# High-water mark; {{ this }} only exists on incremental runs #}
    SELECT
        COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: DATE) AS max_inserted_timestamp
    FROM
        {{ this }}
),
{% endif %}
meta AS (
    {# One row per staged file; the starting block is encoded in the file name #}
    SELECT
        CAST(
            SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER
        ) AS _partition_by_block_number
    FROM
        TABLE(
            information_schema.external_table_files(
                table_name => '{{ source("bronze_streamline", "transactions") }}'
            )
        ) A
    {% if is_incremental() %}
    WHERE
        last_modified >= (
            SELECT
                max_inserted_timestamp
            FROM
                max_date
        )
    {% endif %}
)
SELECT
    MD5(
        CAST(COALESCE(CAST(block_number AS TEXT), '') AS TEXT)
    ) AS id,
    block_number,
    {% if is_incremental() %}
    (
        SELECT
            max_inserted_timestamp
        FROM
            max_date
    )
    {% else %}
    SYSDATE()
    {% endif %} AS _inserted_timestamp
FROM
    {{ source(
        "bronze_streamline",
        "transactions"
    ) }} t
    JOIN meta b
    ON b._partition_by_block_number = t._partition_by_block_id
QUALIFY ROW_NUMBER() over (
    PARTITION BY id
    ORDER BY
        _inserted_timestamp DESC
) = 1


@@ -0,0 +1,30 @@
{{ config (
    materialized = "view",
    post_hook = if_data_call_function(
        func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'eth_getBlockReceipts', 'sql_limit', {{var('sql_limit','900')}}, 'producer_batch_size', {{var('producer_batch_size','300')}}, 'worker_batch_size', {{var('worker_batch_size','300')}}, 'batch_call_limit', {{var('batch_call_limit','30')}}))",
        target = "{{this.schema}}.{{this.identifier}}"
    )
) }}

WITH blocks_to_request AS (
    {# Set difference on block_number: known blocks minus blocks whose receipts are already ingested #}
    SELECT
        block_number
    FROM
        {{ ref("streamline__blocks") }}
    WHERE
        block_number > 1000000
        AND block_number IS NOT NULL
    EXCEPT
    SELECT
        block_number
    FROM
        {{ ref("streamline__complete_eth_getBlockReceipts") }}
    WHERE
        block_number > 1000000
)
SELECT
    PARSE_JSON(
        CONCAT(
            '{"method": "eth_getBlockReceipts", "params":[',
            block_number :: STRING,
            '],"id":',
            block_number :: STRING,
            '}'
        )
    ) AS request
FROM
    blocks_to_request


@@ -0,0 +1,30 @@
{{ config (
    materialized = "view",
    post_hook = if_data_call_function(
        func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'trace_blocks', 'sql_limit', {{var('sql_limit','900')}}, 'producer_batch_size', {{var('producer_batch_size','300')}}, 'worker_batch_size', {{var('worker_batch_size','300')}}, 'batch_call_limit', {{var('batch_call_limit','30')}}))",
        target = "{{this.schema}}.{{this.identifier}}"
    )
) }}

WITH blocks_to_request AS (
    {# Set difference on block_number: known blocks minus blocks whose traces are already ingested #}
    SELECT
        block_number
    FROM
        {{ ref("streamline__blocks") }}
    WHERE
        block_number > 1000000
        AND block_number IS NOT NULL
    EXCEPT
    SELECT
        block_number
    FROM
        {{ ref("streamline__complete_trace_blocks") }}
    WHERE
        block_number > 1000000
)
SELECT
    PARSE_JSON(
        CONCAT(
            '{"method": "trace_block", "params":[',
            block_number :: STRING,
            '],"id":',
            block_number :: STRING,
            '}'
        )
    ) AS request
FROM
    blocks_to_request


@@ -0,0 +1,32 @@
{{ config (
    materialized = "view",
    post_hook = if_data_call_function(
        func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'transactions', 'sql_limit', {{var('sql_limit','900')}}, 'producer_batch_size', {{var('producer_batch_size','300')}}, 'worker_batch_size', {{var('worker_batch_size','300')}}, 'batch_call_limit', {{var('batch_call_limit','30')}}))",
        target = "{{this.schema}}.{{this.identifier}}"
    )
) }}

WITH blocks_to_request AS (
    {# Set difference on block_number: known blocks minus blocks whose transactions are already ingested #}
    SELECT
        block_number
    FROM
        {{ ref("streamline__blocks") }}
    WHERE
        block_number > 1000000
        AND block_number IS NOT NULL
    EXCEPT
    SELECT
        block_number
    FROM
        {{ ref("streamline__complete_transactions") }}
    WHERE
        block_number > 1000000
)
SELECT
    PARSE_JSON(
        CONCAT(
            '{"method": "eth_getBlockByNumber", "params":[',
            block_number :: STRING,
            ',',
            TRUE :: BOOLEAN,
            '],"id":',
            block_number :: STRING,
            '}'
        )
    ) AS request
FROM
    blocks_to_request
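
The only functional difference from the blocks model is the second JSON-RPC param: true asks the node for full transaction objects instead of hashes. A sample row for block 1000001:

{"method": "eth_getBlockByNumber", "params": [1000001, true], "id": 1000001}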


@@ -1,8 +1,17 @@
version: 2

sources:
  - name: ethereum
    database: ethereum
    schema: core
    tables:
      - name: fact_hourly_token_prices

  - name: bronze_streamline
    database: streamline
    schema: |
      {{ "BASE_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "BASE" }}
    tables:
      - name: blocks
      - name: transactions
      - name: trace_blocks
      - name: eth_getBlockReceipts
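
The schema switch can be sanity-checked by compiling one of the new models under each setting (model name from this diff; the comments describe the expected resolution):

dbt compile -s streamline__complete_blocks
# compiled SQL reads from streamline.BASE.blocks

dbt compile -s streamline__complete_blocks --vars '{"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": true}'
# compiled SQL reads from streamline.BASE_DEV.blocks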