This commit is contained in:
Desmond Hui 2024-02-06 09:33:35 -08:00
parent 1f722ee45b
commit 619aefccd1
3 changed files with 178 additions and 1 deletions

View File

@ -215,12 +215,64 @@ A set of macros and UDFs have been created to help with the creation of Snowflak
``````
10. Create the Tasks
```
dbt run-operation fsc_utils.create_gha_tasks --var '{"START_GHA_TASKS":True}'
dbt run-operation fsc_utils.create_gha_tasks --vars '{"START_GHA_TASKS":True}'
```
> This will create the tasks in Snowflake and the workflows in GitHub Actions. The tasks will only be started if `START_GHA_TASKS` is set to `True` and the target is the production database for your project.
11. Add a Data Dog CI Pipeline Alert on the logs of `dbt_test_tasks` to ensure that the test is checking the workflows successfully. See `Polygon Task Alert` in Data Dog for sample alert.
## Dynamic Merge Predicate
A macro to help with generating merge predicate statements for models in chain projects. Specifically this will output a concatenated set of BETWEEN statements.
### Setup and Usage ###
The macro only supports generating predicates for column types of DATE and INTEGER
#### Inline Usage ####
{% set between_stmts = fsc_utils.dynamic_range_predicate("silver.my_temp_table", "block_timestamp::date") %}
...
SELECT
*
FROM
some_other_table
WHERE
{{ between_stmts }}
#### DBT Snowflake incremental_predicate Usage ####
1. Requires overriding behavior of `get_merge_sql` macro
2. Create a file in `macros/dbt/` ex: `macros/dbt/get_merge.sql`
3. Copy this to the new file
```
{% macro get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
{% set predicate_override = "" %}
{% if incremental_predicates[0] == "dynamic_range_predicate" %}
-- run some queries to dynamically determine the min + max of this 'date_column' in the new data
{% set predicate_override = dynamic_range_predicate(source, incremental_predicates[1], "DBT_INTERNAL_DEST") %}
{% endif %}
{% set predicates = [predicate_override] if predicate_override else incremental_predicates %}
-- standard merge from here
{% set merge_sql = dbt.get_merge_sql(target, source, unique_key, dest_columns, predicates) %}
{{ return(merge_sql) }}
{% endmacro %}
```
4. Example usage
```
{{ config(
...
incremental_predicates = ["dynamic_range_predicate", "block_id"],
...
) }}
```
## Resources
* Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)

12
macros/dbt/get_merge.sql Normal file
View File

@ -0,0 +1,12 @@
{% macro get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
{#-
    Override of dbt's built-in get_merge_sql.

    When a model declares `incremental_predicates = ["dynamic_range_predicate", <column>]`,
    this replaces the predicate list with a dynamically generated set of BETWEEN
    ranges (see dynamic_range_predicate). Otherwise it defers entirely to dbt's
    standard merge behavior.

    Args mirror dbt's get_merge_sql contract; the interface must not change.
-#}
    {% set predicate_override = "" %}
    {# Guard: incremental_predicates is none/empty for models that set no predicates #}
    {% if incremental_predicates and incremental_predicates[0] == "dynamic_range_predicate" %}
        -- run some queries to dynamically determine the min + max of this 'date_column' in the new data
        {% set predicate_override = dynamic_range_predicate(source, incremental_predicates[1], "DBT_INTERNAL_DEST") %}
    {% endif %}
    {# Use the generated predicate when present; otherwise pass through unchanged #}
    {% set predicates = [predicate_override] if predicate_override else incremental_predicates %}
    -- standard merge from here
    {% set merge_sql = dbt.get_merge_sql(target, source, unique_key, dest_columns, predicates) %}
    {{ return(merge_sql) }}
{% endmacro %}

View File

@ -0,0 +1,113 @@
{% macro dynamic_range_predicate(source, predicate_column, output_alias="") -%}
{#-
    Builds a merge-predicate string of contiguous BETWEEN ranges covering the
    values of `predicate_column` present in `source`.

    Args:
        source: relation containing the new/incremental rows.
        predicate_column: column to range over; only INTEGER and DATE types
            are supported (validated below via typeof()).
        output_alias: optional alias prefix for the column in the emitted
            predicate, e.g. "DBT_INTERNAL_DEST".

    Returns:
        A string like "(alias.col between 'a' and 'b' OR ...)", or '1=1' when
        no ranges are found (merge requires at least one predicate).

    NOTE(review): assumes `source` has at least one non-null value in
    `predicate_column` — the metadata queries index the first result row and
    would fail on an empty source. Confirm callers guarantee this.
-#}
{% set supported_data_types = ["INTEGER","DATE"] %}

{# Detect the column's data type from the first non-null value. #}
{% set predicate_column_data_type_query %}
    SELECT typeof({{predicate_column}}::variant)
    FROM {{ source }}
    WHERE {{predicate_column}} IS NOT NULL
    LIMIT 1;
{% endset %}
{% set predicate_column_data_type = run_query(predicate_column_data_type_query).columns[0].values()[0] %}
{% if predicate_column_data_type not in supported_data_types %}
    {{ exceptions.raise_compiler_error("Data type of "~ predicate_column_data_type ~" is not supported, use one of "~ supported_data_types ~" column instead") }}
{% endif %}

{# Find the overall min/max of the predicate column in the new data. #}
{% set get_start_end_query %}
    SELECT
        MIN(
            {{ predicate_column }}
        ) AS full_range_start,
        MAX(
            {{ predicate_column }}
        ) AS full_range_end
    FROM
        {{ source }}
{% endset %}
{% set start_end_results = run_query(get_start_end_query).columns %}
{% set start_predicate_value = start_end_results[0].values()[0] %}
{% set end_predicate_value = start_end_results[1].values()[0] %}

{#-
    Build the full value range (integer sequence or calendar dates), count how
    many source rows fall on each value, then collapse runs of populated values
    into contiguous [start, end] ranges via CONDITIONAL_CHANGE_EVENT, and
    finally render them as OR-joined BETWEEN clauses.
-#}
{% set get_limits_query %}
    WITH block_range AS (
        {% if predicate_column_data_type == "INTEGER" %}
        SELECT
            SEQ4() + {{ start_predicate_value }} as predicate_value
        FROM
            TABLE(GENERATOR(rowcount => {{ end_predicate_value - start_predicate_value }}+1))
        {% else %}
        SELECT
            date_day as predicate_value
        FROM
            crosschain.core.dim_dates
        WHERE
            date_day BETWEEN '{{ start_predicate_value }}' AND '{{ end_predicate_value }}'
        {% endif %}
    ),
    partition_block_counts AS (
        SELECT
            b.predicate_value,
            COUNT(r.{{ predicate_column }}) AS count_in_window
        FROM
            block_range b
            LEFT OUTER JOIN {{ source }}
                r
                ON r.{{ predicate_column }} = b.predicate_value
        GROUP BY
            1
    ),
    range_groupings AS (
        SELECT
            predicate_value,
            count_in_window,
            -- group id increments each time we cross between populated and empty values
            conditional_change_event(
                count_in_window > 0
            ) over (
                ORDER BY
                    predicate_value
            ) AS group_val
        FROM
            partition_block_counts
    ),
    contiguous_ranges AS (
        SELECT
            MIN(predicate_value) AS start_value,
            MAX(predicate_value) AS end_value
        FROM
            range_groupings
        WHERE
            count_in_window > 0
        GROUP BY
            group_val
    ),
    between_stmts AS (
        SELECT
            CONCAT(
                '{{ output_alias~"." if output_alias else "" }}',
                '{{ predicate_column }} between \'',
                start_value,
                '\' and \'',
                end_value,
                '\''
            ) AS b
        FROM
            contiguous_ranges
    )
    SELECT
        CONCAT('(', LISTAGG(b, ' OR '), ')')
    FROM
        between_stmts
{% endset %}
{% set between_stmts = run_query(get_limits_query).columns[0].values()[0] %}
{% if between_stmts != '()' %}
    /* in case empty update array */
    {% set predicate_override = between_stmts %}
{% else %}
    {% set predicate_override = '1=1' %}
    /* need to have something or it will error since it expects at least 1 predicate */
{% endif %}
{{ return(predicate_override) }}
{% endmacro %}