This commit is contained in:
Eric Laurello 2023-08-21 13:39:16 -04:00
parent 5787b7f03d
commit 27f7f34d89
7 changed files with 504 additions and 7 deletions

View File

@ -45,4 +45,5 @@ vars:
"dbt_date:time_zone": GMT
STREAMLINE_INVOKE_STREAMS: True
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False
"UPDATE_SNOWFLAKE_TAGS": True
"UPDATE_SNOWFLAKE_TAGS": True
OBSERV_FULL_TEST: False

View File

@ -1,6 +1,6 @@
{{ config(
materialized = 'incremental',
unique_key = 'tx_id',
unique_key = ['block_id','tx_id'],
cluster_by = ['_inserted_timestamp::date'],
merge_update_columns = ["block_id"],
) }}
@ -27,12 +27,12 @@ WHERE
LEAST(
registered_on,
last_modified
) >= (
) >= dateadd(day,-2,(
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }})
)
))
{% else %}
)
{% endif %}
@ -55,6 +55,6 @@ FROM
JOIN meta m
ON m.file_name = metadata$filename
WHERE
DATA: error IS NULL qualify(ROW_NUMBER() over (PARTITION BY tx_hash :: STRING
DATA: error IS NULL qualify(ROW_NUMBER() over (PARTITION BY block_number, tx_hash :: STRING
ORDER BY
_inserted_timestamp DESC)) = 1
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,132 @@
{{ config(
materialized = 'incremental',
full_refresh = false
) }}
WITH source AS (
SELECT
block_id,
block_timestamp,
LAG(
block_id,
1
) over (
ORDER BY
block_id ASC
) AS prev_BLOCK_ID
FROM
{{ ref('silver__blocks') }} A
WHERE
block_timestamp < DATEADD(
HOUR,
-24,
SYSDATE()
)
{% if is_incremental() %}
AND (
block_timestamp >= DATEADD(
HOUR,
-96,(
SELECT
MAX(
max_block_timestamp
)
FROM
{{ this }}
)
)
OR ({% if var('OBSERV_FULL_TEST') %}
block_id >= 0
{% else %}
block_id >= (
SELECT
MIN(VALUE) - 1
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC) = 1), LATERAL FLATTEN(input => blocks_impacted_array))
{% endif %})
)
{% endif %}
),
block_gen AS (
SELECT
_id AS block_id
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
MIN(block_id)
FROM
source
)
AND (
SELECT
MAX(block_id)
FROM
source
)
)
SELECT
'blocks' AS test_name,
MIN(
b.block_id
) AS min_block,
MAX(
b.block_id
) AS max_block,
MIN(
b.block_timestamp
) AS min_block_timestamp,
MAX(
b.block_timestamp
) AS max_block_timestamp,
COUNT(1) AS blocks_tested,
COUNT(
CASE
WHEN C.block_id IS NOT NULL THEN A.block_id
END
) AS blocks_impacted_count,
ARRAY_AGG(
CASE
WHEN C.block_id IS NOT NULL THEN A.block_id
END
) within GROUP (
ORDER BY
A.block_id
) AS blocks_impacted_array,
ARRAY_AGG(
DISTINCT CASE
WHEN C.block_id IS NOT NULL THEN OBJECT_CONSTRUCT(
'prev_block_id',
C.prev_block_id,
'block_id',
C.block_id
)
END
) AS test_failure_details,
SYSDATE() AS test_timestamp
FROM
block_gen A
LEFT JOIN source b
ON A.block_id = b.block_id
LEFT JOIN source C
ON A.block_id > C.prev_BLOCK_ID
AND A.block_id < C.block_id
AND C.block_id - C.prev_BLOCK_ID <> 1
WHERE
COALESCE(
b.block_id,
C.block_id
) IS NOT NULL

View File

@ -0,0 +1,75 @@
version: 2
models:
- name: silver_observability__blocks_completeness
description: Records of all blocks block gaps (missing blocks) with a timestamp the test was run
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TEST_TIMESTAMP
columns:
- name: MIN_BLOCK
description: The lowest block id in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MAX_BLOCK
description: The highest block id in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MIN_BLOCK_TIMESTAMP
description: The lowest block timestamp in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: MAX_BLOCK_TIMESTAMP
description: The highest block timestamp in the test
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: BLOCKS_TESTED
description: Count of blocks in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: BLOCKS_IMPACTED_COUNT
description: Count of block gaps in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: BLOCKS_IMPACTED_ARRAY
description: Array of affected blocks
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- name: TEST_FAILURE_DETAILS
description: Array of details of the failure
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- name: TEST_TIMESTAMP
description: When the test was run
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -0,0 +1,202 @@
{{ config(
materialized = 'incremental',
full_refresh = false
) }}
WITH rel_blocks AS (
SELECT
block_id,
block_timestamp
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp < DATEADD(
HOUR,
-24,
SYSDATE()
)
{% if is_incremental() %}
AND (
block_timestamp >= DATEADD(
HOUR,
-96,(
SELECT
MAX(
max_block_timestamp
)
FROM
{{ this }}
)
)
OR ({% if var('OBSERV_FULL_TEST') %}
block_id >= 0
{% else %}
block_id >= (
SELECT
MIN(VALUE) - 1
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC) = 1), LATERAL FLATTEN(input => blocks_impacted_array))
{% endif %})
)
{% endif %}
),
bronze AS (
SELECT
A.block_id,
b.block_timestamp,
A.tx_id
FROM
{{ ref('bronze__transactions') }} A
JOIN rel_blocks b
ON A.block_id = b.block_id
{% if is_incremental() %}
WHERE
A._inserted_timestamp >= CURRENT_DATE - 14
OR {% if var('OBSERV_FULL_TEST') %}
1 = 1
{% else %}
(
SELECT
MIN(VALUE) - 1
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
) IS NOT NULL
{% endif %}
{% endif %}
qualify(ROW_NUMBER() over(PARTITION BY A.block_id, tx_id
ORDER BY
A._inserted_timestamp DESC) = 1)
),
bronze_count AS (
SELECT
block_id,
block_timestamp,
COUNT(
DISTINCT tx_id
) AS num_txs
FROM
bronze
GROUP BY
block_id,
block_timestamp
),
bronze_api AS (
SELECT
block_id,
block_timestamp,
num_txs
FROM
{{ ref('silver__blockchain') }}
WHERE
block_timestamp BETWEEN (
SELECT
MIN(block_timestamp)
FROM
rel_blocks
)
AND (
SELECT
MAX(block_timestamp)
FROM
rel_blocks
)
)
SELECT
'transactions' AS test_name,
MIN(
A.block_id
) AS min_block,
MAX(
A.block_id
) AS max_block,
MIN(
A.block_timestamp
) AS min_block_timestamp,
MAX(
A.block_timestamp
) AS max_block_timestamp,
COUNT(1) AS blocks_tested,
SUM(
CASE
WHEN COALESCE(
b.num_txs,
0
) - A.num_txs <> 0 THEN 1
ELSE 0
END
) AS blocks_impacted_count,
ARRAY_AGG(
CASE
WHEN COALESCE(
b.num_txs,
0
) - A.num_txs <> 0 THEN A.block_id
END
) within GROUP (
ORDER BY
A.block_id
) AS blocks_impacted_array,
SUM(
ABS(
COALESCE(
b.num_txs,
0
) - A.num_txs
)
) AS transactions_impacted_count,
ARRAY_AGG(
CASE
WHEN COALESCE(
b.num_txs,
0
) - A.num_txs <> 0 THEN OBJECT_CONSTRUCT(
'block',
A.block_id,
'block_timestamp',
A.block_timestamp,
'diff',
COALESCE(
b.num_txs,
0
) - A.num_txs,
'blockchain_num_txs',
A.num_txs,
'bronze_num_txs',
COALESCE(
b.num_txs,
0
)
)
END
) within GROUP(
ORDER BY
A.block_id
) AS test_failure_details,
SYSDATE() AS test_timestamp
FROM
bronze_api A
LEFT JOIN bronze_count b
ON A.block_id = b.block_id

View File

@ -0,0 +1,82 @@
version: 2
models:
- name: silver_observability__transactions_completeness
description: Records of all blocks with missing transactions with a timestamp the test was run
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TEST_TIMESTAMP
columns:
- name: MIN_BLOCK
description: The lowest block id in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MAX_BLOCK
description: The highest block id in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MIN_BLOCK_TIMESTAMP
description: The lowest block timestamp in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: MAX_BLOCK_TIMESTAMP
description: The highest block timestamp in the test
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: BLOCKS_TESTED
description: Count of blocks in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: BLOCKS_IMPACTED_COUNT
description: Count of blocks with missing transactions in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: BLOCKS_IMPACTED_ARRAY
description: Array of affected blocks
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- name: TRANSACTIONS_IMPACTED_COUNT
description: Total count of missing transactions in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: TEST_FAILURE_DETAILS
description: blocks with missing transactions with the number of missing transactions
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- name: TEST_TIMESTAMP
description: When the test was run
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -21,6 +21,11 @@ sources:
tables:
- name: address_labels
- name: dim_dates
- name: crosschain_silver
database: "{{ 'crosschain' if target.database == 'AXELAR' else 'crosschain_dev' }}"
schema: silver
tables:
- name: number_sequence
- name: bronze_streamline
database: streamline
schema: |