diff --git a/models/bronze/bronze_testnet__FR_confirm_blocks.sql b/models/bronze/bronze_testnet__FR_confirm_blocks.sql new file mode 100644 index 0000000..c5ac3be --- /dev/null +++ b/models/bronze/bronze_testnet__FR_confirm_blocks.sql @@ -0,0 +1,7 @@ +{{ config ( + materialized = 'view' +) }} +{{ fsc_utils.streamline_external_table_FR_query_v2( + model = "testnet_confirm_blocks", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )" +) }} \ No newline at end of file diff --git a/models/bronze/bronze_testnet__confirm_blocks.sql b/models/bronze/bronze_testnet__confirm_blocks.sql new file mode 100644 index 0000000..a137253 --- /dev/null +++ b/models/bronze/bronze_testnet__confirm_blocks.sql @@ -0,0 +1,7 @@ +{{ config ( + materialized = 'view' +) }} +{{ fsc_utils.streamline_external_table_query_v2( + model = "testnet_confirm_blocks", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )" +) }} diff --git a/models/silver/core/silver_testnet__confirmed_blocks.sql b/models/silver/core/silver_testnet__confirmed_blocks.sql new file mode 100644 index 0000000..a3045c0 --- /dev/null +++ b/models/silver/core/silver_testnet__confirmed_blocks.sql @@ -0,0 +1,56 @@ +-- depends_on: {{ ref('bronze_testnet__confirm_blocks') }} +{{ config( + materialized = 'incremental', + incremental_strategy = 'delete+insert', + unique_key = "block_number", + cluster_by = "round(block_number,-3)", + tags = ['non_realtime'] +) }} + +WITH base AS ( + + SELECT + COALESCE( + VALUE :BLOCK_NUMBER :: INT, + metadata :request :"data" :id :: INT, + PARSE_JSON( + metadata :request :"data" + ) :id :: INT + ) AS block_number, + DATA :result :hash :: STRING AS block_hash, + DATA :result :transactions txs, + _inserted_timestamp + FROM + +{% if is_incremental() %} +{{ ref('bronze_testnet__confirm_blocks') }} +WHERE + _inserted_timestamp >= ( + SELECT + IFNULL( + MAX( + _inserted_timestamp + ), + '1970-01-01' :: TIMESTAMP + ) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze_testnet__FR_confirm_blocks') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY block_number +ORDER BY + _inserted_timestamp DESC)) = 1 +) +SELECT + block_number, + block_hash, + VALUE :: STRING AS tx_hash, + _inserted_timestamp +FROM + base, + LATERAL FLATTEN ( + input => txs + ) diff --git a/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_full.sql b/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_full.sql new file mode 100644 index 0000000..2397ca7 --- /dev/null +++ b/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_full.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view', + tags = ['full_test'] +) }} + +SELECT + * +FROM + {{ ref('silver_testnet__confirmed_blocks') }} diff --git a/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_full.yml b/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_full.yml new file mode 100644 index 0000000..81a9db0 --- /dev/null +++ b/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_full.yml @@ -0,0 +1,34 @@ +version: 2 +models: + - name: test_silver__confirmed_blocks_full + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - TX_HASH + columns: + - name: BLOCK_NUMBER + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - name: BLOCK_HASH + tests: + - not_null + - dbt_expectations.expect_column_values_to_match_regex: + regex: 0[xX][0-9a-fA-F]+ + - name: TX_HASH + tests: + - not_null + - dbt_expectations.expect_column_values_to_match_regex: + regex: 0[xX][0-9a-fA-F]+ + - name: _INSERTED_TIMESTAMP + tests: + - not_null + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 1 + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ + - TIMESTAMP_LTZ \ No newline at end of file diff --git a/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_recent.sql b/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_recent.sql new file mode 100644 index 0000000..3282f5d --- /dev/null +++ b/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_recent.sql @@ -0,0 +1,23 @@ +{{ config ( + materialized = 'view', + tags = ['recent_test'] +) }} + +WITH last_3_days AS ( + + SELECT + block_number + FROM + {{ ref("_block_lookback") }} +) +SELECT + * +FROM + {{ ref('silver_testnet__confirmed_blocks') }} +WHERE + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) diff --git a/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_recent.yml b/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_recent.yml new file mode 100644 index 0000000..460ddf1 --- /dev/null +++ b/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_recent.yml @@ -0,0 +1,34 @@ +version: 2 +models: + - name: test_silver__confirmed_blocks_recent + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - TX_HASH + columns: + - name: BLOCK_NUMBER + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - name: BLOCK_HASH + tests: + - not_null + - dbt_expectations.expect_column_values_to_match_regex: + regex: 0[xX][0-9a-fA-F]+ + - name: TX_HASH + tests: + - not_null + - dbt_expectations.expect_column_values_to_match_regex: + regex: 0[xX][0-9a-fA-F]+ + - name: _INSERTED_TIMESTAMP + tests: + - not_null + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: hour + interval: 3 + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ + - TIMESTAMP_LTZ \ No newline at end of file diff --git a/models/streamline/_max_block_by_hour.sql b/models/streamline/_max_block_by_hour.sql new file mode 100644 index 0000000..368e8fe --- /dev/null +++ b/models/streamline/_max_block_by_hour.sql @@ -0,0 +1,37 @@ +{{ config ( + materialized = "ephemeral" +) }} + +WITH base AS ( + + SELECT + DATE_TRUNC( + 'hour', + block_timestamp + ) AS block_hour, + MAX(block_number) block_number + FROM + {{ ref("silver_testnet__blocks") }} + WHERE + block_timestamp > DATEADD( + 'day', + -5, + CURRENT_DATE + ) + GROUP BY + 1 +) +SELECT + block_hour, + block_number +FROM + base +WHERE + block_hour <> ( + SELECT + MAX( + block_hour + ) + FROM + base + ) diff --git a/models/streamline/core/complete/streamline__testnet_confirmed_blocks_complete.sql b/models/streamline/core/complete/streamline__testnet_confirmed_blocks_complete.sql new file mode 100644 index 0000000..b5a3119 --- /dev/null +++ b/models/streamline/core/complete/streamline__testnet_confirmed_blocks_complete.sql @@ -0,0 +1,40 @@ +-- depends_on: {{ ref('bronze__streamline_confirm_blocks') }} +{{ config ( + materialized = "incremental", + unique_key = "block_number", + cluster_by = "ROUND(block_number, -3)", + tags = ['streamline_testnet_core_complete'] +) }} + +SELECT + COALESCE( + VALUE :BLOCK_NUMBER :: INT, + metadata :request :"data" :id :: INT, + PARSE_JSON( + metadata :request :"data" + ) :id :: INT + ) AS block_number, + {{ dbt_utils.generate_surrogate_key( + ['block_number'] + ) }} AS complete_confirmed_blocks_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + _inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze_testnet__confirm_blocks') }} +WHERE + _inserted_timestamp >= ( + SELECT + COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP) _inserted_timestamp + FROM + {{ this }}) + {% else %} + {{ ref('bronze_testnet__FR_confirm_blocks') }} + {% endif %} + + qualify(ROW_NUMBER() over (PARTITION BY block_number + ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/core/realtime/streamline__testnet_confirm_blocks_realtime.sql b/models/streamline/core/realtime/streamline__testnet_confirm_blocks_realtime.sql new file mode 100644 index 0000000..0f7794b --- /dev/null +++ b/models/streamline/core/realtime/streamline__testnet_confirm_blocks_realtime.sql @@ -0,0 +1,82 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"testnet_confirm_blocks", + "sql_limit" :"20000", + "producer_batch_size" :"5000", + "worker_batch_size" :"2000", + "sql_source" :"{{this.identifier}}" } + ), + tags = ['confirm_blocks_temp'] +) }} + +WITH look_back AS ( + + SELECT + block_number + FROM + {{ ref("_max_block_by_hour") }} + qualify ROW_NUMBER() over ( + ORDER BY + block_number DESC + ) = 6 +), +tbl AS ( + SELECT + block_number + FROM + {{ ref("streamline__testnet_blocks") }} + WHERE + block_number IS NOT NULL + AND block_number <= ( + SELECT + block_number + FROM + look_back + ) + EXCEPT + SELECT + block_number + FROM + {{ ref("streamline__complete_confirmed_blocks") }} + WHERE + block_number IS NOT NULL + AND block_number <= ( + SELECT + block_number + FROM + look_back + ) +) +SELECT + block_number, + ROUND( + block_number, + -3 + ) AS partition_key, + {{ target.database }}.live.udf_api( + 'POST', + '{service}/{Authentication}', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json' + ), + OBJECT_CONSTRUCT( + 'id', + block_number, + 'jsonrpc', + '2.0', + 'method', + 'eth_getBlockByNumber', + 'params', + ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)), + 'Vault/prod/berachain/internal/archive' + ) AS request + FROM + tbl + ORDER BY + block_number ASC + LIMIT + 20000 diff --git a/models/streamline/core/realtime/streamline__testnet_receipts_realtime.sql b/models/streamline/core/realtime/streamline__testnet_receipts_realtime.sql index 436cff7..986ed17 100644 --- a/models/streamline/core/realtime/streamline__testnet_receipts_realtime.sql +++ b/models/streamline/core/realtime/streamline__testnet_receipts_realtime.sql @@ -42,6 +42,11 @@ ready_blocks AS ( block_number FROM {{ ref("_missing_txs") }} + UNION + SELECT + block_number + FROM + {{ ref("_unconfirmed_blocks") }} ) SELECT block_number, diff --git a/models/streamline/core/realtime/streamline__testnet_traces_realtime.sql b/models/streamline/core/realtime/streamline__testnet_traces_realtime.sql index 6830e09..e023f1e 100644 --- a/models/streamline/core/realtime/streamline__testnet_traces_realtime.sql +++ b/models/streamline/core/realtime/streamline__testnet_traces_realtime.sql @@ -37,6 +37,11 @@ ready_blocks AS ( block_number FROM {{ ref("_missing_traces") }} + UNION + SELECT + block_number + FROM + {{ ref("_unconfirmed_blocks") }} ) SELECT block_number, diff --git a/models/streamline/core/realtime/streamline__testnet_transactions_realtime.sql b/models/streamline/core/realtime/streamline__testnet_transactions_realtime.sql index f70b9f1..8ee6be3 100644 --- a/models/streamline/core/realtime/streamline__testnet_transactions_realtime.sql +++ b/models/streamline/core/realtime/streamline__testnet_transactions_realtime.sql @@ -37,6 +37,11 @@ ready_blocks AS ( block_number FROM {{ ref("_missing_txs") }} + UNION + SELECT + block_number + FROM + {{ ref("_unconfirmed_blocks") }} ) SELECT block_number, diff --git a/models/streamline/core/retry/_unconfirmed_blocks.sql b/models/streamline/core/retry/_unconfirmed_blocks.sql new file mode 100644 index 0000000..7dc55f8 --- /dev/null +++ b/models/streamline/core/retry/_unconfirmed_blocks.sql @@ -0,0 +1,34 @@ +{{ config ( + materialized = "ephemeral" +) }} + +WITH lookback AS ( + + SELECT + block_number + FROM + {{ ref("_block_lookback") }} +) +SELECT + DISTINCT cb.block_number AS block_number +FROM + {{ ref("silver_testnet__confirmed_blocks") }} + cb + LEFT JOIN {{ ref("silver_testnet__transactions") }} + txs USING ( + block_number, + block_hash, + tx_hash + ) +WHERE + txs.tx_hash IS NULL + AND cb.block_number >= ( + SELECT + block_number + FROM + lookback + ) + AND cb._inserted_timestamp >= DATEADD('hour', -84, SYSDATE()) + AND ( + txs._inserted_timestamp >= DATEADD('hour', -84, SYSDATE()) + OR txs._inserted_timestamp IS NULL) diff --git a/tests/berachain/test_silver__confirmed_blocks.sql b/tests/berachain/test_silver__confirmed_blocks.sql new file mode 100644 index 0000000..8cbac8b --- /dev/null +++ b/tests/berachain/test_silver__confirmed_blocks.sql @@ -0,0 +1 @@ +{{ missing_confirmed_txs(ref("test_silver__confirmed_blocks_full"), ref("test_silver__transactions_full")) }} diff --git a/tests/berachain/test_silver__recent_confirmed_blocks.sql b/tests/berachain/test_silver__recent_confirmed_blocks.sql new file mode 100644 index 0000000..79cdd66 --- /dev/null +++ b/tests/berachain/test_silver__recent_confirmed_blocks.sql @@ -0,0 +1 @@ +{{ missing_confirmed_txs(ref("test_silver__confirmed_blocks_recent"), ref("test_silver__transactions_recent")) }}