diff --git a/macros/dbt/get_merge.sql b/macros/dbt/get_merge.sql
new file mode 100644
index 00000000..6885da84
--- /dev/null
+++ b/macros/dbt/get_merge.sql
@@ -0,0 +1,69 @@
+{% macro get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
+    {% set predicate_override = "" %}
+    {% if incremental_predicates[0] == "dynamic_block_date_ranges" %}
+        -- query the new data for its min and max block_timestamp::date, then build tight date-range predicates against the destination
+        {% set get_limits_query %}
+            WITH full_range AS (
+                SELECT
+                    min(block_timestamp::date) as full_range_start_block_date,
+                    max(block_timestamp::date) as full_range_end_block_date
+                FROM {{ source }}
+            ),
+            block_range as (
+                SELECT
+                    date_day,
+                    row_number() over (order by date_day) - 1 as rn
+                FROM
+                    crosschain.core.dim_dates
+                WHERE
+                    date_day between (select full_range_start_block_date from full_range) and (select full_range_end_block_date from full_range)
+            ),
+            partition_block_counts as (
+                SELECT
+                    b.date_day as block_date,
+                    COUNT(*) as count_in_window
+                FROM
+                    block_range b
+                    left outer join {{ source }} r
+                    on r.block_timestamp::date = b.date_day
+                group by 1
+            ),
+            range_groupings AS (
+                SELECT
+                    block_date,
+                    count_in_window,
+                    CONDITIONAL_CHANGE_EVENT(count_in_window > 1) OVER (ORDER BY block_date) as group_val
+                FROM
+                    partition_block_counts
+            ),
+            contiguous_ranges as (
+                select
+                    min(block_date) as start_block_date,
+                    max(block_date) as end_block_date
+                from range_groupings
+                where count_in_window > 1
+                group by group_val
+            ),
+            between_stmts as (
+                select
+                    concat('DBT_INTERNAL_DEST.block_timestamp::date between \'',start_block_date,'\' and \'',end_block_date,'\'') as b
+                from
+                    contiguous_ranges
+            )
+            select
+                concat('(',listagg(b, ' OR '),')')
+            from between_stmts
+        {% endset %}
+        {% set between_stmts = run_query(get_limits_query).columns[0].values()[0] %}
+        {% if between_stmts != '()' %} /* guard against an empty set of date ranges */
+            {% set predicate_override = between_stmts %}
+        {% else %}
+            {% set predicate_override = '1=1' %} /* fall back to an always-true predicate; 'dynamic_block_date_ranges' is not a real column and would error if passed through */
+        {% endif %}
+    {% endif %}
+    {% set predicates = [predicate_override] if predicate_override else incremental_predicates %}
+    -- standard merge from here
+    {% set merge_sql = dbt.get_merge_sql(target, source, unique_key, dest_columns, predicates) %}
+    {{ return(merge_sql) }}
+
+{% endmacro %}
\ No newline at end of file
diff --git a/models/silver/parser/silver__decoded_instructions.sql b/models/silver/parser/silver__decoded_instructions.sql
index 6ac6f510..d343699a 100644
--- a/models/silver/parser/silver__decoded_instructions.sql
+++ b/models/silver/parser/silver__decoded_instructions.sql
@@ -2,7 +2,7 @@
 -- depends_on: {{ ref('bronze__streamline_FR_decoded_instructions_2') }}
 {{ config(
     materialized = 'incremental',
-    incremental_predicates = ['DBT_INTERNAL_DEST.block_timestamp::date >= LEAST(current_date-7,(select min(block_timestamp)::date from ' ~ generate_tmp_view_name(this) ~ '))'],
+    incremental_predicates = ["dynamic_block_date_ranges"],
     unique_key = "decoded_instructions_id",
     cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE','program_id'],
     post_hook = enable_search_optimization('{{this.schema}}','{{this.identifier}}'),
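
For illustration only (the dates below are hypothetical, not taken from this PR): if a run's new data contains rows for 2024-03-01 through 2024-03-03 and again for 2024-03-10, the listagg query in the macro returns a single predicate string along the lines of

    -- example value of between_stmts (hypothetical dates)
    (DBT_INTERNAL_DEST.block_timestamp::date between '2024-03-01' and '2024-03-03' OR DBT_INTERNAL_DEST.block_timestamp::date between '2024-03-10' and '2024-03-10')

The override passes this string to dbt.get_merge_sql in place of the configured incremental_predicates, so the merge scans only those destination date partitions instead of every row on or after LEAST(current_date-7, min(block_timestamp)::date) as the replaced predicate did. Any model can opt in the same way silver__decoded_instructions does here, by setting incremental_predicates = ["dynamic_block_date_ranges"] in its config.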