From eb33ac727af26ebc8a8cc9711d4a6ebc3790a107 Mon Sep 17 00:00:00 2001 From: Austin Date: Wed, 24 Jul 2024 08:43:58 -0400 Subject: [PATCH] update performance view logic --- macros/workflow_tasks.sql | 192 ++++++++++++++++++-------------------- 1 file changed, 93 insertions(+), 99 deletions(-) diff --git a/macros/workflow_tasks.sql b/macros/workflow_tasks.sql index 8a5dc97..ab426ef 100644 --- a/macros/workflow_tasks.sql +++ b/macros/workflow_tasks.sql @@ -1,51 +1,37 @@ {% macro create_gha_tasks() %} {% set query %} - SELECT - task_name, - workflow_name, - workflow_schedule - FROM - {{ ref('github_actions__tasks') }} +SELECT + task_name, + workflow_name, + workflow_schedule +FROM + {{ ref('github_actions__tasks') }} + {% endset %} - {% set results = run_query(query) %} - {% if execute and results is not none %} {% set results_list = results.rows %} {% else %} {% set results_list = [] %} {% endif %} - {% set prod_db = target.database.lower().replace('_dev', '') %} - + {% set prod_db = target.database.lower().replace( + '_dev', + '' + ) %} {% for result in results_list %} - {% set task_name = result[0] %} - {% set workflow_name = result[1] %} - {% set workflow_schedule = result[2] %} - + {% set task_name = result [0] %} + {% set workflow_name = result [1] %} + {% set workflow_schedule = result [2] %} {% set sql %} - EXECUTE IMMEDIATE - 'CREATE OR REPLACE TASK github_actions.{{ task_name }} - WAREHOUSE = DBT_CLOUD - SCHEDULE = \'USING CRON {{ workflow_schedule }} UTC\' - COMMENT = \'Task to trigger {{ workflow_name }}.yml workflow according to {{ workflow_schedule }}\' AS - DECLARE - rs resultset; - output string; - BEGIN - rs := (SELECT github_actions.workflow_dispatches(\'FlipsideCrypto\', \'{{ prod_db }}-models\', \'{{ workflow_name }}.yml\', NULL):status_code::int AS status_code); - SELECT LISTAGG($1, \';\') INTO :output FROM TABLE(result_scan(LAST_QUERY_ID())) LIMIT 1; - CALL SYSTEM$SET_RETURN_VALUE(:output); - END;' - {% endset %} - + EXECUTE IMMEDIATE 'CREATE OR REPLACE TASK github_actions.{{ task_name }} WAREHOUSE = DBT_CLOUD SCHEDULE = \'USING CRON {{ workflow_schedule }} UTC\' COMMENT = \'Task to trigger {{ workflow_name }}.yml workflow according to {{ workflow_schedule }}\' AS DECLARE rs resultset; output string; BEGIN rs := (SELECT github_actions.workflow_dispatches(\'FlipsideCrypto\', \'{{ prod_db }}-models\', \'{{ workflow_name }}.yml\', NULL):status_code::int AS status_code); SELECT LISTAGG($1, \';\') INTO :output FROM TABLE(result_scan(LAST_QUERY_ID())) LIMIT 1; CALL SYSTEM$SET_RETURN_VALUE(:output); END;' {% endset %} {% do run_query(sql) %} - {% if var("START_GHA_TASKS") %} {% if target.database.lower() == prod_db %} {% set sql %} - ALTER TASK github_actions.{{ task_name }} RESUME; - {% endset %} + ALTER task github_actions.{{ task_name }} + resume; +{% endset %} {% do run_query(sql) %} {% endif %} {% endif %} @@ -53,27 +39,28 @@ {% endmacro %} {% macro gha_tasks_view() %} - SELECT - workflow_name, - concat_ws( - '_', - 'TRIGGER', - UPPER(workflow_name) - ) AS task_name, - workflow_schedule - FROM - {{ source( - 'github_actions', - 'workflows' - ) }} +SELECT + workflow_name, + concat_ws( + '_', + 'TRIGGER', + UPPER(workflow_name) + ) AS task_name, + workflow_schedule +FROM + {{ source( + 'github_actions', + 'workflows' + ) }} {% endmacro %} {% macro gha_task_history_view() %} {% set query %} - SELECT - DISTINCT task_name - FROM - {{ ref('github_actions__tasks') }} +SELECT + DISTINCT task_name +FROM + {{ ref('github_actions__tasks') }} + {% endset %} {% set results = run_query(query) %} {% if execute and results is not none %} @@ -88,21 +75,14 @@ FROM ({% for result in results_list %} SELECT - NAME AS task_name, - completed_time, - return_value, - state, - database_name, - schema_name, - scheduled_time, - query_start_time + NAME AS task_name, completed_time, return_value, state, database_name, schema_name, scheduled_time, query_start_time FROM TABLE(information_schema.task_history(scheduled_time_range_start => DATEADD('hour', -24, CURRENT_TIMESTAMP()), task_name => '{{ result[0]}}')) {% if not loop.last %} UNION ALL {% endif %} {% endfor %}) AS subquery WHERE - database_name = '{{ target.database }}' + database_name = '{{ target.database }}' AND schema_name = 'GITHUB_ACTIONS') SELECT * @@ -126,62 +106,76 @@ ) ) AS t ) - SELECT - task_name, - workflow_name, - workflow_schedule, - scheduled_time - FROM - base +SELECT + task_name, + workflow_name, + workflow_schedule, + scheduled_time +FROM + base {% endmacro %} {% macro gha_task_performance_view() %} - SELECT - s.task_name, - s.workflow_name, - s.scheduled_time, +SELECT + s.task_name, + s.workflow_name, + s.scheduled_time, + h.return_value +FROM + {{ ref('github_actions__task_schedule') }} + s + LEFT JOIN {{ ref('github_actions__task_history') }} + h + ON s.task_name = h.task_name + AND TO_TIMESTAMP_NTZ( + s.scheduled_time + ) BETWEEN TO_TIMESTAMP_NTZ(DATEADD(MINUTE, -1, h.scheduled_time)) + AND TO_TIMESTAMP_NTZ(DATEADD(MINUTE, 1, h.scheduled_time)) + AND TRY_TO_NUMBER( h.return_value - FROM - {{ ref('github_actions__task_schedule') }} - s - LEFT JOIN {{ ref('github_actions__task_history') }} - h - ON s.task_name = h.task_name - AND TO_TIMESTAMP_NTZ(DATE_TRUNC('minute', s.scheduled_time)) = TO_TIMESTAMP_NTZ(DATE_TRUNC('minute', h.scheduled_time)) - AND try_to_number(h.return_value) between 200 and 299 - AND h.state = 'SUCCEEDED' - ORDER BY - task_name, - scheduled_time + ) BETWEEN 200 + AND 299 + AND h.state = 'SUCCEEDED' +ORDER BY + task_name, + scheduled_time {% endmacro %} {% macro gha_task_current_status_view() %} WITH base AS ( - SELECT + SELECT task_name, workflow_name, scheduled_time, return_value, return_value IS NOT NULL AS was_successful - FROM {{ ref('github_actions__task_performance') }} - QUALIFY row_number() OVER (PARTITION BY task_name ORDER BY scheduled_time DESC) <= 2 + FROM + {{ ref('github_actions__task_performance') }} + qualify ROW_NUMBER() over ( + PARTITION BY task_name + ORDER BY + scheduled_time DESC + ) <= 2 ) - SELECT - task_name, - workflow_name, - MAX(scheduled_time) AS recent_scheduled_time, - MIN(scheduled_time) AS prior_scheduled_time, - SUM(IFF(return_value = 204, 1, 0)) AS successes, - successes > 0 AS pipeline_active - FROM base - GROUP BY task_name, workflow_name +SELECT + task_name, + workflow_name, + MAX(scheduled_time) AS recent_scheduled_time, + MIN(scheduled_time) AS prior_scheduled_time, + SUM(IFF(return_value = 204, 1, 0)) AS successes, + successes > 0 AS pipeline_active +FROM + base +GROUP BY + task_name, + workflow_name {% endmacro %} -{% macro alter_gha_task(task_name, task_action) %} +{% macro alter_gha_task( + task_name, + task_action + ) %} {% set sql %} - EXECUTE IMMEDIATE - 'ALTER TASK IF EXISTS github_actions.{{ task_name }} {{ task_action }};' - {% endset %} - + EXECUTE IMMEDIATE 'ALTER TASK IF EXISTS github_actions.{{ task_name }} {{ task_action }};' {% endset %} {% do run_query(sql) %} -{% endmacro %} \ No newline at end of file +{% endmacro %}