Python: знайомство з Celery та його моніторинг

Автор |  15/04/2025

Якщо дуже просто, то Celery – це щось, за допомогою чого ми можемо виконувати задачі поза нашим основним сервісом.

Наприклад, є Backend API, який має якийсь ендпоінт, на який мобілочки відправляють інформацію про те, що юзер створив новий whatever в застосунку. Задача бекенда – додати whatever в базі даних.

Можна це виконати прямо в інстансі самого API відразу при отриманні івента на ендпоінт, а можна, якщо нам не горить виконання whatever в базі даних, створити паралельну відкладену задачу, яка буде виконана через 1-5-20-60 секунд.

Власне, це і робить Celery:

  • основний код сервісу створює task
  • Celery-клієнт в коді цю задачу відправляє в MQ Broker (Message Queue Broker, як-от RabbitMQ, Redis або мать його єті AWS SQS, див повний список на Broker Overview)
  • Celery Worker отримує меседж з черги
  • Worker запускає якусь функцію, як робить whatever в базі даних
  • profit!

Власне в цьому пості ми не будемо заглиблюватись в деталі реалізації всього цього щастя.

Все, що мені цікаво – це як з цим працювати, тобто – як я можу створити новий task, аби Worker цю задачу виконав.

В ідеалі – ще й перевірити, що задача дійсно була виконана – але тут є проблеми з AWS SQS. Далі подивимось на це детальніше.

Запуск Celery

Зробимо все швиденько локально з Python PiP та Docker, потім ще глянемо на AWS SQS, Kubernetes та моніторинг.

Створення проекту

Встановлюємо сам Celery та залежності для роботи з Redis:

$ mkdir celery
$ cd celery/
$ python -m venv .venv
$ . ./.venv/bin/activate
$ pip install celery
$ pip install -U "celery[redis]"

Запуск Redis

В ролі MQ буде Redis, бо його легко запустити локально і він легенький в плані ресурсів.

Для results backend – теж Redis, але це розглянемо трохи пізніше.

Запускаємо контейнер з Redis:

$ docker run --rm -p 6379:6379 --name redis -e REDIS_ARGS="--bind 0.0.0.0" redis

Заходимо в нього:

$ docker exec -ti redis bash
root@78326cab3d4b:/data#

Перевіряємо:

root@78326cab3d4b:/data# redis-cli ping
PONG

Запуск Celery

Параметри для брокера – див. Broker Settings.

IDK чому в документації параметр називається “broker” а не “broker_url“, бо “broker” наче депрікейтед, а документація “describes the current stable version of Celery (5.4)” (с). Чи, може, --broker – це параметр для командної строки, а broker_url – для конфігу?

Весь код можна просто робити в одному файлі типу tasks.py, як це описано в Getting Started документації, але я відразу розіб’ю на кілька окремих модулів, аби було більше “production way”, тим більше воно вже так зроблено у нас в Production, тому хочеться і під час тестування мати більш схожий сетап.

Створюємо основний модуль для Селері – celery_app.py:

from celery import Celery

app = Celery(__name__, 
                 broker_url='redis://localhost:6379/0',
                 include=["celery_tasks"]
             )

Тут, власне, broker_url – адреса Redis, а в include ми підключаємо наші майбутні таски. Також є опція autodiscover_tasks, але не пробував і у нас не використовується.

Якщо таски не заімпорчені – то будуть помилки типу:

Received unregistered task of type ‘celery_tasks.test_task’.

Створюємо модуль для Celery Tasks – celery_tasks.py:

from celery_app import app

@app.task
def test_task(arg):
    return "OK: " + arg

Власне таска – це просто якась функція, яка має в бекграунді основної системи виконати якусь задачу.

Як в нашому випадку – у нас є API, на який приходяться повідомлення від клієнтів, що юзер створив новий запис у себе в мобільній апці.

Наш API через Celery створює таску, яка їде в RDS, і оновлює табличку з цим юзером, додаючи якісь нові records.

Пишемо наш “API сервіс” – основний код, який буде викликати Celery.

Через метод delay() додаємо створення задачі:

#!/usr/bin/env python

from celery_tasks import test_task
    
test_task.delay("Hello, comrade!")

Запускаємо вже власне Celery Worker – це окремий процес Python (в Kubernetes у нас для цього окремі Pods).

Тобто, Celery-клієнт – це інстанс Celery, який створюється під час запуску API і через який ми створюємо нові задачі в брокері, а Worker – це окремий процес, який займається збором повідомлень і виконанням задач.

Запустити можна просто з термінала:

$ celery -A celery_app worker --loglevel=INFO
...
- ** ---------- [config]
- ** ---------- .> app:         celery_app:0x7d2bb7830980
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     disabled://
...
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
...

Вже можна глянути ключі в Redis – тут поки нічого особливого, але далі подивимось що взагалі тут створюється:

root@61eff8635cb1:/data# redis-cli keys *
1) "_kombu.binding.celery.pidbox"
2) "_kombu.binding.celery"
3) "_kombu.binding.celeryev"

Так як у нас Redis, то це будуть саме ключі. Але для Celery – це черги, і ключі створюються з типом SET, тобто можуть мати послідовність. Хоча нам зараз такі нюанси не дуже важливі.

Запускаємо наш “API” аби він викликав створення задачі (не забуваємо про activate venv, якщо робимо в окремому терміналі, бо треба імпортити Celery libs):

$ chmod +x my_api_app.py
$ ./my_api_app.py 

Перевіряємо логи Celery Worker – таска отримана, таска оброблена:

...
[2025-03-19 12:48:06,285: INFO/MainProcess] Task celery_tasks.test_task[edbdc0aa-673c-490f-a18f-0b7665db2ff7] received
[2025-03-19 12:48:06,287: INFO/ForkPoolWorker-15] Task celery_tasks.test_task[edbdc0aa-673c-490f-a18f-0b7665db2ff7] succeeded in 0.0003573799040168524s: 'OK: Hello, comrade!'

Default Celery Broker keys

Швиденько глянемо розберемо що Celery створює в чергах і для чого, бо в SQS будемо мати проблеми з деякими з них, тому треба розуміти чому і від чого.

Як вже говорили вище, в Redis це KEYS, але з типом SET (в SQS були б окремі черги):

root@a0c65a5e7bfb:/data# redis-cli type _kombu.binding.celeryev
set

Тут:

  • _kombu.binding.celeryev: використовується Celery Events для надсилання подій про стан воркерів (наприклад, коли воркер запускається, виконує задачу або завершує роботу)
    • використовується для моніторингу через celery events або Flower, ми їх далі подивимось
  • _kombu.binding.celery: головна черга завдань Celery за замовчуванням
    • сюди надсилаються задачі, які потім обробляються Celery Workers
  • _kombu.binding.celery.pidbox: використовується для pidbox messaging, тобто обміну командами між воркерами, наприклад, celery inspect, celery control – і в SQS це теж працювати не буде
    • через нього Celery може надсилати команди воркерам, щоб перевірити їхній стан, змінити рівень логування тощо

Чому _kombu в іменах – бо під капотом Celery використовує бібліотеку Kombu.

Глянемо, що в ключах.

В дефолтній черзі – виконуємо SMEMBERS, бо це тип SET:

root@a06f25448034:/data# redis-cli SMEMBERS _kombu.binding.celery
1) "celery\x06\x16\x06\x16celery"

\x06 та \x16 – це ACK та SYN, які додаються Kombo.

Так як наша задача була виконана – то у нас в _kombu.binding.celery пусто.

Ну і pidbox – інформація про наявний воркер celery@setevoy-wrk-laptop:

root@8d4de6fb1bc6:/data# redis-cli SMEMBERS _kombu.binding.celery.pidbox
1) "\x06\x16\x06\[email protected]"

Можна запустити redis-cli monitor, і побачити все, що відбувається в Redis.

Окей.

Все наче працює.

Що далі?

Додавання result_backend

Без наявного result_backend ми не можемо перевіряти статус виконання тасок, бо Celery просто ніде не зберігає цю інформацію. Див. Keeping Results.

Тобто, якщо ми потім захочемо отримати стан задачі з response = result.get() (далі це зробимо) – то без result_backend отримаємо помилки типу:

│ File “/usr/local/lib/python3.12/site-packages/celery/backends/base.py”, line 1104, in _is_disabled │
│ raise NotImplementedError(E_NO_BACKEND.strip())

Давайте конфіг Celery винесемо теж окремим модулем, celery_config.py, і додамо параметр result_backend з Redis:

broker_url='redis://localhost:6379/0'
result_backend='redis://localhost:6379/0'

include=[
    "celery_tasks"
]

Оновлюємо код celery_app.py – додаємо імпорт конфігу і виклик config_from_object():

import celery_config

from celery import Celery

#app = Celery(__name__, 
#                 broker_url='redis://localhost:6379/0',
#                 include=["celery_tasks"]
#             )

app = Celery(__name__)
app.config_from_object("celery_config", force=True)

Перезапускаємо Celery Worker, і тепер в “results” замість Disabled маємо адресу нашого Redis:

$ celery -A celery_app:app worker --loglevel=INFO
...
- ** ---------- [config]
- ** ---------- .> app:         celery_app:0x78f451ae8980
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/0
...

Оновлюємо код основного сервісу – додамо отримання результату виконання задачі через get():

#!/usr/bin/env python

from celery_tasks import test_task

    
result = test_task.delay("Hello, comrade!")

print(result.get())

Запускаємо наш “API”:

$ ./my_api_app.py 
OK: Hello, comrade!

І в print(result.get()) маємо значення, яке повертає функція test_task() – тобто return "OK" + переданий аргумент.

Окей.

Виглядає, наче тут все класно працює.

А тепер давайте спробуємо використати AWS SQS.

Використання Celery з AWS SQS

Документація – Using Amazon SQS.

Встановлюємо залежності:

$ pip install "celery[sqs]"

Створення SQS

Залишаємо дефолтний тип, Standart:

DLQ нам зараз не потрібна.

Зберігаємо, копіюємо URL – він нам буде потрібний для Celery:

Налаштування Celery з SQS

Редагуємо наш celery_app.py – додаємо AWS ACCESS/SECRET ключі і чергу.

Але спочатку давайте глянемо на доступні опції:

  • broker_url: тут міняємо на sqs
  • broker_transport_options:
    • predefined_queues: варто додати, бо інакше Celery буде шукати доступні SQS з ListQueues, що довго і може бути дорого
      • дефолтна черга може бути задана з task_default_queue
        • якщо task_default_queue не задана, то Celery (мабуть) буде шукати чергу за URL https://sqs.us-east-1.amazonaws.com/492***148/celery
      • чергу також можна передати під час створення таски – @app.task(queue="my_custom_queue")
  • task_create_missing_queues: якщо потрібна черга не знайдена, то Celery спробує її створити – що в SQS нам точно не треба

Тепер конфіг може виглядати так – result_backend поки відключаємо, бо SQS його не підтримує, див. Results:

from kombu.utils.url import safequote

#broker_url='redis://localhost:6379/0'
#result_backend='redis://localhost:6379/0'

aws_access_key = safequote("AKI***B7A")
aws_secret_key = safequote("pAu***2gW")

broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
    aws_access_key=aws_access_key, aws_secret_key=aws_secret_key,
)
broker_transport_options = {
    "region": "us-east-1",
    "predefined_queues": {
        "arseny_test": {
            "url": "https://sqs.us-east-1.amazonaws.com/492***148/arseny-celery-test",
        }
    }
}

task_create_missing_queues = False
task_default_queue = "arseny_test"

include=[
    "celery_tasks"
]

Перезапускаємо воркер:

$ celery -A celery_app worker --loglevel=INFO
...
- ** ---------- [config]
- ** ---------- .> app:         celery_app:0x7d9631ef8980
- ** ---------- .> transport:   sqs://AKI***B7A:**@localhost//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> arseny_test      exchange=arseny_test(direct) key=arseny_test

[tasks]
  . celery_tasks.test_task
...

Цікаво, що transport @localhost… Але ок.

Перевіряємо вкладку Моніторинг в SQS:

Меседжі пройшли, ОК.

Що далі?

А далі ми спробуємо додати трохи води моніторингу.

Моніторинг Celery в AWS SQS

Власне, як я дійшов до жизні такой до цього посту: є Kubernetes Pod, для якого хочеться мати простий Liveness Probe.

Я за 10 хвилин нагуглив пару методів Celery, оновив Deployment, і вже хотів мержити PR, як виявилось, що…

Отже, в чому зараз проблема: SQS не підтримує кілька корисних нам речей:

  • SQS doesn’t yet support worker remote control commands.
  • SQS doesn’t yet support events, and so cannot be used with celery eventscelerymon, or the Django Admin monitor.

Тобто, якщо ми спробуємо виконати celery inspect ping – то отримаємо помилки. Як мінімум тому, бо для цього потрібна черга pidbox, яка не підтримується в SQS.

Давайте спочатку на ці помилки глянемо.

get() та “No result backend is configured”

Помилку з get() ми вже бачили – якщо спробувати зробити таке без result_backend:

result = test_task.delay("Hello, comrade!")
print(result.get())

То отримаємо NotImplementedError(E_NO_BACKEND.strip()):

...
  File "/home/setevoy/Scripts/Python/celery/.venv/lib/python3.13/site-packages/celery/backends/base.py", line 1104, in _is_disabled
    raise NotImplementedError(E_NO_BACKEND.strip())
NotImplementedError: No result backend is configured.

Окей…

Але ж мабуть така серйозна бібліотека як Celery має власні механізми для перевірки воркерів?

Так – має.

Але…

Celery control inspect

Взагалі, celery.app.control класна штука, і якщо у вас RabbitMQ чи Redis – то з нею можна отримати багато корисної інформації.

Але у нас SQS, тому control працювати не буде.

Пробуємо перевірити таски для воркерів:

$ celery -A celery_app inspect registered
...
  File "/home/setevoy/Scripts/Python/celery/.venv/lib/python3.13/site-packages/kombu/transport/SQS.py", line 381, in _resolve_queue_url
    raise UndefinedQueueException((
    ...<2 lines>...
    ).format(sqs_qname))
kombu.transport.SQS.UndefinedQueueException: Queue with name '0f41e5d5-49e1-38bc-bc9b-c1efbc4f9a3e-reply-celery-pidbox' must be defined in 'predefined_queues'.

Або можемо спробувати пінганути воркери з app.control.ping() – отримаємо ту ж саму помилку з “pidbox must be defined in ‘predefined_queues’“:

@app.task
def celery_health_check():
    response = app.control.ping(timeout=2)
    return response

Ну і теж саме для celery_app.control.inspect().

Є дуже стара GitHub issue – celery ping doesn’t work when using SQS.

І варіанти того, як цю проблему обійти в Kubernetes Liveness probes – просто… Відключити перевірки взагалі. Наприклад – тут>>>.

Можливе рішення: окремий result_backend

Отже, що я зараз намагаюсь зробити – це просто додати result_backend з другим контейнером Redis до Kubernetes Pod з Celery. Ресурсів Redis потребує копійки, тому в принципі якимось оверхедом це не буде.

Тобто, ми маємо:

  • SQS для меседжів
  • Redis для зберігання статусів обробки цих меседжів

Тоді з get() можемо отримати результат виконання тестової таски, і впевнитись, що воркери працюють.

Задаємо result_backend з Redis знов:

...
result_backend='redis://localhost:6379/0'
...

Додаємо нову таску в celery_tasks.py:

@app.task
def celery_health_check():
    return "OK"

Додаємо “моніторинг” в наш “API”, my_api_app.py:

#!/usr/bin/env python

import sys

from celery_tasks import test_task, celery_health_check_task

def celery_health_check():
    try:
        result = celery_health_check_task.apply_async()
        response = result.get(timeout=5)

        print ("Result:", result)
        print ("Result state:", result.state)
        print ("Respose:", response)

        if response != "OK":
            raise RuntimeError("Celery health check task returned unexpected response!")

        print("Celery is running")

    except Exception as e:
        print("Celery health check failed")
        print({"status": "error", "message": str(e)})
        sys.exit(1)

celery_health_check()

delay() – самий простий метод без додаткових параметрів, apply_async() вміє в різні опції. Нам зараз в принципі не важливо, але delay() ми вже використовували, тут давайте з apply_async().

Перезапускаємо Celery Worker, запускаємо наш головний скрипт:

$ ./my_api_app.py 
Result: 2d32805b-3c55-412d-8fc4-4b893f222202
Result state: SUCCESS
Respose: OK
Celery is running

Гуд.

Що ми можемо ще?

Celery та FastAPI

Окрім виклику Celery через імпорти, ми можемо створити FastAPI сервіс, і все робити через TCP.

Встановлюємо fastapi та uvicorn:

$ pip install fastapi uvicorn

Створюємо файл celery_api.py, описуємо FastAPI app:

from fastapi import FastAPI, HTTPException

from celery_tasks import celery_health_check_task

app = FastAPI()


@app.get("/celery-healthz")
def celery_healthcheck():
    try:
        result = celery_health_check_task.apply_async()
        response = result.get(timeout=5)

        if response != "OK":
            raise RuntimeError("Celery health check task returned unexpected response!")

        return {"status": "success", "message": "Celery is running"}

    except Exception as e:
        raise HTTPException(status_code=500, detail={"status": "error", "message": str(e)})

Запускаємо з uvicorn:

$ uvicorn celery_api:app --host 0.0.0.0 --port 8000 --reload

І перевіряємо:

$ curl localhost:8000/celery-healthz
{"status":"success","message":"Celery is running"}

Looks good…

Flower для моніторингу Celery

Flower – популярне рішення для моніторингу Celery, але не буде працювати з SQS з тих самих причин:

  • Flower використовує celery events (celeryev), в SQS не працює
  • список воркерів у Flower використовує Celery inspect(), що, як ми бачили вище, теж не буде працювати з SQS

Якщо мати окремий result_backend, як ми вище робили – то буде працювати частково – можна буде побачити список тасок.

Тому, аби побачити можливості Flower – давайте повернемо Redis в celery_config.py:

broker_url='redis://localhost:6379/0'
result_backend='redis://localhost:6379/0'
...

Встановлюємо Flower:

$ pip install flower

Запускаємо Flower з нашим інстансом Celery та його конфігом:

$ celery -A celery_app flower
[I 250320 13:49:48 command:168] Visit me at http://0.0.0.0:5555
[I 250320 13:49:48 command:176] Broker: redis://localhost:6379/0
[I 250320 13:49:48 command:177] Registered tasks: 
    ['celery.accumulate',
     'celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap',
     'celery_tasks.celery_health_check_task',
     'celery_tasks.test_task']
[I 250320 13:49:48 mixins:228] Connected to redis://localhost:6379/0

І відкриваємо в браузері http://localhost:5555:

Можемо використати Flower API для моніторингу:

$ curl -s localhost:5555/api/workers | jq
{
  "celery@setevoy-wrk-laptop": {
    "scheduled": [],
    "timestamp": 1742471544.7675385,
    "active": [],
    "reserved": [],
    ...

Документація по API – API Reference.

Власне, це все.

Celery запустили, як створити таски – подивились, як можна моніторити їх виконання – розібрались.

Залишилось це реалізувати в Production.