updated macros and makefile

This commit is contained in:
Mike Stepanovic 2025-05-07 12:29:18 -06:00
parent 7bd8946873
commit 9ba7cd8fbf
6 changed files with 314 additions and 1 deletions

View File

@ -0,0 +1,19 @@
{% macro drop_github_actions_schema() %}
{# Drops every task in <target_db>.github_actions by listing them with SHOW TASKS
   and issuing a DROP TASK IF EXISTS per row. All queries are guarded by
   `execute` so nothing runs during dbt's parse phase. #}
{% if execute %}
{% set show_tasks_query %}
SHOW TASKS IN SCHEMA {{ target.database }}.github_actions;
{% endset %}
{% set results = run_query(show_tasks_query) %}
{% if results is not none %}
{% for task in results %}
{# Column index 1 of SHOW TASKS output is the task name #}
{% set drop_task_sql %}
DROP TASK IF EXISTS {{ target.database }}.github_actions.{{ task[1] }};
{% endset %}
{% do run_query(drop_task_sql) %}
{% do log("Dropped task '" ~ task[1] ~ "'", info=true) %}
{% endfor %}
{% endif %}
{% endif %}
{% endmacro %}

View File

@ -0,0 +1,90 @@
{% macro generate_workflow_schedules(chainhead_schedule) %}
{# Builds a SELECT ... UNION ALL statement that maps each GitHub Actions
   workflow to a cron schedule, offset from the chainhead run and staggered
   per-repo so different databases don't all fire at once. #}
{# Get all variables from return_vars #}
{% set vars = return_vars() %}
{# Parse chainhead schedule with safety checks #}
{% set chainhead_components = chainhead_schedule.split(' ') %}
{% set chainhead_minutes = chainhead_components[0] if chainhead_components|length > 0 else '0' %}
{% set chainhead_minutes_list = chainhead_minutes.split(',') | map('int') | list %}
{% set max_chainhead_minute = chainhead_minutes_list | max if chainhead_minutes_list|length > 0 else 0 %}
{# Generate a repo_id based on database name length to ensure unique schedules #}
{% set db_name = target.database %}
{% set repo_id = db_name|length % 12 %}
{# Lookup table: offset in minutes -> minute-of-hour string, shifted past the
   latest chainhead minute so downstream jobs never race the chainhead run #}
{% set root_offset = {} %}
{% for offset in range(0, 60) %}
{% do root_offset.update({offset: ((max_chainhead_minute + offset) % 60) | string}) %}
{% endfor %}
{# Schedule templates with complete cron format #}
{% set schedule_templates = {
    'hourly': '{minute} * * * *',
    'every_4_hours': '{minute} */4 * * *',
    'daily': '{minute} {hour} * * *',
    'weekly': '{minute} {hour} * * {day}',
    'monthly': '{minute} {hour} 28 * *'
} %}
{# Define workflow definitions #}
{% set workflow_definitions = [
    {'name': 'dbt_run_streamline_chainhead', 'cadence': 'root', 'root_schedule': chainhead_schedule},
    {'name': 'dbt_run_scheduled_main', 'cadence': 'hourly', 'root_offset': 15},
    {'name': 'dbt_run_scheduled_decoder', 'cadence': 'hourly', 'root_offset': 40},
    {'name': 'dbt_run_scheduled_curated', 'cadence': 'every_4_hours', 'root_offset': 30},
    {'name': 'dbt_run_scheduled_abis', 'cadence': 'daily', 'root_offset': 20, 'hour': 1},
    {'name': 'dbt_run_scheduled_scores', 'cadence': 'daily', 'root_offset': 35, 'hour': 2},
    {'name': 'dbt_test_daily', 'cadence': 'daily', 'root_offset': 50, 'hour': 3},
    {'name': 'dbt_test_intraday', 'cadence': 'every_4_hours', 'root_offset': 50},
    {'name': 'dbt_test_monthly', 'cadence': 'monthly', 'root_offset': 20, 'hour': 1},
    {'name': 'dbt_run_heal_models', 'cadence': 'weekly', 'root_offset': 45, 'hour': 6, 'day': 0},
    {'name': 'dbt_run_full_observability', 'cadence': 'monthly', 'root_offset': 25, 'hour': 2},
    {'name': 'dbt_run_dev_refresh', 'cadence': 'weekly', 'root_offset': 40, 'hour': 7, 'day': 1},
    {'name': 'dbt_run_streamline_decoder_history', 'cadence': 'weekly', 'root_offset': 30, 'hour': 3, 'day': 6}
] %}
{# Generate all workflow schedules #}
{% for workflow in workflow_definitions %}
{# Extract workflow name to create variable name for override #}
{% set workflow_name = workflow.name %}
{% if workflow_name.startswith('dbt_run') %}
{% set workflow_name = workflow_name[8:] %}
{% elif workflow_name.startswith('dbt_test') %}
{% set workflow_name = workflow_name[4:] %}
{% endif %}
{# Create variable name for override functionality, which matches variable names set in return_vars() #}
{% set override_cron_var = 'MAIN_GHA_' + workflow_name.upper() + '_CRON' %}
SELECT
    '{{ workflow.name }}' AS workflow_name,
    {% if workflow.cadence == 'root' %}
    '{{ workflow.root_schedule }}'
    {% else %}
    {# Template/offset lookups only exist for non-root cadences, so compute them
       here: the 'root' entry has no schedule_templates key and no root_offset,
       and must never hit those dict lookups #}
    {% set template = schedule_templates[workflow.cadence] %}
    {% set minute_val = root_offset[workflow.get('root_offset', 0)] %}
    {% set hour_val = (workflow.get('hour', 0) + repo_id) % 24 %}
    {% set day_val = workflow.get('day', 0) %}
    {% if vars[override_cron_var] is defined and vars[override_cron_var] is not none %}
    '{{ vars[override_cron_var] }}'
    {% elif workflow.cadence == 'hourly' or workflow.cadence == 'every_4_hours' %}
    '{{ template.replace("{minute}", minute_val) }}'
    {% elif workflow.cadence == 'daily' or workflow.cadence == 'monthly' %}
    '{{ template.replace("{minute}", minute_val).replace("{hour}", hour_val | string) }}'
    {% elif workflow.cadence == 'weekly' %}
    '{{ template.replace("{minute}", minute_val).replace("{hour}", hour_val | string).replace("{day}", day_val | string) }}'
    {% endif %}
    {% endif %} AS cron_schedule,
    '{{ workflow.cadence }}' AS cadence
{% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}
{% endmacro %}

View File

@ -0,0 +1,120 @@
{% macro create_workflow_table(workflow_values) %}
{# Intended to be called via the make deploy_gha_workflows_table command in the Makefile.
   Rebuilds <target_db>.github_actions.workflows from the supplied VALUES list
   and grants read access to the internal dev and dbt Cloud roles. #}
{% set create_schema_sql %}
CREATE SCHEMA IF NOT EXISTS github_actions;
{% endset %}
{% do run_query(create_schema_sql) %}
{% set update_table_sql %}
CREATE OR REPLACE TABLE {{target.database}}.github_actions.workflows AS
WITH source_data AS (
    SELECT column1 as workflow_name
    FROM VALUES
    {{ workflow_values }}
)
SELECT
    workflow_name,
    SYSDATE() as inserted_at
FROM source_data;
{% endset %}
{% do run_query(update_table_sql) %}
{# Dev databases are suffixed _DEV; the dbt Cloud role is named after prod #}
{% set prod_db = target.database.lower().replace('_dev', '') %}
{# run_query submits a single statement per call, so each GRANT must be issued
   separately rather than batched into one semicolon-delimited string #}
{% set grant_statements = [
    "GRANT USAGE ON SCHEMA " ~ target.database ~ ".github_actions TO ROLE INTERNAL_DEV;",
    "GRANT USAGE ON SCHEMA " ~ target.database ~ ".github_actions TO ROLE DBT_CLOUD_" ~ prod_db ~ ";",
    "GRANT SELECT ON TABLE " ~ target.database ~ ".github_actions.workflows TO ROLE INTERNAL_DEV;",
    "GRANT SELECT ON TABLE " ~ target.database ~ ".github_actions.workflows TO ROLE DBT_CLOUD_" ~ prod_db ~ ";"
] %}
{% for grant_sql in grant_statements %}
{% do run_query(grant_sql) %}
{% endfor %}
{% do log("Table github_actions.workflows updated successfully with grants applied.", info=True) %}
{% endmacro %}
{% macro create_gha_tasks() %}
{# Creates one Snowflake task per row of the workflow-schedule model. Each task
   runs on its cron schedule and calls github_actions.workflow_dispatches() to
   trigger the matching GitHub Actions workflow. Tasks are created suspended
   (Snowflake default) and resumed below only when var RESUME_GHA_TASKS is true. #}
{% set query %}
SELECT
    task_name,
    workflow_name,
    cron_schedule
FROM
    {{ ref('github_actions__workflow_schedule') }}
{% endset %}
{% set results = run_query(query) %}
{# Guard: at parse time (execute == false) run_query returns nothing usable #}
{% if execute and results is not none %}
{% set results_list = results.rows %}
{% else %}
{% set results_list = [] %}
{% endif %}
{# Dev databases are suffixed _dev; the GitHub repo is named after prod.
   NOTE(review): assumes the repo lives at FlipsideCrypto/<prod_db>-models -- confirm for forks. #}
{% set prod_db = target.database.lower().replace('_dev', '') %}
{% set created_tasks = [] %}
{% for result in results_list %}
{% set task_name = result[0] %}
{% set workflow_name = result[1] %}
{% set cron_schedule = result[2] %}
{% do log("Task: " ~ task_name ~ ", Schedule: " ~ cron_schedule, info=true) %}
{# Task body is a Snowflake Scripting block kept on one line inside EXECUTE
   IMMEDIATE (note the '' escaping): it dispatches the workflow, collects the
   returned status code(s) via RESULT_SCAN of the last query, and stores them
   as the task's return value with SYSTEM$SET_RETURN_VALUE. #}
{% set sql %}
EXECUTE IMMEDIATE 'CREATE OR REPLACE TASK github_actions.{{ task_name }} WAREHOUSE = DBT_CLOUD SCHEDULE = ''USING CRON {{ cron_schedule }} UTC'' COMMENT = ''Task to trigger {{ workflow_name }}.yml workflow according to {{ cron_schedule }}'' AS DECLARE rs resultset; output string; BEGIN rs := (SELECT github_actions.workflow_dispatches(''FlipsideCrypto'', ''{{ prod_db }}-models'', ''{{ workflow_name }}.yml'', NULL):status_code::int AS status_code); SELECT LISTAGG($1, '';'') INTO :output FROM TABLE(result_scan(LAST_QUERY_ID())) LIMIT 1; CALL SYSTEM$SET_RETURN_VALUE(:output); END;'
{% endset %}
{% do run_query(sql) %}
{% do created_tasks.append(task_name) %}
{% endfor %}
{# Optionally, resume tasks if the variable is set #}
{% if var('RESUME_GHA_TASKS', false) %}
{# NOTE(review): log wording -- tasks are created suspended and resumed just below #}
{% do log("Tasks created in RESUME state. Use var RESUME_GHA_TASKS: false to automatically suspend them.", info=true) %}
{% for task_name in created_tasks %}
{% set resume_task_sql %}
ALTER TASK github_actions.{{ task_name }} RESUME;
{% endset %}
{% do run_query(resume_task_sql) %}
{% do log("Resumed task: " ~ task_name, info=true) %}
{% endfor %}
{% else %}
{% do log("Tasks created in SUSPENDED state. Use var RESUME_GHA_TASKS: true to automatically resume them.", info=true) %}
{% endif %}
{% endmacro %}
{% macro alter_gha_tasks(
    task_names,
    task_action
) %}
{# Applies task_action (e.g. SUSPEND / RESUME) to each task in the
   comma-separated task_names string. #}
{% for raw_name in task_names.split(',') %}
{% set sql %}
EXECUTE IMMEDIATE 'ALTER TASK IF EXISTS github_actions.{{ raw_name.strip() }} {{ task_action }};'
{% endset %}
{% do run_query(sql) %}
{% endfor %}
{% endmacro %}
{% macro alter_all_gha_tasks(task_action) %}
{# Applies task_action (e.g. SUSPEND / RESUME) to every task listed in the
   workflow-schedule model. #}
{% set query %}
SELECT
    task_name
FROM
    {{ ref('github_actions__workflow_schedule') }}
{% endset %}
{% set results = run_query(query) %}
{# At parse time (or on a failed query) fall back to an empty list #}
{% set rows = results.rows if execute and results is not none else [] %}
{% for row in rows %}
{% set sql %}
EXECUTE IMMEDIATE 'ALTER TASK IF EXISTS github_actions.{{ row[0] }} {{ task_action }};'
{% endset %}
{% do run_query(sql) %}
{% endfor %}
{% endmacro %}

View File

@ -0,0 +1,55 @@
{% macro livequery_grants() %}
{# Grants USAGE on the LiveQuery _live and _utils schemas (and on all functions
   within them) to the lambda API role, the dbt Cloud role, and INTERNAL_DEV.
   Only runs when var UPDATE_UDFS_AND_SPS is true. #}
{% set vars = return_vars() %}
{% set target_db = target.database | upper %}
{% set project = vars.GLOBAL_PROJECT_NAME | upper %}
{% if var("UPDATE_UDFS_AND_SPS", false) %}
{% set grantee_roles = [
    "AWS_LAMBDA_" ~ project ~ "_API",
    "DBT_CLOUD_" ~ project,
    "INTERNAL_DEV"
] %}
{# Same pair of grants for every schema x role combination #}
{% for schema_name in ["_live", "_utils"] %}
{% for role in grantee_roles %}
{% do run_query("GRANT USAGE ON SCHEMA " ~ target_db ~ "." ~ schema_name ~ " TO " ~ role ~ ";") %}
{% do run_query("GRANT USAGE ON ALL FUNCTIONS IN SCHEMA " ~ target_db ~ "." ~ schema_name ~ " TO " ~ role ~ ";") %}
{{ log("Permissions granted to role " ~ role ~ " for schema " ~ target_db ~ "." ~ schema_name, info=True) }}
{% endfor %}
{% endfor %}
{% else %}
{# Not an error: the grants are simply skipped without the opt-in var #}
{{ log("Skipped permission grants: set UPDATE_UDFS_AND_SPS to true to apply them.", info=True) }}
{% endif %}
{% endmacro %}
{% macro drop_livequery_schemas() %}
{# Removes the LiveQuery schemas from the target database; gated behind the
   UPDATE_UDFS_AND_SPS var so it never runs by accident. #}
{% set target_db = target.database | upper %}
{% if var("UPDATE_UDFS_AND_SPS", false) %}
{% for schema_name in ["_LIVE", "_UTILS", "LIVE", "UTILS"] %}
{% do run_query("DROP SCHEMA IF EXISTS " ~ target_db ~ "." ~ schema_name ~ ";") %}
{% endfor %}
{{ log("Schemas dropped successfully.", info=True) }}
{% else %}
{{ log("Error: DROP SCHEMA unsuccessful.", info=True) }}
{% endif %}
{% endmacro %}

View File

@ -2,7 +2,7 @@
{% set vars = { {% set vars = {
'GLOBAL_PROJECT_NAME': 'cosmos', 'GLOBAL_PROJECT_NAME': 'cosmos',
'GLOBAL_NODE_PROVIDER': 'quicknode', 'GLOBAL_NODE_PROVIDER': 'quicknode',
'GLOBAL_NODE_VAULT_PATH': 'Vault/prod/cosmos/quicknode/mainnet', 'GLOBAL_NODE_VAULT_PATH': 'vault/prod/cosmos/quicknode/mainnet',
'GLOBAL_NODE_URL': '{service}/{Authentication}', 'GLOBAL_NODE_URL': '{service}/{Authentication}',
'GLOBAL_WRAPPED_NATIVE_ASSET_ADDRESS': '', 'GLOBAL_WRAPPED_NATIVE_ASSET_ADDRESS': '',
'MAIN_SL_BLOCKS_PER_HOUR': 3600 'MAIN_SL_BLOCKS_PER_HOUR': 3600

29
makefile Normal file
View File

@ -0,0 +1,29 @@
# Interactive release helper: shows the three most recent tags, prompts for a
# new tag name, commits/pushes any pending work-tree changes, then creates and
# pushes an annotated tag. Enter 'q' (or nothing) at the prompt to abort.
.PHONY: new_repo_tag
new_repo_tag:
	@echo "Last 3 tags:"
	@git tag -l --sort=-v:refname | head -n 3
	@echo ""
# NOTE(review): `git add .` below stages EVERYTHING in the working tree --
# confirm releasing from a dirty checkout is intended. The `push --dry-run`
# grep only avoids a redundant push when the remote is already up to date.
	@read -p "Enter new tag name (e.g., v1.1.0) or 'q' to quit: " tag_name; \
	if [ "$$tag_name" = "q" ]; then \
		echo "Operation cancelled."; \
		exit 0; \
	elif [ -n "$$tag_name" ]; then \
		if git diff-index --quiet HEAD --; then \
			echo "No changes to commit. Proceeding with tagging."; \
		else \
			git add . && \
			git commit -m "Prepare release $$tag_name" && \
			git push; \
		fi; \
		if git push --dry-run 2>&1 | grep -q "Everything up-to-date"; then \
			echo "Remote is up-to-date. Skipping push."; \
		else \
			git push; \
		fi; \
		git tag -a $$tag_name -m "version $$tag_name" && \
		git push origin --tags && \
		echo "Tag $$tag_name created and pushed successfully."; \
	else \
		echo "No tag name entered. Operation cancelled."; \
	fi