Merge pull request #31 from FlipsideCrypto/AN-4508/dynamic-predicate

An 4508/dynamic predicate
This commit is contained in:
desmond-hui 2024-02-07 09:37:45 -08:00 committed by GitHub
commit 7fa932b2df
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 174 additions and 1 deletion


@@ -215,12 +215,60 @@ A set of macros and UDFs have been created to help with the creation of Snowflak
```
10. Create the Tasks
```
-dbt run-operation fsc_utils.create_gha_tasks --var '{"START_GHA_TASKS":True}'
+dbt run-operation fsc_utils.create_gha_tasks --vars '{"START_GHA_TASKS":True}'
```
> This will create the tasks in Snowflake and the workflows in GitHub Actions. The tasks will only be started if `START_GHA_TASKS` is set to `True` and the target is the production database for your project.
11. Add a Datadog CI Pipeline alert on the logs of `dbt_test_tasks` to ensure that the test is checking the workflows successfully. See `Polygon Task Alert` in Datadog for a sample alert.
## Dynamic Merge Predicate
A set of macros to help with generating dynamic merge predicate statements for models in chain projects. Specifically, this will output a concatenated set of BETWEEN statements covering the contiguous ranges of values present in the new data.
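For example, if an incremental batch only touches `block_id` values 100 through 200 and 100000 through 150000, the macro renders `(block_id between 100 and 200 OR block_id between 100000 and 150000)`, which lets Snowflake prune partitions instead of scanning the entire 100 to 150000 span.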
### Setup and Usage ###
The macro only supports generating predicates for columns of type `DATE` or `INTEGER`.
1. Make sure the fsc-utils package referenced in the project is version `v1.16.0` or greater. Re-run `dbt deps` if the revision was changed.
#### Inline Usage ####
```
{% set between_stmts = fsc_utils.dynamic_range_predicate("silver.my_temp_table", "block_timestamp::date") %}
...
SELECT
    *
FROM
    some_other_table
WHERE
    {{ between_stmts }}
```
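Note that `dynamic_range_predicate` issues metadata queries against the source relation via `run_query`, so the relation passed in (here `silver.my_temp_table`) must already exist when the model is executed.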
#### dbt Snowflake `incremental_predicates` Usage ####
1. Requires overriding the behavior of dbt's built-in `get_merge_sql` macro
2. Create a file in `macros/dbt/`, e.g. `macros/dbt/get_merge.sql`
3. Copy the following into the new file
```
{% macro get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
{% set merge_sql = fsc_utils.get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %}
{{ return(merge_sql) }}
{% endmacro %}
```
**NOTE**: This is backwards compatible with the default dbt merge behavior; however, it does override the default macro, since dbt resolves macros defined in the root project ahead of the built-in ones. If additional customization is needed, modify the macro above.
4. Example usage, generating predicates on `block_id`:
```
{{ config(
...
incremental_predicates = ["dynamic_range_predicate", "block_id"],
...
) }}
```
Example Output: `(DBT_INTERNAL_DEST.block_id between 100 and 200 OR DBT_INTERNAL_DEST.block_id between 100000 and 150000)`
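dbt's default merge aliases the target table as `DBT_INTERNAL_DEST` and the staged batch as `DBT_INTERNAL_SOURCE`, which is why the generated predicate is prefixed with the destination alias. A rough sketch of the rendered statement (relation and key names are illustrative, not taken from the macro):
```
merge into my_db.silver.my_table as DBT_INTERNAL_DEST
using my_table__dbt_tmp as DBT_INTERNAL_SOURCE
on (DBT_INTERNAL_DEST.block_id between 100 and 200 OR DBT_INTERNAL_DEST.block_id between 100000 and 150000)
    and DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
when matched then update set ...
when not matched then insert ...
```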
## Resources
* Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)

macros/dbt/get_merge.sql Normal file

@@ -0,0 +1,12 @@
{% macro get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
{% set predicate_override = "" %}
{% if incremental_predicates[0] == "dynamic_range_predicate" %}
-- run queries to dynamically determine the contiguous ranges of the predicate column in the new data
{% set predicate_override = fsc_utils.dynamic_range_predicate(source, incremental_predicates[1], "DBT_INTERNAL_DEST") %}
{% endif %}
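-- fall back to the caller-supplied incremental_predicates when no dynamic override was produced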
{% set predicates = [predicate_override] if predicate_override else incremental_predicates %}
-- standard merge from here
{% set merge_sql = dbt.get_merge_sql(target, source, unique_key, dest_columns, predicates) %}
{{ return(merge_sql) }}
{% endmacro %}


@@ -0,0 +1,113 @@
{% macro dynamic_range_predicate(source, predicate_column, output_alias="") -%}
{% set supported_data_types = ["INTEGER","DATE"] %}
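-- sample a single non-NULL value and let Snowflake's typeof() report the column's type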
{% set predicate_column_data_type_query %}
SELECT typeof({{ predicate_column }}::variant)
FROM {{ source }}
WHERE {{ predicate_column }} IS NOT NULL
LIMIT 1;
{% endset %}
{% set predicate_column_data_type = run_query(predicate_column_data_type_query).columns[0].values()[0] %}
{% if predicate_column_data_type not in supported_data_types %}
{{ exceptions.raise_compiler_error("Data type " ~ predicate_column_data_type ~ " is not supported; use a column of one of the types " ~ supported_data_types ~ " instead") }}
{% endif %}
{% set get_start_end_query %}
SELECT
MIN(
{{ predicate_column }}
) AS full_range_start,
MAX(
{{ predicate_column }}
) AS full_range_end
FROM
{{ source }}
{% endset %}
{% set start_end_results = run_query(get_start_end_query).columns %}
{% set start_predicate_value = start_end_results[0].values()[0] %}
{% set end_predicate_value = start_end_results[1].values()[0] %}
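-- build the full range of values between min and max, count matching source rows per value,
-- and collapse contiguous non-empty runs into BETWEEN clauses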
{% set get_limits_query %}
WITH block_range AS (
{% if predicate_column_data_type == "INTEGER" %}
SELECT
SEQ4() + {{ start_predicate_value }} as predicate_value
FROM
TABLE(GENERATOR(rowcount => {{ end_predicate_value - start_predicate_value }} + 1))
{% else %}
SELECT
date_day as predicate_value
FROM
crosschain.core.dim_dates
WHERE
date_day BETWEEN '{{ start_predicate_value }}' AND '{{ end_predicate_value }}'
{% endif %}
),
partition_block_counts AS (
SELECT
b.predicate_value,
COUNT(r.{{ predicate_column }}) AS count_in_window
FROM
block_range b
LEFT OUTER JOIN {{ source }}
r
ON r.{{ predicate_column }} = b.predicate_value
GROUP BY
1
),
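-- conditional_change_event increments each time the boolean flips between
-- "has rows" and "no rows", so each contiguous run of values shares one group_val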
range_groupings AS (
SELECT
predicate_value,
count_in_window,
conditional_change_event(
count_in_window > 0
) over (
ORDER BY
predicate_value
) AS group_val
FROM
partition_block_counts
),
contiguous_ranges AS (
SELECT
MIN(predicate_value) AS start_value,
MAX(predicate_value) AS end_value
FROM
range_groupings
WHERE
count_in_window > 0
GROUP BY
group_val
),
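-- render each non-empty run as a quoted BETWEEN clause, prefixed with the optional alias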
between_stmts AS (
SELECT
CONCAT(
'{{ output_alias~"." if output_alias else "" }}',
'{{ predicate_column }} between \'',
start_value,
'\' and \'',
end_value,
'\''
) AS b
FROM
contiguous_ranges
)
SELECT
CONCAT('(', LISTAGG(b, ' OR '), ')')
FROM
between_stmts
{% endset %}
{% set between_stmts = run_query(get_limits_query).columns[0].values()[0] %}
{% if between_stmts != '()' %}
{% set predicate_override = between_stmts %}
{% else %}
/* the source produced no ranges; the merge requires at least one predicate, so fall back to a no-op */
{% set predicate_override = '1=1' %}
{% endif %}
{{ return(predicate_override) }}
{% endmacro %}