An 3414/data observability (#112)

* stash

* workflow adjustments

* threads

* better logic

* updates

* config
This commit is contained in:
Austin 2023-07-12 17:57:12 -04:00 committed by GitHub
parent 8b53882b47
commit b398689f3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 918 additions and 23 deletions

View File

@ -0,0 +1,47 @@
# Monthly full-history run of the observability models
# (models/silver/_observability) with OBSERV_FULL_TEST switched on.
name: dbt_run_full_observability
run-name: dbt_run_full_observability

on:
  # Manual trigger from the Actions tab.
  workflow_dispatch:
  schedule:
    # At 08:00 on day-of-month 1 (see https://crontab.guru).
    - cron: "0 8 1 * *"

# Connection settings handed to dbt. PASSWORD is read from secrets; every
# other value is read from configuration variables.
env:
  DBT_PROFILES_DIR: ./
  ACCOUNT: '${{ vars.ACCOUNT }}'
  ROLE: '${{ vars.ROLE }}'
  USER: '${{ vars.USER }}'
  PASSWORD: '${{ secrets.PASSWORD }}'
  REGION: '${{ vars.REGION }}'
  DATABASE: '${{ vars.DATABASE }}'
  WAREHOUSE: '${{ vars.WAREHOUSE }}'
  SCHEMA: '${{ vars.SCHEMA }}'

# One concurrency group per workflow.
concurrency:
  group: '${{ github.workflow }}'

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod_2xl

    steps:
      - uses: actions/checkout@v3

      - uses: actions/setup-python@v1
        with:
          python-version: '3.7.x'

      - name: install dependencies
        run: |
          pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
          dbt deps

      # OBSERV_FULL_TEST is passed as a dbt var so the observability models
      # drop their incremental lower bound for this run.
      - name: Run DBT Jobs
        run: |
          dbt run --threads 2 --vars '{"OBSERV_FULL_TEST":True}' -m models/silver/_observability

View File

@ -4,8 +4,8 @@ run-name: dbt_run_scheduled
on:
workflow_dispatch:
schedule:
# Runs every "At minute 15.” (see https://crontab.guru)
- cron: '15 * * * *'
# Runs “At minute 25.” (see https://crontab.guru)
- cron: '25 * * * *'
env:
DBT_PROFILES_DIR: ./
@ -41,5 +41,5 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt run --exclude models/silver/abis models/silver/api_udf models/silver/streamline models/silver/silver__decoded_logs.sql models/silver/core/tests models/silver/silver__decoded_logs_legacy.sql
dbt run --exclude models/silver/abis models/silver/api_udf models/silver/streamline models/silver/silver__decoded_logs.sql models/silver/core/tests models/silver/_observability
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m models/silver/streamline/decoder/streamline__decode_logs_realtime.sql models/silver/streamline/decoder/streamline__complete_decode_logs.sql

View File

@ -4,8 +4,8 @@ run-name: dbt_run_streamline_decoder
on:
workflow_dispatch:
schedule:
# Runs “At minute 35 past every 2nd hour.” (see https://crontab.guru)
- cron: '35 */2 * * *'
# Runs “At minute 40” (see https://crontab.guru)
- cron: '40 * * * *'
env:
DBT_PROFILES_DIR: ./

View File

@ -4,8 +4,8 @@ run-name: dbt_run_streamline_realtime
on:
workflow_dispatch:
schedule:
# Runs “At every 30th minute.” (see https://crontab.guru)
- cron: '*/30 * * * *'
# Runs “At minutes 10 and 40.” (see https://crontab.guru)
- cron: '10,40 * * * *'
env:
DBT_PROFILES_DIR: ./

View File

@ -4,8 +4,8 @@ run-name: dbt_test_intraday
on:
workflow_dispatch:
schedule:
# Runs “At minute 45 past every 4th hour.” (see https://crontab.guru)
- cron: '45 */4 * * *'
# Runs “At minute 50 past every 4th hour.” (see https://crontab.guru)
- cron: '50 */4 * * *'
env:
DBT_PROFILES_DIR: ./
@ -42,6 +42,7 @@ jobs:
- name: Run DBT Jobs
run: |
dbt test -m tag:recent_test
dbt run -m models/silver/_observability

View File

@ -48,4 +48,5 @@ vars:
STREAMLINE_RUN_HISTORY: False
UPDATE_UDFS_AND_SPS: False
UPDATE_SNOWFLAKE_TAGS: True
WAIT: 0
WAIT: 0
OBSERV_FULL_TEST: False

View File

@ -61,3 +61,41 @@ WHERE
model_tx_hash IS NULL
OR model_block_number IS NULL
{% endmacro %}
{% macro missing_confirmed_txs(
        model1,
        model2
    ) %}
    {#
        Returns the distinct block numbers from model1 that contain at least
        one (block_number, block_hash, tx_hash) row with no identical row in
        model2.

        Arguments:
            model1: base relation; must expose block_number, block_hash, tx_hash.
            model2: relation checked for completeness; same three columns.

        Only base rows at or below model2's highest block_number are compared,
        so blocks model2 has not reached yet are not reported.

        The upper bound is applied to base_block_number on purpose. For
        unmatched rows every model_* column is NULL after the LEFT JOIN, so a
        bound written against model_block_number can never be true and the
        query would always return zero rows.
    #}
    WITH txs_base AS (
        SELECT
            block_number AS base_block_number,
            block_hash AS base_block_hash,
            tx_hash AS base_tx_hash
        FROM
            {{ model1 }}
    ),
    model_name AS (
        SELECT
            block_number AS model_block_number,
            block_hash AS model_block_hash,
            tx_hash AS model_tx_hash
        FROM
            {{ model2 }}
    )
SELECT
    DISTINCT base_block_number AS block_number
FROM
    txs_base
    LEFT JOIN model_name
    ON base_block_number = model_block_number
    AND base_tx_hash = model_tx_hash
    AND base_block_hash = model_block_hash
WHERE
    {# anti-join: keep base rows that found no partner in model2 #}
    model_tx_hash IS NULL
    AND base_block_number <= (
        SELECT
            MAX(model_block_number)
        FROM
            model_name
    )
{% endmacro %}

View File

@ -0,0 +1,166 @@
-- Observability check for gaps in the block sequence. Each run produces one
-- summary row (test_name = 'blocks') with the range tested and the block
-- numbers found missing from silver__blocks.
-- Materialized incrementally, keyed on test_timestamp, with full_refresh disabled.
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false
) }}
WITH summary_stats AS (
-- Bounds of the range under test: only blocks whose timestamp is at least
-- 12 hours old are considered.
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
-- Incremental runs start from the lower of the two block numbers produced by
-- the UNION below.
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
-- (a) earliest block in the one-hour window between 96 and 95 hours ago
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
-- (b) one block before the smallest entry of blocks_impacted_array in the
-- most recent row (by test_timestamp) of this model's own table
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
-- OBSERV_FULL_TEST is set: this OR admits every block_number >= 0, so the
-- lower bound above no longer restricts the range.
OR block_number >= 0
{% endif %}
)
{% endif %}
),
-- Every integer from min_block to max_block, taken from the number_sequence source.
block_range AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
-- silver__blocks rows inside the range, each paired (via LAG over ascending
-- block_number) with the block number that precedes it.
blocks AS (
SELECT
l.block_number,
block_timestamp,
LAG(
l.block_number,
1
) over (
ORDER BY
l.block_number ASC
) AS prev_BLOCK_NUMBER
FROM
{{ ref("silver__blocks") }}
l
INNER JOIN block_range b
ON l.block_number = b.block_number
AND l.block_number >= (
SELECT
MIN(block_number)
FROM
block_range
)
),
-- Integer sequence spanning the lowest to the highest block number present
-- in the blocks CTE.
block_gen AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
MIN(block_number)
FROM
blocks
)
AND (
SELECT
MAX(block_number)
FROM
blocks
)
)
-- Aliases in the final query:
--   A = candidate block number from the generated sequence
--   b = the silver__blocks row with that number, when one exists
--   C = a block whose predecessor is not block_number - 1 and whose gap
--       strictly contains A (so A is a missing block number)
SELECT
'blocks' AS test_name,
MIN(
b.block_number
) AS min_block,
MAX(
b.block_number
) AS max_block,
MIN(
b.block_timestamp
) AS min_block_timestamp,
MAX(
b.block_timestamp
) AS max_block_timestamp,
COUNT(1) AS blocks_tested,
COUNT(
CASE
WHEN C.block_number IS NOT NULL THEN A.block_number
END
) AS blocks_impacted_count,
ARRAY_AGG(
CASE
WHEN C.block_number IS NOT NULL THEN A.block_number
END
) within GROUP (
ORDER BY
A.block_number
) AS blocks_impacted_array,
CURRENT_TIMESTAMP AS test_timestamp
FROM
block_gen A
LEFT JOIN blocks b
ON A.block_number = b.block_number
LEFT JOIN blocks C
ON A.block_number > C.prev_block_number
AND A.block_number < C.block_number
AND C.block_number - C.prev_block_number <> 1
WHERE
-- Keep numbers that either exist in blocks or fall inside a detected gap.
COALESCE(
b.block_number,
C.block_number
) IS NOT NULL

View File

@ -0,0 +1,121 @@
-- Observability check for event logs. Each run produces one summary row
-- (test_name = 'event_logs') listing the blocks where a receipt carries a
-- non-empty logs array but silver__logs has no row for that transaction.
-- Materialized incrementally, keyed on test_timestamp, with full_refresh disabled.
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false
) }}
WITH summary_stats AS (
-- Bounds of the range under test: only blocks whose timestamp is at least
-- 12 hours old are considered.
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
-- Incremental runs start from the lower of the two block numbers produced by
-- the UNION below.
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
-- (a) earliest block in the one-hour window between 96 and 95 hours ago
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
-- (b) one block before the smallest entry of blocks_impacted_array in the
-- most recent row (by test_timestamp) of this model's own table
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
-- OBSERV_FULL_TEST is set: this OR admits every block_number >= 0, so the
-- lower bound above no longer restricts the range.
OR block_number >= 0
{% endif %}
)
{% endif %}
),
-- Every integer from min_block to max_block, taken from the number_sequence source.
block_range AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
-- Blocks inside block_range holding a receipt whose logs array is non-empty
-- while no silver__logs row shares its block_number and tx_hash.
broken_blocks AS (
SELECT
DISTINCT block_number
FROM
{{ ref("silver__receipts") }}
r
LEFT JOIN {{ ref("silver__logs") }}
l USING (
block_number,
tx_hash
)
JOIN block_range USING (block_number)
WHERE
l.tx_hash IS NULL
AND ARRAY_SIZE(
r.logs
) > 0
),
-- Collapse the broken blocks into a count and an ordered array.
impacted_blocks AS (
SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_number) within GROUP (
ORDER BY
block_number
) AS blocks_impacted_array
FROM
broken_blocks
)
-- summary_stats and impacted_blocks are both aggregates without GROUP BY, so
-- joining them ON 1 = 1 yields a single output row per run.
SELECT
'event_logs' AS test_name,
min_block,
max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
CURRENT_TIMESTAMP() AS test_timestamp
FROM
summary_stats
JOIN impacted_blocks
ON 1 = 1

View File

@ -0,0 +1,119 @@
-- Observability check for receipts. Each run produces one summary row
-- (test_name = 'receipts') listing the blocks that hold a transaction with no
-- matching row in silver__receipts.
-- Materialized incrementally, keyed on test_timestamp, with full_refresh disabled.
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false
) }}
WITH summary_stats AS (
-- Bounds of the range under test: only blocks whose timestamp is at least
-- 12 hours old are considered.
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
-- Incremental runs start from the lower of the two block numbers produced by
-- the UNION below.
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
-- (a) earliest block in the one-hour window between 96 and 95 hours ago
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
-- (b) one block before the smallest entry of blocks_impacted_array in the
-- most recent row (by test_timestamp) of this model's own table
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
-- OBSERV_FULL_TEST is set: this OR admits every block_number >= 0, so the
-- lower bound above no longer restricts the range.
OR block_number >= 0
{% endif %}
)
{% endif %}
),
-- Every integer from min_block to max_block, taken from the number_sequence source.
block_range AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
-- Blocks inside block_range holding a transaction for which silver__receipts
-- has no row with the same block_number, tx_hash and block_hash.
broken_blocks AS (
SELECT
DISTINCT block_number
FROM
{{ ref("silver__transactions") }}
t
LEFT JOIN {{ ref("silver__receipts") }}
r USING (
block_number,
tx_hash,
block_hash
)
JOIN block_range USING (block_number)
WHERE
r.tx_hash IS NULL
),
-- Collapse the broken blocks into a count and an ordered array.
impacted_blocks AS (
SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_number) within GROUP (
ORDER BY
block_number
) AS blocks_impacted_array
FROM
broken_blocks
)
-- summary_stats and impacted_blocks are both aggregates without GROUP BY, so
-- joining them ON 1 = 1 yields a single output row per run.
SELECT
'receipts' AS test_name,
min_block,
max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
CURRENT_TIMESTAMP() AS test_timestamp
FROM
summary_stats
JOIN impacted_blocks
ON 1 = 1

View File

@ -0,0 +1,117 @@
-- Observability check for traces. Each run produces one summary row
-- (test_name = 'traces') listing the blocks that hold a transaction with no
-- matching row in silver__traces.
--
-- full_refresh = false: this table accumulates one row per run, and the
-- incremental window below reads the previous run's blocks_impacted_array
-- from it. A full refresh would discard that history, so it is disabled here,
-- as it is for the other observability models.
{{ config(
    materialized = 'incremental',
    unique_key = 'test_timestamp',
    full_refresh = false
) }}

WITH summary_stats AS (
    -- Bounds of the range under test: only blocks whose timestamp is at least
    -- 12 hours old are considered.
    SELECT
        MIN(block_number) AS min_block,
        MAX(block_number) AS max_block,
        MIN(block_timestamp) AS min_block_timestamp,
        MAX(block_timestamp) AS max_block_timestamp,
        COUNT(1) AS blocks_tested
    FROM
        {{ ref('silver__blocks') }}
    WHERE
        block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())

-- Incremental runs start from the lower of the two block numbers produced by
-- the UNION below.
{% if is_incremental() %}
AND (
    block_number >= (
        SELECT
            MIN(block_number)
        FROM
            (
                -- (a) earliest block in the one-hour window between 96 and
                -- 95 hours ago
                SELECT
                    MIN(block_number) AS block_number
                FROM
                    {{ ref('silver__blocks') }}
                WHERE
                    block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
                    AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
                UNION
                -- (b) one block before the smallest entry of
                -- blocks_impacted_array in the most recent row (by
                -- test_timestamp) of this model's own table
                SELECT
                    MIN(VALUE) - 1 AS block_number
                FROM
                    (
                        SELECT
                            blocks_impacted_array
                        FROM
                            {{ this }}
                            qualify ROW_NUMBER() over (
                                ORDER BY
                                    test_timestamp DESC
                            ) = 1
                    ),
                    LATERAL FLATTEN(
                        input => blocks_impacted_array
                    )
            )
    ) {% if var('OBSERV_FULL_TEST') %}
        -- OBSERV_FULL_TEST is set: this OR admits every block_number >= 0,
        -- so the lower bound above no longer restricts the range.
        OR block_number >= 0
    {% endif %}
)
{% endif %}
),
-- Every integer from min_block to max_block, taken from the number_sequence source.
block_range AS (
    SELECT
        _id AS block_number
    FROM
        {{ source(
            'crosschain_silver',
            'number_sequence'
        ) }}
    WHERE
        _id BETWEEN (
            SELECT
                min_block
            FROM
                summary_stats
        )
        AND (
            SELECT
                max_block
            FROM
                summary_stats
        )
),
-- Blocks inside block_range holding a transaction for which silver__traces
-- has no row with the same block_number and tx_hash.
broken_blocks AS (
    SELECT
        DISTINCT block_number
    FROM
        {{ ref("silver__transactions") }}
        tx
        LEFT JOIN {{ ref("silver__traces") }}
        tr USING (
            block_number,
            tx_hash
        )
        JOIN block_range USING (block_number)
    WHERE
        tr.tx_hash IS NULL
),
-- Collapse the broken blocks into a count and an ordered array.
impacted_blocks AS (
    SELECT
        COUNT(1) AS blocks_impacted_count,
        ARRAY_AGG(block_number) within GROUP (
            ORDER BY
                block_number
        ) AS blocks_impacted_array
    FROM
        broken_blocks
)
-- summary_stats and impacted_blocks are both aggregates without GROUP BY, so
-- joining them ON 1 = 1 yields a single output row per run.
SELECT
    'traces' AS test_name,
    min_block,
    max_block,
    min_block_timestamp,
    max_block_timestamp,
    blocks_tested,
    blocks_impacted_count,
    blocks_impacted_array,
    CURRENT_TIMESTAMP() AS test_timestamp
FROM
    summary_stats
    JOIN impacted_blocks
    ON 1 = 1

View File

@ -0,0 +1,119 @@
-- Observability check for transactions. Each run produces one summary row
-- (test_name = 'transactions') listing the blocks that hold a
-- silver__confirmed_blocks row with no matching row in silver__transactions.
-- Materialized incrementally, keyed on test_timestamp, with full_refresh disabled.
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false
) }}
WITH summary_stats AS (
-- Bounds of the range under test: only blocks whose timestamp is at least
-- 12 hours old are considered.
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
-- Incremental runs start from the lower of the two block numbers produced by
-- the UNION below.
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
-- (a) earliest block in the one-hour window between 96 and 95 hours ago
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
-- (b) one block before the smallest entry of blocks_impacted_array in the
-- most recent row (by test_timestamp) of this model's own table
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
-- OBSERV_FULL_TEST is set: this OR admits every block_number >= 0, so the
-- lower bound above no longer restricts the range.
OR block_number >= 0
{% endif %}
)
{% endif %}
),
-- Every integer from min_block to max_block, taken from the number_sequence source.
block_range AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
-- Blocks inside block_range holding a confirmed-blocks row for which
-- silver__transactions has no row with the same block_number, tx_hash and
-- block_hash.
broken_blocks AS (
SELECT
DISTINCT block_number
FROM
{{ ref("silver__confirmed_blocks") }}
b
LEFT JOIN {{ ref("silver__transactions") }}
t USING (
block_number,
tx_hash,
block_hash
)
JOIN block_range USING (block_number)
WHERE
t.tx_hash IS NULL
),
-- Collapse the broken blocks into a count and an ordered array.
impacted_blocks AS (
SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_number) within GROUP (
ORDER BY
block_number
) AS blocks_impacted_array
FROM
broken_blocks
)
-- summary_stats and impacted_blocks are both aggregates without GROUP BY, so
-- joining them ON 1 = 1 yields a single output row per run.
SELECT
'transactions' AS test_name,
min_block,
max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
CURRENT_TIMESTAMP() AS test_timestamp
FROM
summary_stats
JOIN impacted_blocks
ON 1 = 1

View File

@ -0,0 +1,9 @@
-- Pass-through view exposing every row of silver__confirmed_blocks to the
-- tests tagged full_test.
{{ config(
    materialized = "view",
    tags = ["full_test"]
) }}

SELECT
    confirmed.*
FROM
    {{ ref("silver__confirmed_blocks") }} AS confirmed

View File

@ -0,0 +1,34 @@
# Tests for the full-history confirmed-blocks view.
version: 2

models:
  - name: test_silver__confirmed_blocks_full
    # Model-level test: TX_HASH alone must be unique.
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns: [TX_HASH]

    columns:
      - name: BLOCK_NUMBER
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list: [NUMBER]

      # Hashes must match a 0x-prefixed hexadecimal pattern.
      - name: BLOCK_HASH
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_match_regex:
              regex: '0[xX][0-9a-fA-F]+'

      - name: TX_HASH
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_match_regex:
              regex: '0[xX][0-9a-fA-F]+'

      # Freshness: data must have arrived within the last 1 day.
      - name: _INSERTED_TIMESTAMP
        tests:
          - not_null
          - dbt_expectations.expect_row_values_to_have_recent_data:
              datepart: day
              interval: 1
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list: [TIMESTAMP_NTZ, TIMESTAMP_LTZ]

View File

@ -0,0 +1,27 @@
-- Recent slice of silver__confirmed_blocks for the tests tagged recent_test.
-- The lower bound is the third-highest block_number in _max_block_by_date.
{{ config(
    materialized = "view",
    tags = ["recent_test"]
) }}

WITH cutoff AS (
    -- Rank the rows of _max_block_by_date from highest block_number down and
    -- keep only the one ranked third.
    SELECT
        daily.block_number
    FROM
        {{ ref("_max_block_by_date") }} AS daily
    QUALIFY ROW_NUMBER() OVER (ORDER BY daily.block_number DESC) = 3
)

SELECT
    confirmed.*
FROM
    {{ ref('silver__confirmed_blocks') }} AS confirmed
WHERE
    confirmed.block_number >= (SELECT cutoff.block_number FROM cutoff)

View File

@ -0,0 +1,34 @@
# Tests for the recent-window confirmed-blocks view.
version: 2

models:
  - name: test_silver__confirmed_blocks_recent
    # Model-level test: TX_HASH alone must be unique.
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns: [TX_HASH]

    columns:
      - name: BLOCK_NUMBER
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list: [NUMBER]

      # Hashes must match a 0x-prefixed hexadecimal pattern.
      - name: BLOCK_HASH
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_match_regex:
              regex: '0[xX][0-9a-fA-F]+'

      - name: TX_HASH
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_match_regex:
              regex: '0[xX][0-9a-fA-F]+'

      # Freshness: data must have arrived within the last 3 hours.
      - name: _INSERTED_TIMESTAMP
        tests:
          - not_null
          - dbt_expectations.expect_row_values_to_have_recent_data:
              datepart: hour
              interval: 3
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list: [TIMESTAMP_NTZ, TIMESTAMP_LTZ]

View File

@ -15,7 +15,7 @@ WITH base AS (
WHERE
block_timestamp > DATEADD(
'day',
-3,
-5,
CURRENT_DATE
)
GROUP BY

View File

@ -47,6 +47,11 @@ retry_blocks AS (
block_number
FROM
{{ ref("_missing_txs") }}
UNION
SELECT
block_number
FROM
{{ ref("_unconfirmed_blocks") }}
)
)
SELECT

View File

@ -63,6 +63,11 @@ retry_blocks AS (
block_number
FROM
{{ ref("_missing_traces") }}
UNION
SELECT
block_number
FROM
{{ ref("_unconfirmed_blocks") }}
)
)
SELECT

View File

@ -40,6 +40,11 @@ retry_blocks AS (
block_number
FROM
{{ ref("_missing_txs") }}
UNION
SELECT
block_number
FROM
{{ ref("_unconfirmed_blocks") }}
)
)
SELECT

View File

@ -2,19 +2,31 @@
materialized = "ephemeral"
) }}
WITH lookback AS (
SELECT
MAX(block_number) AS block_number
FROM
{{ ref("silver__blocks") }}
WHERE
block_timestamp :: DATE = CURRENT_DATE() - 3
)
SELECT
DISTINCT tx.block_number AS block_number
DISTINCT t.block_number AS block_number
FROM
{{ ref("silver__transactions") }}
tx
t
LEFT JOIN {{ ref("silver__receipts") }}
r
ON tx.block_number = r.block_number
AND tx.tx_hash = r.tx_hash
WHERE
tx.block_timestamp >= DATEADD(
'day',
-2,
CURRENT_DATE
r USING (
block_number,
block_hash,
tx_hash
)
WHERE
r.tx_hash IS NULL
AND t.block_number >= (
SELECT
block_number
FROM
lookback
)
AND r.tx_hash IS NULL

View File

@ -3,7 +3,7 @@
) }}
SELECT
DISTINCT tx.block_number AS block_number
DISTINCT tx.block_number block_number
FROM
{{ ref("silver__transactions") }}
tx

View File

@ -0,0 +1,32 @@
{#
    Ephemeral helper: distinct block numbers at or above the lookback floor
    that hold a silver__confirmed_blocks row with no silver__transactions row
    sharing the same block_number, block_hash and tx_hash.
#}
{{ config(
    materialized = 'ephemeral'
) }}

WITH lookback_floor AS (
    {# highest block number stamped exactly three days before today #}
    SELECT
        MAX(blk.block_number) AS block_number
    FROM
        {{ ref('silver__blocks') }} AS blk
    WHERE
        blk.block_timestamp :: DATE = CURRENT_DATE() - 3
)

SELECT
    DISTINCT confirmed.block_number AS block_number
FROM
    {{ ref('silver__confirmed_blocks') }} AS confirmed
    LEFT JOIN {{ ref('silver__transactions') }} AS loaded
    ON confirmed.block_number = loaded.block_number
    AND confirmed.block_hash = loaded.block_hash
    AND confirmed.tx_hash = loaded.tx_hash
WHERE
    confirmed.block_number >= (SELECT lookback_floor.block_number FROM lookback_floor)
    AND loaded.tx_hash IS NULL

View File

@ -24,6 +24,7 @@ sources:
- name: apis_keys
- name: token_prices_priority_hourly
- name: asset_metadata_priority
- name: number_sequence
- name: crosschain_public
database: crosschain
schema: bronze_public

View File

@ -0,0 +1 @@
{# Singular test: runs missing_confirmed_txs with the full confirmed-blocks view as the base and the full transactions view as the model being checked. #}
{{ missing_confirmed_txs(
    ref('test_silver__confirmed_blocks_full'),
    ref('test_silver__transactions_full')
) }}

View File

@ -0,0 +1 @@
{# Singular test: runs missing_confirmed_txs with the recent confirmed-blocks view as the base and the recent transactions view as the model being checked. #}
{{ missing_confirmed_txs(
    ref('test_silver__confirmed_blocks_recent'),
    ref('test_silver__transactions_recent')
) }}