Merge pull request #13 from FlipsideCrypto/add-observability

+ observability models
This commit is contained in:
eric-laurello 2025-04-07 14:59:59 -04:00 committed by GitHub
commit 93d02aaea4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 486 additions and 0 deletions

View File

@ -0,0 +1,45 @@
name: dbt_run_observability
run-name: dbt_run_observability
on:
workflow_dispatch:
schedule:
# Runs “At minute 0 past every 8th hour.” (see https://crontab.guru)
- cron: '0 */8 * * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "${{ vars.PYTHON_VERSION }}"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -s tag:observability

View File

@ -0,0 +1,130 @@
version: 2
models:
- name: silver_observability__blocks_completeness
description: Records of all blocks block gaps (missing blocks) with a timestamp the test was run
config:
contract:
enforced: true
tests:
- dbt_utils.recency:
datepart: day
field: TEST_TIMESTAMP
interval: 2
severity: error
tags: ['test_recency']
columns:
- name: TEST_NAME
data_type: VARCHAR
description: Name for the test
- name: MIN_BLOCK
data_type: NUMBER
description: The lowest block id in the test
tests:
- not_null:
tags: ['test_quality']
- name: MAX_BLOCK
data_type: NUMBER
description: The highest block id in the test
tests:
- not_null:
tags: ['test_quality']
- name: MIN_BLOCK_TIMESTAMP
data_type: TIMESTAMP_NTZ
description: The lowest block timestamp in the test
tests:
- not_null:
tags: ['test_quality']
- name: MAX_BLOCK_TIMESTAMP
data_type: TIMESTAMP_NTZ
description: The highest block timestamp in the test
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKS_TESTED
data_type: NUMBER
description: Count of blocks in the test
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKS_IMPACTED_COUNT
data_type: NUMBER
description: Count of block gaps in the test
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKS_IMPACTED_ARRAY
data_type: ARRAY
description: Array of affected blocks
- name: TEST_TIMESTAMP
data_type: TIMESTAMP_NTZ
description: When the test was run
tests:
- not_null:
tags: ['test_quality']
- unique:
tags: ['test_quality']
- name: MODIFIED_TIMESTAMP
data_type: TIMESTAMP_NTZ
- name: silver_observability__transactions_completeness
description: Records of all blocks with missing transactions with a timestamp the test was run
tests:
- dbt_utils.recency:
datepart: day
field: TEST_TIMESTAMP
interval: 2
severity: error
tags: ['test_recency']
columns:
- name: TEST_NAME
data_type: VARCHAR
description: Name for the test
- name: MIN_BLOCK
data_type: NUMBER
description: The lowest block id in the test
tests:
- not_null:
tags: ['test_quality']
- name: MAX_BLOCK
data_type: NUMBER
description: The highest block id in the test
tests:
- not_null:
tags: ['test_quality']
- name: MIN_BLOCK_TIMESTAMP
data_type: TIMESTAMP_NTZ
description: The lowest block timestamp in the test
tests:
- not_null:
tags: ['test_quality']
- name: MAX_BLOCK_TIMESTAMP
data_type: TIMESTAMP_NTZ
description: The highest block timestamp in the test
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKS_TESTED
data_type: NUMBER
description: Count of blocks in the test
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKS_IMPACTED_COUNT
data_type: NUMBER
description: Count of block gaps in the test
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKS_IMPACTED_ARRAY
data_type: ARRAY
description: Array of affected blocks
- name: TEST_TIMESTAMP
data_type: TIMESTAMP_NTZ
description: When the test was run
tests:
- not_null:
tags: ['test_quality']
- unique:
tags: ['test_quality']
- name: MODIFIED_TIMESTAMP
data_type: TIMESTAMP_NTZ

View File

@ -0,0 +1,169 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false,
tags = ['observability'],
enabled = true
) }}
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
block_range AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
blocks AS (
SELECT
l.block_number,
block_timestamp,
LAG(
l.block_number,
1
) over (
ORDER BY
l.block_number ASC
) AS prev_BLOCK_NUMBER
FROM
{{ ref("core__fact_blocks") }}
l
INNER JOIN block_range b
ON l.block_number = b.block_number
AND l.block_number >= (
SELECT
MIN(block_number)
FROM
block_range
)
),
block_gen AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
MIN(block_number)
FROM
blocks
)
AND (
SELECT
MAX(block_number)
FROM
blocks
)
)
SELECT
'blocks' AS test_name,
MIN(
b.block_number
) AS min_block,
MAX(
b.block_number
) AS max_block,
MIN(
b.block_timestamp
) AS min_block_timestamp,
MAX(
b.block_timestamp
) AS max_block_timestamp,
COUNT(1) AS blocks_tested,
COUNT(
CASE
WHEN C.block_number IS NOT NULL THEN A.block_number
END
) AS blocks_impacted_count,
ARRAY_AGG(
CASE
WHEN C.block_number IS NOT NULL THEN A.block_number
END
) within GROUP (
ORDER BY
A.block_number
) AS blocks_impacted_array,
SYSDATE() AS test_timestamp,
SYSDATE() AS modified_timestamp
FROM
block_gen A
LEFT JOIN blocks b
ON A.block_number = b.block_number
LEFT JOIN blocks C
ON A.block_number > C.prev_block_number
AND A.block_number < C.block_number
AND C.block_number - C.prev_block_number <> 1
WHERE
COALESCE(
b.block_number,
C.block_number
) IS NOT NULL

View File

@ -0,0 +1,142 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false,
tags = ['observability']
) }}
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
base_blocks AS (
SELECT
block_number,
tx_count AS transaction_count
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_number BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
AND
block_number NOT IN (0, 1758, 1760, 1761, 1762, 1763, 1764, 1766)
),
actual_tx_counts AS (
SELECT
block_number,
COUNT(1) AS transaction_count
FROM
{{ ref('core__fact_transactions') }}
WHERE
block_number BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
AND
block_number NOT IN (0, 1758, 1760, 1761, 1762, 1763, 1764, 1766)
GROUP BY
block_number
),
potential_missing_txs AS (
SELECT
e.block_number
FROM
base_blocks e
LEFT OUTER JOIN actual_tx_counts A
ON e.block_number = A.block_number
WHERE
COALESCE(
A.transaction_count,
0
) <> e.transaction_count
),
impacted_blocks AS (
SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_number) within GROUP (
ORDER BY
block_number
) AS blocks_impacted_array
FROM
potential_missing_txs
)
SELECT
'transactions' AS test_name,
min_block,
max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
SYSDATE() AS test_timestamp,
SYSDATE() AS modified_timestamp
FROM
summary_stats
JOIN impacted_blocks
ON 1 = 1