RabbitMQ: запуск, описание, примеры

Автор: | 06/06/2018

RabbitMQ – менеджер сообщений (message broker), написан на Erlang, ближайший аналог в AWS – SQS.

Предназначен для передачи данных (сообщений) между несколькими сервисами : один сервис добавляет в очередь сообщение, другой – получает это сообщение.

Ниже – пример установки, запуска и использования RabbitMQ.

Установка

На Arch Linux:

[simterm]

$ sudo pacman -S rabbitmq

[/simterm]

Debian/Ubuntu:

[simterm]

# apt install rabbitmq-server

[/simterm]

Либо можно запустить из Docker-образа:

[simterm]

$ docker run docker pull rabbitmq

[/simterm]

rabbitmq-plugins

RabbitMQ поддерживает работу с плагинами через rabbitmq-plugins.

Документация тут>>>.

Для просмотра активных плагинов – используем list:

[simterm]

admin@ip-172-31-33-210:~$ sudo rabbitmq-plugins list
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status:   * = running on rabbit@ip-172-31-33-210
 |/
[  ] amqp_client                       3.6.6
[  ] cowboy                            1.0.3
[  ] cowlib                            1.0.1
[  ] mochiweb                          2.13.1
...

[/simterm]

Активируем плагин rabbitmq_management:

[simterm]

admin@ip-172-31-33-210:~$ sudo rabbitmq-plugins enable rabbitmq_management
The following plugins have been enabled:
  mochiweb
  webmachine
  rabbitmq_web_dispatch
  amqp_client
  rabbitmq_management_agent
  rabbitmq_management

Applying plugin configuration to rabbit@ip-172-31-33-210... started 6 plugins.

[/simterm]

Активные плагины хранятся в файле /etc/rabbitmq/enabled_plugins:

[simterm]

root@ip-172-31-33-210:/home/admin# cat /etc/rabbitmq/enabled_plugins 
[rabbitmq_management].

[/simterm]

HTTP API

Документация тут>>> и тут>>>.

rabbitmq_management активирует поддержку API на порту 15672:

[simterm]

root@ip-172-31-33-210:/home/admin# netstat -anp | grep 15672
tcp        0      0 0.0.0.0:15672           0.0.0.0:*               LISTEN      1348/beam

[/simterm]

Пример запроса:

[simterm]

root@ip-172-31-33-210:/home/admin# curl -i -u guest:guest localhost:15672/api/overview
HTTP/1.1 200 OK
vary: Accept-Encoding, origin
Server: MochiWeb/1.1 WebMachine/1.10.0 (never breaks eye contact)
Date: Tue, 05 Jun 2018 11:05:57 GMT
Content-Type: application/json
Content-Length: 1973
Cache-Control: no-cache

{"management_version":"3.6.6","rates_mode":"basic","exchange_types":[{"name":"direct","description":"AMQP direct exchange, as per the AMQP specification","enabled":true},{"name":"topic","description":"AMQP topic exchange, as per the AMQP specification","enabled":true},{"name":"fanout","description":"AMQP fanout exchange, as per the AMQP specification","enabled":true},{"name":"headers","description":"AMQP headers exchange, as per the AMQP specification","enabled":true}],"rabbitmq_version":"3.6.6","cluster_name":"[email protected]","erlang_version":"19.2.1","erlang_full_version":"Erlang/OTP 19 [erts-8.2.1] [source] [64-bit] [async-threads:64] [kernel-poll:true]","message_stats":{"deliver":0,"deliver_details":{"rate":0.0},"deliver_no_ack":0,"deliver_no_ack_details":{"rate":0.0},"get":1,"get_details":{"rate":0.0},"get_no_ack":0,"get_no_ack_details":{"rate":0.0},"publish":0,"publish_details":{"rate":0.0},"publish_in":0,"publish_in_details":{"rate":0.0},"publish_out":0,"publish_out_details":{"rate":0.0},"ack":0,"ack_details":{"rate":0.0},"deliver_get":1,"deliver_get_details":{"rate":0.0},"confirm":0,"confirm_details":{"rate":0.0},"return_unroutable":0,"return_unroutable_details":{"rate":0.0},"redeliver":0,"redeliver_details":{"rate":0.0}},"queue_totals":{"messages":1,"messages_details":{"rate":0.0},"messages_ready":1,"messages_ready_details":{"rate":0.0},"messages_unacknowledged":0,"messages_unacknowledged_details":{"rate":0.0}},"object_totals":{"consumers":0,"queues":1,"exchanges":8,"connections":0,"channels":0},"statistics_db_event_queue":0,"node":"rabbit@ip-172-31-33-210","statistics_db_node":"rabbit@ip-172-31-33-210","listeners":[{"node":"rabbit@ip-172-31-33-210","protocol":"amqp","ip_address":"::","port":5672},{"node":"rabbit@ip-172-31-33-210","protocol":"clustering","ip_address":"::","port":25672}],"contexts":[{"node":"rabbit@ip-172-31-33-210","description":"RabbitMQ Management","path":"/","port":"15672"}]}

[/simterm]

rabbitmqadmin

Документация тут>>>.

Утилита на Python, позволяющая выполнять операции используя HTTP API, такие как просмотр очередей, точек обмена, пользователей и т.д. Требуется плагин rabbitmq_management.

rabbitmqctl

Документация тут>>>.

Предназначена в основном для управления нодами в кластере – добавлением, удалением, перезагрузкой, управление логами.

Аналогично rabbitmqadmin – может управлять пользователеми, очередями, точками обмена и т.д:

[simterm]

root@ip-172-31-33-210:/home/admin# rabbitmqctl status
Status of node 'rabbit@ip-172-31-33-210' ...
[{pid,1348},
 {running_applications,
     [{rabbitmq_management,"RabbitMQ Management Console","3.6.6"},
      {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.6"},
      {webmachine,"webmachine","1.10.3"},
      {mochiweb,"MochiMedia Web Server","2.13.1"},
      {rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.6"},
...

[/simterm]

Просмотр пользователей:

[simterm]

root@ip-172-31-33-210:/home/admin# rabbitmqctl list_users
Listing users ...
guest   [administrator]

[/simterm]

Web-UI

После активации плагина – становится доступен веб-интефрейс для управления сервером на порт 15672:

Доступ пользователю guest по-умолчанию запрещён, добавляем нового пользователя:

[simterm]

root@ip-172-31-33-210:/home/admin# rabbitmqctl add_user test test
Creating user "test" ...

[/simterm]

Устанавливаем его администратором:

[simterm]

root@ip-172-31-33-210:/home/admin# rabbitmqctl set_user_tags test administrator
Setting tags for user "test" to [administrator] ...

[/simterm]

И права на все:

[simterm]

root@ip-172-31-33-210:/home/admin# rabbitmqctl set_permissions -p / test ".*" ".*" ".*"
Setting permissions for user "test" in vhost "/" ...

[/simterm]

Логинимся:

Суть работы

Для RabbitMQ есть три основных понятия:

  • producer: клиент, выполняющий отправку сообщения
  • queue: собственно очередь сообщений
  • consumer: клиент, получающий сообщения из очереди
  • exchange: получает сообщения от producer, и отправялет их в очереди в соответствии с его типом (см. тут>>>)

Пример

Для работы с RabbitMQ требуется поддержка AMQP (Advanced Message Queuing Protocol), для Python она предоставляется библиотеками pika, py-amqplib и др.

См. документацию тут>>>.

Устанавливаем pika:

[simterm]

# apt install python-pika

[/simterm]

Отправка сообщения

Создаём скрипт для producer – он будет отправлять сообщения:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                     routing_key='hello',
                     body='Hello World!')

print(" [x] Sent 'Hello World!'")

connection.close()

Тут выполянется:

  1. подключение к rabbitmq-серверу на localhostconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  2. создаётся очередь с именем helloqueue_declare(queue='hello')
  3. отправляется сообщение (body='Hello World!') через точку обмена по-умолчанию (безымянный exchange – exchange=''), в очередь hello (routing_key='hello')
  4. закрывается соединение – connection.close()

Запускаем скрипт:

[simterm]

admin@ip-172-31-33-210:~$ ./producer.py 
 [x] Sent 'Hello World!'

[/simterm]

Проверяем с помощью rabbitmqadmin:

[simterm]

root@ip-172-31-33-210:/home/admin# rabbitmqadmin get queue='hello'
+-------------+----------+---------------+--------------+---------------+------------------+------------+-------------+
| routing_key | exchange | message_count |   payload    | payload_bytes | payload_encoding | properties | redelivered |
+-------------+----------+---------------+--------------+---------------+------------------+------------+-------------+
| hello       |          | 0             | Hello World! | 12            | string           |            | False       |
+-------------+----------+---------------+--------------+---------------+------------------+------------+-------------+

[/simterm]

Список очередей с помощью rabbitmqctl:

[simterm]

root@ip-172-31-33-210:/home/admin# rabbitmqctl list_queues
Listing queues ...
hello   1

[/simterm]

В очереди hello сейчас висит 1 сообщение – получим его.

Получение сообщения

Второй скрипт – consumer.py – получит сообщение из очереди, и выведет его на консоль:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

Тут выполняется:

  1. аналогично первому скрипту – подключение к rabboitmq
  2. создаём очередь hello, если её ещё нет
  3. создаём функцию callback() , которая будет получать сообщения и выводить их на консоль
  4. указываем её вызов в basic_consume(), передавая имя очереди
  5. запускаем бесконечный цикл с start_consuming(), который в свою очередь вызывает basic_consume()

Запускаем:

[simterm]

root@ip-172-31-33-210:/home/admin# ./consumer.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'

[/simterm]

Проверяем очередь – теперь тут пусто:

[simterm]

root@ip-172-31-33-210:/home/admin# rabbitmqctl list_queues
Listing queues ...
hello   0

[/simterm]

Готово.