flow-models/python/dbt_slack_notification.py
Jack Forgash 64a996b2c7
new workflow/adhoc with alert (#334)
* new workflow

* print run status

* add workflow dispatch for testing

* add temp default for inputs

* add temp default for inputs in steps

* modify echo status after run

* add outcome

* print data

* pull echo into separate step

* define dbt_run_status

* run again

* try intentional failed dbt command

* add emoji to status

* fix single quote

* pass data directly

* del testing defaults

* del unused imports
2024-06-20 10:40:48 -06:00

68 lines
2.1 KiB
Python

import requests
import json
import os
def create_message(**kwargs):
messageBody = {
"text": f"Ad hoc :dbt: job run completed for :{os.environ.get('DATABASE').split('_DEV')[0]}: {os.environ.get('DATABASE')}",
"attachments": [
{
"color": kwargs["color"],
"fields": [
{
"title": "Environment",
"value": kwargs["environment"],
"short": True
},
{
"title": "Warehouse",
"value": kwargs["warehouse"],
"short": True
},
{
"title": "Elapsed Time",
"value": kwargs["elapsed_time"],
"short": True
},
{
"title": "Run status",
"value": f"{':check:' if kwargs['dbt_run_status'] == 'success' else ':x:'} {kwargs['dbt_run_status'].capitalize()}",
"short": True
},
{
"title": "dbt Command",
"value": kwargs["dbt_command"],
"short": False
}
]
}
]
}
return messageBody
def send_alert(webhook_url, data):
"""Sends a message to a slack channel"""
send_message = create_message(
**data,
color="#008080" if data["dbt_run_status"] == "success" else "#FF0000"
)
x = requests.post(webhook_url, json=send_message)
if __name__ == '__main__':
data = {
"environment": os.environ.get("ENVIRONMENT"),
"database": os.environ.get("DATABASE"),
"warehouse": os.environ.get("WAREHOUSE"),
"dbt_command": os.environ.get("DBT_COMMAND"),
"elapsed_time": os.environ.get("ELAPSED_TIME"),
"dbt_run_status": os.environ.get("DBT_RUN_STATUS")
}
webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
send_alert(webhook_url, data)