Python: an introduction to Celery and its monitoring configuration


To put it very simply, Celery is a tool we can use to run tasks outside of our main service.

For example, there is a Backend API that has some kind of endpoint to which mobile devices send information that the user has created a new whatever in the application. The task of the Backend is to add the whatever to a database.

This can be done directly in the API instance as soon as the event arrives at the endpoint, or, if the whatever doesn’t have to be written to the database right now, we can create a parallel deferred task that will be executed in 1, 5, 20, or 60 seconds.

This is exactly what Celery does:

  • The main service code creates a Celery task
  • The Celery client in the code sends this task to an MQ Broker (Message Queue Broker, such as RabbitMQ, Redis or AWS SQS, see the full list on the Broker Overview page)
  • Celery Worker receives a message from the queue
  • The Worker runs some function that does the actual work – for example, writes the whatever to the database
  • profit!

Actually, in this post, we won’t go into the details of how all of this is implemented under the hood.

All I’m interested in is how to work with it – that is, how to create a new task so that a Worker can execute it.

Ideally, I would also check that the task was actually completed, but there are problems with AWS SQS there. We will take a closer look at them below.

Getting Celery up and running

We’ll do a quick local setup with Python pip and Docker, then take a look at AWS SQS, Kubernetes, and Celery monitoring.

Creating a project

Install Celery itself and its dependencies for Redis:

$ mkdir celery
$ cd celery/
$ python -m venv .venv
$ . ./.venv/bin/activate
$ pip install celery
$ pip install -U "celery[redis]"

Starting Redis

We’ll use Redis as the message broker because it’s easy to run locally and doesn’t require a lot of CPU/memory resources.

For the result backend, we’ll use Redis as well, but we’ll look at that a bit later.

Let’s start the container with Redis:

$ docker run --rm -p 6379:6379 --name redis -e REDIS_ARGS="--bind 0.0.0.0" redis

Connect to it:

$ docker exec -ti redis bash
root@78326cab3d4b:/data#

Check that Redis is working:

root@78326cab3d4b:/data# redis-cli ping 
PONG

Starting Celery

See also Broker Settings.

I have no idea why the documentation calls the parameter “broker” and not “broker_url”, because “broker” looks like the deprecated form, while the documentation “describes the current stable version of Celery (5.4)” (c). Or maybe --broker is a command-line parameter, and broker_url is for the configuration?
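As far as I can tell, these are simply different ways to point Celery at the broker – the constructor argument, the configuration setting, and the CLI option. A quick sketch:

from celery import Celery

# 1. constructor argument
app = Celery("myapp", broker="redis://localhost:6379/0")

# 2. configuration setting (what we will use below via config_from_object())
app.conf.broker_url = "redis://localhost:6379/0"

And on the command line:

$ celery -A celery_app -b redis://localhost:6379/0 worker --loglevel=INFO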

You can simply put all the code in one file like tasks.py, as described in the Getting Started documentation. But I’ll split it into several separate modules to make it more “production-like” – it’s already done this way on my project’s Production, so I want a similar setup during testing.
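For reference, these are the files we’ll end up with by the end of the post:

  • celery_app.py – the Celery application itself
  • celery_config.py – the Celery configuration (added a bit later)
  • celery_tasks.py – the task definitions
  • my_api_app.py – our “API”, the code that creates tasks
  • celery_api.py – a small FastAPI wrapper (in the FastAPI section)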

Create a main module for Celery – the celery_app.py file:

from celery import Celery

app = Celery(
    __name__,
    broker_url='redis://localhost:6379/0',
    include=["celery_tasks"],
)

Here, broker_url is the Redis address, and in include we list the modules with our future tasks. There is also an autodiscover_tasks option, but I haven’t tried it and we don’t use it here.
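For reference, with autodiscover_tasks() the task modules would be found automatically instead of being listed in include – roughly like this (not used in this post, and the package name here is hypothetical):

from celery import Celery

app = Celery(__name__, broker_url='redis://localhost:6379/0')

# look for a `tasks` module inside each listed package
app.autodiscover_tasks(["my_project"])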

If the tasks are not imported, you will get errors like:

Received unregistered task of type ‘celery_tasks.test_task’.

Create a module for the Celery Tasks – celery_tasks.py:

from celery_app import app

@app.task
def test_task(arg):
    return "OK: " + arg

Actually, a task is just a function that does some work in the background of the main system.

In our case, we have an API that receives notifications from clients that a user has created a new record in their mobile app.

Through Celery, our API creates a task that goes to an AWS RDS instance and updates a table for this user, adding some new records.

We’ll write our “API service” – the main code that will call Celery.

Using the delay() method, add the task creation:

#!/usr/bin/env python

from celery_tasks import test_task

test_task.delay("Hello, comrade!")

Now we start the actual Celery Worker, which is a separate Python process (in Kubernetes, we have separate Pods for this).

That is, the Celery client is the Celery instance created when the Backend API starts, through which we send new tasks to the broker, while the Celery Worker is a separate process that picks up the messages and executes the tasks.

You can run it directly from the terminal:

$ celery -A celery_app worker --loglevel=INFO
...
- ** ---------- [config]
- ** ---------- .> app:         celery_app:0x7d2bb7830980
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     disabled://
...
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
...

You can also check the keys in Redis – there’s nothing special here yet, but let’s see what gets created:

root@61eff8635cb1:/data# redis-cli keys *
1) "_kombu.binding.celery.pidbox"
2) "_kombu.binding.celery"
3) "_kombu.binding.celeryev"

Since we are using Redis, these are keys. But for Celery they represent queues, and the keys are created with the SET type, i.e. each one holds a set of members. Such nuances are not important for us now.

Let’s run our “API” so that it creates the task (don’t forget to activate the venv if you do it in a separate terminal, because we need to import the Celery libs):

$ chmod +x my_api_app.py
$ ./my_api_app.py

Check the Celery Worker logs – the task is received, the task is processed:

...
[2025-03-19 12:48:06,285: INFO/MainProcess] Task celery_tasks.test_task[edbdc0aa-673c-490f-a18f-0b7665db2ff7] received
[2025-03-19 12:48:06,287: INFO/ForkPoolWorker-15] Task celery_tasks.test_task[edbdc0aa-673c-490f-a18f-0b7665db2ff7] succeeded in 0.0003573799040168524s: 'OK: Hello, comrade!'

Default Celery Broker keys

Let’s take a brief look at what Celery creates in the queues and why, because with SQS we will have problems with some of them, so we need to understand the why and how.

As we said above, in Redis these are KEYS of the SET type (in SQS there would be separate queues):

root@a0c65a5e7bfb:/data# redis-cli type _kombu.binding.celeryev 
set

Here:

  • _kombu.binding.celeryev: used by Celery Events to send events about the state of the worker (for example, when a worker starts, completes a task, or quits)
    • used for monitoring via celery events or Flower, we will see them later
  • _kombu.binding.celery: the default main Celery task queue
    • tasks are sent here and then processed by Celery Workers
  • _kombu.binding.celery.pidbox: used for pidbox messaging, i.e. exchange of commands between workers, for example, celery inspect, celery control – and this will not work in SQS either
    • through it, Celery can send commands to workers to check their status, change the logging level, etc.

Why _kombu in the names? Because Celery uses the Kombu library under the hood.

Let’s see what’s in the keys.

In the default queue, execute SMEMBERS because it is a SET type:

root@a06f25448034:/data# redis-cli SMEMBERS _kombu.binding.celery 
1) "celery\x06\x16\x06\x16celery"

The \x06 and \x16 are the ASCII ACK and SYN characters that Kombu adds as separators.

Since our task has already been completed, there is nothing else in _kombu.binding.celery – no pending messages are waiting in the queue itself.
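By the way, with the Redis transport the pending messages themselves don’t live in this binding key: they are pushed into a Redis LIST named after the queue, i.e. simply celery, and that key exists only while at least one message is waiting. If you stop the worker and create a task, you can see it there – a sketch (the payload is a JSON envelope created by Kombu):

# the queue itself is a LIST
redis-cli TYPE celery
# the raw (JSON-encoded) pending messages
redis-cli LRANGE celery 0 -1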

And pidbox – information about the existing worker celery@setevoy-wrk-laptop:

root@8d4de6fb1bc6:/data# redis-cli SMEMBERS _kombu.binding.celery.pidbox 
1) "\x06\x16\x06\[email protected]"

You can run redis-cli monitor and see everything that is happening in Redis.

Okay.

Everything seems to be working.

What’s next?

Adding a result_backend

Without a result_backend we can’t check the status of tasks because Celery simply doesn’t store this information anywhere. See Keeping Results.

That is, if we then want to get the state of the task with something like response = result.get() (we will do it later), then without result_backend we will get errors like:

  File "/usr/local/lib/python3.12/site-packages/celery/backends/base.py", line 1104, in _is_disabled
    raise NotImplementedError(E_NO_BACKEND.strip())

Let’s put the Celery configuration in a dedicated module, celery_config.py, and add the result_backend parameter with Redis:

broker_url='redis://localhost:6379/0'
result_backend='redis://localhost:6379/0'

include=[
    "celery_tasks"
]

Update the celery_app.py code – add configuration import and config_from_object() call:

import celery_config

from celery import Celery

#app = Celery(__name__, 
#                 broker_url='redis://localhost:6379/0',
#                 include=["celery_tasks"]
#             )

app = Celery(__name__)
app.config_from_object("celery_config", force=True)

Restart the Celery Worker, and now the "results" field shows the address of our Redis instead of disabled:

$ celery -A celery_app:app worker --loglevel=INFO
...
- ** ---------- [config]
- ** ---------- .> app:         celery_app:0x78f451ae8980
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/0
...

Update the code of the main service – add getting the result of the task via get():

#!/usr/bin/env python

from celery_tasks import test_task

    
result = test_task.delay("Hello, comrade!")

print(result.get())

Let’s start our “API”:

$ ./my_api_app.py 
OK: Hello, comrade!

And print(result.get()) gives us the value returned by the test_task() function – that is, "OK: " plus the passed argument.
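Besides get(), the AsyncResult object returned by delay()/apply_async() has a few standard helpers to check a task without blocking – shown here just as a reminder:

result = test_task.delay("Hello, comrade!")

print(result.id)            # the task UUID
print(result.state)         # PENDING / STARTED / SUCCESS / FAILURE
print(result.ready())       # True once the task has finished
print(result.successful())  # True if it finished without raising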

OK.

Everything looks like it’s working great here.

Now let’s try using AWS SQS.

Using Celery with AWS SQS

Documentation – Using Amazon SQS.

Install the dependencies:

$ pip install "celery[sqs]"

Creating an SQS queue

Leave the default type, Standard:

We don’t need a DLQ for now.

Save it, copy the URL – we’ll need it for the Celery config:

Setting up Celery with SQS

Let’s edit our celery_config.py to add the AWS ACCESS/SECRET keys and the queue.

But first, let’s take a look at the available options:

  • broker_url: here we change it to sqs
  • broker_transport_options:
    • predefined_queues: should be added, because otherwise Celery will look up the available SQS queues with the ListQueues AWS API call, which is slow and can be expensive
      • a default queue can be set with task_default_queue
        • if task_default_queue is not set, then Celery will (probably) look for a queue named celery, e.g. at https://sqs.us-east-1.amazonaws.com/492***148/celery
      • a queue can also be set when defining a task – @app.task(queue="my_custom_queue")
  • task_create_missing_queues: if the required queue is not found, Celery will try to create it – which we definitely do not want with SQS

Now the config may look like this – we disable result_backend for now, because SQS cannot be used as a result backend, see Results:

from kombu.utils.url import safequote

#broker_url='redis://localhost:6379/0'
#result_backend='redis://localhost:6379/0'

aws_access_key = safequote("AKI***B7A")
aws_secret_key = safequote("pAu***2gW")

broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
    aws_access_key=aws_access_key, aws_secret_key=aws_secret_key,
)
broker_transport_options = {
    "region": "us-east-1",
    "predefined_queues": {
        "arseny_test": {
            "url": "https://sqs.us-east-1.amazonaws.com/492***148/arseny-celery-test",
        }
    }
}

task_create_missing_queues = False
task_default_queue = "arseny_test"

include=[
    "celery_tasks"
]

Restart the worker:

$ celery -A celery_app worker --loglevel=INFO
...
- ** ---------- [config]
- ** ---------- .> app:         celery_app:0x7d9631ef8980
- ** ---------- .> transport:   sqs://AKI***B7A:**@localhost//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> arseny_test      exchange=arseny_test(direct) key=arseny_test

[tasks]
  . celery_tasks.test_task
...

Interesting that the transport is shown as @localhost… But okay.

Check the Monitoring tab in SQS:

Messages have gone through, OK.

What’s next?

Next, we will try to add some monitoring.

Monitoring Celery in AWS SQS

Actually, this is how I came to write this post: I have a Kubernetes Pod for which I want to have a simple Liveness Probe.

In 10 minutes I googled a couple of Celery methods, updated my Deployment manifest, and was about to merge a PR, when it turned out that…

So, here’s the problem: SQS doesn’t support a few things we need:

  • SQS doesn’t yet support worker remote control commands.
  • SQS does not yet support events, and so cannot be used with celery events, celerymon, or the Django Admin monitor.

That is, if we try to execute the celery inspect ping command, we will get errors – at the very least because it requires a pidbox queue, which is not supported in SQS.

Let’s take a look at these errors first.

get() and the “No result backend is configured” error

We have already seen the error with get() when it is called without a result_backend:

result = test_task.delay("Hello, comrade!") 
print(result.get())

Then you’ll get the NotImplementedError(E_NO_BACKEND.strip()):

...
  File "/home/setevoy/Scripts/Python/celery/.venv/lib/python3.13/site-packages/celery/backends/base.py", line 1104, in _is_disabled
    raise NotImplementedError(E_NO_BACKEND.strip())
NotImplementedError: No result backend is configured.

Okay…

But surely such a serious library as Celery has its own mechanisms for checking workers?

Yes, it does.

But…

Celery control inspect

In general, celery.app.control is a cool thing, and if you have RabbitMQ or Redis, you can get a lot of useful information from it.

However, we are using AWS SQS, so control won’t work.

Let’s try to check the worker’s tasks:

$ celery -A celery_app inspect registered
...
  File "/home/setevoy/Scripts/Python/celery/.venv/lib/python3.13/site-packages/kombu/transport/SQS.py", line 381, in _resolve_queue_url
    raise UndefinedQueueException((
    ...<2 lines>...
    ).format(sqs_qname))
kombu.transport.SQS.UndefinedQueueException: Queue with name '0f41e5d5-49e1-38bc-bc9b-c1efbc4f9a3e-reply-celery-pidbox' must be defined in 'predefined_queues'.

Or we can try to ping the worker with app.control.ping(), but we will get the same error about the pidbox queue that “must be defined in ‘predefined_queues’”:

@app.task
def celery_health_check():
    response = app.control.ping(timeout=2)
    return response

And the same for celery_app.control.inspect().
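For completeness, the programmatic variant looks roughly like this – it works with Redis or RabbitMQ as the broker, but with SQS it fails with the same pidbox error:

from celery_app import app

insp = app.control.inspect(timeout=2)

print(insp.ping())        # e.g. {'celery@hostname': {'ok': 'pong'}} with Redis/RabbitMQ
print(insp.registered())  # registered tasks per worker
print(insp.active())      # tasks currently being executed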

There is a very old GitHub issue – celery ping doesn’t work when using SQS.

And the options for working around this problem in Kubernetes Liveness Probes come down to… just disabling the checks altogether. For example, here.

Possible solution: separate result_backend

So, what I’m trying to do now is simply add a result_backend backed by a Redis container running in the same Kubernetes Pod as the Celery Worker. Redis needs only a handful of resources, so it won’t be any real overhead.

That is, we could have:

  • an SQS queue for messages
  • a Redis instance in the Pod to store the processing statuses of these messages

Then, using get(), we can fetch the result of a task and make sure that the Worker is actually working.

Let’s try.

Set the result_backend from Redis again:

...
result_backend='redis://localhost:6379/0' 
...
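So the whole celery_config.py at this point looks roughly like this (same values as before; the keys and the queue URL are truncated placeholders):

from kombu.utils.url import safequote

aws_access_key = safequote("AKI***B7A")
aws_secret_key = safequote("pAu***2gW")

# broker: the AWS SQS queue
broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
    aws_access_key=aws_access_key, aws_secret_key=aws_secret_key,
)
broker_transport_options = {
    "region": "us-east-1",
    "predefined_queues": {
        "arseny_test": {
            "url": "https://sqs.us-east-1.amazonaws.com/492***148/arseny-celery-test",
        }
    }
}
task_create_missing_queues = False
task_default_queue = "arseny_test"

# results: the local Redis instance (a sidecar container in the same Pod)
result_backend = "redis://localhost:6379/0"

include = ["celery_tasks"]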

Add a new task to the celery_tasks.py:

@app.task
def celery_health_check_task():
    return "OK"

Add a “monitoring” to our “API” in the my_api_app.py file:

#!/usr/bin/env python

import sys

from celery_tasks import test_task, celery_health_check_task

def celery_health_check():
    try:
        result = celery_health_check_task.apply_async()
        response = result.get(timeout=5)

        print ("Result:", result)
        print ("Result state:", result.state)
        print ("Respose:", response)

        if response != "OK":
            raise RuntimeError("Celery health check task returned unexpected response!")

        print("Celery is running")

    except Exception as e:
        print("Celery health check failed")
        print({"status": "error", "message": str(e)})
        sys.exit(1)

celery_health_check()

delay() is the simplest method and takes no extra parameters, while apply_async() accepts additional options. It doesn’t really matter to us now, but we’ve already used delay(), so let’s use apply_async() this time.
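For example, apply_async() lets us pass options like countdown or an explicit queue (a small sketch; the values here are arbitrary):

# run the task ~10 seconds from now
test_task.apply_async(args=["Hello, comrade!"], countdown=10)

# send the task to a specific queue instead of the default one
test_task.apply_async(args=["Hello, comrade!"], queue="arseny_test")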

Restart Celery Worker, run our main script:

$ ./my_api_app.py 
Result: 2d32805b-3c55-412d-8fc4-4b893f222202
Result state: SUCCESS
Response: OK
Celery is running

Good.

What else can we do?

Celery and FastAPI

Besides calling Celery via imports, we can create a FastAPI service and do everything over HTTP.

Install fastapi and uvicorn:

$ pip install fastapi uvicorn

Create a new file celery_api.py, describe the FastAPI app:

from fastapi import FastAPI, HTTPException

from celery_tasks import celery_health_check_task

app = FastAPI()


@app.get("/celery-healthz")
def celery_healthcheck():
    try:
        result = celery_health_check_task.apply_async()
        response = result.get(timeout=5)

        if response != "OK":
            raise RuntimeError("Celery health check task returned unexpected response!")

        return {"status": "success", "message": "Celery is running"}

    except Exception as e:
        raise HTTPException(status_code=500, detail={"status": "error", "message": str(e)})

Run with uvicorn:

$ uvicorn celery_api:app --host 0.0.0.0 --port 8000 --reload

And check:

$ curl localhost:8000/celery-healthz 
{"status": "success", "message": "Celery is running"}

Looks good…

Flower for Celery monitoring

Flower is a popular solution for Celery monitoring, but it will not work with SQS for the same reasons:

  • Flower uses celery events (celeryev), which do not work with SQS
  • Flower’s workers view uses Celery’s inspect(), which, as we saw above, will not work with SQS either

If you have a dedicated result_backend, as we did above, Flower will work partially – at least you will be able to see the list of tasks.

Therefore, to see the capabilities of Flower, let’s return Redis to celery_config.py:

broker_url='redis://localhost:6379/0' 
result_backend='redis://localhost:6379/0' 
...

Let’s install Flower:

$ pip install flower

Run Flower with our Celery instance and its configuration:

$ celery -A celery_app flower
[I 250320 13:49:48 command:168] Visit me at http://0.0.0.0:5555
[I 250320 13:49:48 command:176] Broker: redis://localhost:6379/0
[I 250320 13:49:48 command:177] Registered tasks: 
    ['celery.accumulate',
     'celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap',
     'celery_tasks.celery_health_check_task',
     'celery_tasks.test_task']
[I 250320 13:49:48 mixins:228] Connected to redis://localhost:6379/0

And open the http://localhost:5555 URL in a browser:

Now, we can use the Flower API for monitoring:

$ curl -s localhost:5555/api/workers | jq
{
  "celery@setevoy-wrk-laptop": {
    "scheduled": [],
    "timestamp": 1742471544.7675385,
    "active": [],
    "reserved": [],
    ...

API documentation – API Reference.
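The list of tasks can be fetched in a similar way (the exact fields depend on what has been executed so far):

$ curl -s localhost:5555/api/tasks | jq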

Actually, that’s all.

We’ve run Celery to see how to create tasks, and looked at how to monitor their execution.

All that’s left is to implement it in Production.