Якщо дуже просто, то Celery – це щось, за допомогою чого ми можемо виконувати задачі поза нашим основним сервісом.
Наприклад, є Backend API, який має якийсь ендпоінт, на який мобілочки відправляють інформацію про те, що юзер створив новий whatever в застосунку. Задача бекенда – додати whatever в базі даних.
Можна це виконати прямо в інстансі самого API відразу при отриманні івента на ендпоінт, а можна, якщо нам не горить виконання whatever в базі даних, створити паралельну відкладену задачу, яка буде виконана через 1-5-20-60 секунд.
Власне, це і робить Celery:
- основний код сервісу створює task
- Celery-клієнт в коді цю задачу відправляє в MQ Broker (Message Queue Broker, як-от RabbitMQ, Redis або мать його єті AWS SQS, див повний список на Broker Overview)
- Celery Worker отримує меседж з черги
- Worker запускає якусь функцію, як робить whatever в базі даних
- …
- profit!
Власне в цьому пості ми не будемо заглиблюватись в деталі реалізації всього цього щастя.
Все, що мені цікаво – це як з цим працювати, тобто – як я можу створити новий task, аби Worker цю задачу виконав.
В ідеалі – ще й перевірити, що задача дійсно була виконана – але тут є проблеми з AWS SQS. Далі подивимось на це детальніше.
Зміст
Запуск Celery
Зробимо все швиденько локально з Python PiP та Docker, потім ще глянемо на AWS SQS, Kubernetes та моніторинг.
Створення проекту
Встановлюємо сам Celery та залежності для роботи з Redis:
$ mkdir celery $ cd celery/ $ python -m venv .venv $ . ./.venv/bin/activate $ pip install celery $ pip install -U "celery[redis]"
Запуск Redis
В ролі MQ буде Redis, бо його легко запустити локально і він легенький в плані ресурсів.
Для results backend – теж Redis, але це розглянемо трохи пізніше.
Запускаємо контейнер з Redis:
$ docker run --rm -p 6379:6379 --name redis -e REDIS_ARGS="--bind 0.0.0.0" redis
Заходимо в нього:
$ docker exec -ti redis bash root@78326cab3d4b:/data#
Перевіряємо:
root@78326cab3d4b:/data# redis-cli ping PONG
Запуск Celery
Параметри для брокера – див. Broker Settings.
IDK чому в документації параметр називається “broker
” а не “broker_url
“, бо “broker” наче депрікейтед, а документація “describes the current stable version of Celery (5.4)” (с). Чи, може, --broker
– це параметр для командної строки, а broker_url
– для конфігу?
Весь код можна просто робити в одному файлі типу tasks.py
, як це описано в Getting Started документації, але я відразу розіб’ю на кілька окремих модулів, аби було більше “production way”, тим більше воно вже так зроблено у нас в Production, тому хочеться і під час тестування мати більш схожий сетап.
Створюємо основний модуль для Селері – celery_app.py
:
from celery import Celery app = Celery(__name__, broker_url='redis://localhost:6379/0', include=["celery_tasks"] )
Тут, власне, broker_url
– адреса Redis, а в include
ми підключаємо наші майбутні таски. Також є опція autodiscover_tasks
, але не пробував і у нас не використовується.
Якщо таски не заімпорчені – то будуть помилки типу:
Received unregistered task of type ‘celery_tasks.test_task’.
Створюємо модуль для Celery Tasks – celery_tasks.py
:
from celery_app import app @app.task def test_task(arg): return "OK: " + arg
Власне таска – це просто якась функція, яка має в бекграунді основної системи виконати якусь задачу.
Як в нашому випадку – у нас є API, на який приходяться повідомлення від клієнтів, що юзер створив новий запис у себе в мобільній апці.
Наш API через Celery створює таску, яка їде в RDS, і оновлює табличку з цим юзером, додаючи якісь нові records.
Пишемо наш “API сервіс” – основний код, який буде викликати Celery.
Через метод delay()
додаємо створення задачі:
#!/usr/bin/env python from celery_tasks import test_task test_task.delay("Hello, comrade!")
Запускаємо вже власне Celery Worker – це окремий процес Python (в Kubernetes у нас для цього окремі Pods).
Тобто, Celery-клієнт – це інстанс Celery, який створюється під час запуску API і через який ми створюємо нові задачі в брокері, а Worker – це окремий процес, який займається збором повідомлень і виконанням задач.
Запустити можна просто з термінала:
$ celery -A celery_app worker --loglevel=INFO ... - ** ---------- [config] - ** ---------- .> app: celery_app:0x7d2bb7830980 - ** ---------- .> transport: redis://localhost:6379/0 - ** ---------- .> results: disabled:// ... -------------- [queues] .> celery exchange=celery(direct) key=celery ...
Вже можна глянути ключі в Redis – тут поки нічого особливого, але далі подивимось що взагалі тут створюється:
root@61eff8635cb1:/data# redis-cli keys * 1) "_kombu.binding.celery.pidbox" 2) "_kombu.binding.celery" 3) "_kombu.binding.celeryev"
Так як у нас Redis, то це будуть саме ключі. Але для Celery – це черги, і ключі створюються з типом SET, тобто можуть мати послідовність. Хоча нам зараз такі нюанси не дуже важливі.
Запускаємо наш “API” аби він викликав створення задачі (не забуваємо про activate venv, якщо робимо в окремому терміналі, бо треба імпортити Celery libs):
$ chmod +x my_api_app.py $ ./my_api_app.py
Перевіряємо логи Celery Worker – таска отримана, таска оброблена:
... [2025-03-19 12:48:06,285: INFO/MainProcess] Task celery_tasks.test_task[edbdc0aa-673c-490f-a18f-0b7665db2ff7] received [2025-03-19 12:48:06,287: INFO/ForkPoolWorker-15] Task celery_tasks.test_task[edbdc0aa-673c-490f-a18f-0b7665db2ff7] succeeded in 0.0003573799040168524s: 'OK: Hello, comrade!'
Default Celery Broker keys
Швиденько глянемо розберемо що Celery створює в чергах і для чого, бо в SQS будемо мати проблеми з деякими з них, тому треба розуміти чому і від чого.
Як вже говорили вище, в Redis це KEYS, але з типом SET (в SQS були б окремі черги):
root@a0c65a5e7bfb:/data# redis-cli type _kombu.binding.celeryev set
Тут:
_kombu.binding.celeryev
: використовується Celery Events для надсилання подій про стан воркерів (наприклад, коли воркер запускається, виконує задачу або завершує роботу)- використовується для моніторингу через celery events або Flower, ми їх далі подивимось
_kombu.binding.celery
: головна черга завдань Celery за замовчуванням- сюди надсилаються задачі, які потім обробляються Celery Workers
_kombu.binding.celery.pidbox
: використовується для pidbox messaging, тобто обміну командами між воркерами, наприклад,celery inspect
,celery control
– і в SQS це теж працювати не буде- через нього Celery може надсилати команди воркерам, щоб перевірити їхній стан, змінити рівень логування тощо
Чому _kombu
в іменах – бо під капотом Celery використовує бібліотеку Kombu.
Глянемо, що в ключах.
В дефолтній черзі – виконуємо SMEMBERS
, бо це тип SET
:
root@a06f25448034:/data# redis-cli SMEMBERS _kombu.binding.celery 1) "celery\x06\x16\x06\x16celery"
\x06
та \x16
– це ACK та SYN, які додаються Kombo.
Так як наша задача була виконана – то у нас в _kombu.binding.celery
пусто.
Ну і pidbox
– інформація про наявний воркер celery@setevoy-wrk-laptop
:
root@8d4de6fb1bc6:/data# redis-cli SMEMBERS _kombu.binding.celery.pidbox 1) "\x06\x16\x06\[email protected]"
Можна запустити redis-cli monitor
, і побачити все, що відбувається в Redis.
Окей.
Все наче працює.
Що далі?
Додавання result_backend
Без наявного result_backend
ми не можемо перевіряти статус виконання тасок, бо Celery просто ніде не зберігає цю інформацію. Див. Keeping Results.
Тобто, якщо ми потім захочемо отримати стан задачі з response = result.get()
(далі це зробимо) – то без result_backend
отримаємо помилки типу:
│ File “/usr/local/lib/python3.12/site-packages/celery/backends/base.py”, line 1104, in _is_disabled │
│ raise NotImplementedError(E_NO_BACKEND.strip())
Давайте конфіг Celery винесемо теж окремим модулем, celery_config.py
, і додамо параметр result_backend
з Redis:
broker_url='redis://localhost:6379/0' result_backend='redis://localhost:6379/0' include=[ "celery_tasks" ]
Оновлюємо код celery_app.py
– додаємо імпорт конфігу і виклик config_from_object()
:
import celery_config from celery import Celery #app = Celery(__name__, # broker_url='redis://localhost:6379/0', # include=["celery_tasks"] # ) app = Celery(__name__) app.config_from_object("celery_config", force=True)
Перезапускаємо Celery Worker, і тепер в “results
” замість Disabled маємо адресу нашого Redis:
$ celery -A celery_app:app worker --loglevel=INFO ... - ** ---------- [config] - ** ---------- .> app: celery_app:0x78f451ae8980 - ** ---------- .> transport: redis://localhost:6379/0 - ** ---------- .> results: redis://localhost:6379/0 ...
Оновлюємо код основного сервісу – додамо отримання результату виконання задачі через get()
:
#!/usr/bin/env python from celery_tasks import test_task result = test_task.delay("Hello, comrade!") print(result.get())
Запускаємо наш “API”:
$ ./my_api_app.py OK: Hello, comrade!
І в print(result.get())
маємо значення, яке повертає функція test_task()
– тобто return "OK"
+ переданий аргумент.
Окей.
Виглядає, наче тут все класно працює.
А тепер давайте спробуємо використати AWS SQS.
Використання Celery з AWS SQS
Документація – Using Amazon SQS.
Встановлюємо залежності:
$ pip install "celery[sqs]"
Створення SQS
Залишаємо дефолтний тип, Standart:
Зберігаємо, копіюємо URL – він нам буде потрібний для Celery:
Налаштування Celery з SQS
Редагуємо наш celery_app
.py – додаємо AWS ACCESS/SECRET ключі і чергу.
Але спочатку давайте глянемо на доступні опції:
broker_url
: тут міняємо наsqs
broker_transport_options
:predefined_queues
: варто додати, бо інакше Celery буде шукати доступні SQS зListQueues
, що довго і може бути дорого- дефолтна черга може бути задана з
task_default_queue
- якщо
task_default_queue
не задана, то Celery (мабуть) буде шукати чергу за URLhttps://sqs.us-east-1.amazonaws.com/492***148/celery
- якщо
- чергу також можна передати під час створення таски –
@app.task(queue="my_custom_queue")
- дефолтна черга може бути задана з
task_create_missing_queues
: якщо потрібна черга не знайдена, то Celery спробує її створити – що в SQS нам точно не треба
Тепер конфіг може виглядати так – result_backend
поки відключаємо, бо SQS його не підтримує, див. Results:
from kombu.utils.url import safequote #broker_url='redis://localhost:6379/0' #result_backend='redis://localhost:6379/0' aws_access_key = safequote("AKI***B7A") aws_secret_key = safequote("pAu***2gW") broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format( aws_access_key=aws_access_key, aws_secret_key=aws_secret_key, ) broker_transport_options = { "region": "us-east-1", "predefined_queues": { "arseny_test": { "url": "https://sqs.us-east-1.amazonaws.com/492***148/arseny-celery-test", } } } task_create_missing_queues = False task_default_queue = "arseny_test" include=[ "celery_tasks" ]
Перезапускаємо воркер:
$ celery -A celery_app worker --loglevel=INFO ... - ** ---------- [config] - ** ---------- .> app: celery_app:0x7d9631ef8980 - ** ---------- .> transport: sqs://AKI***B7A:**@localhost// - ** ---------- .> results: disabled:// - *** --- * --- .> concurrency: 16 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> arseny_test exchange=arseny_test(direct) key=arseny_test [tasks] . celery_tasks.test_task ...
Цікаво, що transport
@localhost… Але ок.
Перевіряємо вкладку Моніторинг в SQS:
Меседжі пройшли, ОК.
Що далі?
А далі ми спробуємо додати трохи води моніторингу.
Моніторинг Celery в AWS SQS
Власне, як я дійшов до жизні такой до цього посту: є Kubernetes Pod, для якого хочеться мати простий Liveness Probe.
Я за 10 хвилин нагуглив пару методів Celery, оновив Deployment, і вже хотів мержити PR, як виявилось, що…
Отже, в чому зараз проблема: SQS не підтримує кілька корисних нам речей:
- SQS doesn’t yet support worker remote control commands.
- SQS doesn’t yet support events, and so cannot be used with celery events, celerymon, or the Django Admin monitor.
Тобто, якщо ми спробуємо виконати celery inspect ping
– то отримаємо помилки. Як мінімум тому, бо для цього потрібна черга pidbox
, яка не підтримується в SQS.
Давайте спочатку на ці помилки глянемо.
get()
та “No result backend is configured”
Помилку з get()
ми вже бачили – якщо спробувати зробити таке без result_backend
:
result = test_task.delay("Hello, comrade!") print(result.get())
То отримаємо NotImplementedError(E_NO_BACKEND.strip())
:
... File "/home/setevoy/Scripts/Python/celery/.venv/lib/python3.13/site-packages/celery/backends/base.py", line 1104, in _is_disabled raise NotImplementedError(E_NO_BACKEND.strip()) NotImplementedError: No result backend is configured.
Окей…
Але ж мабуть така серйозна бібліотека як Celery має власні механізми для перевірки воркерів?
Так – має.
Але…
Celery control inspect
Взагалі, celery.app.control
класна штука, і якщо у вас RabbitMQ чи Redis – то з нею можна отримати багато корисної інформації.
Але у нас SQS, тому control працювати не буде.
Пробуємо перевірити таски для воркерів:
$ celery -A celery_app inspect registered ... File "/home/setevoy/Scripts/Python/celery/.venv/lib/python3.13/site-packages/kombu/transport/SQS.py", line 381, in _resolve_queue_url raise UndefinedQueueException(( ...<2 lines>... ).format(sqs_qname)) kombu.transport.SQS.UndefinedQueueException: Queue with name '0f41e5d5-49e1-38bc-bc9b-c1efbc4f9a3e-reply-celery-pidbox' must be defined in 'predefined_queues'.
Або можемо спробувати пінганути воркери з app.control.ping()
– отримаємо ту ж саму помилку з “pidbox must be defined in ‘predefined_queues’“:
@app.task def celery_health_check(): response = app.control.ping(timeout=2) return response
Ну і теж саме для celery_app.control.inspect()
.
Є дуже стара GitHub issue – celery ping doesn’t work when using SQS.
І варіанти того, як цю проблему обійти в Kubernetes Liveness probes – просто… Відключити перевірки взагалі. Наприклад – тут>>>.
Можливе рішення: окремий result_backend
Отже, що я зараз намагаюсь зробити – це просто додати result_backend
з другим контейнером Redis до Kubernetes Pod з Celery. Ресурсів Redis потребує копійки, тому в принципі якимось оверхедом це не буде.
Тобто, ми маємо:
- SQS для меседжів
- Redis для зберігання статусів обробки цих меседжів
Тоді з get()
можемо отримати результат виконання тестової таски, і впевнитись, що воркери працюють.
Задаємо result_backend
з Redis знов:
... result_backend='redis://localhost:6379/0' ...
Додаємо нову таску в celery_tasks.py
:
@app.task def celery_health_check(): return "OK"
Додаємо “моніторинг” в наш “API”, my_api_app.py
:
#!/usr/bin/env python import sys from celery_tasks import test_task, celery_health_check_task def celery_health_check(): try: result = celery_health_check_task.apply_async() response = result.get(timeout=5) print ("Result:", result) print ("Result state:", result.state) print ("Respose:", response) if response != "OK": raise RuntimeError("Celery health check task returned unexpected response!") print("Celery is running") except Exception as e: print("Celery health check failed") print({"status": "error", "message": str(e)}) sys.exit(1) celery_health_check()
delay()
– самий простий метод без додаткових параметрів, apply_async()
вміє в різні опції. Нам зараз в принципі не важливо, але delay()
ми вже використовували, тут давайте з apply_async()
.
Перезапускаємо Celery Worker, запускаємо наш головний скрипт:
$ ./my_api_app.py Result: 2d32805b-3c55-412d-8fc4-4b893f222202 Result state: SUCCESS Respose: OK Celery is running
Гуд.
Що ми можемо ще?
Celery та FastAPI
Окрім виклику Celery через імпорти, ми можемо створити FastAPI сервіс, і все робити через TCP.
Встановлюємо fastapi
та uvicorn
:
$ pip install fastapi uvicorn
Створюємо файл celery_api.py
, описуємо FastAPI app:
from fastapi import FastAPI, HTTPException from celery_tasks import celery_health_check_task app = FastAPI() @app.get("/celery-healthz") def celery_healthcheck(): try: result = celery_health_check_task.apply_async() response = result.get(timeout=5) if response != "OK": raise RuntimeError("Celery health check task returned unexpected response!") return {"status": "success", "message": "Celery is running"} except Exception as e: raise HTTPException(status_code=500, detail={"status": "error", "message": str(e)})
Запускаємо з uvicorn
:
$ uvicorn celery_api:app --host 0.0.0.0 --port 8000 --reload
І перевіряємо:
$ curl localhost:8000/celery-healthz {"status":"success","message":"Celery is running"}
Looks good…
Flower для моніторингу Celery
Flower – популярне рішення для моніторингу Celery, але не буде працювати з SQS з тих самих причин:
- Flower використовує celery events (
celeryev
), в SQS не працює - список воркерів у Flower використовує Celery
inspect()
, що, як ми бачили вище, теж не буде працювати з SQS
Якщо мати окремий result_backend
, як ми вище робили – то буде працювати частково – можна буде побачити список тасок.