0

I am trying to send myself an email whenever a task fails but I am having some issues. Here is my code for context:

failure_notification_task.py

from prefect_email import email_send_message, EmailServerCredentials
from prefect import task
from dotenv import dotenv_values

def send_failure_email(email_address):
    try:
        email_server_credentials = EmailServerCredentials.load("google-email")
    except:
        env_vars = dotenv_values()
        credentials = EmailServerCredentials(
            username=env_vars.get('GOOGLE_EMAIL_ADDRESS'),
            password=env_vars.get('GOOGLE_APP_PASS'),
        )
        credentials.save("google-email", overwrite=True)
        email_server_credentials = EmailServerCredentials.load("google-email")

    email_send_message.with_options(name=f"email {email_address}").submit(
        email_server_credentials=email_server_credentials,
        subject="Pipeline Failure Notification",
        msg=f"this task has encountered a failure.",
        email_to=email_address,
    )

@task()
def failed_task():
    raise ValueError

main.py

from prefect import Flow
from tasks.weather_api_task import fetch_weather_api, format_responses
from tasks.azure_blob_task import connect_to_blob, get_container, upload_blob
from tasks.failure_notification_task import failed_task, send_failure_email
from dotenv import dotenv_values
from datetime import datetime

# Load environment variables from the .env file
env_vars = dotenv_values()

# Retrieve the values using the keys
weather_api_key = env_vars.get('WEATHER_API_KEY')
storage_account_key = env_vars.get('AZURE_STORAGE_KEY')
storage_account_name = env_vars.get('ACCOUNT_NAME')

# Set url and location parameter
url = "http://api.weatherstack.com/current"
locations = ["Raleigh, United States", "Halifax, Canada", "Mumbai, India"]

# Set container name
container_name = "jistpoc"

# Set blob name
now = datetime.now()
now_str = now.strftime("%Y-%m-%d %H:%M:%S")
blob_name = "weather_data_" + now_str

#if failed_task fails then send this email to this address
failed_task.on_failure(send_failure_email('[email protected]'))

# Define the weather_flow
@Flow
def weather_flow():
    responses = fetch_weather_api(url=url, weather_api_key=weather_api_key, locations=locations)
    blob_data = format_responses(responses=responses)
    blob_service_client = connect_to_blob(storage_account_name=storage_account_name, storage_account_key=storage_account_key)
    container_client = get_container(blob_service_client=blob_service_client, container_name=container_name)
    upload_blob(container_client=container_client, blob_data=blob_data, blob_name=blob_name)
    failed_task()
    


# Run the weather_flow
weather_flow._run()

I created a fake task called failed_task that raises an error and then I use this line to email myself on failure:

#if failed_task fails then send this email to this address
failed_task.on_failure(send_failure_email('[email protected]'))

I keep getting this error though:

RuntimeError: Tasks cannot be run outside of a flow. To call the underlying task function outside of a flow use task.fn().

I have been banging my head against a wall trying to solve this. Can anyone with knowledge of prefect.io be of assistance?

Thanks!

1 Answer 1

0

Your error message looks like it is one found in Prefect 2. But it looks like you are importing @Flow as a class instead of using the @flow (lowercase) decorator used in Prefect 2. You can check out the Prefect docs on flows for more info.

Beyond that, I would cut ).submit( in the section below. Assuming you are running a relatively recent version of Prefect 2, you don't need .submit unless submitting a task to a task runner.

    email_send_message.with_options(name=f"email {email_address}").submit(
        email_server_credentials=email_server_credentials,
        subject="Pipeline Failure Notification",
        msg=f"this task has encountered a failure.",
        email_to=email_address,
    )

The email_send_message is a task, so you do need to call it within a flow.

Finally, you want to just call the weather_flow function like this: weather_flow() (or in an if __name__=="__main__": block).

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Not the answer you're looking for? Browse other questions tagged or ask your own question.