diff --git a/models/sources.yml b/models/sources.yml index 8d38ff0..e4c72e9 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -17,6 +17,8 @@ sources: - name: testnet_traces - name: testnet_confirm_blocks - name: testnet_decoded_logs + - name: testnet_blocks_v2 + - name: testnet_transactions_v2 - name: crosschain_silver database: "{{ 'crosschain' if target.database.upper() == var('GLOBAL_PROD_DB_NAME').upper() else 'crosschain_dev' }}" schema: silver diff --git a/models/testnet/core/bronze/streamline/bronze_testnet__blocks.sql b/models/testnet/core/bronze/streamline/bronze_testnet__blocks.sql index ac61e42..a5236ab 100644 --- a/models/testnet/core/bronze/streamline/bronze_testnet__blocks.sql +++ b/models/testnet/core/bronze/streamline/bronze_testnet__blocks.sql @@ -29,7 +29,7 @@ SELECT FROM {{ source( "bronze_streamline", - "testnet_blocks" + "testnet_blocks_v2" ) }} s JOIN meta b diff --git a/models/testnet/core/bronze/streamline/bronze_testnet__blocks_fr.sql b/models/testnet/core/bronze/streamline/bronze_testnet__blocks_fr.sql index 48faf7a..a28511f 100644 --- a/models/testnet/core/bronze/streamline/bronze_testnet__blocks_fr.sql +++ b/models/testnet/core/bronze/streamline/bronze_testnet__blocks_fr.sql @@ -3,40 +3,12 @@ tags = ['bronze_core'] ) }} -WITH meta AS ( - SELECT - registered_on AS _inserted_timestamp, - file_name, - CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key - FROM - TABLE( - information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "testnet_blocks") }}' - ) - ) A -) SELECT - s.*, - b.file_name, - b._inserted_timestamp, - COALESCE( - s.value :"BLOCK_NUMBER" :: STRING, - s.value :"block_number" :: STRING, - s.metadata :request :"data" :id :: STRING, - PARSE_JSON( - s.metadata :request :"data" - ) :id :: STRING - ) :: INT AS block_number + * FROM - {{ source( - "bronze_streamline", - "testnet_blocks" - ) }} - s - JOIN meta b - ON b.file_name = metadata$filename - AND b.partition_key = s.partition_key -WHERE - b.partition_key = s.partition_key - AND DATA :error IS NULL - AND DATA IS NOT NULL \ No newline at end of file + {{ ref('bronze_testnet__blocks_fr_v2') }} +UNION ALL +SELECT + * +FROM + {{ ref('bronze_testnet__blocks_fr_v1') }} diff --git a/models/testnet/core/bronze/streamline/bronze_testnet__blocks_fr_v1.sql b/models/testnet/core/bronze/streamline/bronze_testnet__blocks_fr_v1.sql new file mode 100644 index 0000000..48faf7a --- /dev/null +++ b/models/testnet/core/bronze/streamline/bronze_testnet__blocks_fr_v1.sql @@ -0,0 +1,42 @@ +{{ config ( + materialized = 'view', + tags = ['bronze_core'] +) }} + +WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "testnet_blocks") }}' + ) + ) A +) +SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.value :"block_number" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number +FROM + {{ source( + "bronze_streamline", + "testnet_blocks" + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key +WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/testnet/core/bronze/streamline/bronze_testnet__blocks_fr_v2.sql b/models/testnet/core/bronze/streamline/bronze_testnet__blocks_fr_v2.sql new file mode 100644 index 0000000..9473e17 --- /dev/null +++ b/models/testnet/core/bronze/streamline/bronze_testnet__blocks_fr_v2.sql @@ -0,0 +1,42 @@ +{{ config ( + materialized = 'view', + tags = ['bronze_core'] +) }} + +WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "testnet_blocks_v2") }}' + ) + ) A +) +SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.value :"block_number" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number +FROM + {{ source( + "bronze_streamline", + "testnet_blocks_v2" + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key +WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/testnet/core/bronze/streamline/bronze_testnet__transactions.sql b/models/testnet/core/bronze/streamline/bronze_testnet__transactions.sql index 1ae7b59..d995999 100644 --- a/models/testnet/core/bronze/streamline/bronze_testnet__transactions.sql +++ b/models/testnet/core/bronze/streamline/bronze_testnet__transactions.sql @@ -29,7 +29,7 @@ SELECT FROM {{ source( "bronze_streamline", - "testnet_transactions" + "testnet_transactions_v2" ) }} s JOIN meta b diff --git a/models/testnet/core/bronze/streamline/bronze_testnet__transactions_fr.sql b/models/testnet/core/bronze/streamline/bronze_testnet__transactions_fr.sql index 8cfa2f2..e3dae10 100644 --- a/models/testnet/core/bronze/streamline/bronze_testnet__transactions_fr.sql +++ b/models/testnet/core/bronze/streamline/bronze_testnet__transactions_fr.sql @@ -3,40 +3,12 @@ tags = ['bronze_core'] ) }} -WITH meta AS ( - SELECT - registered_on AS _inserted_timestamp, - file_name, - CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key - FROM - TABLE( - information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "testnet_transactions") }}' - ) - ) A -) SELECT - s.*, - b.file_name, - b._inserted_timestamp, - COALESCE( - s.value :"BLOCK_NUMBER" :: STRING, - s.value :"block_number" :: STRING, - s.metadata :request :"data" :id :: STRING, - PARSE_JSON( - s.metadata :request :"data" - ) :id :: STRING - ) :: INT AS block_number + * FROM - {{ source( - "bronze_streamline", - "testnet_transactions" - ) }} - s - JOIN meta b - ON b.file_name = metadata$filename - AND b.partition_key = s.partition_key -WHERE - b.partition_key = s.partition_key - AND DATA :error IS NULL - AND DATA IS NOT NULL \ No newline at end of file + {{ ref('bronze_testnet__transactions_fr_v2') }} +UNION ALL +SELECT + * +FROM + {{ ref('bronze_testnet__transactions_fr_v1') }} diff --git a/models/testnet/core/bronze/streamline/bronze_testnet__transactions_fr_v1.sql b/models/testnet/core/bronze/streamline/bronze_testnet__transactions_fr_v1.sql new file mode 100644 index 0000000..8cfa2f2 --- /dev/null +++ b/models/testnet/core/bronze/streamline/bronze_testnet__transactions_fr_v1.sql @@ -0,0 +1,42 @@ +{{ config ( + materialized = 'view', + tags = ['bronze_core'] +) }} + +WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "testnet_transactions") }}' + ) + ) A +) +SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.value :"block_number" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number +FROM + {{ source( + "bronze_streamline", + "testnet_transactions" + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key +WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/testnet/core/bronze/streamline/bronze_testnet__transactions_fr_v2.sql b/models/testnet/core/bronze/streamline/bronze_testnet__transactions_fr_v2.sql new file mode 100644 index 0000000..feb76e8 --- /dev/null +++ b/models/testnet/core/bronze/streamline/bronze_testnet__transactions_fr_v2.sql @@ -0,0 +1,42 @@ +{{ config ( + materialized = 'view', + tags = ['bronze_core'] +) }} + +WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "testnet_transactions_v2") }}' + ) + ) A +) +SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.value :"block_number" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number +FROM + {{ source( + "bronze_streamline", + "testnet_transactions_v2" + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key +WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/testnet/core/streamline/history/streamline__testnet_blocks_history.sql b/models/testnet/core/streamline/history/streamline__testnet_blocks_history.sql new file mode 100644 index 0000000..c5a80f9 --- /dev/null +++ b/models/testnet/core/streamline/history/streamline__testnet_blocks_history.sql @@ -0,0 +1,55 @@ +{% set node_secret_path = var("GLOBAL_NODE_SECRET_PATH") %} + +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"testnet_blocks", + "sql_limit" :"2000000", + "producer_batch_size" :"7200", + "worker_batch_size" :"1800", + "sql_source" :"{{this.identifier}}", + "async_concurrent_requests" :"1", + "exploded_key": tojson(["result"]) } + ), + tags = ['streamline_testnet_history'] +) }} + +WITH to_do AS ( + SELECT block_number + FROM {{ ref("streamline__testnet_blocks") }} + EXCEPT + SELECT block_number + FROM {{ ref("streamline__testnet_blocks_complete") }} +), +ready_blocks AS ( + SELECT block_number + FROM to_do + where block_number < (select block_number from {{ ref("_testnet_block_lookback") }}) +) +SELECT + block_number, + ROUND(block_number, -3) AS partition_key, + live.udf_api( + 'POST', + '{Service}/{Authentication}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + OBJECT_CONSTRUCT( + 'id', block_number, + 'jsonrpc', '2.0', + 'method', 'eth_getBlockByNumber', + 'params', ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE) + ), + '{{ node_secret_path }}' + ) AS request +FROM + ready_blocks + +ORDER BY block_number desc + +LIMIT + 2000000 \ No newline at end of file diff --git a/models/testnet/core/streamline/history/streamline__testnet_blocks_transactions_history.sql b/models/testnet/core/streamline/history/streamline__testnet_transactions_history.sql similarity index 82% rename from models/testnet/core/streamline/history/streamline__testnet_blocks_transactions_history.sql rename to models/testnet/core/streamline/history/streamline__testnet_transactions_history.sql index bed3b32..8dc2d2c 100644 --- a/models/testnet/core/streamline/history/streamline__testnet_blocks_transactions_history.sql +++ b/models/testnet/core/streamline/history/streamline__testnet_transactions_history.sql @@ -5,13 +5,13 @@ post_hook = fsc_utils.if_data_call_function_v2( func = 'streamline.udf_bulk_rest_api_v2', target = "{{this.schema}}.{{this.identifier}}", - params ={ "external_table" :"testnet_blocks_transactions", + params ={ "external_table" :"testnet_transactions", "sql_limit" :"2000000", "producer_batch_size" :"7200", "worker_batch_size" :"1800", "sql_source" :"{{this.identifier}}", "async_concurrent_requests" :"1", - "exploded_key": tojson(["result", "result.transactions"]) } + "exploded_key": tojson(["result.transactions"]) } ), tags = ['streamline_testnet_history'] ) }} @@ -21,8 +21,7 @@ WITH to_do AS ( FROM {{ ref("streamline__testnet_blocks") }} EXCEPT SELECT block_number - FROM {{ ref("streamline__testnet_blocks_complete") }} b - INNER JOIN {{ ref("streamline__testnet_transactions_complete") }} t USING(block_number) + FROM {{ ref("streamline__testnet_transactions_complete") }} ), ready_blocks AS ( SELECT block_number diff --git a/models/testnet/core/streamline/realtime/streamline__testnet_blocks_realtime.sql b/models/testnet/core/streamline/realtime/streamline__testnet_blocks_realtime.sql new file mode 100644 index 0000000..2bcedd8 --- /dev/null +++ b/models/testnet/core/streamline/realtime/streamline__testnet_blocks_realtime.sql @@ -0,0 +1,61 @@ +{% set node_secret_path = var("GLOBAL_NODE_SECRET_PATH") %} + +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"testnet_blocks", + "sql_limit" :"14400", + "producer_batch_size" :"3600", + "worker_batch_size" :"1800", + "sql_source" :"{{this.identifier}}", + "exploded_key": tojson(["result"]) } + ), + tags = ['streamline_testnet_realtime'] +) }} + +WITH last_3_days AS ( + SELECT block_number + FROM {{ ref("_testnet_block_lookback") }} +), +to_do AS ( + SELECT block_number + FROM {{ ref("streamline__testnet_blocks") }} + WHERE block_number IS NOT NULL + AND block_number >= (SELECT block_number FROM last_3_days) + EXCEPT + SELECT block_number + FROM {{ ref("streamline__testnet_blocks_complete") }} + WHERE 1=1 + AND block_number >= (SELECT block_number FROM last_3_days) +), +ready_blocks AS ( + SELECT block_number + FROM to_do +) +SELECT + block_number, + ROUND(block_number, -3) AS partition_key, + live.udf_api( + 'POST', + '{Service}/{Authentication}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + OBJECT_CONSTRUCT( + 'id', block_number, + 'jsonrpc', '2.0', + 'method', 'eth_getBlockByNumber', + 'params', ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE) + ), + '{{ node_secret_path }}' + ) AS request +FROM + ready_blocks + +ORDER BY block_number desc + +LIMIT + 14400 \ No newline at end of file diff --git a/models/testnet/core/streamline/realtime/streamline__testnet_receipts_realtime.sql b/models/testnet/core/streamline/realtime/streamline__testnet_receipts_realtime.sql index 2d5a0c2..8ba0aee 100644 --- a/models/testnet/core/streamline/realtime/streamline__testnet_receipts_realtime.sql +++ b/models/testnet/core/streamline/realtime/streamline__testnet_receipts_realtime.sql @@ -58,4 +58,4 @@ FROM ORDER BY block_number desc LIMIT - 7200 \ No newline at end of file + 14400 \ No newline at end of file diff --git a/models/testnet/core/streamline/realtime/streamline__testnet_traces_realtime.sql b/models/testnet/core/streamline/realtime/streamline__testnet_traces_realtime.sql index a1fbd52..2f06785 100644 --- a/models/testnet/core/streamline/realtime/streamline__testnet_traces_realtime.sql +++ b/models/testnet/core/streamline/realtime/streamline__testnet_traces_realtime.sql @@ -58,4 +58,4 @@ FROM ORDER BY block_number desc LIMIT - 7200 \ No newline at end of file + 14400 \ No newline at end of file diff --git a/models/testnet/core/streamline/realtime/streamline__testnet_blocks_transactions_realtime.sql b/models/testnet/core/streamline/realtime/streamline__testnet_transactions_realtime.sql similarity index 83% rename from models/testnet/core/streamline/realtime/streamline__testnet_blocks_transactions_realtime.sql rename to models/testnet/core/streamline/realtime/streamline__testnet_transactions_realtime.sql index 37829c7..547da5a 100644 --- a/models/testnet/core/streamline/realtime/streamline__testnet_blocks_transactions_realtime.sql +++ b/models/testnet/core/streamline/realtime/streamline__testnet_transactions_realtime.sql @@ -5,12 +5,12 @@ post_hook = fsc_utils.if_data_call_function_v2( func = 'streamline.udf_bulk_rest_api_v2', target = "{{this.schema}}.{{this.identifier}}", - params ={ "external_table" :"testnet_blocks_transactions", + params ={ "external_table" :"testnet_transactions", "sql_limit" :"14400", "producer_batch_size" :"3600", "worker_batch_size" :"1800", "sql_source" :"{{this.identifier}}", - "exploded_key": tojson(["result", "result.transactions"]) } + "exploded_key": tojson(["result.transactions"]) } ), tags = ['streamline_testnet_realtime'] ) }} @@ -26,8 +26,7 @@ to_do AS ( AND block_number >= (SELECT block_number FROM last_3_days) EXCEPT SELECT block_number - FROM {{ ref("streamline__testnet_blocks_complete") }} b - INNER JOIN {{ ref("streamline__testnet_transactions_complete") }} t USING(block_number) + FROM {{ ref("streamline__testnet_transactions_complete") }} WHERE 1=1 AND block_number >= (SELECT block_number FROM last_3_days) ), @@ -59,4 +58,4 @@ FROM ORDER BY block_number desc LIMIT - 7200 \ No newline at end of file + 14400 \ No newline at end of file