diff --git a/README.md b/README.md index ff206a4..beea577 100644 --- a/README.md +++ b/README.md @@ -215,12 +215,60 @@ A set of macros and UDFs have been created to help with the creation of Snowflak `````` 10. Create the Tasks ``` - dbt run-operation fsc_utils.create_gha_tasks --var '{"START_GHA_TASKS":True}' + dbt run-operation fsc_utils.create_gha_tasks --vars '{"START_GHA_TASKS":True}' ``` > This will create the tasks in Snowflake and the workflows in GitHub Actions. The tasks will only be started if `START_GHA_TASKS` is set to `True` and the target is the production database for your project. 11. Add a Data Dog CI Pipeline Alert on the logs of `dbt_test_tasks` to ensure that the test is checking the workflows successfully. See `Polygon Task Alert` in Data Dog for sample alert. + +## Dynamic Merge Predicate + +A set of macros to help with generating dynamic merge predicate statements for models in chain projects. Specifically this will output a concatenated set of BETWEEN statements of contiguous ranges. + +### Setup and Usage ### + +The macro only supports generating predicates for column types of DATE and INTEGER + + 1. Make sure the fsc-utils package referenced in the project is version `v1.16.0` or greater. Re-run dbt deps if the revision was changed. + +#### Inline Usage #### + + {% set between_stmts = fsc_utils.dynamic_range_predicate("silver.my_temp_table", "block_timestamp::date") %} + ... + + SELECT + * + FROM + some_other_table + WHERE + {{ between_stmts }} + +#### DBT Snowflake incremental_predicate Usage #### + + 1. Requires overriding behavior of `get_merge_sql` macro + + 2. Create a file in `macros/dbt/` ex: `macros/dbt/get_merge.sql` + + 3.
Copy this to the new file
+ ```
+ {% macro get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
+     {% set merge_sql = fsc_utils.get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %}
+     {{ return(merge_sql) }}
+ {% endmacro %}
+ ```
+ **NOTE**: This is backwards compatible with the default dbt merge behavior, however it does override the default macro. If additional customization is needed, the above macro should be modified.
+
+4. Example usage to create predicates using block_id
+ ```
+ {{ config(
+     ...
+     incremental_predicates = ["dynamic_range_predicate", "block_id"],
+     ...
+ ) }}
+ ```
+ Example Output: ```(DBT_INTERNAL_DEST.block_id between 100 and 200 OR DBT_INTERNAL_DEST.block_id between 100000 and 150000)```
+
 ## Resources
 * Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
diff --git a/macros/dbt/get_merge.sql b/macros/dbt/get_merge.sql
new file mode 100644
index 0000000..e0a568e
--- /dev/null
+++ b/macros/dbt/get_merge.sql
@@ -0,0 +1,12 @@
+{% macro get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
+    {# Override of dbt's built-in get_merge_sql. When the first incremental predicate
+       is the sentinel string "dynamic_range_predicate", the second entry names the
+       column to build contiguous BETWEEN ranges on; any other predicate list (or
+       none at all) falls straight through to dbt's default merge behavior. #}
+    {% set predicate_override = "" %}
+    {# incremental_predicates is None for models that do not configure it — guard
+       before indexing [0] so the override stays backwards compatible with plain
+       incremental merges instead of raising on the subscript. #}
+    {% if incremental_predicates and incremental_predicates[0] == "dynamic_range_predicate" %}
+        -- run some queries to dynamically determine the min + max of this 'date_column' in the new data
+        {% set predicate_override = fsc_utils.dynamic_range_predicate(source, incremental_predicates[1], "DBT_INTERNAL_DEST") %}
+    {% endif %}
+    {% set predicates = [predicate_override] if predicate_override else incremental_predicates %}
+    -- standard merge from here
+    {% set merge_sql = dbt.get_merge_sql(target, source, unique_key, dest_columns, predicates) %}
+    {{ return(merge_sql) }}
+{% endmacro %}
diff --git a/macros/dynamic_range_predicate.sql b/macros/dynamic_range_predicate.sql
new file mode 100644
index 0000000..51a17e8
--- /dev/null
+++ b/macros/dynamic_range_predicate.sql
@@ -0,0 +1,113 @@
+{% macro dynamic_range_predicate(source, predicate_column, output_alias="") -%}
+    {# Returns a predicate string: a parenthesised OR-chain of BETWEEN clauses, one
+       per contiguous run of `predicate_column` values present in `source`. Only
+       INTEGER and DATE columns are supported. `output_alias`, when given, prefixes
+       the column in the output (e.g. "DBT_INTERNAL_DEST"). Falls back to '1=1'
+       when `source` is empty so callers always receive at least one predicate. #}
+    {% set supported_data_types = ["INTEGER", "DATE"] %}
+
+    {# Sample one non-null value to learn the column's underlying type. #}
+    {% set predicate_column_data_type_query %}
+        SELECT typeof({{ predicate_column }}::variant)
+        FROM {{ source }}
+        WHERE {{ predicate_column }} IS NOT NULL
+        LIMIT 1
+    {% endset %}
+    {% set predicate_column_data_type = run_query(predicate_column_data_type_query).columns[0].values()[0] %}
+
+    {% if predicate_column_data_type not in supported_data_types %}
+        {{ exceptions.raise_compiler_error("Data type of " ~ predicate_column_data_type ~ " is not supported, use one of " ~ supported_data_types ~ " column instead") }}
+    {% endif %}
+
+    {# Overall min/max of the new data bound the range scanned for gaps. #}
+    {% set get_start_end_query %}
+        SELECT
+            MIN({{ predicate_column }}) AS full_range_start,
+            MAX({{ predicate_column }}) AS full_range_end
+        FROM
+            {{ source }}
+    {% endset %}
+    {% set start_end_results = run_query(get_start_end_query).columns %}
+    {% set start_predicate_value = start_end_results[0].values()[0] %}
+    {% set end_predicate_value = start_end_results[1].values()[0] %}
+
+    {% set get_limits_query %}
+        WITH block_range AS (
+            {# Generate every candidate value between min and max, inclusive. #}
+            {% if predicate_column_data_type == "INTEGER" %}
+            SELECT
+                SEQ4() + {{ start_predicate_value }} AS predicate_value
+            FROM
+                TABLE(GENERATOR(rowcount => {{ end_predicate_value - start_predicate_value }} + 1))
+            {% else %}
+            SELECT
+                date_day AS predicate_value
+            FROM
+                crosschain.core.dim_dates
+            WHERE
+                date_day BETWEEN '{{ start_predicate_value }}' AND '{{ end_predicate_value }}'
+            {% endif %}
+        ),
+        partition_block_counts AS (
+            -- count of new rows landing on each candidate value (0 marks a gap)
+            SELECT
+                b.predicate_value,
+                COUNT(r.{{ predicate_column }}) AS count_in_window
+            FROM
+                block_range b
+                LEFT OUTER JOIN {{ source }} r
+                    ON r.{{ predicate_column }} = b.predicate_value
+            GROUP BY
+                1
+        ),
+        range_groupings AS (
+            -- group id increments each time presence flips between present/absent,
+            -- so rows in the same contiguous run share a group_val
+            SELECT
+                predicate_value,
+                count_in_window,
+                conditional_change_event(count_in_window > 0) over (
+                    ORDER BY
+                        predicate_value
+                ) AS group_val
+            FROM
+                partition_block_counts
+        ),
+        contiguous_ranges AS (
+            SELECT
+                MIN(predicate_value) AS start_value,
+                MAX(predicate_value) AS end_value
+            FROM
+                range_groupings
+            WHERE
+                count_in_window > 0
+            GROUP BY
+                group_val
+        ),
+        between_stmts AS (
+            SELECT
+                CONCAT(
+                    '{{ output_alias ~ "." if output_alias else "" }}',
+                    '{{ predicate_column }} between \'',
+                    start_value,
+                    '\' and \'',
+                    end_value,
+                    '\''
+                ) AS b
+            FROM
+                contiguous_ranges
+        )
+        SELECT
+            CONCAT('(', LISTAGG(b, ' OR '), ')')
+        FROM
+            between_stmts
+    {% endset %}
+
+    {% set between_stmts = run_query(get_limits_query).columns[0].values()[0] %}
+
+    {% if between_stmts != '()' %}
+        {% set predicate_override = between_stmts %}
+    {% else %}
+        {# Empty source: emit a tautology, since the merge expects at least 1 predicate. #}
+        {% set predicate_override = '1=1' %}
+    {% endif %}
+
+    {{ return(predicate_override) }}
+{% endmacro %}