From 93e22d09bb1ed521972bdfba165a1623a8a6f8f8 Mon Sep 17 00:00:00 2001 From: Austin Date: Fri, 13 Jun 2025 13:32:08 -0400 Subject: [PATCH] 2.0 stash --- dbt_project.yml | 26 ++++++ macros/create_udfs.sql | 8 +- macros/js_hextoint.sql | 6 -- macros/python/udfs.sql | 21 ----- macros/streamline/api_integrations.sql | 17 ---- macros/streamline/get_base_table_udtf.sql | 22 ----- macros/streamline/streamline_udfs.sql | 86 +++++++++++++++---- models/admin/admin__number_sequence.sql | 19 ++++ models/admin/admin__number_sequence.yml | 14 +++ models/bronze/core/bronze__blocks.sql | 39 +++++++++ models/bronze/core/bronze__blocks_fr.sql | 26 ++++++ ...FR_traces.sql => bronze__blocks_fr_v1.sql} | 43 +++++----- models/bronze/core/bronze__blocks_fr_v2.sql | 39 +++++++++ models/bronze/core/bronze__receipts.sql | 0 models/bronze/core/bronze__receipts_fr.sql | 0 ...eceipts.sql => bronze__receipts_fr_v1.sql} | 0 models/bronze/core/bronze__receipts_fr_v2.sql | 0 .../core/bronze__streamline_FR_blocks.sql | 11 --- ...nze__streamline_FR_blocks_transactions.sql | 11 --- .../bronze__streamline_FR_transactions.sql | 79 ----------------- .../bronze__streamline_FR_tx_receipts.sql | 55 ------------ .../bronze/core/bronze__streamline_blocks.sql | 11 --- ...bronze__streamline_blocks_transactions.sql | 11 --- models/bronze/core/bronze__traces.sql | 0 models/bronze/core/bronze__traces_fr.sql | 0 ...ne_traces.sql => bronze__traces_fr_v1.sql} | 0 models/bronze/core/bronze__traces_fr_v2.sql | 0 models/bronze/core/bronze__transactions.sql | 39 +++++++++ .../bronze/core/bronze__transactions_fr.sql | 26 ++++++ ...ons.sql => bronze__transactions_fr_v1.sql} | 6 ++ .../core/bronze__transactions_fr_v2.sql | 39 +++++++++ models/silver/core/silver__blocks.sql | 6 +- models/silver/core/silver__receipts.sql | 6 +- models/silver/core/silver__traces.sql | 6 +- models/silver/core/silver__transactions.sql | 6 +- models/silver/streamline/_block_lookback.sql | 11 +++ .../complete/streamline__complete_blocks.sql | 6 +- ...reamline__complete_blocks_transactions.sql | 30 ------- .../complete/streamline__complete_traces.sql | 6 +- .../streamline__complete_transactions.sql | 6 +- .../streamline__complete_tx_receipts.sql | 6 +- .../realtime/streamline__blocks_realtime.sql | 64 -------------- ...reamline__blocks_transactions_realtime.sql | 33 +++++++ .../streamline__receipts_by_hash_realtime.sql | 36 ++++++++ .../realtime/streamline__traces_realtime.sql | 36 ++++++++ .../streamline__transactions_realtime.sql | 64 -------------- .../streamline__tx_receipts_realtime.sql | 48 ----------- .../silver/streamline/streamline__blocks.sql | 38 ++++---- .../streamline/streamline__get_chainhead.sql | 28 ++++++ models/sources.yml | 5 +- package-lock.yml | 30 +++---- packages.yml | 2 +- 52 files changed, 567 insertions(+), 560 deletions(-) delete mode 100644 macros/js_hextoint.sql delete mode 100644 macros/python/udfs.sql delete mode 100644 macros/streamline/api_integrations.sql delete mode 100644 macros/streamline/get_base_table_udtf.sql create mode 100644 models/admin/admin__number_sequence.sql create mode 100644 models/admin/admin__number_sequence.yml create mode 100644 models/bronze/core/bronze__blocks.sql create mode 100644 models/bronze/core/bronze__blocks_fr.sql rename models/bronze/core/{bronze__streamline_FR_traces.sql => bronze__blocks_fr_v1.sql} (50%) create mode 100644 models/bronze/core/bronze__blocks_fr_v2.sql create mode 100644 models/bronze/core/bronze__receipts.sql create mode 100644 models/bronze/core/bronze__receipts_fr.sql rename models/bronze/core/{bronze__streamline_tx_receipts.sql => bronze__receipts_fr_v1.sql} (100%) create mode 100644 models/bronze/core/bronze__receipts_fr_v2.sql delete mode 100644 models/bronze/core/bronze__streamline_FR_blocks.sql delete mode 100644 models/bronze/core/bronze__streamline_FR_blocks_transactions.sql delete mode 100644 models/bronze/core/bronze__streamline_FR_transactions.sql delete mode 100644 models/bronze/core/bronze__streamline_FR_tx_receipts.sql delete mode 100644 models/bronze/core/bronze__streamline_blocks.sql delete mode 100644 models/bronze/core/bronze__streamline_blocks_transactions.sql create mode 100644 models/bronze/core/bronze__traces.sql create mode 100644 models/bronze/core/bronze__traces_fr.sql rename models/bronze/core/{bronze__streamline_traces.sql => bronze__traces_fr_v1.sql} (100%) create mode 100644 models/bronze/core/bronze__traces_fr_v2.sql create mode 100644 models/bronze/core/bronze__transactions.sql create mode 100644 models/bronze/core/bronze__transactions_fr.sql rename models/bronze/core/{bronze__streamline_transactions.sql => bronze__transactions_fr_v1.sql} (95%) create mode 100644 models/bronze/core/bronze__transactions_fr_v2.sql create mode 100644 models/silver/streamline/_block_lookback.sql delete mode 100644 models/silver/streamline/complete/streamline__complete_blocks_transactions.sql delete mode 100644 models/silver/streamline/realtime/streamline__blocks_realtime.sql create mode 100644 models/silver/streamline/realtime/streamline__blocks_transactions_realtime.sql create mode 100644 models/silver/streamline/realtime/streamline__receipts_by_hash_realtime.sql create mode 100644 models/silver/streamline/realtime/streamline__traces_realtime.sql delete mode 100644 models/silver/streamline/realtime/streamline__transactions_realtime.sql delete mode 100644 models/silver/streamline/realtime/streamline__tx_receipts_realtime.sql create mode 100644 models/silver/streamline/streamline__get_chainhead.sql diff --git a/dbt_project.yml b/dbt_project.yml index 2ba4bf7..89c0d84 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -58,6 +58,32 @@ vars: 'silver/streamline' ] + #### STREAMLINE 2.0 BEGIN #### + + API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}' + EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}' + ROLES: | + ["INTERNAL_DEV"] + + config: + # The keys correspond to dbt profiles and are case sensitive + dev: + API_INTEGRATION: AWS_AURORA_API_STG_V2 + EXTERNAL_FUNCTION_URI: ovimat5dxg.execute-api.us-east-1.amazonaws.com/stg/ + ROLES: + - AWS_LAMBDA_AURORA_API + - INTERNAL_DEV + + prod: + API_INTEGRATION: AWS_AURORA_API_PROD_V2 + EXTERNAL_FUNCTION_URI: 0sbz1dl1ik.execute-api.us-east-1.amazonaws.com/prod/ + ROLES: + - AWS_LAMBDA_AURORA_API + - INTERNAL_DEV + - DBT_CLOUD_AURORA + + #### STREAMLINE 2.0 END #### + tests: +store_failures: true # all tests diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index f816180..82313e6 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -1,11 +1,5 @@ {% macro create_udfs() %} {% if var("UPDATE_UDFS_AND_SPS") %} - {% set sql %} - {{ create_udf_get_chainhead() }} - {{ create_udf_json_rpc() }} - - {% endset %} - {% do run_query(sql) %} {{- fsc_utils.create_udfs() -}} {% endif %} -{% endmacro %} +{% endmacro %} \ No newline at end of file diff --git a/macros/js_hextoint.sql b/macros/js_hextoint.sql deleted file mode 100644 index 8742ac3..0000000 --- a/macros/js_hextoint.sql +++ /dev/null @@ -1,6 +0,0 @@ -{% macro create_js_hex_to_int() %} - CREATE - OR REPLACE FUNCTION {{ target.schema }}.js_hex_to_int ( - s STRING - ) returns DOUBLE LANGUAGE javascript AS 'if (S !== null) { yourNumber = parseInt(S, 16); } return yourNumber' -{% endmacro %} diff --git a/macros/python/udfs.sql b/macros/python/udfs.sql deleted file mode 100644 index df0489e..0000000 --- a/macros/python/udfs.sql +++ /dev/null @@ -1,21 +0,0 @@ -{% macro create_udf_hex_to_int(schema) %} -create or replace function {{ schema }}.udf_hex_to_int(hex string) -returns string -language python -runtime_version = '3.8' -handler = 'hex_to_int' -as -$$ -def hex_to_int(hex) -> str: - """ - Converts hex (of any size) to int (as a string). Snowflake and java script can only handle up to 64-bit (38 digits of precision) - select hex_to_int('200000000000000000000000000000211'); - >> 680564733841876926926749214863536423441 - select hex_to_int('0x200000000000000000000000000000211'); - >> 680564733841876926926749214863536423441 - select hex_to_int(NULL); - >> NULL - """ - return str(int(hex, 16)) if hex else None -$$; -{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/api_integrations.sql b/macros/streamline/api_integrations.sql deleted file mode 100644 index 7414d94..0000000 --- a/macros/streamline/api_integrations.sql +++ /dev/null @@ -1,17 +0,0 @@ -{% macro create_aws_aurora_api() %} - {% if target.name == "prod" %} - {% set sql %} - CREATE api integration IF NOT EXISTS aws_aurora_api api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/aurora-api-prod-rolesnowflakeudfsAF733095-3WVDCVO54NPX' api_allowed_prefixes = ( - 'https://sl2f5beopl.execute-api.us-east-1.amazonaws.com/prod/' - ) enabled = TRUE; - {% endset %} - {% do run_query(sql) %} - {% else %} - {% set sql %} - CREATE OR REPLACE api integration aws_aurora_dev_api api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/aurora-api-dev-rolesnowflakeudfsAF733095-AN4Q3176CUYA' api_allowed_prefixes = ( - 'https://xh409mek2a.execute-api.us-east-1.amazonaws.com/dev/' - ) enabled = TRUE; - {% endset %} - {% do run_query(sql) %} - {% endif %} -{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/get_base_table_udtf.sql b/macros/streamline/get_base_table_udtf.sql deleted file mode 100644 index f9f65b8..0000000 --- a/macros/streamline/get_base_table_udtf.sql +++ /dev/null @@ -1,22 +0,0 @@ -{% macro create_udtf_get_base_table(schema) %} - CREATE - OR REPLACE FUNCTION {{ schema }}.udtf_get_base_table( - max_height INTEGER - ) returns TABLE ( - height NUMBER - ) AS $$ WITH base AS ( - SELECT - ROW_NUMBER() over ( - ORDER BY - SEQ4() - ) AS id - FROM - TABLE(GENERATOR(rowcount => 500000000)) - ) -SELECT - id AS height -FROM - base -WHERE - id <= max_height $$; -{% endmacro %} diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql index fa1b70c..f459c25 100644 --- a/macros/streamline/streamline_udfs.sql +++ b/macros/streamline/streamline_udfs.sql @@ -1,23 +1,71 @@ -{% macro create_udf_get_chainhead() %} - {% if target.name == "prod" %} - CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_get_chainhead() returns variant api_integration = aws_aurora_api AS - 'https://sl2f5beopl.execute-api.us-east-1.amazonaws.com/prod/get_chainhead' +{% macro create_udf_bulk_rest_api_v2() %} + {{ log("Creating udf udf_bulk_rest_api for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }} + {{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }} + + {% set sql %} + CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(json object) returns array api_integration = + {% if target.name == "prod" %} + {{ log("Creating prod udf_bulk_rest_api_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api' + {% elif target.name == "dev" %} + {{ log("Creating dev udf_bulk_rest_api_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api' + {% elif target.name == "sbx" %} + {{ log("Creating stg udf_bulk_rest_api_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api' {% else %} - CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_get_chainhead() returns variant api_integration = aws_aurora_dev_api AS - 'https://xh409mek2a.execute-api.us-east-1.amazonaws.com/dev/get_chainhead' - {%- endif %}; + {{ log("Creating default (dev) udf_bulk_rest_api_v2", info=True) }} + {{ var("config")["dev"]["API_INTEGRATION"] }} AS 'https://{{ var("config")["dev"]["EXTERNAL_FUNCTION_URI"] | lower }}udf_bulk_rest_api' + {% endif %}; + {% endset %} + {{ log(sql, info=True) }} + {% do adapter.execute(sql) %} {% endmacro %} -{% macro create_udf_json_rpc() %} - {% if target.name == "prod" %} - CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_json_rpc( - json OBJECT - ) returns ARRAY api_integration = aws_aurora_api AS - 'https://sl2f5beopl.execute-api.us-east-1.amazonaws.com/prod/bulk_get_json_rpc' +{% macro create_udf_bulk_decode_logs() %} + {{ log("Creating udf udf_bulk_decode_logs_v2 for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }} + {{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }} + + {% set sql %} + CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_logs_v2(json object) returns array api_integration = + {% if target.name == "prod" %} + {{ log("Creating prod udf_bulk_decode_logs_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_logs' + {% elif target.name == "dev" %} + {{ log("Creating dev udf_bulk_decode_logs_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_logs' + {% elif target.name == "sbx" %} + {{ log("Creating stg udf_bulk_decode_logs_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_logs' {% else %} - CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_json_rpc( - json OBJECT - ) returns ARRAY api_integration = aws_aurora_dev_api AS - 'https://xh409mek2a.execute-api.us-east-1.amazonaws.com/dev/bulk_get_json_rpc' - {%- endif %}; -{% endmacro %} \ No newline at end of file + {{ log("Creating default (dev) udf_bulk_decode_logs_v2", info=True) }} + {{ var("config")["dev"]["API_INTEGRATION"] }} AS 'https://{{ var("config")["dev"]["EXTERNAL_FUNCTION_URI"] | lower }}bulk_decode_logs' + {% endif %}; + {% endset %} + {{ log(sql, info=True) }} + {% do adapter.execute(sql) %} +{% endmacro %} + +{% macro create_udf_bulk_decode_traces() %} + {{ log("Creating udf udf_bulk_decode_traces_v2 for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }} + {{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }} + + {% set sql %} + CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_traces_v2(json object) returns array api_integration = + {% if target.name == "prod" %} + {{ log("Creating prod udf_bulk_decode_traces_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_traces' + {% elif target.name == "dev" %} + {{ log("Creating dev udf_bulk_decode_traces_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_traces' + {% elif target.name == "sbx" %} + {{ log("Creating stg udf_bulk_decode_traces_v2", info=True) }} + {{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_traces' + {% else %} + {{ log("Creating default (dev) udf_bulk_decode_traces_v2", info=True) }} + {{ var("config")["dev"]["API_INTEGRATION"] }} AS 'https://{{ var("config")["dev"]["EXTERNAL_FUNCTION_URI"] | lower }}bulk_decode_traces' + {% endif %}; + {% endset %} + {{ log(sql, info=True) }} + {% do adapter.execute(sql) %} +{% endmacro %} diff --git a/models/admin/admin__number_sequence.sql b/models/admin/admin__number_sequence.sql new file mode 100644 index 0000000..06a2af0 --- /dev/null +++ b/models/admin/admin__number_sequence.sql @@ -0,0 +1,19 @@ +{{ config( + materialized = 'incremental', + cluster_by = 'round(_id,-3)', + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(_id)", + full_refresh = false, + tags = ['silver','admin','phase_1'] +) }} + +SELECT + ROW_NUMBER() over ( + ORDER BY + SEQ4() + ) - 1 :: INT AS _id +FROM + TABLE(GENERATOR(rowcount => 1000000000)) +WHERE 1=1 +{% if is_incremental() %} + AND 1=0 +{% endif %} \ No newline at end of file diff --git a/models/admin/admin__number_sequence.yml b/models/admin/admin__number_sequence.yml new file mode 100644 index 0000000..ee97bd6 --- /dev/null +++ b/models/admin/admin__number_sequence.yml @@ -0,0 +1,14 @@ +version: 2 +models: + - name: admin__number_sequence + description: | + This model generates a sequence of numbers for a given range. + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - _ID + columns: + - name: _ID + tests: + - not_null + description: Primary key for the table \ No newline at end of file diff --git a/models/bronze/core/bronze__blocks.sql b/models/bronze/core/bronze__blocks.sql new file mode 100644 index 0000000..c825eff --- /dev/null +++ b/models/bronze/core/bronze__blocks.sql @@ -0,0 +1,39 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + +WITH meta AS ( + SELECT + job_created_time AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", "blocks_v2") }}' + ) + ) A +) + SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number + FROM + {{ source( "bronze_streamline", "blocks_v2") }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key + WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/bronze/core/bronze__blocks_fr.sql b/models/bronze/core/bronze__blocks_fr.sql new file mode 100644 index 0000000..d8602af --- /dev/null +++ b/models/bronze/core/bronze__blocks_fr.sql @@ -0,0 +1,26 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + +SELECT + partition_key, + block_number, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__blocks_fr_v2') }} +UNION ALL +SELECT + _partition_by_block_id AS partition_key, + block_number, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__blocks_fr_v1') }} \ No newline at end of file diff --git a/models/bronze/core/bronze__streamline_FR_traces.sql b/models/bronze/core/bronze__blocks_fr_v1.sql similarity index 50% rename from models/bronze/core/bronze__streamline_FR_traces.sql rename to models/bronze/core/bronze__blocks_fr_v1.sql index 2cc38e5..7f05f05 100644 --- a/models/bronze/core/bronze__streamline_FR_traces.sql +++ b/models/bronze/core/bronze__blocks_fr_v1.sql @@ -1,37 +1,35 @@ {{ config ( - materialized = 'view' + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] ) }} -WITH meta AS ( - - SELECT - registered_on AS _inserted_timestamp, - file_name, - CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS _partition_by_block_id - FROM - TABLE( - information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "traces") }}' - ) - ) A -) + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER ) AS _partition_by_block_id + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "blocks") }}' + ) + ) A + ) SELECT block_number, - s.value :metadata :request :params [0] :: STRING AS tx_hash, DATA, + metadata, + file_name, _inserted_timestamp, MD5( CAST( - COALESCE(CAST(tx_hash AS text), '' :: STRING) AS text + COALESCE(CAST(block_number AS text), '' :: STRING) AS text ) ) AS id, s._partition_by_block_id, s.value AS VALUE FROM - {{ source( - "bronze_streamline", - "traces" - ) }} + {{ source( "bronze_streamline", "blocks") }} s JOIN meta b ON b.file_name = metadata$filename @@ -51,7 +49,6 @@ WHERE '-32007', '-32008', '-32009', - '-32010', - '-32608' + '-32010' ) - ) + ) \ No newline at end of file diff --git a/models/bronze/core/bronze__blocks_fr_v2.sql b/models/bronze/core/bronze__blocks_fr_v2.sql new file mode 100644 index 0000000..fb1ff8a --- /dev/null +++ b/models/bronze/core/bronze__blocks_fr_v2.sql @@ -0,0 +1,39 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER ) AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "blocks_v2") }}' + ) + ) A + ) +SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.value :"block_number" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number +FROM + {{ source( "bronze_streamline", "blocks_v2") }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key +WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/bronze/core/bronze__receipts.sql b/models/bronze/core/bronze__receipts.sql new file mode 100644 index 0000000..e69de29 diff --git a/models/bronze/core/bronze__receipts_fr.sql b/models/bronze/core/bronze__receipts_fr.sql new file mode 100644 index 0000000..e69de29 diff --git a/models/bronze/core/bronze__streamline_tx_receipts.sql b/models/bronze/core/bronze__receipts_fr_v1.sql similarity index 100% rename from models/bronze/core/bronze__streamline_tx_receipts.sql rename to models/bronze/core/bronze__receipts_fr_v1.sql diff --git a/models/bronze/core/bronze__receipts_fr_v2.sql b/models/bronze/core/bronze__receipts_fr_v2.sql new file mode 100644 index 0000000..e69de29 diff --git a/models/bronze/core/bronze__streamline_FR_blocks.sql b/models/bronze/core/bronze__streamline_FR_blocks.sql deleted file mode 100644 index e0a16c6..0000000 --- a/models/bronze/core/bronze__streamline_FR_blocks.sql +++ /dev/null @@ -1,11 +0,0 @@ -{{ config ( - materialized = 'view' -) }} - -{% set model = this.identifier.split("_") [-1] %} -{{ streamline_external_table_FR_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "block_number" -) }} diff --git a/models/bronze/core/bronze__streamline_FR_blocks_transactions.sql b/models/bronze/core/bronze__streamline_FR_blocks_transactions.sql deleted file mode 100644 index e0a16c6..0000000 --- a/models/bronze/core/bronze__streamline_FR_blocks_transactions.sql +++ /dev/null @@ -1,11 +0,0 @@ -{{ config ( - materialized = 'view' -) }} - -{% set model = this.identifier.split("_") [-1] %} -{{ streamline_external_table_FR_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "block_number" -) }} diff --git a/models/bronze/core/bronze__streamline_FR_transactions.sql b/models/bronze/core/bronze__streamline_FR_transactions.sql deleted file mode 100644 index b27a8f8..0000000 --- a/models/bronze/core/bronze__streamline_FR_transactions.sql +++ /dev/null @@ -1,79 +0,0 @@ -{{ config ( - materialized = 'view' -) }} - -WITH meta AS ( - - SELECT - last_modified AS _inserted_timestamp, - file_name, - CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS _partition_by_block_id - FROM - TABLE( - information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "transactions") }}' - ) - ) A -), -tbl AS ( - SELECT - block_number, - COALESCE(s.data :hash :: STRING, s.data :result :hash :: STRING) AS tx_hash, - _inserted_timestamp, - s._partition_by_block_id, - s.value AS VALUE, - s.data AS DATA - FROM - {{ source( - "bronze_streamline", - "transactions" - ) }} - s - JOIN meta b - ON b.file_name = metadata$filename - AND b._partition_by_block_id = s._partition_by_block_id - WHERE - b._partition_by_block_id = s._partition_by_block_id - AND ( - DATA :error :code IS NULL - OR DATA :error :code NOT IN ( - '-32000', - '-32001', - '-32002', - '-32003', - '-32004', - '-32005', - '-32006', - '-32007', - '-32008', - '-32009', - '-32010' - ) - ) -) -SELECT - block_number, - COALESCE(f.value :hash :: STRING, tx_hash) AS tx_hash, - _inserted_timestamp, - MD5( - CAST( - COALESCE( - CAST( - CONCAT( - block_number, - '_-_', - COALESCE(COALESCE(f.value :hash :: STRING, tx_hash), '') - ) AS text - ), - '' :: STRING - ) AS text - ) - ) AS id, - _partition_by_block_id, - COALESCE(f.value, tbl.data) AS VALUE -FROM - tbl, - LATERAL FLATTEN( - input => VALUE :data :result :transactions, OUTER => TRUE - ) f -WHERE f.value IS NOT NULL OR tbl.data :transactionIndex IS NOT NULL \ No newline at end of file diff --git a/models/bronze/core/bronze__streamline_FR_tx_receipts.sql b/models/bronze/core/bronze__streamline_FR_tx_receipts.sql deleted file mode 100644 index e67934e..0000000 --- a/models/bronze/core/bronze__streamline_FR_tx_receipts.sql +++ /dev/null @@ -1,55 +0,0 @@ -{{ config ( - materialized = 'view' -) }} - -WITH meta AS ( - - SELECT - registered_on AS _inserted_timestamp, - file_name, - CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS _partition_by_block_id - FROM - TABLE( - information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "tx_receipts") }}' - ) - ) A -) -SELECT - block_number, - value :data :result :transactionHash ::STRING AS tx_hash, - _inserted_timestamp, - MD5( - CAST( - COALESCE(CAST(CONCAT(block_number, '_-_', COALESCE(value :data :result :transactionHash ::STRING, '')) AS text), '' :: STRING) AS text - ) - ) AS id, - s._partition_by_block_id, - s.value:data:result AS VALUE -FROM - {{ source( - "bronze_streamline", - "tx_receipts" - ) }} - s - JOIN meta b - ON b.file_name = metadata$filename - AND b._partition_by_block_id = s._partition_by_block_id -WHERE - b._partition_by_block_id = s._partition_by_block_id - AND ( - DATA :error :code IS NULL - OR DATA :error :code NOT IN ( - '-32000', - '-32001', - '-32002', - '-32003', - '-32004', - '-32005', - '-32006', - '-32007', - '-32008', - '-32009', - '-32010' - ) - ) diff --git a/models/bronze/core/bronze__streamline_blocks.sql b/models/bronze/core/bronze__streamline_blocks.sql deleted file mode 100644 index de33225..0000000 --- a/models/bronze/core/bronze__streamline_blocks.sql +++ /dev/null @@ -1,11 +0,0 @@ -{{ config ( - materialized = 'view' -) }} - -{% set model = this.identifier.split("_") [-1] %} -{{ streamline_external_table_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "block_number" -) }} diff --git a/models/bronze/core/bronze__streamline_blocks_transactions.sql b/models/bronze/core/bronze__streamline_blocks_transactions.sql deleted file mode 100644 index de33225..0000000 --- a/models/bronze/core/bronze__streamline_blocks_transactions.sql +++ /dev/null @@ -1,11 +0,0 @@ -{{ config ( - materialized = 'view' -) }} - -{% set model = this.identifier.split("_") [-1] %} -{{ streamline_external_table_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "block_number" -) }} diff --git a/models/bronze/core/bronze__traces.sql b/models/bronze/core/bronze__traces.sql new file mode 100644 index 0000000..e69de29 diff --git a/models/bronze/core/bronze__traces_fr.sql b/models/bronze/core/bronze__traces_fr.sql new file mode 100644 index 0000000..e69de29 diff --git a/models/bronze/core/bronze__streamline_traces.sql b/models/bronze/core/bronze__traces_fr_v1.sql similarity index 100% rename from models/bronze/core/bronze__streamline_traces.sql rename to models/bronze/core/bronze__traces_fr_v1.sql diff --git a/models/bronze/core/bronze__traces_fr_v2.sql b/models/bronze/core/bronze__traces_fr_v2.sql new file mode 100644 index 0000000..e69de29 diff --git a/models/bronze/core/bronze__transactions.sql b/models/bronze/core/bronze__transactions.sql new file mode 100644 index 0000000..5eee0bf --- /dev/null +++ b/models/bronze/core/bronze__transactions.sql @@ -0,0 +1,39 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + +WITH meta AS ( + SELECT + job_created_time AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", "transactions_v2") }}' + ) + ) A +) + SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number + FROM + {{ source( "bronze_streamline", "transactions_v2") }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key + WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/bronze/core/bronze__transactions_fr.sql b/models/bronze/core/bronze__transactions_fr.sql new file mode 100644 index 0000000..b5f9cc2 --- /dev/null +++ b/models/bronze/core/bronze__transactions_fr.sql @@ -0,0 +1,26 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + +SELECT + partition_key, + block_number, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__transactions_fr_v2') }} +UNION ALL +SELECT + _partition_by_block_id AS partition_key, + block_number, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__transactions_fr_v1') }} \ No newline at end of file diff --git a/models/bronze/core/bronze__streamline_transactions.sql b/models/bronze/core/bronze__transactions_fr_v1.sql similarity index 95% rename from models/bronze/core/bronze__streamline_transactions.sql rename to models/bronze/core/bronze__transactions_fr_v1.sql index e85321f..b925577 100644 --- a/models/bronze/core/bronze__streamline_transactions.sql +++ b/models/bronze/core/bronze__transactions_fr_v1.sql @@ -21,6 +21,8 @@ tbl AS ( block_number, COALESCE(s.data :hash :: STRING, s.data :result :hash :: STRING) AS tx_hash, _inserted_timestamp, + metadata, + file_name, s._partition_by_block_id, s.value AS VALUE, s.data AS DATA @@ -70,7 +72,11 @@ SELECT ) AS text ) ) AS id, + data, + metadata, + file_name, _partition_by_block_id, + _inserted_timestamp, COALESCE(f.value, tbl.data:result, tbl.data) AS VALUE FROM tbl, diff --git a/models/bronze/core/bronze__transactions_fr_v2.sql b/models/bronze/core/bronze__transactions_fr_v2.sql new file mode 100644 index 0000000..ef5982a --- /dev/null +++ b/models/bronze/core/bronze__transactions_fr_v2.sql @@ -0,0 +1,39 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER ) AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "transactions_v2") }}' + ) + ) A + ) +SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.value :"block_number" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number +FROM + {{ source( "bronze_streamline", "transactions_v2") }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key +WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/silver/core/silver__blocks.sql b/models/silver/core/silver__blocks.sql index 08478a9..301667d 100644 --- a/models/silver/core/silver__blocks.sql +++ b/models/silver/core/silver__blocks.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_blocks') }} +-- depends_on: {{ ref('bronze__blocks') }} {{ config( materialized = 'incremental', incremental_strategy = 'delete+insert', @@ -57,7 +57,7 @@ SELECT FROM {% if is_incremental() %} -{{ ref('bronze__streamline_blocks') }} +{{ ref('bronze__blocks') }} WHERE _inserted_timestamp >= ( SELECT @@ -66,7 +66,7 @@ WHERE {{ this }} ) {% else %} - {{ ref('bronze__streamline_FR_blocks') }} + {{ ref('bronze__blocks_fr') }} {% endif %} qualify(ROW_NUMBER() over (PARTITION BY block_number diff --git a/models/silver/core/silver__receipts.sql b/models/silver/core/silver__receipts.sql index b9a37dc..9c0203f 100644 --- a/models/silver/core/silver__receipts.sql +++ b/models/silver/core/silver__receipts.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_tx_receipts') }} +-- depends_on: {{ ref('bronze__receipts') }} {{ config( materialized = 'incremental', incremental_strategy = 'delete+insert', @@ -14,7 +14,7 @@ WITH base AS ( _inserted_timestamp FROM {% if is_incremental() %} -{{ ref('bronze__streamline_tx_receipts') }} +{{ ref('bronze__receipts') }} WHERE _inserted_timestamp >= ( SELECT @@ -26,7 +26,7 @@ WHERE DATA ) {% else %} - {{ ref('bronze__streamline_FR_tx_receipts') }} + {{ ref('bronze__receipts_fr') }} WHERE IS_OBJECT( DATA diff --git a/models/silver/core/silver__traces.sql b/models/silver/core/silver__traces.sql index ebfa448..692d820 100644 --- a/models/silver/core/silver__traces.sql +++ b/models/silver/core/silver__traces.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_traces') }} +-- depends_on: {{ ref('bronze__traces') }} {{ config ( materialized = "incremental", incremental_strategy = 'delete+insert', @@ -17,7 +17,7 @@ WITH bronze_traces AS ( FROM {% if is_incremental() %} -{{ ref('bronze__streamline_traces') }} +{{ ref('bronze__traces') }} WHERE _inserted_timestamp >= ( SELECT @@ -26,7 +26,7 @@ WHERE {{ this }} ) {% else %} - {{ ref('bronze__streamline_FR_traces') }} + {{ ref('bronze__traces_fr') }} {% endif %} qualify(ROW_NUMBER() over (PARTITION BY block_number, tx_hash diff --git a/models/silver/core/silver__transactions.sql b/models/silver/core/silver__transactions.sql index 7809180..972c06a 100644 --- a/models/silver/core/silver__transactions.sql +++ b/models/silver/core/silver__transactions.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_transactions') }} +-- depends_on: {{ ref('bronze__transactions') }} {{ config( materialized = 'incremental', incremental_strategy = 'delete+insert', @@ -16,7 +16,7 @@ WITH base AS ( FROM {% if is_incremental() %} -{{ ref('bronze__streamline_transactions') }} +{{ ref('bronze__transactions') }} WHERE _inserted_timestamp >= ( SELECT @@ -26,7 +26,7 @@ WHERE ) AND IS_OBJECT(DATA) {% else %} - {{ ref('bronze__streamline_FR_transactions') }} + {{ ref('bronze__transactions_fr') }} WHERE IS_OBJECT(DATA) {% endif %} diff --git a/models/silver/streamline/_block_lookback.sql b/models/silver/streamline/_block_lookback.sql new file mode 100644 index 0000000..3e4ab21 --- /dev/null +++ b/models/silver/streamline/_block_lookback.sql @@ -0,0 +1,11 @@ +{{ config( + materialized = 'ephemeral' +) }} + +SELECT + COALESCE(MIN(block_number), 0) AS block_number +FROM + {{ ref("core__fact_blocks") }} +WHERE + block_timestamp >= DATEADD('hour', -72, TRUNCATE(SYSDATE(), 'HOUR')) + AND block_timestamp < DATEADD('hour', -71, TRUNCATE(SYSDATE(), 'HOUR')) \ No newline at end of file diff --git a/models/silver/streamline/complete/streamline__complete_blocks.sql b/models/silver/streamline/complete/streamline__complete_blocks.sql index 4910a03..31cc0c4 100644 --- a/models/silver/streamline/complete/streamline__complete_blocks.sql +++ b/models/silver/streamline/complete/streamline__complete_blocks.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_blocks') }} +-- depends_on: {{ ref('bronze__blocks') }} {{ config ( materialized = "incremental", unique_key = "id", @@ -14,7 +14,7 @@ SELECT FROM {% if is_incremental() %} -{{ ref('bronze__streamline_blocks') }} +{{ ref('bronze__blocks') }} WHERE _inserted_timestamp >= ( SELECT @@ -24,7 +24,7 @@ WHERE ) AND DATA != [] {% else %} - {{ ref('bronze__streamline_FR_blocks') }} + {{ ref('bronze__blocks_fr') }} WHERE DATA != [] {% endif %} diff --git a/models/silver/streamline/complete/streamline__complete_blocks_transactions.sql b/models/silver/streamline/complete/streamline__complete_blocks_transactions.sql deleted file mode 100644 index 9d98ef0..0000000 --- a/models/silver/streamline/complete/streamline__complete_blocks_transactions.sql +++ /dev/null @@ -1,30 +0,0 @@ --- depends_on: {{ ref('bronze__streamline_blocks_transactions') }} -{{ config ( - materialized = "incremental", - unique_key = "id", - cluster_by = "ROUND(block_number, -3)", - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" -) }} - -SELECT - id, - block_number, - _inserted_timestamp -FROM - -{% if is_incremental() %} -{{ ref('bronze__streamline_blocks_transactions') }} -WHERE - _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) _inserted_timestamp - FROM - {{ this }} - ) -{% else %} - {{ ref('bronze__streamline_FR_blocks_transactions') }} -{% endif %} - -qualify(ROW_NUMBER() over (PARTITION BY id -ORDER BY - _inserted_timestamp DESC)) = 1 diff --git a/models/silver/streamline/complete/streamline__complete_traces.sql b/models/silver/streamline/complete/streamline__complete_traces.sql index 8348091..b1f87ea 100644 --- a/models/silver/streamline/complete/streamline__complete_traces.sql +++ b/models/silver/streamline/complete/streamline__complete_traces.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_traces') }} +-- depends_on: {{ ref('bronze__traces') }} {{ config ( materialized = "incremental", unique_key = "id", @@ -14,7 +14,7 @@ SELECT FROM {% if is_incremental() %} -{{ ref('bronze__streamline_traces') }} +{{ ref('bronze__traces') }} WHERE _inserted_timestamp >= ( @@ -24,7 +24,7 @@ WHERE {{ this }} ) {% else %} - {{ ref('bronze__streamline_FR_traces') }} + {{ ref('bronze__traces_fr') }} {% endif %} qualify(ROW_NUMBER() over (PARTITION BY id diff --git a/models/silver/streamline/complete/streamline__complete_transactions.sql b/models/silver/streamline/complete/streamline__complete_transactions.sql index 6cdbfc5..05b8770 100644 --- a/models/silver/streamline/complete/streamline__complete_transactions.sql +++ b/models/silver/streamline/complete/streamline__complete_transactions.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_transactions') }} +-- depends_on: {{ ref('bronze__transactions') }} {{ config ( materialized = "incremental", unique_key = "id", @@ -14,7 +14,7 @@ SELECT FROM {% if is_incremental() %} -{{ ref('bronze__streamline_transactions') }} +{{ ref('bronze__transactions') }} WHERE _inserted_timestamp >= ( SELECT @@ -23,7 +23,7 @@ WHERE {{ this }} ) {% else %} - {{ ref('bronze__streamline_FR_transactions') }} + {{ ref('bronze__transactions_fr') }} {% endif %} qualify(ROW_NUMBER() over (PARTITION BY id diff --git a/models/silver/streamline/complete/streamline__complete_tx_receipts.sql b/models/silver/streamline/complete/streamline__complete_tx_receipts.sql index 59631da..0b63985 100644 --- a/models/silver/streamline/complete/streamline__complete_tx_receipts.sql +++ b/models/silver/streamline/complete/streamline__complete_tx_receipts.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_tx_receipts') }} +-- depends_on: {{ ref('bronze__receipts') }} {{ config ( materialized = "incremental", unique_key = "id", @@ -12,7 +12,7 @@ SELECT FROM {% if is_incremental() %} -{{ ref('bronze__streamline_tx_receipts') }} +{{ ref('bronze__receipts') }} WHERE _inserted_timestamp >= ( SELECT @@ -22,7 +22,7 @@ WHERE ) AND tx_hash IS NOT NULL {% else %} - {{ ref('bronze__streamline_FR_tx_receipts') }} + {{ ref('bronze__receipts_fr') }} WHERE tx_hash IS NOT NULL {% endif %} diff --git a/models/silver/streamline/realtime/streamline__blocks_realtime.sql b/models/silver/streamline/realtime/streamline__blocks_realtime.sql deleted file mode 100644 index e7c68bd..0000000 --- a/models/silver/streamline/realtime/streamline__blocks_realtime.sql +++ /dev/null @@ -1,64 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table','blocks', 'producer_batch_size',10000, 'producer_limit_size',2000000, 'worker_batch_size',100))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - -WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %} - - SELECT - 0 AS block_number - {% else %} - SELECT - MAX(block_number) - 500000 AS block_number --aprox 3 days - FROM - {{ ref("streamline__blocks") }} - {% endif %}), - tbl AS ( - SELECT - block_number, - block_number_hex - FROM - {{ ref("streamline__blocks") }} - WHERE - ( - block_number >= ( - SELECT - block_number - FROM - last_3_days - ) - ) - AND block_number IS NOT NULL - EXCEPT - SELECT - block_number, - REPLACE( - concat_ws('', '0x', to_char(block_number, 'XXXXXXXX')), - ' ', - '' - ) AS block_number_hex - FROM - {{ ref("streamline__complete_blocks") }} - WHERE - ( - block_number >= ( - SELECT - block_number - FROM - last_3_days - ) - ) - AND block_number IS NOT NULL - ) -SELECT - block_number, - 'eth_getBlockByNumber' AS method, - CONCAT( - block_number_hex, - '_-_', - 'false' - ) AS params -FROM tbl \ No newline at end of file diff --git a/models/silver/streamline/realtime/streamline__blocks_transactions_realtime.sql b/models/silver/streamline/realtime/streamline__blocks_transactions_realtime.sql new file mode 100644 index 0000000..8ceb652 --- /dev/null +++ b/models/silver/streamline/realtime/streamline__blocks_transactions_realtime.sql @@ -0,0 +1,33 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :'blocks_transactions', + "sql_limit" :"30000", + "producer_batch_size" :"30000", + "worker_batch_size" :"10000", + "sql_source" :'{{this.identifier}}', + "exploded_key": tojson(['result', 'result.transactions']) } + ), + tags = ['streamline_core_evm_realtime'] +) }} + +SELECT + 150815373 as block_number, + ROUND(block_number, -3) AS partition_key, + live.udf_api( + 'POST', + '{URL}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + OBJECT_CONSTRUCT( + 'id', block_number, + 'jsonrpc', '2.0', + 'method', 'eth_getBlockByNumber', + 'params', ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE) + ), + 'Vault/prod/evm/aurora/mainnet' + ) AS request \ No newline at end of file diff --git a/models/silver/streamline/realtime/streamline__receipts_by_hash_realtime.sql b/models/silver/streamline/realtime/streamline__receipts_by_hash_realtime.sql new file mode 100644 index 0000000..b92808c --- /dev/null +++ b/models/silver/streamline/realtime/streamline__receipts_by_hash_realtime.sql @@ -0,0 +1,36 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :'receipts_by_hash', + "sql_limit" :"30000", + "producer_batch_size" :"30000", + "worker_batch_size" :"10000", + "sql_source" :'{{this.identifier}}' } + ), + tags = ['streamline','core','realtime','phase_1'] +) }} + +SELECT + 150949168 as block_number, + '0x967586085f70584ab5e5886ba9c7d1b2f227f554ed594a491978e4a2110bfdd7' as tx_hash, + ROUND( + block_number, + -3 + ) AS partition_key, + live.udf_api( + 'POST', + '{URL}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + OBJECT_CONSTRUCT( + 'id', block_number, + 'jsonrpc', '2.0', + 'method', 'eth_getTransactionReceipt', + 'params', ARRAY_CONSTRUCT(tx_hash) + ), + 'Vault/prod/evm/aurora/mainnet' + ) AS request \ No newline at end of file diff --git a/models/silver/streamline/realtime/streamline__traces_realtime.sql b/models/silver/streamline/realtime/streamline__traces_realtime.sql new file mode 100644 index 0000000..ba4de5a --- /dev/null +++ b/models/silver/streamline/realtime/streamline__traces_realtime.sql @@ -0,0 +1,36 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ + "external_table" :'traces', + "sql_limit" :"30000", + "producer_batch_size" :"30000", + "worker_batch_size" :"10000", + "sql_source" :'{{this.identifier}}', + "exploded_key": tojson(['result']) + } + ), + tags = ['streamline_core_evm_realtime'] +) }} + +SELECT + 150949168 as block_number, + '0x967586085f70584ab5e5886ba9c7d1b2f227f554ed594a491978e4a2110bfdd7' as tx_hash, + ROUND(block_number, -3) AS partition_key, + live.udf_api( + 'POST', + '{URL}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + OBJECT_CONSTRUCT( + 'id', block_number, + 'jsonrpc', '2.0', + 'method', 'debug_traceTransaction', + 'params', ARRAY_CONSTRUCT(tx_hash, OBJECT_CONSTRUCT('tracer', 'callTracer', 'timeout', '120s')) + ), + 'Vault/prod/evm/aurora/mainnet' + ) AS request \ No newline at end of file diff --git a/models/silver/streamline/realtime/streamline__transactions_realtime.sql b/models/silver/streamline/realtime/streamline__transactions_realtime.sql deleted file mode 100644 index 09cd7cd..0000000 --- a/models/silver/streamline/realtime/streamline__transactions_realtime.sql +++ /dev/null @@ -1,64 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'transactions', 'exploded_key','[\"result\", \"transactions\"]', 'producer_batch_size',10000, 'producer_limit_size',2000000, 'worker_batch_size',100))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - -WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %} - - SELECT - 0 AS block_number - {% else %} - SELECT - MAX(block_number) - 500000 AS block_number --aprox 3 days - FROM - {{ ref("streamline__blocks") }} - {% endif %}), - tbl AS ( - SELECT - block_number, - block_number_hex - FROM - {{ ref("streamline__blocks") }} - WHERE - ( - block_number >= ( - SELECT - block_number - FROM - last_3_days - ) - ) - AND block_number IS NOT NULL - EXCEPT - SELECT - block_number, - REPLACE( - concat_ws('', '0x', to_char(block_number, 'XXXXXXXX')), - ' ', - '' - ) AS block_number_hex - FROM - {{ ref("streamline__complete_blocks_transactions") }} - WHERE - ( - block_number >= ( - SELECT - block_number - FROM - last_3_days - ) - ) - AND block_number IS NOT NULL - ) -SELECT - block_number, - 'eth_getBlockByNumber' AS method, - CONCAT( - block_number_hex, - '_-_', - 'true' - ) AS params -FROM tbl \ No newline at end of file diff --git a/models/silver/streamline/realtime/streamline__tx_receipts_realtime.sql b/models/silver/streamline/realtime/streamline__tx_receipts_realtime.sql deleted file mode 100644 index 906a62a..0000000 --- a/models/silver/streamline/realtime/streamline__tx_receipts_realtime.sql +++ /dev/null @@ -1,48 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'tx_receipts', 'producer_batch_size',10000, 'producer_limit_size',2000000, 'worker_batch_size',100))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - -WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %} - - SELECT - 0 AS block_number - {% else %} - SELECT - MAX(block_number) - 500000 AS block_number -- aprox 3 days - FROM - {{ ref("streamline__complete_transactions") }} - {% endif %}), - tbl AS ( - SELECT - block_number, - tx_hash - FROM - {{ ref("streamline__complete_transactions") }} - WHERE - ( - block_number >= ( - SELECT - block_number - FROM - last_3_days - ) - ) - AND block_number IS NOT NULL - AND tx_hash IS NOT NULL - AND tx_hash NOT IN ( - SELECT - tx_hash - FROM - {{ ref("streamline__complete_tx_receipts") }} - ) - ) -SELECT - block_number, - 'eth_getTransactionReceipt' AS method, - tx_hash AS params -FROM - tbl diff --git a/models/silver/streamline/streamline__blocks.sql b/models/silver/streamline/streamline__blocks.sql index 289babd..2445bdd 100644 --- a/models/silver/streamline/streamline__blocks.sql +++ b/models/silver/streamline/streamline__blocks.sql @@ -1,26 +1,24 @@ {{ config ( materialized = "view", - tags = ['streamline_view'] + tags = ['streamline','core','chainhead','phase_1'] ) }} -{% if execute %} - {% set height = run_query('SELECT streamline.udf_get_chainhead()') %} - {% set block_height = height.columns [0].values() [0] %} -{% else %} - {% set block_height = 0 %} -{% endif %} - SELECT - height AS block_number, - REPLACE( - concat_ws('', '0x', to_char(height, 'XXXXXXXX')), - ' ', - '' - ) AS block_number_hex + _id, + ( + (6000 / 60) * 5 + ) :: INT AS block_number_delay, --minute-based block delay + (_id - block_number_delay) :: INT AS block_number, + utils.udf_int_to_hex(block_number) AS block_number_hex FROM - TABLE( - streamline.udtf_get_base_table( - {{ block_height }} - - 800 - ) - ) -- avoid the missing blocks at the tips of the chainhead, around 1 hour + {{ ref('admin__number_sequence') }} +WHERE + _id <= ( + SELECT + COALESCE( + block_number, + 0 + ) + FROM + {{ ref("streamline__get_chainhead") }} + ) \ No newline at end of file diff --git a/models/silver/streamline/streamline__get_chainhead.sql b/models/silver/streamline/streamline__get_chainhead.sql new file mode 100644 index 0000000..3965979 --- /dev/null +++ b/models/silver/streamline/streamline__get_chainhead.sql @@ -0,0 +1,28 @@ +{{ config ( + materialized = 'table', + tags = ['streamline','core','chainhead','phase_1'] +) }} + +SELECT + live.udf_api( + 'POST', + '{URL}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'livequery' + ), + OBJECT_CONSTRUCT( + 'id', + 0, + 'jsonrpc', + '2.0', + 'method', + 'eth_blockNumber', + 'params', + [] + ), + 'Vault/prod/evm/aurora/mainnet' + ) AS resp, + utils.udf_hex_to_int( + resp :data :result :: STRING + ) AS block_number \ No newline at end of file diff --git a/models/sources.yml b/models/sources.yml index e0db1fa..eec5ef6 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -11,7 +11,6 @@ sources: - name: bronze_streamline database: streamline - schema: | {{ "AURORA_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "AURORA" }} tables: @@ -19,6 +18,10 @@ sources: - name: transactions - name: tx_receipts - name: traces + - name: blocks_v2 + - name: transactions_v2 + - name: receipts_v2 + - name: traces_v2 - name: silver_crosschain database: "{{ 'crosschain' if target.database == 'FLOW' else 'crosschain_dev' }}" diff --git a/package-lock.yml b/package-lock.yml index 501cbcb..e69015c 100644 --- a/package-lock.yml +++ b/package-lock.yml @@ -1,16 +1,16 @@ packages: - - package: calogica/dbt_expectations - version: 0.8.2 - - package: dbt-labs/dbt_external_tables - version: 0.8.2 - - git: https://github.com/FlipsideCrypto/fsc-utils.git - revision: d3cf679e079f0cf06142de9386f215e55fe26b3b - - package: get-select/dbt_snowflake_query_tags - version: 2.5.0 - - package: calogica/dbt_date - version: 0.7.2 - - package: dbt-labs/dbt_utils - version: 1.0.0 - - git: https://github.com/FlipsideCrypto/livequery-models.git - revision: b024188be4e9c6bc00ed77797ebdc92d351d620e -sha1_hash: 43a34a68dfdb6b0a3c91dd72cdb7a72243b37630 +- package: calogica/dbt_expectations + version: 0.8.2 +- package: dbt-labs/dbt_external_tables + version: 0.8.2 +- git: https://github.com/FlipsideCrypto/fsc-utils.git + revision: d5a43b13ef69c9dd50a86aae5eeddc328789d9f9 +- package: get-select/dbt_snowflake_query_tags + version: 2.5.0 +- package: calogica/dbt_date + version: 0.7.2 +- package: dbt-labs/dbt_utils + version: 1.0.0 +- git: https://github.com/FlipsideCrypto/livequery-models.git + revision: b3d6329d3252b3dae5cf0a5e05044c085f05ea7c +sha1_hash: 434ca8a8ca99de424199f19ef86a3fcacb0e82ec diff --git a/packages.yml b/packages.yml index 941e110..9478a75 100644 --- a/packages.yml +++ b/packages.yml @@ -4,6 +4,6 @@ packages: - package: dbt-labs/dbt_external_tables version: 0.8.2 - git: https://github.com/FlipsideCrypto/fsc-utils.git - revision: v1.32.0 + revision: v1.37.0 - package: get-select/dbt_snowflake_query_tags version: [">=2.0.0", "<3.0.0"]