From ab1369b78103df3ffe55fbd85bb739a4f3eeae08 Mon Sep 17 00:00:00 2001 From: drethereum Date: Fri, 26 Jul 2024 10:37:10 -0600 Subject: [PATCH 01/11] add trace decoder to evm udf --- macros/create_streamline_udfs.sql | 1 + macros/streamline/udfs.sql | 24 ++++++++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/macros/create_streamline_udfs.sql b/macros/create_streamline_udfs.sql index a9bc226..9b2bad9 100644 --- a/macros/create_streamline_udfs.sql +++ b/macros/create_streamline_udfs.sql @@ -8,5 +8,6 @@ {% if var("UPDATE_UDFS_AND_SPS") %} {{ create_udf_bulk_rest_api_v2() }} {{ create_udf_bulk_decode_logs() }} + {{ create_udf_bulk_decode_traces() }} {% endif %} {% endmacro %} diff --git a/macros/streamline/udfs.sql b/macros/streamline/udfs.sql index f25e9eb..c21da4b 100644 --- a/macros/streamline/udfs.sql +++ b/macros/streamline/udfs.sql @@ -46,6 +46,30 @@ {% do adapter.execute(sql) %} {% endmacro %} +{% macro create_udf_bulk_decode_traces() %} + {{ log("Creating udf udf_bulk_decode_traces_v2 for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }} + {{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }} + + {% set sql %} + CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_traces_v2(json object) returns array api_integration = + {% if target.name == "prod" %} + {{ log("Creating prod udf_bulk_decode_traces_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_traces' + {% elif target.name == "dev" %} + {{ log("Creating dev udf_bulk_decode_traces_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_traces' + {% elif target.name == "sbx" %} + {{ log("Creating stg udf_bulk_decode_traces_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_traces' + {% else %} + {{ log("Creating default (dev) udf_bulk_decode_traces_v2", info=True) }} + {{ var("config")["dev"]["API_INTEGRATION"] }} AS 'https://{{ var("config")["dev"]["EXTERNAL_FUNCTION_URI"] | lower }}bulk_decode_traces' + {% endif %}; + {% endset %} + {{ log(sql, info=True) }} + {% do adapter.execute(sql) %} +{% endmacro %} + {% macro create_aws_api_integrations() %} {{ log("Creating api integration for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }} {{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }} From 08e36777d8069f2598cb7ff260a2fc410e9e3731 Mon Sep 17 00:00:00 2001 From: drethereum Date: Thu, 1 Aug 2024 09:07:14 -0600 Subject: [PATCH 02/11] if_data_call_wait macro --- macros/streamline/utils.sql | 45 +++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/macros/streamline/utils.sql b/macros/streamline/utils.sql index b9411b8..da95b2f 100644 --- a/macros/streamline/utils.sql +++ b/macros/streamline/utils.sql @@ -118,4 +118,49 @@ SELECT NULL {% endif %} +{% endmacro %} + +{% macro if_data_call_wait( + seconds + ) %} + {% if var( + "STREAMLINE_INVOKE_STREAMS" + ) %} + {% set query %} + SELECT + 1 + WHERE + EXISTS( + SELECT + 1 + FROM + {{ model.schema ~ "." ~ model.alias }} + LIMIT + 1 + ) {% endset %} + {% if execute %} + {% set results = run_query( + query + ) %} + {% if results %} + {{ log( + "Waiting...", + info = True + ) }} + + {% set wait_query %} + SELECT + system$wait( + {{ var( + "WAIT", + {{ seconds }} + ) }} + ) {% endset %} + {% do run_query(wait_query) %} + {% else %} + SELECT + NULL; + {% endif %} + {% endif %} + {% endif %} {% endmacro %} \ No newline at end of file From c4e9725156ebf9ddc934929a5a0af1a27980b6f3 Mon Sep 17 00:00:00 2001 From: drethereum Date: Thu, 1 Aug 2024 09:24:38 -0600 Subject: [PATCH 03/11] seconds --- macros/streamline/utils.sql | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/macros/streamline/utils.sql b/macros/streamline/utils.sql index da95b2f..ed27f2b 100644 --- a/macros/streamline/utils.sql +++ b/macros/streamline/utils.sql @@ -120,9 +120,7 @@ {% endif %} {% endmacro %} -{% macro if_data_call_wait( - seconds - ) %} +{% macro if_data_call_wait() %} {% if var( "STREAMLINE_INVOKE_STREAMS" ) %} @@ -153,7 +151,7 @@ system$wait( {{ var( "WAIT", - {{ seconds }} + 400 ) }} ) {% endset %} {% do run_query(wait_query) %} From 6d37d9df2adba6988f5ba275de9860e02ccb5e75 Mon Sep 17 00:00:00 2001 From: drethereum Date: Tue, 6 Aug 2024 15:11:55 -0600 Subject: [PATCH 04/11] partition_column --- macros/streamline/models.sql | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql index 21f5133..7e019e7 100644 --- a/macros/streamline/models.sql +++ b/macros/streamline/models.sql @@ -90,7 +90,8 @@ WHERE {% macro streamline_external_table_query_v2( model, - partition_function + partition_function, + partition_column="partition_key" ) %} WITH meta AS ( SELECT @@ -116,16 +117,17 @@ WHERE s JOIN meta b ON b.file_name = metadata$filename - AND b.partition_key = s.partition_key + AND b.partition_key = s.{{ partition_column }} WHERE - b.partition_key = s.partition_key + b.partition_key = s.{{ partition_column }} AND DATA :error IS NULL AND DATA is not null {% endmacro %} {% macro streamline_external_table_FR_query_v2( model, - partition_function + partition_function, + partition_column="partition_key" ) %} WITH meta AS ( SELECT @@ -151,9 +153,9 @@ FROM s JOIN meta b ON b.file_name = metadata$filename - AND b.partition_key = s.partition_key + AND b.partition_key = s.{{ partition_column }} WHERE - b.partition_key = s.partition_key + b.partition_key = s.{{ partition_column }} AND DATA :error IS NULL AND DATA is not null {% endmacro %} From 5766237e88477bc1c03e09ac5a5e0f6755eb88e9 Mon Sep 17 00:00:00 2001 From: drethereum Date: Tue, 6 Aug 2024 16:41:42 -0600 Subject: [PATCH 05/11] balances if --- macros/streamline/models.sql | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql index 7e019e7..567c0fb 100644 --- a/macros/streamline/models.sql +++ b/macros/streamline/models.sql @@ -91,7 +91,8 @@ WHERE {% macro streamline_external_table_query_v2( model, partition_function, - partition_column="partition_key" + partition_column="partition_key", + evm_balances=False ) %} WITH meta AS ( SELECT @@ -109,6 +110,9 @@ WHERE s.*, b.file_name, _inserted_timestamp + {% if evm_balances %} + , r.block_timestamp :: TIMESTAMP AS block_timestamp + {% endif %} FROM {{ source( "bronze_streamline", @@ -118,16 +122,21 @@ WHERE JOIN meta b ON b.file_name = metadata$filename AND b.partition_key = s.{{ partition_column }} + {% if evm_balances %} + JOIN {{ ref('_block_ranges') }} r + ON r.block_number = s.block_number + {% endif %} WHERE b.partition_key = s.{{ partition_column }} AND DATA :error IS NULL - AND DATA is not null + AND DATA IS NOT NULL {% endmacro %} {% macro streamline_external_table_FR_query_v2( model, partition_function, - partition_column="partition_key" + partition_column="partition_key", + evm_balances=False ) %} WITH meta AS ( SELECT @@ -145,6 +154,9 @@ SELECT s.*, b.file_name, _inserted_timestamp + {% if evm_balances %} + , r.block_timestamp :: TIMESTAMP AS block_timestamp + {% endif %} FROM {{ source( "bronze_streamline", @@ -154,8 +166,12 @@ FROM JOIN meta b ON b.file_name = metadata$filename AND b.partition_key = s.{{ partition_column }} + {% if evm_balances %} + JOIN {{ ref('_block_ranges') }} r + ON r.block_number = s.block_number + {% endif %} WHERE b.partition_key = s.{{ partition_column }} AND DATA :error IS NULL - AND DATA is not null + AND DATA IS NOT NULL {% endmacro %} From cd68987512d8d183446f5dbbf60da2b12009d6c8 Mon Sep 17 00:00:00 2001 From: drethereum Date: Tue, 6 Aug 2024 17:23:43 -0600 Subject: [PATCH 06/11] alias --- macros/streamline/models.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql index 567c0fb..6392c50 100644 --- a/macros/streamline/models.sql +++ b/macros/streamline/models.sql @@ -109,7 +109,7 @@ WHERE SELECT s.*, b.file_name, - _inserted_timestamp + b._inserted_timestamp {% if evm_balances %} , r.block_timestamp :: TIMESTAMP AS block_timestamp {% endif %} @@ -153,7 +153,7 @@ WHERE SELECT s.*, b.file_name, - _inserted_timestamp + b._inserted_timestamp {% if evm_balances %} , r.block_timestamp :: TIMESTAMP AS block_timestamp {% endif %} From 691a8d0a09b287740585c523df38a00c3058a219 Mon Sep 17 00:00:00 2001 From: drethereum Date: Tue, 6 Aug 2024 17:33:34 -0600 Subject: [PATCH 07/11] join --- macros/streamline/models.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql index 6392c50..6639713 100644 --- a/macros/streamline/models.sql +++ b/macros/streamline/models.sql @@ -124,7 +124,7 @@ WHERE AND b.partition_key = s.{{ partition_column }} {% if evm_balances %} JOIN {{ ref('_block_ranges') }} r - ON r.block_number = s.block_number + ON r.block_number = s.COALESCE(VALUE :"BLOCK_NUMBER" :: INT,VALUE :"block_number" :: INT) {% endif %} WHERE b.partition_key = s.{{ partition_column }} @@ -168,7 +168,7 @@ FROM AND b.partition_key = s.{{ partition_column }} {% if evm_balances %} JOIN {{ ref('_block_ranges') }} r - ON r.block_number = s.block_number + ON r.block_number = s.COALESCE(VALUE :"BLOCK_NUMBER" :: INT,VALUE :"block_number" :: INT) {% endif %} WHERE b.partition_key = s.{{ partition_column }} From 02db244318d386ade6d6095776752f0250a51487 Mon Sep 17 00:00:00 2001 From: drethereum Date: Tue, 6 Aug 2024 17:35:51 -0600 Subject: [PATCH 08/11] typo --- macros/streamline/models.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql index 6639713..541f689 100644 --- a/macros/streamline/models.sql +++ b/macros/streamline/models.sql @@ -124,7 +124,7 @@ WHERE AND b.partition_key = s.{{ partition_column }} {% if evm_balances %} JOIN {{ ref('_block_ranges') }} r - ON r.block_number = s.COALESCE(VALUE :"BLOCK_NUMBER" :: INT,VALUE :"block_number" :: INT) + ON r.block_number = COALESCE(s.VALUE :"BLOCK_NUMBER" :: INT,s.VALUE :"block_number" :: INT) {% endif %} WHERE b.partition_key = s.{{ partition_column }} @@ -168,7 +168,7 @@ FROM AND b.partition_key = s.{{ partition_column }} {% if evm_balances %} JOIN {{ ref('_block_ranges') }} r - ON r.block_number = s.COALESCE(VALUE :"BLOCK_NUMBER" :: INT,VALUE :"block_number" :: INT) + ON r.block_number = COALESCE(s.VALUE :"BLOCK_NUMBER" :: INT,s.VALUE :"block_number" :: INT) {% endif %} WHERE b.partition_key = s.{{ partition_column }} From 0382613d49730873b4fc3624a7194a648533c81a Mon Sep 17 00:00:00 2001 From: drethereum Date: Tue, 20 Aug 2024 13:51:53 -0600 Subject: [PATCH 09/11] decoder macro --- macros/streamline/models.sql | 91 ++++++++++++++++++++++++++++++++++-- 1 file changed, 88 insertions(+), 3 deletions(-) diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql index 541f689..78f7108 100644 --- a/macros/streamline/models.sql +++ b/macros/streamline/models.sql @@ -91,7 +91,6 @@ WHERE {% macro streamline_external_table_query_v2( model, partition_function, - partition_column="partition_key", evm_balances=False ) %} WITH meta AS ( @@ -121,13 +120,13 @@ WHERE s JOIN meta b ON b.file_name = metadata$filename - AND b.partition_key = s.{{ partition_column }} + AND b.partition_key = s.partition_key {% if evm_balances %} JOIN {{ ref('_block_ranges') }} r ON r.block_number = COALESCE(s.VALUE :"BLOCK_NUMBER" :: INT,s.VALUE :"block_number" :: INT) {% endif %} WHERE - b.partition_key = s.{{ partition_column }} + b.partition_key = s.partition_key AND DATA :error IS NULL AND DATA IS NOT NULL {% endmacro %} @@ -175,3 +174,89 @@ WHERE AND DATA :error IS NULL AND DATA IS NOT NULL {% endmacro %} + +{% macro streamline_external_table_query_decoder( + model + ) %} + + WITH meta AS ( + + SELECT + job_created_time AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number, + TO_DATE( + concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5)) + ) AS _partition_by_created_date + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", model) }}') + ) A + ) + SELECT + block_number, + id :: STRING AS id, + DATA, + _inserted_timestamp, + s._partition_by_block_number AS _partition_by_block_number, + s._partition_by_created_date AS _partition_by_created_date + FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b._partition_by_block_number = s._partition_by_block_number + AND b._partition_by_created_date = s._partition_by_created_date + WHERE + b._partition_by_block_number = s._partition_by_block_number + AND b._partition_by_created_date = s._partition_by_created_date + AND s._partition_by_created_date >= DATEADD('day', -2, CURRENT_TIMESTAMP()) +{% endmacro %} + +{% macro streamline_external_table_FR_query_decoder( + model + ) %} + + WITH meta AS ( + + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number, + TO_DATE( + concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5)) + ) AS _partition_by_created_date + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", model) }}' + ) + ) A + ) + SELECT + block_number, + id :: STRING AS id, + DATA, + _inserted_timestamp, + s._partition_by_block_number AS _partition_by_block_number, + s._partition_by_created_date AS _partition_by_created_date + FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b._partition_by_block_number = s._partition_by_block_number + AND b._partition_by_created_date = s._partition_by_created_date + WHERE + b._partition_by_block_number = s._partition_by_block_number + AND b._partition_by_created_date = s._partition_by_created_date + AND DATA NOT ILIKE '%"error":%' +{% endmacro %} \ No newline at end of file From 84cafbc5d2fff88e05830b792d2dfa1cf1c12e6d Mon Sep 17 00:00:00 2001 From: drethereum Date: Tue, 20 Aug 2024 14:05:15 -0600 Subject: [PATCH 10/11] remove error --- macros/streamline/models.sql | 1 - 1 file changed, 1 deletion(-) diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql index 78f7108..054737b 100644 --- a/macros/streamline/models.sql +++ b/macros/streamline/models.sql @@ -258,5 +258,4 @@ WHERE WHERE b._partition_by_block_number = s._partition_by_block_number AND b._partition_by_created_date = s._partition_by_created_date - AND DATA NOT ILIKE '%"error":%' {% endmacro %} \ No newline at end of file From bfca8ff2c9fd0b8aae9fc86cc3400699d16b2f27 Mon Sep 17 00:00:00 2001 From: drethereum Date: Tue, 20 Aug 2024 14:08:50 -0600 Subject: [PATCH 11/11] error --- macros/streamline/models.sql | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql index 054737b..886290c 100644 --- a/macros/streamline/models.sql +++ b/macros/streamline/models.sql @@ -216,6 +216,8 @@ WHERE b._partition_by_block_number = s._partition_by_block_number AND b._partition_by_created_date = s._partition_by_created_date AND s._partition_by_created_date >= DATEADD('day', -2, CURRENT_TIMESTAMP()) + AND DATA :error IS NULL + AND DATA IS NOT NULL {% endmacro %} {% macro streamline_external_table_FR_query_decoder( @@ -258,4 +260,6 @@ WHERE WHERE b._partition_by_block_number = s._partition_by_block_number AND b._partition_by_created_date = s._partition_by_created_date + AND DATA :error IS NULL + AND DATA IS NOT NULL {% endmacro %} \ No newline at end of file