From 0d19dfd4f751d8b11a9a71b3f337666046b8a985 Mon Sep 17 00:00:00 2001 From: eric-laurello <102970824+eric-laurello@users.noreply.github.com> Date: Thu, 3 Apr 2025 13:22:07 -0400 Subject: [PATCH] AN-5791 observe models (#10) * observe models * wf, yml * spelling, scheduling, comments --- .github/workflows/dbt_run_observability.yml | 45 ++++++ .../workflows/dbt_run_observability_full.yml | 45 ++++++ .github/workflows/dbt_test_daily.yml | 2 +- macros/streamline/streamline_udfs.sql | 2 +- .../streamline/bronze__streamline_ledgers.sql | 10 ++ .../bronze__streamline_ledgers_FR.sql | 10 ++ ...er_observability__ledgers_completeness.sql | 134 ++++++++++++++++ ...er_observability__ledgers_completeness.yml | 75 +++++++++ ...servability__ledgers_xref_completeness.sql | 143 ++++++++++++++++++ ...servability__ledgers_xref_completeness.yml | 68 +++++++++ ...servability__transactions_completeness.sql | 137 +++++++++++++++++ ...servability__transactions_completeness.yml | 64 ++++++++ models/sources.yml | 3 +- .../complete/streamline__ledgers_complete.sql | 43 ++++++ .../realtime/streamline__ledgers_realtime.sql | 27 ++-- .../core/streamline__chain_head_tail.sql | 18 ++- ...e__legders.sql => streamline__ledgers.sql} | 6 +- 17 files changed, 806 insertions(+), 26 deletions(-) create mode 100644 .github/workflows/dbt_run_observability.yml create mode 100644 .github/workflows/dbt_run_observability_full.yml create mode 100644 models/bronze/streamline/bronze__streamline_ledgers.sql create mode 100644 models/bronze/streamline/bronze__streamline_ledgers_FR.sql create mode 100644 models/silver/_observability/silver_observability__ledgers_completeness.sql create mode 100644 models/silver/_observability/silver_observability__ledgers_completeness.yml create mode 100644 models/silver/_observability/silver_observability__ledgers_xref_completeness.sql create mode 100644 models/silver/_observability/silver_observability__ledgers_xref_completeness.yml create mode 100644 models/silver/_observability/silver_observability__transactions_completeness.sql create mode 100644 models/silver/_observability/silver_observability__transactions_completeness.yml create mode 100644 models/streamline/core/complete/streamline__ledgers_complete.sql rename models/streamline/core/{streamline__legders.sql => streamline__ledgers.sql} (78%) diff --git a/.github/workflows/dbt_run_observability.yml b/.github/workflows/dbt_run_observability.yml new file mode 100644 index 0000000..673b31c --- /dev/null +++ b/.github/workflows/dbt_run_observability.yml @@ -0,0 +1,45 @@ +name: dbt_run_observability +run-name: dbt_run_observability + +on: + workflow_dispatch: + schedule: + # Runs “At minute 0 past every 8th hour.” (see https://crontab.guru) + - cron: '0 */8 * * *' + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "${{ vars.PYTHON_VERSION }}" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + - name: Run DBT Jobs + run: | + dbt build -s "stellar_models,tag:observability" \ No newline at end of file diff --git a/.github/workflows/dbt_run_observability_full.yml b/.github/workflows/dbt_run_observability_full.yml new file mode 100644 index 0000000..5b65f02 --- /dev/null +++ b/.github/workflows/dbt_run_observability_full.yml @@ -0,0 +1,45 @@ +name: dbt_run_observability_full +run-name: dbt_run_observability_full + +on: + workflow_dispatch: + schedule: + # Runs “At 19:00 on day-of-month 1.” (see https://crontab.guru) + - cron: '0 19 1 * *' + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "${{ vars.PYTHON_VERSION }}" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + - name: Run DBT Jobs + run: | + dbt build --vars '{"OBSERV_FULL_TEST":True}' -s "stellar_models,tag:observability" \ No newline at end of file diff --git a/.github/workflows/dbt_test_daily.yml b/.github/workflows/dbt_test_daily.yml index 84dac69..309b1b7 100644 --- a/.github/workflows/dbt_test_daily.yml +++ b/.github/workflows/dbt_test_daily.yml @@ -39,7 +39,7 @@ jobs: dbt deps - name: Run DBT Jobs run: | - dbt test -m "stellar_models,models/silver" "stellar_models,models/gold" --vars 'test_days_threshold: 1' + dbt test -m "stellar_models,models/silver" "stellar_models,models/gold" --vars 'test_days_threshold: 1' --exclude "stellar_models,tag:observability" continue-on-error: true diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql index 49748fe..6b9ff20 100644 --- a/macros/streamline/streamline_udfs.sql +++ b/macros/streamline/streamline_udfs.sql @@ -3,7 +3,7 @@ OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2( json OBJECT ) returns ARRAY {% if target.database == 'STELLAR' -%} - api_integration = aws_stellar_api_prod_v2 AS 'https://.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_rest_api' + api_integration = aws_stellar_api_prod_v2 AS 'https://qavdasgp43.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_rest_api' {% else %} api_integration = aws_stellar_api_stg_v2 AS 'https://q75hks23yb.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api' {%- endif %} diff --git a/models/bronze/streamline/bronze__streamline_ledgers.sql b/models/bronze/streamline/bronze__streamline_ledgers.sql new file mode 100644 index 0000000..4396d58 --- /dev/null +++ b/models/bronze/streamline/bronze__streamline_ledgers.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_query_v2( + model = "streamline_ledgers", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)", + partition_name = "partition_key", + unique_key = "metadata", + other_cols = "data" +) }} diff --git a/models/bronze/streamline/bronze__streamline_ledgers_FR.sql b/models/bronze/streamline/bronze__streamline_ledgers_FR.sql new file mode 100644 index 0000000..33391fc --- /dev/null +++ b/models/bronze/streamline/bronze__streamline_ledgers_FR.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_FR_query_v2( + model = "streamline_ledgers", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)", + partition_name = "partition_key", + unique_key = "metadata", + other_cols = "data" +) }} diff --git a/models/silver/_observability/silver_observability__ledgers_completeness.sql b/models/silver/_observability/silver_observability__ledgers_completeness.sql new file mode 100644 index 0000000..bf27cb8 --- /dev/null +++ b/models/silver/_observability/silver_observability__ledgers_completeness.sql @@ -0,0 +1,134 @@ +{{ config( + materialized = 'incremental', + unique_key = 'test_timestamp', + full_refresh = false, + tags = ['observability'] +) }} + +WITH source AS ( + + SELECT + SEQUENCE, + block_timestamp, + LAG( + SEQUENCE, + 1 + ) over ( + ORDER BY + SEQUENCE ASC + ) AS prev_SEQUENCE + FROM + {{ ref('core__fact_ledgers') }} A + WHERE + block_timestamp < DATEADD( + HOUR, + -24, + SYSDATE() + ) + +{% if is_incremental() %} +AND ( + block_timestamp >= DATEADD( + HOUR, + -96,( + SELECT + MAX( + max_block_timestamp + ) + FROM + {{ this }} + ) + ) + OR ({% if var('OBSERV_FULL_TEST') %} + 1 = 1 + {% else %} + SEQUENCE >= ( + SELECT + MIN(VALUE) - 1 + FROM + ( + SELECT + ledgers_impacted_array + FROM + {{ this }} + qualify ROW_NUMBER() over ( + ORDER BY + test_timestamp DESC) = 1), LATERAL FLATTEN(input => ledgers_impacted_array)) + {% endif %}) +) +{% endif %} +), +seq_gen AS ( + SELECT + _id AS SEQUENCE + FROM + {{ source( + 'crosschain_silver', + 'number_sequence' + ) }} + WHERE + _id BETWEEN ( + SELECT + MIN(SEQUENCE) + FROM + source + ) + AND ( + SELECT + MAX(SEQUENCE) + FROM + source + ) +) +SELECT + 'sequences' AS test_name, + MIN( + b.sequence + ) AS min_SEQUENCE, + MAX( + b.sequence + ) AS max_SEQUENCE, + MIN( + b.block_timestamp + ) AS min_block_timestamp, + MAX( + b.block_timestamp + ) AS max_block_timestamp, + COUNT(1) AS ledgers_tested, + COUNT( + CASE + WHEN C.sequence IS NOT NULL THEN A.sequence + END + ) AS ledgers_impacted_count, + ARRAY_AGG( + CASE + WHEN C.sequence IS NOT NULL THEN A.sequence + END + ) within GROUP ( + ORDER BY + A.sequence + ) AS ledgers_impacted_array, + ARRAY_AGG( + DISTINCT CASE + WHEN C.sequence IS NOT NULL THEN OBJECT_CONSTRUCT( + 'prev_sequence', + C.prev_SEQUENCE, + 'SEQUENCE', + C.sequence + ) + END + ) AS test_failure_details, + SYSDATE() AS test_timestamp +FROM + seq_gen A + LEFT JOIN source b + ON A.sequence = b.sequence + LEFT JOIN source C + ON A.sequence > C.prev_SEQUENCE + AND A.sequence < C.sequence + AND C.sequence - C.prev_SEQUENCE <> 1 +WHERE + COALESCE( + b.prev_SEQUENCE, + C.prev_SEQUENCE + ) IS NOT NULL diff --git a/models/silver/_observability/silver_observability__ledgers_completeness.yml b/models/silver/_observability/silver_observability__ledgers_completeness.yml new file mode 100644 index 0000000..6bff938 --- /dev/null +++ b/models/silver/_observability/silver_observability__ledgers_completeness.yml @@ -0,0 +1,75 @@ +version: 2 +models: + - name: silver_observability__ledgers_completeness + description: Records of all blocks ledger gaps (missing ledgers) with a timestamp the test was run + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - TEST_TIMESTAMP + columns: + - name: MIN_SEQUENCE + description: The lowest block id in the test + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - name: MAX_SEQUENCE + description: The highest block id in the test + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - name: MIN_BLOCK_TIMESTAMP + description: The lowest block timestamp in the test + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ + - name: MAX_BLOCK_TIMESTAMP + description: The highest block timestamp in the test + tests: + - not_null + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 2 + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ + - name: LEDGERS_TESTED + description: Count of blocks in the test + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - name: LEDGERS_IMPACTED_COUNT + description: Count of block gaps in the test + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - name: LEDGERS_IMPACTED_ARRAY + description: Array of affected blocks + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - ARRAY + - name: TEST_FAILURE_DETAILS + description: Array of details of the failure + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - ARRAY + - name: TEST_TIMESTAMP + description: When the test was run + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ \ No newline at end of file diff --git a/models/silver/_observability/silver_observability__ledgers_xref_completeness.sql b/models/silver/_observability/silver_observability__ledgers_xref_completeness.sql new file mode 100644 index 0000000..c66df9c --- /dev/null +++ b/models/silver/_observability/silver_observability__ledgers_xref_completeness.sql @@ -0,0 +1,143 @@ +{{ config( + materialized = 'incremental', + unique_key = 'test_timestamp', + full_refresh = false, + tags = ['observability'] +) }} + +WITH bq AS ( + + SELECT + SEQUENCE, + block_timestamp, + successful_transaction_count, + failed_transaction_count, + operation_count, + tx_set_operation_count + FROM + {{ ref('core__fact_ledgers') }} A + WHERE + block_timestamp < DATEADD( + HOUR, + -24, + SYSDATE() + ) + +{% if is_incremental() %} +AND ( + block_timestamp >= DATEADD( + HOUR, + -96,( + SELECT + MAX( + max_block_timestamp + ) + FROM + {{ this }} + ) + ) + OR ({% if var('OBSERV_FULL_TEST') %} + 1 = 1 + {% else %} + SEQUENCE >= ( + SELECT + MIN(VALUE) - 1 + FROM + ( + SELECT + ledgers_impacted_array + FROM + {{ this }} + qualify ROW_NUMBER() over ( + ORDER BY + test_timestamp DESC) = 1), LATERAL FLATTEN(input => ledgers_impacted_array)) + {% endif %}) +) +{% endif %} +), +rpc AS ( + SELECT + SEQUENCE, + block_timestamp, + successful_transaction_count, + failed_transaction_count, + operation_count, + tx_set_operation_count + FROM + {{ ref('streamline__ledgers_complete') }} A + WHERE + block_timestamp < DATEADD( + HOUR, + -24, + SYSDATE() + ) + +{% if is_incremental() %} +AND ( + block_timestamp >= DATEADD( + HOUR, + -96,( + SELECT + MAX( + max_block_timestamp + ) + FROM + {{ this }} + ) + ) + OR ({% if var('OBSERV_FULL_TEST') %} + 1 = 1 + {% else %} + SEQUENCE >= ( + SELECT + MIN(VALUE) - 1 + FROM + ( + SELECT + ledgers_impacted_array + FROM + {{ this }} + qualify ROW_NUMBER() over ( + ORDER BY + test_timestamp DESC) = 1), LATERAL FLATTEN(input => ledgers_impacted_array)) + {% endif %}) +) +{% endif %} +) +SELECT + 'sequences' AS test_name, + MIN( + SEQUENCE + ) AS min_SEQUENCE, + MAX( + SEQUENCE + ) AS max_SEQUENCE, + MIN( + block_timestamp + ) AS min_block_timestamp, + MAX( + block_timestamp + ) AS max_block_timestamp, + COUNT(1) AS ledgers_tested, + COUNT( + CASE + WHEN bq.successful_transaction_count <> rpc.successful_transaction_count + OR bq.successful_transaction_count IS NULL THEN SEQUENCE + END + ) AS ledgers_impacted_count, + ARRAY_AGG( + CASE + WHEN bq.successful_transaction_count <> rpc.successful_transaction_count + OR bq.successful_transaction_count IS NULL THEN SEQUENCE + END + ) within GROUP ( + ORDER BY + SEQUENCE + ) AS ledgers_impacted_array, + SYSDATE() AS test_timestamp +FROM + bq full + OUTER JOIN rpc USING( + SEQUENCE, + block_timestamp + ) diff --git a/models/silver/_observability/silver_observability__ledgers_xref_completeness.yml b/models/silver/_observability/silver_observability__ledgers_xref_completeness.yml new file mode 100644 index 0000000..dcb90bc --- /dev/null +++ b/models/silver/_observability/silver_observability__ledgers_xref_completeness.yml @@ -0,0 +1,68 @@ +version: 2 +models: + - name: silver_observability__ledgers_xref_completeness + description: Records of all blocks ledger differences between hubble and rpc + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - TEST_TIMESTAMP + columns: + - name: MIN_SEQUENCE + description: The lowest block id in the test + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - name: MAX_SEQUENCE + description: The highest block id in the test + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - name: MIN_BLOCK_TIMESTAMP + description: The lowest block timestamp in the test + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ + - name: MAX_BLOCK_TIMESTAMP + description: The highest block timestamp in the test + tests: + - not_null + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 2 + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ + - name: LEDGERS_TESTED + description: Count of blocks in the test + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - name: LEDGERS_IMPACTED_COUNT + description: Count of block gaps in the test + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - name: LEDGERS_IMPACTED_ARRAY + description: Array of affected blocks + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - ARRAY + - name: TEST_TIMESTAMP + description: When the test was run + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ \ No newline at end of file diff --git a/models/silver/_observability/silver_observability__transactions_completeness.sql b/models/silver/_observability/silver_observability__transactions_completeness.sql new file mode 100644 index 0000000..6f08988 --- /dev/null +++ b/models/silver/_observability/silver_observability__transactions_completeness.sql @@ -0,0 +1,137 @@ +{{ config( + materialized = 'incremental', + unique_key = 'test_timestamp', + full_refresh = false, + tags = ['observability'] +) }} + +WITH summary_stats AS ( + + SELECT + MIN(SEQUENCE) AS min_SEQUENCE, + MAX(SEQUENCE) AS max_SEQUENCE, + MIN(block_timestamp) AS min_block_timestamp, + MAX(block_timestamp) AS max_block_timestamp, + COUNT(1) AS ledgers_tested + FROM + {{ ref('core__fact_ledgers') }} + WHERE + block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP()) + +{% if is_incremental() %} +AND ( + SEQUENCE >= ( + SELECT + MIN(SEQUENCE) + FROM + ( + SELECT + MIN(SEQUENCE) AS SEQUENCE + FROM + {{ ref('core__fact_ledgers') }} + WHERE + block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP()) + AND DATEADD('hour', -95, CURRENT_TIMESTAMP()) + UNION + SELECT + MIN(VALUE) - 1 AS SEQUENCE + FROM + ( + SELECT + ledgers_impacted_array + FROM + {{ this }} + qualify ROW_NUMBER() over ( + ORDER BY + test_timestamp DESC + ) = 1 + ), + LATERAL FLATTEN( + input => ledgers_impacted_array + ) + ) + ) {% if var('OBSERV_FULL_TEST') %} + OR SEQUENCE >= 0 + {% endif %} +) +{% endif %} +), +base_sequence AS ( + SELECT + SEQUENCE, + successful_transaction_count + failed_transaction_count AS transaction_count + FROM + {{ ref('core__fact_ledgers') }} + WHERE + SEQUENCE BETWEEN ( + SELECT + min_SEQUENCE + FROM + summary_stats + ) + AND ( + SELECT + max_SEQUENCE + FROM + summary_stats + ) +), +actual_tx_counts AS ( + SELECT + ledger_SEQUENCE AS SEQUENCE, + COUNT(1) AS transaction_count + FROM + {{ ref('core__fact_transactions') }} + WHERE + SEQUENCE BETWEEN ( + SELECT + min_SEQUENCE + FROM + summary_stats + ) + AND ( + SELECT + max_SEQUENCE + FROM + summary_stats + ) + GROUP BY + SEQUENCE +), +potential_missing_txs AS ( + SELECT + e.sequence + FROM + base_sequence e + LEFT OUTER JOIN actual_tx_counts A + ON e.sequence = A.sequence + WHERE + COALESCE( + A.transaction_count, + 0 + ) <> e.transaction_count +), +impacted_seqs AS ( + SELECT + COUNT(1) AS ledgers_impacted_count, + ARRAY_AGG(SEQUENCE) within GROUP ( + ORDER BY + SEQUENCE + ) AS ledgers_impacted_array + FROM + potential_missing_txs +) +SELECT + 'transactions' AS test_name, + min_sequence, + max_sequence, + min_block_timestamp, + max_block_timestamp, + ledgers_tested, + ledgers_impacted_count, + ledgers_impacted_array, + SYSDATE() AS test_timestamp +FROM + summary_stats + JOIN impacted_seqs + ON 1 = 1 diff --git a/models/silver/_observability/silver_observability__transactions_completeness.yml b/models/silver/_observability/silver_observability__transactions_completeness.yml new file mode 100644 index 0000000..e035e41 --- /dev/null +++ b/models/silver/_observability/silver_observability__transactions_completeness.yml @@ -0,0 +1,64 @@ +version: 2 +models: + - name: silver_observability__transactions_completeness + description: Records of all block transaction counts. + columns: + - name: MIN_SEQUENCE + description: The lowest block id in the test + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - name: MAX_SEQUENCE + description: The highest block id in the test + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - name: MIN_BLOCK_TIMESTAMP + description: The lowest block timestamp in the test + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ + - name: MAX_BLOCK_TIMESTAMP + description: The highest block timestamp in the test + tests: + - not_null + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 2 + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ + - name: LEDGERS_TESTED + description: Count of blocks in the test + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - name: LEDGERS_IMPACTED_COUNT + description: Count of block gaps in the test + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - name: LEDGERS_IMPACTED_ARRAY + description: Array of affected blocks + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - ARRAY + - name: TEST_TIMESTAMP + description: When the test was run + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ \ No newline at end of file diff --git a/models/sources.yml b/models/sources.yml index 9d9c6fa..c889cdf 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -3,7 +3,7 @@ version: 2 sources: - name: bronze_streamline database: streamline - schema: stellar + schema: "{{ 'stellar' if target.database == 'STELLAR' else 'stellar_dev' }}" tables: - name: accounts - name: contract_data @@ -14,6 +14,7 @@ sources: - name: history_trades - name: history_transactions - name: liquidity_pools + - name: streamline_ledgers - name: trust_lines - name: crosschain database: "{{ 'crosschain' if target.database == 'STELLAR' else 'crosschain_dev' }}" diff --git a/models/streamline/core/complete/streamline__ledgers_complete.sql b/models/streamline/core/complete/streamline__ledgers_complete.sql new file mode 100644 index 0000000..d08ea3f --- /dev/null +++ b/models/streamline/core/complete/streamline__ledgers_complete.sql @@ -0,0 +1,43 @@ +{{ config ( + materialized = "incremental", + incremental_strategy = 'merge', + unique_key = "sequence", + cluster_by = "block_timestamp::DATE", + merge_exclude_columns = ["inserted_timestamp"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(sequence)" +) }} +-- depends_on: {{ ref('bronze__streamline_ledgers') }} + +SELECT + DATA :sequence :: INT AS SEQUENCE, + TO_TIMESTAMP( + DATA :closed_at + ) AS block_timestamp, + DATA :successful_transaction_count :: INT AS successful_transaction_count, + DATA :failed_transaction_count :: INT AS failed_transaction_count, + DATA :operation_count :: INT AS operation_count, + DATA :tx_set_operation_count :: INT AS tx_set_operation_count, + {{ dbt_utils.generate_surrogate_key( + ['sequence'] + ) }} AS complete_ledgers_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_ledgers') }} +WHERE + inserted_timestamp >= ( + SELECT + MAX(modified_timestamp) modified_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_ledgers_FR') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY SEQUENCE +ORDER BY + inserted_timestamp DESC)) = 1 diff --git a/models/streamline/core/realtime/streamline__ledgers_realtime.sql b/models/streamline/core/realtime/streamline__ledgers_realtime.sql index 9f4ff9e..40145de 100644 --- a/models/streamline/core/realtime/streamline__ledgers_realtime.sql +++ b/models/streamline/core/realtime/streamline__ledgers_realtime.sql @@ -3,37 +3,36 @@ post_hook = fsc_utils.if_data_call_function_v2( func = 'streamline.udf_bulk_rest_api_v2', target = "{{this.schema}}.{{this.identifier}}", - params ={ "external_table" :"ledgers", - "sql_limit" :"500", - "producer_batch_size" :"500", - "worker_batch_size" :"500", + params ={ "external_table" :"streamline_ledgers", + "sql_limit" :"10000", + "producer_batch_size" :"10000", + "worker_batch_size" :"5000", "sql_source" :"{{this.identifier}}", - "order_by_column": "ledger_sequence" } + "order_by_column": "sequence" } ) ) }} WITH ledgers AS ( SELECT - ledger_sequence + SEQUENCE FROM - {{ ref("streamline__legders") }} - {# EXCEPT + {{ ref("streamline__ledgers") }} + EXCEPT SELECT - block_number + SEQUENCE FROM - {{ ref("streamline__blocks_complete") }} - #} + {{ ref("streamline__ledgers_complete") }} ) SELECT - ledger_sequence, + SEQUENCE, ROUND( - ledger_sequence, + SEQUENCE, -4 ) :: INT AS partition_key, {{ target.database }}.live.udf_api( 'GET', - '{Service}/{Authentication}/ledgers/' || ledger_sequence, + '{Service}/{Authentication}/ledgers/' || SEQUENCE, OBJECT_CONSTRUCT(), OBJECT_CONSTRUCT(), 'Vault/prod/stellar/quicknode/mainnet' diff --git a/models/streamline/core/streamline__chain_head_tail.sql b/models/streamline/core/streamline__chain_head_tail.sql index 5cc3cb9..0ddf1b9 100644 --- a/models/streamline/core/streamline__chain_head_tail.sql +++ b/models/streamline/core/streamline__chain_head_tail.sql @@ -9,11 +9,14 @@ WITH head AS ( {{ target.database }}.live.udf_api( 'GET', '{Service}/{Authentication}/ledgers?limit=1&order=desc', - OBJECT_CONSTRUCT(), + OBJECT_CONSTRUCT( + 'fsc-quantum-state', + 'livequery' + ), OBJECT_CONSTRUCT(), 'Vault/prod/stellar/quicknode/mainnet' ) :data :_embedded :records [0] AS DATA, - DATA :sequence :: INT AS ledger_sequence, + DATA :sequence :: INT AS SEQUENCE, DATA :closed_at :: datetime AS block_timestamp ), tail AS ( @@ -21,17 +24,20 @@ tail AS ( {{ target.database }}.live.udf_api( 'GET', '{Service}/{Authentication}/ledgers?limit=1&order=asc', - OBJECT_CONSTRUCT(), + OBJECT_CONSTRUCT( + 'fsc-quantum-state', + 'livequery' + ), OBJECT_CONSTRUCT(), 'Vault/prod/stellar/quicknode/mainnet' ) :data :_embedded :records [0] AS DATA, - DATA :sequence :: INT AS ledger_sequence, + DATA :sequence :: INT AS SEQUENCE, DATA :closed_at :: datetime AS block_timestamp ) SELECT - A.ledger_sequence AS head_ledger_sequence, + A.sequence AS head_sequence, A.block_timestamp AS head_block_timestamp, - b.ledger_sequence AS tail_ledger_sequence, + b.sequence AS tail_sequence, b.block_timestamp AS tail_block_timestamp FROM head A diff --git a/models/streamline/core/streamline__legders.sql b/models/streamline/core/streamline__ledgers.sql similarity index 78% rename from models/streamline/core/streamline__legders.sql rename to models/streamline/core/streamline__ledgers.sql index 540d965..1ffbba7 100644 --- a/models/streamline/core/streamline__legders.sql +++ b/models/streamline/core/streamline__ledgers.sql @@ -4,7 +4,7 @@ ) }} SELECT - _id AS ledger_sequence + _id AS SEQUENCE FROM {{ source( 'crosschain_silver', @@ -13,13 +13,13 @@ FROM WHERE _id >= ( SELECT - MIN(tail_ledger_sequence) + MIN(tail_sequence) FROM {{ ref('streamline__chain_head_tail') }} ) AND _id <= ( SELECT - MAX(head_ledger_sequence) + MAX(head_sequence) FROM {{ ref('streamline__chain_head_tail') }} )