I am trying to send myself an email whenever a Prefect task fails, but I keep running into an error. Here is my code for context:
failure_notification_task.py
from prefect_email import email_send_message, EmailServerCredentials
from prefect import task
from dotenv import dotenv_values
def send_failure_email(email_address):
    """Send a pipeline-failure notification email to ``email_address``.

    Loads the saved ``google-email`` ``EmailServerCredentials`` block. If that
    block does not exist yet, it is built from the ``GOOGLE_EMAIL_ADDRESS``
    and ``GOOGLE_APP_PASS`` entries returned by ``dotenv_values()``, saved
    under the same name, and loaded again.

    The message is sent by submitting the ``email_send_message`` task, so this
    function must be called from inside a running flow; called anywhere else,
    Prefect raises ``RuntimeError: Tasks cannot be run outside of a flow``.

    Args:
        email_address: Recipient address; also used in the submitted task's
            name.
    """
    try:
        email_server_credentials = EmailServerCredentials.load("google-email")
    except ValueError:
        # Block.load raises ValueError when no block document with this name
        # exists. Catching only that (instead of a bare ``except:``) keeps
        # KeyboardInterrupt/SystemExit and unrelated failures visible.
        env_vars = dotenv_values()
        credentials = EmailServerCredentials(
            username=env_vars.get('GOOGLE_EMAIL_ADDRESS'),
            password=env_vars.get('GOOGLE_APP_PASS'),
        )
        credentials.save("google-email", overwrite=True)
        email_server_credentials = EmailServerCredentials.load("google-email")

    email_send_message.with_options(name=f"email {email_address}").submit(
        email_server_credentials=email_server_credentials,
        subject="Pipeline Failure Notification",
        msg="this task has encountered a failure.",
        email_to=email_address,
    )
@task()
def failed_task():
    """Always fail by raising ``ValueError``; a stand-in for a failing task."""
    raise ValueError()
main.py
from prefect import Flow
from tasks.weather_api_task import fetch_weather_api, format_responses
from tasks.azure_blob_task import connect_to_blob, get_container, upload_blob
from tasks.failure_notification_task import failed_task, send_failure_email
from dotenv import dotenv_values
from datetime import datetime
# Load environment variables from the .env file
env_vars = dotenv_values()
# Retrieve the API key and storage account settings using their keys
weather_api_key = env_vars.get('WEATHER_API_KEY')
storage_account_key = env_vars.get('AZURE_STORAGE_KEY')
storage_account_name = env_vars.get('ACCOUNT_NAME')
# Set url and location parameter
url = "http://api.weatherstack.com/current"
locations = ["Raleigh, United States", "Halifax, Canada", "Mumbai, India"]
# Set container name
container_name = "jistpoc"
# Set blob name: "weather_data_" followed by the current local time formatted
# as YYYY-MM-DD HH:MM:SS (computed once, when this module is executed)
now = datetime.now()
now_str = now.strftime("%Y-%m-%d %H:%M:%S")
blob_name = "weather_data_" + now_str
# If failed_task fails, send an email to this address.
# NOTE(review): send_failure_email(...) is called right here, while this
# module is being executed, and only its return value is handed to
# on_failure. That call submits email_send_message outside of any flow, which
# looks like the source of the "Tasks cannot be run outside of a flow"
# RuntimeError — confirm, and pass a callable hook instead of a call result.
failed_task.on_failure(send_failure_email('[email protected]'))
# Define the weather_flow
@Flow
def weather_flow():
    """Run the weather pipeline steps in order, then the failing task.

    Calls ``fetch_weather_api`` for the module-level ``url`` and ``locations``,
    passes the responses through ``format_responses``, obtains a container
    client via ``connect_to_blob`` and ``get_container``, and hands the
    formatted data to ``upload_blob`` under ``blob_name``. ``failed_task`` is
    invoked last.
    """
    api_responses = fetch_weather_api(
        url=url,
        weather_api_key=weather_api_key,
        locations=locations,
    )
    payload = format_responses(responses=api_responses)

    service_client = connect_to_blob(
        storage_account_name=storage_account_name,
        storage_account_key=storage_account_key,
    )
    target_container = get_container(
        blob_service_client=service_client,
        container_name=container_name,
    )
    upload_blob(
        container_client=target_container,
        blob_data=payload,
        blob_name=blob_name,
    )

    # Final step: the task that always raises, used to trigger failure handling.
    failed_task()
# Run the weather_flow
# NOTE(review): _run is underscore-prefixed, which by convention marks it as
# non-public — confirm this is intended rather than calling weather_flow().
weather_flow._run()
I created a fake task called `failed_task` that always raises an error, and then I use this line to email myself on failure:
#if failed_task fails then send this email to this address
failed_task.on_failure(send_failure_email('[email protected]'))
I keep getting this error, though:
RuntimeError: Tasks cannot be run outside of a flow. To call the underlying task function outside of a flow use task.fn()
I have been banging my head against a wall trying to solve this. Can anyone with knowledge of prefect.io be of assistance?
Thanks!