diff --git a/.github/workflows/dbt_run_noncore.yml b/.github/workflows/dbt_run_observability.yml similarity index 77% rename from .github/workflows/dbt_run_noncore.yml rename to .github/workflows/dbt_run_observability.yml index 6b36bce..2ee2fd5 100644 --- a/.github/workflows/dbt_run_noncore.yml +++ b/.github/workflows/dbt_run_observability.yml @@ -1,13 +1,14 @@ -name: dbt_run_noncore -run-name: dbt_run_noncore +name: dbt_run_observability +run-name: dbt_run_observability on: workflow_dispatch: schedule: - - cron: "25,55 0/4 * * *" + # Runs “At minute 0 past every 8th hour.” (see https://crontab.guru) + - cron: '0 */8 * * *' env: - DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + DBT_PROFILES_DIR: ./ ACCOUNT: "${{ vars.ACCOUNT }}" ROLE: "${{ vars.ROLE }}" @@ -41,4 +42,4 @@ jobs: dbt deps - name: Run DBT Jobs run: | - dbt run -m movement_models,tag:noncore \ No newline at end of file + dbt run -s tag:observability \ No newline at end of file diff --git a/models/silver/_observability/silver_observability.yml b/models/silver/_observability/silver_observability.yml new file mode 100644 index 0000000..9386e32 --- /dev/null +++ b/models/silver/_observability/silver_observability.yml @@ -0,0 +1,130 @@ +version: 2 +models: + - name: silver_observability__blocks_completeness + description: Records of all blocks block gaps (missing blocks) with a timestamp the test was run + config: + contract: + enforced: true + tests: + - dbt_utils.recency: + datepart: day + field: TEST_TIMESTAMP + interval: 2 + severity: error + tags: ['test_recency'] + columns: + - name: TEST_NAME + data_type: VARCHAR + description: Name for the test + - name: MIN_BLOCK + data_type: NUMBER + description: The lowest block id in the test + tests: + - not_null: + tags: ['test_quality'] + - name: MAX_BLOCK + data_type: NUMBER + description: The highest block id in the test + tests: + - not_null: + tags: ['test_quality'] + - name: MIN_BLOCK_TIMESTAMP + data_type: TIMESTAMP_NTZ + description: The lowest block timestamp in the test + tests: + - not_null: + tags: ['test_quality'] + - name: MAX_BLOCK_TIMESTAMP + data_type: TIMESTAMP_NTZ + description: The highest block timestamp in the test + tests: + - not_null: + tags: ['test_quality'] + - name: BLOCKS_TESTED + data_type: NUMBER + description: Count of blocks in the test + tests: + - not_null: + tags: ['test_quality'] + - name: BLOCKS_IMPACTED_COUNT + data_type: NUMBER + description: Count of block gaps in the test + tests: + - not_null: + tags: ['test_quality'] + - name: BLOCKS_IMPACTED_ARRAY + data_type: ARRAY + description: Array of affected blocks + - name: TEST_TIMESTAMP + data_type: TIMESTAMP_NTZ + description: When the test was run + tests: + - not_null: + tags: ['test_quality'] + - unique: + tags: ['test_quality'] + - name: MODIFIED_TIMESTAMP + data_type: TIMESTAMP_NTZ + + - name: silver_observability__transactions_completeness + description: Records of all blocks with missing transactions with a timestamp the test was run + tests: + - dbt_utils.recency: + datepart: day + field: TEST_TIMESTAMP + interval: 2 + severity: error + tags: ['test_recency'] + columns: + - name: TEST_NAME + data_type: VARCHAR + description: Name for the test + - name: MIN_BLOCK + data_type: NUMBER + description: The lowest block id in the test + tests: + - not_null: + tags: ['test_quality'] + - name: MAX_BLOCK + data_type: NUMBER + description: The highest block id in the test + tests: + - not_null: + tags: ['test_quality'] + - name: MIN_BLOCK_TIMESTAMP + data_type: TIMESTAMP_NTZ + description: The lowest block timestamp in the test + tests: + - not_null: + tags: ['test_quality'] + - name: MAX_BLOCK_TIMESTAMP + data_type: TIMESTAMP_NTZ + description: The highest block timestamp in the test + tests: + - not_null: + tags: ['test_quality'] + - name: BLOCKS_TESTED + data_type: NUMBER + description: Count of blocks in the test + tests: + - not_null: + tags: ['test_quality'] + - name: BLOCKS_IMPACTED_COUNT + data_type: NUMBER + description: Count of block gaps in the test + tests: + - not_null: + tags: ['test_quality'] + - name: BLOCKS_IMPACTED_ARRAY + data_type: ARRAY + description: Array of affected blocks + - name: TEST_TIMESTAMP + data_type: TIMESTAMP_NTZ + description: When the test was run + tests: + - not_null: + tags: ['test_quality'] + - unique: + tags: ['test_quality'] + - name: MODIFIED_TIMESTAMP + data_type: TIMESTAMP_NTZ diff --git a/models/silver/_observability/silver_observability__blocks_completeness.sql b/models/silver/_observability/silver_observability__blocks_completeness.sql new file mode 100644 index 0000000..c3ae205 --- /dev/null +++ b/models/silver/_observability/silver_observability__blocks_completeness.sql @@ -0,0 +1,169 @@ +{{ config( + materialized = 'incremental', + unique_key = 'test_timestamp', + full_refresh = false, + tags = ['observability'], + enabled = true +) }} + +WITH summary_stats AS ( + + SELECT + MIN(block_number) AS min_block, + MAX(block_number) AS max_block, + MIN(block_timestamp) AS min_block_timestamp, + MAX(block_timestamp) AS max_block_timestamp, + COUNT(1) AS blocks_tested + FROM + {{ ref('core__fact_blocks') }} + WHERE + block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP()) + +{% if is_incremental() %} +AND ( + block_number >= ( + SELECT + MIN(block_number) + FROM + ( + SELECT + MIN(block_number) AS block_number + FROM + {{ ref('core__fact_blocks') }} + WHERE + block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP()) + AND DATEADD('hour', -95, CURRENT_TIMESTAMP()) + UNION + SELECT + MIN(VALUE) - 1 AS block_number + FROM + ( + SELECT + blocks_impacted_array + FROM + {{ this }} + qualify ROW_NUMBER() over ( + ORDER BY + test_timestamp DESC + ) = 1 + ), + LATERAL FLATTEN( + input => blocks_impacted_array + ) + ) + ) {% if var('OBSERV_FULL_TEST') %} + OR block_number >= 0 + {% endif %} +) +{% endif %} +), +block_range AS ( + SELECT + _id AS block_number + FROM + {{ source( + 'crosschain_silver', + 'number_sequence' + ) }} + WHERE + _id BETWEEN ( + SELECT + min_block + FROM + summary_stats + ) + AND ( + SELECT + max_block + FROM + summary_stats + ) +), +blocks AS ( + SELECT + l.block_number, + block_timestamp, + LAG( + l.block_number, + 1 + ) over ( + ORDER BY + l.block_number ASC + ) AS prev_BLOCK_NUMBER + FROM + {{ ref("core__fact_blocks") }} + l + INNER JOIN block_range b + ON l.block_number = b.block_number + AND l.block_number >= ( + SELECT + MIN(block_number) + FROM + block_range + ) +), +block_gen AS ( + SELECT + _id AS block_number + FROM + {{ source( + 'crosschain_silver', + 'number_sequence' + ) }} + WHERE + _id BETWEEN ( + SELECT + MIN(block_number) + FROM + blocks + ) + AND ( + SELECT + MAX(block_number) + FROM + blocks + ) +) +SELECT + 'blocks' AS test_name, + MIN( + b.block_number + ) AS min_block, + MAX( + b.block_number + ) AS max_block, + MIN( + b.block_timestamp + ) AS min_block_timestamp, + MAX( + b.block_timestamp + ) AS max_block_timestamp, + COUNT(1) AS blocks_tested, + COUNT( + CASE + WHEN C.block_number IS NOT NULL THEN A.block_number + END + ) AS blocks_impacted_count, + ARRAY_AGG( + CASE + WHEN C.block_number IS NOT NULL THEN A.block_number + END + ) within GROUP ( + ORDER BY + A.block_number + ) AS blocks_impacted_array, + SYSDATE() AS test_timestamp, + SYSDATE() AS modified_timestamp +FROM + block_gen A + LEFT JOIN blocks b + ON A.block_number = b.block_number + LEFT JOIN blocks C + ON A.block_number > C.prev_block_number + AND A.block_number < C.block_number + AND C.block_number - C.prev_block_number <> 1 +WHERE + COALESCE( + b.block_number, + C.block_number + ) IS NOT NULL diff --git a/models/silver/_observability/silver_observability__transactions_completeness.sql b/models/silver/_observability/silver_observability__transactions_completeness.sql new file mode 100644 index 0000000..bd19d2f --- /dev/null +++ b/models/silver/_observability/silver_observability__transactions_completeness.sql @@ -0,0 +1,142 @@ +{{ config( + materialized = 'incremental', + unique_key = 'test_timestamp', + full_refresh = false, + tags = ['observability'] +) }} + +WITH summary_stats AS ( + + SELECT + MIN(block_number) AS min_block, + MAX(block_number) AS max_block, + MIN(block_timestamp) AS min_block_timestamp, + MAX(block_timestamp) AS max_block_timestamp, + COUNT(1) AS blocks_tested + FROM + {{ ref('core__fact_blocks') }} + WHERE + block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP()) + +{% if is_incremental() %} +AND ( + block_number >= ( + SELECT + MIN(block_number) + FROM + ( + SELECT + MIN(block_number) AS block_number + FROM + {{ ref('core__fact_blocks') }} + WHERE + block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP()) + AND DATEADD('hour', -95, CURRENT_TIMESTAMP()) + UNION + SELECT + MIN(VALUE) - 1 AS block_number + FROM + ( + SELECT + blocks_impacted_array + FROM + {{ this }} + qualify ROW_NUMBER() over ( + ORDER BY + test_timestamp DESC + ) = 1 + ), + LATERAL FLATTEN( + input => blocks_impacted_array + ) + ) + ) {% if var('OBSERV_FULL_TEST') %} + OR block_number >= 0 + {% endif %} +) +{% endif %} +), +base_blocks AS ( + SELECT + block_number, + tx_count AS transaction_count + FROM + {{ ref('core__fact_blocks') }} + WHERE + block_number BETWEEN ( + SELECT + min_block + FROM + summary_stats + ) + AND ( + SELECT + max_block + FROM + summary_stats + ) + AND + block_number NOT IN (0, 1758, 1760, 1761, 1762, 1763, 1764, 1766) +), +actual_tx_counts AS ( + SELECT + block_number, + COUNT(1) AS transaction_count + FROM + {{ ref('core__fact_transactions') }} + WHERE + block_number BETWEEN ( + SELECT + min_block + FROM + summary_stats + ) + AND ( + SELECT + max_block + FROM + summary_stats + ) + AND + block_number NOT IN (0, 1758, 1760, 1761, 1762, 1763, 1764, 1766) + GROUP BY + block_number +), +potential_missing_txs AS ( + SELECT + e.block_number + FROM + base_blocks e + LEFT OUTER JOIN actual_tx_counts A + ON e.block_number = A.block_number + WHERE + COALESCE( + A.transaction_count, + 0 + ) <> e.transaction_count +), +impacted_blocks AS ( + SELECT + COUNT(1) AS blocks_impacted_count, + ARRAY_AGG(block_number) within GROUP ( + ORDER BY + block_number + ) AS blocks_impacted_array + FROM + potential_missing_txs +) +SELECT + 'transactions' AS test_name, + min_block, + max_block, + min_block_timestamp, + max_block_timestamp, + blocks_tested, + blocks_impacted_count, + blocks_impacted_array, + SYSDATE() AS test_timestamp, + SYSDATE() AS modified_timestamp +FROM + summary_stats + JOIN impacted_blocks + ON 1 = 1 diff --git a/models/sources.yml b/models/sources.yml index a5da39e..f8f61f9 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -13,13 +13,6 @@ sources: schema: silver tables: - name: number_sequence - - name: labels_combined - - name: complete_token_asset_metadata - - name: complete_token_prices - - name: complete_provider_asset_metadata - - name: complete_provider_prices - - name: complete_native_asset_metadata - - name: complete_native_prices - name: bronze_streamline database: streamline schema: | diff --git a/models/streamline/silver/realtime/streamline__blocks_tx_realtime.sql b/models/streamline/silver/realtime/streamline__blocks_tx_realtime.sql index eb59fee..a7ec06d 100644 --- a/models/streamline/silver/realtime/streamline__blocks_tx_realtime.sql +++ b/models/streamline/silver/realtime/streamline__blocks_tx_realtime.sql @@ -41,7 +41,7 @@ SELECT 'Flipside_Crypto/0.1' ), PARSE_JSON('{}'), - 'Vault/prod/movement/mainnet' + 'Vault/prod/movement/mainnet_fsc' ) AS request FROM blocks diff --git a/models/streamline/silver/realtime/streamline__transactions_realtime.sql b/models/streamline/silver/realtime/streamline__transactions_realtime.sql index 5fc62a5..42213bf 100644 --- a/models/streamline/silver/realtime/streamline__transactions_realtime.sql +++ b/models/streamline/silver/realtime/streamline__transactions_realtime.sql @@ -105,7 +105,7 @@ numbers AS ( 'Flipside_Crypto/0.1' ), PARSE_JSON('{}'), - 'Vault/prod/movement/mainnet' + 'Vault/prod/movement/mainnet_fsc' ) AS request FROM WORK diff --git a/models/streamline/silver/streamline__chainhead.sql b/models/streamline/silver/streamline__chainhead.sql index 86ea4ac..0d21d97 100644 --- a/models/streamline/silver/streamline__chainhead.sql +++ b/models/streamline/silver/streamline__chainhead.sql @@ -16,5 +16,5 @@ SELECT 'Flipside_Crypto/0.1' ), OBJECT_CONSTRUCT(), - 'Vault/prod/movement/mainnet' + 'Vault/prod/movement/mainnet_fsc' ) :data :block_height :: INT AS block_number