diff --git a/.github/workflows/dbt_run_observability.yml b/.github/workflows/dbt_run_observability.yml new file mode 100644 index 0000000..2ee2fd5 --- /dev/null +++ b/.github/workflows/dbt_run_observability.yml @@ -0,0 +1,45 @@ +name: dbt_run_observability +run-name: dbt_run_observability + +on: + workflow_dispatch: + schedule: + # Runs “At minute 0 past every 8th hour.” (see https://crontab.guru) + - cron: '0 */8 * * *' + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "${{ vars.PYTHON_VERSION }}" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + - name: Run DBT Jobs + run: | + dbt run -s tag:observability \ No newline at end of file diff --git a/models/silver/_observability/silver_observability.yml b/models/silver/_observability/silver_observability.yml new file mode 100644 index 0000000..9386e32 --- /dev/null +++ b/models/silver/_observability/silver_observability.yml @@ -0,0 +1,130 @@ +version: 2 +models: + - name: silver_observability__blocks_completeness + description: Records of all blocks block gaps (missing blocks) with a timestamp the test was run + config: + contract: + enforced: true + tests: + - dbt_utils.recency: + datepart: day + field: TEST_TIMESTAMP + interval: 2 + severity: error + tags: ['test_recency'] + columns: + - name: TEST_NAME + data_type: VARCHAR + description: Name for the test + - name: MIN_BLOCK + data_type: NUMBER + description: The lowest block id in the test + tests: + - not_null: + tags: ['test_quality'] + - name: MAX_BLOCK + data_type: NUMBER + description: The highest block id in the test + tests: + - not_null: + tags: ['test_quality'] + - name: MIN_BLOCK_TIMESTAMP + data_type: TIMESTAMP_NTZ + description: The lowest block timestamp in the test + tests: + - not_null: + tags: ['test_quality'] + - name: MAX_BLOCK_TIMESTAMP + data_type: TIMESTAMP_NTZ + description: The highest block timestamp in the test + tests: + - not_null: + tags: ['test_quality'] + - name: BLOCKS_TESTED + data_type: NUMBER + description: Count of blocks in the test + tests: + - not_null: + tags: ['test_quality'] + - name: BLOCKS_IMPACTED_COUNT + data_type: NUMBER + description: Count of block gaps in the test + tests: + - not_null: + tags: ['test_quality'] + - name: BLOCKS_IMPACTED_ARRAY + data_type: ARRAY + description: Array of affected blocks + - name: TEST_TIMESTAMP + data_type: TIMESTAMP_NTZ + description: When the test was run + tests: + - not_null: + tags: ['test_quality'] + - unique: + tags: ['test_quality'] + - name: MODIFIED_TIMESTAMP + data_type: TIMESTAMP_NTZ + + - name: silver_observability__transactions_completeness + description: Records of all blocks with missing transactions with a timestamp the test was run + tests: + - dbt_utils.recency: + datepart: day + field: TEST_TIMESTAMP + interval: 2 + severity: error + tags: ['test_recency'] + columns: + - name: TEST_NAME + data_type: VARCHAR + description: Name for the test + - name: MIN_BLOCK + data_type: NUMBER + description: The lowest block id in the test + tests: + - not_null: + tags: ['test_quality'] + - name: MAX_BLOCK + data_type: NUMBER + description: The highest block id in the test + tests: + - not_null: + tags: ['test_quality'] + - name: MIN_BLOCK_TIMESTAMP + data_type: TIMESTAMP_NTZ + description: The lowest block timestamp in the test + tests: + - not_null: + tags: ['test_quality'] + - name: MAX_BLOCK_TIMESTAMP + data_type: TIMESTAMP_NTZ + description: The highest block timestamp in the test + tests: + - not_null: + tags: ['test_quality'] + - name: BLOCKS_TESTED + data_type: NUMBER + description: Count of blocks in the test + tests: + - not_null: + tags: ['test_quality'] + - name: BLOCKS_IMPACTED_COUNT + data_type: NUMBER + description: Count of block gaps in the test + tests: + - not_null: + tags: ['test_quality'] + - name: BLOCKS_IMPACTED_ARRAY + data_type: ARRAY + description: Array of affected blocks + - name: TEST_TIMESTAMP + data_type: TIMESTAMP_NTZ + description: When the test was run + tests: + - not_null: + tags: ['test_quality'] + - unique: + tags: ['test_quality'] + - name: MODIFIED_TIMESTAMP + data_type: TIMESTAMP_NTZ diff --git a/models/silver/_observability/silver_observability__blocks_completeness.sql b/models/silver/_observability/silver_observability__blocks_completeness.sql new file mode 100644 index 0000000..c3ae205 --- /dev/null +++ b/models/silver/_observability/silver_observability__blocks_completeness.sql @@ -0,0 +1,169 @@ +{{ config( + materialized = 'incremental', + unique_key = 'test_timestamp', + full_refresh = false, + tags = ['observability'], + enabled = true +) }} + +WITH summary_stats AS ( + + SELECT + MIN(block_number) AS min_block, + MAX(block_number) AS max_block, + MIN(block_timestamp) AS min_block_timestamp, + MAX(block_timestamp) AS max_block_timestamp, + COUNT(1) AS blocks_tested + FROM + {{ ref('core__fact_blocks') }} + WHERE + block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP()) + +{% if is_incremental() %} +AND ( + block_number >= ( + SELECT + MIN(block_number) + FROM + ( + SELECT + MIN(block_number) AS block_number + FROM + {{ ref('core__fact_blocks') }} + WHERE + block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP()) + AND DATEADD('hour', -95, CURRENT_TIMESTAMP()) + UNION + SELECT + MIN(VALUE) - 1 AS block_number + FROM + ( + SELECT + blocks_impacted_array + FROM + {{ this }} + qualify ROW_NUMBER() over ( + ORDER BY + test_timestamp DESC + ) = 1 + ), + LATERAL FLATTEN( + input => blocks_impacted_array + ) + ) + ) {% if var('OBSERV_FULL_TEST') %} + OR block_number >= 0 + {% endif %} +) +{% endif %} +), +block_range AS ( + SELECT + _id AS block_number + FROM + {{ source( + 'crosschain_silver', + 'number_sequence' + ) }} + WHERE + _id BETWEEN ( + SELECT + min_block + FROM + summary_stats + ) + AND ( + SELECT + max_block + FROM + summary_stats + ) +), +blocks AS ( + SELECT + l.block_number, + block_timestamp, + LAG( + l.block_number, + 1 + ) over ( + ORDER BY + l.block_number ASC + ) AS prev_BLOCK_NUMBER + FROM + {{ ref("core__fact_blocks") }} + l + INNER JOIN block_range b + ON l.block_number = b.block_number + AND l.block_number >= ( + SELECT + MIN(block_number) + FROM + block_range + ) +), +block_gen AS ( + SELECT + _id AS block_number + FROM + {{ source( + 'crosschain_silver', + 'number_sequence' + ) }} + WHERE + _id BETWEEN ( + SELECT + MIN(block_number) + FROM + blocks + ) + AND ( + SELECT + MAX(block_number) + FROM + blocks + ) +) +SELECT + 'blocks' AS test_name, + MIN( + b.block_number + ) AS min_block, + MAX( + b.block_number + ) AS max_block, + MIN( + b.block_timestamp + ) AS min_block_timestamp, + MAX( + b.block_timestamp + ) AS max_block_timestamp, + COUNT(1) AS blocks_tested, + COUNT( + CASE + WHEN C.block_number IS NOT NULL THEN A.block_number + END + ) AS blocks_impacted_count, + ARRAY_AGG( + CASE + WHEN C.block_number IS NOT NULL THEN A.block_number + END + ) within GROUP ( + ORDER BY + A.block_number + ) AS blocks_impacted_array, + SYSDATE() AS test_timestamp, + SYSDATE() AS modified_timestamp +FROM + block_gen A + LEFT JOIN blocks b + ON A.block_number = b.block_number + LEFT JOIN blocks C + ON A.block_number > C.prev_block_number + AND A.block_number < C.block_number + AND C.block_number - C.prev_block_number <> 1 +WHERE + COALESCE( + b.block_number, + C.block_number + ) IS NOT NULL diff --git a/models/silver/_observability/silver_observability__transactions_completeness.sql b/models/silver/_observability/silver_observability__transactions_completeness.sql new file mode 100644 index 0000000..bd19d2f --- /dev/null +++ b/models/silver/_observability/silver_observability__transactions_completeness.sql @@ -0,0 +1,142 @@ +{{ config( + materialized = 'incremental', + unique_key = 'test_timestamp', + full_refresh = false, + tags = ['observability'] +) }} + +WITH summary_stats AS ( + + SELECT + MIN(block_number) AS min_block, + MAX(block_number) AS max_block, + MIN(block_timestamp) AS min_block_timestamp, + MAX(block_timestamp) AS max_block_timestamp, + COUNT(1) AS blocks_tested + FROM + {{ ref('core__fact_blocks') }} + WHERE + block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP()) + +{% if is_incremental() %} +AND ( + block_number >= ( + SELECT + MIN(block_number) + FROM + ( + SELECT + MIN(block_number) AS block_number + FROM + {{ ref('core__fact_blocks') }} + WHERE + block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP()) + AND DATEADD('hour', -95, CURRENT_TIMESTAMP()) + UNION + SELECT + MIN(VALUE) - 1 AS block_number + FROM + ( + SELECT + blocks_impacted_array + FROM + {{ this }} + qualify ROW_NUMBER() over ( + ORDER BY + test_timestamp DESC + ) = 1 + ), + LATERAL FLATTEN( + input => blocks_impacted_array + ) + ) + ) {% if var('OBSERV_FULL_TEST') %} + OR block_number >= 0 + {% endif %} +) +{% endif %} +), +base_blocks AS ( + SELECT + block_number, + tx_count AS transaction_count + FROM + {{ ref('core__fact_blocks') }} + WHERE + block_number BETWEEN ( + SELECT + min_block + FROM + summary_stats + ) + AND ( + SELECT + max_block + FROM + summary_stats + ) + AND + block_number NOT IN (0, 1758, 1760, 1761, 1762, 1763, 1764, 1766) +), +actual_tx_counts AS ( + SELECT + block_number, + COUNT(1) AS transaction_count + FROM + {{ ref('core__fact_transactions') }} + WHERE + block_number BETWEEN ( + SELECT + min_block + FROM + summary_stats + ) + AND ( + SELECT + max_block + FROM + summary_stats + ) + AND + block_number NOT IN (0, 1758, 1760, 1761, 1762, 1763, 1764, 1766) + GROUP BY + block_number +), +potential_missing_txs AS ( + SELECT + e.block_number + FROM + base_blocks e + LEFT OUTER JOIN actual_tx_counts A + ON e.block_number = A.block_number + WHERE + COALESCE( + A.transaction_count, + 0 + ) <> e.transaction_count +), +impacted_blocks AS ( + SELECT + COUNT(1) AS blocks_impacted_count, + ARRAY_AGG(block_number) within GROUP ( + ORDER BY + block_number + ) AS blocks_impacted_array + FROM + potential_missing_txs +) +SELECT + 'transactions' AS test_name, + min_block, + max_block, + min_block_timestamp, + max_block_timestamp, + blocks_tested, + blocks_impacted_count, + blocks_impacted_array, + SYSDATE() AS test_timestamp, + SYSDATE() AS modified_timestamp +FROM + summary_stats + JOIN impacted_blocks + ON 1 = 1