This commit is contained in:
drethereum 2024-08-20 20:08:56 +00:00 committed by GitHub
commit b5e69e4ba1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 182 additions and 8 deletions

View File

@ -8,5 +8,6 @@
{% if var("UPDATE_UDFS_AND_SPS") %}
{{ create_udf_bulk_rest_api_v2() }}
{{ create_udf_bulk_decode_logs() }}
{{ create_udf_bulk_decode_traces() }}
{% endif %}
{% endmacro %}

View File

@ -90,7 +90,8 @@ WHERE
{% macro streamline_external_table_query_v2(
model,
partition_function
partition_function,
evm_balances=False
) %}
WITH meta AS (
SELECT
@ -107,7 +108,10 @@ WHERE
SELECT
s.*,
b.file_name,
_inserted_timestamp
b._inserted_timestamp
{% if evm_balances %}
, r.block_timestamp :: TIMESTAMP AS block_timestamp
{% endif %}
FROM
{{ source(
"bronze_streamline",
@ -117,15 +121,21 @@ WHERE
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
{% if evm_balances %}
JOIN {{ ref('_block_ranges') }} r
ON r.block_number = COALESCE(s.VALUE :"BLOCK_NUMBER" :: INT,s.VALUE :"block_number" :: INT)
{% endif %}
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA is not null
AND DATA IS NOT NULL
{% endmacro %}
{% macro streamline_external_table_FR_query_v2(
model,
partition_function
partition_function,
partition_column="partition_key",
evm_balances=False
) %}
WITH meta AS (
SELECT
@ -142,7 +152,10 @@ WHERE
SELECT
s.*,
b.file_name,
_inserted_timestamp
b._inserted_timestamp
{% if evm_balances %}
, r.block_timestamp :: TIMESTAMP AS block_timestamp
{% endif %}
FROM
{{ source(
"bronze_streamline",
@ -151,9 +164,102 @@ FROM
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
AND b.partition_key = s.{{ partition_column }}
{% if evm_balances %}
JOIN {{ ref('_block_ranges') }} r
ON r.block_number = COALESCE(s.VALUE :"BLOCK_NUMBER" :: INT,s.VALUE :"block_number" :: INT)
{% endif %}
WHERE
b.partition_key = s.partition_key
b.partition_key = s.{{ partition_column }}
AND DATA :error IS NULL
AND DATA is not null
AND DATA IS NOT NULL
{% endmacro %}
{% macro streamline_external_table_query_decoder(
model
) %}
WITH meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,
TO_DATE(
concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
) AS _partition_by_created_date
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
block_number,
id :: STRING AS id,
DATA,
_inserted_timestamp,
s._partition_by_block_number AS _partition_by_block_number,
s._partition_by_created_date AS _partition_by_created_date
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
WHERE
b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
AND s._partition_by_created_date >= DATEADD('day', -2, CURRENT_TIMESTAMP())
AND DATA :error IS NULL
AND DATA IS NOT NULL
{% endmacro %}
{% macro streamline_external_table_FR_query_decoder(
model
) %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,
TO_DATE(
concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
) AS _partition_by_created_date
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
block_number,
id :: STRING AS id,
DATA,
_inserted_timestamp,
s._partition_by_block_number AS _partition_by_block_number,
s._partition_by_created_date AS _partition_by_created_date
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
WHERE
b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
AND DATA :error IS NULL
AND DATA IS NOT NULL
{% endmacro %}

View File

@ -46,6 +46,30 @@
{% do adapter.execute(sql) %}
{% endmacro %}
{% macro create_udf_bulk_decode_traces() %}
{{ log("Creating udf udf_bulk_decode_traces_v2 for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }}
{{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }}
{% set sql %}
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_traces_v2(json object) returns array api_integration =
{% if target.name == "prod" %}
{{ log("Creating prod udf_bulk_decode_traces_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_traces'
{% elif target.name == "dev" %}
{{ log("Creating dev udf_bulk_decode_traces_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_traces'
{% elif target.name == "sbx" %}
{{ log("Creating stg udf_bulk_decode_traces_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_traces'
{% else %}
{{ log("Creating default (dev) udf_bulk_decode_traces_v2", info=True) }}
{{ var("config")["dev"]["API_INTEGRATION"] }} AS 'https://{{ var("config")["dev"]["EXTERNAL_FUNCTION_URI"] | lower }}bulk_decode_traces'
{% endif %};
{% endset %}
{{ log(sql, info=True) }}
{% do adapter.execute(sql) %}
{% endmacro %}
{% macro create_aws_api_integrations() %}
{{ log("Creating api integration for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }}
{{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }}

View File

@ -118,4 +118,47 @@
SELECT
NULL
{% endif %}
{% endmacro %}
{% macro if_data_call_wait() %}
{% if var(
"STREAMLINE_INVOKE_STREAMS"
) %}
{% set query %}
SELECT
1
WHERE
EXISTS(
SELECT
1
FROM
{{ model.schema ~ "." ~ model.alias }}
LIMIT
1
) {% endset %}
{% if execute %}
{% set results = run_query(
query
) %}
{% if results %}
{{ log(
"Waiting...",
info = True
) }}
{% set wait_query %}
SELECT
system$wait(
{{ var(
"WAIT",
400
) }}
) {% endset %}
{% do run_query(wait_query) %}
{% else %}
SELECT
NULL;
{% endif %}
{% endif %}
{% endif %}
{% endmacro %}