WARNING: Super streams are an experimental feature; they are subject to change.
A super stream is a logical stream made of several individual streams. In essence, a super stream is a partitioned stream that brings scalability compared to a single stream.
The stream Java client uses the same programming model for super streams as with individual streams, that is, the Producer, Consumer, Message, etc. APIs are still valid when super streams are in use.
Application code should not be impacted whether it uses individual or super streams.
A super stream is made of several individual streams, so it can be considered a logical entity rather than an actual physical entity. The topology of a super stream is based on the AMQP 0.9.1 model, that is, an exchange, queues, and bindings between them. This does not mean AMQP resources are used to transport or store stream messages; it means they are used to describe the super stream topology, that is, the streams it is made of.
Let’s take the example of an invoices super stream made of 3 streams (i.e. partitions):

- an invoices exchange represents the super stream
- the invoices-0, invoices-1, invoices-2 streams are the partitions of the super stream (streams are also AMQP queues in RabbitMQ)
- 3 bindings between the exchange and the streams link the super stream to its partitions and represent routing rules
                    0    +------------+
                +------->+ invoices-0 |
                |        +------------+
+----------+    |
| invoices |    |   1    +------------+
|          +----+------->+ invoices-1 |
| exchange |    |        +------------+
+----------+    |
                |   2    +------------+
                +------->+ invoices-2 |
                         +------------+
When a super stream is in use, the stream Java client queries this information to find out about the partitions of a super stream and the routing rules. From the application code point of view, using a super stream is mostly configuration-based. Some logic must also be provided to extract routing information from messages.
When the topology of a super stream like the one described above has been set, creating a producer for it is straightforward:
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
- Use the super stream name
- Provide the logic to get the routing key from a message
- Create the producer instance
- Close the producer when it’s no longer necessary
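The referenced sample lives in SuperStreamUsage.java; as a rough illustration only, and assuming the superStream(...), routing(...), and producerBuilder() builder methods of recent client versions, the code looks something like this:

[source,java]
----
Producer producer = environment.producerBuilder()
    .superStream("invoices")                      // use the super stream name, not a partition name
    .routing(message -> message.getProperties()   // logic to extract the routing key
        .getMessageIdAsString())                  // here: the message ID
    .producerBuilder()                            // back to the producer builder
    .build();                                     // create the producer instance
// ... use the producer to publish messages ...
producer.close();                                 // close it when no longer necessary
----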
Note that even though the invoices super stream is not an actual stream, its name must be used to declare the producer.
Internally the client will figure out the streams that compose the super stream.
The application code must provide the logic to extract a routing key from a message as a Function<Message, String>.
The client will hash the routing key to determine the stream to send the message to (using the partition list and a modulo operation).
The client uses 32-bit MurmurHash3 by default to hash the routing key. This hash function provides good uniformity, performance, and portability, making it a good default choice, but it is possible to specify a custom hash function:
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
- Use String#hashCode() to hash the routing key

Note that using Java’s hashCode() method is a debatable choice, as potential producers in other languages are unlikely to implement it, which would make routing inconsistent between producers written in different languages.
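As an illustration, and assuming the routing configuration exposes a hash(...) method as in recent client versions, plugging in String#hashCode() could look like this:

[source,java]
----
Producer producer = environment.producerBuilder()
    .superStream("invoices")
    .routing(message -> message.getProperties().getMessageIdAsString())
    .hash(routingKey -> routingKey.hashCode())    // use String#hashCode() instead of the default MurmurHash3
    .producerBuilder()
    .build();
----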
Hashing the routing key to pick a partition is only one way to route messages to the appropriate streams. The stream Java client provides another way to resolve streams, based on the routing key and the bindings between the super stream exchange and the streams.
This routing strategy makes sense when the partitioning has a business meaning, e.g. with a partition for a region in the world, like in the diagram below:
                     amer    +---------------+
                  +--------->+ invoices-amer |
                  |          +---------------+
+----------+      |
| invoices |      |  emea    +---------------+
|          +------+--------->+ invoices-emea |
| exchange |      |          +---------------+
+----------+      |
                  |  apac    +---------------+
                  +--------->+ invoices-apac |
                             +---------------+
In such a case, the routing key will be a property of the message that represents the region:
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
- Extract the routing key
- Enable the "key" routing strategy
Internally the client will query the broker to resolve the destination streams for a given routing key, making the routing logic of any exchange type available to streams. Note the client caches results; it does not query the broker for every message.
The solution that provides the most control over routing is using a custom routing strategy. This should be needed only for specific cases.
The following code sample shows how to implement a simplistic round-robin RoutingStrategy and use it in the producer.
Note this implementation should not be used in production: for simplicity’s sake, the modulo operation is not sign-safe.
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
- No need to set the routing key extraction logic
- Set the custom routing strategy
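As an illustration, and assuming a RoutingStrategy interface with a route(Message, Metadata) method and a Metadata#partitions() accessor, as in recent client versions, a simplistic round-robin strategy could look like this:

[source,java]
----
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.RoutingStrategy;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplistic round-robin strategy. As noted above, the modulo is not sign-safe:
// the counter can overflow and become negative, so do not use this as-is in production.
class RoundRobinRoutingStrategy implements RoutingStrategy {

    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public List<String> route(Message message, Metadata metadata) {
        List<String> partitions = metadata.partitions();
        // pick the next partition in turn
        int index = counter.getAndIncrement() % partitions.size();
        return Collections.singletonList(partitions.get(index));
    }
}

Producer producer = environment.producerBuilder()
    .superStream("invoices")
    .routing(null)                                // no routing key extraction logic needed
    .strategy(new RoundRobinRoutingStrategy())    // set the custom routing strategy
    .producerBuilder()
    .build();
----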
Deduplication for a super stream producer works the same way as with a single stream producer. The publishing ID values are spread across the streams, but this does not affect the mechanism.
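As a sketch, and assuming the same hypothetical builder chain as in the examples above, enabling deduplication on a super stream producer combines a producer name and publishing IDs, just as for a single stream:

[source,java]
----
Producer producer = environment.producerBuilder()
    .superStream("invoices")
    .name("invoices-producer")                    // a stable producer name enables deduplication
    .routing(message -> message.getProperties().getMessageIdAsString())
    .producerBuilder()
    .build();

Message message = producer.messageBuilder()
    .publishingId(1)                              // monotonic publishing ID, exactly as for a single stream
    .properties().messageId("invoice-1").messageBuilder()
    .addData("invoice body".getBytes(StandardCharsets.UTF_8))
    .build();
producer.send(message, confirmationStatus -> { });
----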
A super stream consumer is not much different from a single stream consumer.
The ConsumerBuilder#superStream(String) method must be used to set the super stream to consume from:
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
- Set the super stream name
- Close the consumer when it is no longer necessary
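The full sample is in SuperStreamUsage.java; as a minimal sketch, the shape of the code is the following:

[source,java]
----
Consumer consumer = environment.consumerBuilder()
    .superStream("invoices")                      // set the super stream name
    .messageHandler((context, message) -> {
        // process the message
    })
    .build();
// ...
consumer.close();                                 // close it when no longer necessary
----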
A super stream consumer is a composite consumer: it will look up the super stream partitions and create a consumer for each of them.
The semantics of offset tracking for a super stream consumer are roughly the same as for an individual stream consumer. There are still some subtle differences, so a good understanding of offset tracking in general, and of the automatic and manual offset tracking strategies, is recommended.
Here are the main differences for the automatic/manual offset tracking strategies between single and super stream consuming:
- automatic offset tracking: internally, the client divides the messageCountBeforeStorage setting by the number of partitions for each individual consumer. Imagine a 3-partition super stream, messageCountBeforeStorage set to 10,000, and 10,000 messages coming in, perfectly balanced across the partitions (that is, about 3,333 messages for each partition). In this case, the automatic offset tracking strategy will not kick in, because the expected message count has not been reached on any partition. Making the client divide messageCountBeforeStorage by the number of partitions can be considered "more accurate" if the messages are well balanced across the partitions. A good rule of thumb is then to multiply the expected per-stream messageCountBeforeStorage by the number of partitions, to avoid storing offsets too often. So, the default being 10,000, it can be set to 30,000 for a 3-partition super stream.
- manual offset tracking: the MessageHandler.Context#storeOffset() method must be used; Consumer#store(long) will fail, because an offset value has a meaning only in one stream, not in the other streams. A call to MessageHandler.Context#storeOffset() will store the current message offset in its stream, but also the offset of the last dispatched message for the other streams of the super stream. See the sketch after this list for both strategies.
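As an illustration of both points, and assuming the autoTrackingStrategy() and manualTrackingStrategy() configuration methods of the regular consumer API, the adjustments for a 3-partition super stream could look like this:

[source,java]
----
// Automatic tracking: scale messageCountBeforeStorage by the number of partitions
// (3 here), so each partition-level consumer still stores after about 10,000 messages.
Consumer autoTrackingConsumer = environment.consumerBuilder()
    .superStream("invoices")
    .name("application-1")                        // offset tracking requires a consumer name
    .autoTrackingStrategy()
        .messageCountBeforeStorage(30_000)
    .builder()
    .messageHandler((context, message) -> {
        // process the message
    })
    .build();

// Manual tracking: store through the message context, not Consumer#store(long),
// because an offset value is meaningful only within its own partition.
Consumer manualTrackingConsumer = environment.consumerBuilder()
    .superStream("invoices")
    .name("application-2")
    .manualTrackingStrategy()
    .builder()
    .messageHandler((context, message) -> {
        // process the message, then store the offset for this partition
        context.storeOffset();
    })
    .build();
----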