mirror of
https://github.com/FlipsideCrypto/flow-models.git
synced 2026-02-06 15:02:11 +00:00
* new workflow * print run status * add workflow dispatch for testing * add temp default for inputs * add temp default for inputs in steps * modify echo status after run * add outcome * print data * pull echo into separate step * define dbt_run_status * run again * try intentional failed dbt command * add emoji to status * fix single quote * pass data directly * del testing defaults * del unused imports
68 lines
2.1 KiB
Python
68 lines
2.1 KiB
Python
import requests
|
|
import json
|
|
import os
|
|
|
|
def create_message(**kwargs):
|
|
messageBody = {
|
|
"text": f"Ad hoc :dbt: job run completed for :{os.environ.get('DATABASE').split('_DEV')[0]}: {os.environ.get('DATABASE')}",
|
|
"attachments": [
|
|
{
|
|
"color": kwargs["color"],
|
|
"fields": [
|
|
{
|
|
"title": "Environment",
|
|
"value": kwargs["environment"],
|
|
"short": True
|
|
},
|
|
{
|
|
"title": "Warehouse",
|
|
"value": kwargs["warehouse"],
|
|
"short": True
|
|
},
|
|
{
|
|
"title": "Elapsed Time",
|
|
"value": kwargs["elapsed_time"],
|
|
"short": True
|
|
},
|
|
{
|
|
"title": "Run status",
|
|
"value": f"{':check:' if kwargs['dbt_run_status'] == 'success' else ':x:'} {kwargs['dbt_run_status'].capitalize()}",
|
|
"short": True
|
|
},
|
|
{
|
|
"title": "dbt Command",
|
|
"value": kwargs["dbt_command"],
|
|
"short": False
|
|
}
|
|
]
|
|
}
|
|
]
|
|
}
|
|
|
|
return messageBody
|
|
|
|
|
|
def send_alert(webhook_url, data):
|
|
"""Sends a message to a slack channel"""
|
|
|
|
send_message = create_message(
|
|
**data,
|
|
color="#008080" if data["dbt_run_status"] == "success" else "#FF0000"
|
|
)
|
|
|
|
x = requests.post(webhook_url, json=send_message)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
data = {
|
|
"environment": os.environ.get("ENVIRONMENT"),
|
|
"database": os.environ.get("DATABASE"),
|
|
"warehouse": os.environ.get("WAREHOUSE"),
|
|
"dbt_command": os.environ.get("DBT_COMMAND"),
|
|
"elapsed_time": os.environ.get("ELAPSED_TIME"),
|
|
"dbt_run_status": os.environ.get("DBT_RUN_STATUS")
|
|
}
|
|
|
|
webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
|
|
send_alert(webhook_url, data)
|