Warning
|
Super Streams require RabbitMQ 3.11 or later. |
A super stream is a logical stream made of several individual streams. In essence, a super stream is a partitioned stream that brings scalability compared to a single stream.
The stream Java client uses the same programming model for super streams as for individual streams: the Producer, Consumer, Message, etc. APIs remain valid when super streams are in use.
Application code should not be impacted whether it uses individual or super streams.
Consuming applications can use super streams and single active consumer at the same time. Combined, the two features ensure that only one consumer instance consumes from an individual stream at a time. In this configuration, super streams provide scalability and single active consumer provides the guarantee that messages of an individual stream are processed in order.
Warning
|
Super streams do not deprecate streams
Super streams are a partitioning solution. They are not meant to replace individual streams, they sit on top of them to handle some use cases in a better way. If the stream data is likely to be large – hundreds of gigabytes or even terabytes, size remains relative – and even presents an obvious partition key (e.g. country), a super stream can be appropriate. It can help to cope with the data size and to take advantage of data locality for some processing use cases. Remember that partitioning always comes with complexity though, even if the implementation of super streams strives to make it as transparent as possible for the application developer. |
A super stream is made of several individual streams, so it can be considered a logical entity rather than an actual physical entity. The topology of a super stream is based on the AMQP 0.9.1 model, that is exchange, queues, and bindings between them. This does not mean AMQP resources are used to transport or store stream messages, it means that they are used to describe the super stream topology, that is the streams it is made of.
Let’s take the example of an invoices super stream made of 3 streams (i.e. partitions):

- an invoices exchange represents the super stream
- the invoices-0, invoices-1, and invoices-2 streams are the partitions of the super stream (streams are also AMQP queues in RabbitMQ)
- 3 bindings between the exchange and the streams link the super stream to its partitions and represent the routing rules
                  0   +------------+
               +----->+ invoices-0 |
               |      +------------+
+----------+   |
| invoices |   |  1   +------------+
|          +---+----->+ invoices-1 |
| exchange |   |      +------------+
+----------+   |
               |  2   +------------+
               +----->+ invoices-2 |
                      +------------+
When a super stream is in use, the stream Java client queries this information to find out about the partitions of a super stream and the routing rules. From the application code point of view, using a super stream is mostly configuration-based. Some logic must also be provided to extract routing information from messages.
It is possible to manage super streams with:

- the stream Java client, by using Environment#streamCreator() and Environment#deleteSuperStream(String)
- the add_super_stream and delete_super_stream commands in rabbitmq-streams (CLI)
- any AMQP 0.9.1 client library
The stream Java client and the dedicated CLI commands are easier to use as they take care of the topology details (exchange, streams, and bindings).
Here is how to create an invoices super stream with 5 partitions:
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
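As a minimal sketch, the creation code looks something like the following; it assumes an Environment instance named environment and the StreamCreator#superStream() configuration API of recent client versions (exact builder methods may differ across versions):

environment.streamCreator()
    .name("invoices")   // the super stream name
    .superStream()      // configure it as a super stream
    .partitions(5)      // number of partitions
    .creator()
    .create();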
The super stream partitions will be invoices-0, invoices-1, …, invoices-4.
We use this kind of topology when routing keys of outbound messages are hashed to pick the partition to publish them to.
This way, if the routing key is the customer ID of the invoice, all the invoices for a given customer end up in the same partition, and they can be processed in the publishing order.
It is also possible to specify binding keys when creating a super stream:
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
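A corresponding sketch, under the same assumptions as above:

environment.streamCreator()
    .name("invoices")
    .superStream()
    .bindingKeys("amer", "emea", "apac")  // one partition per binding key
    .creator()
    .create();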
The super stream partitions will be invoices-amer, invoices-emea, and invoices-apac in this case.
Using one type of topology or the other depends on the use cases, especially how messages are processed. See the next sections on publishing and consuming to find out more.
When the topology of a super stream like the one described above has been set, creating a producer for it is straightforward:
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
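A sketch of the producer declaration, assuming an Environment named environment; using the message ID as the routing key is an arbitrary choice for illustration:

Producer producer = environment.producerBuilder()
    .superStream("invoices")                             // set the super stream name
    .routing(message ->
        message.getProperties().getMessageIdAsString())  // routing key extraction logic
    .producerBuilder()
    .build();                                            // create the producer instance
// ...
producer.close();                                        // close when no longer necessary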
- Set the super stream name
- Provide the logic to get the routing key from a message
- Create the producer instance
- Close the producer when it’s no longer necessary
Note that even though the invoices super stream is not an actual stream, its name must be used to declare the producer. Internally the client will figure out the streams that compose the super stream.
The application code must provide the logic to extract a routing key from a message as a Function<Message, String>. The client will hash the routing key to determine the stream to send the message to (using the partition list and a modulo operation).
The client uses 32-bit MurmurHash3 by default to hash the routing key. This hash function provides good uniformity, performance, and portability, making it a good default choice, but it is possible to specify a custom hash function:
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
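A sketch of such a customization; it assumes the routing configuration exposes a hash method accepting a ToIntFunction<String>:

Producer producer = environment.producerBuilder()
    .superStream("invoices")
    .routing(message -> message.getProperties().getMessageIdAsString())
    .hash(String::hashCode)   // use String#hashCode() instead of MurmurHash3
    .producerBuilder()
    .build();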
- Use String#hashCode() to hash the routing key
Note using Java’s hashCode() method is a debatable choice, as potential producers in other languages are unlikely to implement it, making the routing different between producers in different languages.
Hashing the routing key to pick a partition is only one way to route messages to the appropriate streams. The stream Java client provides another way to resolve streams, based on the routing key and the bindings between the super stream exchange and the streams.
This routing strategy makes sense when the partitioning has a business meaning, e.g. with a partition for a region in the world, like in the diagram below:
                 amer  +---------------+
               +------>+ invoices-amer |
               |       +---------------+
+----------+   |
| invoices |   | emea  +---------------+
|          +---+------>+ invoices-emea |
| exchange |   |       +---------------+
+----------+   |
               | apac  +---------------+
               +------>+ invoices-apac |
                       +---------------+
In such a case, the routing key will be a property of the message that represents the region:
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
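A sketch, assuming the region is carried in a message application property named region (a hypothetical property for illustration):

Producer producer = environment.producerBuilder()
    .superStream("invoices")
    .routing(message ->
        message.getApplicationProperties().get("region").toString())  // extract the routing key
    .key()                                                            // enable the "key" routing strategy
    .producerBuilder()
    .build();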
- Extract the routing key
- Enable the "key" routing strategy
Internally the client will query the broker to resolve the destination streams for a given routing key, making the routing logic from any exchange type available to streams. Note the client caches results; it does not query the broker for every message.
The solution that provides the most control over routing is using a custom routing strategy. This should be needed only for specific cases.
Here is an excerpt of the RoutingStrategy interface:
public interface RoutingStrategy {

  /** Where to route a message. */
  List<String> route(Message message, Metadata metadata);

  /** Metadata on the super stream. */
  interface Metadata {

    List<String> partitions();

    List<String> route(String routingKey);
  }
}
Note it is possible to route a message to several streams or even nowhere. The "hash" routing strategy always routes to 1 stream and the "key" routing strategy can route to several streams.
The following code sample shows how to implement a simplistic round-robin RoutingStrategy and use it in the producer. Note this implementation should not be used in production: for simplicity’s sake, its modulo operation is not sign-safe.
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
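A sketch of such a strategy (java.util and java.util.concurrent.atomic imports assumed; the AtomicLong counter eventually overflows to a negative value, which is why the modulo is not sign-safe):

AtomicLong messageCount = new AtomicLong(0);
RoutingStrategy routingStrategy = (message, metadata) -> {
    List<String> partitions = metadata.partitions();
    // pick partitions in turn; the cast and modulo break on counter overflow
    String stream = partitions.get(
        (int) (messageCount.getAndIncrement() % partitions.size()));
    return Collections.singletonList(stream);
};

Producer producer = environment.producerBuilder()
    .superStream("invoices")
    .routing(null)                // no routing key extraction logic needed
    .strategy(routingStrategy)    // set the custom routing strategy
    .producerBuilder()
    .build();

A production version would use Math.floorMod (or reset the counter) to keep the computed index non-negative.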
- No need to set the routing key extraction logic
- Set the custom routing strategy
Deduplication for a super stream producer works the same way as with a single stream producer. The publishing ID values are spread across the streams, but this does not affect the mechanism.
A super stream consumer is a composite consumer: it will look up the super stream partitions and create a consumer for each of them.
The programming model is the same as with regular consumers for the application developer: their main job is to provide the application code to process messages, that is, a MessageHandler instance.
The configuration is different though and this section covers its subtleties.
But let’s focus on the behavior of a super stream consumer first.
Imagine you have a super stream made of 3 partitions (individual streams).
You start an instance of your application, which itself creates a super stream consumer for this super stream.
The super stream consumer will create 3 consumers internally, one for each partition, and messages will flow into your MessageHandler.
Imagine now that you start another instance of your application. It will do exactly the same thing as the first one, and the 2 instances will process the exact same messages in parallel. This may not be what you want: the messages will be processed twice!
Having one instance of your application may be enough: the data are spread across several streams automatically and the messages from the different partitions are processed in parallel from a single OS process.
But if you want to scale the processing across several OS processes (or bare-metal machines, or virtual machines) and you don’t want your messages to be processed several times as illustrated above, you’ll have to enable the single active consumer feature on your super stream consumer.
The next subsections cover the basic settings of a super stream consumer and a dedicated section covers how super stream consumers and single active consumer play together.
Declaring a super stream consumer is not much different from declaring a single stream consumer. The ConsumerBuilder#superStream(String) method must be used to set the super stream to consume from:
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
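A sketch, assuming an Environment instance named environment:

Consumer consumer = environment.consumerBuilder()
    .superStream("invoices")          // set the super stream name
    .messageHandler((context, message) -> {
        // message processing
    })
    .build();
// ...
consumer.close();                     // close when no longer necessary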
- Set the super stream name
- Close the consumer when it is no longer necessary
That’s all. The super stream consumer will take care of the details (partition lookup, coordination of the individual consumers, etc).
The semantics of offset tracking for a super stream consumer are roughly the same as for an individual stream consumer. There are still some subtle differences, so a good understanding of offset tracking in general, and of the automatic and manual offset tracking strategies, is recommended.
Here are the main differences for the automatic/manual offset tracking strategies between single and super stream consuming:
- automatic offset tracking: internally, the client divides the messageCountBeforeStorage setting by the number of partitions for each individual consumer. Imagine a 3-partition super stream, messageCountBeforeStorage set to 10,000, and 10,000 messages coming in, perfectly balanced across the partitions (that is, about 3,333 messages for each partition). In this case, the automatic offset tracking strategy will not kick in, because the expected message count has not been reached on any partition. Making the client divide messageCountBeforeStorage by the number of partitions can be considered "more accurate" if the messages are well balanced across the partitions. A good rule of thumb is then to multiply the expected per-stream messageCountBeforeStorage by the number of partitions, to avoid storing offsets too often. So, the default being 10,000, it can be set to 30,000 for a 3-partition super stream (see the sketch after this list).
- manual offset tracking: the MessageHandler.Context#storeOffset() method must be used; Consumer#store(long) will fail, because an offset value has a meaning only in one stream, not in the other streams. A call to MessageHandler.Context#storeOffset() will store the current message offset in its stream, but also the offset of the last dispatched message for the other streams of the super stream.
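As a sketch, here is how the rule of thumb above could translate for a 3-partition super stream, assuming the autoTrackingStrategy() configuration API:

Consumer consumer = environment.consumerBuilder()
    .superStream("invoices")
    .name("application-1")                   // name used for offset tracking
    .autoTrackingStrategy()
        .messageCountBeforeStorage(30_000)   // 10,000 per partition × 3 partitions
    .builder()
    .messageHandler((context, message) -> {
        // message processing
    })
    .build();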
Warning
|
Single Active Consumer requires RabbitMQ 3.11 or later. |
As stated previously, super stream consumers and single active consumer provide scalability and the guarantee that messages of an individual stream are processed in order.
Let’s take an example with a 3-partition super stream:
- You have an application that creates a super stream consumer instance with single active consumer enabled.
- You start 3 instances of this application. An instance in this case is a JVM process, which can be in a Docker container, a virtual machine, or a bare-metal server.
- As the super stream has 3 partitions, each application instance creates a super stream consumer that internally maintains 3 consumer instances. That is 9 consumer instances overall. Such a super stream consumer is a composite consumer.
- The broker and the different application instances coordinate so that only 1 consumer instance for a given partition receives messages at a time. So among these 9 consumer instances, only 3 are actually active; the other ones are idle, or inactive.
- If one of the application instances stops, the broker rebalances its active consumer to one of the other instances.
The following figure illustrates how the client library supports the combination of the super stream and single active consumer features. It uses a composite consumer that creates an individual consumer for each partition of the super stream. If there is only one single active consumer instance with a given name for a super stream, each individual consumer is active.
 invoices super stream           Composite Consumer
+--------------------+          +-------------------+
|     invoices-0     |          | +---------------+ |
+--------------------+          | |   consumer    | |  Active
                                | +---------------+ |
+--------------------+          | +---------------+ |
|     invoices-1     |          | |   consumer    | |  Active
+--------------------+          | +---------------+ |
                                | +---------------+ |
+--------------------+          | |   consumer    | |  Active
|     invoices-2     |          | +---------------+ |
+--------------------+          +-------------------+

(each consumer consumes from the partition on its row)
Imagine now we start 3 instances of the consuming application to scale out the processing. The individual consumer instances spread out across the super stream partitions and only one is active for each partition, as illustrated in the following figure:
                   Composite Consumer 1   Composite Consumer 2   Composite Consumer 3
 invoices-0  ....  consumer: Active       consumer: Inactive     consumer: Inactive
 invoices-1  ....  consumer: Inactive     consumer: Active       consumer: Inactive
 invoices-2  ....  consumer: Inactive     consumer: Inactive     consumer: Active
After this overview, let’s see the API and the configuration details.
The following snippet shows how to declare a single active consumer on a super stream with the ConsumerBuilder#superStream(String) and ConsumerBuilder#singleActiveConsumer() methods:
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
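A sketch of the declaration (assuming an Environment named environment; application-1 is an arbitrary consumer name):

Consumer consumer = environment.consumerBuilder()
    .superStream("invoices")          // set the super stream name
    .name("application-1")            // set the consumer name
    .singleActiveConsumer()           // enable single active consumer
    .messageHandler((context, message) -> {
        // message processing
    })
    .build();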
- Set the super stream name
- Set the consumer name (mandatory to enable single active consumer)
- Enable single active consumer
Note it is mandatory to specify a name for the consumer. This name will be used to identify the group of consumer instances and make sure only one is active for each partition. The name is also the reference for offset tracking.
The example above uses the default automatic offset tracking strategy. With this strategy, the client library takes care of offset tracking when consumers become active or inactive. It looks up the latest stored offset when a consumer becomes active, to start consuming at the appropriate offset, and it stores the last dispatched offset when a consumer becomes inactive.
The story is not the same with manual offset tracking as the client library does not know which offset it should store when a consumer becomes inactive.
The application developer can use the ConsumerUpdateListener callback to react appropriately when a consumer changes state.
The following snippet illustrates the use of the ConsumerUpdateListener callback:
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
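A sketch of what this can look like; lastProcessedOffsets (a ConcurrentMap<String, Long> of partition-to-offset entries) and conditionToStore() are hypothetical application-side constructs, and the code assumes Consumer#storedOffset() throws NoOffsetException when no offset is stored yet:

Consumer consumer = environment.consumerBuilder()
    .superStream("invoices")                         // set the super stream name
    .name("application-1")                           // set the consumer name
    .singleActiveConsumer()                          // enable single active consumer
    .manualTrackingStrategy().builder()              // enable manual offset tracking
    .consumerUpdateListener(context -> {             // set the ConsumerUpdateListener
        if (context.isActive()) {
            // consumer becomes active: resume right after the stored offset
            try {
                return OffsetSpecification.offset(
                    context.consumer().storedOffset() + 1);
            } catch (NoOffsetException e) {
                return OffsetSpecification.next();   // no stored offset yet, use a default
            }
        } else {
            // consumer becomes inactive: store the last processed offset for this stream
            Long offset = lastProcessedOffsets.get(context.stream());
            if (offset != null) {
                context.consumer().store(offset);
            }
            return null;
        }
    })
    .messageHandler((context, message) -> {
        // message processing
        lastProcessedOffsets.put(context.stream(), context.offset());
        if (conditionToStore()) {
            context.storeOffset();                   // store the current offset
        }
    })
    .build();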
- Set the super stream name
- Set the consumer name (mandatory to enable single active consumer)
- Enable single active consumer
- Enable the manual offset tracking strategy
- Set the ConsumerUpdateListener
- Return the stored offset + 1, or a default, when the consumer becomes active
- Store the last processed offset for the stream when the consumer becomes inactive
- Store the current offset on some condition
The ConsumerUpdateListener callback must return the offset to start consuming from when a consumer becomes active. This is what the code above does: it checks if the consumer is active with ConsumerUpdateListener.Context#isActive() and looks up the last stored offset. If there is no stored offset yet, it returns a default value, OffsetSpecification#next() here.
When a consumer becomes inactive, it should store the last processed offset, as another consumer instance will take over elsewhere. It is expected this other consumer runs the exact same code, so it will execute the same sequence when it becomes active (looking up the stored offset, returning the value + 1).
Note the ConsumerUpdateListener is called for a partition, that is, an individual stream. The application code should take care of maintaining a reference to the last processed offset for each partition of the super stream, e.g. with a Map<String, Long> (partition-to-offset map). To do so, the context parameter of the MessageHandler and ConsumerUpdateListener callbacks provides a stream() method.
RabbitMQ Stream provides server-side offset tracking, but it is possible to use an external store to track offsets for streams.
The ConsumerUpdateListener callback is still your friend in this case. The following snippet shows how to leverage it when an external store is in use:
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
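A sketch; offsetStore stands for a hypothetical external store with read(String stream) and store(String stream, long offset) operations:

Consumer consumer = environment.consumerBuilder()
    .superStream("invoices")                         // set the super stream name
    .name("application-1")                           // set the consumer name
    .singleActiveConsumer()                          // enable single active consumer
    .noTrackingStrategy()                            // disable server-side offset tracking
    .consumerUpdateListener(context -> {             // set the ConsumerUpdateListener
        if (context.isActive()) {
            // look up the last stored offset in the external store
            long offset = offsetStore.read(context.stream());
            return OffsetSpecification.offset(offset + 1);
        } else {
            // nothing to do: the offset is stored for each message
            return null;
        }
    })
    .messageHandler((context, message) -> {
        // message processing
        offsetStore.store(context.stream(), context.offset());  // track in the external store
    })
    .build();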
- Set the super stream name
- Set the consumer name (mandatory to enable single active consumer)
- Enable single active consumer
- Disable server-side offset tracking
- Set the ConsumerUpdateListener
- Use the external store for the start offset when the consumer becomes active
- Assume the offset is already stored when the consumer becomes inactive
- Use the external store for offset tracking
Here are the takeaway points of this code:
- Even though server-side offset tracking is not used, the consumer must still have a name to identify the group it belongs to. The external offset tracking mechanism is free to use the same name or not.
- Calling ConsumerBuilder#noTrackingStrategy() is necessary to disable server-side offset tracking, or the automatic tracking strategy will kick in.
- The snippet does not provide the details, but the offset tracking mechanism stores the offset for each message. The external store must be able to cope with the message rate in a real-world scenario.
- The ConsumerUpdateListener callback returns the last stored offset + 1 when the consumer becomes active. This way the broker will resume dispatching at this location in the stream.
- A well-behaved ConsumerUpdateListener must make sure the last processed offset is stored when the consumer becomes inactive, so that the consumer that takes over can look up the offset and resume consuming at the right location. Our ConsumerUpdateListener does not do anything when the consumer becomes inactive (it returns null): it can afford this because the offset is stored for each message. Make sure to store the last processed offset when the consumer becomes inactive to avoid duplicates when consumption resumes elsewhere.