RabbitMQ – менеджер сообщений (message broker), написан на Erlang, ближайший аналог в AWS – SQS.
Предназначен для передачи данных (сообщений) между несколькими сервисами : один сервис добавляет в очередь сообщение, другой – получает это сообщение.
Ниже – пример установки, запуска и использования RabbitMQ.
Содержание
Установка
На Arch Linux:
[simterm]
$ sudo pacman -S rabbitmq
[/simterm]
Debian/Ubuntu:
[simterm]
# apt install rabbitmq-server
[/simterm]
Либо можно запустить из Docker-образа:
[simterm]
$ docker run docker pull rabbitmq
[/simterm]
rabbitmq-plugins
RabbitMQ поддерживает работу с плагинами через rabbitmq-plugins
.
Документация тут>>>.
Для просмотра активных плагинов – используем list
:
[simterm]
admin@ip-172-31-33-210:~$ sudo rabbitmq-plugins list Configured: E = explicitly enabled; e = implicitly enabled | Status: * = running on rabbit@ip-172-31-33-210 |/ [ ] amqp_client 3.6.6 [ ] cowboy 1.0.3 [ ] cowlib 1.0.1 [ ] mochiweb 2.13.1 ...
[/simterm]
Активируем плагин rabbitmq_management
:
[simterm]
admin@ip-172-31-33-210:~$ sudo rabbitmq-plugins enable rabbitmq_management The following plugins have been enabled: mochiweb webmachine rabbitmq_web_dispatch amqp_client rabbitmq_management_agent rabbitmq_management Applying plugin configuration to rabbit@ip-172-31-33-210... started 6 plugins.
[/simterm]
Активные плагины хранятся в файле /etc/rabbitmq/enabled_plugins
:
[simterm]
root@ip-172-31-33-210:/home/admin# cat /etc/rabbitmq/enabled_plugins [rabbitmq_management].
[/simterm]
HTTP API
rabbitmq_management
активирует поддержку API на порту 15672:
[simterm]
root@ip-172-31-33-210:/home/admin# netstat -anp | grep 15672 tcp 0 0 0.0.0.0:15672 0.0.0.0:* LISTEN 1348/beam
[/simterm]
Пример запроса:
[simterm]
root@ip-172-31-33-210:/home/admin# curl -i -u guest:guest localhost:15672/api/overview HTTP/1.1 200 OK vary: Accept-Encoding, origin Server: MochiWeb/1.1 WebMachine/1.10.0 (never breaks eye contact) Date: Tue, 05 Jun 2018 11:05:57 GMT Content-Type: application/json Content-Length: 1973 Cache-Control: no-cache {"management_version":"3.6.6","rates_mode":"basic","exchange_types":[{"name":"direct","description":"AMQP direct exchange, as per the AMQP specification","enabled":true},{"name":"topic","description":"AMQP topic exchange, as per the AMQP specification","enabled":true},{"name":"fanout","description":"AMQP fanout exchange, as per the AMQP specification","enabled":true},{"name":"headers","description":"AMQP headers exchange, as per the AMQP specification","enabled":true}],"rabbitmq_version":"3.6.6","cluster_name":"[email protected]","erlang_version":"19.2.1","erlang_full_version":"Erlang/OTP 19 [erts-8.2.1] [source] [64-bit] [async-threads:64] [kernel-poll:true]","message_stats":{"deliver":0,"deliver_details":{"rate":0.0},"deliver_no_ack":0,"deliver_no_ack_details":{"rate":0.0},"get":1,"get_details":{"rate":0.0},"get_no_ack":0,"get_no_ack_details":{"rate":0.0},"publish":0,"publish_details":{"rate":0.0},"publish_in":0,"publish_in_details":{"rate":0.0},"publish_out":0,"publish_out_details":{"rate":0.0},"ack":0,"ack_details":{"rate":0.0},"deliver_get":1,"deliver_get_details":{"rate":0.0},"confirm":0,"confirm_details":{"rate":0.0},"return_unroutable":0,"return_unroutable_details":{"rate":0.0},"redeliver":0,"redeliver_details":{"rate":0.0}},"queue_totals":{"messages":1,"messages_details":{"rate":0.0},"messages_ready":1,"messages_ready_details":{"rate":0.0},"messages_unacknowledged":0,"messages_unacknowledged_details":{"rate":0.0}},"object_totals":{"consumers":0,"queues":1,"exchanges":8,"connections":0,"channels":0},"statistics_db_event_queue":0,"node":"rabbit@ip-172-31-33-210","statistics_db_node":"rabbit@ip-172-31-33-210","listeners":[{"node":"rabbit@ip-172-31-33-210","protocol":"amqp","ip_address":"::","port":5672},{"node":"rabbit@ip-172-31-33-210","protocol":"clustering","ip_address":"::","port":25672}],"contexts":[{"node":"rabbit@ip-172-31-33-210","description":"RabbitMQ Management","path":"/","port":"15672"}]}
[/simterm]
rabbitmqadmin
Документация тут>>>.
Утилита на Python, позволяющая выполнять операции используя HTTP API, такие как просмотр очередей, точек обмена, пользователей и т.д. Требуется плагин rabbitmq_management
.
rabbitmqctl
Документация тут>>>.
Предназначена в основном для управления нодами в кластере – добавлением, удалением, перезагрузкой, управление логами.
Аналогично rabbitmqadmin
– может управлять пользователеми, очередями, точками обмена и т.д:
[simterm]
root@ip-172-31-33-210:/home/admin# rabbitmqctl status Status of node 'rabbit@ip-172-31-33-210' ... [{pid,1348}, {running_applications, [{rabbitmq_management,"RabbitMQ Management Console","3.6.6"}, {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.6"}, {webmachine,"webmachine","1.10.3"}, {mochiweb,"MochiMedia Web Server","2.13.1"}, {rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.6"}, ...
[/simterm]
Просмотр пользователей:
[simterm]
root@ip-172-31-33-210:/home/admin# rabbitmqctl list_users Listing users ... guest [administrator]
[/simterm]
Web-UI
После активации плагина – становится доступен веб-интефрейс для управления сервером на порт 15672:
Доступ пользователю guest
по-умолчанию запрещён, добавляем нового пользователя:
[simterm]
root@ip-172-31-33-210:/home/admin# rabbitmqctl add_user test test Creating user "test" ...
[/simterm]
Устанавливаем его администратором:
[simterm]
root@ip-172-31-33-210:/home/admin# rabbitmqctl set_user_tags test administrator Setting tags for user "test" to [administrator] ...
[/simterm]
И права на все:
[simterm]
root@ip-172-31-33-210:/home/admin# rabbitmqctl set_permissions -p / test ".*" ".*" ".*" Setting permissions for user "test" in vhost "/" ...
[/simterm]
Логинимся:
Суть работы
Для RabbitMQ есть три основных понятия:
- producer: клиент, выполняющий отправку сообщения
- queue: собственно очередь сообщений
- consumer: клиент, получающий сообщения из очереди
- exchange: получает сообщения от producer, и отправялет их в очереди в соответствии с его типом (см. тут>>>)
Пример
Для работы с RabbitMQ требуется поддержка AMQP (Advanced Message Queuing Protocol), для Python она предоставляется библиотеками pika
, py-amqplib
и др.
См. документацию тут>>>.
Устанавливаем pika
:
[simterm]
# apt install python-pika
[/simterm]
Отправка сообщения
Создаём скрипт для producer – он будет отправлять сообщения:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
Тут выполянется:
- подключение к rabbitmq-серверу на localhost –
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
- создаётся очередь с именем hello –
queue_declare(queue='hello')
- отправляется сообщение (
body='Hello World!'
) через точку обмена по-умолчанию (безымянный exchange –exchange=''
), в очередь hello (routing_key='hello'
) - закрывается соединение –
connection.close()
Запускаем скрипт:
[simterm]
admin@ip-172-31-33-210:~$ ./producer.py [x] Sent 'Hello World!'
[/simterm]
Проверяем с помощью rabbitmqadmin
:
[simterm]
root@ip-172-31-33-210:/home/admin# rabbitmqadmin get queue='hello' +-------------+----------+---------------+--------------+---------------+------------------+------------+-------------+ | routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | properties | redelivered | +-------------+----------+---------------+--------------+---------------+------------------+------------+-------------+ | hello | | 0 | Hello World! | 12 | string | | False | +-------------+----------+---------------+--------------+---------------+------------------+------------+-------------+
[/simterm]
Список очередей с помощью rabbitmqctl
:
[simterm]
root@ip-172-31-33-210:/home/admin# rabbitmqctl list_queues Listing queues ... hello 1
[/simterm]
В очереди hello сейчас висит 1 сообщение – получим его.
Получение сообщения
Второй скрипт – consumer.py
– получит сообщение из очереди, и выведет его на консоль:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
Тут выполняется:
- аналогично первому скрипту – подключение к
rabboitmq
- создаём очередь hello, если её ещё нет
- создаём функцию
callback()
, которая будет получать сообщения и выводить их на консоль - указываем её вызов в
basic_consume()
, передавая имя очереди - запускаем бесконечный цикл с
start_consuming()
, который в свою очередь вызываетbasic_consume()
Запускаем:
[simterm]
root@ip-172-31-33-210:/home/admin# ./consumer.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Hello World!'
[/simterm]
Проверяем очередь – теперь тут пусто:
[simterm]
root@ip-172-31-33-210:/home/admin# rabbitmqctl list_queues Listing queues ... hello 0
[/simterm]
Готово.