RabbitMQ Mirrored Queues Gotchas

Read more about the requirements and scripts for using RabbitMQ mirrored queues.


Mirrored Queues are a popular RabbitMQ feature that provides High Availability (HA). HA, in this context, simply means that RabbitMQ nodes in the cluster can fail and the queues will still be available to clients.
However, HA queues can lead to some unexpected behaviour in failure scenarios or when combined with specific queue properties. In this blog post, we share three examples of these unpredictable behaviours that we have come across in RabbitMQ, and use them to explain some of the intricacies of HA queues. In doing so, we'll also demonstrate how one can analyze the behaviour of a RabbitMQ cluster on a laptop or a single machine using common tools. The next section briefly discusses the requirements for doing so and the scripts in the accompanying repository that allow us to test the presented cases.

Setup

If you want to reproduce the examples from the post you will need the following dependencies installed:

Make
Git
Docker
Python3
Pipenv
Rabbitmq-perf-test 2.8.0
Wireshark (optional)

All the scenarios are based on a 2-node cluster consisting of RabbitMQ Docker containers - rmq1 and rmq2 - running RabbitMQ 3.7.15. Both containers expose ports 5672 (AMQP) and 15672 (management plugin), which are mapped to 5672/15672 and 5673/15673 for rmq1 and rmq2 respectively. In other words, once you set up the cluster, the AMQP port for rmq1 is available at amqp://localhost:5672 and the management interface at http://localhost:15672.

The cluster is started with make up and torn down with make down. The up command will start the containers, attach them to a rmq network and install an HA policy matching the haq queue used throughout this post.
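
A sketch of what such a policy looks like is shown below - the policy name, pattern and ha-mode here are assumptions, while the synchronization settings ("ha-sync-mode":"automatic" and "ha-sync-batch-size":2) are the ones referenced later in this post:

# Illustrative policy definition - the name, pattern and ha-mode are assumptions
rabbitmqctl set_policy ha "^haq" \
'{"ha-mode":"all","ha-sync-mode":"automatic","ha-sync-batch-size":2}' \
--apply-to queues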

To see the logs, run make logs1 to attach to the output of the rmq1 container. Also, Python scripts are in use, so pipenv install and pipenv shell need to be run to install the Python dependencies and start a shell within the Python virtualenv, respectively.

Auto-delete property for an HA queue

A queue in RabbitMQ can have the auto-delete property set. A queue with this property will be deleted by the broker once the last consumer unsubscribes. But what does this mean in a distributed environment where consumers are connected to different nodes and queue slaves are promoted to masters on failure? Let's explore this by setting up an environment for testing. Run make up, which will spawn and cluster the RabbitMQ containers. The command should finish with an output similar to the one below:

Cluster status of node rabbit@rmq1 ...  
[{nodes,[{disc,[rabbit@rmq1,rabbit@rmq2]}]},  
{running_nodes,[rabbit@rmq2,rabbit@rmq1]},  
{cluster_name,<<"rabbit@rmq1">>},  
{partitions,[]}, {alarms,[{rabbit@rmq2,[]},{rabbit@rmq1,[]}]}]  

Now we want to create a Mirrored Queue with the master at node rmq2 and the slave at rmq1. The queue should have the auto-delete property set.

For this purpose, we will use the PerfTest tool: we will connect it to the second node and make it act as a producer. It will create the haq queue (which matches the policy), bind it to the direct exchange with the routing key key and start producing 1 message per second:

# producer at rmq2
perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5673 \
--producers 1 \
--consumers 0 \
--rate 1 \
--queue haq \
--routing-key key \
--auto-delete true 

Throughout the example, we assume perf_test is installed in the ./perf_test directory. As the producer is running, the queue should appear in the management UI and messages should be piling up:

Now let’s connect a consumer to our queue. Again, PerfTest will be our tool of choice but this time it will be used as a consumer attached to the first node (rmq1):

# consumer at rmq1
perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5672 \
--producers 0 \
--consumers 1 \
--queue haq

The perf_test output should reveal that the messages are flowing:

# consumer at rmq1
id: test-114434-950, time: 73.704s, received: 1.00 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs
# producer
id: test-113752-488, time: 487.154s, sent: 1.0 msg/s

At this point, we have the following setup:

[Diagram: producer connected to rmq2 (queue master), consumer connected to rmq1 (mirror)]

Now, let’s see what using the auto-delete property on HA queues can lead to in a failure scenario. Let’s introduce some turbulence into the cluster and disconnect rmq2 from the network between the nodes:

make disconnect2 

Underneath, a docker network disconnect command is run that detaches the node from the rmq network. This should result in the consumer draining all the messages (as the producer is at the disconnected node and no new messages arrive at the queue). After approximately 10s, the nodes should report that they cannot see each other. Below is a log from rmq2 where all this happens:

2019-06-07 10:02:20.910 [error] <0.669.0> ** Node rabbit@rmq1 not responding ** ** Removing (timedout) connection **
2019-06-07 10:02:20.910 [info] <0.406.0> rabbit on node rabbit@rmq1 down
2019-06-07 10:02:20.919 [info] <0.406.0> Node rabbit@rmq1 is down, deleting its listeners
2019-06-07 10:02:20.921 [info] <0.406.0> node rabbit@rmq1 down: net_tick_timeout

But what happened to our queue? If you look at the management interface of rmq2 (the node with the producer, http://localhost:15673/#/queues, user/password is guest/guest), the queue is gone! This is because once the netsplit was detected, no consumers were left at rmq2 and the queue got deleted. What's more, all the messages that made it to the queue master on rmq2 before the netsplit was detected are lost – unless Publisher Confirms were in use: the producer would simply not receive confirms for the messages that were not accepted by the mirror. The queue should still be present at rmq1:

[Screenshot: the haq queue still present in the rmq1 management UI]

If we switch our producer to rmq1 the message flow should go back to normal:

# producer at rmq2 switched to rmq1
perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5672 \
--producers 1 \
--consumers 0 \
--rate 1 \
--queue haq \
--routing-key key \
--auto-delete true

Also, note that our slave at rmq1 was promoted to master:

2019-06-07 10:02:22.740 [info] <0.1858.0> Mirrored queue 'haq' in vhost '/': Slave <rabbit@rmq1.3.1858.0> saw deaths of mirrors <rabbit@rmq2.1.1096.0>
2019-06-07 10:02:22.742 [info] <0.1858.0> Mirrored queue 'haq' in vhost '/': Promoting slave <rabbit@rmq1.3.1858.0> to master 

As we are left with one copy of the queue and one consumer, if that last consumer is disconnected, the entire queue will be lost. Long story short: if you set up your HA queue with the auto-delete property, then in a fail-over scenario replicas can be deleted because they may lose the consumers connected through other nodes. Using either the ignore or autoheal cluster partition handling strategy (see the configuration sketch below) will keep the replicas. In the next section, we discuss the same scenario but with consumer cancel notification in use. Before moving on to the next section, restart the environment: make down up.
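
The partition handling strategy is a per-node setting; in the Erlang-terms config format used elsewhere in this post it could look as follows - the value shown is illustrative, not necessarily what the example repository uses:

[
 {rabbit, [{cluster_partition_handling, autoheal}]}
].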

The consumer is notified about the fail-over

Now let's have a look at a slightly different scenario, in which a consumer specifies that it wants to be notified in case of a master fail-over. To do so, it sets the x-cancel-on-ha-failover argument to true when issuing basic.consume (see the RabbitMQ documentation on consumer cancel notifications).

This differs from the previous example in that the consumer will get the basic.cancel method from the server once the server detects that the queue master is gone. The client can then handle this method as it wishes: it can restart consumption by issuing basic.consume again, or it could even reopen the channel or reconnect before resuming consumption. Even if the client crashes upon receiving the basic.cancel, the queue will not be deleted. Note that consumption can only be resumed once a new master is elected. For the same reasons as in the previous example, the replica at the other node (rmq2) will be gone.

You can test all these consumer behaviours with the consume_after_cancel.py script. Below we will go through a case where the consumer resumes consumption after getting the basic.cancel. To begin, set up the cluster with make up and start a producer at rmq2 - just as we did in the previous section:

# producer at rmq2
perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5673 \
--producers 1 \
--consumers 0 \
--rate 1 \
--queue haq \
--routing-key key \
--auto-delete true 

Next, connect our new consumer to rmq1 (remember to pipenv install and pipenv shell as mentioned in the Setup):

./consume_after_cancel.py --server localhost --port 5672 --after-cancel reconsume
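
For reference, a minimal sketch of such a consumer - illustrative pika code, not a copy of the repository's consume_after_cancel.py - could look like this:

#!/usr/bin/env python3
# Minimal sketch (not the repository's consume_after_cancel.py): consume from
# haq with x-cancel-on-ha-failover set and react when the broker cancels us.
import pika

def on_message(ch, method, properties, body):
    print("received:", body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

def on_broker_cancel(method_frame):
    # Called when the broker sends basic.cancel, i.e. the queue master is gone.
    # A real consumer could re-issue basic.consume here (possibly on a fresh
    # channel or connection) once a new master has been promoted.
    print("consumer cancelled by the broker:", method_frame)

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="localhost", port=5672))
channel = connection.channel()
channel.add_on_cancel_callback(on_broker_cancel)
channel.basic_consume(
    queue="haq",
    on_message_callback=on_message,
    arguments={"x-cancel-on-ha-failover": True},
)
channel.start_consuming()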

When the messages start flowing (their content looks meaningless), disconnect the second node from the cluster:

make disconnect2  

That should stop our consumer for a few moments, but it will then resume consumption. Since there is a netsplit, no new messages arrive at the queue (the producer is at the disconnected node), so we need to move the producer to the first node:

# producer at rmq2 switched to rmq1
perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5672 \
--producers 1 \
--consumers 0 \
--rate 1 \
--queue haq \
--routing-key key \
--auto-delete true

At this point, messages will be flowing again.

When studying these corner cases, Wireshark and its AMQP dissector come in very handy, as they make it crystal clear what the client-server conversation looks like. In our particular case, this is what can be seen:

[Screenshot: Wireshark capture of the client-server AMQP conversation around the fail-over]

As a simple explanation: using x-cancel-on-ha-failover gives the client an extra piece of information indicating that a fail-over has happened, which can be acted upon appropriately – e.g. by automatically subscribing to the new master. We are then left with one consumer, while the replicas left without consumers are cleared.

HA queues with automatic synchronization

If we take a closer look at our HA policy (see the Setup section), it specifies the queue synchronization mode as automatic:

"ha-sync-mode":"automatic"

It means that when a node joins a cluster (or rejoins after a restart or after being partitioned) and a mirror is to be installed on it, all the messages will be copied to it from the master. The good thing about this is that the new mirror has all the messages straight after joining, which increases data safety. However, it comes at a cost: while a queue is being synchronized, all operations on the queue are blocked. This means that, for example, a publisher cannot publish and a consumer cannot consume.

Let's have a look at an example in which we publish 200K messages to the haq queue, then attach a slow consumer and finally restart one node. As the policy specifies that the queue is to have 2 replicas with automatic synchronization, the restart will trigger messages to be copied to keep the replicas in sync. Also, the "ha-sync-batch-size":2 set in the policy indicates that RabbitMQ will synchronize 2 messages at a time – to slow down the synchronization and exaggerate "the blocking effect". Run make up to set up the cluster. Then publish the messages:

# 10*20K = 200K 
./perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5672 \
--producers 10 \
--consumers 0 \
--queue haq \
--pmessages 20000 \
--auto-delete false \
--rate 1000  

Check that the queue has filled up (http://localhost:15672/#/queues).

[Screenshot: management UI showing the haq queue filled with 200,000 messages]

When it’s ready we can start the slow consumer which will process 10 msgs/s with a prefetch count of 10:

./perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5672 \
--producers 0 \
--consumers 1 \
--queue haq \
--predeclared \
--qos 10 \
--consumer-rate 10 

Then restart rmq2 (make restart2) and observe both the consumer's logs and the rmq1 logs. In the consumer's logs you should see some slowdown in the delivery of messages:

id: test-145129-551, time: 20.960s, received: 10.0 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs
id: test-145129-551, time: 21.960s, received: 10 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs
id: test-145129-551, time: 23.058s, received: 10 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs
--> id: test-145129-551, time: 31.588s, received: 0.82 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs 
id: test-145129-551, time: 32.660s, received: 83 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs 
id: test-145129-551, time: 33.758s, received: 10 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs 
id: test-145129-551, time: 34.858s, received: 10 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs  

The marked line shows that the consumer was waiting for nearly 10 seconds for a message. And the RabbitMQ logs can explain why:

2019-06-04 13:36:33.076 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 200000 messages to synchronise
2019-06-04 13:36:33.076 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: batch size: 2
2019-06-04 13:36:33.079 [info] <0.3039.0> Mirrored queue 'haq' in vhost '/': Synchronising: mirrors [rabbit@rmq2] to sync
2019-06-04 13:36:34.079 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 22422 messages
2019-06-04 13:36:35.079 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 43346 messages
2019-06-04 13:36:36.080 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 64512 messages
2019-06-04 13:36:37.084 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 82404 messages
2019-06-04 13:36:38.095 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 104404 messages
2019-06-04 13:36:39.095 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 128686 messages
2019-06-04 13:36:40.095 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 147580 messages
2019-06-04 13:36:41.096 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 164498 messages
2019-06-04 13:36:42.096 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 182408 messages
2019-06-04 13:36:42.961 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: complete

If we compare the timestamp of the first line, which marks the start of the synchronization, with that of the last line, which marks its end, we can see it took nearly 10 seconds. And this is exactly how long the queue was blocked. In real life, obviously, we would not have ha-sync-batch-size set to 2 – by default it is set to 4096. But what does happen in real life is that there can be multiple queues, longer than 200K messages, holding larger messages. All of that can contribute to a high volume of traffic and a prolonged synchronization time, resulting in queues that remain blocked for longer. The takeaway is: make sure you are in control of the length of your queues if automatic synchronization is in use. This can be achieved by limiting the length of queues or setting TTLs for messages, as sketched below.
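
For example, queue length and message age can be capped through the same policy mechanism; the snippet below is purely illustrative (the limits, policy name and pattern are made-up values, not ones taken from the example repository):

# Illustrative only: cap queue length and message age alongside the HA settings
rabbitmqctl set_policy ha "^haq" \
'{"ha-mode":"all","ha-sync-mode":"automatic","max-length":50000,"message-ttl":60000}' \
--apply-to queues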

Delayed Publisher Confirms

If you’re concerned about data safety, you probably need Publisher Confirms. This mechanism makes the broker send an ACK for each successfully accepted message. But what does successfully accepted mean? As per the documentation:

For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For mirrored queues, this means that all mirrors have accepted the message.

As we are discussing HA queues, the statement saying that an ACK is sent when ALL the mirrors have accepted the message is the interesting one. To see how this can cause complications, let’s consider the following setup based on the RPC messaging pattern:

[Diagram: RPC setup - client (P/C), the haq_jobs queue, the Fibonacci server and the reply queue]

We have a server that subscribes to the haq_jobs queue, on which it expects messages with integers in their body. Once it gets a message, it computes the Fibonacci number for that integer and sends the reply back to the reply queue whose name is carried in the reply_to property of the original message. The client in turn (denoted as P/C in the picture), when invoked, sends an integer to the haq_jobs queue and waits for the reply on a temporary queue. It opens two connections: one for publishing the request and one for getting the response. The reply queue is exclusive, as it is meant to be used by just one consumer. What is more, the message is sent in a separate thread, which then waits for the ACK. In other words, sending a message and waiting for the reply are independent thread-wise.
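
The actual scripts live in the accompanying repository; a minimal sketch of the server side - illustrative pika code, not a copy of fib_server.py - could look like this:

#!/usr/bin/env python3
# Minimal sketch of the server side (not the repository's fib_server.py):
# consume integers from haq_jobs and publish the reply to the queue named
# in the reply_to property of the request.
import pika

def fib(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a

def on_request(ch, method, properties, body):
    result = fib(int(body))
    # Reply via the default exchange to the client's (exclusive) reply queue.
    ch.basic_publish(exchange="", routing_key=properties.reply_to, body=str(result))
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="localhost", port=5672))
channel = connection.channel()
channel.queue_declare(queue="haq_jobs")
channel.basic_consume(queue="haq_jobs", on_message_callback=on_request)
channel.start_consuming()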

Now we’re ready to start the cluster: make up. Then, in a separate shell, run the server: ./fib_server.py. Finally, try the client:

./fib_client.py 7 
[2019-06-06 14:20:37] Press any key to proceed... 
[2019-06-06 14:20:37] About to publish request='7' 
[2019-06-06 14:20:37] Message ACKed
[2019-06-06 14:20:37] Got response=13
./fib_client.py 7 
[2019-06-06 14:20:39] Press any key to proceed... 
[2019-06-06 14:20:39] About to publish request='7' 
[2019-06-06 14:20:39] Got response=13 
[2019-06-06 14:20:39] Message ACKed 

As you may have noticed, in the second invocation of the client we got the response first and only then the ACK for the request. That may look weird, but it is due to the fact that the confirms are sent asynchronously once all the mirrors have accepted the message. In this particular case, it turned out that the Fibonacci server and RabbitMQ were quicker to produce the result and deliver it than the cluster was to replicate the request and send the confirmation. What can we conclude from this? Never rely on the confirm arriving before the reply in the RPC (Remote Procedure Call) pattern – although they should come very close to each other.

The last statement, that the ACK and the reply come shortly after one another, doesn’t hold true in a failure scenario where the network or a node is broken. To simulate that, let’s clean the environment with make down and set the net_ticktime to 120s, which makes Rabbit detect net-splits more slowly and gives us time to perform the required actions after we break the network. It was set to 10s in the previous experiments so that the net-split detection – and thus the tests – would run fast. Modify conf/advanced.config so that the option is set to 120 instead of 10:

[ 
 {kernel, [{net_ticktime, 120}]}
]. 

Now we can run the cluster, make up, and start the server and the client as previously. But leave the client at the prompt to press a key, and stop for a moment to reflect on the current state of the system:

* we have the Fibonacci server connected and waiting for requests,
* we have 2 connections opened from the client: one to publish and one to consume,
* all the queues are set up.

Now run make disconnect2 to disconnect rmq2 from the cluster. At this point, you have between 90 and 150 seconds (as the net_ticktime is set to 120s) to press a key and publish the message. You should see that the server will process it and you will get the response, but the ACK for the request won’t be delivered until you recover from the net-split with make connect2 (which you have to do within the given time range).

[2019-06-06 14:21:32] Press any key to proceed...
[2019-06-06 14:21:38] About to publish request='7'
[2019-06-06 14:21:38] Got response=13
[2019-06-06 14:22:05] Message ACKed

In the above listing, the ACK arrived ~30s after the actual response due to the net-split.

Remember: one cannot expect that a message will be ACKed by the broker before the reply arrives on the reply queue (RPC pattern) – as we’ve just seen, the ACK can get stuck due to some turbulence on the network.

The importance of Mirrored Queues

Undoubtedly, Mirrored Queues provide an important RabbitMQ feature – High Availability. In simple words: depending on the configuration, you will always have more than one copy of the queue and its data. As a result, you are ready for a node or network failure, which will inevitably happen in a distributed system. Nonetheless, as illustrated in this blog post, in some specific failure scenarios or configurations you may observe unexpected behaviours or even data/queue loss. Thus it’s important to test your system against potential failures and scenarios which are not (clearly) covered by the documentation. If you’re interested in learning more about HA in RabbitMQ, talk to us at the RabbitMQ Summit in London or contact us today for a RabbitMQ health check.

Monitor your Erlang, Elixir and RabbitMQ systems

Are you currently using Erlang, Elixir or RabbitMQ in your stack? Get full visibility of your system with WombatOAM, find out more and get a free trial on our WombatOAM page. For more information on High Availability in RabbitMQ, watch this video from our Erlang Meetup in Krakow.


