observe models

Eric Laurello 2025-03-18 16:15:47 -04:00
parent c93fce674c
commit 8efcef0a3a
13 changed files with 610 additions and 24 deletions

View File

@@ -3,7 +3,7 @@
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(
json OBJECT
) returns ARRAY {% if target.database == 'STELLAR' -%}
api_integration = aws_stellar_api_prod_v2 AS 'https://.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_rest_api'
api_integration = aws_stellar_api_prod_v2 AS 'https://qavdasgp43.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_rest_api'
{% else %}
api_integration = aws_stellar_api_stg_v2 AS 'https://q75hks23yb.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api'
{%- endif %}
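
Note: in this project the function is invoked by the fsc_utils.if_data_call_function_v2 post_hook shown further down in this commit, but it can also be smoke-tested directly; a minimal sketch (the payload keys below are illustrative only, not the schema Streamline expects):

SELECT
    streamline.udf_bulk_rest_api_v2(
        OBJECT_CONSTRUCT(
            'sql_source', 'manual_test',            -- illustrative key/value
            'external_table', 'streamline_ledgers'  -- illustrative key/value
        )
    );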

View File

@@ -0,0 +1,10 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query_v2(
model = "streamline_ledgers",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
partition_name = "partition_key",
unique_key = "metadata",
other_cols = "data"
) }}

View File

@@ -0,0 +1,10 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_FR_query_v2(
model = "streamline_ledgers",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
partition_name = "partition_key",
unique_key = "metadata",
other_cols = "data"
) }}

View File

@@ -0,0 +1,134 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false,
tags = ['observability']
) }}
WITH source AS (
SELECT
SEQUENCE,
block_timestamp,
LAG(
SEQUENCE,
1
) over (
ORDER BY
SEQUENCE ASC
) AS prev_SEQUENCE
FROM
{{ ref('core__fact_ledgers') }} A
WHERE
block_timestamp < DATEADD(
HOUR,
-24,
SYSDATE()
)
{% if is_incremental() %}
AND (
block_timestamp >= DATEADD(
HOUR,
-96,(
SELECT
MAX(
max_block_timestamp
)
FROM
{{ this }}
)
)
OR ({% if var('OBSERV_FULL_TEST') %}
1 = 1
{% else %}
SEQUENCE >= (
SELECT
MIN(VALUE) - 1
FROM
(
SELECT
ledgers_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC) = 1), LATERAL FLATTEN(input => ledgers_impacted_array))
{% endif %})
)
{% endif %}
),
seq_gen AS (
SELECT
_id AS SEQUENCE
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
MIN(SEQUENCE)
FROM
source
)
AND (
SELECT
MAX(SEQUENCE)
FROM
source
)
)
SELECT
'sequences' AS test_name,
MIN(
b.sequence
) AS min_SEQUENCE,
MAX(
b.sequence
) AS max_SEQUENCE,
MIN(
b.block_timestamp
) AS min_block_timestamp,
MAX(
b.block_timestamp
) AS max_block_timestamp,
COUNT(1) AS sequences_tested,
COUNT(
CASE
WHEN C.sequence IS NOT NULL THEN A.sequence
END
) AS ledgers_impacted_count,
ARRAY_AGG(
CASE
WHEN C.sequence IS NOT NULL THEN A.sequence
END
) within GROUP (
ORDER BY
A.sequence
) AS ledgers_impacted_array,
ARRAY_AGG(
DISTINCT CASE
WHEN C.sequence IS NOT NULL THEN OBJECT_CONSTRUCT(
'prev_sequence',
C.prev_SEQUENCE,
'SEQUENCE',
C.sequence
)
END
) AS test_failure_details,
SYSDATE() AS test_timestamp
FROM
seq_gen A
LEFT JOIN source b
ON A.sequence = b.sequence
LEFT JOIN source C
ON A.sequence > C.prev_SEQUENCE
AND A.sequence < C.sequence
AND C.sequence - C.prev_SEQUENCE <> 1
WHERE
COALESCE(
b.prev_SEQUENCE,
C.prev_SEQUENCE
) IS NOT NULL
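
Note: the gap test above hinges on the LAG over SEQUENCE: whenever a ledger and its predecessor differ by more than 1, every sequence in between is missing, and the join against seq_gen materializes each missing value. A standalone illustration of the same idea on synthetic data (not part of the model):

WITH src AS (
    SELECT column1 AS sequence
    FROM VALUES (100), (101), (104), (105)
),
gaps AS (
    SELECT
        sequence,
        LAG(sequence) OVER (ORDER BY sequence) AS prev_sequence
    FROM src
)
SELECT prev_sequence, sequence
FROM gaps
WHERE sequence - prev_sequence <> 1;
-- returns (101, 104), i.e. ledgers 102 and 103 are missing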

View File

@@ -0,0 +1,75 @@
version: 2
models:
- name: silver_observability__ledgers_completeness
description: Records of all ledger gaps (missing ledgers), with a timestamp of when the test was run
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TEST_TIMESTAMP
columns:
- name: MIN_SEQUENCE
description: The lowest block id in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MAX_SEQUENCE
description: The highest block id in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MIN_BLOCK_TIMESTAMP
description: The lowest block timestamp in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: MAX_BLOCK_TIMESTAMP
description: The highest block timestamp in the test
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: LEDGERS_TESTED
description: Count of blocks in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: LEDGERS_IMPACTED_COUNT
description: Count of block gaps in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: LEDGERS_IMPACTED_ARRAY
description: Array of affected blocks
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- name: TEST_FAILURE_DETAILS
description: Array of details of the failure
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- name: TEST_TIMESTAMP
description: When the test was run
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
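
Note: the latest run's findings can be inspected with a query along these lines (the relation name is illustrative; the actual database and schema depend on the deployment target):

SELECT
    test_timestamp,
    ledgers_impacted_count,
    ledgers_impacted_array
FROM silver_observability.ledgers_completeness  -- illustrative relation name
QUALIFY ROW_NUMBER() OVER (ORDER BY test_timestamp DESC) = 1;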

View File

@@ -0,0 +1,143 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false,
tags = ['observability']
) }}
WITH bq AS (
SELECT
SEQUENCE,
block_timestamp,
successful_transaction_count,
failed_transaction_count,
operation_count,
tx_set_operation_count
FROM
{{ ref('core__fact_ledgers') }} A
WHERE
block_timestamp < DATEADD(
HOUR,
-24,
SYSDATE()
)
{% if is_incremental() %}
AND (
block_timestamp >= DATEADD(
HOUR,
-96,(
SELECT
MAX(
max_block_timestamp
)
FROM
{{ this }}
)
)
OR ({% if var('OBSERV_FULL_TEST') %}
1 = 1
{% else %}
SEQUENCE >= (
SELECT
MIN(VALUE) - 1
FROM
(
SELECT
ledgers_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC) = 1), LATERAL FLATTEN(input => ledgers_impacted_array))
{% endif %})
)
{% endif %}
),
rpc AS (
SELECT
SEQUENCE,
block_timestamp,
successful_transaction_count,
failed_transaction_count,
operation_count,
tx_set_operation_count
FROM
{{ ref('streamline__ledgers_complete') }} A
WHERE
block_timestamp < DATEADD(
HOUR,
-24,
SYSDATE()
)
{% if is_incremental() %}
AND (
block_timestamp >= DATEADD(
HOUR,
-96,(
SELECT
MAX(
max_block_timestamp
)
FROM
{{ this }}
)
)
OR ({% if var('OBSERV_FULL_TEST') %}
1 = 1
{% else %}
SEQUENCE >= (
SELECT
MIN(VALUE) - 1
FROM
(
SELECT
ledgers_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC) = 1), LATERAL FLATTEN(input => ledgers_impacted_array))
{% endif %})
)
{% endif %}
)
SELECT
'sequences' AS test_name,
MIN(
SEQUENCE
) AS min_SEQUENCE,
MAX(
SEQUENCE
) AS max_SEQUENCE,
MIN(
block_timestamp
) AS min_block_timestamp,
MAX(
block_timestamp
) AS max_block_timestamp,
COUNT(1) AS sequences_tested,
COUNT(
CASE
WHEN bq.successful_transaction_count <> rpc.successful_transaction_count
OR bq.successful_transaction_count IS NULL THEN SEQUENCE
END
) AS ledgers_impacted_count,
ARRAY_AGG(
CASE
WHEN bq.successful_transaction_count <> rpc.successful_transaction_count
OR bq.successful_transaction_count IS NULL THEN SEQUENCE
END
) within GROUP (
ORDER BY
SEQUENCE
) AS ledgers_impacted_array,
SYSDATE() AS test_timestamp
FROM
bq full
OUTER JOIN rpc USING(
SEQUENCE,
block_timestamp
)
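
Note: the FULL OUTER JOIN on (sequence, block_timestamp) flags a ledger when the two pipelines disagree on successful_transaction_count, or when the ledger is missing from core__fact_ledgers entirely. A synthetic illustration of that predicate (not part of the model):

WITH bq AS (
    SELECT 1 AS sequence, 10 AS successful_transaction_count
),
rpc AS (
    SELECT 1 AS sequence, 12 AS successful_transaction_count
    UNION ALL
    SELECT 2, 5
)
SELECT sequence
FROM bq
FULL OUTER JOIN rpc USING (sequence)
WHERE bq.successful_transaction_count <> rpc.successful_transaction_count
    OR bq.successful_transaction_count IS NULL;
-- returns 1 (count mismatch) and 2 (present in rpc but absent from bq)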

View File

@@ -0,0 +1,137 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false,
tags = ['observability']
) }}
WITH summary_stats AS (
SELECT
MIN(SEQUENCE) AS min_SEQUENCE,
MAX(SEQUENCE) AS max_SEQUENCE,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS ledgers_tested
FROM
{{ ref('core__fact_ledgers') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
SEQUENCE >= (
SELECT
MIN(SEQUENCE)
FROM
(
SELECT
MIN(SEQUENCE) AS SEQUENCE
FROM
{{ ref('core__fact_ledgers') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT
MIN(VALUE) - 1 AS SEQUENCE
FROM
(
SELECT
ledgers_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => ledgers_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR SEQUENCE >= 0
{% endif %}
)
{% endif %}
),
base_sequence AS (
SELECT
SEQUENCE,
successful_transaction_count + failed_transaction_count AS transaction_count
FROM
{{ ref('core__fact_ledgers') }}
WHERE
SEQUENCE BETWEEN (
SELECT
min_SEQUENCE
FROM
summary_stats
)
AND (
SELECT
max_SEQUENCE
FROM
summary_stats
)
),
actual_tx_counts AS (
SELECT
ledger_SEQUENCE AS SEQUENCE,
COUNT(1) AS transaction_count
FROM
{{ ref('core__fact_transactions') }}
WHERE
SEQUENCE BETWEEN (
SELECT
min_SEQUENCE
FROM
summary_stats
)
AND (
SELECT
max_SEQUENCE
FROM
summary_stats
)
GROUP BY
SEQUENCE
),
potential_missing_txs AS (
SELECT
e.sequence
FROM
base_sequence e
LEFT OUTER JOIN actual_tx_counts A
ON e.sequence = A.sequence
WHERE
COALESCE(
A.transaction_count,
0
) <> e.transaction_count
),
impacted_seqs AS (
SELECT
COUNT(1) AS ledgers_impacted_count,
ARRAY_AGG(SEQUENCE) within GROUP (
ORDER BY
SEQUENCE
) AS ledgers_impacted_array
FROM
potential_missing_txs
)
SELECT
'transactions' AS test_name,
min_sequence,
max_sequence,
min_block_timestamp,
max_block_timestamp,
ledgers_tested,
ledgers_impacted_count,
ledgers_impacted_array,
SYSDATE() AS test_timestamp
FROM
summary_stats
JOIN impacted_seqs
ON 1 = 1
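
Note: potential_missing_txs compares the transaction count recorded on each ledger header against the number of rows that actually landed in core__fact_transactions. A synthetic illustration of that comparison (not part of the model):

WITH base_sequence AS (
    SELECT column1 AS sequence, column2 AS transaction_count
    FROM VALUES (1, 3), (2, 2)
),
actual_tx_counts AS (
    SELECT column1 AS sequence, COUNT(1) AS transaction_count
    FROM VALUES (1), (1), (1), (2)  -- one row per ingested transaction
    GROUP BY 1
)
SELECT e.sequence
FROM base_sequence e
LEFT OUTER JOIN actual_tx_counts A
    ON e.sequence = A.sequence
WHERE COALESCE(A.transaction_count, 0) <> e.transaction_count;
-- returns 2: the header expects 2 transactions but only 1 row is present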

View File

@@ -0,0 +1,27 @@
version: 2
models:
- name: silver_observability__transactions_completeness
description: Records of per-block transaction counts. This is an intermediate table used for transaction completeness testing
columns:
- name: BLOCK_ID
description: The block id (ledger sequence) the transaction count applies to
tests:
- not_null
- unique
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: TRANSACTION_COUNT
description: The number of transactions in the block
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: _INSERTED_TIMESTAMP
description: The timestamp at which the record was inserted
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@@ -3,7 +3,7 @@ version: 2
sources:
- name: bronze_streamline
database: streamline
schema: stellar
schema: "{{ 'stellar' if target.database == 'STELLAR' else 'stellar_dev' }}"
tables:
- name: accounts
- name: contract_data
@@ -14,6 +14,8 @@ sources:
- name: history_trades
- name: history_transactions
- name: liquidity_pools
- name: streamline_ledgers
- name: crosschain
database: "{{ 'crosschain' if target.database == 'STELLAR' else 'crosschain_dev' }}"
schema: core
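
Note: with streamline_ledgers registered under the bronze_streamline source, downstream models can reference the external table through dbt's source() function; a minimal usage sketch:

SELECT *
FROM {{ source('bronze_streamline', 'streamline_ledgers') }}
LIMIT 10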

View File

@@ -0,0 +1,43 @@
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
unique_key = "sequence",
cluster_by = "block_timestamp::DATE",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(sequence)"
) }}
-- depends_on: {{ ref('bronze__streamline_ledgers') }}
SELECT
DATA :sequence :: INT AS SEQUENCE,
TO_TIMESTAMP(
DATA :closed_at
) AS block_timestamp,
DATA :successful_transaction_count :: INT AS successful_transaction_count,
DATA :failed_transaction_count :: INT AS failed_transaction_count,
DATA :operation_count :: INT AS operation_count,
DATA :tx_set_operation_count :: INT AS tx_set_operation_count,
{{ dbt_utils.generate_surrogate_key(
['sequence']
) }} AS complete_ledgers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_ledgers') }}
WHERE
inserted_timestamp >= (
SELECT
MAX(modified_timestamp) modified_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_ledgers_FR') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY SEQUENCE
ORDER BY
inserted_timestamp DESC)) = 1
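
Note: because the same ledger can land in bronze more than once, the QUALIFY clause keeps only the newest record per sequence. A standalone illustration of that dedup on synthetic data:

SELECT
    column1 AS sequence,
    column2 AS inserted_at
FROM VALUES
    (1, '2025-03-18 10:00'),
    (1, '2025-03-18 12:00')
QUALIFY ROW_NUMBER() OVER (PARTITION BY column1 ORDER BY column2 DESC) = 1;
-- keeps only the 12:00 record for sequence 1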

View File

@@ -3,37 +3,36 @@
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"ledgers",
"sql_limit" :"500",
"producer_batch_size" :"500",
"worker_batch_size" :"500",
params ={ "external_table" :"streamline_ledgers",
"sql_limit" :"10000",
"producer_batch_size" :"10000",
"worker_batch_size" :"5000",
"sql_source" :"{{this.identifier}}",
"order_by_column": "ledger_sequence" }
"order_by_column": "sequence" }
)
) }}
WITH ledgers AS (
SELECT
ledger_sequence
SEQUENCE
FROM
{{ ref("streamline__legders") }}
{# EXCEPT
EXCEPT
SELECT
block_number
SEQUENCE
FROM
{{ ref("streamline__blocks_complete") }}
#}
{{ ref("streamline__ledgers_complete") }}
)
SELECT
ledger_sequence,
SEQUENCE,
ROUND(
ledger_sequence,
SEQUENCE,
-4
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'{Service}/{Authentication}/ledgers/' || ledger_sequence,
'{Service}/{Authentication}/ledgers/' || SEQUENCE,
OBJECT_CONSTRUCT(),
OBJECT_CONSTRUCT(),
'Vault/prod/stellar/quicknode/mainnet'

View File

@@ -9,11 +9,14 @@ WITH head AS (
{{ target.database }}.live.udf_api(
'GET',
'{Service}/{Authentication}/ledgers?limit=1&order=desc',
OBJECT_CONSTRUCT(),
OBJECT_CONSTRUCT(
'fsc-quantum-state',
'livequery'
),
OBJECT_CONSTRUCT(),
'Vault/prod/stellar/quicknode/mainnet'
) :data :_embedded :records [0] AS DATA,
DATA :sequence :: INT AS ledger_sequence,
DATA :sequence :: INT AS SEQUENCE,
DATA :closed_at :: datetime AS block_timestamp
),
tail AS (
@@ -21,17 +24,20 @@ tail AS (
{{ target.database }}.live.udf_api(
'GET',
'{Service}/{Authentication}/ledgers?limit=1&order=asc',
OBJECT_CONSTRUCT(),
OBJECT_CONSTRUCT(
'fsc-quantum-state',
'livequery'
),
OBJECT_CONSTRUCT(),
'Vault/prod/stellar/quicknode/mainnet'
) :data :_embedded :records [0] AS DATA,
DATA :sequence :: INT AS ledger_sequence,
DATA :sequence :: INT AS SEQUENCE,
DATA :closed_at :: datetime AS block_timestamp
)
SELECT
A.ledger_sequence AS head_ledger_sequence,
A.sequence AS head_sequence,
A.block_timestamp AS head_block_timestamp,
b.ledger_sequence AS tail_ledger_sequence,
b.sequence AS tail_sequence,
b.block_timestamp AS tail_block_timestamp
FROM
head A

View File

@@ -4,7 +4,7 @@
) }}
SELECT
_id AS ledger_sequence
_id AS SEQUENCE
FROM
{{ source(
'crosschain_silver',
@@ -13,13 +13,13 @@ FROM
WHERE
_id >= (
SELECT
MIN(tail_ledger_sequence)
MIN(tail_sequence)
FROM
{{ ref('streamline__chain_head_tail') }}
)
AND _id <= (
SELECT
MAX(head_ledger_sequence)
MAX(head_sequence)
FROM
{{ ref('streamline__chain_head_tail') }}
)