expand load to past partition

This commit is contained in:
Jack Forgash 2023-07-25 15:49:05 -06:00
parent 4c326fece1
commit 17f44e7fca
5 changed files with 171 additions and 45 deletions

View File

@ -1,23 +1,21 @@
{% macro partition_batch_load(batch_size) %}
{% if is_incremental() %}
_partition_by_block_number BETWEEN (
_partition_by_block_number BETWEEN (
SELECT
MAX(_partition_by_block_number) - 10000
FROM
{{ this }}
)
AND (
(
SELECT
MAX(_partition_by_block_number)
FROM
{{ this }}
)
AND (
(
SELECT
MAX(_partition_by_block_number)
FROM
{{ this }}
) + {{ batch_size }}
)
) + {{ batch_size }}
)
{%- else -%}
_partition_by_block_number BETWEEN 9820000
AND 10000000
{% endif %}
@ -30,24 +28,21 @@
) %}
{% if is_incremental() %}
_partition_by_block_number BETWEEN (
_partition_by_block_number BETWEEN (
SELECT
MAX(_partition_by_block_number) - {{ front_buffer }}
FROM
{{ this }}
)
AND (
(
SELECT
MAX(_partition_by_block_number) - {{ front_buffer }}
MAX(_partition_by_block_number)
FROM
{{ this }}
)
AND (
(
SELECT
MAX(_partition_by_block_number)
FROM
{{ this }}
) + {{ batch_size }} + {{ end_buffer }}
)
) + {{ batch_size }} + {{ end_buffer }}
)
{%- else -%}
TRUE
{% endif %}
{%- endmacro %}

View File

@ -7,14 +7,17 @@
tags = ['load', 'load_blocks']
) }}
WITH missing_blocks AS (
WITH {% if var("MANUAL_FIX") %}
missing_blocks AS (
SELECT
_partition_by_block_number,
missing_block_id
FROM
{{ target.database }}.tests.streamline_block_gaps
),
{% endif %}
SELECT
_partition_by_block_number,
missing_block_id
FROM
{{ target.database }}.tests.streamline_block_gaps
),
blocks_json AS (
SELECT
block_id,
@ -40,7 +43,7 @@ blocks_json AS (
missing_blocks
)
{% else %}
WHERE
WHERE
{{ partition_batch_load(150000) }}
{% endif %}
)

View File

@ -7,17 +7,20 @@
tags = ['load', 'load_shards']
) }}
WITH missing_shards AS (
WITH {% if var("MANUAL_FIX") %}
missing_shards AS (
SELECT
_partition_by_block_number,
VALUE AS block_id
FROM
{{ target.database }}.tests.chunk_gaps,
LATERAL FLATTEN(
input => blocks_to_walk
)
),
{% endif %}
SELECT
_partition_by_block_number,
VALUE AS block_id
FROM
{{ target.database }}.tests.chunk_gaps,
LATERAL FLATTEN(
input => blocks_to_walk
)
),
shards_json AS (
SELECT
block_id,
@ -49,7 +52,7 @@ shards_json AS (
missing_shards
)
{% else %}
WHERE
WHERE
{{ partition_batch_load(150000) }}
{% endif %}
)

View File

@ -0,0 +1,37 @@
{{ config(
severity = 'warn',
tags = ['recent_gap_test']
) }}
WITH recent_blocks AS (
SELECT
*
FROM
{{ ref('silver__streamline_blocks') }}
WHERE
block_timestamp :: DATE >= CURRENT_DATE - INTERVAL '2 days'
),
silver_blocks AS (
SELECT
block_id,
block_id - 1 AS missing_block_id,
block_timestamp,
block_hash,
prev_hash,
LAG(block_hash) over (
ORDER BY
block_timestamp ASC,
block_id ASC
) AS prior_hash,
_partition_by_block_number,
CURRENT_TIMESTAMP AS _test_timestamp
FROM
recent_blocks
)
SELECT
*
FROM
silver_blocks
WHERE
prior_hash <> prev_hash

View File

@ -0,0 +1,88 @@
{{ config(
severity = 'warn',
tags = ['recent_gap_test']
) }}
WITH recent_blocks AS (
SELECT
*
FROM
{{ ref('silver__streamline_blocks') }}
WHERE
block_timestamp :: DATE >= CURRENT_DATE - INTERVAL '2 days'
),
recent_chunks AS (
SELECT
*
FROM
{{ ref('silver__streamline_chunks') }}
WHERE
block_id >= (
SELECT
MIN(block_id)
FROM
recent_blocks
)
),
block_chunks_included AS (
SELECT
block_id,
header :chunks_included AS chunk_count_block_header,
_partition_by_block_number
FROM
recent_blocks
),
chunks_per_block AS (
SELECT
block_id,
COUNT(
DISTINCT chunk_hash
) AS chunk_count_actual
FROM
recent_chunks
GROUP BY
1
),
comp AS (
SELECT
_partition_by_block_number,
b.block_id AS bblock_id,
C.block_id AS cblock_id,
b.chunk_count_block_header,
C.chunk_count_actual
FROM
block_chunks_included b full
OUTER JOIN chunks_per_block C USING (block_id)
),
missing AS (
SELECT
*
FROM
comp
WHERE
chunk_count_block_header > 0
AND (
bblock_id IS NULL
OR cblock_id IS NULL
OR chunk_count_block_header != chunk_count_actual
)
ORDER BY
1
),
FINAL AS (
SELECT
bblock_id AS block_id,
_partition_by_block_number,
chunk_count_block_header,
chunk_count_actual,
CURRENT_TIMESTAMP AS _test_timestamp
FROM
missing
ORDER BY
1
)
SELECT
*
FROM
FINAL