new workflow/adhoc with alert (#334)

* new workflow

* print run status

* add workflow dispatch for testing

* add temp default for inputs

* add temp default for inputs in steps

* modify echo status after run

* add outcome

* print data

* pull echo into separate step

* define dbt_run_status

* run again

* try intentional failed dbt command

* add emoji to status

* fix single quote

* pass data directly

* del testing defaults

* del unused imports
This commit is contained in:
Jack Forgash 2024-06-20 10:40:48 -06:00 committed by GitHub
parent 12bf4e2e86
commit 64a996b2c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 155 additions and 0 deletions

View File

@ -0,0 +1,88 @@
name: dbt_run_adhoc_with_alert
run-name: ${{ inputs.dbt_command }}
on:
workflow_dispatch:
inputs:
environment:
type: choice
description: DBT Run Environment
required: true
options:
- dev
- prod
default: dev
warehouse:
type: choice
description: Snowflake warehouse
required: true
options:
- DBT
- DBT_CLOUD
- DBT_EMERGENCY
default: DBT
dbt_command:
type: string
description: 'DBT Run Command'
required: true
env:
SLACK_WEBHOOK_URL: "${{ secrets.SLACK_WEBHOOK_URL }}"
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
SCHEMA: "${{ vars.SCHEMA }}"
WAREHOUSE: "${{ inputs.warehouse }}"
ENVIRONMENT: "${{ inputs.environment }}"
DBT_COMMAND: "${{ inputs.dbt_command }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_${{ inputs.environment }}
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
id: dbt_run_command
run: |
start_time=$(date +%s)
${{ inputs.dbt_command }}
end_time=$(date +%s)
elapsed_time=$(expr $end_time - $start_time)
echo "ELAPSED_TIME=$elapsed_time" >> $GITHUB_ENV
continue-on-error: true
- name: Log run status
run: echo "DBT_RUN_STATUS=${{ steps.dbt_run_command.outcome }}" >> $GITHUB_ENV
- name: Send Notification
run: |
python python/dbt_slack_notification.py
- name: Store logs
uses: actions/upload-artifact@v3
with:
name: dbt-logs
path: logs

View File

@ -0,0 +1,67 @@
import requests
import json
import os
def create_message(**kwargs):
messageBody = {
"text": f"Ad hoc :dbt: job run completed for :{os.environ.get('DATABASE').split('_DEV')[0]}: {os.environ.get('DATABASE')}",
"attachments": [
{
"color": kwargs["color"],
"fields": [
{
"title": "Environment",
"value": kwargs["environment"],
"short": True
},
{
"title": "Warehouse",
"value": kwargs["warehouse"],
"short": True
},
{
"title": "Elapsed Time",
"value": kwargs["elapsed_time"],
"short": True
},
{
"title": "Run status",
"value": f"{':check:' if kwargs['dbt_run_status'] == 'success' else ':x:'} {kwargs['dbt_run_status'].capitalize()}",
"short": True
},
{
"title": "dbt Command",
"value": kwargs["dbt_command"],
"short": False
}
]
}
]
}
return messageBody
def send_alert(webhook_url, data):
"""Sends a message to a slack channel"""
send_message = create_message(
**data,
color="#008080" if data["dbt_run_status"] == "success" else "#FF0000"
)
x = requests.post(webhook_url, json=send_message)
if __name__ == '__main__':
data = {
"environment": os.environ.get("ENVIRONMENT"),
"database": os.environ.get("DATABASE"),
"warehouse": os.environ.get("WAREHOUSE"),
"dbt_command": os.environ.get("DBT_COMMAND"),
"elapsed_time": os.environ.get("ELAPSED_TIME"),
"dbt_run_status": os.environ.get("DBT_RUN_STATUS")
}
webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
send_alert(webhook_url, data)