In July 2021, streams were introduced to RabbitMQ, utilizing a new blazingly-fast protocol that can be used alongside AMQP 0.9.1. Streams offer an easier way to solve a number of problems in RabbitMQ, including large fan-outs, replay & time travel, and large logs, all with very high throughput (1 million messages per second on a 3-node cluster). Arnaud Cogoluègnes, Staff Engineer @ VMware, introduced streams and explained how they are best used.
This talk was recorded at the RabbitMQ Summit 2021. The 4th edition of RabbitMQ Summit is taking place as a hybrid event, both in-person (CodeNode venue in London) and virtual, on 16th September 2022, and brings together some of the world’s biggest companies using RabbitMQ, all in one place.
Streams: A New Type of Data Structure in RabbitMQ
Streams are a new data structure in RabbitMQ that opens up a world of possibilities for new use cases. They model an append-only log, which is a big change from traditional RabbitMQ queues, as they have non-destructive consumer semantics. This means that when you read messages from a Stream, you don’t remove them, whereas when a message is read from a queue, it is destroyed. This re-readable behaviour of RabbitMQ Streams is facilitated by the append-only log structure.
RabbitMQ also introduced a new protocol, the Stream protocol, which allows much faster message flow. However, you can access Streams through the traditional AMQP 0.9.1 protocol as well, which remains the most used protocol in RabbitMQ. They are also accessible through the other protocols that RabbitMQ supports, such as MQTT and STOMP.
Streams’ Strong Points
Streams have unique strengths that allow them to shine for some use cases. These include:
When you have multiple applications in your system needing to read the same messages, you have a fan-out architecture. Streams are great for fan-outs, thanks to their non-destructive consuming semantics, removing the need to copy the message inside RabbitMQ as many times as there are consumers.
Streams also offer replay and time-travelling capabilities. Consumers can attach anywhere in a stream, using an absolute offset or a timestamp, and they can read and re-read the same data as many times as needed.
Thanks to the new stream protocol, streams have the potential to be significantly faster than traditional queues. If you want high throughput or you are working with large messages, streams can often be a suitable option.
Streams are also good for large logs. Messages in streams are always persisted on the file system, and they don’t stay in memory for long. Upon consumption, the operating system’s file cache is used to allow for fast message flow.
The Log Abstraction
A stream is immutable: you can add messages, but once a message has entered the stream, it cannot be removed. This makes the log abstraction of the stream quite a simple data structure compared to queues, where messages are constantly added and removed. This brings us to another important concept, the offset. The offset is just a technical index of a message inside the stream. Consumers can instruct RabbitMQ to start reading from a given offset, or from a timestamp, instead of the beginning of the stream. This allows for easy time-travelling and replaying of messages. Consumers can also push the offset tracking responsibility to RabbitMQ.
We can have any number of consumers on a stream, and they don’t compete with each other. One consuming application will not steal messages from the other applications, and the same application can read the stream of messages many times.
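The log abstraction and the role of the offset can be sketched in a few lines of Python. This is only a toy in-memory model under stated assumptions: the class `StreamLog` and its method names are hypothetical, timestamps are plain integers, and nothing here reflects how RabbitMQ actually stores or serves data.

```python
from bisect import bisect_left


class StreamLog:
    """Toy in-memory append-only log (hypothetical, not RabbitMQ's storage)."""

    def __init__(self):
        self._timestamps = []  # one timestamp per message, in append order
        self._bodies = []      # message bodies, indexed by offset

    def append(self, body, timestamp):
        """Add a message at the end and return its offset (its index)."""
        if self._timestamps and timestamp < self._timestamps[-1]:
            raise ValueError("timestamps must not go backwards")
        self._timestamps.append(timestamp)
        self._bodies.append(body)
        return len(self._bodies) - 1

    def read_from(self, offset):
        """Return every message from `offset` onwards; nothing is removed."""
        return self._bodies[offset:]

    def offset_for_timestamp(self, timestamp):
        """First offset whose message was appended at or after `timestamp`."""
        return bisect_left(self._timestamps, timestamp)


log = StreamLog()
for ts, body in [(100, "a"), (110, "b"), (120, "c"), (130, "d")]:
    log.append(body, ts)

from_start = log.read_from(0)                              # whole stream
from_offset = log.read_from(2)                             # absolute offset
from_time = log.read_from(log.offset_for_timestamp(115))   # timestamp attach
re_read = log.read_from(0)                                 # same data again
```

Each reader only supplies a starting position, so readers never interfere with each other and a second read from offset 0 returns the same messages as the first.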
Queues can store messages in memory or on disk, and they can live on a single node or be mirrored. Streams, in contrast, are persistent and replicated at all times. When we create a stream, it’s going to have a leader located on one node, and replicas on other nodes. The replicas will follow the leader and synchronize the data. The leader is the only member that accepts write operations, and the replicas will only be used to serve consumers.
RabbitMQ Queues vs. Streams
Streams are here to complement queues and to expand the use cases for RabbitMQ. Traditional queues are still the best tool for the most common use cases in RabbitMQ, but they do have their limitations, and there are times when they are not the best fit.
Streams are, similarly to queues, a FIFO data structure, i.e., the oldest message published will be read first. Providing an offset lets the client skip the beginning of the stream, but the messages will still be read in the order of publishing.
Consider a traditional RabbitMQ queue holding a couple of messages, with one consuming application. After registering the consumer, the broker will start dispatching messages to the client, and the application can start processing.
At this point, the message is at an important point in its lifetime: it is present on the sending side (the broker) and also on the consuming side. The broker still needs to care about the message because it can be rejected, and it must know that it hasn’t been acknowledged yet. After the application finishes processing the message, it can acknowledge it, and after this point the broker can get rid of the message and consider it processed. This is what we can call destructive consumption, and it is the behaviour of Classic and Quorum Queues. When using Streams, the message stays in the Stream as long as the retention policy allows for it.
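The message lifecycle described here can be illustrated with a toy model. The sketch below is an assumption-laden simplification: `ToyQueue` and its methods are hypothetical names, a rejected message is simply put back at the front of the queue, and the stream is modeled as a plain list with no retention policy.

```python
from collections import deque


class ToyQueue:
    """Toy model of destructive consumption with acknowledgements."""

    def __init__(self, messages):
        self.ready = deque(messages)  # waiting to be delivered
        self.unacked = {}             # delivered, still tracked by the broker
        self._next_tag = 1

    def deliver(self):
        """Hand the oldest ready message to a consumer, track it as unacked."""
        body = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = body
        return tag, body

    def ack(self, tag):
        """Processing succeeded: the broker forgets the message for good."""
        del self.unacked[tag]

    def reject(self, tag):
        """Processing failed: put the message back at the front of the queue."""
        self.ready.appendleft(self.unacked.pop(tag))


queue = ToyQueue(["m1", "m2"])
tag, body = queue.deliver()   # m1 now exists on the broker and the consumer
queue.reject(tag)             # m1 is requeued
tag, body = queue.deliver()   # m1 is delivered a second time
queue.ack(tag)                # m1 is removed from the queue
remaining_in_queue = list(queue.ready)

stream = ["m1", "m2"]         # a stream modeled as a plain list
first_read = stream[0:]       # reading copies messages out, deletes nothing
remaining_in_stream = list(stream)
```

After the acknowledgement only `m2` is left in the queue, while the stream still holds both messages after being read.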
Implementing massive fan-out setups with RabbitMQ was not optimal before Streams. Messages come in, they go to an exchange, and they are routed to a queue. If you want another application to process the messages, you need to create a new queue, bind the queue to the exchange, and start consuming. This process creates a copy of the message for each application, and if you need yet another application to process the same messages, you need to repeat the process: yet another queue, a new binding, a new consumer, and a new copy of the message.
This method works, and it’s been used for years, but it doesn’t scale elegantly when you have many consumer applications. Streams provide a better way to implement this, as the messages can be read by each consumer separately, in order, from the Stream.
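To make the difference in stored copies concrete, here is a small sketch comparing the two fan-out styles. The function names and application names are hypothetical, and the model deliberately ignores replication, so it only counts logical copies inside one broker.

```python
def fan_out_with_queues(messages, apps):
    """One queue per consuming application: each queue holds its own copy."""
    return {app: list(messages) for app in apps}


def fan_out_with_stream(messages, apps):
    """One stream: messages are stored once, each application has an offset."""
    stream = list(messages)
    offsets = {app: 0 for app in apps}  # every application starts at offset 0
    return stream, offsets


messages = [f"order-{i}" for i in range(1000)]
apps = ["billing", "audit", "search"]

queues = fan_out_with_queues(messages, apps)
stored_with_queues = sum(len(q) for q in queues.values())

stream, offsets = fan_out_with_stream(messages, apps)
stored_with_stream = len(stream)
```

With three applications the queue-based setup stores three copies of every message, while the stream stores each message once and adds only one small offset per application.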
RabbitMQ Stream Throughput using AMQP and the Stream Protocol
As explained in the talk, there was higher throughput with Streams compared to Quorum Queues. They got about 40,000 messages per second with Quorum Queues and 64,000 messages per second with Streams. This is because Streams are a simpler data structure than Quorum Queues: they don’t have to deal with complicated things like message acknowledgment, rejected messages, or requeuing.
Quorum Queues are still state-of-the-art replicated and persistent queues; Streams are for other use cases. When using the dedicated Stream protocol, throughputs of one million messages per second are achievable.
The Stream Protocol has been designed with performance in mind. It utilises low-level techniques such as the sendfile libc API, the OS page cache, and batching, which make it faster than AMQP queues.
RabbitMQ Stream Plugin and Clients
Streams are available through a new plugin in the core distribution. When turned on, RabbitMQ will start listening on a new port which can be used by clients that understand the Stream Protocol. It’s integrated with the existing infrastructure that is present in RabbitMQ, such as the management UI, the REST API, and Prometheus.
There are dedicated Java and Go clients using this new stream protocol. The Java client is the reference implementation. A tool for performance testing is also available. Clients for other languages are also actively worked on by the community and the core team.
The stream protocol is a bit simpler than AMQP. There is no routing and no exchange involved: you publish directly to a stream, and you consume from a stream just like from a queue. No logic is needed to decide where the message should be routed. When you publish a message from your client application, it goes over the network, then almost directly to storage.
There is excellent interoperability between streams and the rest of RabbitMQ. Messages published with the Stream protocol can be consumed from an AMQP 0.9.1 client application, and it also works the other way around.
Example Use Case for Interoperability
Queues and Streams live in the same namespace in RabbitMQ. You can therefore consume from a Stream with the usual AMQP clients by passing the name of the Stream to basicConsume, together with the x-stream-offset argument to choose where to start.
It’s very easy to publish with AMQP clients because it’s the same as with Queues: you publish to an exchange.
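As a rough illustration, the helper below registers such a consumer from Python. It assumes a channel object that behaves like a channel from the pika AMQP 0.9.1 client (`basic_qos` and `basic_consume` with keyword arguments); the helper name, the default prefetch value, and the `RecordingChannel` stand-in are all hypothetical. To my understanding, consuming from a stream over AMQP 0.9.1 requires a prefetch limit and manual acknowledgements, which is why the sketch sets both.

```python
def consume_from_stream(channel, stream_name, on_message,
                        offset="first", prefetch=100):
    """Register an AMQP 0.9.1 consumer on a stream, starting at `offset`.

    `offset` is either "first", "last", "next", or a non-negative integer
    offset. `channel` is assumed to expose a pika-like interface.
    """
    is_keyword = offset in ("first", "last", "next")
    is_number = (isinstance(offset, int) and not isinstance(offset, bool)
                 and offset >= 0)
    if not (is_keyword or is_number):
        raise ValueError(f"unsupported x-stream-offset value: {offset!r}")

    # Assumption: stream consumers need a prefetch limit and manual acks.
    channel.basic_qos(prefetch_count=prefetch)
    return channel.basic_consume(
        queue=stream_name,               # streams share the queue namespace
        on_message_callback=on_message,
        auto_ack=False,
        arguments={"x-stream-offset": offset},
    )


class RecordingChannel:
    """Stand-in for a real channel so the sketch runs without a broker."""

    def __init__(self):
        self.calls = []

    def basic_qos(self, **kwargs):
        self.calls.append(("basic_qos", kwargs))

    def basic_consume(self, **kwargs):
        self.calls.append(("basic_consume", kwargs))
        return "ctag-1"  # made-up consumer tag


channel = RecordingChannel()
consumer_tag = consume_from_stream(channel, "world-stream", print, offset=42)
```

With a real pika channel in place of `RecordingChannel`, the same call would ask the broker to start delivering from offset 42 of the stream named `world-stream` (a made-up name).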
Here is an example of how you can imagine using streams. You have a publisher publishing messages to an exchange and, depending on the routing key, each message is routed to a different queue. So you have a queue for each region in the world: for example, a queue for the Americas, a queue for Europe, a queue for Asia, and one for HQ. Each queue has a dedicated consuming application that does some specific processing for the region.
If you upgrade to RabbitMQ 3.9 or later, you can just create a stream and bind it to the exchange with a wildcard, so that all messages are still routed to the queues, but the stream gets all the messages as well. Then you can point an application using the Stream Protocol to this stream. We can imagine this application running some worldwide analytics once a day, without even needing to read the stream very quickly. This is how we can imagine streams fitting into existing applications.
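The routing in this example can be sketched as follows. The matcher is a deliberate simplification of topic-style matching (a binding key of `#` matches everything, any other key must match exactly), and the queue names, stream name, and routing keys are made up for illustration.

```python
def matches(binding_key, routing_key):
    """Simplified match: '#' accepts any routing key, otherwise exact match."""
    return binding_key == "#" or binding_key == routing_key


def route(bindings, routing_key):
    """Return the destinations whose binding key matches the routing key."""
    return [dest for dest, key in bindings if matches(key, routing_key)]


bindings = [
    ("americas-queue", "americas"),
    ("europe-queue", "europe"),
    ("asia-queue", "asia"),
    ("hq-queue", "hq"),
    ("world-stream", "#"),  # the stream is bound with a wildcard
]

europe_destinations = route(bindings, "europe")
asia_destinations = route(bindings, "asia")
```

Every message still reaches its regional queue, and the wildcard binding means the stream receives a copy of each one regardless of region.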
Guarantees for RabbitMQ Streams
Streams support at-least-once delivery, as they offer a mechanism similar to AMQP Publisher Confirms. There’s also a deduplication mechanism: the broker filters out duplicate messages based on the publishing sequence number, which the publisher can derive from something like a key in a database or a line number in a file.
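A minimal sketch of sequence-number deduplication is shown below. It assumes that each publisher has a stable name and sends strictly increasing publishing ids, and that the broker remembers the highest id seen per publisher; the class and method names are hypothetical and the return value is a simplification of what a real broker would report.

```python
class DedupStream:
    """Toy stream that drops messages whose publishing id was already seen."""

    def __init__(self):
        self.messages = []
        self._last_id = {}  # publisher name -> highest publishing id stored

    def publish(self, publisher, publishing_id, body):
        """Append the message unless it is a duplicate; return True if stored."""
        if publishing_id <= self._last_id.get(publisher, -1):
            return False  # duplicate: filtered out, nothing appended
        self._last_id[publisher] = publishing_id
        self.messages.append(body)
        return True


lines = ["line-0", "line-1", "line-2", "line-3"]
stream = DedupStream()

# First run: the publisher sends lines 0..2, using the line number as the id.
for number in range(3):
    stream.publish("file-importer", number, lines[number])

# After a restart it is unsure what was confirmed, so it resends from line 1.
results = [stream.publish("file-importer", number, lines[number])
           for number in range(1, 4)]
```

The resent lines 1 and 2 are filtered out and only line 3 is appended, so the stream ends up with each line exactly once even though the publisher sent some of them twice.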
Flow control applies on both sides. The TCP connections of publishers that are too fast will be blocked, and the broker will only send messages to a client when it is ready to accept them.
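One common way to implement the consumer side of such flow control is a credit scheme, sketched below. This is an illustration under assumptions rather than a description of the actual wire protocol: the class name, the unit of one credit per message, and the method names are all hypothetical.

```python
class CreditedSubscription:
    """Toy consumer-side flow control: the broker sends only while the
    consumer has granted credit."""

    def __init__(self, stream, initial_credit):
        self._stream = stream      # the log being read; never modified here
        self._next_offset = 0      # next message to send to this consumer
        self.credit = initial_credit

    def grant(self, amount):
        """Consumer signals it is ready to accept `amount` more messages."""
        self.credit += amount

    def dispatch(self):
        """Send as many messages as the remaining credit allows."""
        sent = []
        while self.credit > 0 and self._next_offset < len(self._stream):
            sent.append(self._stream[self._next_offset])
            self._next_offset += 1
            self.credit -= 1
        return sent


stream = ["a", "b", "c", "d", "e"]
subscription = CreditedSubscription(stream, initial_credit=2)

first_batch = subscription.dispatch()    # limited by the initial credit
stalled = subscription.dispatch()        # no credit left, nothing is sent
subscription.grant(2)                    # consumer is ready for more
second_batch = subscription.dispatch()
```

A slow consumer simply grants credit less often, so the broker never pushes more than the consumer has said it can handle.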
Streams are a new replicated and persistent data structure in RabbitMQ that models an append-only log. They are good for large fan-outs, support replay and time-travelling features, and are a good fit for high-throughput scenarios and for large logs. They store their data on the file system and never in memory.
If you think Streams or RabbitMQ could be useful to you but don’t know where to start, talk to our experts, we’re always happy to help. If you want to see the latest features and case studies from the world of RabbitMQ, join us at RabbitMQ Summit 2022.