observe/ runtimes

This commit is contained in:
Eric Laurello 2023-09-08 11:10:05 -04:00
parent bfe514a19f
commit e3d52c4de4
15 changed files with 664 additions and 60 deletions

View File

@ -1,51 +0,0 @@
# Scheduled production dbt run (daily). Delegates execution to the shared
# dbt_run_template reusable workflow.
name: dbt_run_scheduled
run-name: dbt_run_scheduled
on:
  workflow_dispatch:
  schedule:
    # Runs 0700 daily (see https://crontab.guru)
    - cron: '0 7 * * *'
env:
  USE_VARS: "${{ vars.USE_VARS }}"
  DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
  DBT_VERSION: "${{ vars.DBT_VERSION }}"
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"
# Only one run of this workflow at a time.
concurrency:
  group: ${{ github.workflow }}
jobs:
  called_workflow_template:
    uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main
    with:
      # NOTE(review): the bronze_api fetch + silver build pair below is repeated
      # 8 times — presumably to advance through the upstream API iteratively;
      # confirm that intent before deduplicating these lines.
      dbt_command: |
        dbt seed;
        dbt run --exclude tag:classic;
        dbt run -m models/bronze/bronze_api/bronze_api__get_blockchain.sql;
        dbt run -m models/silver/silver__blockchain.sql;
        dbt run -m models/bronze/bronze_api/bronze_api__get_blockchain.sql;
        dbt run -m models/silver/silver__blockchain.sql;
        dbt run -m models/bronze/bronze_api/bronze_api__get_blockchain.sql;
        dbt run -m models/silver/silver__blockchain.sql;
        dbt run -m models/bronze/bronze_api/bronze_api__get_blockchain.sql;
        dbt run -m models/silver/silver__blockchain.sql;
        dbt run -m models/bronze/bronze_api/bronze_api__get_blockchain.sql;
        dbt run -m models/silver/silver__blockchain.sql;
        dbt run -m models/bronze/bronze_api/bronze_api__get_blockchain.sql;
        dbt run -m models/silver/silver__blockchain.sql;
        dbt run -m models/bronze/bronze_api/bronze_api__get_blockchain.sql;
        dbt run -m models/silver/silver__blockchain.sql;
        dbt run -m models/bronze/bronze_api/bronze_api__get_blockchain.sql;
        dbt run -m models/silver/silver__blockchain.sql;
      environment: workflow_prod
      warehouse: ${{ vars.WAREHOUSE }}
    secrets: inherit

View File

@ -1,5 +1,5 @@
name: dbt_run_scheduled_daily
run-name: dbt_run_scheduled_daily
name: dbt_run_daily_dev_refresh
run-name: dbt_run_daily_dev_refresh
on:
workflow_dispatch:

View File

@ -0,0 +1,34 @@
# Scheduled run of the observability test models only.
name: dbt_run_observability
run-name: dbt_run_observability
on:
  workflow_dispatch:
  schedule:
    # Runs at minute 15 past hours 0, 4, 8, 12, 17 and 20 (see https://crontab.guru)
    - cron: '15 0,4,8,12,17,20 * * *'
env:
  USE_VARS: "${{ vars.USE_VARS }}"
  DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
  DBT_VERSION: "${{ vars.DBT_VERSION }}"
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"
# Only one run of this workflow at a time.
concurrency:
  group: ${{ github.workflow }}
jobs:
  called_workflow_template:
    uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main
    with:
      dbt_command: |
        dbt run -m models/silver/_observability;
      environment: workflow_prod
      warehouse: ${{ vars.WAREHOUSE }}
    secrets: inherit

View File

@ -0,0 +1,35 @@
# Monthly full-history observability run (OBSERV_FULL_TEST=true re-tests all blocks).
name: dbt_run_observability_monthly
run-name: dbt_run_observability_monthly
on:
  workflow_dispatch:
  schedule:
    # "At 03:45 on day-of-month 1." (see https://crontab.guru)
    - cron: '45 3 1 * *'
env:
  USE_VARS: "${{ vars.USE_VARS }}"
  DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
  DBT_VERSION: "${{ vars.DBT_VERSION }}"
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"
# Only one run of this workflow at a time.
concurrency:
  group: ${{ github.workflow }}
jobs:
  called_workflow_template:
    uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main
    with:
      dbt_command: |
        dbt run -m models/silver/_observability/silver_observability* --vars "OBSERV_FULL_TEST: true"
      environment: workflow_prod
      warehouse: ${{ vars.WAREHOUSE }}
    secrets: inherit

35
.github/workflows/dbt_run_scheduled.yml vendored Normal file
View File

@ -0,0 +1,35 @@
# Scheduled production dbt run (excludes classic-tagged and observability models).
name: dbt_run_scheduled
run-name: dbt_run_scheduled
on:
  workflow_dispatch:
  schedule:
    # Runs at minutes 20 and 50 of every hour (see https://crontab.guru)
    # Fixed: previous expression '20,50 */1 * * **' had an invalid
    # day-of-week field ('**' is not valid cron syntax).
    - cron: '20,50 */1 * * *'
env:
  USE_VARS: "${{ vars.USE_VARS }}"
  DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
  DBT_VERSION: "${{ vars.DBT_VERSION }}"
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"
# Only one run of this workflow at a time.
concurrency:
  group: ${{ github.workflow }}
jobs:
  called_workflow_template:
    uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main
    with:
      dbt_command: |
        dbt seed;
        dbt run --exclude tag:classic --exclude models/silver/_observability;
      environment: workflow_prod
      warehouse: ${{ vars.WAREHOUSE }}
    secrets: inherit

View File

@ -4,8 +4,8 @@ run-name: dbt_run_streamline
on:
workflow_dispatch:
schedule:
# Runs "every hour at the 0 minute" (see https://crontab.guru)
- cron: '0 */1 * * *'
# Runs "every hour at the 0,30 minute" (see https://crontab.guru)
- cron: '0,30 */1 * * *'
env:
USE_VARS: "${{ vars.USE_VARS }}"

View File

@ -51,7 +51,7 @@ tests:
vars:
"dbt_date:time_zone": GMT
"UPDATE_SNOWFLAKE_TAGS": TRUE
STREAMLINE_INVOKE_STREAMS: True
STREAMLINE_INVOKE_STREAMS: TRUE
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False
UPDATE_UDFS_AND_SPS: False
UPDATE_SNOWFLAKE_TAGS: True

View File

@ -25,6 +25,8 @@
COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text
)
) AS id,
s.metadata,
b.file_name,
s.{{ partition_name }},
s.value AS VALUE
FROM
@ -83,6 +85,8 @@ SELECT
COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text
)
) AS id,
s.metadata,
b.file_name,
s.{{ partition_name }},
s.value AS VALUE
FROM

View File

@ -7,7 +7,7 @@ WITH min_block AS (
SELECT
ethereum.streamline.udf_json_rpc_call(
'https://terra-rpc.polkachu.com/',{},
'https://terra-rpc.publicnode.com/',{},
ARRAY_AGG(
{ 'jsonrpc': '2.0',
'id': 0,
@ -119,7 +119,7 @@ calls AS (
SELECT
call,
ethereum.streamline.udf_json_rpc_call(
'https://terra-rpc.polkachu.com/',{},
'https://terra-rpc.publicnode.com/',{},
call
) AS DATA,
SYSDATE() AS _inserted_timestamp

View File

@ -0,0 +1,132 @@
-- Observability test model: detects gaps (missing block_ids) in silver__blocks
-- and emits one summary row per run.
-- Incremental and never full-refreshed (full_refresh = false), so history of
-- test runs accumulates in the target table.
{{ config(
materialized = 'incremental',
full_refresh = false
) }}
-- source: blocks older than 24h, each paired (via LAG) with the previous
-- block_id so a jump > 1 reveals a gap.
WITH source AS (
SELECT
block_id,
block_timestamp,
LAG(
block_id,
1
) over (
ORDER BY
block_id ASC
) AS prev_BLOCK_ID
FROM
{{ ref('silver__blocks') }} A
WHERE
block_timestamp < DATEADD(
HOUR,
-24,
SYSDATE()
)
-- Incremental runs: restrict to the trailing 96h past the last tested
-- timestamp, plus (unless OBSERV_FULL_TEST) everything from the lowest
-- previously-impacted block onward so known gaps are re-checked.
-- NOTE(review): var('OBSERV_FULL_TEST') has no default here — presumably
-- defaulted in dbt_project.yml; confirm, otherwise non-full runs would error.
{% if is_incremental() %}
AND (
block_timestamp >= DATEADD(
HOUR,
-96,(
SELECT
MAX(
max_block_timestamp
)
FROM
{{ this }}
)
)
OR ({% if var('OBSERV_FULL_TEST') %}
block_id >= 0
{% else %}
-- Lowest impacted block from the most recent prior test run, minus one.
block_id >= (
SELECT
MIN(VALUE) - 1
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC) = 1), LATERAL FLATTEN(input => blocks_impacted_array))
{% endif %})
)
{% endif %}
),
-- block_gen: every expected block_id in the tested range, generated from the
-- crosschain number sequence.
block_gen AS (
SELECT
_id AS block_id
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
MIN(block_id)
FROM
source
)
AND (
SELECT
MAX(block_id)
FROM
source
)
)
-- Summary row: join C matches generated ids that fall strictly inside a
-- detected gap (prev_BLOCK_ID .. block_id with a jump > 1); those are the
-- missing blocks reported in blocks_impacted_*.
SELECT
'blocks' AS test_name,
MIN(
b.block_id
) AS min_block,
MAX(
b.block_id
) AS max_block,
MIN(
b.block_timestamp
) AS min_block_timestamp,
MAX(
b.block_timestamp
) AS max_block_timestamp,
COUNT(1) AS blocks_tested,
COUNT(
CASE
WHEN C.block_id IS NOT NULL THEN A.block_id
END
) AS blocks_impacted_count,
ARRAY_AGG(
CASE
WHEN C.block_id IS NOT NULL THEN A.block_id
END
) within GROUP (
ORDER BY
A.block_id
) AS blocks_impacted_array,
ARRAY_AGG(
DISTINCT CASE
WHEN C.block_id IS NOT NULL THEN OBJECT_CONSTRUCT(
'prev_block_id',
C.prev_block_id,
'block_id',
C.block_id
)
END
) AS test_failure_details,
SYSDATE() AS test_timestamp
FROM
block_gen A
LEFT JOIN source b
ON A.block_id = b.block_id
LEFT JOIN source C
ON A.block_id > C.prev_BLOCK_ID
AND A.block_id < C.block_id
AND C.block_id - C.prev_BLOCK_ID <> 1
-- Keep only generated ids that exist in source or sit inside a gap.
WHERE
COALESCE(
b.block_id,
C.block_id
) IS NOT NULL

View File

@ -0,0 +1,75 @@
version: 2
models:
  - name: silver_observability__blocks_completeness
    # Fixed typo: "all blocks block gaps" -> "all block gaps".
    description: Records of all block gaps (missing blocks) with a timestamp the test was run
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - TEST_TIMESTAMP
    columns:
      - name: MIN_BLOCK
        description: The lowest block id in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: MAX_BLOCK
        description: The highest block id in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: MIN_BLOCK_TIMESTAMP
        description: The lowest block timestamp in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_NTZ
      - name: MAX_BLOCK_TIMESTAMP
        description: The highest block timestamp in the test
        tests:
          - not_null
          - dbt_expectations.expect_row_values_to_have_recent_data:
              datepart: day
              interval: 2
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_NTZ
      - name: BLOCKS_TESTED
        description: Count of blocks in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: BLOCKS_IMPACTED_COUNT
        description: Count of block gaps in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: BLOCKS_IMPACTED_ARRAY
        description: Array of affected blocks
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - ARRAY
      - name: TEST_FAILURE_DETAILS
        description: Array of details of the failure
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - ARRAY
      - name: TEST_TIMESTAMP
        description: When the test was run
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_NTZ

View File

@ -0,0 +1,257 @@
-- Observability test model: compares per-block transaction counts in the
-- bronze streamline data against silver__blockchain and reports blocks whose
-- counts disagree. Emits one summary row per run.
-- Incremental and never full-refreshed, so test-run history accumulates.
{{ config(
materialized = 'incremental',
full_refresh = false
) }}
-- rel_blocks: blocks older than 24h that are in scope for this run.
WITH rel_blocks AS (
SELECT
block_id,
block_timestamp
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp < DATEADD(
HOUR,
-24,
SYSDATE()
)
-- Incremental runs: trailing 96h window plus previously-impacted blocks
-- (all blocks when OBSERV_FULL_TEST is set).
{% if is_incremental() %}
AND (
block_timestamp >= DATEADD(
HOUR,
-96,(
SELECT
MAX(
max_block_timestamp
)
FROM
{{ this }}
)
)
OR ({% if var('OBSERV_FULL_TEST') %}
block_id >= 0
{% else %}
-- Lowest impacted block from the most recent prior test run, minus one.
block_id >= (
SELECT
MIN(VALUE) - 1
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC) = 1), LATERAL FLATTEN(input => blocks_impacted_array))
{% endif %})
)
{% endif %}
),
-- bronze: one row per (block, tx hash) flattened from the streamline
-- responses. block_id_requested is parsed from the request metadata
-- ("tx.height=N"); REPLACE with two args strips the prefix.
bronze AS (
SELECT
A.block_number AS block_id,
REPLACE(
metadata :request :params [0],
'tx.height='
) :: INT AS block_id_requested,
b.block_timestamp,
d.value :hash :: STRING AS tx_id,
A._inserted_timestamp
FROM
{{ ref('bronze__streamline_FR_transactions') }} A
LEFT JOIN rel_blocks b
ON A.block_number = b.block_id
LEFT JOIN rel_blocks C
ON block_id_requested = C.block_id
JOIN LATERAL FLATTEN(
input => A.data :result :txs
) AS d
WHERE
(
b.block_id IS NOT NULL
OR C.block_id IS NOT NULL
)
-- NOTE(review): the AND ... OR below is unparenthesized, so the OR branch
-- bypasses the block-scope filter above (AND binds tighter than OR) —
-- looks like it needs parentheses; confirm intended precedence.
{% if is_incremental() %}
AND A._inserted_timestamp >= CURRENT_DATE - 14
OR {% if var('OBSERV_FULL_TEST') %}
1 = 1
{% else %}
(
SELECT
MIN(VALUE) - 1
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
) IS NOT NULL
{% endif %}
{% endif %}
),
-- b_block: latest row per (block_id, tx_id).
b_block AS (
SELECT
A.block_id,
A.block_id_requested,
A.block_timestamp,
A.tx_id,
A._inserted_timestamp
FROM
bronze A qualify(ROW_NUMBER() over(PARTITION BY A.block_id, tx_id
ORDER BY
A._inserted_timestamp DESC) = 1)
),
-- b_block_req: latest row per (block_id_requested, tx_id).
b_block_req AS (
SELECT
A.block_id,
A.block_id_requested,
A.block_timestamp,
A.tx_id,
A._inserted_timestamp
FROM
bronze A qualify(ROW_NUMBER() over(PARTITION BY A.block_id_requested, tx_id
ORDER BY
A._inserted_timestamp DESC) = 1)
),
-- bronze_count: per block, the larger distinct-tx count seen under either
-- keying (returned block_number vs requested height).
bronze_count AS (
SELECT
block_id,
block_timestamp,
MAX(num_txs) num_txs
FROM
(
SELECT
block_id,
block_timestamp,
COUNT(
DISTINCT tx_id
) AS num_txs
FROM
b_block A
GROUP BY
block_id,
block_timestamp
UNION ALL
SELECT
block_id_requested AS block_id,
MIN(block_timestamp) AS block_timestamp,
COUNT(
DISTINCT tx_id
) AS num_txs
FROM
b_block_req A
GROUP BY
block_id_requested
)
GROUP BY
block_id,
block_timestamp
),
-- bronze_api: reference per-block tx counts from silver__blockchain,
-- limited to the tested timestamp range.
bronze_api AS (
SELECT
block_id,
block_timestamp,
num_txs
FROM
{{ ref('silver__blockchain') }}
WHERE
block_timestamp BETWEEN (
SELECT
MIN(block_timestamp)
FROM
rel_blocks
)
AND (
SELECT
MAX(block_timestamp)
FROM
rel_blocks
)
)
-- Summary row: a block is "impacted" when the bronze count (0 if absent)
-- differs from the silver__blockchain count.
SELECT
'transactions' AS test_name,
MIN(
A.block_id
) AS min_block,
MAX(
A.block_id
) AS max_block,
MIN(
A.block_timestamp
) AS min_block_timestamp,
MAX(
A.block_timestamp
) AS max_block_timestamp,
COUNT(1) AS blocks_tested,
SUM(
CASE
WHEN COALESCE(
b.num_txs,
0
) - A.num_txs <> 0 THEN 1
ELSE 0
END
) AS blocks_impacted_count,
ARRAY_AGG(
CASE
WHEN COALESCE(
b.num_txs,
0
) - A.num_txs <> 0 THEN A.block_id
END
) within GROUP (
ORDER BY
A.block_id
) AS blocks_impacted_array,
SUM(
ABS(
COALESCE(
b.num_txs,
0
) - A.num_txs
)
) AS transactions_impacted_count,
ARRAY_AGG(
CASE
WHEN COALESCE(
b.num_txs,
0
) - A.num_txs <> 0 THEN OBJECT_CONSTRUCT(
'block',
A.block_id,
'block_timestamp',
A.block_timestamp,
'diff',
COALESCE(
b.num_txs,
0
) - A.num_txs,
'blockchain_num_txs',
A.num_txs,
'bronze_num_txs',
COALESCE(
b.num_txs,
0
)
)
END
) within GROUP(
ORDER BY
A.block_id
) AS test_failure_details,
SYSDATE() AS test_timestamp
FROM
bronze_api A
LEFT JOIN bronze_count b
ON A.block_id = b.block_id

View File

@ -0,0 +1,82 @@
version: 2
models:
  - name: silver_observability__transactions_completeness
    description: Records of all blocks with missing transactions with a timestamp the test was run. PLEASE NOTE THIS TEST IS ONLY VALID FOR BLOCKS 6673335 and forward
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - TEST_TIMESTAMP
    columns:
      - name: MIN_BLOCK
        description: The lowest block id in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: MAX_BLOCK
        description: The highest block id in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: MIN_BLOCK_TIMESTAMP
        description: The lowest block timestamp in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_NTZ
      - name: MAX_BLOCK_TIMESTAMP
        description: The highest block timestamp in the test
        tests:
          - not_null
          - dbt_expectations.expect_row_values_to_have_recent_data:
              datepart: day
              interval: 2
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_NTZ
      - name: BLOCKS_TESTED
        description: Count of blocks in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: BLOCKS_IMPACTED_COUNT
        description: Count of blocks with missing transactions in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: BLOCKS_IMPACTED_ARRAY
        description: Array of affected blocks
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - ARRAY
      - name: TRANSACTIONS_IMPACTED_COUNT
        description: Total count of missing transactions in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: TEST_FAILURE_DETAILS
        description: blocks with missing transactions with the number of missing transactions
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - ARRAY
      - name: TEST_TIMESTAMP
        description: When the test was run
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_NTZ

View File

@ -16,7 +16,7 @@ WITH labels AS (
system_created_at AS _inserted_timestamp
FROM
{{ source(
'labels_v2',
'crosschain_silver',
'address_labels'
) }}
WHERE

View File

@ -61,11 +61,12 @@ sources:
description: The time this row was ingested by Chainwalkers
- name: _inserted_timestamp
description: The time this row was inserted into Snowflake
- name: labels_v2
- name: crosschain_silver
schema: silver
database: crosschain
tables:
- name: address_labels
- name: number_sequence
- name: bronze
schema: bronze
database: terra