From 09645560b1c84b008750435f32f51bc5e7f9aa94 Mon Sep 17 00:00:00 2001 From: Austin Date: Thu, 26 Oct 2023 11:07:16 -0400 Subject: [PATCH] Workflow task macros --- dbt_project.yml | 1 + macros/streamline/configs.yaml.sql | 14 ++- macros/streamline/functions.py.sql | 26 +++++ macros/workflow_tasks.sql | 171 +++++++++++++++++++++++++++++ 4 files changed, 211 insertions(+), 1 deletion(-) create mode 100644 macros/workflow_tasks.sql diff --git a/dbt_project.yml b/dbt_project.yml index 5715269..12bf86a 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -38,3 +38,4 @@ vars: "dbt_date:time_zone": GMT UPDATE_UDFS_AND_SPS: false AWS_REGION: us-east-1 + START_GHA_TASKS: False diff --git a/macros/streamline/configs.yaml.sql b/macros/streamline/configs.yaml.sql index a17d452..5e1ddc3 100644 --- a/macros/streamline/configs.yaml.sql +++ b/macros/streamline/configs.yaml.sql @@ -132,7 +132,19 @@ RUNTIME_VERSION = '3.8' HANDLER = 'custom_divide' sql: | - {{ fsc_utils.create_udf_decimal_adjust() | indent(4) }} + {{ fsc_utils.create_udf_decimal_adjust() | indent(4) }} +- name: {{ schema }}.udf_cron_to_prior_timestamps + signature: + - [workflow_name, STRING] + - [workflow_schedule, STRING] + return_type: TABLE(workflow_name STRING, workflow_schedule STRING, timestamp TIMESTAMP_NTZ) + options: | + LANGUAGE PYTHON + RUNTIME_VERSION = '3.8' + PACKAGES = ('croniter') + HANDLER = 'TimestampGenerator' + sql: | + {{ fsc_utils.create_udf_cron_to_prior_timestamps() | indent(4) }} {% endmacro %} diff --git a/macros/streamline/functions.py.sql b/macros/streamline/functions.py.sql index 9c3930c..992b407 100644 --- a/macros/streamline/functions.py.sql +++ b/macros/streamline/functions.py.sql @@ -96,4 +96,30 @@ def custom_divide(input, adjustment): return result_str except Exception as e: return None +{% endmacro %} + +{% macro create_udf_cron_to_prior_timestamps() %} +import croniter +import datetime + +class TimestampGenerator: + + def __init__(self): + pass + + def process(self, workflow_name, workflow_schedule): + for timestamp in self.generate_timestamps(workflow_name, workflow_schedule): + yield (workflow_name, workflow_schedule, timestamp) + + def generate_timestamps(self, workflow_name, workflow_schedule): + # Create a cron iterator object + cron = croniter.croniter(workflow_schedule) + + # Generate timestamps for the prev 10 runs + timestamps = [] + for i in range(10): + prev_run = cron.get_prev(datetime.datetime) + timestamps.append(prev_run) + + return timestamps {% endmacro %} \ No newline at end of file diff --git a/macros/workflow_tasks.sql b/macros/workflow_tasks.sql new file mode 100644 index 0000000..0dee467 --- /dev/null +++ b/macros/workflow_tasks.sql @@ -0,0 +1,171 @@ +{% macro create_gha_tasks() %} + {% set query %} + SELECT + task_name, + workflow_name, + workflow_schedule + FROM + {{ ref('github_actions__tasks') }} + {% endset %} + + {% set results = run_query(query) %} + + {% if execute and results is not none %} + {% set results_list = results.rows %} + {% else %} + {% set results_list = [] %} + {% endif %} + + {% set prod_db = target.database.lower().replace('_dev', '') %} + + {% for result in results_list %} + {% set task_name = result[0] %} + {% set workflow_name = result[1] %} + {% set workflow_schedule = result[2] %} + + {% set sql %} + EXECUTE IMMEDIATE + 'CREATE OR REPLACE TASK github_actions.{{ task_name }} + WAREHOUSE = DBT_CLOUD + SCHEDULE = \'USING CRON {{ workflow_schedule }} UTC\' + COMMENT = \'Task to trigger {{ workflow_name }}.yml workflow according to {{ workflow_schedule }}\' AS + DECLARE + rs resultset; + output string; + BEGIN + rs := (SELECT github_actions.workflow_dispatches(\'FlipsideCrypto\', \'{{ prod_db }}-models\', \'{{ workflow_name }}.yml\', NULL):status_code::int AS status_code); + SELECT LISTAGG($1, \';\') INTO :output FROM TABLE(result_scan(LAST_QUERY_ID())) LIMIT 1; + CALL SYSTEM$SET_RETURN_VALUE(:output); + END;' + {% endset %} + + {% do run_query(sql) %} + + {% if var("START_GHA_TASKS") %} + {% if target.database.lower() == prod_db %} + {% set sql %} + ALTER TASK github_actions.{{ task_name }} RESUME; + {% endset %} + {% do run_query(sql) %} + {% endif %} + {% endif %} + {% endfor %} +{% endmacro %} + +{% macro gha_tasks_view() %} + SELECT + workflow_name, + concat_ws( + '_', + 'TRIGGER', + UPPER(workflow_name) + ) AS task_name, + workflow_schedule + FROM + {{ source( + 'github_actions', + 'workflows' + ) }} +{% endmacro %} + +{% macro gha_task_history_view() %} + {% set query %} + SELECT + DISTINCT task_name + FROM + {{ ref('github_actions__tasks') }} + {% endset %} + {% set results = run_query(query) %} + {% if execute and results is not none %} + {% set results_list = results.rows %} + {% else %} + {% set results_list = [] %} + {% endif %} + + WITH task_history_data AS ( + SELECT + * + FROM + ({% for result in results_list %} + SELECT + NAME AS task_name, + completed_time, + return_value, + state, + database_name, + schema_name, + scheduled_time, + query_start_time + FROM + TABLE(information_schema.task_history(scheduled_time_range_start => DATEADD('hour', -24, CURRENT_TIMESTAMP()), task_name => '{{ result[0]}}')) {% if not loop.last %} + UNION ALL + {% endif %} + {% endfor %}) AS subquery + WHERE + database_name = '{{ target.database }}' + AND schema_name = 'GITHUB_ACTIONS') + SELECT + * + FROM + task_history_data +{% endmacro %} + +{% macro gha_task_schedule_view() %} + WITH base AS ( + SELECT + w.workflow_name AS workflow_name, + w.workflow_schedule AS workflow_schedule, + w.task_name AS task_name, + t.timestamp AS scheduled_time + FROM + {{ ref('github_actions__tasks') }} AS w + CROSS JOIN TABLE( + utils.udf_cron_to_prior_timestamps( + w.workflow_name, + w.workflow_schedule + ) + ) AS t + ) + SELECT + task_name, + workflow_name, + workflow_schedule, + scheduled_time + FROM + base +{% endmacro %} + +{% macro gha_task_performance_view() %} + SELECT + s.task_name, + s.workflow_name, + s.scheduled_time, + h.return_value + FROM + {{ ref('github_actions__task_schedule') }} + s + LEFT JOIN {{ ref('github_actions__task_history') }} + h + ON s.task_name = h.task_name + AND TO_TIMESTAMP_NTZ(DATE_TRUNC('minute', s.scheduled_time)) = TO_TIMESTAMP_NTZ(DATE_TRUNC('minute', h.scheduled_time)) + AND h.return_value between 200 and 299 + AND h.state = 'SUCCEEDED' + ORDER BY + task_name, + scheduled_time +{% endmacro %} + +{% macro gha_task_current_status_view() %} + SELECT + task_name, + workflow_name, + scheduled_time, + return_value, + return_value is not null as was_successful + FROM {{ ref('github_actions__task_performance') }} + QUALIFY row_number() over (partition by task_name order by scheduled_time desc) = 1 +{% endmacro %} + +{% macro alter_gha_task(task_name, task_action) %} + ALTER TASK IF EXISTS github_actions.{{ task_name }} {{ task_action }}; +{% endmacro %} \ No newline at end of file