AN-5791 observe models (#10)

* observe models

* wf, yml

* spelling, scheduling, comments
This commit is contained in:
eric-laurello 2025-04-03 13:22:07 -04:00 committed by GitHub
parent a15c3a6828
commit 0d19dfd4f7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 806 additions and 26 deletions

View File

@ -0,0 +1,45 @@
# Scheduled incremental observability run: builds the dbt models tagged
# "observability" every 8 hours against the production environment.
name: dbt_run_observability
run-name: dbt_run_observability

on:
  # allow manual runs from the Actions tab
  workflow_dispatch:
  schedule:
    # Runs "At minute 0 past every 8th hour." (see https://crontab.guru)
    - cron: '0 */8 * * *'

# Snowflake connection settings consumed by the dbt profile; values come from
# the repository / environment configuration.
env:
  DBT_PROFILES_DIR: ./
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"

# Serialize runs of this workflow so overlapping schedules do not race.
concurrency:
  group: ${{ github.workflow }}

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "${{ vars.PYTHON_VERSION }}"
          cache: "pip"
      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: Run DBT Jobs
        # Pass an explicit False so models referencing var('OBSERV_FULL_TEST')
        # compile even if dbt_project.yml does not define the variable
        # (NOTE(review): confirm the var is not set to a different project default).
        run: |
          dbt build -s "stellar_models,tag:observability" --vars '{"OBSERV_FULL_TEST": False}'

View File

@ -0,0 +1,45 @@
# Monthly full observability run: builds the dbt models tagged "observability"
# with OBSERV_FULL_TEST enabled so the incremental test models rescan the
# entire history instead of the recent rolling window.
name: dbt_run_observability_full
run-name: dbt_run_observability_full

on:
  # allow manual runs from the Actions tab
  workflow_dispatch:
  schedule:
    # Runs "At 19:00 on day-of-month 1." (see https://crontab.guru)
    - cron: '0 19 1 * *'

# Snowflake connection settings consumed by the dbt profile; values come from
# the repository / environment configuration.
env:
  DBT_PROFILES_DIR: ./
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"

# Serialize runs of this workflow so overlapping schedules do not race.
concurrency:
  group: ${{ github.workflow }}

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "${{ vars.PYTHON_VERSION }}"
          cache: "pip"
      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: Run DBT Jobs
        # OBSERV_FULL_TEST=True widens the incremental filters in the
        # observability models to cover all ledgers, not just recent ones.
        run: |
          dbt build --vars '{"OBSERV_FULL_TEST":True}' -s "stellar_models,tag:observability"

View File

@ -39,7 +39,7 @@ jobs:
dbt deps dbt deps
- name: Run DBT Jobs - name: Run DBT Jobs
run: | run: |
dbt test -m "stellar_models,models/silver" "stellar_models,models/gold" --vars 'test_days_threshold: 1' dbt test -m "stellar_models,models/silver" "stellar_models,models/gold" --vars 'test_days_threshold: 1' --exclude "stellar_models,tag:observability"
continue-on-error: true continue-on-error: true

View File

@ -3,7 +3,7 @@
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2( OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(
json OBJECT json OBJECT
) returns ARRAY {% if target.database == 'STELLAR' -%} ) returns ARRAY {% if target.database == 'STELLAR' -%}
api_integration = aws_stellar_api_prod_v2 AS 'https://.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_rest_api' api_integration = aws_stellar_api_prod_v2 AS 'https://qavdasgp43.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_rest_api'
{% else %} {% else %}
api_integration = aws_stellar_api_stg_v2 AS 'https://q75hks23yb.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api' api_integration = aws_stellar_api_stg_v2 AS 'https://q75hks23yb.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api'
{%- endif %} {%- endif %}

View File

@ -0,0 +1,10 @@
-- Bronze view over the streamline_ledgers external table: exposes the raw
-- landed ledger payloads (incremental slice) for downstream silver models.
{{ config (
    materialized = 'view'
) }}
-- partition_function derives the numeric partition key from the 4th '/'
-- segment of the landed file name (text before the first '_').
{{ streamline_external_table_query_v2(
    model = "streamline_ledgers",
    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
    partition_name = "partition_key",
    unique_key = "metadata",
    other_cols = "data"
) }}

View File

@ -0,0 +1,10 @@
-- Bronze full-refresh (FR) view over the streamline_ledgers external table:
-- same shape as bronze__streamline_ledgers but reads the complete history,
-- used by downstream models on their initial (non-incremental) load.
{{ config (
    materialized = 'view'
) }}
-- partition_function derives the numeric partition key from the 4th '/'
-- segment of the landed file name (text before the first '_').
{{ streamline_external_table_FR_query_v2(
    model = "streamline_ledgers",
    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
    partition_name = "partition_key",
    unique_key = "metadata",
    other_cols = "data"
) }}

View File

@ -0,0 +1,134 @@
-- Observability model: detects gaps (missing sequence numbers) in
-- core__fact_ledgers by comparing observed sequences against a generated
-- gapless reference sequence. Appends one summary row per run, keyed by
-- test_timestamp; full_refresh is disabled so test history is never dropped.
{{ config(
    materialized = 'incremental',
    unique_key = 'test_timestamp',
    full_refresh = false,
    tags = ['observability']
) }}

WITH source AS (
    -- Ledgers under test, each paired with the previous observed sequence so
    -- a difference greater than 1 between neighbours marks a gap.
    SELECT
        SEQUENCE,
        block_timestamp,
        LAG(
            SEQUENCE,
            1
        ) over (
            ORDER BY
                SEQUENCE ASC
        ) AS prev_SEQUENCE
    FROM
        {{ ref('core__fact_ledgers') }} A
    WHERE
        -- ignore the most recent 24h so in-flight loads are not flagged
        block_timestamp < DATEADD(
            HOUR,
            -24,
            SYSDATE()
        )

{% if is_incremental() %}
    AND (
        -- retest a rolling window reaching 96h behind the last tested timestamp
        block_timestamp >= DATEADD(
            HOUR,
            -96,
            (
                SELECT
                    MAX(max_block_timestamp)
                FROM
                    {{ this }}
            )
        )
        -- plus everything from just before the earliest previously-impacted
        -- ledger onward; OBSERV_FULL_TEST widens this to the whole history.
        -- Default false so runs that do not pass the var still compile
        -- (var() without a default raises a compilation error in dbt).
        OR (
            {% if var('OBSERV_FULL_TEST', false) %}
            1 = 1
            {% else %}
            SEQUENCE >= (
                SELECT
                    MIN(VALUE) - 1
                FROM
                    (
                        SELECT
                            ledgers_impacted_array
                        FROM
                            {{ this }}
                        qualify ROW_NUMBER() over (
                            ORDER BY
                                test_timestamp DESC
                        ) = 1
                    ),
                    LATERAL FLATTEN(input => ledgers_impacted_array)
            )
            {% endif %}
        )
    )
{% endif %}
),
seq_gen AS (
    -- Gapless reference sequence spanning the tested range, generated from
    -- the crosschain number_sequence utility table.
    SELECT
        _id AS SEQUENCE
    FROM
        {{ source(
            'crosschain_silver',
            'number_sequence'
        ) }}
    WHERE
        _id BETWEEN (
            SELECT
                MIN(SEQUENCE)
            FROM
                source
        )
        AND (
            SELECT
                MAX(SEQUENCE)
            FROM
                source
        )
)
SELECT
    'sequences' AS test_name,
    MIN(
        b.sequence
    ) AS min_SEQUENCE,
    MAX(
        b.sequence
    ) AS max_SEQUENCE,
    MIN(
        b.block_timestamp
    ) AS min_block_timestamp,
    MAX(
        b.block_timestamp
    ) AS max_block_timestamp,
    COUNT(1) AS ledgers_tested,
    -- a reference value that falls inside an observed gap (C matched) is a
    -- missing ledger
    COUNT(
        CASE
            WHEN C.sequence IS NOT NULL THEN A.sequence
        END
    ) AS ledgers_impacted_count,
    ARRAY_AGG(
        CASE
            WHEN C.sequence IS NOT NULL THEN A.sequence
        END
    ) within GROUP (
        ORDER BY
            A.sequence
    ) AS ledgers_impacted_array,
    -- the (prev_sequence, sequence) pairs bounding each detected gap
    ARRAY_AGG(
        DISTINCT CASE
            WHEN C.sequence IS NOT NULL THEN OBJECT_CONSTRUCT(
                'prev_sequence',
                C.prev_SEQUENCE,
                'SEQUENCE',
                C.sequence
            )
        END
    ) AS test_failure_details,
    SYSDATE() AS test_timestamp
FROM
    seq_gen A
    LEFT JOIN source b
    ON A.sequence = b.sequence
    -- C matches when A lies strictly between two observed neighbours that are
    -- more than 1 apart, i.e. inside a gap
    LEFT JOIN source C
    ON A.sequence > C.prev_SEQUENCE
    AND A.sequence < C.sequence
    AND C.sequence - C.prev_SEQUENCE <> 1
WHERE
    COALESCE(
        b.prev_SEQUENCE,
        C.prev_SEQUENCE
    ) IS NOT NULL

View File

@ -0,0 +1,75 @@
# Schema tests for the ledger-gap observability model. Column names are
# upper-cased to match Snowflake's unquoted-identifier folding.
version: 2

models:
  - name: silver_observability__ledgers_completeness
    description: Records of all blocks ledger gaps (missing ledgers) with a timestamp the test was run
    tests:
      # exactly one summary row per test run
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - TEST_TIMESTAMP
    columns:
      - name: MIN_SEQUENCE
        description: The lowest block id in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: MAX_SEQUENCE
        description: The highest block id in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: MIN_BLOCK_TIMESTAMP
        description: The lowest block timestamp in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_NTZ
      - name: MAX_BLOCK_TIMESTAMP
        description: The highest block timestamp in the test
        tests:
          - not_null
          # freshness guard: fails if no test row covered the last 2 days
          - dbt_expectations.expect_row_values_to_have_recent_data:
              datepart: day
              interval: 2
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_NTZ
      - name: LEDGERS_TESTED
        description: Count of blocks in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: LEDGERS_IMPACTED_COUNT
        description: Count of block gaps in the test
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
      - name: LEDGERS_IMPACTED_ARRAY
        description: Array of affected blocks
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - ARRAY
      - name: TEST_FAILURE_DETAILS
        description: Array of details of the failure
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - ARRAY
      - name: TEST_TIMESTAMP
        description: When the test was run
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_NTZ

View File

@ -0,0 +1,143 @@
-- Observability model: cross-references per-ledger transaction counts between
-- the Hubble-sourced core__fact_ledgers ("bq") and the RPC-sourced
-- streamline__ledgers_complete ("rpc") tables. Appends one summary row per
-- run keyed by test_timestamp; full_refresh is disabled to keep test history.
{{ config(
    materialized = 'incremental',
    unique_key = 'test_timestamp',
    full_refresh = false,
    tags = ['observability']
) }}

WITH bq AS (
    -- Hubble side of the comparison.
    SELECT
        SEQUENCE,
        block_timestamp,
        successful_transaction_count,
        failed_transaction_count,
        operation_count,
        tx_set_operation_count
    FROM
        {{ ref('core__fact_ledgers') }} A
    WHERE
        -- ignore the most recent 24h so in-flight loads are not flagged
        block_timestamp < DATEADD(
            HOUR,
            -24,
            SYSDATE()
        )

{% if is_incremental() %}
    AND (
        -- retest a rolling window reaching 96h behind the last tested timestamp
        block_timestamp >= DATEADD(
            HOUR,
            -96,
            (
                SELECT
                    MAX(max_block_timestamp)
                FROM
                    {{ this }}
            )
        )
        -- plus everything from just before the earliest previously-impacted
        -- ledger onward; OBSERV_FULL_TEST widens this to the whole history.
        -- Default false so runs that do not pass the var still compile.
        OR (
            {% if var('OBSERV_FULL_TEST', false) %}
            1 = 1
            {% else %}
            SEQUENCE >= (
                SELECT
                    MIN(VALUE) - 1
                FROM
                    (
                        SELECT
                            ledgers_impacted_array
                        FROM
                            {{ this }}
                        qualify ROW_NUMBER() over (
                            ORDER BY
                                test_timestamp DESC
                        ) = 1
                    ),
                    LATERAL FLATTEN(input => ledgers_impacted_array)
            )
            {% endif %}
        )
    )
{% endif %}
),
rpc AS (
    -- RPC side of the comparison; same filters as bq so both sides cover the
    -- same ledger range.
    SELECT
        SEQUENCE,
        block_timestamp,
        successful_transaction_count,
        failed_transaction_count,
        operation_count,
        tx_set_operation_count
    FROM
        {{ ref('streamline__ledgers_complete') }} A
    WHERE
        block_timestamp < DATEADD(
            HOUR,
            -24,
            SYSDATE()
        )

{% if is_incremental() %}
    AND (
        block_timestamp >= DATEADD(
            HOUR,
            -96,
            (
                SELECT
                    MAX(max_block_timestamp)
                FROM
                    {{ this }}
            )
        )
        OR (
            {% if var('OBSERV_FULL_TEST', false) %}
            1 = 1
            {% else %}
            SEQUENCE >= (
                SELECT
                    MIN(VALUE) - 1
                FROM
                    (
                        SELECT
                            ledgers_impacted_array
                        FROM
                            {{ this }}
                        qualify ROW_NUMBER() over (
                            ORDER BY
                                test_timestamp DESC
                        ) = 1
                    ),
                    LATERAL FLATTEN(input => ledgers_impacted_array)
            )
            {% endif %}
        )
    )
{% endif %}
)
SELECT
    'sequences' AS test_name,
    MIN(
        SEQUENCE
    ) AS min_SEQUENCE,
    MAX(
        SEQUENCE
    ) AS max_SEQUENCE,
    MIN(
        block_timestamp
    ) AS min_block_timestamp,
    MAX(
        block_timestamp
    ) AS max_block_timestamp,
    COUNT(1) AS ledgers_tested,
    -- a ledger is impacted when the two sides disagree on the successful
    -- transaction count, or the Hubble side is missing the ledger entirely
    COUNT(
        CASE
            WHEN bq.successful_transaction_count <> rpc.successful_transaction_count
            OR bq.successful_transaction_count IS NULL THEN SEQUENCE
        END
    ) AS ledgers_impacted_count,
    ARRAY_AGG(
        CASE
            WHEN bq.successful_transaction_count <> rpc.successful_transaction_count
            OR bq.successful_transaction_count IS NULL THEN SEQUENCE
        END
    ) within GROUP (
        ORDER BY
            SEQUENCE
    ) AS ledgers_impacted_array,
    SYSDATE() AS test_timestamp
FROM
    -- FULL OUTER JOIN keeps ledgers present on only one side in the comparison
    bq full
    OUTER JOIN rpc USING(
        SEQUENCE,
        block_timestamp
    )

View File

@ -0,0 +1,68 @@
version: 2
models:
- name: silver_observability__ledgers_xref_completeness
description: Records of all blocks ledger differences between hubble and rpc
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TEST_TIMESTAMP
columns:
- name: MIN_SEQUENCE
description: The lowest block id in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MAX_SEQUENCE
description: The highest block id in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MIN_BLOCK_TIMESTAMP
description: The lowest block timestamp in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: MAX_BLOCK_TIMESTAMP
description: The highest block timestamp in the test
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: LEDGERS_TESTED
description: Count of blocks in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: LEDGERS_IMPACTED_COUNT
description: Count of block gaps in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: LEDGERS_IMPACTED_ARRAY
description: Array of affected blocks
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- name: TEST_TIMESTAMP
description: When the test was run
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -0,0 +1,137 @@
-- Observability model: verifies that the row count in core__fact_transactions
-- per ledger equals the transaction count recorded on core__fact_ledgers
-- (successful + failed). Appends one summary row per run keyed by
-- test_timestamp; full_refresh is disabled to keep test history.
{{ config(
    materialized = 'incremental',
    unique_key = 'test_timestamp',
    full_refresh = false,
    tags = ['observability']
) }}

WITH summary_stats AS (
    -- Bounds and size of the ledger range covered by this run.
    SELECT
        MIN(SEQUENCE) AS min_SEQUENCE,
        MAX(SEQUENCE) AS max_SEQUENCE,
        MIN(block_timestamp) AS min_block_timestamp,
        MAX(block_timestamp) AS max_block_timestamp,
        COUNT(1) AS ledgers_tested
    FROM
        {{ ref('core__fact_ledgers') }}
    WHERE
        -- ignore the most recent 12h so in-flight loads are not flagged
        block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())

{% if is_incremental() %}
    AND (
        -- start from whichever is earlier: the ledger ~96h back, or just
        -- before the earliest previously-impacted ledger
        SEQUENCE >= (
            SELECT
                MIN(SEQUENCE)
            FROM
                (
                    SELECT
                        MIN(SEQUENCE) AS SEQUENCE
                    FROM
                        {{ ref('core__fact_ledgers') }}
                    WHERE
                        block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
                        AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
                    UNION
                    SELECT
                        MIN(VALUE) - 1 AS SEQUENCE
                    FROM
                        (
                            SELECT
                                ledgers_impacted_array
                            FROM
                                {{ this }}
                            qualify ROW_NUMBER() over (
                                ORDER BY
                                    test_timestamp DESC
                            ) = 1
                        ),
                        LATERAL FLATTEN(
                            input => ledgers_impacted_array
                        )
                )
        )
        -- OBSERV_FULL_TEST widens the scan to all ledgers; default false so
        -- runs that do not pass the var still compile (var() without a
        -- default raises a compilation error in dbt).
        {% if var('OBSERV_FULL_TEST', false) %}
        OR SEQUENCE >= 0
        {% endif %}
    )
{% endif %}
),
base_sequence AS (
    -- Expected transaction count per ledger inside the tested range.
    SELECT
        SEQUENCE,
        successful_transaction_count + failed_transaction_count AS transaction_count
    FROM
        {{ ref('core__fact_ledgers') }}
    WHERE
        SEQUENCE BETWEEN (
            SELECT
                min_SEQUENCE
            FROM
                summary_stats
        )
        AND (
            SELECT
                max_SEQUENCE
            FROM
                summary_stats
        )
),
actual_tx_counts AS (
    -- Observed transaction rows per ledger inside the tested range.
    SELECT
        ledger_SEQUENCE AS SEQUENCE,
        COUNT(1) AS transaction_count
    FROM
        {{ ref('core__fact_transactions') }}
    WHERE
        SEQUENCE BETWEEN (
            SELECT
                min_SEQUENCE
            FROM
                summary_stats
        )
        AND (
            SELECT
                max_SEQUENCE
            FROM
                summary_stats
        )
    GROUP BY
        SEQUENCE
),
potential_missing_txs AS (
    -- Ledgers whose observed row count differs from the expected count
    -- (missing ledgers count as 0 observed rows via COALESCE).
    SELECT
        e.sequence
    FROM
        base_sequence e
        LEFT OUTER JOIN actual_tx_counts A
        ON e.sequence = A.sequence
    WHERE
        COALESCE(
            A.transaction_count,
            0
        ) <> e.transaction_count
),
impacted_seqs AS (
    SELECT
        COUNT(1) AS ledgers_impacted_count,
        ARRAY_AGG(SEQUENCE) within GROUP (
            ORDER BY
                SEQUENCE
        ) AS ledgers_impacted_array
    FROM
        potential_missing_txs
)
SELECT
    'transactions' AS test_name,
    min_sequence,
    max_sequence,
    min_block_timestamp,
    max_block_timestamp,
    ledgers_tested,
    ledgers_impacted_count,
    ledgers_impacted_array,
    SYSDATE() AS test_timestamp
FROM
    -- both CTEs hold exactly one row, so the cross join yields one summary row
    summary_stats
    JOIN impacted_seqs
    ON 1 = 1

View File

@ -0,0 +1,64 @@
version: 2
models:
- name: silver_observability__transactions_completeness
description: Records of all block transaction counts.
columns:
- name: MIN_SEQUENCE
description: The lowest block id in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MAX_SEQUENCE
description: The highest block id in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MIN_BLOCK_TIMESTAMP
description: The lowest block timestamp in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: MAX_BLOCK_TIMESTAMP
description: The highest block timestamp in the test
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: LEDGERS_TESTED
description: Count of blocks in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: LEDGERS_IMPACTED_COUNT
description: Count of block gaps in the test
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: LEDGERS_IMPACTED_ARRAY
description: Array of affected blocks
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- name: TEST_TIMESTAMP
description: When the test was run
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -3,7 +3,7 @@ version: 2
sources: sources:
- name: bronze_streamline - name: bronze_streamline
database: streamline database: streamline
schema: stellar schema: "{{ 'stellar' if target.database == 'STELLAR' else 'stellar_dev' }}"
tables: tables:
- name: accounts - name: accounts
- name: contract_data - name: contract_data
@ -14,6 +14,7 @@ sources:
- name: history_trades - name: history_trades
- name: history_transactions - name: history_transactions
- name: liquidity_pools - name: liquidity_pools
- name: streamline_ledgers
- name: trust_lines - name: trust_lines
- name: crosschain - name: crosschain
database: "{{ 'crosschain' if target.database == 'STELLAR' else 'crosschain_dev' }}" database: "{{ 'crosschain' if target.database == 'STELLAR' else 'crosschain_dev' }}"

View File

@ -0,0 +1,43 @@
-- Silver "complete" tracking table: one row per ledger sequence fetched via
-- the streamline REST pipeline, merged incrementally from the bronze views.
{{ config (
    materialized = "incremental",
    incremental_strategy = 'merge',
    unique_key = "sequence",
    cluster_by = "block_timestamp::DATE",
    merge_exclude_columns = ["inserted_timestamp"],
    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(sequence)"
) }}
-- depends_on: {{ ref('bronze__streamline_ledgers') }}
SELECT
    -- parsed fields from the raw JSON ledger payload
    DATA :sequence :: INT AS SEQUENCE,
    TO_TIMESTAMP(
        DATA :closed_at
    ) AS block_timestamp,
    DATA :successful_transaction_count :: INT AS successful_transaction_count,
    DATA :failed_transaction_count :: INT AS failed_transaction_count,
    DATA :operation_count :: INT AS operation_count,
    DATA :tx_set_operation_count :: INT AS tx_set_operation_count,
    {{ dbt_utils.generate_surrogate_key(
        ['sequence']
    ) }} AS complete_ledgers_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    -- incremental runs read the delta view; the first build reads the FR
    -- (full-refresh) view covering all history
{% if is_incremental() %}
    {{ ref('bronze__streamline_ledgers') }}
WHERE
    -- only rows landed since the last merge into this table
    inserted_timestamp >= (
        SELECT
            MAX(modified_timestamp) modified_timestamp
        FROM
            {{ this }}
    )
{% else %}
    {{ ref('bronze__streamline_ledgers_FR') }}
{% endif %}
-- keep only the most recently landed copy of each sequence
qualify(ROW_NUMBER() over (PARTITION BY SEQUENCE
ORDER BY
    inserted_timestamp DESC)) = 1

View File

@ -3,37 +3,36 @@
post_hook = fsc_utils.if_data_call_function_v2( post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2', func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}", target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"ledgers", params ={ "external_table" :"streamline_ledgers",
"sql_limit" :"500", "sql_limit" :"10000",
"producer_batch_size" :"500", "producer_batch_size" :"10000",
"worker_batch_size" :"500", "worker_batch_size" :"5000",
"sql_source" :"{{this.identifier}}", "sql_source" :"{{this.identifier}}",
"order_by_column": "ledger_sequence" } "order_by_column": "sequence" }
) )
) }} ) }}
WITH ledgers AS ( WITH ledgers AS (
SELECT SELECT
ledger_sequence SEQUENCE
FROM FROM
{{ ref("streamline__legders") }} {{ ref("streamline__ledgers") }}
{# EXCEPT EXCEPT
SELECT SELECT
block_number SEQUENCE
FROM FROM
{{ ref("streamline__blocks_complete") }} {{ ref("streamline__ledgers_complete") }}
#}
) )
SELECT SELECT
ledger_sequence, SEQUENCE,
ROUND( ROUND(
ledger_sequence, SEQUENCE,
-4 -4
) :: INT AS partition_key, ) :: INT AS partition_key,
{{ target.database }}.live.udf_api( {{ target.database }}.live.udf_api(
'GET', 'GET',
'{Service}/{Authentication}/ledgers/' || ledger_sequence, '{Service}/{Authentication}/ledgers/' || SEQUENCE,
OBJECT_CONSTRUCT(), OBJECT_CONSTRUCT(),
OBJECT_CONSTRUCT(), OBJECT_CONSTRUCT(),
'Vault/prod/stellar/quicknode/mainnet' 'Vault/prod/stellar/quicknode/mainnet'

View File

@ -9,11 +9,14 @@ WITH head AS (
{{ target.database }}.live.udf_api( {{ target.database }}.live.udf_api(
'GET', 'GET',
'{Service}/{Authentication}/ledgers?limit=1&order=desc', '{Service}/{Authentication}/ledgers?limit=1&order=desc',
OBJECT_CONSTRUCT(), OBJECT_CONSTRUCT(
'fsc-quantum-state',
'livequery'
),
OBJECT_CONSTRUCT(), OBJECT_CONSTRUCT(),
'Vault/prod/stellar/quicknode/mainnet' 'Vault/prod/stellar/quicknode/mainnet'
) :data :_embedded :records [0] AS DATA, ) :data :_embedded :records [0] AS DATA,
DATA :sequence :: INT AS ledger_sequence, DATA :sequence :: INT AS SEQUENCE,
DATA :closed_at :: datetime AS block_timestamp DATA :closed_at :: datetime AS block_timestamp
), ),
tail AS ( tail AS (
@ -21,17 +24,20 @@ tail AS (
{{ target.database }}.live.udf_api( {{ target.database }}.live.udf_api(
'GET', 'GET',
'{Service}/{Authentication}/ledgers?limit=1&order=asc', '{Service}/{Authentication}/ledgers?limit=1&order=asc',
OBJECT_CONSTRUCT(), OBJECT_CONSTRUCT(
'fsc-quantum-state',
'livequery'
),
OBJECT_CONSTRUCT(), OBJECT_CONSTRUCT(),
'Vault/prod/stellar/quicknode/mainnet' 'Vault/prod/stellar/quicknode/mainnet'
) :data :_embedded :records [0] AS DATA, ) :data :_embedded :records [0] AS DATA,
DATA :sequence :: INT AS ledger_sequence, DATA :sequence :: INT AS SEQUENCE,
DATA :closed_at :: datetime AS block_timestamp DATA :closed_at :: datetime AS block_timestamp
) )
SELECT SELECT
A.ledger_sequence AS head_ledger_sequence, A.sequence AS head_sequence,
A.block_timestamp AS head_block_timestamp, A.block_timestamp AS head_block_timestamp,
b.ledger_sequence AS tail_ledger_sequence, b.sequence AS tail_sequence,
b.block_timestamp AS tail_block_timestamp b.block_timestamp AS tail_block_timestamp
FROM FROM
head A head A

View File

@ -4,7 +4,7 @@
) }} ) }}
SELECT SELECT
_id AS ledger_sequence _id AS SEQUENCE
FROM FROM
{{ source( {{ source(
'crosschain_silver', 'crosschain_silver',
@ -13,13 +13,13 @@ FROM
WHERE WHERE
_id >= ( _id >= (
SELECT SELECT
MIN(tail_ledger_sequence) MIN(tail_sequence)
FROM FROM
{{ ref('streamline__chain_head_tail') }} {{ ref('streamline__chain_head_tail') }}
) )
AND _id <= ( AND _id <= (
SELECT SELECT
MAX(head_ledger_sequence) MAX(head_sequence)
FROM FROM
{{ ref('streamline__chain_head_tail') }} {{ ref('streamline__chain_head_tail') }}
) )