macro for manual batch refresh

This commit is contained in:
forgash_ 2023-01-24 16:37:26 -07:00
parent ca61bb3df1
commit f626b51957
8 changed files with 96 additions and 56 deletions

View File

@ -0,0 +1,37 @@
{% macro partition_load_manual(
scope = 'no_buffer'
) %}
{# if range_start and range_end not set in cli, default to earliest rpc data #}
{% set range_start = var(
'range_start',
46700000
) %}
{% set range_end = var(
'range_end',
47000000
) %}
{% set front_buffer = var(
'front_buffer',
0
) %}
{% set end_buffer = var(
'end_buffer',
0
) %}
{% if scope == 'front' %}
_partition_by_block_number BETWEEN {{ range_start }} - (
10000 * {{ front_buffer }}
)
AND {{ range_end }}
{% elif scope == 'end' %}
_partition_by_block_number BETWEEN {{ range_start }}
AND {{ range_end }} + (
10000 * {{ end_buffer }}
) {% elif scope == 'no_buffer' %}
_partition_by_block_number BETWEEN {{ range_start }}
AND {{ range_end }}
{% else %}
TRUE
{% endif %}
{%- endmacro %}

View File

@ -23,32 +23,6 @@ WHERE
{% endif %}
{%- endmacro %}
{% macro partition_batch_load_dev(batch_size) %}
{% if is_incremental() %}
WHERE
_partition_by_block_number > (
SELECT
MAX(_partition_by_block_number)
FROM
{{ this }}
)
AND _partition_by_block_number <= (
(
SELECT
MAX(_partition_by_block_number)
FROM
{{ this }}
) + {{ batch_size }}
)
{%- else -%}
WHERE
{# earliest block in RPC data, use for comparison testing #}
_partition_by_block_number BETWEEN 46670000
AND 47000000
{% endif %}
{%- endmacro %}
{% macro partition_incremental_load(
batch_size,
front_buffer = 0,
@ -76,3 +50,4 @@ WHERE
TRUE
{% endif %}
{%- endmacro %}

View File

@ -1,6 +1,6 @@
{{ config(
materialized = 'view',
tags = ['s3', 's3_helper']
tags = ['s3', 's3_helper', 's3_manual']
) }}
WITH receipts AS (
@ -17,6 +17,11 @@ WITH receipts AS (
A.outcome_receipts,
outer => TRUE
) b
{% if target.name == 'manual_fix' %}
WHERE
{{ partition_load_manual('front') }}
{% else %}
WHERE
_partition_by_block_number >= (
SELECT
@ -30,6 +35,7 @@ WITH receipts AS (
FROM
silver.streamline_receipts_final
) + 220000
{% endif %}
)
SELECT
*

View File

@ -1,7 +1,7 @@
{{ config(
materalized = 'view',
unique_key = 'receipt_id',
tags = ['s3', 's3_helper']
tags = ['s3', 's3_helper', 's3_manual']
) }}
WITH recursive ancestrytree AS (
@ -28,19 +28,25 @@ txs AS (
*
FROM
{{ ref('silver__streamline_transactions') }}
WHERE
_partition_by_block_number >= (
SELECT
MAX(_partition_by_block_number)
FROM
silver.streamline_receipts_final
) - 20000
AND _partition_by_block_number <= (
SELECT
MAX(_partition_by_block_number)
FROM
silver.streamline_receipts_final
) + 220000
{% if target.name == 'manual_fix' %}
WHERE
{{ partition_load_manual('front') }}
{% else %}
WHERE
_partition_by_block_number >= (
SELECT
MAX(_partition_by_block_number)
FROM
silver.streamline_receipts_final
) - 20000
AND _partition_by_block_number <= (
SELECT
MAX(_partition_by_block_number)
FROM
silver.streamline_receipts_final
) + 220000
{% endif %}
),
FINAL AS (
SELECT

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
unique_key = 'receipt_object_id',
cluster_by = ['_load_timestamp::date', 'block_id'],
tags = ['s3', 's3_final']
tags = ['s3', 's3_final', 's3_manual']
) }}
WITH base_receipts AS (
@ -12,8 +12,14 @@ WITH base_receipts AS (
*
FROM
{{ ref('silver__streamline_receipts') }}
{{ partition_batch_load(150000) }}
AND {{ incremental_load_filter('_load_timestamp') }}
{% if target.name == 'manual_fix' %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% else %}
{{ partition_batch_load(150000) }}
AND {{ incremental_load_filter('_load_timestamp') }}
{% endif %}
),
blocks AS (
SELECT

View File

@ -1,9 +1,9 @@
{{ config(
materialized = 'incremental',
unique_key = 'tx_hash',
incremental_strategy = 'merge',
incremental_strategy = 'delete+insert',
cluster_by = ['_load_timestamp::date', 'block_timestamp::date'],
tags = ['s3', 's3_final']
tags = ['s3', 's3_final', 's3_manual']
) }}
WITH int_txs AS (
@ -12,24 +12,34 @@ WITH int_txs AS (
*
FROM
{{ ref('silver__streamline_transactions') }}
{{ partition_incremental_load(
150000,
10000,
0
) }}
{% if target.name == 'manual_fix' %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% else %}
{{ partition_incremental_load(
150000,
10000,
0
) }}
{% endif %}
),
int_receipts AS (
SELECT
*
FROM
{{ ref('silver__streamline_receipts_final') }}
{{ partition_incremental_load(
150000,
10000,
0
) }}
{% if target.name == 'manual_fix' %}
WHERE
{{ partition_load_manual('end') }}
{% else %}
{{ partition_incremental_load(
150000,
10000,
0
) }}
{% endif %}
),
int_blocks AS (
SELECT