An 3414/data observability (#605)

* stash

* workflows

* threads

* prod

* better logic

* updates

* config
This commit is contained in:
Austin 2023-07-12 21:41:22 -04:00 committed by GitHub
parent 7fcb0c6c2f
commit f2a9b85150
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1027 additions and 23 deletions

View File

@ -0,0 +1,47 @@
name: dbt_run_full_observability
run-name: dbt_run_full_observability
on:
workflow_dispatch:
schedule:
# Runs “At 00:00 on day-of-month 1.” (see https://crontab.guru)
- cron: '0 0 1 * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod_2xl
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
with:
python-version: "3.7.x"
- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
dbt deps
- name: Run DBT Jobs
run: |
dbt run --threads 2 --vars '{"OBSERV_FULL_TEST":True}' -m models/silver/_observability

View File

@ -41,4 +41,4 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt run --exclude tag:core models/silver/streamline/ models/ethereum_share models/silver/balances/ models/silver/silver__decoded_logs.sql models/beacon_chain models/bronze/api_udf/nft models/silver/api_udf/silver__nft_metadata_reads.sql models/silver/core models/silver/abis
dbt run --exclude tag:core models/silver/streamline/ models/ethereum_share models/silver/balances/ models/silver/silver__decoded_logs.sql models/beacon_chain models/bronze/api_udf/nft models/silver/api_udf/silver__nft_metadata_reads.sql models/silver/core models/silver/abis models/silver/_observability

View File

@ -4,8 +4,8 @@ run-name: dbt_run_scheduled_real_time
on:
workflow_dispatch:
schedule:
# Runs "every 20 minutes" (see https://crontab.guru)
- cron: '10-59/20 * * * *'
# Runs at minute 10, 30, and 50 (see https://crontab.guru)
- cron: '10,30,50 * * * *'
env:
DBT_PROFILES_DIR: ./

View File

@ -4,8 +4,8 @@ run-name: dbt_run_streamline_chainhead
on:
workflow_dispatch:
schedule:
# Runs "at every 10th minute" (see https://crontab.guru)
- cron: '*/10 * * * *'
# Runs at minute 0, 20, and 40 (see https://crontab.guru)
- cron: '0,20,40 * * * *'
env:
DBT_PROFILES_DIR: ./
@ -26,7 +26,7 @@ jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod_backfill
name: workflow_prod
steps:
- uses: actions/checkout@v3

View File

@ -4,8 +4,8 @@ run-name: dbt_run_streamline_decoder
on:
workflow_dispatch:
schedule:
# Runs "every 30 mins" (see https://crontab.guru)
- cron: '*/30 * * * *'
# Runs at minute 0, 20, and 40 (see https://crontab.guru)
- cron: '0,20,40 * * * *'
env:
DBT_PROFILES_DIR: ./

View File

@ -4,8 +4,8 @@ run-name: dbt_test_intraday
on:
workflow_dispatch:
schedule:
# Runs “At minute 0 past every 4th hour.” (see https://crontab.guru)
- cron: '0 */4 * * *'
# Runs “At minute 20 past every 4th hour.” (see https://crontab.guru)
- cron: '20 */4 * * *'
env:
DBT_PROFILES_DIR: ./
@ -42,6 +42,7 @@ jobs:
- name: Run DBT Jobs
run: |
dbt test -m tag:recent_test
dbt run -m models/silver/_observability

View File

@ -54,3 +54,4 @@ vars:
STREAMLINE_RUN_HISTORY: False
UPDATE_SNOWFLAKE_TAGS: True
WAIT: 0
OBSERV_FULL_TEST: False

View File

@ -63,3 +63,41 @@ WHERE
model_tx_hash IS NULL
OR model_block_number IS NULL
{% endmacro %}
{% macro missing_confirmed_txs(
model1,
model2
) %}
WITH txs_base AS (
SELECT
block_number AS base_block_number,
block_hash AS base_block_hash,
tx_hash AS base_tx_hash
FROM
{{ model1 }}
),
model_name AS (
SELECT
block_number AS model_block_number,
block_hash AS model_block_hash,
tx_hash AS model_tx_hash
FROM
{{ model2 }}
)
SELECT
DISTINCT base_block_number AS block_number
FROM
txs_base
LEFT JOIN model_name
ON base_block_number = model_block_number
AND base_tx_hash = model_tx_hash
AND base_block_hash = model_block_hash
WHERE
model_tx_hash IS NULL
AND model_block_number <= (
SELECT
MAX(base_block_number)
FROM
txs_base
)
{% endmacro %}

View File

@ -0,0 +1,166 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false
) }}
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
block_range AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
blocks AS (
SELECT
l.block_number,
block_timestamp,
LAG(
l.block_number,
1
) over (
ORDER BY
l.block_number ASC
) AS prev_BLOCK_NUMBER
FROM
{{ ref("silver__blocks") }}
l
INNER JOIN block_range b
ON l.block_number = b.block_number
AND l.block_number >= (
SELECT
MIN(block_number)
FROM
block_range
)
),
block_gen AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
MIN(block_number)
FROM
blocks
)
AND (
SELECT
MAX(block_number)
FROM
blocks
)
)
SELECT
'blocks' AS test_name,
MIN(
b.block_number
) AS min_block,
MAX(
b.block_number
) AS max_block,
MIN(
b.block_timestamp
) AS min_block_timestamp,
MAX(
b.block_timestamp
) AS max_block_timestamp,
COUNT(1) AS blocks_tested,
COUNT(
CASE
WHEN C.block_number IS NOT NULL THEN A.block_number
END
) AS blocks_impacted_count,
ARRAY_AGG(
CASE
WHEN C.block_number IS NOT NULL THEN A.block_number
END
) within GROUP (
ORDER BY
A.block_number
) AS blocks_impacted_array,
CURRENT_TIMESTAMP AS test_timestamp
FROM
block_gen A
LEFT JOIN blocks b
ON A.block_number = b.block_number
LEFT JOIN blocks C
ON A.block_number > C.prev_block_number
AND A.block_number < C.block_number
AND C.block_number - C.prev_block_number <> 1
WHERE
COALESCE(
b.block_number,
C.block_number
) IS NOT NULL

View File

@ -0,0 +1,101 @@
{{ config(
materialized = 'view'
) }}
SELECT
column1 AS block_number
FROM
(
VALUES
(3804951),
(3804282),
(3805213),
(3805274),
(3805144),
(3804909),
(3804375),
(3805171),
(3805210),
(3804357),
(3804917),
(3805111),
(3805195),
(3804880),
(3805005),
(3804985),
(3805056),
(3805047),
(3805177),
(3805129),
(3804887),
(3804982),
(3804898),
(3804931),
(3804939),
(3805099),
(3805119),
(3805027),
(3804885),
(3805174),
(3804945),
(3805147),
(3804005),
(3805207),
(3804216),
(3805224),
(3805042),
(3805126),
(3805141),
(3805091),
(3804347),
(3805247),
(3805132),
(3805262),
(3804934),
(3804958),
(3804962),
(3804299),
(3804906),
(3804953),
(3805228),
(3805016),
(3805222),
(3805064),
(3805072),
(3804222),
(3805115),
(3805279),
(3804874),
(3804913),
(3804992),
(3805084),
(3805258),
(3805157),
(3804893),
(3805061),
(3805019),
(3804973),
(3804920),
(3805230),
(3804901),
(3805266),
(3805038),
(3804969),
(3805190),
(3805200),
(3805153),
(3805067),
(3804865),
(3804988),
(3804975),
(3804926),
(3805010),
(3804315),
(3804309),
(3805272),
(3805249),
(3804227),
(3805202),
(3805033),
(3805022)
) AS block_number(column1)

View File

@ -0,0 +1,121 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false
) }}
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
block_range AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
broken_blocks AS (
SELECT
DISTINCT block_number
FROM
{{ ref("silver__receipts") }}
r
LEFT JOIN {{ ref("silver__logs") }}
l USING (
block_number,
tx_hash
)
JOIN block_range USING (block_number)
WHERE
l.tx_hash IS NULL
AND ARRAY_SIZE(
r.logs
) > 0
),
impacted_blocks AS (
SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_number) within GROUP (
ORDER BY
block_number
) AS blocks_impacted_array
FROM
broken_blocks
)
SELECT
'event_logs' AS test_name,
min_block,
max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
CURRENT_TIMESTAMP() AS test_timestamp
FROM
summary_stats
JOIN impacted_blocks
ON 1 = 1

View File

@ -0,0 +1,119 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false
) }}
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
block_range AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
broken_blocks AS (
SELECT
DISTINCT block_number
FROM
{{ ref("silver__transactions") }}
t
LEFT JOIN {{ ref("silver__receipts") }}
r USING (
block_number,
tx_hash,
block_hash
)
JOIN block_range USING (block_number)
WHERE
r.tx_hash IS NULL
),
impacted_blocks AS (
SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_number) within GROUP (
ORDER BY
block_number
) AS blocks_impacted_array
FROM
broken_blocks
)
SELECT
'receipts' AS test_name,
min_block,
max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
CURRENT_TIMESTAMP() AS test_timestamp
FROM
summary_stats
JOIN impacted_blocks
ON 1 = 1

View File

@ -0,0 +1,125 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false
) }}
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
block_range AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
broken_blocks AS (
SELECT
DISTINCT block_number
FROM
{{ ref("silver__transactions") }}
tx
LEFT JOIN {{ ref("silver__traces") }}
tr USING (
block_number,
tx_hash
)
JOIN block_range USING (block_number)
WHERE
tr.tx_hash IS NULL
),
impacted_blocks AS (
SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_number) within GROUP (
ORDER BY
block_number
) AS blocks_impacted_array
FROM
broken_blocks
WHERE
block_number NOT IN (
SELECT
block_number
FROM
{{ ref("silver_observability__excluded_trace_blocks") }}
)
)
SELECT
'traces' AS test_name,
min_block,
max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
CURRENT_TIMESTAMP() AS test_timestamp
FROM
summary_stats
JOIN impacted_blocks
ON 1 = 1

View File

@ -0,0 +1,119 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false
) }}
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
block_range AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
broken_blocks AS (
SELECT
DISTINCT block_number
FROM
{{ ref("silver__confirmed_blocks") }}
b
LEFT JOIN {{ ref("silver__transactions") }}
t USING (
block_number,
tx_hash,
block_hash
)
JOIN block_range USING (block_number)
WHERE
t.tx_hash IS NULL
),
impacted_blocks AS (
SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_number) within GROUP (
ORDER BY
block_number
) AS blocks_impacted_array
FROM
broken_blocks
)
SELECT
'transactions' AS test_name,
min_block,
max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
CURRENT_TIMESTAMP() AS test_timestamp
FROM
summary_stats
JOIN impacted_blocks
ON 1 = 1

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['full_test']
) }}
SELECT
*
FROM
{{ ref('silver__confirmed_blocks') }}

View File

@ -0,0 +1,34 @@
version: 2
models:
- name: test_silver__confirmed_blocks_full
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_HASH
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: BLOCK_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TX_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: _INSERTED_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- TIMESTAMP_LTZ

View File

@ -0,0 +1,27 @@
{{ config (
materialized = 'view',
tags = ['recent_test']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_max_block_by_date") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 3
)
SELECT
*
FROM
{{ ref('silver__confirmed_blocks') }}
WHERE
block_number >= (
SELECT
block_number
FROM
last_3_days
)

View File

@ -0,0 +1,34 @@
version: 2
models:
- name: test_silver__confirmed_blocks_recent
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_HASH
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: BLOCK_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TX_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: _INSERTED_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: hour
interval: 3
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- TIMESTAMP_LTZ

View File

@ -15,7 +15,7 @@ WITH base AS (
WHERE
block_timestamp > DATEADD(
'day',
-3,
-5,
CURRENT_DATE
)
GROUP BY

View File

@ -79,4 +79,9 @@ FROM
block_number
FROM
{{ ref("_missing_txs") }}
UNION
SELECT
block_number
FROM
{{ ref("_unconfirmed_blocks") }}
)

View File

@ -86,4 +86,9 @@ FROM
block_number
FROM
{{ ref("_missing_traces") }}
UNION
SELECT
block_number
FROM
{{ ref("_unconfirmed_blocks") }}
)

View File

@ -70,4 +70,9 @@ FROM
block_number
FROM
{{ ref("_missing_txs") }}
UNION
SELECT
block_number
FROM
{{ ref("_unconfirmed_blocks") }}
)

View File

@ -2,19 +2,31 @@
materialized = "ephemeral"
) }}
WITH lookback AS (
SELECT
MAX(block_number) AS block_number
FROM
{{ ref("silver__blocks") }}
WHERE
block_timestamp :: DATE = CURRENT_DATE() - 3
)
SELECT
DISTINCT tx.block_number AS block_number
DISTINCT t.block_number AS block_number
FROM
{{ ref("silver__transactions") }}
tx
t
LEFT JOIN {{ ref("silver__receipts") }}
r
ON tx.block_number = r.block_number
AND tx.tx_hash = r.tx_hash
WHERE
tx.block_timestamp >= DATEADD(
'day',
-2,
CURRENT_DATE
r USING (
block_number,
block_hash,
tx_hash
)
WHERE
r.tx_hash IS NULL
AND t.block_number >= (
SELECT
block_number
FROM
lookback
)
AND r.tx_hash IS NULL

View File

@ -3,7 +3,7 @@
) }}
SELECT
DISTINCT tx.block_number AS block_number
DISTINCT tx.block_number block_number
FROM
{{ ref("silver__transactions") }}
tx

View File

@ -0,0 +1,32 @@
{{ config (
materialized = "ephemeral"
) }}
WITH lookback AS (
SELECT
MAX(block_number) AS block_number
FROM
{{ ref("silver__blocks") }}
WHERE
block_timestamp :: DATE = CURRENT_DATE() - 3
)
SELECT
DISTINCT cb.block_number AS block_number
FROM
{{ ref("silver__confirmed_blocks") }}
cb
LEFT JOIN {{ ref("silver__transactions") }}
txs USING (
block_number,
block_hash,
tx_hash
)
WHERE
txs.tx_hash IS NULL
AND cb.block_number >= (
SELECT
block_number
FROM
lookback
)

View File

@ -100,6 +100,7 @@ sources:
- name: asset_metadata_coin_market_cap
- name: token_prices_priority_hourly
- name: asset_metadata_priority
- name: number_sequence
- name: streamline_crosschain
database: streamline
schema: crosschain

View File

@ -0,0 +1 @@
{{ missing_confirmed_txs(ref("test_silver__confirmed_blocks_full"), ref("test_silver__transactions_full")) }}

View File

@ -0,0 +1 @@
{{ missing_confirmed_txs(ref("test_silver__confirmed_blocks_recent"), ref("test_silver__transactions_recent")) }}