Update/tx complete incremental and merge pred (#60)

* use dynamic range

* get min partition
This commit is contained in:
tarikceric 2025-03-14 09:48:37 -07:00 committed by GitHub
parent b2c06dabce
commit 9e92ff8804
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 26 additions and 13 deletions

View File

@ -4,7 +4,7 @@
{{ config(
materialized = 'incremental',
unique_key = "tx_id",
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-01-01') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
merge_exclude_columns = ["inserted_timestamp"],
tags = ['scheduled_core']

View File

@ -1,6 +1,6 @@
-- depends_on: {{ ref('bronze__transactions_2') }}
-- depends_on: {{ ref('bronze__FR_transactions_2') }}
-- depends_on: {{ ref('streamline__blocks') }}
{{ config (
materialized = "incremental",
@ -9,6 +9,27 @@
cluster_by = "ROUND(block_id, -4)",
) }}
{% if execute %}
{% set min_partition_key_query %}
SELECT round(min(block_id),-4)::int AS min_partition_key
FROM (
SELECT
block_id
FROM
{{ ref("streamline__blocks") }}
WHERE
/* Find the earliest block available from the node provider */
block_id >= 6572203
EXCEPT
SELECT
block_id
FROM
{{ this }}
)
{% endset %}
{% set min_partition_key = run_query(min_partition_key_query)[0][0] %}
{% endif %}
SELECT
block_id,
partition_key,
@ -20,18 +41,10 @@ FROM
{% if is_incremental() %}
{{ ref('bronze__transactions_2') }}
WHERE
_inserted_timestamp >= (
partition_key >= {{ min_partition_key }}
AND _inserted_timestamp >= (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }}
)
AND partition_key >= (
SELECT
COALESCE(
MAX(partition_key),
0
)
coalesce(max(_inserted_timestamp), '1970-01-01' :: DATE) AS max_inserted_timestamp
FROM
{{ this }}
)