RabbitMQ: примеры rabbitmqctl, rabbitmqadmin и хранение сообщений при рестарте сервера

Автор: | 05/12/2018
 

Задача – запустить RabbitMQ сервер и создать сообщение, но таким образом, что бы оно сохранилось при рестарте сервера.

Документация по хранению данных – тут>>>.

В частности, в ней указано, что на диск будут сохранены любые типы сообщений, если RabbitMQ начинает особождать память (см. Memory Alarms), но нам требуется, что бы сообщения всегда синхронизировались в базу на диске.

Кроме того, для примера работы с rabbitmqctl и rabbitmqadmin выполним:

  1. создадим vhost
  2. создадим пользователя с доступом к этому вхосту
  3. создадим очередь в этом виртуалхосте
  4. создадим exchange для этой queue
  5. создадим binding

Подготовка RabbitMQ

RabbitMQ vhost

Документация – тут>>>.

Создаём виртуалхост:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqctl add_vhost examplevhost
Creating vhost "examplevhost" ...

[/simterm]

Проверяем:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqctl list_vhosts
Listing vhosts ...
/
examplevhost

[/simterm]

RabbitMQ user

Документация – тут>>>.

Создаём пользователя:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqctl add_user exampleuser examplepass
Creating user "exampleuser" ...

[/simterm]

И права на созданный хост:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqctl set_permissions -p examplevhost exampleuser ".*" ".*" ".*"
Setting permissions for user "exampleuser" in vhost "examplevhost" ...

[/simterm]

Проверяем:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -u exampleuser -p examplepass list queues
*** Access refused: /api/queues?columns=name,messages

[/simterm]

Эм…

То же.

Проверяем лог – /var/log/rabbitmq/rabbit\@ip-172-31-18-246.log:

=WARNING REPORT==== 4-Dec-2018::15:18:26 ===
HTTP access denied: user ‘exampleuser’ – Not management user

Ах, да.

Добавляем тег “administrator“:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqctl set_user_tags exampleuser administrator
Setting tags for user "exampleuser" to [administrator] ...

[/simterm]

Проверяем:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -u exampleuser -p examplepass list vhosts
+--------------+----------+
|     name     | messages |
+--------------+----------+
| /            | 1        |
| examplevhost |          |
+--------------+----------+

[/simterm]

И очереди в нашем виртуахосте:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -V examplevhost -u exampleuser -p examplepass list queues
No items

[/simterm]

RabbitMQ queues

Документация тут>>>, и хороший обзор – тут>>>.

Создаём очередь:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -u exampleuser -p examplepass declare queue --vhost=examplevhost name=examplequeue
queue declared

[/simterm]

Ещё раз проверяем:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -V examplevhost -u exampleuser -p examplepass list queues
+--------------+----------+
|     name     | messages |
+--------------+----------+
| examplequeue | 0        |
+--------------+----------+

[/simterm]

RabbitMQ exchanges

Примеры – тут>>>, описание тут>>>.

Создаём exchange:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -u exampleuser -p examplepass declare exchange --vhost=examplevhost  name="exampleexchange" type="direct"
exchange declared

[/simterm]

Проверяем:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -V examplevhost -u exampleuser -p examplepass list exchanges
+--------------------+---------+
|        name        |  type   |
+--------------------+---------+
|                    | direct  |
| amq.direct         | direct  |
| amq.fanout         | fanout  |
| amq.headers        | headers |
| amq.match          | headers |
| amq.rabbitmq.trace | topic   |
| amq.topic          | topic   |
| exampleexchange    | direct  |
+--------------------+---------+

[/simterm]

RabbitMQ bindings

См. тут>>> и тут>>>.

Проверяем биндинги – сейчас тут один, дефолтный, который перенаправляет все сообщения с routing_key == examplequeue в очередь с именем examplequeue:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -V examplevhost -u exampleuser -p examplepass list bindings
+--------+--------------+--------------+
| source | destination  | routing_key  |
+--------+--------------+--------------+
|        | examplequeue | examplequeue |
+--------+--------------+--------------+

[/simterm]

Добавляем новый:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -u exampleuser -p examplepass declare binding --vhost=examplevhost source="exampleexchange" destination_type="queue" destination="examplequeue" routing_key="examplekey"
binding declared

[/simterm]

Что бы как-то отличать работу нашего биндинга от дефолтного – используем значение routing_key == examplekey.

Проверяем:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -V examplevhost -u exampleuser -p examplepass list bindings
+-----------------+--------------+--------------+
|     source      | destination  | routing_key  |
+-----------------+--------------+--------------+
|                 | examplequeue | examplequeue |
| exampleexchange | examplequeue | examplekey   |
+-----------------+--------------+--------------+

[/simterm]

Отправляем сообщение:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -V examplevhost -u exampleuser -p examplepass publish exchange=exampleexchange routing_key=examplekey payload="hello, world"
Message published

[/simterm]

Проверяем очередь:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -V examplevhost -u exampleuser -p examplepass list queues
+--------------+----------+
|     name     | messages |
+--------------+----------+
| examplequeue | 1        |
+--------------+----------+

[/simterm]

Получаем сообщение:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -V examplevhost -u exampleuser -p examplepass get queue=examplequeue requeue=true
+-------------+-----------------+---------------+--------------+---------------+------------------+------------+-------------+
| routing_key |    exchange     | message_count |   payload    | payload_bytes | payload_encoding | properties | redelivered |
+-------------+-----------------+---------------+--------------+---------------+------------------+------------+-------------+
| examplekey  | exampleexchange | 0             | hello, world | 12            | string           |            | False       |
+-------------+-----------------+---------------+--------------+---------------+------------------+------------+-------------+

[/simterm]

Используем тут параметр requeue=true, что бы сообщение осталось в очереди.

Хранение сообщений при рестарте

А теперь – перезапускаем сервер:

[simterm]

root@ip-172-31-18-246:/home/admin# systemctl restart rabbitmq-server

[/simterm]

И проверяем очередь пещё раз:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -V examplevhost -u exampleuser -p examplepass list queues
+--------------+----------+
|     name     | messages |
+--------------+----------+
| examplequeue | 0        |
+--------------+----------+

[/simterm]

Или с дополнительными полями:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -V examplevhost -u exampleuser -p examplepass list queues vhost name messages
+--------------+--------------+----------+
|    vhost     |     name     | messages |
+--------------+--------------+----------+
| examplevhost | examplequeue | 0        |
+--------------+--------------+----------+

[/simterm]

Сообщений == 0.

Что бы RabbitMQ сохранял сообщения – необходимо два изменения.

Первое – объявить очередь как durable:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -u exampleuser -p examplepass declare queue --vhost=examplevhost name=examplequeue durable=true
queue declared

[/simterm]

Проверяем.

Добавляем -f (format) long, и -d (depth) 2:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -V examplevhost -u exampleuser -p examplepass list queues -f long -d 2

--------------------------------------------------------------------------------

                                    vhost: examplevhost
                                     name: examplequeue
                                       ...
                                  durable: True
                                       ...

[/simterm]

Или через RabbitMQ WebUI:

Следующим – добавляем сообщение, но указываем delivery_mode == 2:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -V examplevhost -u exampleuser -p examplepass publish exchange=exampleexchange routing_key=examplekey payload="hello, world" properties="{\"delivery_mode\": 2}"
Message published

[/simterm]

Проверяем:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -u exampleuser -p examplepass list queues
+--------------+----------+
|     name     | messages |
+--------------+----------+
| examplequeue | 1        |
+--------------+----------+

[/simterm]

Ребутаем RabbitMQ:

[simterm]

root@ip-172-31-18-246:/home/admin# systemctl restart rabbitmq-server

[/simterm]

Ещё раз проверяем:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -u exampleuser -p examplepass list queues
+--------------+----------+
|     name     | messages |
+--------------+----------+
| examplequeue | 1        |
+--------------+----------+

[/simterm]

Получаем его:

[simterm]

root@ip-172-31-18-246:/home/admin# rabbitmqadmin -V examplevhost -u exampleuser -p examplepass get queue=examplequeue requeue=true
+-------------+-----------------+---------------+--------------+---------------+------------------+-------------+
| routing_key |    exchange     | message_count |   payload    | payload_bytes | payload_encoding | redelivered |
+-------------+-----------------+---------------+--------------+---------------+------------------+-------------+
| examplekey  | exampleexchange | 0             | hello, world | 12            | string           | False       |
+-------------+-----------------+---------------+--------------+---------------+------------------+-------------+

[/simterm]

Всё на месте.

Ссылки по теме

Persistence Configuration

How to persist messages during RabbitMQ broker restart?

Dabbling around Rabbit MQ persistence, durability & message routing

Configuring RabbitMQ Exchanges, Queues and Bindings: Part 1

Reasoning About Memory Use

Memory Alarms