Herding Celery Tasks: Building Workflows for Sequential Task Execution
In this post, I’ll tackle a straightforward problem: launching a Celery task X only after all tasks Y have completed, where X takes the results of all the Y tasks as its input. A brute-force solution would be to manually store each result of task Y, but it’s more interesting to achieve this with Celery itself, without manual database manipulation or extra code.
This guide is not meant to be an exhaustive or comprehensive manual; consider it more like a cheat sheet. Throughout the article, I’ll include links to relevant documentation, and I encourage you to first check the official docs and try everything out yourself. If you notice any inaccuracies or have suggestions for additions, feel free to reach out to me. For simplicity, I’ll use Dockerized Redis as the broker in this example, although in your particular case, the broker could be something else. For the sake of demonstration, the broker choice is not critical here.
The Task Setup
Let’s dive into the task. I’ve come up with a simple and clear example to demonstrate how such a workflow can be implemented in Celery. Let’s assume we have two types of tasks. The first task calculates the power of a number, and the second sums up the results of all the first type of tasks. In real life, you might encounter this pattern when, for example, one task generates individual PDF invoices for subscribers, and another task merges all those invoices into a single file.
Of course, we could store intermediate results in a database, and we could figure out a way to launch the second type of task after the last task of the first type completes. But once you start thinking about implementing such behavior, you quickly realize you’re entering the realm of clumsy and inflexible solutions. However, Celery offers a great solution that’s described in Canvas: Designing Workflows. I won’t bore you with the details — let’s get straight to the code.
Setting Up the Project
First, let’s create a folder called workflow and an empty __init__.py file inside it. Next, create a Celery application in the workflow folder by adding a module named celery.py. I’ll skip over the unimportant details for the purposes of this article; production code will be more complex anyway.
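A minimal version of this module might look like the following sketch; the app name workflow and the Redis URL on localhost are assumptions that match the Dockerized Redis used later in this post:

```python
# workflow/celery.py
from celery import Celery

# Point the app at the Redis broker started later in this post.
app = Celery('workflow', broker='redis://localhost')
```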
Next, we need another important module, tasks. Inside the workflow folder, create a tasks.py file with the following content:
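Something along these lines; the exact delay range and the default exponent are arbitrary choices made here for the sake of the example:

```python
# workflow/tasks.py
import random
import time

from .celery import app


@app.task
def power(x, exponent=2):
    # Random delay to simulate a slow task; otherwise everything finishes
    # too quickly to observe the asynchronous behaviour.
    time.sleep(random.randint(1, 5))
    return x ** exponent


@app.task
def amass(results):
    # Receives the list of results from all the power() calls and sums them up.
    return sum(results)
```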
So far, the code is simple and self-explanatory, except for one thing: I added a random delay in the power function to simulate a complex task. Otherwise, everything would execute too quickly, and it would be hard to notice the asynchronous nature of the task.
The Fun Part: Using Celery Chords
Now comes the interesting part. First, we want to execute power asynchronously (say, ten times). Second, we need to pass the results of all ten executions to amass. In Celery, this is done using an intriguing primitive called chord, which is described in The Primitives section of the documentation. Chord isn’t a function but a class; more specifically, it’s an alias for the class celery.canvas._chord.
Let’s create a producer.py file in the workflow folder with the following content:
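Roughly like this; the values passed to power are arbitrary, and the structure of the chord call is explained in detail further below:

```python
# workflow/producer.py
from celery import chord

from .tasks import amass, power

if __name__ == '__main__':
    # Header: ten power() signatures executed in parallel.
    # Body: a single amass() callback that receives the list of their results.
    result = chord([power.s(i) for i in range(10)])(amass.s())
    print(f'Dispatched chord {result.id}')
```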
And that’s pretty much it! Our test project is ready. Here’s the final project structure:
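Putting together the files created above, it looks like this:

```
workflow/
├── __init__.py
├── celery.py
├── producer.py
└── tasks.py
```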
Let’s run Redis and take a look at how this works. For the sake of this example, I’ll use a Dockerized Redis with default settings:
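For example, a throwaway container exposing the default port:

```
docker run --rm -d -p 6379:6379 redis
```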
Now we’ll run Celery in worker mode, so that it listens to our queue and is ready to process the tasks:
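From the directory containing the workflow package, something like:

```
celery -A workflow worker --loglevel=INFO
```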
Finally, let’s fire up the producer and throw some tasks onto the queue:
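With the producer sketched above (which uses relative imports), it can be run as a module:

```
python -m workflow.producer
```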
At the time of writing this, in 2021, running Celery with the configuration above will result in a NotImplementedError with the following message:
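The error comes from Celery’s disabled result backend and reads roughly as follows (the exact wording depends on the Celery version):

```
NotImplementedError: No result backend is configured.
Please see the documentation for more information.
```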
This makes sense. There’s no magic here: Celery needs somewhere to store the task results. What confuses me a bit is the NotImplementedError, which implies that this feature might be implemented later. Anyway, let’s open the workflow/celery.py file and modify it as follows:
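That is, add a backend argument alongside the broker; a sketch of the updated module:

```python
# workflow/celery.py
from celery import Celery

app = Celery(
    'workflow',
    broker='redis://localhost',
    backend='redis://localhost',  # result backend required by chord
)
```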
For simplicity, I’ve chosen Redis as the result backend here as well. You’ll need to restart the worker and re-run the producer after adding the backend. Eventually, you should see the producer quickly dispatch its tasks and finish its work. If you have a separate terminal open for the worker, you might notice that the worker continues processing for a while after the producer finishes, due to the sleep() in the power() task.
Additionally, the worker’s output will include something like this:
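With the tasks sketched above, the tail of the log looks roughly like this (task IDs, timestamps, and timings will differ on your machine):

```
[2021-06-01 12:00:41,532: INFO/ForkPoolWorker-2] Task workflow.tasks.power[...] succeeded in 3.0042s: 81
[2021-06-01 12:00:42,107: INFO/ForkPoolWorker-3] Task workflow.tasks.amass[...] succeeded in 0.0021s: 285
```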
This proves that amass() executed at the very end with the results from the ten calls to power().
Diving Deeper into Chord
Let’s take a closer look at how we achieved this result. The most interesting part of what we wrote is the use of the chord primitive. As the documentation states, chord is designed for situations where you need to execute a list of tasks in parallel and then run a callback after all those tasks are done. It’s really that simple: we call chord(...)(...), where the first set of parentheses contains the list of tasks (the “header”), and the second set contains the callback (the “body”). In our case, the callback is amass.s().
Why are callbacks useful? Having one task wait synchronously for another to finish is inefficient and can even lead to a deadlock if the worker pool is exhausted. Instead, it’s almost always better to use asynchronous design patterns like callbacks. To clarify, since chord is a class (an alias for a class), the following construction is equivalent:
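In other words, the one-liner from producer.py can be split into an explicit instantiation followed by a call on the instance (a sketch using the same header and body as before):

```python
c = chord([power.s(i) for i in range(10)])  # instantiate the chord with the header only
result = c(amass.s())                       # pass the body by calling the instance
```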
The second line is legal due to the overload of __call__(). Also, note that throughout this post, I’ve been using the task.s(...) format, which doesn’t actually invoke the task but creates its signature for future use. Essentially, by passing a list of task signatures to chord, we serialize the tasks into the result backend, which is why we encountered the NotImplementedError earlier and needed to specify the backend like this: backend='redis://localhost'.
What to Watch Out For
One important caveat: if even one task in the list fails, the callback registered with chord will never be executed. This is something you should handle separately.
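One way to handle it (not part of the original project, just a sketch based on the error-handling section of the Canvas documentation) is to attach an error callback to the chord body, which Celery will invoke if any task in the chord fails:

```python
@app.task
def on_chord_error(request, exc, traceback):
    # Invoked if a header task (or the body itself) fails.
    print(f'Task {request.id!r} raised: {exc!r}')


chord([power.s(i) for i in range(10)])(amass.s().on_error(on_chord_error.s()))
```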
All of the code I’ve described here is available as a working project, which you can find at the following link: GitHub Repository.