From 93e22d09bb1ed521972bdfba165a1623a8a6f8f8 Mon Sep 17 00:00:00 2001 From: Austin Date: Fri, 13 Jun 2025 13:32:08 -0400 Subject: [PATCH 1/4] 2.0 stash --- dbt_project.yml | 26 ++++++ macros/create_udfs.sql | 8 +- macros/js_hextoint.sql | 6 -- macros/python/udfs.sql | 21 ----- macros/streamline/api_integrations.sql | 17 ---- macros/streamline/get_base_table_udtf.sql | 22 ----- macros/streamline/streamline_udfs.sql | 86 +++++++++++++++---- models/admin/admin__number_sequence.sql | 19 ++++ models/admin/admin__number_sequence.yml | 14 +++ models/bronze/core/bronze__blocks.sql | 39 +++++++++ models/bronze/core/bronze__blocks_fr.sql | 26 ++++++ ...FR_traces.sql => bronze__blocks_fr_v1.sql} | 43 +++++----- models/bronze/core/bronze__blocks_fr_v2.sql | 39 +++++++++ models/bronze/core/bronze__receipts.sql | 0 models/bronze/core/bronze__receipts_fr.sql | 0 ...eceipts.sql => bronze__receipts_fr_v1.sql} | 0 models/bronze/core/bronze__receipts_fr_v2.sql | 0 .../core/bronze__streamline_FR_blocks.sql | 11 --- ...nze__streamline_FR_blocks_transactions.sql | 11 --- .../bronze__streamline_FR_transactions.sql | 79 ----------------- .../bronze__streamline_FR_tx_receipts.sql | 55 ------------ .../bronze/core/bronze__streamline_blocks.sql | 11 --- ...bronze__streamline_blocks_transactions.sql | 11 --- models/bronze/core/bronze__traces.sql | 0 models/bronze/core/bronze__traces_fr.sql | 0 ...ne_traces.sql => bronze__traces_fr_v1.sql} | 0 models/bronze/core/bronze__traces_fr_v2.sql | 0 models/bronze/core/bronze__transactions.sql | 39 +++++++++ .../bronze/core/bronze__transactions_fr.sql | 26 ++++++ ...ons.sql => bronze__transactions_fr_v1.sql} | 6 ++ .../core/bronze__transactions_fr_v2.sql | 39 +++++++++ models/silver/core/silver__blocks.sql | 6 +- models/silver/core/silver__receipts.sql | 6 +- models/silver/core/silver__traces.sql | 6 +- models/silver/core/silver__transactions.sql | 6 +- models/silver/streamline/_block_lookback.sql | 11 +++ .../complete/streamline__complete_blocks.sql | 6 +- ...reamline__complete_blocks_transactions.sql | 30 ------- .../complete/streamline__complete_traces.sql | 6 +- .../streamline__complete_transactions.sql | 6 +- .../streamline__complete_tx_receipts.sql | 6 +- .../realtime/streamline__blocks_realtime.sql | 64 -------------- ...reamline__blocks_transactions_realtime.sql | 33 +++++++ .../streamline__receipts_by_hash_realtime.sql | 36 ++++++++ .../realtime/streamline__traces_realtime.sql | 36 ++++++++ .../streamline__transactions_realtime.sql | 64 -------------- .../streamline__tx_receipts_realtime.sql | 48 ----------- .../silver/streamline/streamline__blocks.sql | 38 ++++---- .../streamline/streamline__get_chainhead.sql | 28 ++++++ models/sources.yml | 5 +- package-lock.yml | 30 +++---- packages.yml | 2 +- 52 files changed, 567 insertions(+), 560 deletions(-) delete mode 100644 macros/js_hextoint.sql delete mode 100644 macros/python/udfs.sql delete mode 100644 macros/streamline/api_integrations.sql delete mode 100644 macros/streamline/get_base_table_udtf.sql create mode 100644 models/admin/admin__number_sequence.sql create mode 100644 models/admin/admin__number_sequence.yml create mode 100644 models/bronze/core/bronze__blocks.sql create mode 100644 models/bronze/core/bronze__blocks_fr.sql rename models/bronze/core/{bronze__streamline_FR_traces.sql => bronze__blocks_fr_v1.sql} (50%) create mode 100644 models/bronze/core/bronze__blocks_fr_v2.sql create mode 100644 models/bronze/core/bronze__receipts.sql create mode 100644 models/bronze/core/bronze__receipts_fr.sql rename models/bronze/core/{bronze__streamline_tx_receipts.sql => bronze__receipts_fr_v1.sql} (100%) create mode 100644 models/bronze/core/bronze__receipts_fr_v2.sql delete mode 100644 models/bronze/core/bronze__streamline_FR_blocks.sql delete mode 100644 models/bronze/core/bronze__streamline_FR_blocks_transactions.sql delete mode 100644 models/bronze/core/bronze__streamline_FR_transactions.sql delete mode 100644 models/bronze/core/bronze__streamline_FR_tx_receipts.sql delete mode 100644 models/bronze/core/bronze__streamline_blocks.sql delete mode 100644 models/bronze/core/bronze__streamline_blocks_transactions.sql create mode 100644 models/bronze/core/bronze__traces.sql create mode 100644 models/bronze/core/bronze__traces_fr.sql rename models/bronze/core/{bronze__streamline_traces.sql => bronze__traces_fr_v1.sql} (100%) create mode 100644 models/bronze/core/bronze__traces_fr_v2.sql create mode 100644 models/bronze/core/bronze__transactions.sql create mode 100644 models/bronze/core/bronze__transactions_fr.sql rename models/bronze/core/{bronze__streamline_transactions.sql => bronze__transactions_fr_v1.sql} (95%) create mode 100644 models/bronze/core/bronze__transactions_fr_v2.sql create mode 100644 models/silver/streamline/_block_lookback.sql delete mode 100644 models/silver/streamline/complete/streamline__complete_blocks_transactions.sql delete mode 100644 models/silver/streamline/realtime/streamline__blocks_realtime.sql create mode 100644 models/silver/streamline/realtime/streamline__blocks_transactions_realtime.sql create mode 100644 models/silver/streamline/realtime/streamline__receipts_by_hash_realtime.sql create mode 100644 models/silver/streamline/realtime/streamline__traces_realtime.sql delete mode 100644 models/silver/streamline/realtime/streamline__transactions_realtime.sql delete mode 100644 models/silver/streamline/realtime/streamline__tx_receipts_realtime.sql create mode 100644 models/silver/streamline/streamline__get_chainhead.sql diff --git a/dbt_project.yml b/dbt_project.yml index 2ba4bf7..89c0d84 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -58,6 +58,32 @@ vars: 'silver/streamline' ] + #### STREAMLINE 2.0 BEGIN #### + + API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}' + EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}' + ROLES: | + ["INTERNAL_DEV"] + + config: + # The keys correspond to dbt profiles and are case sensitive + dev: + API_INTEGRATION: AWS_AURORA_API_STG_V2 + EXTERNAL_FUNCTION_URI: ovimat5dxg.execute-api.us-east-1.amazonaws.com/stg/ + ROLES: + - AWS_LAMBDA_AURORA_API + - INTERNAL_DEV + + prod: + API_INTEGRATION: AWS_AURORA_API_PROD_V2 + EXTERNAL_FUNCTION_URI: 0sbz1dl1ik.execute-api.us-east-1.amazonaws.com/prod/ + ROLES: + - AWS_LAMBDA_AURORA_API + - INTERNAL_DEV + - DBT_CLOUD_AURORA + + #### STREAMLINE 2.0 END #### + tests: +store_failures: true # all tests diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index f816180..82313e6 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -1,11 +1,5 @@ {% macro create_udfs() %} {% if var("UPDATE_UDFS_AND_SPS") %} - {% set sql %} - {{ create_udf_get_chainhead() }} - {{ create_udf_json_rpc() }} - - {% endset %} - {% do run_query(sql) %} {{- fsc_utils.create_udfs() -}} {% endif %} -{% endmacro %} +{% endmacro %} \ No newline at end of file diff --git a/macros/js_hextoint.sql b/macros/js_hextoint.sql deleted file mode 100644 index 8742ac3..0000000 --- a/macros/js_hextoint.sql +++ /dev/null @@ -1,6 +0,0 @@ -{% macro create_js_hex_to_int() %} - CREATE - OR REPLACE FUNCTION {{ target.schema }}.js_hex_to_int ( - s STRING - ) returns DOUBLE LANGUAGE javascript AS 'if (S !== null) { yourNumber = parseInt(S, 16); } return yourNumber' -{% endmacro %} diff --git a/macros/python/udfs.sql b/macros/python/udfs.sql deleted file mode 100644 index df0489e..0000000 --- a/macros/python/udfs.sql +++ /dev/null @@ -1,21 +0,0 @@ -{% macro create_udf_hex_to_int(schema) %} -create or replace function {{ schema }}.udf_hex_to_int(hex string) -returns string -language python -runtime_version = '3.8' -handler = 'hex_to_int' -as -$$ -def hex_to_int(hex) -> str: - """ - Converts hex (of any size) to int (as a string). Snowflake and java script can only handle up to 64-bit (38 digits of precision) - select hex_to_int('200000000000000000000000000000211'); - >> 680564733841876926926749214863536423441 - select hex_to_int('0x200000000000000000000000000000211'); - >> 680564733841876926926749214863536423441 - select hex_to_int(NULL); - >> NULL - """ - return str(int(hex, 16)) if hex else None -$$; -{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/api_integrations.sql b/macros/streamline/api_integrations.sql deleted file mode 100644 index 7414d94..0000000 --- a/macros/streamline/api_integrations.sql +++ /dev/null @@ -1,17 +0,0 @@ -{% macro create_aws_aurora_api() %} - {% if target.name == "prod" %} - {% set sql %} - CREATE api integration IF NOT EXISTS aws_aurora_api api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/aurora-api-prod-rolesnowflakeudfsAF733095-3WVDCVO54NPX' api_allowed_prefixes = ( - 'https://sl2f5beopl.execute-api.us-east-1.amazonaws.com/prod/' - ) enabled = TRUE; - {% endset %} - {% do run_query(sql) %} - {% else %} - {% set sql %} - CREATE OR REPLACE api integration aws_aurora_dev_api api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/aurora-api-dev-rolesnowflakeudfsAF733095-AN4Q3176CUYA' api_allowed_prefixes = ( - 'https://xh409mek2a.execute-api.us-east-1.amazonaws.com/dev/' - ) enabled = TRUE; - {% endset %} - {% do run_query(sql) %} - {% endif %} -{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/get_base_table_udtf.sql b/macros/streamline/get_base_table_udtf.sql deleted file mode 100644 index f9f65b8..0000000 --- a/macros/streamline/get_base_table_udtf.sql +++ /dev/null @@ -1,22 +0,0 @@ -{% macro create_udtf_get_base_table(schema) %} - CREATE - OR REPLACE FUNCTION {{ schema }}.udtf_get_base_table( - max_height INTEGER - ) returns TABLE ( - height NUMBER - ) AS $$ WITH base AS ( - SELECT - ROW_NUMBER() over ( - ORDER BY - SEQ4() - ) AS id - FROM - TABLE(GENERATOR(rowcount => 500000000)) - ) -SELECT - id AS height -FROM - base -WHERE - id <= max_height $$; -{% endmacro %} diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql index fa1b70c..f459c25 100644 --- a/macros/streamline/streamline_udfs.sql +++ b/macros/streamline/streamline_udfs.sql @@ -1,23 +1,71 @@ -{% macro create_udf_get_chainhead() %} - {% if target.name == "prod" %} - CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_get_chainhead() returns variant api_integration = aws_aurora_api AS - 'https://sl2f5beopl.execute-api.us-east-1.amazonaws.com/prod/get_chainhead' +{% macro create_udf_bulk_rest_api_v2() %} + {{ log("Creating udf udf_bulk_rest_api for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }} + {{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }} + + {% set sql %} + CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(json object) returns array api_integration = + {% if target.name == "prod" %} + {{ log("Creating prod udf_bulk_rest_api_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api' + {% elif target.name == "dev" %} + {{ log("Creating dev udf_bulk_rest_api_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api' + {% elif target.name == "sbx" %} + {{ log("Creating stg udf_bulk_rest_api_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api' {% else %} - CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_get_chainhead() returns variant api_integration = aws_aurora_dev_api AS - 'https://xh409mek2a.execute-api.us-east-1.amazonaws.com/dev/get_chainhead' - {%- endif %}; + {{ log("Creating default (dev) udf_bulk_rest_api_v2", info=True) }} + {{ var("config")["dev"]["API_INTEGRATION"] }} AS 'https://{{ var("config")["dev"]["EXTERNAL_FUNCTION_URI"] | lower }}udf_bulk_rest_api' + {% endif %}; + {% endset %} + {{ log(sql, info=True) }} + {% do adapter.execute(sql) %} {% endmacro %} -{% macro create_udf_json_rpc() %} - {% if target.name == "prod" %} - CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_json_rpc( - json OBJECT - ) returns ARRAY api_integration = aws_aurora_api AS - 'https://sl2f5beopl.execute-api.us-east-1.amazonaws.com/prod/bulk_get_json_rpc' +{% macro create_udf_bulk_decode_logs() %} + {{ log("Creating udf udf_bulk_decode_logs_v2 for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }} + {{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }} + + {% set sql %} + CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_logs_v2(json object) returns array api_integration = + {% if target.name == "prod" %} + {{ log("Creating prod udf_bulk_decode_logs_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_logs' + {% elif target.name == "dev" %} + {{ log("Creating dev udf_bulk_decode_logs_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_logs' + {% elif target.name == "sbx" %} + {{ log("Creating stg udf_bulk_decode_logs_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_logs' {% else %} - CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_json_rpc( - json OBJECT - ) returns ARRAY api_integration = aws_aurora_dev_api AS - 'https://xh409mek2a.execute-api.us-east-1.amazonaws.com/dev/bulk_get_json_rpc' - {%- endif %}; -{% endmacro %} \ No newline at end of file + {{ log("Creating default (dev) udf_bulk_decode_logs_v2", info=True) }} + {{ var("config")["dev"]["API_INTEGRATION"] }} AS 'https://{{ var("config")["dev"]["EXTERNAL_FUNCTION_URI"] | lower }}bulk_decode_logs' + {% endif %}; + {% endset %} + {{ log(sql, info=True) }} + {% do adapter.execute(sql) %} +{% endmacro %} + +{% macro create_udf_bulk_decode_traces() %} + {{ log("Creating udf udf_bulk_decode_traces_v2 for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }} + {{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }} + + {% set sql %} + CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_traces_v2(json object) returns array api_integration = + {% if target.name == "prod" %} + {{ log("Creating prod udf_bulk_decode_traces_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_traces' + {% elif target.name == "dev" %} + {{ log("Creating dev udf_bulk_decode_traces_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_traces' + {% elif target.name == "sbx" %} + {{ log("Creating stg udf_bulk_decode_traces_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_traces' + {% else %} + {{ log("Creating default (dev) udf_bulk_decode_traces_v2", info=True) }} + {{ var("config")["dev"]["API_INTEGRATION"] }} AS 'https://{{ var("config")["dev"]["EXTERNAL_FUNCTION_URI"] | lower }}bulk_decode_traces' + {% endif %}; + {% endset %} + {{ log(sql, info=True) }} + {% do adapter.execute(sql) %} +{% endmacro %} diff --git a/models/admin/admin__number_sequence.sql b/models/admin/admin__number_sequence.sql new file mode 100644 index 0000000..06a2af0 --- /dev/null +++ b/models/admin/admin__number_sequence.sql @@ -0,0 +1,19 @@ +{{ config( + materialized = 'incremental', + cluster_by = 'round(_id,-3)', + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(_id)", + full_refresh = false, + tags = ['silver','admin','phase_1'] +) }} + +SELECT + ROW_NUMBER() over ( + ORDER BY + SEQ4() + ) - 1 :: INT AS _id +FROM + TABLE(GENERATOR(rowcount => 1000000000)) +WHERE 1=1 +{% if is_incremental() %} + AND 1=0 +{% endif %} \ No newline at end of file diff --git a/models/admin/admin__number_sequence.yml b/models/admin/admin__number_sequence.yml new file mode 100644 index 0000000..ee97bd6 --- /dev/null +++ b/models/admin/admin__number_sequence.yml @@ -0,0 +1,14 @@ +version: 2 +models: + - name: admin__number_sequence + description: | + This model generates a sequence of numbers for a given range. + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - _ID + columns: + - name: _ID + tests: + - not_null + description: Primary key for the table \ No newline at end of file diff --git a/models/bronze/core/bronze__blocks.sql b/models/bronze/core/bronze__blocks.sql new file mode 100644 index 0000000..c825eff --- /dev/null +++ b/models/bronze/core/bronze__blocks.sql @@ -0,0 +1,39 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + +WITH meta AS ( + SELECT + job_created_time AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", "blocks_v2") }}' + ) + ) A +) + SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number + FROM + {{ source( "bronze_streamline", "blocks_v2") }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key + WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/bronze/core/bronze__blocks_fr.sql b/models/bronze/core/bronze__blocks_fr.sql new file mode 100644 index 0000000..d8602af --- /dev/null +++ b/models/bronze/core/bronze__blocks_fr.sql @@ -0,0 +1,26 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + +SELECT + partition_key, + block_number, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__blocks_fr_v2') }} +UNION ALL +SELECT + _partition_by_block_id AS partition_key, + block_number, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__blocks_fr_v1') }} \ No newline at end of file diff --git a/models/bronze/core/bronze__streamline_FR_traces.sql b/models/bronze/core/bronze__blocks_fr_v1.sql similarity index 50% rename from models/bronze/core/bronze__streamline_FR_traces.sql rename to models/bronze/core/bronze__blocks_fr_v1.sql index 2cc38e5..7f05f05 100644 --- a/models/bronze/core/bronze__streamline_FR_traces.sql +++ b/models/bronze/core/bronze__blocks_fr_v1.sql @@ -1,37 +1,35 @@ {{ config ( - materialized = 'view' + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] ) }} -WITH meta AS ( - - SELECT - registered_on AS _inserted_timestamp, - file_name, - CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS _partition_by_block_id - FROM - TABLE( - information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "traces") }}' - ) - ) A -) + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER ) AS _partition_by_block_id + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "blocks") }}' + ) + ) A + ) SELECT block_number, - s.value :metadata :request :params [0] :: STRING AS tx_hash, DATA, + metadata, + file_name, _inserted_timestamp, MD5( CAST( - COALESCE(CAST(tx_hash AS text), '' :: STRING) AS text + COALESCE(CAST(block_number AS text), '' :: STRING) AS text ) ) AS id, s._partition_by_block_id, s.value AS VALUE FROM - {{ source( - "bronze_streamline", - "traces" - ) }} + {{ source( "bronze_streamline", "blocks") }} s JOIN meta b ON b.file_name = metadata$filename @@ -51,7 +49,6 @@ WHERE '-32007', '-32008', '-32009', - '-32010', - '-32608' + '-32010' ) - ) + ) \ No newline at end of file diff --git a/models/bronze/core/bronze__blocks_fr_v2.sql b/models/bronze/core/bronze__blocks_fr_v2.sql new file mode 100644 index 0000000..fb1ff8a --- /dev/null +++ b/models/bronze/core/bronze__blocks_fr_v2.sql @@ -0,0 +1,39 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER ) AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "blocks_v2") }}' + ) + ) A + ) +SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.value :"block_number" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number +FROM + {{ source( "bronze_streamline", "blocks_v2") }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key +WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/bronze/core/bronze__receipts.sql b/models/bronze/core/bronze__receipts.sql new file mode 100644 index 0000000..e69de29 diff --git a/models/bronze/core/bronze__receipts_fr.sql b/models/bronze/core/bronze__receipts_fr.sql new file mode 100644 index 0000000..e69de29 diff --git a/models/bronze/core/bronze__streamline_tx_receipts.sql b/models/bronze/core/bronze__receipts_fr_v1.sql similarity index 100% rename from models/bronze/core/bronze__streamline_tx_receipts.sql rename to models/bronze/core/bronze__receipts_fr_v1.sql diff --git a/models/bronze/core/bronze__receipts_fr_v2.sql b/models/bronze/core/bronze__receipts_fr_v2.sql new file mode 100644 index 0000000..e69de29 diff --git a/models/bronze/core/bronze__streamline_FR_blocks.sql b/models/bronze/core/bronze__streamline_FR_blocks.sql deleted file mode 100644 index e0a16c6..0000000 --- a/models/bronze/core/bronze__streamline_FR_blocks.sql +++ /dev/null @@ -1,11 +0,0 @@ -{{ config ( - materialized = 'view' -) }} - -{% set model = this.identifier.split("_") [-1] %} -{{ streamline_external_table_FR_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "block_number" -) }} diff --git a/models/bronze/core/bronze__streamline_FR_blocks_transactions.sql b/models/bronze/core/bronze__streamline_FR_blocks_transactions.sql deleted file mode 100644 index e0a16c6..0000000 --- a/models/bronze/core/bronze__streamline_FR_blocks_transactions.sql +++ /dev/null @@ -1,11 +0,0 @@ -{{ config ( - materialized = 'view' -) }} - -{% set model = this.identifier.split("_") [-1] %} -{{ streamline_external_table_FR_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "block_number" -) }} diff --git a/models/bronze/core/bronze__streamline_FR_transactions.sql b/models/bronze/core/bronze__streamline_FR_transactions.sql deleted file mode 100644 index b27a8f8..0000000 --- a/models/bronze/core/bronze__streamline_FR_transactions.sql +++ /dev/null @@ -1,79 +0,0 @@ -{{ config ( - materialized = 'view' -) }} - -WITH meta AS ( - - SELECT - last_modified AS _inserted_timestamp, - file_name, - CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS _partition_by_block_id - FROM - TABLE( - information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "transactions") }}' - ) - ) A -), -tbl AS ( - SELECT - block_number, - COALESCE(s.data :hash :: STRING, s.data :result :hash :: STRING) AS tx_hash, - _inserted_timestamp, - s._partition_by_block_id, - s.value AS VALUE, - s.data AS DATA - FROM - {{ source( - "bronze_streamline", - "transactions" - ) }} - s - JOIN meta b - ON b.file_name = metadata$filename - AND b._partition_by_block_id = s._partition_by_block_id - WHERE - b._partition_by_block_id = s._partition_by_block_id - AND ( - DATA :error :code IS NULL - OR DATA :error :code NOT IN ( - '-32000', - '-32001', - '-32002', - '-32003', - '-32004', - '-32005', - '-32006', - '-32007', - '-32008', - '-32009', - '-32010' - ) - ) -) -SELECT - block_number, - COALESCE(f.value :hash :: STRING, tx_hash) AS tx_hash, - _inserted_timestamp, - MD5( - CAST( - COALESCE( - CAST( - CONCAT( - block_number, - '_-_', - COALESCE(COALESCE(f.value :hash :: STRING, tx_hash), '') - ) AS text - ), - '' :: STRING - ) AS text - ) - ) AS id, - _partition_by_block_id, - COALESCE(f.value, tbl.data) AS VALUE -FROM - tbl, - LATERAL FLATTEN( - input => VALUE :data :result :transactions, OUTER => TRUE - ) f -WHERE f.value IS NOT NULL OR tbl.data :transactionIndex IS NOT NULL \ No newline at end of file diff --git a/models/bronze/core/bronze__streamline_FR_tx_receipts.sql b/models/bronze/core/bronze__streamline_FR_tx_receipts.sql deleted file mode 100644 index e67934e..0000000 --- a/models/bronze/core/bronze__streamline_FR_tx_receipts.sql +++ /dev/null @@ -1,55 +0,0 @@ -{{ config ( - materialized = 'view' -) }} - -WITH meta AS ( - - SELECT - registered_on AS _inserted_timestamp, - file_name, - CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS _partition_by_block_id - FROM - TABLE( - information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "tx_receipts") }}' - ) - ) A -) -SELECT - block_number, - value :data :result :transactionHash ::STRING AS tx_hash, - _inserted_timestamp, - MD5( - CAST( - COALESCE(CAST(CONCAT(block_number, '_-_', COALESCE(value :data :result :transactionHash ::STRING, '')) AS text), '' :: STRING) AS text - ) - ) AS id, - s._partition_by_block_id, - s.value:data:result AS VALUE -FROM - {{ source( - "bronze_streamline", - "tx_receipts" - ) }} - s - JOIN meta b - ON b.file_name = metadata$filename - AND b._partition_by_block_id = s._partition_by_block_id -WHERE - b._partition_by_block_id = s._partition_by_block_id - AND ( - DATA :error :code IS NULL - OR DATA :error :code NOT IN ( - '-32000', - '-32001', - '-32002', - '-32003', - '-32004', - '-32005', - '-32006', - '-32007', - '-32008', - '-32009', - '-32010' - ) - ) diff --git a/models/bronze/core/bronze__streamline_blocks.sql b/models/bronze/core/bronze__streamline_blocks.sql deleted file mode 100644 index de33225..0000000 --- a/models/bronze/core/bronze__streamline_blocks.sql +++ /dev/null @@ -1,11 +0,0 @@ -{{ config ( - materialized = 'view' -) }} - -{% set model = this.identifier.split("_") [-1] %} -{{ streamline_external_table_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "block_number" -) }} diff --git a/models/bronze/core/bronze__streamline_blocks_transactions.sql b/models/bronze/core/bronze__streamline_blocks_transactions.sql deleted file mode 100644 index de33225..0000000 --- a/models/bronze/core/bronze__streamline_blocks_transactions.sql +++ /dev/null @@ -1,11 +0,0 @@ -{{ config ( - materialized = 'view' -) }} - -{% set model = this.identifier.split("_") [-1] %} -{{ streamline_external_table_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "block_number" -) }} diff --git a/models/bronze/core/bronze__traces.sql b/models/bronze/core/bronze__traces.sql new file mode 100644 index 0000000..e69de29 diff --git a/models/bronze/core/bronze__traces_fr.sql b/models/bronze/core/bronze__traces_fr.sql new file mode 100644 index 0000000..e69de29 diff --git a/models/bronze/core/bronze__streamline_traces.sql b/models/bronze/core/bronze__traces_fr_v1.sql similarity index 100% rename from models/bronze/core/bronze__streamline_traces.sql rename to models/bronze/core/bronze__traces_fr_v1.sql diff --git a/models/bronze/core/bronze__traces_fr_v2.sql b/models/bronze/core/bronze__traces_fr_v2.sql new file mode 100644 index 0000000..e69de29 diff --git a/models/bronze/core/bronze__transactions.sql b/models/bronze/core/bronze__transactions.sql new file mode 100644 index 0000000..5eee0bf --- /dev/null +++ b/models/bronze/core/bronze__transactions.sql @@ -0,0 +1,39 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + +WITH meta AS ( + SELECT + job_created_time AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", "transactions_v2") }}' + ) + ) A +) + SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number + FROM + {{ source( "bronze_streamline", "transactions_v2") }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key + WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/bronze/core/bronze__transactions_fr.sql b/models/bronze/core/bronze__transactions_fr.sql new file mode 100644 index 0000000..b5f9cc2 --- /dev/null +++ b/models/bronze/core/bronze__transactions_fr.sql @@ -0,0 +1,26 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + +SELECT + partition_key, + block_number, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__transactions_fr_v2') }} +UNION ALL +SELECT + _partition_by_block_id AS partition_key, + block_number, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__transactions_fr_v1') }} \ No newline at end of file diff --git a/models/bronze/core/bronze__streamline_transactions.sql b/models/bronze/core/bronze__transactions_fr_v1.sql similarity index 95% rename from models/bronze/core/bronze__streamline_transactions.sql rename to models/bronze/core/bronze__transactions_fr_v1.sql index e85321f..b925577 100644 --- a/models/bronze/core/bronze__streamline_transactions.sql +++ b/models/bronze/core/bronze__transactions_fr_v1.sql @@ -21,6 +21,8 @@ tbl AS ( block_number, COALESCE(s.data :hash :: STRING, s.data :result :hash :: STRING) AS tx_hash, _inserted_timestamp, + metadata, + file_name, s._partition_by_block_id, s.value AS VALUE, s.data AS DATA @@ -70,7 +72,11 @@ SELECT ) AS text ) ) AS id, + data, + metadata, + file_name, _partition_by_block_id, + _inserted_timestamp, COALESCE(f.value, tbl.data:result, tbl.data) AS VALUE FROM tbl, diff --git a/models/bronze/core/bronze__transactions_fr_v2.sql b/models/bronze/core/bronze__transactions_fr_v2.sql new file mode 100644 index 0000000..ef5982a --- /dev/null +++ b/models/bronze/core/bronze__transactions_fr_v2.sql @@ -0,0 +1,39 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER ) AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "transactions_v2") }}' + ) + ) A + ) +SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.value :"block_number" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number +FROM + {{ source( "bronze_streamline", "transactions_v2") }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key +WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/silver/core/silver__blocks.sql b/models/silver/core/silver__blocks.sql index 08478a9..301667d 100644 --- a/models/silver/core/silver__blocks.sql +++ b/models/silver/core/silver__blocks.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_blocks') }} +-- depends_on: {{ ref('bronze__blocks') }} {{ config( materialized = 'incremental', incremental_strategy = 'delete+insert', @@ -57,7 +57,7 @@ SELECT FROM {% if is_incremental() %} -{{ ref('bronze__streamline_blocks') }} +{{ ref('bronze__blocks') }} WHERE _inserted_timestamp >= ( SELECT @@ -66,7 +66,7 @@ WHERE {{ this }} ) {% else %} - {{ ref('bronze__streamline_FR_blocks') }} + {{ ref('bronze__blocks_fr') }} {% endif %} qualify(ROW_NUMBER() over (PARTITION BY block_number diff --git a/models/silver/core/silver__receipts.sql b/models/silver/core/silver__receipts.sql index b9a37dc..9c0203f 100644 --- a/models/silver/core/silver__receipts.sql +++ b/models/silver/core/silver__receipts.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_tx_receipts') }} +-- depends_on: {{ ref('bronze__receipts') }} {{ config( materialized = 'incremental', incremental_strategy = 'delete+insert', @@ -14,7 +14,7 @@ WITH base AS ( _inserted_timestamp FROM {% if is_incremental() %} -{{ ref('bronze__streamline_tx_receipts') }} +{{ ref('bronze__receipts') }} WHERE _inserted_timestamp >= ( SELECT @@ -26,7 +26,7 @@ WHERE DATA ) {% else %} - {{ ref('bronze__streamline_FR_tx_receipts') }} + {{ ref('bronze__receipts_fr') }} WHERE IS_OBJECT( DATA diff --git a/models/silver/core/silver__traces.sql b/models/silver/core/silver__traces.sql index ebfa448..692d820 100644 --- a/models/silver/core/silver__traces.sql +++ b/models/silver/core/silver__traces.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_traces') }} +-- depends_on: {{ ref('bronze__traces') }} {{ config ( materialized = "incremental", incremental_strategy = 'delete+insert', @@ -17,7 +17,7 @@ WITH bronze_traces AS ( FROM {% if is_incremental() %} -{{ ref('bronze__streamline_traces') }} +{{ ref('bronze__traces') }} WHERE _inserted_timestamp >= ( SELECT @@ -26,7 +26,7 @@ WHERE {{ this }} ) {% else %} - {{ ref('bronze__streamline_FR_traces') }} + {{ ref('bronze__traces_fr') }} {% endif %} qualify(ROW_NUMBER() over (PARTITION BY block_number, tx_hash diff --git a/models/silver/core/silver__transactions.sql b/models/silver/core/silver__transactions.sql index 7809180..972c06a 100644 --- a/models/silver/core/silver__transactions.sql +++ b/models/silver/core/silver__transactions.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_transactions') }} +-- depends_on: {{ ref('bronze__transactions') }} {{ config( materialized = 'incremental', incremental_strategy = 'delete+insert', @@ -16,7 +16,7 @@ WITH base AS ( FROM {% if is_incremental() %} -{{ ref('bronze__streamline_transactions') }} +{{ ref('bronze__transactions') }} WHERE _inserted_timestamp >= ( SELECT @@ -26,7 +26,7 @@ WHERE ) AND IS_OBJECT(DATA) {% else %} - {{ ref('bronze__streamline_FR_transactions') }} + {{ ref('bronze__transactions_fr') }} WHERE IS_OBJECT(DATA) {% endif %} diff --git a/models/silver/streamline/_block_lookback.sql b/models/silver/streamline/_block_lookback.sql new file mode 100644 index 0000000..3e4ab21 --- /dev/null +++ b/models/silver/streamline/_block_lookback.sql @@ -0,0 +1,11 @@ +{{ config( + materialized = 'ephemeral' +) }} + +SELECT + COALESCE(MIN(block_number), 0) AS block_number +FROM + {{ ref("core__fact_blocks") }} +WHERE + block_timestamp >= DATEADD('hour', -72, TRUNCATE(SYSDATE(), 'HOUR')) + AND block_timestamp < DATEADD('hour', -71, TRUNCATE(SYSDATE(), 'HOUR')) \ No newline at end of file diff --git a/models/silver/streamline/complete/streamline__complete_blocks.sql b/models/silver/streamline/complete/streamline__complete_blocks.sql index 4910a03..31cc0c4 100644 --- a/models/silver/streamline/complete/streamline__complete_blocks.sql +++ b/models/silver/streamline/complete/streamline__complete_blocks.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_blocks') }} +-- depends_on: {{ ref('bronze__blocks') }} {{ config ( materialized = "incremental", unique_key = "id", @@ -14,7 +14,7 @@ SELECT FROM {% if is_incremental() %} -{{ ref('bronze__streamline_blocks') }} +{{ ref('bronze__blocks') }} WHERE _inserted_timestamp >= ( SELECT @@ -24,7 +24,7 @@ WHERE ) AND DATA != [] {% else %} - {{ ref('bronze__streamline_FR_blocks') }} + {{ ref('bronze__blocks_fr') }} WHERE DATA != [] {% endif %} diff --git a/models/silver/streamline/complete/streamline__complete_blocks_transactions.sql b/models/silver/streamline/complete/streamline__complete_blocks_transactions.sql deleted file mode 100644 index 9d98ef0..0000000 --- a/models/silver/streamline/complete/streamline__complete_blocks_transactions.sql +++ /dev/null @@ -1,30 +0,0 @@ --- depends_on: {{ ref('bronze__streamline_blocks_transactions') }} -{{ config ( - materialized = "incremental", - unique_key = "id", - cluster_by = "ROUND(block_number, -3)", - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" -) }} - -SELECT - id, - block_number, - _inserted_timestamp -FROM - -{% if is_incremental() %} -{{ ref('bronze__streamline_blocks_transactions') }} -WHERE - _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) _inserted_timestamp - FROM - {{ this }} - ) -{% else %} - {{ ref('bronze__streamline_FR_blocks_transactions') }} -{% endif %} - -qualify(ROW_NUMBER() over (PARTITION BY id -ORDER BY - _inserted_timestamp DESC)) = 1 diff --git a/models/silver/streamline/complete/streamline__complete_traces.sql b/models/silver/streamline/complete/streamline__complete_traces.sql index 8348091..b1f87ea 100644 --- a/models/silver/streamline/complete/streamline__complete_traces.sql +++ b/models/silver/streamline/complete/streamline__complete_traces.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_traces') }} +-- depends_on: {{ ref('bronze__traces') }} {{ config ( materialized = "incremental", unique_key = "id", @@ -14,7 +14,7 @@ SELECT FROM {% if is_incremental() %} -{{ ref('bronze__streamline_traces') }} +{{ ref('bronze__traces') }} WHERE _inserted_timestamp >= ( @@ -24,7 +24,7 @@ WHERE {{ this }} ) {% else %} - {{ ref('bronze__streamline_FR_traces') }} + {{ ref('bronze__traces_fr') }} {% endif %} qualify(ROW_NUMBER() over (PARTITION BY id diff --git a/models/silver/streamline/complete/streamline__complete_transactions.sql b/models/silver/streamline/complete/streamline__complete_transactions.sql index 6cdbfc5..05b8770 100644 --- a/models/silver/streamline/complete/streamline__complete_transactions.sql +++ b/models/silver/streamline/complete/streamline__complete_transactions.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_transactions') }} +-- depends_on: {{ ref('bronze__transactions') }} {{ config ( materialized = "incremental", unique_key = "id", @@ -14,7 +14,7 @@ SELECT FROM {% if is_incremental() %} -{{ ref('bronze__streamline_transactions') }} +{{ ref('bronze__transactions') }} WHERE _inserted_timestamp >= ( SELECT @@ -23,7 +23,7 @@ WHERE {{ this }} ) {% else %} - {{ ref('bronze__streamline_FR_transactions') }} + {{ ref('bronze__transactions_fr') }} {% endif %} qualify(ROW_NUMBER() over (PARTITION BY id diff --git a/models/silver/streamline/complete/streamline__complete_tx_receipts.sql b/models/silver/streamline/complete/streamline__complete_tx_receipts.sql index 59631da..0b63985 100644 --- a/models/silver/streamline/complete/streamline__complete_tx_receipts.sql +++ b/models/silver/streamline/complete/streamline__complete_tx_receipts.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_tx_receipts') }} +-- depends_on: {{ ref('bronze__receipts') }} {{ config ( materialized = "incremental", unique_key = "id", @@ -12,7 +12,7 @@ SELECT FROM {% if is_incremental() %} -{{ ref('bronze__streamline_tx_receipts') }} +{{ ref('bronze__receipts') }} WHERE _inserted_timestamp >= ( SELECT @@ -22,7 +22,7 @@ WHERE ) AND tx_hash IS NOT NULL {% else %} - {{ ref('bronze__streamline_FR_tx_receipts') }} + {{ ref('bronze__receipts_fr') }} WHERE tx_hash IS NOT NULL {% endif %} diff --git a/models/silver/streamline/realtime/streamline__blocks_realtime.sql b/models/silver/streamline/realtime/streamline__blocks_realtime.sql deleted file mode 100644 index e7c68bd..0000000 --- a/models/silver/streamline/realtime/streamline__blocks_realtime.sql +++ /dev/null @@ -1,64 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table','blocks', 'producer_batch_size',10000, 'producer_limit_size',2000000, 'worker_batch_size',100))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - -WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %} - - SELECT - 0 AS block_number - {% else %} - SELECT - MAX(block_number) - 500000 AS block_number --aprox 3 days - FROM - {{ ref("streamline__blocks") }} - {% endif %}), - tbl AS ( - SELECT - block_number, - block_number_hex - FROM - {{ ref("streamline__blocks") }} - WHERE - ( - block_number >= ( - SELECT - block_number - FROM - last_3_days - ) - ) - AND block_number IS NOT NULL - EXCEPT - SELECT - block_number, - REPLACE( - concat_ws('', '0x', to_char(block_number, 'XXXXXXXX')), - ' ', - '' - ) AS block_number_hex - FROM - {{ ref("streamline__complete_blocks") }} - WHERE - ( - block_number >= ( - SELECT - block_number - FROM - last_3_days - ) - ) - AND block_number IS NOT NULL - ) -SELECT - block_number, - 'eth_getBlockByNumber' AS method, - CONCAT( - block_number_hex, - '_-_', - 'false' - ) AS params -FROM tbl \ No newline at end of file diff --git a/models/silver/streamline/realtime/streamline__blocks_transactions_realtime.sql b/models/silver/streamline/realtime/streamline__blocks_transactions_realtime.sql new file mode 100644 index 0000000..8ceb652 --- /dev/null +++ b/models/silver/streamline/realtime/streamline__blocks_transactions_realtime.sql @@ -0,0 +1,33 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :'blocks_transactions', + "sql_limit" :"30000", + "producer_batch_size" :"30000", + "worker_batch_size" :"10000", + "sql_source" :'{{this.identifier}}', + "exploded_key": tojson(['result', 'result.transactions']) } + ), + tags = ['streamline_core_evm_realtime'] +) }} + +SELECT + 150815373 as block_number, + ROUND(block_number, -3) AS partition_key, + live.udf_api( + 'POST', + '{URL}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + OBJECT_CONSTRUCT( + 'id', block_number, + 'jsonrpc', '2.0', + 'method', 'eth_getBlockByNumber', + 'params', ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE) + ), + 'Vault/prod/evm/aurora/mainnet' + ) AS request \ No newline at end of file diff --git a/models/silver/streamline/realtime/streamline__receipts_by_hash_realtime.sql b/models/silver/streamline/realtime/streamline__receipts_by_hash_realtime.sql new file mode 100644 index 0000000..b92808c --- /dev/null +++ b/models/silver/streamline/realtime/streamline__receipts_by_hash_realtime.sql @@ -0,0 +1,36 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :'receipts_by_hash', + "sql_limit" :"30000", + "producer_batch_size" :"30000", + "worker_batch_size" :"10000", + "sql_source" :'{{this.identifier}}' } + ), + tags = ['streamline','core','realtime','phase_1'] +) }} + +SELECT + 150949168 as block_number, + '0x967586085f70584ab5e5886ba9c7d1b2f227f554ed594a491978e4a2110bfdd7' as tx_hash, + ROUND( + block_number, + -3 + ) AS partition_key, + live.udf_api( + 'POST', + '{URL}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + OBJECT_CONSTRUCT( + 'id', block_number, + 'jsonrpc', '2.0', + 'method', 'eth_getTransactionReceipt', + 'params', ARRAY_CONSTRUCT(tx_hash) + ), + 'Vault/prod/evm/aurora/mainnet' + ) AS request \ No newline at end of file diff --git a/models/silver/streamline/realtime/streamline__traces_realtime.sql b/models/silver/streamline/realtime/streamline__traces_realtime.sql new file mode 100644 index 0000000..ba4de5a --- /dev/null +++ b/models/silver/streamline/realtime/streamline__traces_realtime.sql @@ -0,0 +1,36 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ + "external_table" :'traces', + "sql_limit" :"30000", + "producer_batch_size" :"30000", + "worker_batch_size" :"10000", + "sql_source" :'{{this.identifier}}', + "exploded_key": tojson(['result']) + } + ), + tags = ['streamline_core_evm_realtime'] +) }} + +SELECT + 150949168 as block_number, + '0x967586085f70584ab5e5886ba9c7d1b2f227f554ed594a491978e4a2110bfdd7' as tx_hash, + ROUND(block_number, -3) AS partition_key, + live.udf_api( + 'POST', + '{URL}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + OBJECT_CONSTRUCT( + 'id', block_number, + 'jsonrpc', '2.0', + 'method', 'debug_traceTransaction', + 'params', ARRAY_CONSTRUCT(tx_hash, OBJECT_CONSTRUCT('tracer', 'callTracer', 'timeout', '120s')) + ), + 'Vault/prod/evm/aurora/mainnet' + ) AS request \ No newline at end of file diff --git a/models/silver/streamline/realtime/streamline__transactions_realtime.sql b/models/silver/streamline/realtime/streamline__transactions_realtime.sql deleted file mode 100644 index 09cd7cd..0000000 --- a/models/silver/streamline/realtime/streamline__transactions_realtime.sql +++ /dev/null @@ -1,64 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'transactions', 'exploded_key','[\"result\", \"transactions\"]', 'producer_batch_size',10000, 'producer_limit_size',2000000, 'worker_batch_size',100))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - -WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %} - - SELECT - 0 AS block_number - {% else %} - SELECT - MAX(block_number) - 500000 AS block_number --aprox 3 days - FROM - {{ ref("streamline__blocks") }} - {% endif %}), - tbl AS ( - SELECT - block_number, - block_number_hex - FROM - {{ ref("streamline__blocks") }} - WHERE - ( - block_number >= ( - SELECT - block_number - FROM - last_3_days - ) - ) - AND block_number IS NOT NULL - EXCEPT - SELECT - block_number, - REPLACE( - concat_ws('', '0x', to_char(block_number, 'XXXXXXXX')), - ' ', - '' - ) AS block_number_hex - FROM - {{ ref("streamline__complete_blocks_transactions") }} - WHERE - ( - block_number >= ( - SELECT - block_number - FROM - last_3_days - ) - ) - AND block_number IS NOT NULL - ) -SELECT - block_number, - 'eth_getBlockByNumber' AS method, - CONCAT( - block_number_hex, - '_-_', - 'true' - ) AS params -FROM tbl \ No newline at end of file diff --git a/models/silver/streamline/realtime/streamline__tx_receipts_realtime.sql b/models/silver/streamline/realtime/streamline__tx_receipts_realtime.sql deleted file mode 100644 index 906a62a..0000000 --- a/models/silver/streamline/realtime/streamline__tx_receipts_realtime.sql +++ /dev/null @@ -1,48 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'tx_receipts', 'producer_batch_size',10000, 'producer_limit_size',2000000, 'worker_batch_size',100))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - -WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %} - - SELECT - 0 AS block_number - {% else %} - SELECT - MAX(block_number) - 500000 AS block_number -- aprox 3 days - FROM - {{ ref("streamline__complete_transactions") }} - {% endif %}), - tbl AS ( - SELECT - block_number, - tx_hash - FROM - {{ ref("streamline__complete_transactions") }} - WHERE - ( - block_number >= ( - SELECT - block_number - FROM - last_3_days - ) - ) - AND block_number IS NOT NULL - AND tx_hash IS NOT NULL - AND tx_hash NOT IN ( - SELECT - tx_hash - FROM - {{ ref("streamline__complete_tx_receipts") }} - ) - ) -SELECT - block_number, - 'eth_getTransactionReceipt' AS method, - tx_hash AS params -FROM - tbl diff --git a/models/silver/streamline/streamline__blocks.sql b/models/silver/streamline/streamline__blocks.sql index 289babd..2445bdd 100644 --- a/models/silver/streamline/streamline__blocks.sql +++ b/models/silver/streamline/streamline__blocks.sql @@ -1,26 +1,24 @@ {{ config ( materialized = "view", - tags = ['streamline_view'] + tags = ['streamline','core','chainhead','phase_1'] ) }} -{% if execute %} - {% set height = run_query('SELECT streamline.udf_get_chainhead()') %} - {% set block_height = height.columns [0].values() [0] %} -{% else %} - {% set block_height = 0 %} -{% endif %} - SELECT - height AS block_number, - REPLACE( - concat_ws('', '0x', to_char(height, 'XXXXXXXX')), - ' ', - '' - ) AS block_number_hex + _id, + ( + (6000 / 60) * 5 + ) :: INT AS block_number_delay, --minute-based block delay + (_id - block_number_delay) :: INT AS block_number, + utils.udf_int_to_hex(block_number) AS block_number_hex FROM - TABLE( - streamline.udtf_get_base_table( - {{ block_height }} - - 800 - ) - ) -- avoid the missing blocks at the tips of the chainhead, around 1 hour + {{ ref('admin__number_sequence') }} +WHERE + _id <= ( + SELECT + COALESCE( + block_number, + 0 + ) + FROM + {{ ref("streamline__get_chainhead") }} + ) \ No newline at end of file diff --git a/models/silver/streamline/streamline__get_chainhead.sql b/models/silver/streamline/streamline__get_chainhead.sql new file mode 100644 index 0000000..3965979 --- /dev/null +++ b/models/silver/streamline/streamline__get_chainhead.sql @@ -0,0 +1,28 @@ +{{ config ( + materialized = 'table', + tags = ['streamline','core','chainhead','phase_1'] +) }} + +SELECT + live.udf_api( + 'POST', + '{URL}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'livequery' + ), + OBJECT_CONSTRUCT( + 'id', + 0, + 'jsonrpc', + '2.0', + 'method', + 'eth_blockNumber', + 'params', + [] + ), + 'Vault/prod/evm/aurora/mainnet' + ) AS resp, + utils.udf_hex_to_int( + resp :data :result :: STRING + ) AS block_number \ No newline at end of file diff --git a/models/sources.yml b/models/sources.yml index e0db1fa..eec5ef6 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -11,7 +11,6 @@ sources: - name: bronze_streamline database: streamline - schema: | {{ "AURORA_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "AURORA" }} tables: @@ -19,6 +18,10 @@ sources: - name: transactions - name: tx_receipts - name: traces + - name: blocks_v2 + - name: transactions_v2 + - name: receipts_v2 + - name: traces_v2 - name: silver_crosschain database: "{{ 'crosschain' if target.database == 'FLOW' else 'crosschain_dev' }}" diff --git a/package-lock.yml b/package-lock.yml index 501cbcb..e69015c 100644 --- a/package-lock.yml +++ b/package-lock.yml @@ -1,16 +1,16 @@ packages: - - package: calogica/dbt_expectations - version: 0.8.2 - - package: dbt-labs/dbt_external_tables - version: 0.8.2 - - git: https://github.com/FlipsideCrypto/fsc-utils.git - revision: d3cf679e079f0cf06142de9386f215e55fe26b3b - - package: get-select/dbt_snowflake_query_tags - version: 2.5.0 - - package: calogica/dbt_date - version: 0.7.2 - - package: dbt-labs/dbt_utils - version: 1.0.0 - - git: https://github.com/FlipsideCrypto/livequery-models.git - revision: b024188be4e9c6bc00ed77797ebdc92d351d620e -sha1_hash: 43a34a68dfdb6b0a3c91dd72cdb7a72243b37630 +- package: calogica/dbt_expectations + version: 0.8.2 +- package: dbt-labs/dbt_external_tables + version: 0.8.2 +- git: https://github.com/FlipsideCrypto/fsc-utils.git + revision: d5a43b13ef69c9dd50a86aae5eeddc328789d9f9 +- package: get-select/dbt_snowflake_query_tags + version: 2.5.0 +- package: calogica/dbt_date + version: 0.7.2 +- package: dbt-labs/dbt_utils + version: 1.0.0 +- git: https://github.com/FlipsideCrypto/livequery-models.git + revision: b3d6329d3252b3dae5cf0a5e05044c085f05ea7c +sha1_hash: 434ca8a8ca99de424199f19ef86a3fcacb0e82ec diff --git a/packages.yml b/packages.yml index 941e110..9478a75 100644 --- a/packages.yml +++ b/packages.yml @@ -4,6 +4,6 @@ packages: - package: dbt-labs/dbt_external_tables version: 0.8.2 - git: https://github.com/FlipsideCrypto/fsc-utils.git - revision: v1.32.0 + revision: v1.37.0 - package: get-select/dbt_snowflake_query_tags version: [">=2.0.0", "<3.0.0"] From 9cd6fd257b09161d14e827daf812983154e98eee Mon Sep 17 00:00:00 2001 From: Austin Date: Fri, 13 Jun 2025 16:12:27 -0400 Subject: [PATCH 2/4] traces by hash --- ..._realtime.sql => streamline__traces_by_hash_realtime.sql} | 5 ++--- models/sources.yml | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) rename models/silver/streamline/realtime/{streamline__traces_realtime.sql => streamline__traces_by_hash_realtime.sql} (87%) diff --git a/models/silver/streamline/realtime/streamline__traces_realtime.sql b/models/silver/streamline/realtime/streamline__traces_by_hash_realtime.sql similarity index 87% rename from models/silver/streamline/realtime/streamline__traces_realtime.sql rename to models/silver/streamline/realtime/streamline__traces_by_hash_realtime.sql index ba4de5a..0306455 100644 --- a/models/silver/streamline/realtime/streamline__traces_realtime.sql +++ b/models/silver/streamline/realtime/streamline__traces_by_hash_realtime.sql @@ -4,12 +4,11 @@ func = 'streamline.udf_bulk_rest_api_v2', target = "{{this.schema}}.{{this.identifier}}", params ={ - "external_table" :'traces', + "external_table" :'traces_by_hash', "sql_limit" :"30000", "producer_batch_size" :"30000", "worker_batch_size" :"10000", - "sql_source" :'{{this.identifier}}', - "exploded_key": tojson(['result']) + "sql_source" :'{{this.identifier}}' } ), tags = ['streamline_core_evm_realtime'] diff --git a/models/sources.yml b/models/sources.yml index eec5ef6..d841c4c 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -20,8 +20,8 @@ sources: - name: traces - name: blocks_v2 - name: transactions_v2 - - name: receipts_v2 - - name: traces_v2 + - name: receipts_by_hash + - name: traces_by_hash - name: silver_crosschain database: "{{ 'crosschain' if target.database == 'FLOW' else 'crosschain_dev' }}" From e307841e9171958150bf26f72efe723574221813 Mon Sep 17 00:00:00 2001 From: Austin Date: Tue, 17 Jun 2025 12:03:47 -0400 Subject: [PATCH 3/4] 2.0 --- ...treamline_realtime_blocks_transactions.yml | 2 +- ...=> dbt_run_streamline_realtime_step_2.yml} | 6 +-- .github/workflows/dbt_test_daily.yml | 1 - .github/workflows/dbt_test_tasks.yml | 51 ------------------ .github/workflows/dbt_test_weekly.yml | 1 - data/github_actions__workflows.csv | 4 +- models/bronze/core/bronze__receipts.sql | 40 ++++++++++++++ models/bronze/core/bronze__receipts_fr.sql | 28 ++++++++++ models/bronze/core/bronze__receipts_fr_v2.sql | 40 ++++++++++++++ models/bronze/core/bronze__traces.sql | 40 ++++++++++++++ models/bronze/core/bronze__traces_fr.sql | 28 ++++++++++ models/bronze/core/bronze__traces_fr_v2.sql | 40 ++++++++++++++ models/silver/core/silver__blocks.sql | 4 +- models/silver/core/silver__transactions.sql | 2 +- .../complete/streamline__complete_blocks.sql | 9 +++- .../complete/streamline__complete_traces.sql | 9 +++- .../streamline__complete_transactions.sql | 10 ++-- .../streamline__complete_tx_receipts.sql | 10 +++- ...treamline__blocks_transactions_history.sql | 48 +++++++++++++++++ .../streamline__receipts_by_hash_history.sql | 53 +++++++++++++++++++ .../streamline__traces_by_hash_history.sql | 52 ++++++++++++++++++ .../history/streamline__traces_history.sql | 40 -------------- ...reamline__blocks_transactions_realtime.sql | 27 ++++++++-- .../streamline__receipts_by_hash_realtime.sql | 33 +++++++++--- .../streamline__traces_by_hash_realtime.sql | 33 +++++++++--- .../silver/streamline/streamline__blocks.sql | 2 +- .../streamline/streamline__get_chainhead.sql | 2 +- 27 files changed, 483 insertions(+), 132 deletions(-) rename .github/workflows/{dbt_run_streamline_realtime_tx_receipts.yml => dbt_run_streamline_realtime_step_2.yml} (88%) delete mode 100644 .github/workflows/dbt_test_tasks.yml create mode 100644 models/silver/streamline/history/streamline__blocks_transactions_history.sql create mode 100644 models/silver/streamline/history/streamline__receipts_by_hash_history.sql create mode 100644 models/silver/streamline/history/streamline__traces_by_hash_history.sql delete mode 100644 models/silver/streamline/history/streamline__traces_history.sql diff --git a/.github/workflows/dbt_run_streamline_realtime_blocks_transactions.yml b/.github/workflows/dbt_run_streamline_realtime_blocks_transactions.yml index d9b25c4..b805df5 100644 --- a/.github/workflows/dbt_run_streamline_realtime_blocks_transactions.yml +++ b/.github/workflows/dbt_run_streamline_realtime_blocks_transactions.yml @@ -45,7 +45,7 @@ jobs: - name: Run DBT Jobs run: | - dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/realtime/streamline__blocks_realtime.sql 1+models/silver/streamline/realtime/streamline__transactions_realtime.sql + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m tag:streamline_core_evm_realtime - name: Store logs uses: actions/upload-artifact@v4 diff --git a/.github/workflows/dbt_run_streamline_realtime_tx_receipts.yml b/.github/workflows/dbt_run_streamline_realtime_step_2.yml similarity index 88% rename from .github/workflows/dbt_run_streamline_realtime_tx_receipts.yml rename to .github/workflows/dbt_run_streamline_realtime_step_2.yml index e9ac3f7..2716fbf 100644 --- a/.github/workflows/dbt_run_streamline_realtime_tx_receipts.yml +++ b/.github/workflows/dbt_run_streamline_realtime_step_2.yml @@ -1,5 +1,5 @@ -name: dbt_run_streamline_realtime_tx_receipts -run-name: dbt_run_streamline_realtime_tx_receipts +name: dbt_run_streamline_realtime_step_2 +run-name: dbt_run_streamline_realtime_step_2 on: workflow_dispatch: @@ -45,7 +45,7 @@ jobs: - name: Run DBT Jobs run: | - dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/realtime/streamline__tx_receipts_realtime.sql + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m tag:streamline_core_evm_realtime_step_2 - name: Store logs uses: actions/upload-artifact@v4 diff --git a/.github/workflows/dbt_test_daily.yml b/.github/workflows/dbt_test_daily.yml index 96befdc..617975b 100644 --- a/.github/workflows/dbt_test_daily.yml +++ b/.github/workflows/dbt_test_daily.yml @@ -46,7 +46,6 @@ jobs: - name: Run DBT Jobs run: | dbt test --exclude tag:full_test --models tag:recent_test - continue-on-error: true notify-failure: needs: [run_dbt_jobs] diff --git a/.github/workflows/dbt_test_tasks.yml b/.github/workflows/dbt_test_tasks.yml deleted file mode 100644 index bcfc912..0000000 --- a/.github/workflows/dbt_test_tasks.yml +++ /dev/null @@ -1,51 +0,0 @@ -name: dbt_test_tasks - -on: - workflow_call: - -env: - USE_VARS: "${{ vars.USE_VARS }}" - DBT_VERSION: "${{ vars.DBT_VERSION }}" - DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" - ACCOUNT: "${{ vars.ACCOUNT }}" - ROLE: "${{ vars.ROLE }}" - USER: "${{ vars.USER }}" - PASSWORD: "${{ secrets.PASSWORD }}" - REGION: "${{ vars.REGION }}" - DATABASE: "${{ vars.DATABASE }}" - SCHEMA: "${{ vars.SCHEMA }}" - WAREHOUSE: "${{ vars.WAREHOUSE }}" - -concurrency: - group: ${{ github.workflow }} - cancel-in-progress: false - -jobs: - run_dbt_jobs: - runs-on: ubuntu-latest - environment: - name: workflow_prod - - steps: - - uses: actions/checkout@v3 - - - uses: actions/setup-python@v4 - with: - python-version: "3.10" - cache: "pip" - - - name: install dependencies - run: | - pip install -r requirements.txt - dbt deps - - - name: Run DBT Jobs - run: | - dbt test -m models/github_actions/github_actions__current_task_status.sql - - notify-failure: - needs: [run_dbt_jobs] - if: failure() - uses: ./.github/workflows/slack_notify.yml - secrets: - EVM_SLACK_WEBHOOK_URL: ${{ secrets.EVM_SLACK_WEBHOOK_URL }} \ No newline at end of file diff --git a/.github/workflows/dbt_test_weekly.yml b/.github/workflows/dbt_test_weekly.yml index 5af2efe..f69436b 100644 --- a/.github/workflows/dbt_test_weekly.yml +++ b/.github/workflows/dbt_test_weekly.yml @@ -46,7 +46,6 @@ jobs: - name: Run DBT Jobs run: | dbt test --exclude tag:recent_test --models tag:full_test - continue-on-error: true notify-failure: needs: [run_dbt_jobs] diff --git a/data/github_actions__workflows.csv b/data/github_actions__workflows.csv index f77a3a3..77032aa 100644 --- a/data/github_actions__workflows.csv +++ b/data/github_actions__workflows.csv @@ -1,5 +1,5 @@ workflow_name,workflow_schedule dbt_run_streamline_realtime_blocks_transactions,"12,42 * * * *" -dbt_run_streamline_realtime_tx_receipts,"17,47 * * * *" +dbt_run_streamline_realtime_step_2,"20,52 * * * *" dbt_run_scheduled,"25,55 * * * *" -dbt_test_tasks,"28,58 * * * *" +dbt_test_tasks,"28,58 * * * *" \ No newline at end of file diff --git a/models/bronze/core/bronze__receipts.sql b/models/bronze/core/bronze__receipts.sql index e69de29..bf8c3ef 100644 --- a/models/bronze/core/bronze__receipts.sql +++ b/models/bronze/core/bronze__receipts.sql @@ -0,0 +1,40 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + +WITH meta AS ( + SELECT + job_created_time AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", "receipts_by_hash") }}' + ) + ) A +) + SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number, + s.value: "TX_HASH" :: STRING AS tx_hash + FROM + {{ source( "bronze_streamline", "receipts_by_hash") }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key + WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/bronze/core/bronze__receipts_fr.sql b/models/bronze/core/bronze__receipts_fr.sql index e69de29..703c3fd 100644 --- a/models/bronze/core/bronze__receipts_fr.sql +++ b/models/bronze/core/bronze__receipts_fr.sql @@ -0,0 +1,28 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + +SELECT + partition_key, + block_number, + tx_hash, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__receipts_fr_v2') }} +UNION ALL +SELECT + _partition_by_block_id AS partition_key, + block_number, + tx_hash, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__receipts_fr_v1') }} \ No newline at end of file diff --git a/models/bronze/core/bronze__receipts_fr_v2.sql b/models/bronze/core/bronze__receipts_fr_v2.sql index e69de29..6a16443 100644 --- a/models/bronze/core/bronze__receipts_fr_v2.sql +++ b/models/bronze/core/bronze__receipts_fr_v2.sql @@ -0,0 +1,40 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER ) AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "receipts_by_hash") }}' + ) + ) A + ) +SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.value :"block_number" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number, + s.value: "TX_HASH" :: STRING AS tx_hash +FROM + {{ source( "bronze_streamline", "receipts_by_hash") }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key +WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/bronze/core/bronze__traces.sql b/models/bronze/core/bronze__traces.sql index e69de29..dc0bca8 100644 --- a/models/bronze/core/bronze__traces.sql +++ b/models/bronze/core/bronze__traces.sql @@ -0,0 +1,40 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + +WITH meta AS ( + SELECT + job_created_time AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", "traces_by_hash") }}' + ) + ) A +) + SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number, + s.value: "TX_HASH" :: STRING AS tx_hash + FROM + {{ source( "bronze_streamline", "traces_by_hash") }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key + WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/bronze/core/bronze__traces_fr.sql b/models/bronze/core/bronze__traces_fr.sql index e69de29..b420e1c 100644 --- a/models/bronze/core/bronze__traces_fr.sql +++ b/models/bronze/core/bronze__traces_fr.sql @@ -0,0 +1,28 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + +SELECT + partition_key, + block_number, + tx_hash, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__traces_fr_v2') }} +UNION ALL +SELECT + _partition_by_block_id AS partition_key, + block_number, + tx_hash, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__traces_fr_v1') }} \ No newline at end of file diff --git a/models/bronze/core/bronze__traces_fr_v2.sql b/models/bronze/core/bronze__traces_fr_v2.sql index e69de29..5ecc5eb 100644 --- a/models/bronze/core/bronze__traces_fr_v2.sql +++ b/models/bronze/core/bronze__traces_fr_v2.sql @@ -0,0 +1,40 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER ) AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "traces_by_hash") }}' + ) + ) A + ) +SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.value :"block_number" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number, + s.value: "TX_HASH" :: STRING AS tx_hash +FROM + {{ source( "bronze_streamline", "traces_by_hash") }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key +WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/silver/core/silver__blocks.sql b/models/silver/core/silver__blocks.sql index 301667d..3931375 100644 --- a/models/silver/core/silver__blocks.sql +++ b/models/silver/core/silver__blocks.sql @@ -4,7 +4,7 @@ incremental_strategy = 'delete+insert', unique_key = "block_number", cluster_by = "block_timestamp::date", - tags = ['core'] + tags = ['core','streamline_core_evm_realtime'] ) }} SELECT @@ -49,7 +49,7 @@ SELECT DATA :result :logsBloom :: STRING AS logs_bloom, DATA :result :stateRoot :: STRING AS state_root, DATA :result :transactionsRoot :: STRING AS transactions_root, - _partition_by_block_id, + partition_key as _partition_by_block_id, _inserted_timestamp, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, diff --git a/models/silver/core/silver__transactions.sql b/models/silver/core/silver__transactions.sql index 972c06a..104ccd1 100644 --- a/models/silver/core/silver__transactions.sql +++ b/models/silver/core/silver__transactions.sql @@ -4,7 +4,7 @@ incremental_strategy = 'delete+insert', unique_key = "tx_id", cluster_by = "block_timestamp::date, _inserted_timestamp::date", - tags = ['core'] + tags = ['core','streamline_core_evm_realtime'] ) }} WITH base AS ( diff --git a/models/silver/streamline/complete/streamline__complete_blocks.sql b/models/silver/streamline/complete/streamline__complete_blocks.sql index 31cc0c4..ef189ee 100644 --- a/models/silver/streamline/complete/streamline__complete_blocks.sql +++ b/models/silver/streamline/complete/streamline__complete_blocks.sql @@ -3,11 +3,16 @@ materialized = "incremental", unique_key = "id", cluster_by = "ROUND(block_number, -3)", - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)", + tags = ['streamline_core_evm_realtime'] ) }} SELECT - id, + MD5( + CAST( + COALESCE(CAST(block_number AS text), '' :: STRING) AS text + ) + ) AS id, block_number, _inserted_timestamp, DATA :result :transactions AS transactions diff --git a/models/silver/streamline/complete/streamline__complete_traces.sql b/models/silver/streamline/complete/streamline__complete_traces.sql index b1f87ea..6a5c6b7 100644 --- a/models/silver/streamline/complete/streamline__complete_traces.sql +++ b/models/silver/streamline/complete/streamline__complete_traces.sql @@ -3,11 +3,16 @@ materialized = "incremental", unique_key = "id", cluster_by = "ROUND(block_number, -3)", - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)", + tags = ['streamline_core_evm_realtime_step_2'] ) }} SELECT - id, + MD5( + CAST( + COALESCE(CAST(CONCAT(block_number, '_-_', COALESCE(tx_hash, '')) AS text), '' :: STRING) AS text + ) + ) AS id, block_number, tx_hash, _inserted_timestamp diff --git a/models/silver/streamline/complete/streamline__complete_transactions.sql b/models/silver/streamline/complete/streamline__complete_transactions.sql index 05b8770..95a4dd4 100644 --- a/models/silver/streamline/complete/streamline__complete_transactions.sql +++ b/models/silver/streamline/complete/streamline__complete_transactions.sql @@ -3,13 +3,17 @@ materialized = "incremental", unique_key = "id", cluster_by = "ROUND(block_number, -3)", - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)", + tags = ['streamline_core_evm_realtime'] ) }} SELECT - id, + MD5( + CAST( + COALESCE(CAST(block_number AS text), '' :: STRING) AS text + ) + ) AS id, block_number, - tx_hash, _inserted_timestamp FROM diff --git a/models/silver/streamline/complete/streamline__complete_tx_receipts.sql b/models/silver/streamline/complete/streamline__complete_tx_receipts.sql index 0b63985..6205e4f 100644 --- a/models/silver/streamline/complete/streamline__complete_tx_receipts.sql +++ b/models/silver/streamline/complete/streamline__complete_tx_receipts.sql @@ -2,11 +2,17 @@ {{ config ( materialized = "incremental", unique_key = "id", - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)", + tags = ['streamline_core_evm_realtime_step_2'] ) }} SELECT - id, + MD5( + CAST( + COALESCE(CAST(CONCAT(block_number, '_-_', COALESCE(tx_hash, '')) AS text), '' :: STRING) AS text + ) + ) AS id, + block_number, tx_hash, _inserted_timestamp FROM diff --git a/models/silver/streamline/history/streamline__blocks_transactions_history.sql b/models/silver/streamline/history/streamline__blocks_transactions_history.sql new file mode 100644 index 0000000..1126f26 --- /dev/null +++ b/models/silver/streamline/history/streamline__blocks_transactions_history.sql @@ -0,0 +1,48 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :'blocks_transactions', + "sql_limit" :"120000", + "producer_batch_size" :"12000", + "worker_batch_size" :"4000", + "sql_source" :'{{this.identifier}}', + "exploded_key": tojson(['result', 'result.transactions']) } + ), + tags = ['streamline_core_evm_history'] +) }} + +with blocks as ( + select + block_number + from {{ ref('streamline__blocks') }} + except + select + block_number + from {{ ref('streamline__complete_blocks') }} + inner join {{ ref('streamline__complete_transactions') }} using (block_number) +) +SELECT + block_number, + ROUND(block_number, -3) AS partition_key, + live.udf_api( + 'POST', + '{URL}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + OBJECT_CONSTRUCT( + 'id', block_number, + 'jsonrpc', '2.0', + 'method', 'eth_getBlockByNumber', + 'params', ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE) + ), + 'Vault/prod/evm/aurora/mainnet' + ) AS request +from blocks + +order by block_number desc + +limit 120000 \ No newline at end of file diff --git a/models/silver/streamline/history/streamline__receipts_by_hash_history.sql b/models/silver/streamline/history/streamline__receipts_by_hash_history.sql new file mode 100644 index 0000000..d57701e --- /dev/null +++ b/models/silver/streamline/history/streamline__receipts_by_hash_history.sql @@ -0,0 +1,53 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :'receipts_by_hash', + "sql_limit" :"120000", + "producer_batch_size" :"12000", + "worker_batch_size" :"4000", + "sql_source" :'{{this.identifier}}' } + ), + tags = ['streamline_core_evm_history'] +) }} + +with txs as ( + select + block_number, + tx_hash + from {{ ref('silver__transactions') }} + except + select + block_number, + tx_hash + from {{ ref('streamline__complete_tx_receipts') }} +) + +SELECT + block_number, + tx_hash, + ROUND( + block_number, + -3 + ) AS partition_key, + live.udf_api( + 'POST', + '{URL}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + OBJECT_CONSTRUCT( + 'id', block_number, + 'jsonrpc', '2.0', + 'method', 'eth_getTransactionReceipt', + 'params', ARRAY_CONSTRUCT(tx_hash) + ), + 'Vault/prod/evm/aurora/mainnet' + ) AS request +from txs + +order by block_number desc + +limit 120000 \ No newline at end of file diff --git a/models/silver/streamline/history/streamline__traces_by_hash_history.sql b/models/silver/streamline/history/streamline__traces_by_hash_history.sql new file mode 100644 index 0000000..b3c71f3 --- /dev/null +++ b/models/silver/streamline/history/streamline__traces_by_hash_history.sql @@ -0,0 +1,52 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ + "external_table" :'traces_by_hash', + "sql_limit" :"120000", + "producer_batch_size" :"12000", + "worker_batch_size" :"4000", + "sql_source" :'{{this.identifier}}' + } + ), + tags = ['streamline_core_evm_history'] +) }} + +with txs as ( + select + block_number, + tx_hash + from {{ ref('silver__transactions') }} + except + select + block_number, + tx_hash + from {{ ref('streamline__complete_traces') }} +) + +SELECT + block_number, + tx_hash, + ROUND(block_number, -3) AS partition_key, + live.udf_api( + 'POST', + '{URL}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + OBJECT_CONSTRUCT( + 'id', block_number, + 'jsonrpc', '2.0', + 'method', 'debug_traceTransaction', + 'params', ARRAY_CONSTRUCT(tx_hash, OBJECT_CONSTRUCT('tracer', 'callTracer', 'timeout', '120s')) + ), + 'Vault/prod/evm/aurora/mainnet' + ) AS request +from txs + +order by block_number desc + +limit 120000 \ No newline at end of file diff --git a/models/silver/streamline/history/streamline__traces_history.sql b/models/silver/streamline/history/streamline__traces_history.sql deleted file mode 100644 index 4909cdf..0000000 --- a/models/silver/streamline/history/streamline__traces_history.sql +++ /dev/null @@ -1,40 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'traces', 'producer_batch_size',20000, 'producer_limit_size',5000000, 'worker_batch_size',200))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - -WITH tbl AS ( - - SELECT - block_number, - tx_hash - FROM - {{ ref("silver__transactions") }} - WHERE - block_number IS NOT NULL - AND tx_hash IS NOT NULL - EXCEPT - SELECT - block_number, - tx_hash - FROM - {{ ref("streamline__complete_traces") }} - WHERE - block_number IS NOT NULL - AND tx_hash IS NOT NULL -) -SELECT - block_number, - 'debug_traceTransaction' AS method, - CONCAT( - tx_hash, - '_-_', - '{"tracer": "callTracer","timeout": "30s"}' - ) AS params -FROM - tbl -ORDER BY - block_number ASC diff --git a/models/silver/streamline/realtime/streamline__blocks_transactions_realtime.sql b/models/silver/streamline/realtime/streamline__blocks_transactions_realtime.sql index 8ceb652..dacc989 100644 --- a/models/silver/streamline/realtime/streamline__blocks_transactions_realtime.sql +++ b/models/silver/streamline/realtime/streamline__blocks_transactions_realtime.sql @@ -4,17 +4,29 @@ func = 'streamline.udf_bulk_rest_api_v2', target = "{{this.schema}}.{{this.identifier}}", params ={ "external_table" :'blocks_transactions', - "sql_limit" :"30000", - "producer_batch_size" :"30000", - "worker_batch_size" :"10000", + "sql_limit" :"12000", + "producer_batch_size" :"12000", + "worker_batch_size" :"4000", "sql_source" :'{{this.identifier}}', "exploded_key": tojson(['result', 'result.transactions']) } ), tags = ['streamline_core_evm_realtime'] ) }} +with blocks as ( + select + block_number + from {{ ref('streamline__blocks') }} + where block_number >= (select block_number from {{ ref('_block_lookback') }}) + except + select + block_number + from {{ ref('streamline__complete_blocks') }} + inner join {{ ref('streamline__complete_transactions') }} using (block_number) + where block_number >= (select block_number from {{ ref('_block_lookback') }}) +) SELECT - 150815373 as block_number, + block_number, ROUND(block_number, -3) AS partition_key, live.udf_api( 'POST', @@ -30,4 +42,9 @@ SELECT 'params', ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE) ), 'Vault/prod/evm/aurora/mainnet' - ) AS request \ No newline at end of file + ) AS request +from blocks + +order by block_number asc + +limit 12000 \ No newline at end of file diff --git a/models/silver/streamline/realtime/streamline__receipts_by_hash_realtime.sql b/models/silver/streamline/realtime/streamline__receipts_by_hash_realtime.sql index b92808c..f2a4eaa 100644 --- a/models/silver/streamline/realtime/streamline__receipts_by_hash_realtime.sql +++ b/models/silver/streamline/realtime/streamline__receipts_by_hash_realtime.sql @@ -4,17 +4,31 @@ func = 'streamline.udf_bulk_rest_api_v2', target = "{{this.schema}}.{{this.identifier}}", params ={ "external_table" :'receipts_by_hash', - "sql_limit" :"30000", - "producer_batch_size" :"30000", - "worker_batch_size" :"10000", + "sql_limit" :"12000", + "producer_batch_size" :"12000", + "worker_batch_size" :"4000", "sql_source" :'{{this.identifier}}' } ), - tags = ['streamline','core','realtime','phase_1'] + tags = ['streamline_core_evm_realtime_step_2'] ) }} +with txs as ( + select + block_number, + tx_hash + from {{ ref('silver__transactions') }} + where block_number >= (select block_number from {{ ref('_block_lookback') }}) + except + select + block_number, + tx_hash + from {{ ref('streamline__complete_tx_receipts') }} + where block_number >= (select block_number from {{ ref('_block_lookback') }}) +) + SELECT - 150949168 as block_number, - '0x967586085f70584ab5e5886ba9c7d1b2f227f554ed594a491978e4a2110bfdd7' as tx_hash, + block_number, + tx_hash, ROUND( block_number, -3 @@ -33,4 +47,9 @@ SELECT 'params', ARRAY_CONSTRUCT(tx_hash) ), 'Vault/prod/evm/aurora/mainnet' - ) AS request \ No newline at end of file + ) AS request +from txs + +order by block_number asc + +limit 12000 \ No newline at end of file diff --git a/models/silver/streamline/realtime/streamline__traces_by_hash_realtime.sql b/models/silver/streamline/realtime/streamline__traces_by_hash_realtime.sql index 0306455..bbdf617 100644 --- a/models/silver/streamline/realtime/streamline__traces_by_hash_realtime.sql +++ b/models/silver/streamline/realtime/streamline__traces_by_hash_realtime.sql @@ -5,18 +5,32 @@ target = "{{this.schema}}.{{this.identifier}}", params ={ "external_table" :'traces_by_hash', - "sql_limit" :"30000", - "producer_batch_size" :"30000", - "worker_batch_size" :"10000", + "sql_limit" :"12000", + "producer_batch_size" :"12000", + "worker_batch_size" :"4000", "sql_source" :'{{this.identifier}}' } ), - tags = ['streamline_core_evm_realtime'] + tags = ['streamline_core_evm_realtime_step_2'] ) }} +with txs as ( + select + block_number, + tx_hash + from {{ ref('silver__transactions') }} + where block_number >= (select block_number from {{ ref('_block_lookback') }}) + except + select + block_number, + tx_hash + from {{ ref('streamline__complete_traces') }} + where block_number >= (select block_number from {{ ref('_block_lookback') }}) +) + SELECT - 150949168 as block_number, - '0x967586085f70584ab5e5886ba9c7d1b2f227f554ed594a491978e4a2110bfdd7' as tx_hash, + block_number, + tx_hash, ROUND(block_number, -3) AS partition_key, live.udf_api( 'POST', @@ -32,4 +46,9 @@ SELECT 'params', ARRAY_CONSTRUCT(tx_hash, OBJECT_CONSTRUCT('tracer', 'callTracer', 'timeout', '120s')) ), 'Vault/prod/evm/aurora/mainnet' - ) AS request \ No newline at end of file + ) AS request +from txs + +order by block_number asc + +limit 12000 \ No newline at end of file diff --git a/models/silver/streamline/streamline__blocks.sql b/models/silver/streamline/streamline__blocks.sql index 2445bdd..5eb7a8f 100644 --- a/models/silver/streamline/streamline__blocks.sql +++ b/models/silver/streamline/streamline__blocks.sql @@ -1,6 +1,6 @@ {{ config ( materialized = "view", - tags = ['streamline','core','chainhead','phase_1'] + tags = ['streamline_core_evm_realtime'] ) }} SELECT diff --git a/models/silver/streamline/streamline__get_chainhead.sql b/models/silver/streamline/streamline__get_chainhead.sql index 3965979..9bfdc74 100644 --- a/models/silver/streamline/streamline__get_chainhead.sql +++ b/models/silver/streamline/streamline__get_chainhead.sql @@ -1,6 +1,6 @@ {{ config ( materialized = 'table', - tags = ['streamline','core','chainhead','phase_1'] + tags = ['streamline_core_evm_realtime'] ) }} SELECT From 8961b2b988f0a7cdcfa6a4282c8d1433c6d46402 Mon Sep 17 00:00:00 2001 From: Austin Date: Tue, 17 Jun 2025 12:04:58 -0400 Subject: [PATCH 4/4] history --- .../workflows/dbt_run_streamline_history.yml | 2 +- .../dbt_run_streamline_traces_history.yml | 63 ------------------- 2 files changed, 1 insertion(+), 64 deletions(-) delete mode 100644 .github/workflows/dbt_run_streamline_traces_history.yml diff --git a/.github/workflows/dbt_run_streamline_history.yml b/.github/workflows/dbt_run_streamline_history.yml index 7281f01..134c316 100644 --- a/.github/workflows/dbt_run_streamline_history.yml +++ b/.github/workflows/dbt_run_streamline_history.yml @@ -42,7 +42,7 @@ jobs: - name: Run DBT Jobs run: | - dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_RUN_HISTORY":True}' -m 1+models/silver/streamline/realtime/ --full-refresh + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m tag:streamline_core_evm_history - name: Store logs uses: actions/upload-artifact@v4 diff --git a/.github/workflows/dbt_run_streamline_traces_history.yml b/.github/workflows/dbt_run_streamline_traces_history.yml deleted file mode 100644 index 3bae2d5..0000000 --- a/.github/workflows/dbt_run_streamline_traces_history.yml +++ /dev/null @@ -1,63 +0,0 @@ -name: dbt_run_streamline_traces_history -run-name: dbt_run_streamline_traces_history - -on: - workflow_dispatch: - schedule: - # Run every 2 hours - - cron: "0 */2 * * *" - -env: - USE_VARS: "${{ vars.USE_VARS }}" - DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" - DBT_VERSION: "${{ vars.DBT_VERSION }}" - ACCOUNT: "${{ vars.ACCOUNT }}" - ROLE: "${{ vars.ROLE }}" - USER: "${{ vars.USER }}" - PASSWORD: "${{ secrets.PASSWORD }}" - REGION: "${{ vars.REGION }}" - DATABASE: "${{ vars.DATABASE }}" - WAREHOUSE: "${{ vars.WAREHOUSE }}" - SCHEMA: "${{ vars.SCHEMA }}" - -concurrency: - group: ${{ github.workflow }} - cancel-in-progress: false - - - -jobs: - dbt: - runs-on: ubuntu-latest - environment: - name: workflow_prod - steps: - - uses: actions/checkout@v3 - - uses: actions/setup-python@v4 - with: - python-version: "3.10" - cache: "pip" - - - name: install dependencies - run: | - pip install -r requirements.txt - dbt deps - - - name: Run DBT Jobs - run: | - dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/history/streamline__traces_history.sql - - - name: Store logs - uses: actions/upload-artifact@v4 - with: - name: dbt-logs - path: | - logs - target - - notify-failure: - needs: [dbt] - if: failure() - uses: ./.github/workflows/slack_notify.yml - secrets: - EVM_SLACK_WEBHOOK_URL: ${{ secrets.EVM_SLACK_WEBHOOK_URL }}