An 3648/data observability (#325)

* blocks test

* wip

* transaction observability

* rework models

* add crosschain source

* observability workflow

* observability vars
This commit is contained in:
tarikceric 2023-08-17 16:30:17 -07:00 committed by GitHub
parent 7be8c45e44
commit 8307619289
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 533 additions and 2 deletions

View File

@ -0,0 +1,44 @@
# Workflow: runs the dbt models under models/silver/_observability with the
# OBSERV_FULL_TEST variable switched on. Triggered manually or by the cron
# schedule below.
name: dbt_run_full_observability
run-name: dbt_run_full_observability

on:
  workflow_dispatch:
  schedule:
    # Runs "At 06:00 on day-of-month 1." (see https://crontab.guru)
    - cron: '0 6 1 * *'

# Connection settings exported to every step; only PASSWORD comes from secrets,
# the rest are read from repository/environment variables.
env:
  DBT_PROFILES_DIR: ./
  ACCOUNT: '${{ vars.ACCOUNT }}'
  ROLE: '${{ vars.ROLE }}'
  USER: '${{ vars.USER }}'
  PASSWORD: '${{ secrets.PASSWORD }}'
  REGION: '${{ vars.REGION }}'
  DATABASE: '${{ vars.DATABASE }}'
  WAREHOUSE: '${{ vars.WAREHOUSE }}'
  SCHEMA: '${{ vars.SCHEMA }}'

# All runs of this workflow share a single concurrency group.
concurrency:
  group: '${{ github.workflow }}'

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v1
        with:
          python-version: '3.7.x'
      # dbt-snowflake version is controlled by the DBT_VERSION variable.
      - name: install dependencies
        run: |
          pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
          dbt deps
      # OBSERV_FULL_TEST=True widens the block range the observability models scan.
      - name: Run DBT Jobs
        run: |
          dbt run --vars '{"OBSERV_FULL_TEST":True}' -m models/silver/_observability

View File

@ -44,6 +44,6 @@ jobs:
dbt run-operation run_sp_refresh_external_tables_full
dbt run -s models/silver/silver__transactions.sql models/silver/silver__blocks.sql models/silver/silver__votes.sql models/silver/silver___inner_instructions.sql models/silver/silver___instructions.sql models/silver/silver__events.sql models/silver/silver___all_undecoded_instructions_data.sql
dbt run-operation run_sp_refresh_external_tables_full
dbt run -s ./models --exclude models/core models/silver/silver__transactions.sql models/silver/silver__blocks.sql models/silver/silver__votes.sql models/silver/silver___inner_instructions.sql models/silver/silver___instructions.sql models/silver/silver__events.sql models/silver/silver___all_undecoded_instructions_data.sql tag:share models/streamline models/silver/silver__daily_signers.sql models/silver/silver__signers.sql models/silver/accounts/silver__token_account_owners_intermediate.sql models/silver/accounts/silver__token_account_owners.sql models/silver/silver__signers_nfts_held.sql models/silver/validator models/silver/nfts/silver__nft_sales_opensea.sql models/silver/nfts/silver__nft_sales_yawww.sql models/silver/nfts/silver__nft_bids_yawww.sql models/silver/nfts/silver__nft_compressed_mints_onchain.sql models/silver/nfts/silver__nft_compressed_mints.sql
dbt run -s ./models --exclude models/core models/silver/silver__transactions.sql models/silver/silver__blocks.sql models/silver/silver__votes.sql models/silver/silver___inner_instructions.sql models/silver/silver___instructions.sql models/silver/silver__events.sql models/silver/silver___all_undecoded_instructions_data.sql tag:share models/streamline models/silver/silver__daily_signers.sql models/silver/silver__signers.sql models/silver/accounts/silver__token_account_owners_intermediate.sql models/silver/accounts/silver__token_account_owners.sql models/silver/silver__signers_nfts_held.sql models/silver/validator models/silver/nfts/silver__nft_sales_opensea.sql models/silver/nfts/silver__nft_sales_yawww.sql models/silver/nfts/silver__nft_bids_yawww.sql models/silver/_observability models/silver/nfts/silver__nft_compressed_mints_onchain.sql models/silver/nfts/silver__nft_compressed_mints.sql
dbt run --var '{"UPDATE_SNOWFLAKE_TAGS":True}' -s ./models/core --exclude models/core/core__ez_signers.sql

44
.github/workflows/dbt_test_intraday.yml vendored Normal file
View File

@ -0,0 +1,44 @@
# Workflow: runs the dbt models under models/silver/_observability using the
# project's default variables. Triggered manually or by the cron schedule below.
# NOTE(review): despite the workflow name, the only dbt command executed here is
# `dbt run`; there is no `dbt test` step - confirm that is intended.
name: dbt_test_intraday
run-name: dbt_test_intraday

on:
  workflow_dispatch:
  schedule:
    # Runs "At minute 45 past every 4th hour." (see https://crontab.guru)
    - cron: '45 */4 * * *'

# Connection settings exported to every step; only PASSWORD comes from secrets,
# the rest are read from repository/environment variables.
env:
  DBT_PROFILES_DIR: ./
  ACCOUNT: '${{ vars.ACCOUNT }}'
  ROLE: '${{ vars.ROLE }}'
  USER: '${{ vars.USER }}'
  PASSWORD: '${{ secrets.PASSWORD }}'
  REGION: '${{ vars.REGION }}'
  DATABASE: '${{ vars.DATABASE }}'
  WAREHOUSE: '${{ vars.WAREHOUSE }}'
  SCHEMA: '${{ vars.SCHEMA }}'

# All runs of this workflow share a single concurrency group.
concurrency:
  group: '${{ github.workflow }}'

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v1
        with:
          python-version: '3.7.x'
      # dbt-snowflake version is controlled by the DBT_VERSION variable.
      - name: install dependencies
        run: |
          pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run -m models/silver/_observability

View File

@ -62,4 +62,5 @@ vars:
UPDATE_SNOWFLAKE_TAGS: True
UPDATE_UDFS_AND_SPS: False
STREAMLINE_INVOKE_STREAMS: False
STREAMLINE_RUN_HISTORY: False
STREAMLINE_RUN_HISTORY: False
OBSERV_FULL_TEST: False

View File

@ -0,0 +1,135 @@
{#
    Block completeness check.

    Each run appends one summary row (keyed by test_timestamp) describing the
    range of blocks inspected and any block ids that a later block names as its
    predecessor but that are absent from silver__blocks.
#}
{{ config(
    materialized = 'incremental',
    unique_key = 'test_timestamp',
    full_refresh = false
) }}

WITH source AS (
    -- Blocks under test. true_prev_block_id is the predecessor recorded on the
    -- block itself; prev_block_id is the block that actually precedes it in
    -- this (filtered) set, ordered by block_id.
    SELECT
        block_id,
        block_timestamp,
        previous_block_id AS true_prev_block_id,
        LAG(block_id, 1) OVER (ORDER BY block_id ASC) AS prev_block_id
    FROM
        {{ ref('silver__blocks') }} src_blocks
    WHERE
        -- skip the most recent 24 hours
        block_timestamp < DATEADD(HOUR, -24, SYSDATE())

{% if is_incremental() %}
        AND (
            -- look back 96 hours from the newest block timestamp already tested
            block_timestamp >= DATEADD(
                HOUR,
                -96,
                (
                    SELECT
                        MAX(max_block_timestamp)
                    FROM
                        {{ this }}
                )
            )
            OR (
{% if var('OBSERV_FULL_TEST') %}
                block_id > 39824213 --some anomalies before tx's start having block_timestamps
{% else %}
                -- otherwise restart just before the lowest block id flagged by
                -- the most recent run
                block_id >= (
                    SELECT
                        MIN(VALUE) - 1
                    FROM
                        (
                            SELECT
                                blocks_impacted_array
                            FROM
                                {{ this }}
                            QUALIFY ROW_NUMBER() OVER (ORDER BY test_timestamp DESC) = 1
                        ),
                        LATERAL FLATTEN(input => blocks_impacted_array)
                )
{% endif %}
            )
        )
{% endif %}
),

block_gen AS (
    -- Every integer between the lowest and highest block id under test.
    SELECT
        _id AS block_id
    FROM
        {{ source('crosschain_silver', 'number_sequence') }}
    WHERE
        _id BETWEEN (SELECT MIN(block_id) FROM source)
        AND (SELECT MAX(block_id) FROM source)
)

-- seq : generated candidate block ids
-- blk : the matching block in source, when it exists
-- gap : a source block whose recorded predecessor is the candidate id while the
--       block actually preceding it in source is a different, lower id
SELECT
    'blocks' AS test_name,
    MIN(blk.block_id) AS min_block,
    MAX(blk.block_id) AS max_block,
    MIN(blk.block_timestamp) AS min_block_timestamp,
    MAX(blk.block_timestamp) AS max_block_timestamp,
    COUNT(1) AS blocks_tested,
    COUNT(
        CASE
            WHEN gap.block_id IS NOT NULL THEN seq.block_id
        END
    ) AS blocks_impacted_count,
    ARRAY_AGG(
        CASE
            WHEN gap.block_id IS NOT NULL THEN seq.block_id
        END
    ) WITHIN GROUP (ORDER BY seq.block_id) AS blocks_impacted_array,
    ARRAY_AGG(
        DISTINCT CASE
            WHEN gap.block_id IS NOT NULL THEN OBJECT_CONSTRUCT(
                'prev_block_id',
                gap.prev_block_id,
                'block_id',
                gap.block_id
            )
        END
    ) AS test_failure_details,
    SYSDATE() AS test_timestamp
FROM
    block_gen seq
    LEFT JOIN source blk
        ON seq.block_id = blk.block_id
    LEFT JOIN source gap
        ON seq.block_id > gap.prev_block_id
        AND seq.block_id < gap.block_id
        AND gap.block_id - gap.prev_block_id <> 1
        AND seq.block_id = gap.true_prev_block_id
WHERE
    -- keep only candidates that matched a block with a known predecessor in
    -- source, or that were identified as a missing predecessor
    COALESCE(blk.prev_block_id, gap.prev_block_id) IS NOT NULL

View File

@ -0,0 +1,75 @@
# Schema and tests for the block completeness observability model.
version: 2
models:
  - name: silver_observability__blocks_completeness
    description: Records of all block gaps (missing blocks), with the timestamp at which the test was run
    tests:
      # one summary row per test run
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - TEST_TIMESTAMP
    columns:
      - name: TEST_NAME
        description: Name for the test
      - name: MIN_BLOCK
        description: The lowest block id in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: MAX_BLOCK
        description: The highest block id in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: MIN_BLOCK_TIMESTAMP
        description: The lowest block timestamp in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_NTZ
      - name: MAX_BLOCK_TIMESTAMP
        description: The highest block timestamp in the test
        tests:
          - not_null
          - dbt_expectations.expect_row_values_to_have_recent_data:
              datepart: day
              interval: 2
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_NTZ
      - name: BLOCKS_TESTED
        description: Count of blocks in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: BLOCKS_IMPACTED_COUNT
        description: Count of block gaps in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: BLOCKS_IMPACTED_ARRAY
        description: Array of affected blocks
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - ARRAY
      - name: TEST_FAILURE_DETAILS
        description: Array of details of the failure
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - ARRAY
      - name: TEST_TIMESTAMP
        description: When the test was run
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_NTZ

View File

@ -0,0 +1,161 @@
{#
    Transaction completeness check.

    Each run appends one summary row (keyed by test_timestamp) listing the
    blocks in the tested range that have no rows in silver__votes or
    silver__transactions and that are either absent from
    streamline__complete_block_txs or recorded there with an error.

    All time arithmetic uses SYSDATE(), matching the blocks completeness model:
    SYSDATE() returns a UTC TIMESTAMP_NTZ, whereas CURRENT_TIMESTAMP() returns a
    session-timezone TIMESTAMP_LTZ, which would make test_timestamp fail the
    TIMESTAMP_NTZ type test declared for this model.
    NOTE(review): assumes silver__blocks.block_timestamp is stored as UTC
    TIMESTAMP_NTZ - confirm.
#}
{{ config(
    materialized = 'incremental',
    unique_key = 'test_timestamp',
    full_refresh = false
) }}

WITH summary_stats AS (
    -- Range of blocks covered by this run; the most recent 12 hours are skipped.
    SELECT
        MIN(block_id) AS min_block,
        MAX(block_id) AS max_block,
        MIN(block_timestamp) AS min_block_timestamp,
        MAX(block_timestamp) AS max_block_timestamp,
        COUNT(1) AS blocks_tested
    FROM
        {{ ref('silver__blocks') }}
    WHERE
        block_timestamp <= DATEADD('hour', -12, SYSDATE())

{% if is_incremental() %}
        AND (
            -- start from the lower of: the first block roughly 96 hours old, or
            -- one before the lowest block id flagged by the most recent run
            block_id >= (
                SELECT
                    MIN(block_id)
                FROM
                    (
                        SELECT
                            MIN(block_id) AS block_id
                        FROM
                            {{ ref('silver__blocks') }}
                        WHERE
                            block_timestamp BETWEEN DATEADD('hour', -96, SYSDATE())
                            AND DATEADD('hour', -95, SYSDATE())
                        UNION
                        SELECT
                            MIN(VALUE) - 1 AS block_id
                        FROM
                            (
                                SELECT
                                    blocks_impacted_array
                                FROM
                                    {{ this }}
                                QUALIFY ROW_NUMBER() OVER (ORDER BY test_timestamp DESC) = 1
                            ),
                            LATERAL FLATTEN(input => blocks_impacted_array)
                    )
            )
{% if var('OBSERV_FULL_TEST') %}
            -- full test: lift the lower bound entirely
            OR block_id >= 0
{% endif %}
        )
{% endif %}
),

base_blocks AS (
    SELECT
        *
    FROM
        {{ ref('silver__blocks') }}
    WHERE
        block_id BETWEEN (SELECT min_block FROM summary_stats)
        AND (SELECT max_block FROM summary_stats)
),

base_txs AS (
    -- Block ids that have at least one vote or non-vote transaction. Only
    -- block_id is projected because the anti-join below checks existence only,
    -- so the UNION de-duplicates block ids rather than (block_id, tx_id) pairs.
    SELECT
        block_id
    FROM
        {{ ref('silver__votes') }}
    WHERE
        block_id BETWEEN (SELECT min_block FROM summary_stats)
        AND (SELECT max_block FROM summary_stats)
    UNION
    SELECT
        block_id
    FROM
        {{ ref('silver__transactions') }}
    WHERE
        block_id BETWEEN (SELECT min_block FROM summary_stats)
        AND (SELECT max_block FROM summary_stats)
),

potential_missing_txs AS (
    -- Blocks in range with no transactions of either kind.
    SELECT
        base_blocks.*
    FROM
        base_blocks
        LEFT OUTER JOIN base_txs
            ON base_blocks.block_id = base_txs.block_id
    WHERE
        base_txs.block_id IS NULL
),

broken_blocks AS (
    -- Of those, keep blocks that streamline either never recorded or recorded
    -- with an error.
    SELECT
        m.block_id
    FROM
        potential_missing_txs m
        LEFT OUTER JOIN {{ ref('streamline__complete_block_txs') }} cmp
            ON m.block_id = cmp.block_id
    WHERE
        cmp.error IS NOT NULL
        OR cmp.block_id IS NULL
),

impacted_blocks AS (
    SELECT
        COUNT(1) AS blocks_impacted_count,
        ARRAY_AGG(block_id) WITHIN GROUP (ORDER BY block_id) AS blocks_impacted_array
    FROM
        broken_blocks
)

-- Both CTEs are single-row aggregates, so the ON 1 = 1 join yields one row.
SELECT
    'transactions' AS test_name,
    min_block,
    max_block,
    min_block_timestamp,
    max_block_timestamp,
    blocks_tested,
    blocks_impacted_count,
    blocks_impacted_array,
    SYSDATE() AS test_timestamp
FROM
    summary_stats
    JOIN impacted_blocks
        ON 1 = 1

View File

@ -0,0 +1,70 @@
# Schema and tests for the transaction completeness observability model.
version: 2
models:
  - name: silver_observability__transactions_completeness
    description: Records of all blocks with missing transactions, with the timestamp at which the test was run
    tests:
      # one summary row per test run
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - TEST_TIMESTAMP
    columns:
      - name: TEST_NAME
        description: Name for the test
      - name: MIN_BLOCK
        description: The lowest block id in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: MAX_BLOCK
        description: The highest block id in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: MIN_BLOCK_TIMESTAMP
        description: The lowest block timestamp in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_NTZ
      - name: MAX_BLOCK_TIMESTAMP
        description: The highest block timestamp in the test
        tests:
          - not_null
          - dbt_expectations.expect_row_values_to_have_recent_data:
              datepart: day
              interval: 2
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_NTZ
      - name: BLOCKS_TESTED
        description: Count of blocks in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: BLOCKS_IMPACTED_COUNT
        description: Count of blocks with missing transactions in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: BLOCKS_IMPACTED_ARRAY
        description: Array of affected blocks
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - ARRAY
      - name: TEST_TIMESTAMP
        description: When the test was run
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_NTZ

View File

@ -23,6 +23,7 @@ sources:
- name: hourly_prices_coin_gecko
- name: coin_market_cap_cryptocurrency_info
- name: apis_keys
- name: number_sequence
- name: shared
database: flipside_prod_db