AN-3418 observability (#132)

* block gaps

* observe

* test update

* check in

* observer
This commit is contained in:
eric-laurello 2023-07-11 09:13:09 -04:00 committed by GitHub
parent 63f2509adc
commit 66396c5b93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 310 additions and 92 deletions

View File

@ -0,0 +1,34 @@
name: dbt_run_observability_monthly
run-name: dbt_run_observability_monthly
on:
workflow_dispatch:
schedule:
- cron: '10 0 1 * *'
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
called_workflow_template:
uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main
with:
dbt_command: |
dbt run -m models/silver/_observability/silver_observability* --vars "OBSERV_FULL_TEST: true"
environment: workflow_prod
warehouse: ${{ vars.WAREHOUSE }}
secrets: inherit

View File

@ -42,6 +42,7 @@ models:
vars:
"dbt_date:time_zone": GMT
"UPDATE_SNOWFLAKE_TAGS": TRUE
OBSERV_FULL_TEST: FALSE
tests:
+store_failures: true # all tests

View File

@ -0,0 +1,132 @@
{{ config(
materialized = 'incremental',
full_refresh = false
) }}
WITH source AS (
SELECT
block_id,
block_timestamp,
LAG(
block_id,
1
) over (
ORDER BY
block_id ASC
) AS prev_BLOCK_ID
FROM
{{ ref('silver__blocks') }} A
WHERE
block_timestamp < DATEADD(
HOUR,
-24,
SYSDATE()
)
{% if is_incremental() %}
AND (
block_timestamp >= DATEADD(
HOUR,
-96,(
SELECT
MAX(
max_block_timestamp
)
FROM
{{ this }}
)
)
OR ({% if var('OBSERV_FULL_TEST') %}
block_id >= 0
{% else %}
block_id >= (
SELECT
MIN(VALUE) - 1
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC) = 1), LATERAL FLATTEN(input => blocks_impacted_array))
{% endif %})
)
{% endif %}
),
block_gen AS (
SELECT
_id AS block_id
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
MIN(block_id)
FROM
source
)
AND (
SELECT
MAX(block_id)
FROM
source
)
)
SELECT
'blocks' AS test_name,
MIN(
b.block_id
) AS min_block,
MAX(
b.block_id
) AS max_block,
MIN(
b.block_timestamp
) AS min_block_timestamp,
MAX(
b.block_timestamp
) AS max_block_timestamp,
COUNT(1) AS blocks_tested,
COUNT(
CASE
WHEN C.block_id IS NOT NULL THEN A.block_id
END
) AS blocks_impacted_count,
ARRAY_AGG(
CASE
WHEN C.block_id IS NOT NULL THEN A.block_id
END
) within GROUP (
ORDER BY
A.block_id
) AS blocks_impacted_array,
ARRAY_AGG(
DISTINCT CASE
WHEN C.block_id IS NOT NULL THEN OBJECT_CONSTRUCT(
'prev_block_id',
C.prev_block_id,
'block_id',
C.block_id
)
END
) AS test_failure_details,
SYSDATE() AS test_timestamp
FROM
block_gen A
LEFT JOIN source b
ON A.block_id = b.block_id
LEFT JOIN source C
ON A.block_id > C.prev_BLOCK_ID
AND A.block_id < C.block_id
AND C.block_id - C.prev_BLOCK_ID <> 1
WHERE
COALESCE(
b.block_id,
C.block_id
) IS NOT NULL

View File

@ -0,0 +1,75 @@
version: 2
models:
- name: silver_observability__block_completeness
description: Records of all blocks block gaps (missing blocks) with a timestamp the test was run
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TEST_TIMESTAMP
columns:
- name: MIN_BLOCK
description: The lowest block id in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MAX_BLOCK
description: The highest block id in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MIN_BLOCK_TIMESTAMP
description: The lowest block timestamp in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: MAX_BLOCK_TIMESTAMP
description: The highest block timestamp in the test
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: BLOCKS_TESTED
description: Count of blocks in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: BLOCKS_IMPACTED_COUNT
description: Count of block gaps in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: BLOCKS_IMPACTED_ARRAY
description: Array of affected blocks
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- name: TEST_FAILURE_DETAILS
description: Array of details of the failure
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- name: TEST_TIMESTAMP
description: When the test was run
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -3,14 +3,8 @@
full_refresh = false
) }}
WITH max_silver AS (
WITH bronze AS (
SELECT
MAX(block_id) AS max_block_id
FROM
{{ ref('silver__transactions') }}
),
bronze AS (
SELECT
block_id,
block_timestamp,
@ -18,41 +12,43 @@ bronze AS (
FROM
{{ ref('bronze__transactions') }}
WHERE
block_id <= (
SELECT
max_block_id
FROM
max_silver
block_timestamp < DATEADD(
HOUR,
-24,
SYSDATE()
)
{% if is_incremental() %}
AND (
_inserted_timestamp >= CURRENT_DATE - 7
AND block_timestamp :: DATE >= (
SELECT
MAX(
max_block_timestamp
) :: DATE -3
FROM
{{ this }}
)
OR (
(
block_timestamp >= DATEADD(
HOUR,
-96,(
SELECT
blocks_missing_transactions
MAX(
max_block_timestamp
)
FROM
{{ this }}
qualify(ROW_NUMBER() over(
ORDER BY
max_block_timestamp DESC) = 1)
) <> 0
)
)
OR ({% if var('OBSERV_FULL_TEST') %}
block_id >= 0
{% else %}
block_id >= (
SELECT
MIN(VALUE) - 1
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC) = 1), LATERAL FLATTEN(input => blocks_impacted_array))
{% endif %})
)
{% endif %}
qualify(DENSE_RANK() over(PARTITION BY block_id
ORDER BY
_inserted_timestamp DESC) = 1)
),
bronze_count AS (
SELECT
@ -75,38 +71,21 @@ bronze_api AS (
FROM
{{ ref('silver__blockchain') }}
WHERE
block_id <= (
block_timestamp BETWEEN (
SELECT
max_block_id
MIN(block_timestamp)
FROM
max_silver
bronze
)
{% if is_incremental() %}
AND (
block_timestamp :: DATE >= (
SELECT
MAX(
max_block_timestamp
) :: DATE -3
FROM
{{ this }}
)
OR (
(
AND (
SELECT
blocks_missing_transactions
MAX(block_timestamp)
FROM
{{ this }}
qualify(ROW_NUMBER() over(
ORDER BY
max_block_timestamp DESC) = 1)
) <> 0
)
)
{% endif %}
bronze
)
)
SELECT
'transactions' AS test_name,
MIN(
A.block_id
) AS min_block,
@ -119,7 +98,7 @@ SELECT
MAX(
A.block_timestamp
) AS max_block_timestamp,
COUNT(1) AS total_blocks,
COUNT(1) AS blocks_tested,
SUM(
CASE
WHEN COALESCE(
@ -128,7 +107,18 @@ SELECT
) - A.num_txs <> 0 THEN 1
ELSE 0
END
) AS blocks_missing_transactions,
) AS blocks_impacted_count,
ARRAY_AGG(
CASE
WHEN COALESCE(
b.num_txs,
0
) - A.num_txs <> 0 THEN A.block_id
END
) within GROUP (
ORDER BY
A.block_id
) AS blocks_impacted_array,
SUM(
ABS(
COALESCE(
@ -136,7 +126,7 @@ SELECT
0
) - A.num_txs
)
) AS total_missing_transactions,
) AS transactions_impacted_count,
ARRAY_AGG(
CASE
WHEN COALESCE(
@ -164,30 +154,8 @@ SELECT
) within GROUP(
ORDER BY
A.block_id
) AS missing_transactions_details,
SYSDATE() AS _inserted_timestamp {# SELECT
A.block_id AS min_block,
A.block_id AS max_block,
A.block_timestamp AS min_block_timestamp,
A.block_timestamp AS max_block_timestamp,
1 AS total_blocks,
CASE
WHEN A.num_txs - b.num_txs <> 0 THEN 1
ELSE 0
END AS blocks_missing_transactions,
CASE
WHEN A.num_txs - b.num_txs <> 0 THEN OBJECT_CONSTRUCT(
'block_id',
A.block_id,
'diff',
A.num_txs - b.num_txs,
'blockchain_num_txs',
b.num_txs,
'bronze_num_txs',
A.num_txs
)
END AS missing_transactions_details,
SYSDATE() AS _inserted_timestamp #}
) AS test_failure_details,
SYSDATE() AS test_timestamp
FROM
bronze_api A
LEFT JOIN bronze_count b

View File

@ -5,7 +5,7 @@ models:
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _INSERTED_TIMESTAMP
- TEST_TIMESTAMP
columns:
- name: MIN_BLOCK
description: The lowest block id in the test
@ -34,40 +34,47 @@ models:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
interval: 2
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: TOTAL_BLOCKS
- name: BLOCKS_TESTED
description: Count of blocks in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: BLOCKS_MISSING_TRANSACTIONS
- name: BLOCKS_IMPACTED_COUNT
description: Count of blocks with missing transactions in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: TOTAL_MISSING_TRANSACTIONS
- name: BLOCKS_IMPACTED_ARRAY
description: Array of affected blocks
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- name: TRANSACTIONS_IMPACTED_COUNT
description: Total count of missing transactions in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MISSING_TRANSACTIONS_DETAILS
- name: TEST_FAILURE_DETAILS
description: blocks with missing transactions with the number of missing transactions
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- name: _INSERTED_TIMESTAMP
description: "{{ doc('inserted_timestamp') }}"
- name: TEST_TIMESTAMP
description: When the test was run
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:

View File

@ -32,6 +32,7 @@ sources:
- name: asset_metadata_coin_gecko
- name: hourly_prices_coin_market_cap
- name: hourly_prices_coin_gecko
- name: number_sequence
- name: crosschain
database: "{{ 'crosschain' if target.database == 'OSMOSIS' else 'crosschain_dev' }}"
schema: core