AN-3418 observability (#131)

* block gaps

* observe

* test update
This commit is contained in:
eric-laurello 2023-07-06 09:33:41 -04:00 committed by GitHub
parent bc61f4391b
commit 63f2509adc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 443 additions and 3 deletions

View File

@ -28,7 +28,7 @@ jobs:
with:
dbt_command: |
dbt run-operation stage_external_sources --vars "ext_full_refresh: true"
dbt run -s ./models --exclude models/silver/mars/silver__red_bank_liquidations.sql
dbt run -s ./models --exclude models/silver/mars/silver__red_bank_liquidations.sql models/silver/_observability/silver_observability*
dbt run-operation stage_external_sources --vars "ext_full_refresh: true"
environment: workflow_prod
warehouse: ${{ vars.WAREHOUSE }}

View File

@ -0,0 +1,34 @@
name: dbt_run_observability
run-name: dbt_run_observability
on:
workflow_dispatch:
schedule:
- cron: '30 3,7,11,15,19,23 * * *'
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
called_workflow_template:
uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main
with:
dbt_command: |
dbt run -m models/silver/_observability/silver_observability*
environment: workflow_prod
warehouse: ${{ vars.WAREHOUSE }}
secrets: inherit

View File

@ -0,0 +1,70 @@
{{ config(
materialized = 'incremental',
full_refresh = false
) }}
WITH source AS (
SELECT
block_id,
block_timestamp,
LAG(
block_id,
1
) over (
ORDER BY
block_id ASC
) AS prev_BLOCK_ID
FROM
{{ ref('silver__blocks') }}
{% if is_incremental() %}
WHERE
(
block_timestamp :: DATE >= (
SELECT
MAX(
max_block_timestamp
) :: DATE -3
FROM
{{ this }}
)
OR (
(
SELECT
block_gaps
FROM
{{ this }}
qualify(ROW_NUMBER() over(
ORDER BY
max_block_timestamp DESC) = 1)
) <> 0
)
)
{% endif %}
)
SELECT
MIN(block_id) AS min_block,
MAX(block_id) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS total_blocks,
SUM(
CASE
WHEN block_id - prev_BLOCK_ID <> 1 THEN 1
ELSE 0
END
) AS block_gaps,
ARRAY_AGG(
CASE
WHEN block_id - prev_BLOCK_ID <> 1 THEN OBJECT_CONSTRUCT(
'prev_block_id',
prev_block_id,
'block_id',
block_id
)
END
) AS block_gaps_details,
SYSDATE() AS _inserted_timestamp
FROM
source

View File

@ -0,0 +1,68 @@
version: 2
models:
- name: silver_observability__block_gaps
description: Records of all blocks block gaps with a timestamp the test was run
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _INSERTED_TIMESTAMP
columns:
- name: MIN_BLOCK
description: The lowest block id in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MAX_BLOCK
description: The highest block id in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MIN_BLOCK_TIMESTAMP
description: The lowest block timestamp in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: MAX_BLOCK_TIMESTAMP
description: The highest block timestamp in the test
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: TOTAL_BLOCKS
description: Count of blocks in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: BLOCK_GAPS
description: Count of block gaps in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: BLOCK_GAPS_DETAILS
description: gap blocks
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- name: _INSERTED_TIMESTAMP
description: "{{ doc('inserted_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -0,0 +1,194 @@
{{ config(
materialized = 'incremental',
full_refresh = false
) }}
WITH max_silver AS (
SELECT
MAX(block_id) AS max_block_id
FROM
{{ ref('silver__transactions') }}
),
bronze AS (
SELECT
block_id,
block_timestamp,
tx_id
FROM
{{ ref('bronze__transactions') }}
WHERE
block_id <= (
SELECT
max_block_id
FROM
max_silver
)
{% if is_incremental() %}
AND (
_inserted_timestamp >= CURRENT_DATE - 7
AND block_timestamp :: DATE >= (
SELECT
MAX(
max_block_timestamp
) :: DATE -3
FROM
{{ this }}
)
OR (
(
SELECT
blocks_missing_transactions
FROM
{{ this }}
qualify(ROW_NUMBER() over(
ORDER BY
max_block_timestamp DESC) = 1)
) <> 0
)
)
{% endif %}
qualify(DENSE_RANK() over(PARTITION BY block_id
ORDER BY
_inserted_timestamp DESC) = 1)
),
bronze_count AS (
SELECT
block_id,
block_timestamp,
COUNT(
DISTINCT tx_id
) AS num_txs
FROM
bronze
GROUP BY
block_id,
block_timestamp
),
bronze_api AS (
SELECT
block_id,
block_timestamp,
num_txs
FROM
{{ ref('silver__blockchain') }}
WHERE
block_id <= (
SELECT
max_block_id
FROM
max_silver
)
{% if is_incremental() %}
AND (
block_timestamp :: DATE >= (
SELECT
MAX(
max_block_timestamp
) :: DATE -3
FROM
{{ this }}
)
OR (
(
SELECT
blocks_missing_transactions
FROM
{{ this }}
qualify(ROW_NUMBER() over(
ORDER BY
max_block_timestamp DESC) = 1)
) <> 0
)
)
{% endif %}
)
SELECT
MIN(
A.block_id
) AS min_block,
MAX(
A.block_id
) AS max_block,
MIN(
A.block_timestamp
) AS min_block_timestamp,
MAX(
A.block_timestamp
) AS max_block_timestamp,
COUNT(1) AS total_blocks,
SUM(
CASE
WHEN COALESCE(
b.num_txs,
0
) - A.num_txs <> 0 THEN 1
ELSE 0
END
) AS blocks_missing_transactions,
SUM(
ABS(
COALESCE(
b.num_txs,
0
) - A.num_txs
)
) AS total_missing_transactions,
ARRAY_AGG(
CASE
WHEN COALESCE(
b.num_txs,
0
) - A.num_txs <> 0 THEN OBJECT_CONSTRUCT(
'block',
A.block_id,
'block_timestamp',
A.block_timestamp,
'diff',
COALESCE(
b.num_txs,
0
) - A.num_txs,
'blockchain_num_txs',
A.num_txs,
'bronze_num_txs',
COALESCE(
b.num_txs,
0
)
)
END
) within GROUP(
ORDER BY
A.block_id
) AS missing_transactions_details,
SYSDATE() AS _inserted_timestamp {# SELECT
A.block_id AS min_block,
A.block_id AS max_block,
A.block_timestamp AS min_block_timestamp,
A.block_timestamp AS max_block_timestamp,
1 AS total_blocks,
CASE
WHEN A.num_txs - b.num_txs <> 0 THEN 1
ELSE 0
END AS blocks_missing_transactions,
CASE
WHEN A.num_txs - b.num_txs <> 0 THEN OBJECT_CONSTRUCT(
'block_id',
A.block_id,
'diff',
A.num_txs - b.num_txs,
'blockchain_num_txs',
b.num_txs,
'bronze_num_txs',
A.num_txs
)
END AS missing_transactions_details,
SYSDATE() AS _inserted_timestamp #}
FROM
bronze_api A
LEFT JOIN bronze_count b
ON A.block_id = b.block_id

View File

@ -0,0 +1,75 @@
version: 2
models:
- name: silver_observability__transaction_completeness
description: Records of all blocks with missing transactions with a timestamp the test was run
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _INSERTED_TIMESTAMP
columns:
- name: MIN_BLOCK
description: The lowest block id in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MAX_BLOCK
description: The highest block id in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MIN_BLOCK_TIMESTAMP
description: The lowest block timestamp in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: MAX_BLOCK_TIMESTAMP
description: The highest block timestamp in the test
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: TOTAL_BLOCKS
description: Count of blocks in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: BLOCKS_MISSING_TRANSACTIONS
description: Count of blocks with missing transactions in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: TOTAL_MISSING_TRANSACTIONS
description: Total count of missing transactions in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MISSING_TRANSACTIONS_DETAILS
description: blocks with missing transactions with the number of missing transactions
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- name: _INSERTED_TIMESTAMP
description: "{{ doc('inserted_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -332,8 +332,7 @@ SELECT
d.tx_id,
d.msg_group,
d.msg_sub_group,
d.currency,
action
d.currency
) AS _unique_key
FROM
decimals d