Якщо дуже просто, то Celery – це щось, за допомогою чого ми можемо виконувати задачі поза нашим основним сервісом.
Наприклад, є Backend API, який має якийсь ендпоінт, на який мобілочки відправляють інформацію про те, що юзер створив новий whatever в застосунку. Задача бекенда – додати whatever в базі даних.
Можна це виконати прямо в інстансі самого API відразу при отриманні івента на ендпоінт, а можна, якщо нам не горить виконання whatever в базі даних, створити паралельну відкладену задачу, яка буде виконана через 1-5-20-60 секунд.
Власне, це і робить Celery:
- основний код сервісу створює task
- Celery-клієнт в коді цю задачу відправляє в MQ Broker (Message Queue Broker, як-от RabbitMQ, Redis або мать його єті AWS SQS, див повний список на Broker Overview)
- Celery Worker отримує меседж з черги
- Worker запускає якусь функцію, як робить whatever в базі даних
- …
- profit!
Власне в цьому пості ми не будемо заглиблюватись в деталі реалізації всього цього щастя.
Все, що мені цікаво – це як з цим працювати, тобто – як я можу створити новий task, аби Worker цю задачу виконав.
В ідеалі – ще й перевірити, що задача дійсно була виконана – але тут є проблеми з AWS SQS. Далі подивимось на це детальніше.
Зміст
Запуск Celery
Зробимо все швиденько локально з Python PiP та Docker, потім ще глянемо на AWS SQS, Kubernetes та моніторинг.
Створення проекту
Встановлюємо сам Celery та залежності для роботи з Redis:
$ mkdir celery $ cd celery/ $ python -m venv .venv $ . ./.venv/bin/activate $ pip install celery $ pip install -U "celery[redis]"
Запуск Redis
В ролі MQ буде Redis, бо його легко запустити локально і він легенький в плані ресурсів.
Для results backend – теж Redis, але це розглянемо трохи пізніше.
Запускаємо контейнер з Redis:
$ docker run --rm -p 6379:6379 --name redis -e REDIS_ARGS="--bind 0.0.0.0" redis
Заходимо в нього:
$ docker exec -ti redis bash root@78326cab3d4b:/data#
Перевіряємо:
root@78326cab3d4b:/data# redis-cli ping PONG
Запуск Celery
Параметри для брокера – див. Broker Settings.
IDK чому в документації параметр називається “broker” а не “broker_url“, бо “broker” наче депрікейтед, а документація “describes the current stable version of Celery (5.4)” (с). Чи, може, --broker – це параметр для командної строки, а broker_url – для конфігу?
Весь код можна просто робити в одному файлі типу tasks.py, як це описано в Getting Started документації, але я відразу розіб’ю на кілька окремих модулів, аби було більше “production way”, тим більше воно вже так зроблено у нас в Production, тому хочеться і під час тестування мати більш схожий сетап.
Створюємо основний модуль для Селері – celery_app.py:
from celery import Celery
app = Celery(__name__,
broker_url='redis://localhost:6379/0',
include=["celery_tasks"]
)
Тут, власне, broker_url – адреса Redis, а в include ми підключаємо наші майбутні таски. Також є опція autodiscover_tasks, але не пробував і у нас не використовується.
Якщо таски не заімпорчені – то будуть помилки типу:
Received unregistered task of type ‘celery_tasks.test_task’.
Створюємо модуль для Celery Tasks – celery_tasks.py:
from celery_app import app
@app.task
def test_task(arg):
return "OK: " + arg
Власне таска – це просто якась функція, яка має в бекграунді основної системи виконати якусь задачу.
Як в нашому випадку – у нас є API, на який приходяться повідомлення від клієнтів, що юзер створив новий запис у себе в мобільній апці.
Наш API через Celery створює таску, яка їде в RDS, і оновлює табличку з цим юзером, додаючи якісь нові records.
Пишемо наш “API сервіс” – основний код, який буде викликати Celery.
Через метод delay() додаємо створення задачі:
#!/usr/bin/env python
from celery_tasks import test_task
test_task.delay("Hello, comrade!")
Запускаємо вже власне Celery Worker – це окремий процес Python (в Kubernetes у нас для цього окремі Pods).
Тобто, Celery-клієнт – це інстанс Celery, який створюється під час запуску API і через який ми створюємо нові задачі в брокері, а Worker – це окремий процес, який займається збором повідомлень і виконанням задач.
Запустити можна просто з термінала:
$ celery -A celery_app worker --loglevel=INFO
...
- ** ---------- [config]
- ** ---------- .> app: celery_app:0x7d2bb7830980
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: disabled://
...
-------------- [queues]
.> celery exchange=celery(direct) key=celery
...
Вже можна глянути ключі в Redis – тут поки нічого особливого, але далі подивимось що взагалі тут створюється:
root@61eff8635cb1:/data# redis-cli keys * 1) "_kombu.binding.celery.pidbox" 2) "_kombu.binding.celery" 3) "_kombu.binding.celeryev"
Так як у нас Redis, то це будуть саме ключі. Але для Celery – це черги, і ключі створюються з типом SET, тобто можуть мати послідовність. Хоча нам зараз такі нюанси не дуже важливі.
Запускаємо наш “API” аби він викликав створення задачі (не забуваємо про activate venv, якщо робимо в окремому терміналі, бо треба імпортити Celery libs):
$ chmod +x my_api_app.py $ ./my_api_app.py
Перевіряємо логи Celery Worker – таска отримана, таска оброблена:
... [2025-03-19 12:48:06,285: INFO/MainProcess] Task celery_tasks.test_task[edbdc0aa-673c-490f-a18f-0b7665db2ff7] received [2025-03-19 12:48:06,287: INFO/ForkPoolWorker-15] Task celery_tasks.test_task[edbdc0aa-673c-490f-a18f-0b7665db2ff7] succeeded in 0.0003573799040168524s: 'OK: Hello, comrade!'
Default Celery Broker keys
Швиденько глянемо розберемо що Celery створює в чергах і для чого, бо в SQS будемо мати проблеми з деякими з них, тому треба розуміти чому і від чого.
Як вже говорили вище, в Redis це KEYS, але з типом SET (в SQS були б окремі черги):
root@a0c65a5e7bfb:/data# redis-cli type _kombu.binding.celeryev set
Тут:
_kombu.binding.celeryev: використовується Celery Events для надсилання подій про стан воркерів (наприклад, коли воркер запускається, виконує задачу або завершує роботу)- використовується для моніторингу через celery events або Flower, ми їх далі подивимось
_kombu.binding.celery: головна черга завдань Celery за замовчуванням- сюди надсилаються задачі, які потім обробляються Celery Workers
_kombu.binding.celery.pidbox: використовується для pidbox messaging, тобто обміну командами між воркерами, наприклад,celery inspect,celery control– і в SQS це теж працювати не буде- через нього Celery може надсилати команди воркерам, щоб перевірити їхній стан, змінити рівень логування тощо
Чому _kombu в іменах – бо під капотом Celery використовує бібліотеку Kombu.
Глянемо, що в ключах.
В дефолтній черзі – виконуємо SMEMBERS, бо це тип SET:
root@a06f25448034:/data# redis-cli SMEMBERS _kombu.binding.celery 1) "celery\x06\x16\x06\x16celery"
\x06 та \x16 – це ACK та SYN, які додаються Kombo.
Так як наша задача була виконана – то у нас в _kombu.binding.celery пусто.
Ну і pidbox – інформація про наявний воркер celery@setevoy-wrk-laptop:
root@8d4de6fb1bc6:/data# redis-cli SMEMBERS _kombu.binding.celery.pidbox 1) "\x06\x16\x06\[email protected]"
Можна запустити redis-cli monitor, і побачити все, що відбувається в Redis.
Окей.
Все наче працює.
Що далі?
Додавання result_backend
Без наявного result_backend ми не можемо перевіряти статус виконання тасок, бо Celery просто ніде не зберігає цю інформацію. Див. Keeping Results.
Тобто, якщо ми потім захочемо отримати стан задачі з response = result.get() (далі це зробимо) – то без result_backend отримаємо помилки типу:
│ File “/usr/local/lib/python3.12/site-packages/celery/backends/base.py”, line 1104, in _is_disabled │
│ raise NotImplementedError(E_NO_BACKEND.strip())
Давайте конфіг Celery винесемо теж окремим модулем, celery_config.py, і додамо параметр result_backend з Redis:
broker_url='redis://localhost:6379/0'
result_backend='redis://localhost:6379/0'
include=[
"celery_tasks"
]
Оновлюємо код celery_app.py – додаємо імпорт конфігу і виклик config_from_object():
import celery_config
from celery import Celery
#app = Celery(__name__,
# broker_url='redis://localhost:6379/0',
# include=["celery_tasks"]
# )
app = Celery(__name__)
app.config_from_object("celery_config", force=True)
Перезапускаємо Celery Worker, і тепер в “results” замість Disabled маємо адресу нашого Redis:
$ celery -A celery_app:app worker --loglevel=INFO ... - ** ---------- [config] - ** ---------- .> app: celery_app:0x78f451ae8980 - ** ---------- .> transport: redis://localhost:6379/0 - ** ---------- .> results: redis://localhost:6379/0 ...
Оновлюємо код основного сервісу – додамо отримання результату виконання задачі через get():
#!/usr/bin/env python
from celery_tasks import test_task
result = test_task.delay("Hello, comrade!")
print(result.get())
Запускаємо наш “API”:
$ ./my_api_app.py OK: Hello, comrade!
І в print(result.get()) маємо значення, яке повертає функція test_task() – тобто return "OK" + переданий аргумент.
Окей.
Виглядає, наче тут все класно працює.
А тепер давайте спробуємо використати AWS SQS.
Використання Celery з AWS SQS
Документація – Using Amazon SQS.
Встановлюємо залежності:
$ pip install "celery[sqs]"
Створення SQS
Залишаємо дефолтний тип, Standart:
Зберігаємо, копіюємо URL – він нам буде потрібний для Celery:
Налаштування Celery з SQS
Редагуємо наш celery_app.py – додаємо AWS ACCESS/SECRET ключі і чергу.
Але спочатку давайте глянемо на доступні опції:
broker_url: тут міняємо наsqsbroker_transport_options:predefined_queues: варто додати, бо інакше Celery буде шукати доступні SQS зListQueues, що довго і може бути дорого- дефолтна черга може бути задана з
task_default_queue- якщо
task_default_queueне задана, то Celery (мабуть) буде шукати чергу за URLhttps://sqs.us-east-1.amazonaws.com/492***148/celery
- якщо
- чергу також можна передати під час створення таски –
@app.task(queue="my_custom_queue")
- дефолтна черга може бути задана з
task_create_missing_queues: якщо потрібна черга не знайдена, то Celery спробує її створити – що в SQS нам точно не треба
Тепер конфіг може виглядати так – result_backend поки відключаємо, бо SQS його не підтримує, див. Results:
from kombu.utils.url import safequote
#broker_url='redis://localhost:6379/0'
#result_backend='redis://localhost:6379/0'
aws_access_key = safequote("AKI***B7A")
aws_secret_key = safequote("pAu***2gW")
broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
aws_access_key=aws_access_key, aws_secret_key=aws_secret_key,
)
broker_transport_options = {
"region": "us-east-1",
"predefined_queues": {
"arseny_test": {
"url": "https://sqs.us-east-1.amazonaws.com/492***148/arseny-celery-test",
}
}
}
task_create_missing_queues = False
task_default_queue = "arseny_test"
include=[
"celery_tasks"
]
Перезапускаємо воркер:
$ celery -A celery_app worker --loglevel=INFO
...
- ** ---------- [config]
- ** ---------- .> app: celery_app:0x7d9631ef8980
- ** ---------- .> transport: sqs://AKI***B7A:**@localhost//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> arseny_test exchange=arseny_test(direct) key=arseny_test
[tasks]
. celery_tasks.test_task
...
Цікаво, що transport @localhost… Але ок.
Перевіряємо вкладку Моніторинг в SQS:
Меседжі пройшли, ОК.
Що далі?
А далі ми спробуємо додати трохи води моніторингу.
Моніторинг Celery в AWS SQS
Власне, як я дійшов до жизні такой до цього посту: є Kubernetes Pod, для якого хочеться мати простий Liveness Probe.
Я за 10 хвилин нагуглив пару методів Celery, оновив Deployment, і вже хотів мержити PR, як виявилось, що…
Отже, в чому зараз проблема: SQS не підтримує кілька корисних нам речей:
- SQS doesn’t yet support worker remote control commands.
- SQS doesn’t yet support events, and so cannot be used with celery events, celerymon, or the Django Admin monitor.
Тобто, якщо ми спробуємо виконати celery inspect ping – то отримаємо помилки. Як мінімум тому, бо для цього потрібна черга pidbox, яка не підтримується в SQS.
Давайте спочатку на ці помилки глянемо.
get() та “No result backend is configured”
Помилку з get() ми вже бачили – якщо спробувати зробити таке без result_backend:
result = test_task.delay("Hello, comrade!")
print(result.get())
То отримаємо NotImplementedError(E_NO_BACKEND.strip()):
...
File "/home/setevoy/Scripts/Python/celery/.venv/lib/python3.13/site-packages/celery/backends/base.py", line 1104, in _is_disabled
raise NotImplementedError(E_NO_BACKEND.strip())
NotImplementedError: No result backend is configured.
Окей…
Але ж мабуть така серйозна бібліотека як Celery має власні механізми для перевірки воркерів?
Так – має.
Але…
Celery control inspect
Взагалі, celery.app.control класна штука, і якщо у вас RabbitMQ чи Redis – то з нею можна отримати багато корисної інформації.
Але у нас SQS, тому control працювати не буде.
Пробуємо перевірити таски для воркерів:
$ celery -A celery_app inspect registered
...
File "/home/setevoy/Scripts/Python/celery/.venv/lib/python3.13/site-packages/kombu/transport/SQS.py", line 381, in _resolve_queue_url
raise UndefinedQueueException((
...<2 lines>...
).format(sqs_qname))
kombu.transport.SQS.UndefinedQueueException: Queue with name '0f41e5d5-49e1-38bc-bc9b-c1efbc4f9a3e-reply-celery-pidbox' must be defined in 'predefined_queues'.
Або можемо спробувати пінганути воркери з app.control.ping() – отримаємо ту ж саму помилку з “pidbox must be defined in ‘predefined_queues’“:
@app.task
def celery_health_check():
response = app.control.ping(timeout=2)
return response
Ну і теж саме для celery_app.control.inspect().
Є дуже стара GitHub issue – celery ping doesn’t work when using SQS.
І варіанти того, як цю проблему обійти в Kubernetes Liveness probes – просто… Відключити перевірки взагалі. Наприклад – тут>>>.
Можливе рішення: окремий result_backend
Отже, що я зараз намагаюсь зробити – це просто додати result_backend з другим контейнером Redis до Kubernetes Pod з Celery. Ресурсів Redis потребує копійки, тому в принципі якимось оверхедом це не буде.
Тобто, ми маємо:
- SQS для меседжів
- Redis для зберігання статусів обробки цих меседжів
Тоді з get() можемо отримати результат виконання тестової таски, і впевнитись, що воркери працюють.
Задаємо result_backend з Redis знов:
... result_backend='redis://localhost:6379/0' ...
Додаємо нову таску в celery_tasks.py:
@app.task
def celery_health_check():
return "OK"
Додаємо “моніторинг” в наш “API”, my_api_app.py:
#!/usr/bin/env python
import sys
from celery_tasks import test_task, celery_health_check_task
def celery_health_check():
try:
result = celery_health_check_task.apply_async()
response = result.get(timeout=5)
print ("Result:", result)
print ("Result state:", result.state)
print ("Respose:", response)
if response != "OK":
raise RuntimeError("Celery health check task returned unexpected response!")
print("Celery is running")
except Exception as e:
print("Celery health check failed")
print({"status": "error", "message": str(e)})
sys.exit(1)
celery_health_check()
delay() – самий простий метод без додаткових параметрів, apply_async() вміє в різні опції. Нам зараз в принципі не важливо, але delay() ми вже використовували, тут давайте з apply_async().
Перезапускаємо Celery Worker, запускаємо наш головний скрипт:
$ ./my_api_app.py Result: 2d32805b-3c55-412d-8fc4-4b893f222202 Result state: SUCCESS Respose: OK Celery is running
Гуд.
Що ми можемо ще?
Celery та FastAPI
Окрім виклику Celery через імпорти, ми можемо створити FastAPI сервіс, і все робити через TCP.
Встановлюємо fastapi та uvicorn:
$ pip install fastapi uvicorn
Створюємо файл celery_api.py, описуємо FastAPI app:
from fastapi import FastAPI, HTTPException
from celery_tasks import celery_health_check_task
app = FastAPI()
@app.get("/celery-healthz")
def celery_healthcheck():
try:
result = celery_health_check_task.apply_async()
response = result.get(timeout=5)
if response != "OK":
raise RuntimeError("Celery health check task returned unexpected response!")
return {"status": "success", "message": "Celery is running"}
except Exception as e:
raise HTTPException(status_code=500, detail={"status": "error", "message": str(e)})
Запускаємо з uvicorn:
$ uvicorn celery_api:app --host 0.0.0.0 --port 8000 --reload
І перевіряємо:
$ curl localhost:8000/celery-healthz
{"status":"success","message":"Celery is running"}
Looks good…
Flower для моніторингу Celery
Flower – популярне рішення для моніторингу Celery, але не буде працювати з SQS з тих самих причин:
- Flower використовує celery events (
celeryev), в SQS не працює - список воркерів у Flower використовує Celery
inspect(), що, як ми бачили вище, теж не буде працювати з SQS
Якщо мати окремий result_backend, як ми вище робили – то буде працювати частково – можна буде побачити список тасок.




