Airflow Slack



Recently, I came across an annoying problem. One of our Airflow DAGs was not scheduling tasks. The issue looked very strange because it wasn’t happening all the time.

In the case of some DAG runs, everything was running normally. On other occasions, Airflow was scheduling and running half of the tasks, but the other half got stuck in the no_status state.

I could run the task manually. When I did that, the manually triggered task was doing its job, but the next task was not getting scheduled either.

My two mistakes

class airflow.hooks.slack_hook.SlackHook(token=None, slack_conn_id=None). Bases: airflow.hooks.base_hook.BaseHook. Interacts with Slack using the slackclient library.

I was trying to find the problem by looking at the code and comparing it with other DAGs that don’t have the same issue.

  • Airflow has a built-in capability to send alerts by email, but they get lost in a pile of 1000 other unread emails. It is easier to get alerts in a place your entire team keeps an eye on.
  • This is a backport providers package for the slack provider. All classes for this provider package are in the airflow.providers.slack Python package. Only Python 3.6+ is supported for this backport package. While Airflow 1.10. continues to support Python 2.7+, you need to upgrade Python to 3.6+ if you want to use this backport package.

Here, I made the first mistake. I could not spot the difference between normally running DAGs and the faulty one, even though there was a difference in the DAG configuration.

I SSHed to the Airflow scheduler to look at the scheduler log. I saw that the scheduler was logging a ValueError because it could not parse the value of an enum in our code.

At this point, I made the second mistake. I noted it down as an issue to fix later, once I had finished dealing with the broken scheduler, and moved on.

Some failing code which I ignored

I did not pay attention to the error in the log because it occurred in a custom function we wrote to send notifications about errors, SLA misses, etc. to Slack channels. The function gets the type of failure and the DAG owner to figure out which Slack channel should receive the notification.

In addition to the function, we have an Owner enum which looks like this:
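The original listing did not survive here. As a minimal sketch, assuming the enum maps team display names to members, it could look roughly like this. Only "DE Team" appears in this post; the second member and the parse_owner helper are hypothetical names added for illustration.

```python
from enum import Enum


class Owner(Enum):
    # "DE Team" is the only owner named in the post; the second member
    # is a hypothetical placeholder.
    DE_TEAM = "DE Team"
    ANALYTICS_TEAM = "Analytics Team"


def parse_owner(raw_owner):
    # Lookup by value: Enum raises ValueError for any string that is not
    # exactly one of the member values.
    return Owner(raw_owner)
```

With this shape, parse_owner("DE Team") returns Owner.DE_TEAM, and any string that is not an exact member value raises ValueError.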

The ValueError was raised when we tried to parse the DAG owner value: Owner(dag.owner). It was failing because, for some reason, a few of our DAGs (including the failing one) had the owner set to “Airflow, DE Team”.

It was strange, but again, I decided to deal with it later, once I had fixed the scheduling issue.

Long and fruitless debugging

In the DAG configuration, we were intentionally limiting the number of DAG runs and running tasks. We had set max_active_runs to 1, disabled the Airflow “catch up” feature, and limited the task concurrency to 1.

Because of that, my debugging attempts focused mostly on figuring out how those three parameters interact with each other and break task scheduling. I was sure it had to be caused by those settings. I just had to find the issue.

When my attempts to tweak the concurrency parameters failed, I started changing the DAG structure. That one DAG was kind of complicated. It started with a few tasks running sequentially. After that, the tasks branched out from a common upstream dependency. In the next step, the task paths merged again because of a common downstream task, ran some additional steps sequentially, and branched out again at the end.

The dependencies definition looked like this (note that I changed variable names, task_5 is a BranchPythonOperator that picks one of two branches):

I concluded that I could define it as a sequence of tasks. After all, I was not going to let multiple tasks run at the same time because that would mess up the result. So I put all of the tasks in a long sequence:

The only branching left was the BranchPythonOperator, but the tasks in the second group were running in a sequence.

It did not solve the problem.

The difference that should not matter

I was looking at the differences between the tasks again. This time, I focused on the DAG owners. For some reason, the faulty DAG was owned by Airflow and DE Team at the same time. That was strange because we always assign the owner to one of the teams.

I searched for the code that sets Airflow as the DAG owner. I could not find it, so it had to be somewhere in the Airflow configuration. There it was. In the operators section of the airflow.cfg file, I saw default_owner = Airflow.

But why did it use the default owner? When we create a new instance of DAG, we explicitly pass the owner’s name. There was no “Airflow”, just “DE Team”! So what was wrong?

I cloned the Airflow source code and began the search for DAG owner. The property is defined in the dag.py file and looks like this:
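The listing is missing here. As far as I recall from the Airflow 1.10 source, the property joins the distinct owners of all tasks in the DAG into one comma-separated string. The sketch below models that behaviour with stand-in classes; FakeTask and FakeDag are hypothetical and are not Airflow classes, and the owners are sorted only to keep the output deterministic.

```python
DEFAULT_OWNER = "Airflow"  # mirrors default_owner in airflow.cfg


class FakeTask:
    """Stand-in for an operator; not the real Airflow class."""

    def __init__(self, task_id, owner=DEFAULT_OWNER):
        self.task_id = task_id
        self.owner = owner


class FakeDag:
    """Stand-in for a DAG that models only the owner property."""

    def __init__(self, tasks):
        self.tasks = tasks

    @property
    def owner(self):
        # Distinct task owners joined into a single string.
        return ", ".join(sorted({task.owner for task in self.tasks}))


dag = FakeDag([
    FakeTask("task_1", owner="DE Team"),
    FakeTask("task_2"),  # no owner given, so the default applies
])
print(dag.owner)  # Airflow, DE Team
```

In this model, a single task that falls back to the default owner is enough to change the owner string of the whole DAG.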

Why? Why are some of my tasks without an owner? I was clicking every single task in the Airflow UI to check its owner.

At the end of my DAG, in the second_group of tasks, I found the problem: Airflow as the owner. I looked at the source code of my DAG and noticed that none of the tasks assigned to the default owner had the dag parameter specified.

If they were instances of the DummyOperator, the code would look like this:

instead of:

I added the dag to all of the tasks and redeployed the DAG configuration. I did not expect anything to happen, but the previously stuck task instance started running! That was strange, but maybe DAG redeployment triggered it. I was sure that it was going to get stuck again after processing the currently running task, but it did not happen! The DAG was running!

You gotta be kidding me

How could the DAG owner prevent the scheduler from scheduling the tasks? It could not be the real cause of the problem! After all, the DAG owner was wrong all of the time, but the DAG wasn’t always getting stuck.

I opened the Airflow UI again and looked for differences between the DAG runs that ran successfully and the ones that were getting stuck. It looked like the faulty DAG runs had more data to process, but how could that affect scheduling, and how was it related to the DAG owner?

I looked at our code that sends notifications to see when we send them. There were only two cases: when a task fails and when we miss an SLA. I checked the DAG runs again. All of the faulty ones had missed the SLA.

A series of unfortunate events

When our tasks were running for too long, our code tried to send a notification to the Slack channel. This function was failing because we had an invalid value of the DAG owner property, and it did not match any of the Owner enum values. Because of that, Airflow was continuously trying to send the notification. As a result, Airflow could not execute the code that schedules the tasks because of a ValueError happening earlier.

All of that happened because I did not set the dag property of some tasks and Airflow assigned them to the default owner.

The hypothesis sounds crazy, but could it be true? How do I test it?

To verify my assumptions, I created a simple test DAG:

As expected, this DAG was running without any issues.

In the next test, I removed the dag property of the second task. This time, the first task ran successfully, but it exceeded the SLA. Airflow was trying to notify me about it, but the code was failing because of a ValueError caused by the invalid DAG owner. As a consequence, the second task got stuck in the no_status state. Everything was just like my production DAG.

Prevention

We do code reviews, but somehow nobody noticed the missing property. More reviews or more procedures will not help us avoid making the same mistake in the future.

I decided that the best solution is to override the notification recipient when we can’t get it from the DAG properties.

In the function that sends notifications, I added a try ... except block which sends an additional notification about invalid DAG owner configuration to our Slack channel and changes the recipient of the original error message to DE Team:
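The original listing is not available, so the following is only a sketch of the idea under stated assumptions. The names resolve_owner, send_notification, and the notify callable are hypothetical, and the real function posts to Slack rather than calling a plain Python callable.

```python
from enum import Enum


class Owner(Enum):
    DE_TEAM = "DE Team"  # the only owner named in the post


def resolve_owner(raw_owner, notify):
    """Return the Owner for raw_owner, falling back to DE Team.

    notify is a hypothetical callable(owner, message) that posts to the
    Slack channel of the given owner.
    """
    try:
        return Owner(raw_owner)
    except ValueError:
        # Report the misconfiguration, then reroute the original alert.
        notify(Owner.DE_TEAM, f"Invalid DAG owner: {raw_owner!r}")
        return Owner.DE_TEAM


def send_notification(raw_owner, message, notify):
    owner = resolve_owner(raw_owner, notify)
    notify(owner, message)


# Usage with an in-memory stand-in for the Slack client.
sent = []
send_notification(
    "Airflow, DE Team", "SLA missed", lambda owner, msg: sent.append((owner, msg))
)
```

With the invalid owner string from this story, the stand-in records two messages for DE Team: one about the invalid owner and the original SLA alert. The notification code no longer raises, so it cannot get in the way of scheduling.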

Ananth Durai

@vananth22

Operating data pipeline using Airflow @ Slack

Submitted May 9, 2018

Slack is a communication and collaboration platform for teams. Our millions of users spend 10+ hrs connected to the service on a typical working day.

The Slack data engineering team's goal is simple: drive up the speed, efficiency, and reliability of making data-informed decisions. For engineers, for people managers, for salespeople, for every Slack customer.

Airflow is the core system in our data infrastructure for orchestrating our data pipeline. We use Airflow to schedule Hive/Tez, Spark, Flink, and TensorFlow applications. Airflow helps us manage our stream processing, statistical analytics, machine learning, and deep learning pipelines.

About six months ago, we started an on-call rotation for our data pipeline, adopting what we had learned from the DevOps paradigm. We found several Airflow performance bottlenecks and operational inefficiencies that had been siloed by ad hoc pipeline management.

In this talk, I will describe how we identified Airflow performance issues and fixed them. I will talk about our experience as we strive to resolve our on-call nightmares and make the data pipeline simpler and more pleasant to operate, and about the hacks we used to improve alerting and visibility of our data pipeline.

Though the talk is tuned towards Airflow, the principles we applied to data pipeline visibility engineering are generic and can apply to any tool or data pipeline.

Outline

  • Intro to Slack and the data engineering team
  • Problem statement and customer complaints
  • Overview of Airflow infrastructure and deployment workflow
  • Scaling the Airflow Local Executor
  • Data pipeline operations
  • Alerting and monitoring the data pipeline

Requirements

The audience is expected to have a basic understanding of how Airflow works.
The official Airflow documentation is a good starting point: https://airflow.apache.org/
Our friends at Robinhood wrote an excellent blog post describing why they use Airflow: https://robinhood.engineering/why-robinhood-uses-airflow-aed13a9a90c8

Speaker bio

I work as a senior data engineer at Slack, where I manage core data infrastructure such as Airflow, Kafka, Flink, and Pinot. I love talking about all things ethical data management.
