Airflow allows multiple ways to keep the users informed about the status of a task. There is no one size fit all solution when it comes to sending emails from airflow. We will deep dive into all the m
Now that you understand what Airflow is, let's discuss how you can send emails to update your team about the status of a task or send reports using airflow.
Sending Email from Airflow
There is no one size fit all solution when it comes to sending emails from airflow. We will deep dive into all the methods available and the pros and cons of each in the following sections.
If you're not sure what an SMTP server is or how to configure one. Checkout the SendGrid blog
All the methods below need an SMTP server, and the same to be configured in the airflow.cfg file.
Email operators and email options are the most simple and easy way to send emails from airflow. The only drawback is these options are limited in customization.
One common use case for sending emails is to send reports of tasks executed in the pipeline. For such cases, you might want to construct an email body based on the success or failure of tasks.
Using Callback
The above use case can be achieved using a callback mechanism. Let's start by writing a generic function to send an email. Different callback mechanisms can repurpose this function.
def send_email(**context):
task = context['ti'].task
for parent_task in task.upstream_list:
ti = TaskInstance(parent_task, args.execution_date)
if ti.current_state() == "FAILURE":
status = "Failed"
break
else:
status = "Successful"
subject = "Order {status}""
body = f"""
Hi {user}, <br>
# Type your message here
<br> Thank You. <br>
"""
send_email(dag.default_args["email"], subject, body)
Task Level callback
Each task in Airflow comes with callbacks for success or failure of tasks. We can define this callback function to send an email per task.
This works well when your pipeline is small or when you want the status of a particular task.
But oftentimes, we want to email about the status of the whole pipeline.
Just like tasks, DAGs also have callbacks. This method will be called after the completion of all tasks on the DAG.
dag = DAG(
dag_id='example_dag',
start_date=datetime(2020, 1, 1),
on_failure_callback=send_email
)
While callbacks completely fit our purpose, there is still one problem. With callbacks, we lose the advantage we have by treating them as an independent task.
Whether the email sending was a success or failure?
how long did it take to send the email?
What are the logs?
When did it run?
How many emails sent?
PythonOperator
To achieve the combined benefits of customization and added advantage of airflow task, we can couple the above send_email function to an airflow PythonOperator.