-- Patch commentary (text before the first `diff --git` header is skipped by patch tools).
-- This copy of the patch has the line breaks inside each hunk collapsed to
-- spaces, so diff markers (+/-) and SQL operators (+/-) share the same line;
-- read it as a record of the change rather than something to `git apply`.
--
-- macros/partition_batch_load.sql
--   * partition_batch_load(batch_size): the incremental predicate is
--     re-indented, and its lower bound becomes
--     MAX(_partition_by_block_number) - 10000 where it used to be the bare
--     MAX(...). The upper bound is still MAX(...) + batch_size.
--   * Second macro (its name and signature are above the hunk and not shown
--     here): same re-indent; the bounds stay MAX(...) - front_buffer and
--     MAX(...) + batch_size + end_buffer.
--   * NOTE(review): the lone "-" after each `{%- else -%}` looks like a removed
--     blank line, which would leave the non-incremental branches
--     (BETWEEN 9820000 AND 10000000 / TRUE) in place; confirm against the
--     unmangled patch, because the collapsed text cannot rule out that the
--     branch body itself was removed.
--   * NOTE(review): both bounds are scalar subqueries over {{ this }}. If that
--     relation is empty, MAX() is NULL and the BETWEEN matches no rows;
--     confirm the target is never empty on an incremental run.
--
-- models/silver/streamline/silver__load_blocks.sql
-- models/silver/streamline/silver__load_shards.sql
--   * The missing_blocks / missing_shards CTE is now wrapped in
--     `{% if var("MANUAL_FIX") %} ... {% endif %}`, so `WITH` is followed
--     directly by blocks_json / shards_json when the variable is falsy.
--   * The `{% else %}` branch filters with partition_batch_load(150000); the
--     "- WHERE + WHERE" pair is presumably a whitespace-only change.
--   * NOTE(review): the `{% if %}` that pairs with this `{% else %}` is outside
--     the hunk; presumably it tests the same MANUAL_FIX variable. Verify.
--   * NOTE(review): var("MANUAL_FIX") is called without a default; presumably
--     the variable is declared in the project configuration. Confirm.
--
-- tests/tests__recent_block_gaps.sql (new file; severity 'warn', tag recent_gap_test)
--   * Reads silver__streamline_blocks rows whose block_timestamp date is on or
--     after CURRENT_DATE - INTERVAL '2 days' and returns those whose prev_hash
--     differs from the preceding row's block_hash (LAG ordered by
--     block_timestamp, then block_id).
--
-- tests/tests__recent_chunk_gaps.sql (new file; same config)
--   * For the same recent blocks, compares header:chunks_included with
--     COUNT(DISTINCT chunk_hash) from silver__streamline_chunks via a
--     FULL OUTER JOIN on block_id, and returns the blocks that disagree.
--   * NOTE(review): `chunk_count_block_header > 0` is not true when the block
--     side of the join is missing, so the `bblock_id IS NULL` arm looks
--     unreachable. Confirm intent.
diff --git a/macros/partition_batch_load.sql b/macros/partition_batch_load.sql index 6747114..2094437 100644 --- a/macros/partition_batch_load.sql +++ b/macros/partition_batch_load.sql @@ -1,23 +1,21 @@ {% macro partition_batch_load(batch_size) %} {% if is_incremental() %} - - _partition_by_block_number BETWEEN ( +_partition_by_block_number BETWEEN ( + SELECT + MAX(_partition_by_block_number) - 10000 + FROM + {{ this }} +) +AND ( + ( SELECT MAX(_partition_by_block_number) FROM {{ this }} - ) - AND ( - ( - SELECT - MAX(_partition_by_block_number) - FROM - {{ this }} - ) + {{ batch_size }} - ) + ) + {{ batch_size }} +) {%- else -%} - _partition_by_block_number BETWEEN 9820000 AND 10000000 {% endif %} @@ -30,24 +28,21 @@ ) %} {% if is_incremental() %} - - _partition_by_block_number BETWEEN ( +_partition_by_block_number BETWEEN ( + SELECT + MAX(_partition_by_block_number) - {{ front_buffer }} + FROM + {{ this }} +) +AND ( + ( SELECT - MAX(_partition_by_block_number) - {{ front_buffer }} + MAX(_partition_by_block_number) FROM {{ this }} - ) - AND ( - ( - SELECT - MAX(_partition_by_block_number) - FROM - {{ this }} - ) + {{ batch_size }} + {{ end_buffer }} - ) + ) + {{ batch_size }} + {{ end_buffer }} +) {%- else -%} - TRUE {% endif %} {%- endmacro %} - diff --git a/models/silver/streamline/silver__load_blocks.sql b/models/silver/streamline/silver__load_blocks.sql index 02dde35..dbedafd 100644 --- a/models/silver/streamline/silver__load_blocks.sql +++ b/models/silver/streamline/silver__load_blocks.sql @@ -7,14 +7,17 @@ tags = ['load', 'load_blocks'] ) }} -WITH missing_blocks AS ( +WITH {% if var("MANUAL_FIX") %} + missing_blocks AS ( + + SELECT + _partition_by_block_number, + missing_block_id + FROM + {{ target.database }}.tests.streamline_block_gaps + ), +{% endif %} - SELECT - _partition_by_block_number, - missing_block_id - FROM - {{ target.database }}.tests.streamline_block_gaps -), blocks_json AS ( SELECT block_id, @@ -40,7 +43,7 @@ blocks_json AS ( missing_blocks ) 
{% else %} - WHERE + WHERE {{ partition_batch_load(150000) }} {% endif %} ) diff --git a/models/silver/streamline/silver__load_shards.sql b/models/silver/streamline/silver__load_shards.sql index cf1bbea..634cf05 100644 --- a/models/silver/streamline/silver__load_shards.sql +++ b/models/silver/streamline/silver__load_shards.sql @@ -7,17 +7,20 @@ tags = ['load', 'load_shards'] ) }} -WITH missing_shards AS ( +WITH {% if var("MANUAL_FIX") %} + missing_shards AS ( + + SELECT + _partition_by_block_number, + VALUE AS block_id + FROM + {{ target.database }}.tests.chunk_gaps, + LATERAL FLATTEN( + input => blocks_to_walk + ) + ), +{% endif %} - SELECT - _partition_by_block_number, - VALUE AS block_id - FROM - {{ target.database }}.tests.chunk_gaps, - LATERAL FLATTEN( - input => blocks_to_walk - ) -), shards_json AS ( SELECT block_id, @@ -49,7 +52,7 @@ shards_json AS ( missing_shards ) {% else %} - WHERE + WHERE {{ partition_batch_load(150000) }} {% endif %} ) diff --git a/tests/tests__recent_block_gaps.sql b/tests/tests__recent_block_gaps.sql new file mode 100644 index 0000000..4e7ab5f --- /dev/null +++ b/tests/tests__recent_block_gaps.sql @@ -0,0 +1,37 @@ +{{ config( + severity = 'warn', + tags = ['recent_gap_test'] +) }} + +WITH recent_blocks AS ( + + SELECT + * + FROM + {{ ref('silver__streamline_blocks') }} + WHERE + block_timestamp :: DATE >= CURRENT_DATE - INTERVAL '2 days' +), +silver_blocks AS ( + SELECT + block_id, + block_id - 1 AS missing_block_id, + block_timestamp, + block_hash, + prev_hash, + LAG(block_hash) over ( + ORDER BY + block_timestamp ASC, + block_id ASC + ) AS prior_hash, + _partition_by_block_number, + CURRENT_TIMESTAMP AS _test_timestamp + FROM + recent_blocks +) +SELECT + * +FROM + silver_blocks +WHERE + prior_hash <> prev_hash diff --git a/tests/tests__recent_chunk_gaps.sql b/tests/tests__recent_chunk_gaps.sql new file mode 100644 index 0000000..c0c8f2c --- /dev/null +++ b/tests/tests__recent_chunk_gaps.sql @@ -0,0 +1,88 @@ +{{ config( + severity 
= 'warn', + tags = ['recent_gap_test'] +) }} + +WITH recent_blocks AS ( + + SELECT + * + FROM + {{ ref('silver__streamline_blocks') }} + WHERE + block_timestamp :: DATE >= CURRENT_DATE - INTERVAL '2 days' +), +recent_chunks AS ( + SELECT + * + FROM + {{ ref('silver__streamline_chunks') }} + WHERE + block_id >= ( + SELECT + MIN(block_id) + FROM + recent_blocks + ) +), +block_chunks_included AS ( + SELECT + block_id, + header :chunks_included AS chunk_count_block_header, + _partition_by_block_number + FROM + recent_blocks +), +chunks_per_block AS ( + SELECT + block_id, + COUNT( + DISTINCT chunk_hash + ) AS chunk_count_actual + FROM + recent_chunks + GROUP BY + 1 +), +comp AS ( + SELECT + _partition_by_block_number, + b.block_id AS bblock_id, + C.block_id AS cblock_id, + b.chunk_count_block_header, + C.chunk_count_actual + FROM + block_chunks_included b full + OUTER JOIN chunks_per_block C USING (block_id) +), +missing AS ( + SELECT + * + FROM + comp + WHERE + chunk_count_block_header > 0 + AND ( + bblock_id IS NULL + OR cblock_id IS NULL + OR chunk_count_block_header != chunk_count_actual + ) + ORDER BY + 1 +), +FINAL AS ( + SELECT + bblock_id AS block_id, + _partition_by_block_number, + chunk_count_block_header, + chunk_count_actual, + CURRENT_TIMESTAMP AS _test_timestamp + FROM + missing + ORDER BY + 1 +) +SELECT + * +FROM + FINAL