updated models and macros

This commit is contained in:
Mike Stepanovic 2025-05-08 14:25:45 -06:00
parent 6e34ac02e7
commit 3fe9881a4d
23 changed files with 97 additions and 186 deletions

View File

@ -1,70 +0,0 @@
{#
  Renders a query over a "bronze_streamline" external table, limited to files
  registered within the last BRONZE_LOOKBACK_DAYS days.

  Args:
    model: name of the table inside the "bronze_streamline" source.
    partition_function: SQL expression, evaluated over file_name, that yields
      the partition key of a file.

  Output columns: every column of the source row (s.*), the matched file_name,
  and inserted_timestamp (the last_modified value of the registered file).
#}
{% macro streamline_external_table_query_v2(
        model,
        partition_function
    ) %}
    {% set days = var("BRONZE_LOOKBACK_DAYS") %}
    WITH meta AS (
        -- One row per file registration seen in the lookback window.
        SELECT
            last_modified AS inserted_timestamp,
            file_name,
            {{ partition_function }} AS partition_key
        FROM
            TABLE(
                information_schema.external_table_file_registration_history(
                    -- ABS() keeps the offset in the past whatever the sign of the var.
                    start_time => DATEADD('day', -ABS({{ days }}), CURRENT_TIMESTAMP()),
                    table_name => '{{ source("bronze_streamline", model) }}'
                )
            ) A
    )
    SELECT
        s.*,
        b.file_name,
        -- Qualified so it cannot become ambiguous with a column expanded from s.*
        b.inserted_timestamp
    FROM
        {{ source("bronze_streamline", model) }} s
        INNER JOIN meta b
            ON b.file_name = metadata$filename
            AND b.partition_key = s.partition_key
    WHERE
        -- Repeats the join predicate; NOTE(review): presumably kept for
        -- partition pruning on the external table - confirm before removing.
        b.partition_key = s.partition_key
        AND DATA :error IS NULL
{% endmacro %}
{#
  Renders a query over a "bronze_streamline" external table using every file
  listed by information_schema.external_table_files (no lookback window).

  Args:
    model: name of the table inside the "bronze_streamline" source.
    partition_function: SQL expression, evaluated over file_name, that yields
      the partition key of a file.

  Output columns: every column of the source row (s.*), the matched file_name,
  and inserted_timestamp (the registered_on value of the file).
#}
{% macro streamline_external_table_FR_query_v2(
        model,
        partition_function
    ) %}
    WITH meta AS (
        -- One row per file listed for the external table.
        SELECT
            registered_on AS inserted_timestamp,
            file_name,
            {{ partition_function }} AS partition_key
        FROM
            TABLE(
                information_schema.external_table_files(
                    table_name => '{{ source("bronze_streamline", model) }}'
                )
            ) A
    )
    SELECT
        s.*,
        b.file_name,
        -- Qualified so it cannot become ambiguous with a column expanded from s.*
        b.inserted_timestamp
    FROM
        {{ source("bronze_streamline", model) }} s
        INNER JOIN meta b
            ON b.file_name = metadata$filename
            AND b.partition_key = s.partition_key
    WHERE
        -- Repeats the join predicate; NOTE(review): presumably kept for
        -- partition pruning on the external table - confirm before removing.
        b.partition_key = s.partition_key
        AND DATA :error IS NULL
{% endmacro %}

View File

@ -1,6 +1,9 @@
{#
  When the UPDATE_UDFS_AND_SPS var is truthy, runs a CREATE SCHEMA IF NOT
  EXISTS statement for the streamline schema and then renders the output of
  the three create_udf_bulk_* macros. Renders nothing otherwise.
#}
{% macro create_ibc_streamline_udfs() %}
    {% set refresh_requested = var("UPDATE_UDFS_AND_SPS") %}
    {% if refresh_requested %}
        {# The schema has to exist before the UDF statements below are issued. #}
        {% set schema_ddl = "CREATE SCHEMA IF NOT EXISTS streamline" %}
        {% do run_query(schema_ddl) %}
        {{ create_udf_bulk_rest_api_v2() }}
        {{ create_udf_bulk_decode_logs() }}
        {{ create_udf_bulk_decode_traces() }}
    {% endif %}
{% endmacro %}

View File

@ -6,10 +6,10 @@
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(json object) returns array api_integration =
{% if target.name == "prod" %}
{{ log("Creating prod udf_bulk_rest_api_v2", info=True) }}
{{ var("API_INTEGRATION_PROD") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api'
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api'
{% elif target.name == "dev" %}
{{ log("Creating dev udf_bulk_rest_api_v2", info=True) }}
{{ var("API_INTEGRATION_DEV") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api'
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api'
{% else %}
{{ log("Creating default (dev) udf_bulk_rest_api_v2", info=True) }}
{{ var("config")["dev"]["API_INTEGRATION"] }} AS 'https://{{ var("config")["dev"]["EXTERNAL_FUNCTION_URI"] | lower }}udf_bulk_rest_api'

View File

@ -0,0 +1,11 @@
{#
  Derives the schema name from the node name: the text before the first '__'
  separator, trimmed. The custom_schema_name argument is accepted but not used.
#}
{% macro generate_schema_name(custom_schema_name=none, node=none) -%}
    {% set name_parts = node.name.split('__') %}
    {{ name_parts | first | trim }}
{%- endmacro %}
{#
  Derives the relation alias from the node name: the segment after the first
  '__' separator, trimmed. The custom_alias_name argument is accepted but not
  used.

  Node names without a '__' separator have no second segment; for those the
  full node name is used instead of rendering an undefined value.
#}
{% macro generate_alias_name(custom_alias_name=none, node=none) -%}
    {%- set name_parts = node.name.split('__') -%}
    {%- if name_parts | length > 1 -%}
        {{ name_parts[1] | trim }}
    {%- else -%}
        {{ node.name | trim }}
    {%- endif -%}
{%- endmacro %}

View File

@ -0,0 +1,14 @@
{#
  Emits two SQL block comments into the compiled model during execution: the
  model config serialised as indented JSON, and the model's raw code.
  Renders nothing when `execute` is false.

  Any comment terminator inside the embedded text is rewritten as '* /' so it
  cannot close the surrounding block comment early and leak the remainder into
  the compiled SQL.
#}
{% macro log_model_details() %}
{%- if execute -%}
/*
DBT Model Config:
{{ model.config | tojson(indent=2) | replace('*/', '* /') }}
*/
/*
Raw Code:
{{ model.raw_code | replace('*/', '* /') }}
*/
{%- endif -%}
{% endmacro %}

View File

@ -1,141 +1,66 @@
-- streamline_external_table_query: renders a query that joins a
-- "bronze_streamline" source to a `meta` CTE built from
-- information_schema.external_table_file_registration_history, matching on
-- file name and partition key, and keeping rows whose DATA :error is NULL.
--
-- NOTE(review): this span appears to interleave lines from two revisions of
-- the macro. source_name is listed twice in the signature, start_time and
-- table_name are each passed twice, both _inserted_timestamp and
-- inserted_timestamp aliases are present, and the FROM clause names the
-- source twice. Confirm which lines belong to the current revision before
-- relying on this text.
{% macro streamline_external_table_query(
source_name,
source_version,
partition_function,
balances,
block_id,
uses_receipts_by_hash
source_name,
partition_function="CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) %}
{% if source_version != '' %}
{% set source_version = '_' ~ source_version.lower() %}
{% endif %}
{% set days = var("BRONZE_LOOKBACK_DAYS")%}
WITH meta AS (
SELECT
job_created_time AS _inserted_timestamp,
last_modified AS inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", source_name ~ source_version) }}')
start_time => DATEADD('day', -ABS({{days}}), CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", source_name) }}')
) A
)
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp
{% if balances %},
r.block_timestamp :: TIMESTAMP AS block_timestamp
{% endif %}
{% if block_id %},
COALESCE(
s.value :"block_id" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_id
{% endif %}
{% if uses_receipts_by_hash %},
s.value :"TX_HASH" :: STRING AS tx_hash
{% endif %}
inserted_timestamp
FROM
{{ source(
"bronze_streamline",
source_name ~ source_version
) }}
s
{{ source("bronze_streamline", source_name) }} s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
{% if balances %}
JOIN {{ ref('_block_ranges') }}
r
ON r.block_id = COALESCE(
s.value :"block_id" :: INT,
s.value :"block_id" :: INT
)
{% endif %}
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA IS NOT NULL
{% endmacro %}
-- Full-listing variant of the external table query: the `meta` CTE is built
-- from information_schema.external_table_files (no start_time window) and is
-- joined to the "bronze_streamline" source on file name and partition key.
--
-- NOTE(review): this span appears to interleave lines from two revisions of
-- the macro. Two macro openers with different names
-- (streamline_external_table_query_fr and
-- streamline_external_table_FR_query_v2) and two endmacro tags are present,
-- and several clauses occur in both an older and a newer form. Confirm which
-- name callers are expected to use and which lines are current.
{% macro streamline_external_table_query_fr(
{% macro streamline_external_table_FR_query_v2(
source_name,
source_version,
partition_function,
partition_join_key,
balances,
block_id,
uses_receipts_by_hash
partition_function="CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) %}
{% if source_version != '' %}
{% set source_version = '_' ~ source_version.lower() %}
{% endif %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
registered_on AS inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", source_name ~ source_version) }}'
table_name => '{{ source( "bronze_streamline", source_name) }}'
)
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp
{% if balances %},
r.block_timestamp :: TIMESTAMP AS block_timestamp
{% endif %}
{% if block_id %},
COALESCE(
s.value :"block_id" :: STRING,
s.value :"block_id" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_id
{% endif %}
{% if uses_receipts_by_hash %},
s.value :"TX_HASH" :: STRING AS tx_hash
{% endif %}
inserted_timestamp
FROM
{{ source(
"bronze_streamline",
source_name ~ source_version
source_name
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.{{ partition_join_key }}
{% if balances %}
JOIN {{ ref('_block_ranges') }}
r
ON r.block_id = COALESCE(
s.value :"block_id" :: INT,
s.value :"block_id" :: INT
)
{% endif %}
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.{{ partition_join_key }}
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA IS NOT NULL
{% endmacro %}
{% endmacro %}

View File

@ -4,6 +4,5 @@
) }}
{{ streamline_external_table_query(
source_name = 'blocks',
source_version = ''
source_name = 'blocks'
) }}

View File

@ -4,6 +4,5 @@
) }}
{{ streamline_external_table_query(
source_name = 'transactions',
source_version = ''
source_name = 'transactions'
) }}

View File

@ -4,6 +4,5 @@
) }}
{{ streamline_external_table_query(
source_name = 'tx_counts',
source_version = ''
source_name = 'tx_counts'
) }}

View File

@ -4,6 +4,5 @@
) }}
{{ streamline_external_table_query_fr(
source_name = 'blocks',
source_version = ''
source_name = 'blocks'
) }}

View File

@ -4,6 +4,5 @@
) }}
{{ streamline_external_table_query_fr(
source_name = 'transactions',
source_version = ''
source_name = 'transactions'
) }}

View File

@ -4,6 +4,5 @@
) }}
{{ streamline_external_table_query_fr(
source_name = 'tx_counts',
source_version = ''
source_name = 'tx_counts'
) }}

View File

@ -8,7 +8,8 @@
unique_key = "block_id",
cluster_by = "ROUND(block_id, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id)"
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id)",
tags = ['streamline','core','complete','phase_1']
) }}
SELECT

View File

@ -9,7 +9,8 @@
unique_key = "complete_transactions_id",
cluster_by = "ROUND(block_id, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id)"
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id)",
tags = ['streamline','core','complete','phase_1']
) }}
SELECT

View File

@ -9,7 +9,8 @@
unique_key = "block_id",
cluster_by = "ROUND(block_id, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id)"
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id)",
tags = ['streamline','core','complete','phase_1']
) }}
SELECT

View File

@ -1,7 +1,12 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Log configuration details #}
{{ log_model_details() }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
materialized = "view",
tags = ['streamline','core','realtime','phase_1']
) }}

View File

@ -1,3 +1,15 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Log configuration details #}
{{ log_model_details() }}
{# Set up dbt configuration #}
{{ config (
materialized = "view",
tags = ['streamline','core','realtime','phase_1']
) }}
WITH blocks AS (
SELECT

View File

@ -1,10 +1,12 @@
{# Get variables #}
{% set vars = return_vars() %}
-- depends_on: {{ ref('streamline__tx_counts_complete') }}
{# Log configuration details #}
{{ log_model_details() }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
materialized = "view",
tags = ['streamline','core','realtime','phase_1']
) }}

View File

@ -1,6 +1,12 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Log configuration details #}
{{ log_model_details() }}
{{ config (
materialized = "view",
tags = ['streamline_view']
tags = ['streamline','core','chainhead','phase_1']
) }}
SELECT

View File

@ -1,6 +1,12 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Log configuration details #}
{{ log_model_details() }}
{{ config (
materialized = "view",
tags = ['streamline_view']
materialized = 'table',
tags = ['streamline','core','chainhead','phase_1']
) }}
SELECT