update performance view logic

This commit is contained in:
Austin 2024-07-24 08:43:58 -04:00
parent 9a91c3025f
commit eb33ac727a

View File

@ -1,51 +1,37 @@
{% macro create_gha_tasks() %}
{% set query %}
SELECT
task_name,
workflow_name,
workflow_schedule
FROM
{{ ref('github_actions__tasks') }}
SELECT
task_name,
workflow_name,
workflow_schedule
FROM
{{ ref('github_actions__tasks') }}
{% endset %}
{% set results = run_query(query) %}
{% if execute and results is not none %}
{% set results_list = results.rows %}
{% else %}
{% set results_list = [] %}
{% endif %}
{% set prod_db = target.database.lower().replace('_dev', '') %}
{% set prod_db = target.database.lower().replace(
'_dev',
''
) %}
{% for result in results_list %}
{% set task_name = result[0] %}
{% set workflow_name = result[1] %}
{% set workflow_schedule = result[2] %}
{% set task_name = result [0] %}
{% set workflow_name = result [1] %}
{% set workflow_schedule = result [2] %}
{% set sql %}
EXECUTE IMMEDIATE
'CREATE OR REPLACE TASK github_actions.{{ task_name }}
WAREHOUSE = DBT_CLOUD
SCHEDULE = \'USING CRON {{ workflow_schedule }} UTC\'
COMMENT = \'Task to trigger {{ workflow_name }}.yml workflow according to {{ workflow_schedule }}\' AS
DECLARE
rs resultset;
output string;
BEGIN
rs := (SELECT github_actions.workflow_dispatches(\'FlipsideCrypto\', \'{{ prod_db }}-models\', \'{{ workflow_name }}.yml\', NULL):status_code::int AS status_code);
SELECT LISTAGG($1, \';\') INTO :output FROM TABLE(result_scan(LAST_QUERY_ID())) LIMIT 1;
CALL SYSTEM$SET_RETURN_VALUE(:output);
END;'
{% endset %}
EXECUTE IMMEDIATE 'CREATE OR REPLACE TASK github_actions.{{ task_name }} WAREHOUSE = DBT_CLOUD SCHEDULE = \'USING CRON {{ workflow_schedule }} UTC\' COMMENT = \'Task to trigger {{ workflow_name }}.yml workflow according to {{ workflow_schedule }}\' AS DECLARE rs resultset; output string; BEGIN rs := (SELECT github_actions.workflow_dispatches(\'FlipsideCrypto\', \'{{ prod_db }}-models\', \'{{ workflow_name }}.yml\', NULL):status_code::int AS status_code); SELECT LISTAGG($1, \';\') INTO :output FROM TABLE(result_scan(LAST_QUERY_ID())) LIMIT 1; CALL SYSTEM$SET_RETURN_VALUE(:output); END;' {% endset %}
{% do run_query(sql) %}
{% if var("START_GHA_TASKS") %}
{% if target.database.lower() == prod_db %}
{% set sql %}
ALTER TASK github_actions.{{ task_name }} RESUME;
{% endset %}
ALTER task github_actions.{{ task_name }}
resume;
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endif %}
@ -53,27 +39,28 @@
{% endmacro %}
{% macro gha_tasks_view() %}
    {#
        Renders a SELECT over the `github_actions.workflows` source that
        returns one row per source row with:
          - workflow_name      passed through unchanged
          - task_name          'TRIGGER_' followed by the upper-cased workflow name
          - workflow_schedule  passed through unchanged
        Takes no arguments.
    #}
SELECT
    workflow_name,
    concat_ws(
        '_',
        'TRIGGER',
        UPPER(workflow_name)
    ) AS task_name,
    workflow_schedule
FROM
    {{ source(
        'github_actions',
        'workflows'
    ) }}
{% endmacro %}
{% macro gha_task_history_view() %}
{% set query %}
SELECT
DISTINCT task_name
FROM
{{ ref('github_actions__tasks') }}
SELECT
DISTINCT task_name
FROM
{{ ref('github_actions__tasks') }}
{% endset %}
{% set results = run_query(query) %}
{% if execute and results is not none %}
@ -88,21 +75,14 @@
FROM
({% for result in results_list %}
SELECT
NAME AS task_name,
completed_time,
return_value,
state,
database_name,
schema_name,
scheduled_time,
query_start_time
NAME AS task_name, completed_time, return_value, state, database_name, schema_name, scheduled_time, query_start_time
FROM
TABLE(information_schema.task_history(scheduled_time_range_start => DATEADD('hour', -24, CURRENT_TIMESTAMP()), task_name => '{{ result[0]}}')) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}) AS subquery
WHERE
database_name = '{{ target.database }}'
database_name = '{{ target.database }}'
AND schema_name = 'GITHUB_ACTIONS')
SELECT
*
@ -126,62 +106,76 @@
)
) AS t
)
SELECT
task_name,
workflow_name,
workflow_schedule,
scheduled_time
FROM
base
SELECT
task_name,
workflow_name,
workflow_schedule,
scheduled_time
FROM
base
{% endmacro %}
{% macro gha_task_performance_view() %}
    {#
        Renders a SELECT that pairs every row of `github_actions__task_schedule`
        with the matching successful run (if any) from
        `github_actions__task_history`.

        Output columns: task_name, workflow_name, scheduled_time (all from the
        schedule side) and return_value (from the history side).

        The history-side filters live in the ON clause rather than in WHERE so
        the LEFT JOIN keeps every scheduled row; return_value is NULL when no
        history row satisfies all of the conditions.
    #}
SELECT
    s.task_name,
    s.workflow_name,
    s.scheduled_time,
    h.return_value
FROM
    {{ ref('github_actions__task_schedule') }} AS s
    LEFT JOIN {{ ref('github_actions__task_history') }} AS h
    ON s.task_name = h.task_name
    -- A history row matches when the schedule time falls within one minute
    -- either side of the history row's scheduled_time (bounds inclusive).
    AND TO_TIMESTAMP_NTZ(s.scheduled_time)
        BETWEEN TO_TIMESTAMP_NTZ(DATEADD(MINUTE, -1, h.scheduled_time))
        AND TO_TIMESTAMP_NTZ(DATEADD(MINUTE, 1, h.scheduled_time))
    -- Only runs whose return value parses as a number in the 200-299 range count.
    AND TRY_TO_NUMBER(h.return_value) BETWEEN 200 AND 299
    AND h.state = 'SUCCEEDED'
ORDER BY
    s.task_name,
    s.scheduled_time
{% endmacro %}
{% macro gha_task_current_status_view() %}
    {#
        Renders a SELECT that summarises the two most recently scheduled rows
        per task from `github_actions__task_performance`.

        Output columns:
          - task_name, workflow_name
          - recent_scheduled_time  latest scheduled_time among the kept rows
          - prior_scheduled_time   earliest scheduled_time among the kept rows
          - successes              count of kept rows whose return_value equals 204
          - pipeline_active        TRUE when successes is greater than zero
    #}
WITH base AS (
    SELECT
        task_name,
        workflow_name,
        scheduled_time,
        return_value,
        return_value IS NOT NULL AS was_successful
    FROM
        {{ ref('github_actions__task_performance') }}
    -- Keep at most the two newest rows for each task.
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY task_name
        ORDER BY
            scheduled_time DESC
    ) <= 2
)
SELECT
    task_name,
    workflow_name,
    MAX(scheduled_time) AS recent_scheduled_time,
    MIN(scheduled_time) AS prior_scheduled_time,
    SUM(IFF(return_value = 204, 1, 0)) AS successes,
    -- Reuses the `successes` alias defined on the previous line.
    successes > 0 AS pipeline_active
FROM
    base
GROUP BY
    task_name,
    workflow_name
{% endmacro %}
{% macro alter_gha_task(
        task_name,
        task_action
    ) %}
    {#
        Runs `ALTER TASK IF EXISTS github_actions.<task_name> <task_action>`
        through run_query.

        Args:
          task_name:   name of the task inside the github_actions schema.
          task_action: text placed after the task name in the ALTER TASK
                       statement.

        Both arguments are interpolated into the statement verbatim, with no
        quoting or validation.
        NOTE(review): presumably only called with trusted, operator-supplied
        values - confirm against callers.
    #}
    {% set sql %}
        EXECUTE IMMEDIATE 'ALTER TASK IF EXISTS github_actions.{{ task_name }} {{ task_action }};'
    {% endset %}
    {% do run_query(sql) %}
{% endmacro %}