Herding Celery Tasks: Building Workflows for Sequential Task Execution

In this post, I’ll tackle a straightforward problem: launching a Celery task X after all tasks Y have completed, with X taking the results of all the Y tasks as its input. The brute-force solution would be to store each Y result manually, but it’s more interesting to achieve this with Celery itself, without manual database manipulation or extra bookkeeping code.

This guide is not meant to be exhaustive; consider it a cheat sheet. Throughout the article, I’ll include links to relevant documentation, and I encourage you to check the official docs first and try everything out yourself. If you notice any inaccuracies or have suggestions for additions, feel free to reach out to me. For simplicity, I’ll use Dockerized Redis as the broker in this example. The choice isn’t critical for the demonstration, and in your case the broker could be something else.

The Task Setup

Let’s dive into the task. I’ve come up with a simple and clear example to demonstrate how such a workflow can be implemented in Celery. Let’s assume we have two types of tasks. The first raises a number to a power, and the second sums the results of all tasks of the first type. In real life, you might encounter this pattern when, for example, one task generates individual PDF invoices for subscribers, and another task merges all those invoices into a single file.

Of course, we could store intermediate results in a database, and we could figure out a way to launch the second type of task after the last task of the first type completes. But once you start thinking about implementing such behavior, you quickly realize you’re entering the realm of clumsy and inflexible solutions. However, Celery offers a great solution that’s described in Canvas: Designing Workflows. I won’t bore you with the details — let’s get straight to the code.

Setting Up the Project

First, let’s create a folder called workflow and an empty __init__.py file inside it. Next, create a Celery application in the workflow folder by adding a module named celery.py. I’ll skip over the unimportant details for the purposes of this article. Production code will be more complex anyway.

from celery import Celery

app = Celery(
    'workflow',
    broker='redis://localhost',
    include=['workflow.tasks']
)

if __name__ == '__main__':
    app.start()

Next, we need another important module, tasks. Inside the workflow folder, create a tasks.py file with the following content:

from time import sleep
import random
from .celery import app

@app.task
def power(value, expo):
    sleep(random.randint(10, 1000) / 100.0)
    return value ** expo

@app.task
def amass(values):
    print(sum(values))

So far, the code is simple and self-explanatory, except for one thing: I added a random delay in the power function to simulate a complex task. Otherwise, everything would execute too quickly, and it would be hard to notice the asynchronous nature of the task.

The Fun Part: Using Celery Chords

Now comes the interesting part. First, we want to asynchronously execute power (say, ten times). Second, we need to pass the result of all ten executions to amass. In Celery, this is done using an intriguing primitive called chord, which is described in The Primitives section of the documentation. Chord isn’t a function but a class — more specifically, it’s an alias for the class celery.canvas._chord.

Let’s create a producer.py file in the workflow folder with the following content:

from celery import chord
from .tasks import amass, power

def main():
    tasks = (power.s(i, 2) for i in range(10))
    callback = amass.s()

    return chord(tasks)(callback)

if __name__ == '__main__':
    print('Start testing workflow...')
    main()

And that’s pretty much it! Our test project is ready. Here’s the final project structure:

$ tree -a
.
└── workflow
    ├── __init__.py
    ├── celery.py
    ├── producer.py
    └── tasks.py

1 directory, 4 files

Let’s run Redis and take a look at how this works. For the sake of this example, I’ll use a Dockerized Redis with default settings:

$ docker run -d -p 6379:6379 redis

Now we’ll run Celery in worker mode, which will listen to our queue and make sure everything is ready to handle the tasks:

$ celery -A workflow worker --loglevel=INFO

Finally, let’s fire up the producer and throw some tasks onto the queue:

$ python -m workflow.producer

At the time of writing this, in 2021, running Celery with the configuration above will result in a NotImplementedError with the following message:

NotImplementedError: Starting chords requires a result backend to be configured.

This makes sense. There’s no magic here: Celery needs somewhere to store the task results. What confuses me a bit is the choice of NotImplementedError, which suggests a feature that has yet to be implemented rather than a missing piece of configuration. Anyway, let’s open the workflow/celery.py file and modify it as follows:

from celery import Celery

app = Celery(
    'workflow',
    broker='redis://localhost',
    backend='redis://localhost',
    include=['workflow.tasks']
)

if __name__ == '__main__':
    app.start()

For simplicity, I’ve chosen Redis as the result backend here as well. You’ll need to restart the worker and re-run the producer after adding the backend. Eventually, you should see the producer quickly dispatch tasks and complete its work. If you had a separate terminal open for the worker, you might notice that the worker continues processing for a while after the producer finishes due to the sleep() in the power() task.

Additionally, the worker’s output will include something like this:

[2021-09-15 01:16:11,208: WARNING/ForkPoolWorker-8] 285
[2021-09-15 01:16:11,209: WARNING/ForkPoolWorker-8]

This confirms that amass() ran at the very end, with the results of all ten calls to power(): 285 is exactly 0² + 1² + ... + 9².
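If you want to double-check that number without Celery, the same value falls out of plain Python. This is only a local sanity check of the arithmetic, not part of the project; the names `squares` and `expected` are introduced here for illustration.

```python
# Mirror power.s(i, 2) for i in range(10): square every number from 0 to 9.
squares = [i ** 2 for i in range(10)]

# Mirror amass: add up all the intermediate results.
expected = sum(squares)
print(expected)  # 285
```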

Diving Deeper into Chord

Let’s take a closer look at how we achieved this result. The most interesting part of what we wrote is the use of the chord primitive. As the documentation states, chord is designed for situations where you need to execute a list of tasks in parallel and then run a callback after all those tasks are done. It’s really that simple: we call chord(...)(...), where the first set of parentheses contains the list of tasks (the “header”), and the second set contains the callback (the “body”). In our case, the callback is amass.s().
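To make the header/body idea concrete, here is a rough standard-library analogy. It is not how Celery implements chords (Celery workers are separate processes, and the results travel through the result backend); it only models the semantics. The helper `run_chord_like` is a hypothetical name made up for this sketch, and `power` and `amass` are simplified local copies that return values instead of sleeping and printing.

```python
from concurrent.futures import ThreadPoolExecutor


def power(value, expo):
    return value ** expo


def amass(values):
    return sum(values)


def run_chord_like(header, body):
    """Run every (func, args) pair in `header` concurrently, then call `body` once."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(func, *args) for func, args in header]
        # Collect in submission order; result() blocks until that call finishes.
        results = [future.result() for future in futures]
    # The body runs only after every header call has produced a result.
    return body(results)


header = [(power, (i, 2)) for i in range(10)]
total = run_chord_like(header, amass)
print(total)  # 285
```

The point to take away is the shape: many independent calls, one list of results, one callback that receives that list.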

Why are callbacks useful? Having one task block while it waits for another task’s result is inefficient and can even lead to deadlocks if the worker pool is exhausted. Instead, it’s almost always better to use asynchronous design patterns like callbacks. As for the chord(...)(...) syntax: since chord is a class (an alias for one), the following two-step construction is equivalent:

group = chord(tasks)
group(callback)

The second line is legal because chord defines __call__(). Also, note that throughout this post, I’ve been using the task.s(...) form, which doesn’t invoke the task but creates its signature for later use. The signatures themselves are serialized and sent through the broker. The result backend is needed for a different reason: Celery has to store the result of every header task and detect when all of them have finished before it can pass the collected results to the callback. That is why we hit the NotImplementedError earlier and had to add backend='redis://localhost'.

What to Watch Out For

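One important caveat: if even one task in the header fails, the callback registered with chord will never be executed, and the chord’s own result is marked as failed instead. This is something you should handle separately, for example by attaching an error callback to the chord body.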

All of the code I’ve described here is available as a working project, which you can find at the following link: GitHub Repository.