diff --git a/.github/workflows/dbt_run_full_observability.yml b/.github/workflows/dbt_run_full_observability.yml new file mode 100644 index 0000000..8e2ded7 --- /dev/null +++ b/.github/workflows/dbt_run_full_observability.yml @@ -0,0 +1,47 @@ +name: dbt_run_full_observability +run-name: dbt_run_full_observability + +on: + workflow_dispatch: + schedule: + # Runs “At 08:00 on day-of-month 1.” (see https://crontab.guru) + - cron: '0 8 1 * *' + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod_2xl + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v1 + with: + python-version: "3.7.x" + + - name: install dependencies + run: | + pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click + dbt deps + - name: Run DBT Jobs + run: | + dbt run --threads 2 --vars '{"OBSERV_FULL_TEST":True}' -m models/silver/_observability + + + diff --git a/.github/workflows/dbt_run_incremental.yml b/.github/workflows/dbt_run_incremental.yml index bdda74b..59727de 100644 --- a/.github/workflows/dbt_run_incremental.yml +++ b/.github/workflows/dbt_run_incremental.yml @@ -4,8 +4,8 @@ run-name: dbt_run_scheduled on: workflow_dispatch: schedule: - # Runs every "At minute 15.” (see https://crontab.guru) - - cron: '15 * * * *' + # Runs every "At minute 25.” (see https://crontab.guru) + - cron: '25 * * * *' env: DBT_PROFILES_DIR: ./ @@ -41,5 +41,5 @@ jobs: dbt deps - name: Run DBT Jobs run: | - dbt run --exclude models/silver/abis models/silver/api_udf models/silver/streamline models/silver/silver__decoded_logs.sql models/silver/core/tests models/silver/silver__decoded_logs_legacy.sql + dbt run --exclude models/silver/abis models/silver/api_udf models/silver/streamline models/silver/silver__decoded_logs.sql models/silver/core/tests models/silver/_observability dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m models/silver/streamline/decoder/streamline__decode_logs_realtime.sql models/silver/streamline/decoder/streamline__complete_decode_logs.sql \ No newline at end of file diff --git a/.github/workflows/dbt_run_streamline_decoder.yml b/.github/workflows/dbt_run_streamline_decoder.yml index c39558f..53b7552 100644 --- a/.github/workflows/dbt_run_streamline_decoder.yml +++ b/.github/workflows/dbt_run_streamline_decoder.yml @@ -4,8 +4,8 @@ run-name: dbt_run_streamline_decoder on: workflow_dispatch: schedule: - # Runs “At minute 35 past every 2nd hour.” (see https://crontab.guru) - - cron: '35 */2 * * *' + # Runs “At minute 40” (see https://crontab.guru) + - cron: '40 * * * *' env: DBT_PROFILES_DIR: ./ diff --git a/.github/workflows/dbt_run_streamline_realtime.yml b/.github/workflows/dbt_run_streamline_realtime.yml index a025dad..46ab382 100644 --- a/.github/workflows/dbt_run_streamline_realtime.yml +++ b/.github/workflows/dbt_run_streamline_realtime.yml @@ -4,8 +4,8 @@ run-name: dbt_run_streamline_realtime on: workflow_dispatch: schedule: - # Runs “At every 30th minute.” (see https://crontab.guru) - - cron: '*/30 * * * *' + # Runs “At minutes 10 and 40.” (see https://crontab.guru) + - cron: '10,40 * * * *' env: DBT_PROFILES_DIR: ./ diff --git a/.github/workflows/dbt_test_intraday.yml b/.github/workflows/dbt_test_intraday.yml index 482839c..656d7f8 100644 --- a/.github/workflows/dbt_test_intraday.yml +++ b/.github/workflows/dbt_test_intraday.yml @@ -4,8 +4,8 @@ run-name: dbt_test_intraday on: workflow_dispatch: schedule: - # Runs “At minute 45 past every 4th hour.” (see https://crontab.guru) - - cron: '45 */4 * * *' + # Runs “At minute 50 past every 4th hour.” (see https://crontab.guru) + - cron: '50 */4 * * *' env: DBT_PROFILES_DIR: ./ @@ -42,6 +42,7 @@ jobs: - name: Run DBT Jobs run: | dbt test -m tag:recent_test + dbt run -m models/silver/_observability diff --git a/dbt_project.yml b/dbt_project.yml index 7a09056..e6041dc 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -48,4 +48,5 @@ vars: STREAMLINE_RUN_HISTORY: False UPDATE_UDFS_AND_SPS: False UPDATE_SNOWFLAKE_TAGS: True - WAIT: 0 \ No newline at end of file + WAIT: 0 + OBSERV_FULL_TEST: False \ No newline at end of file diff --git a/macros/tests/missing_txs.sql b/macros/tests/missing_txs.sql index 8abc462..1fb6188 100644 --- a/macros/tests/missing_txs.sql +++ b/macros/tests/missing_txs.sql @@ -61,3 +61,41 @@ WHERE model_tx_hash IS NULL OR model_block_number IS NULL {% endmacro %} + +{% macro missing_confirmed_txs( + model1, + model2 + ) %} + WITH txs_base AS ( + SELECT + block_number AS base_block_number, + block_hash AS base_block_hash, + tx_hash AS base_tx_hash + FROM + {{ model1 }} + ), + model_name AS ( + SELECT + block_number AS model_block_number, + block_hash AS model_block_hash, + tx_hash AS model_tx_hash + FROM + {{ model2 }} + ) +SELECT + DISTINCT base_block_number AS block_number +FROM + txs_base + LEFT JOIN model_name + ON base_block_number = model_block_number + AND base_tx_hash = model_tx_hash + AND base_block_hash = model_block_hash +WHERE + model_tx_hash IS NULL + AND model_block_number <= ( + SELECT + MAX(base_block_number) + FROM + txs_base + ) +{% endmacro %} diff --git a/models/silver/_observability/silver_observability__blocks_completeness.sql b/models/silver/_observability/silver_observability__blocks_completeness.sql new file mode 100644 index 0000000..604b0cc --- /dev/null +++ b/models/silver/_observability/silver_observability__blocks_completeness.sql @@ -0,0 +1,166 @@ +{{ config( + materialized = 'incremental', + unique_key = 'test_timestamp', + full_refresh = false +) }} + +WITH summary_stats AS ( + + SELECT + MIN(block_number) AS min_block, + MAX(block_number) AS max_block, + MIN(block_timestamp) AS min_block_timestamp, + MAX(block_timestamp) AS max_block_timestamp, + COUNT(1) AS blocks_tested + FROM + {{ ref('silver__blocks') }} + WHERE + block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP()) + +{% if is_incremental() %} +AND ( + block_number >= ( + SELECT + MIN(block_number) + FROM + ( + SELECT + MIN(block_number) AS block_number + FROM + {{ ref('silver__blocks') }} + WHERE + block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP()) + AND DATEADD('hour', -95, CURRENT_TIMESTAMP()) + UNION + SELECT + MIN(VALUE) - 1 AS block_number + FROM + ( + SELECT + blocks_impacted_array + FROM + {{ this }} + qualify ROW_NUMBER() over ( + ORDER BY + test_timestamp DESC + ) = 1 + ), + LATERAL FLATTEN( + input => blocks_impacted_array + ) + ) + ) {% if var('OBSERV_FULL_TEST') %} + OR block_number >= 0 + {% endif %} +) +{% endif %} +), +block_range AS ( + SELECT + _id AS block_number + FROM + {{ source( + 'crosschain_silver', + 'number_sequence' + ) }} + WHERE + _id BETWEEN ( + SELECT + min_block + FROM + summary_stats + ) + AND ( + SELECT + max_block + FROM + summary_stats + ) +), +blocks AS ( + SELECT + l.block_number, + block_timestamp, + LAG( + l.block_number, + 1 + ) over ( + ORDER BY + l.block_number ASC + ) AS prev_BLOCK_NUMBER + FROM + {{ ref("silver__blocks") }} + l + INNER JOIN block_range b + ON l.block_number = b.block_number + AND l.block_number >= ( + SELECT + MIN(block_number) + FROM + block_range + ) +), +block_gen AS ( + SELECT + _id AS block_number + FROM + {{ source( + 'crosschain_silver', + 'number_sequence' + ) }} + WHERE + _id BETWEEN ( + SELECT + MIN(block_number) + FROM + blocks + ) + AND ( + SELECT + MAX(block_number) + FROM + blocks + ) +) +SELECT + 'blocks' AS test_name, + MIN( + b.block_number + ) AS min_block, + MAX( + b.block_number + ) AS max_block, + MIN( + b.block_timestamp + ) AS min_block_timestamp, + MAX( + b.block_timestamp + ) AS max_block_timestamp, + COUNT(1) AS blocks_tested, + COUNT( + CASE + WHEN C.block_number IS NOT NULL THEN A.block_number + END + ) AS blocks_impacted_count, + ARRAY_AGG( + CASE + WHEN C.block_number IS NOT NULL THEN A.block_number + END + ) within GROUP ( + ORDER BY + A.block_number + ) AS blocks_impacted_array, + CURRENT_TIMESTAMP AS test_timestamp +FROM + block_gen A + LEFT JOIN blocks b + ON A.block_number = b.block_number + LEFT JOIN blocks C + ON A.block_number > C.prev_block_number + AND A.block_number < C.block_number + AND C.block_number - C.prev_block_number <> 1 +WHERE + COALESCE( + b.block_number, + C.block_number + ) IS NOT NULL diff --git a/models/silver/_observability/silver_observability__logs_completeness.sql b/models/silver/_observability/silver_observability__logs_completeness.sql new file mode 100644 index 0000000..4815408 --- /dev/null +++ b/models/silver/_observability/silver_observability__logs_completeness.sql @@ -0,0 +1,121 @@ +{{ config( + materialized = 'incremental', + unique_key = 'test_timestamp', + full_refresh = false +) }} + +WITH summary_stats AS ( + + SELECT + MIN(block_number) AS min_block, + MAX(block_number) AS max_block, + MIN(block_timestamp) AS min_block_timestamp, + MAX(block_timestamp) AS max_block_timestamp, + COUNT(1) AS blocks_tested + FROM + {{ ref('silver__blocks') }} + WHERE + block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP()) + +{% if is_incremental() %} +AND ( + block_number >= ( + SELECT + MIN(block_number) + FROM + ( + SELECT + MIN(block_number) AS block_number + FROM + {{ ref('silver__blocks') }} + WHERE + block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP()) + AND DATEADD('hour', -95, CURRENT_TIMESTAMP()) + UNION + SELECT + MIN(VALUE) - 1 AS block_number + FROM + ( + SELECT + blocks_impacted_array + FROM + {{ this }} + qualify ROW_NUMBER() over ( + ORDER BY + test_timestamp DESC + ) = 1 + ), + LATERAL FLATTEN( + input => blocks_impacted_array + ) + ) + ) {% if var('OBSERV_FULL_TEST') %} + OR block_number >= 0 + {% endif %} +) +{% endif %} +), +block_range AS ( + SELECT + _id AS block_number + FROM + {{ source( + 'crosschain_silver', + 'number_sequence' + ) }} + WHERE + _id BETWEEN ( + SELECT + min_block + FROM + summary_stats + ) + AND ( + SELECT + max_block + FROM + summary_stats + ) +), +broken_blocks AS ( + SELECT + DISTINCT block_number + FROM + {{ ref("silver__receipts") }} + r + LEFT JOIN {{ ref("silver__logs") }} + l USING ( + block_number, + tx_hash + ) + JOIN block_range USING (block_number) + WHERE + l.tx_hash IS NULL + AND ARRAY_SIZE( + r.logs + ) > 0 +), +impacted_blocks AS ( + SELECT + COUNT(1) AS blocks_impacted_count, + ARRAY_AGG(block_number) within GROUP ( + ORDER BY + block_number + ) AS blocks_impacted_array + FROM + broken_blocks +) +SELECT + 'event_logs' AS test_name, + min_block, + max_block, + min_block_timestamp, + max_block_timestamp, + blocks_tested, + blocks_impacted_count, + blocks_impacted_array, + CURRENT_TIMESTAMP() AS test_timestamp +FROM + summary_stats + JOIN impacted_blocks + ON 1 = 1 diff --git a/models/silver/_observability/silver_observability__receipts_completeness.sql b/models/silver/_observability/silver_observability__receipts_completeness.sql new file mode 100644 index 0000000..5b911a4 --- /dev/null +++ b/models/silver/_observability/silver_observability__receipts_completeness.sql @@ -0,0 +1,119 @@ +{{ config( + materialized = 'incremental', + unique_key = 'test_timestamp', + full_refresh = false +) }} + +WITH summary_stats AS ( + + SELECT + MIN(block_number) AS min_block, + MAX(block_number) AS max_block, + MIN(block_timestamp) AS min_block_timestamp, + MAX(block_timestamp) AS max_block_timestamp, + COUNT(1) AS blocks_tested + FROM + {{ ref('silver__blocks') }} + WHERE + block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP()) + +{% if is_incremental() %} +AND ( + block_number >= ( + SELECT + MIN(block_number) + FROM + ( + SELECT + MIN(block_number) AS block_number + FROM + {{ ref('silver__blocks') }} + WHERE + block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP()) + AND DATEADD('hour', -95, CURRENT_TIMESTAMP()) + UNION + SELECT + MIN(VALUE) - 1 AS block_number + FROM + ( + SELECT + blocks_impacted_array + FROM + {{ this }} + qualify ROW_NUMBER() over ( + ORDER BY + test_timestamp DESC + ) = 1 + ), + LATERAL FLATTEN( + input => blocks_impacted_array + ) + ) + ) {% if var('OBSERV_FULL_TEST') %} + OR block_number >= 0 + {% endif %} +) +{% endif %} +), +block_range AS ( + SELECT + _id AS block_number + FROM + {{ source( + 'crosschain_silver', + 'number_sequence' + ) }} + WHERE + _id BETWEEN ( + SELECT + min_block + FROM + summary_stats + ) + AND ( + SELECT + max_block + FROM + summary_stats + ) +), +broken_blocks AS ( + SELECT + DISTINCT block_number + FROM + {{ ref("silver__transactions") }} + t + LEFT JOIN {{ ref("silver__receipts") }} + r USING ( + block_number, + tx_hash, + block_hash + ) + JOIN block_range USING (block_number) + WHERE + r.tx_hash IS NULL +), +impacted_blocks AS ( + SELECT + COUNT(1) AS blocks_impacted_count, + ARRAY_AGG(block_number) within GROUP ( + ORDER BY + block_number + ) AS blocks_impacted_array + FROM + broken_blocks +) +SELECT + 'receipts' AS test_name, + min_block, + max_block, + min_block_timestamp, + max_block_timestamp, + blocks_tested, + blocks_impacted_count, + blocks_impacted_array, + CURRENT_TIMESTAMP() AS test_timestamp +FROM + summary_stats + JOIN impacted_blocks + ON 1 = 1 diff --git a/models/silver/_observability/silver_observability__traces_completeness.sql b/models/silver/_observability/silver_observability__traces_completeness.sql new file mode 100644 index 0000000..6b2b1b4 --- /dev/null +++ b/models/silver/_observability/silver_observability__traces_completeness.sql @@ -0,0 +1,117 @@ +{{ config( + materialized = 'incremental', + unique_key = 'test_timestamp' +) }} + +WITH summary_stats AS ( + + SELECT + MIN(block_number) AS min_block, + MAX(block_number) AS max_block, + MIN(block_timestamp) AS min_block_timestamp, + MAX(block_timestamp) AS max_block_timestamp, + COUNT(1) AS blocks_tested + FROM + {{ ref('silver__blocks') }} + WHERE + block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP()) + +{% if is_incremental() %} +AND ( + block_number >= ( + SELECT + MIN(block_number) + FROM + ( + SELECT + MIN(block_number) AS block_number + FROM + {{ ref('silver__blocks') }} + WHERE + block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP()) + AND DATEADD('hour', -95, CURRENT_TIMESTAMP()) + UNION + SELECT + MIN(VALUE) - 1 AS block_number + FROM + ( + SELECT + blocks_impacted_array + FROM + {{ this }} + qualify ROW_NUMBER() over ( + ORDER BY + test_timestamp DESC + ) = 1 + ), + LATERAL FLATTEN( + input => blocks_impacted_array + ) + ) + ) {% if var('OBSERV_FULL_TEST') %} + OR block_number >= 0 + {% endif %} +) +{% endif %} +), +block_range AS ( + SELECT + _id AS block_number + FROM + {{ source( + 'crosschain_silver', + 'number_sequence' + ) }} + WHERE + _id BETWEEN ( + SELECT + min_block + FROM + summary_stats + ) + AND ( + SELECT + max_block + FROM + summary_stats + ) +), +broken_blocks AS ( + SELECT + DISTINCT block_number + FROM + {{ ref("silver__transactions") }} + tx + LEFT JOIN {{ ref("silver__traces") }} + tr USING ( + block_number, + tx_hash + ) + JOIN block_range USING (block_number) + WHERE + tr.tx_hash IS NULL +), +impacted_blocks AS ( + SELECT + COUNT(1) AS blocks_impacted_count, + ARRAY_AGG(block_number) within GROUP ( + ORDER BY + block_number + ) AS blocks_impacted_array + FROM + broken_blocks +) +SELECT + 'traces' AS test_name, + min_block, + max_block, + min_block_timestamp, + max_block_timestamp, + blocks_tested, + blocks_impacted_count, + blocks_impacted_array, + CURRENT_TIMESTAMP() AS test_timestamp +FROM + summary_stats + JOIN impacted_blocks + ON 1 = 1 diff --git a/models/silver/_observability/silver_observability__transactions_completeness.sql b/models/silver/_observability/silver_observability__transactions_completeness.sql new file mode 100644 index 0000000..4db801f --- /dev/null +++ b/models/silver/_observability/silver_observability__transactions_completeness.sql @@ -0,0 +1,119 @@ +{{ config( + materialized = 'incremental', + unique_key = 'test_timestamp', + full_refresh = false +) }} + +WITH summary_stats AS ( + + SELECT + MIN(block_number) AS min_block, + MAX(block_number) AS max_block, + MIN(block_timestamp) AS min_block_timestamp, + MAX(block_timestamp) AS max_block_timestamp, + COUNT(1) AS blocks_tested + FROM + {{ ref('silver__blocks') }} + WHERE + block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP()) + +{% if is_incremental() %} +AND ( + block_number >= ( + SELECT + MIN(block_number) + FROM + ( + SELECT + MIN(block_number) AS block_number + FROM + {{ ref('silver__blocks') }} + WHERE + block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP()) + AND DATEADD('hour', -95, CURRENT_TIMESTAMP()) + UNION + SELECT + MIN(VALUE) - 1 AS block_number + FROM + ( + SELECT + blocks_impacted_array + FROM + {{ this }} + qualify ROW_NUMBER() over ( + ORDER BY + test_timestamp DESC + ) = 1 + ), + LATERAL FLATTEN( + input => blocks_impacted_array + ) + ) + ) {% if var('OBSERV_FULL_TEST') %} + OR block_number >= 0 + {% endif %} +) +{% endif %} +), +block_range AS ( + SELECT + _id AS block_number + FROM + {{ source( + 'crosschain_silver', + 'number_sequence' + ) }} + WHERE + _id BETWEEN ( + SELECT + min_block + FROM + summary_stats + ) + AND ( + SELECT + max_block + FROM + summary_stats + ) +), +broken_blocks AS ( + SELECT + DISTINCT block_number + FROM + {{ ref("silver__confirmed_blocks") }} + b + LEFT JOIN {{ ref("silver__transactions") }} + t USING ( + block_number, + tx_hash, + block_hash + ) + JOIN block_range USING (block_number) + WHERE + t.tx_hash IS NULL +), +impacted_blocks AS ( + SELECT + COUNT(1) AS blocks_impacted_count, + ARRAY_AGG(block_number) within GROUP ( + ORDER BY + block_number + ) AS blocks_impacted_array + FROM + broken_blocks +) +SELECT + 'transactions' AS test_name, + min_block, + max_block, + min_block_timestamp, + max_block_timestamp, + blocks_tested, + blocks_impacted_count, + blocks_impacted_array, + CURRENT_TIMESTAMP() AS test_timestamp +FROM + summary_stats + JOIN impacted_blocks + ON 1 = 1 diff --git a/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_full.sql b/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_full.sql new file mode 100644 index 0000000..6d93a21 --- /dev/null +++ b/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_full.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view', + tags = ['full_test'] +) }} + +SELECT + * +FROM + {{ ref('silver__confirmed_blocks') }} diff --git a/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_full.yml b/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_full.yml new file mode 100644 index 0000000..81a9db0 --- /dev/null +++ b/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_full.yml @@ -0,0 +1,34 @@ +version: 2 +models: + - name: test_silver__confirmed_blocks_full + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - TX_HASH + columns: + - name: BLOCK_NUMBER + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - name: BLOCK_HASH + tests: + - not_null + - dbt_expectations.expect_column_values_to_match_regex: + regex: 0[xX][0-9a-fA-F]+ + - name: TX_HASH + tests: + - not_null + - dbt_expectations.expect_column_values_to_match_regex: + regex: 0[xX][0-9a-fA-F]+ + - name: _INSERTED_TIMESTAMP + tests: + - not_null + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 1 + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ + - TIMESTAMP_LTZ \ No newline at end of file diff --git a/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_recent.sql b/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_recent.sql new file mode 100644 index 0000000..0808f4a --- /dev/null +++ b/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_recent.sql @@ -0,0 +1,27 @@ +{{ config ( + materialized = 'view', + tags = ['recent_test'] +) }} + +WITH last_3_days AS ( + + SELECT + block_number + FROM + {{ ref("_max_block_by_date") }} + qualify ROW_NUMBER() over ( + ORDER BY + block_number DESC + ) = 3 +) +SELECT + * +FROM + {{ ref('silver__confirmed_blocks') }} +WHERE + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) diff --git a/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_recent.yml b/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_recent.yml new file mode 100644 index 0000000..460ddf1 --- /dev/null +++ b/models/silver/core/tests/confirmed_blocks/test_silver__confirmed_blocks_recent.yml @@ -0,0 +1,34 @@ +version: 2 +models: + - name: test_silver__confirmed_blocks_recent + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - TX_HASH + columns: + - name: BLOCK_NUMBER + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - name: BLOCK_HASH + tests: + - not_null + - dbt_expectations.expect_column_values_to_match_regex: + regex: 0[xX][0-9a-fA-F]+ + - name: TX_HASH + tests: + - not_null + - dbt_expectations.expect_column_values_to_match_regex: + regex: 0[xX][0-9a-fA-F]+ + - name: _INSERTED_TIMESTAMP + tests: + - not_null + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: hour + interval: 3 + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ + - TIMESTAMP_LTZ \ No newline at end of file diff --git a/models/silver/streamline/_max_block_by_hour.sql b/models/silver/streamline/_max_block_by_hour.sql index 332546a..80e3d97 100644 --- a/models/silver/streamline/_max_block_by_hour.sql +++ b/models/silver/streamline/_max_block_by_hour.sql @@ -15,7 +15,7 @@ WITH base AS ( WHERE block_timestamp > DATEADD( 'day', - -3, + -5, CURRENT_DATE ) GROUP BY diff --git a/models/silver/streamline/core/realtime/streamline__receipts_realtime.sql b/models/silver/streamline/core/realtime/streamline__receipts_realtime.sql index e805f1e..a1bada0 100644 --- a/models/silver/streamline/core/realtime/streamline__receipts_realtime.sql +++ b/models/silver/streamline/core/realtime/streamline__receipts_realtime.sql @@ -47,6 +47,11 @@ retry_blocks AS ( block_number FROM {{ ref("_missing_txs") }} + UNION + SELECT + block_number + FROM + {{ ref("_unconfirmed_blocks") }} ) ) SELECT diff --git a/models/silver/streamline/core/realtime/streamline__traces_realtime.sql b/models/silver/streamline/core/realtime/streamline__traces_realtime.sql index 3391f7c..489d43b 100644 --- a/models/silver/streamline/core/realtime/streamline__traces_realtime.sql +++ b/models/silver/streamline/core/realtime/streamline__traces_realtime.sql @@ -63,6 +63,11 @@ retry_blocks AS ( block_number FROM {{ ref("_missing_traces") }} + UNION + SELECT + block_number + FROM + {{ ref("_unconfirmed_blocks") }} ) ) SELECT diff --git a/models/silver/streamline/core/realtime/streamline__transactions_realtime.sql b/models/silver/streamline/core/realtime/streamline__transactions_realtime.sql index c76a8e5..a7c8c31 100644 --- a/models/silver/streamline/core/realtime/streamline__transactions_realtime.sql +++ b/models/silver/streamline/core/realtime/streamline__transactions_realtime.sql @@ -40,6 +40,11 @@ retry_blocks AS ( block_number FROM {{ ref("_missing_txs") }} + UNION + SELECT + block_number + FROM + {{ ref("_unconfirmed_blocks") }} ) ) SELECT diff --git a/models/silver/streamline/core/retry/_missing_receipts.sql b/models/silver/streamline/core/retry/_missing_receipts.sql index b27465c..c7ab41c 100644 --- a/models/silver/streamline/core/retry/_missing_receipts.sql +++ b/models/silver/streamline/core/retry/_missing_receipts.sql @@ -2,19 +2,31 @@ materialized = "ephemeral" ) }} +WITH lookback AS ( + + SELECT + MAX(block_number) AS block_number + FROM + {{ ref("silver__blocks") }} + WHERE + block_timestamp :: DATE = CURRENT_DATE() - 3 +) SELECT - DISTINCT tx.block_number AS block_number + DISTINCT t.block_number AS block_number FROM {{ ref("silver__transactions") }} - tx + t LEFT JOIN {{ ref("silver__receipts") }} - r - ON tx.block_number = r.block_number - AND tx.tx_hash = r.tx_hash -WHERE - tx.block_timestamp >= DATEADD( - 'day', - -2, - CURRENT_DATE + r USING ( + block_number, + block_hash, + tx_hash + ) +WHERE + r.tx_hash IS NULL + AND t.block_number >= ( + SELECT + block_number + FROM + lookback ) - AND r.tx_hash IS NULL diff --git a/models/silver/streamline/core/retry/_missing_traces.sql b/models/silver/streamline/core/retry/_missing_traces.sql index 1599bae..0830b89 100644 --- a/models/silver/streamline/core/retry/_missing_traces.sql +++ b/models/silver/streamline/core/retry/_missing_traces.sql @@ -3,7 +3,7 @@ ) }} SELECT - DISTINCT tx.block_number AS block_number + DISTINCT tx.block_number block_number FROM {{ ref("silver__transactions") }} tx diff --git a/models/silver/streamline/core/retry/_unconfirmed_blocks.sql b/models/silver/streamline/core/retry/_unconfirmed_blocks.sql new file mode 100644 index 0000000..10e9ecc --- /dev/null +++ b/models/silver/streamline/core/retry/_unconfirmed_blocks.sql @@ -0,0 +1,32 @@ +{{ config ( + materialized = "ephemeral" +) }} + +WITH lookback AS ( + + SELECT + MAX(block_number) AS block_number + FROM + {{ ref("silver__blocks") }} + WHERE + block_timestamp :: DATE = CURRENT_DATE() - 3 +) +SELECT + DISTINCT cb.block_number AS block_number +FROM + {{ ref("silver__confirmed_blocks") }} + cb + LEFT JOIN {{ ref("silver__transactions") }} + txs USING ( + block_number, + block_hash, + tx_hash + ) +WHERE + txs.tx_hash IS NULL + AND cb.block_number >= ( + SELECT + block_number + FROM + lookback + ) diff --git a/models/sources.yml b/models/sources.yml index 1623fc3..55062b4 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -24,6 +24,7 @@ sources: - name: apis_keys - name: token_prices_priority_hourly - name: asset_metadata_priority + - name: number_sequence - name: crosschain_public database: crosschain schema: bronze_public diff --git a/tests/bsc/test_silver__confirmed_blocks.sql b/tests/bsc/test_silver__confirmed_blocks.sql new file mode 100644 index 0000000..8cbac8b --- /dev/null +++ b/tests/bsc/test_silver__confirmed_blocks.sql @@ -0,0 +1 @@ +{{ missing_confirmed_txs(ref("test_silver__confirmed_blocks_full"), ref("test_silver__transactions_full")) }} diff --git a/tests/bsc/test_silver__recent_confirmed_blocks.sql b/tests/bsc/test_silver__recent_confirmed_blocks.sql new file mode 100644 index 0000000..79cdd66 --- /dev/null +++ b/tests/bsc/test_silver__recent_confirmed_blocks.sql @@ -0,0 +1 @@ +{{ missing_confirmed_txs(ref("test_silver__confirmed_blocks_recent"), ref("test_silver__transactions_recent")) }}