Merge main into add-prices branch

This commit is contained in:
Mike Stepanovic 2025-04-07 13:57:58 -06:00
commit 114689da5b
8 changed files with 450 additions and 15 deletions

View File

@ -1,13 +1,14 @@
name: dbt_run_noncore
run-name: dbt_run_noncore
name: dbt_run_observability
run-name: dbt_run_observability
on:
workflow_dispatch:
schedule:
- cron: "25,55 0/4 * * *"
# Runs “At minute 0 past every 8th hour.” (see https://crontab.guru)
- cron: '0 */8 * * *'
env:
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
@ -41,4 +42,4 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m movement_models,tag:noncore
dbt run -s tag:observability

View File

@ -0,0 +1,130 @@
version: 2
models:
- name: silver_observability__blocks_completeness
description: Records of all blocks block gaps (missing blocks) with a timestamp the test was run
config:
contract:
enforced: true
tests:
- dbt_utils.recency:
datepart: day
field: TEST_TIMESTAMP
interval: 2
severity: error
tags: ['test_recency']
columns:
- name: TEST_NAME
data_type: VARCHAR
description: Name for the test
- name: MIN_BLOCK
data_type: NUMBER
description: The lowest block id in the test
tests:
- not_null:
tags: ['test_quality']
- name: MAX_BLOCK
data_type: NUMBER
description: The highest block id in the test
tests:
- not_null:
tags: ['test_quality']
- name: MIN_BLOCK_TIMESTAMP
data_type: TIMESTAMP_NTZ
description: The lowest block timestamp in the test
tests:
- not_null:
tags: ['test_quality']
- name: MAX_BLOCK_TIMESTAMP
data_type: TIMESTAMP_NTZ
description: The highest block timestamp in the test
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKS_TESTED
data_type: NUMBER
description: Count of blocks in the test
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKS_IMPACTED_COUNT
data_type: NUMBER
description: Count of block gaps in the test
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKS_IMPACTED_ARRAY
data_type: ARRAY
description: Array of affected blocks
- name: TEST_TIMESTAMP
data_type: TIMESTAMP_NTZ
description: When the test was run
tests:
- not_null:
tags: ['test_quality']
- unique:
tags: ['test_quality']
- name: MODIFIED_TIMESTAMP
data_type: TIMESTAMP_NTZ
- name: silver_observability__transactions_completeness
description: Records of all blocks with missing transactions with a timestamp the test was run
tests:
- dbt_utils.recency:
datepart: day
field: TEST_TIMESTAMP
interval: 2
severity: error
tags: ['test_recency']
columns:
- name: TEST_NAME
data_type: VARCHAR
description: Name for the test
- name: MIN_BLOCK
data_type: NUMBER
description: The lowest block id in the test
tests:
- not_null:
tags: ['test_quality']
- name: MAX_BLOCK
data_type: NUMBER
description: The highest block id in the test
tests:
- not_null:
tags: ['test_quality']
- name: MIN_BLOCK_TIMESTAMP
data_type: TIMESTAMP_NTZ
description: The lowest block timestamp in the test
tests:
- not_null:
tags: ['test_quality']
- name: MAX_BLOCK_TIMESTAMP
data_type: TIMESTAMP_NTZ
description: The highest block timestamp in the test
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKS_TESTED
data_type: NUMBER
description: Count of blocks in the test
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKS_IMPACTED_COUNT
data_type: NUMBER
description: Count of block gaps in the test
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKS_IMPACTED_ARRAY
data_type: ARRAY
description: Array of affected blocks
- name: TEST_TIMESTAMP
data_type: TIMESTAMP_NTZ
description: When the test was run
tests:
- not_null:
tags: ['test_quality']
- unique:
tags: ['test_quality']
- name: MODIFIED_TIMESTAMP
data_type: TIMESTAMP_NTZ

View File

@ -0,0 +1,169 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false,
tags = ['observability'],
enabled = true
) }}
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
block_range AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
blocks AS (
SELECT
l.block_number,
block_timestamp,
LAG(
l.block_number,
1
) over (
ORDER BY
l.block_number ASC
) AS prev_BLOCK_NUMBER
FROM
{{ ref("core__fact_blocks") }}
l
INNER JOIN block_range b
ON l.block_number = b.block_number
AND l.block_number >= (
SELECT
MIN(block_number)
FROM
block_range
)
),
block_gen AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
MIN(block_number)
FROM
blocks
)
AND (
SELECT
MAX(block_number)
FROM
blocks
)
)
SELECT
'blocks' AS test_name,
MIN(
b.block_number
) AS min_block,
MAX(
b.block_number
) AS max_block,
MIN(
b.block_timestamp
) AS min_block_timestamp,
MAX(
b.block_timestamp
) AS max_block_timestamp,
COUNT(1) AS blocks_tested,
COUNT(
CASE
WHEN C.block_number IS NOT NULL THEN A.block_number
END
) AS blocks_impacted_count,
ARRAY_AGG(
CASE
WHEN C.block_number IS NOT NULL THEN A.block_number
END
) within GROUP (
ORDER BY
A.block_number
) AS blocks_impacted_array,
SYSDATE() AS test_timestamp,
SYSDATE() AS modified_timestamp
FROM
block_gen A
LEFT JOIN blocks b
ON A.block_number = b.block_number
LEFT JOIN blocks C
ON A.block_number > C.prev_block_number
AND A.block_number < C.block_number
AND C.block_number - C.prev_block_number <> 1
WHERE
COALESCE(
b.block_number,
C.block_number
) IS NOT NULL

View File

@ -0,0 +1,142 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false,
tags = ['observability']
) }}
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
base_blocks AS (
SELECT
block_number,
tx_count AS transaction_count
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_number BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
AND
block_number NOT IN (0, 1758, 1760, 1761, 1762, 1763, 1764, 1766)
),
actual_tx_counts AS (
SELECT
block_number,
COUNT(1) AS transaction_count
FROM
{{ ref('core__fact_transactions') }}
WHERE
block_number BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
AND
block_number NOT IN (0, 1758, 1760, 1761, 1762, 1763, 1764, 1766)
GROUP BY
block_number
),
potential_missing_txs AS (
SELECT
e.block_number
FROM
base_blocks e
LEFT OUTER JOIN actual_tx_counts A
ON e.block_number = A.block_number
WHERE
COALESCE(
A.transaction_count,
0
) <> e.transaction_count
),
impacted_blocks AS (
SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_number) within GROUP (
ORDER BY
block_number
) AS blocks_impacted_array
FROM
potential_missing_txs
)
SELECT
'transactions' AS test_name,
min_block,
max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
SYSDATE() AS test_timestamp,
SYSDATE() AS modified_timestamp
FROM
summary_stats
JOIN impacted_blocks
ON 1 = 1

View File

@ -13,13 +13,6 @@ sources:
schema: silver
tables:
- name: number_sequence
- name: labels_combined
- name: complete_token_asset_metadata
- name: complete_token_prices
- name: complete_provider_asset_metadata
- name: complete_provider_prices
- name: complete_native_asset_metadata
- name: complete_native_prices
- name: bronze_streamline
database: streamline
schema: |

View File

@ -41,7 +41,7 @@ SELECT
'Flipside_Crypto/0.1'
),
PARSE_JSON('{}'),
'Vault/prod/movement/mainnet'
'Vault/prod/movement/mainnet_fsc'
) AS request
FROM
blocks

View File

@ -105,7 +105,7 @@ numbers AS (
'Flipside_Crypto/0.1'
),
PARSE_JSON('{}'),
'Vault/prod/movement/mainnet'
'Vault/prod/movement/mainnet_fsc'
) AS request
FROM
WORK

View File

@ -16,5 +16,5 @@ SELECT
'Flipside_Crypto/0.1'
),
OBJECT_CONSTRUCT(),
'Vault/prod/movement/mainnet'
'Vault/prod/movement/mainnet_fsc'
) :data :block_height :: INT AS block_number