Skip to content

Latest commit

 

History

History
456 lines (359 loc) · 25.5 KB

super-streams.adoc

File metadata and controls

456 lines (359 loc) · 25.5 KB

Super Streams (Partitioned Streams)

Warning
Super Streams require RabbitMQ 3.11 or more.

A super stream is a logical stream made of several individual streams. In essence, a super stream is a partitioned stream that brings scalability compared to a single stream.

The stream Java client uses the same programming model for super streams as with individual streams, that is the Producer, Consumer, Message, etc API are still valid when super streams are in use. Application code should not be impacted whether it uses individual or super streams.

Consuming applications can use super streams and single active consumer at the same time. The 2 features combined make sure only one consumer instance consumes from an individual stream at a time. In this configuration, super streams provide scalability and single active consumer provides the guarantee that messages of an individual stream are processed in order.

Warning
Super streams do not deprecate streams

Super streams are a partitioning solution. They are not meant to replace individual streams, they sit on top of them to handle some use cases in a better way. If the stream data is likely to be large – hundreds of gigabytes or even terabytes, size remains relative – and even presents an obvious partition key (e.g. country), a super stream can be appropriate. It can help to cope with the data size and to take advantage of data locality for some processing use cases. Remember that partitioning always comes with complexity though, even if the implementation of super streams strives to make it as transparent as possible for the application developer.

Topology

A super stream is made of several individual streams, so it can be considered a logical entity rather than an actual physical entity. The topology of a super stream is based on the AMQP 0.9.1 model, that is exchange, queues, and bindings between them. This does not mean AMQP resources are used to transport or store stream messages, it means that they are used to describe the super stream topology, that is the streams it is made of.

Let’s take the example of an invoices super stream made of 3 streams (i.e. partitions):

  • an invoices exchange represents the super stream

  • the invoices-0, invoices-1, invoices-2 streams are the partitions of the super stream (streams are also AMQP queues in RabbitMQ)

  • 3 bindings between the exchange and the streams link the super stream to its partitions and represent routing rules

The topology of a super stream is defined with bindings between an exchange and queues
                 0    +------------+
               +----->+ invoices–0 |
               |      +------------+
+----------+   |
| invoices |   | 1    +------------+
|          +---+----->+ invoices–1 |
| exchange |   |      +------------+
+----------+   |
               | 2    +------------+
               +----->+ invoices–2 |
                      +------------+

When a super stream is in use, the stream Java client queries this information to find out about the partitions of a super stream and the routing rules. From the application code point of view, using a super stream is mostly configuration-based. Some logic must also be provided to extract routing information from messages.

Super Stream Creation and Deletion

It is possible to manage super streams with

  • the stream Java client, by using Environment#streamCreator() and Environment#deleteSuperStream(String)

  • the add_super_stream and delete_super_stream commands in rabbitmq-streams (CLI)

  • any AMQP 0.9.1 client library

  • the management plugin

The stream Java client and the dedicated CLI commands are easier to use as they take care of the topology details (exchange, streams, and bindings).

With the Client Library

Here is how to create an invoices super stream with 5 partitions:

Creating a super stream by specifying the number of partitions
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]

The super stream partitions will be invoices-0, invoices-1, …​, invoices-5. We use this kind of topology when routing keys of outbound messages are hashed to pick the partition to publish them to. This way, if the routing key is the customer ID of the invoice, all the invoices for a given customer end up in the same partition, and they can be processed in the publishing order.

It is also possible to specify binding keys when creating a super stream:

Creating a super stream by specifying the binding keys
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]

The super stream partitions will be invoices-amer, invoices-emea and invoices-apac in this case.

Using one type of topology or the other depends on the use cases, especially how messages are processed. See the next sections on publishing and consuming to find out more.

With the CLI

Here is how to create an invoices super stream with 5 partitions:

Creating a super stream from the CLI
rabbitmq-streams add_super_stream invoices --partitions 5

Use rabbitmq-streams add_super_stream --help to learn more about the command.

Publishing to a Super Stream

When the topology of a super stream like the one described above has been set, creating a producer for it is straightforward:

Creating a Producer for a Super Stream
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
  1. Set the super stream name

  2. Provide the logic to get the routing key from a message

  3. Create the producer instance

  4. Close the producer when it’s no longer necessary

Note that even though the invoices super stream is not an actual stream, its name must be used to declare the producer. Internally the client will figure out the streams that compose the super stream. The application code must provide the logic to extract a routing key from a message as a Function<Message, String>. The client will hash the routing key to determine the stream to send the message to (using partition list and a modulo operation).

The client uses 32-bit MurmurHash3 by default to hash the routing key. This hash function provides good uniformity, performance, and portability, making it a good default choice, but it is possible to specify a custom hash function:

Specifying a custom hash function
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
  1. Use String#hashCode() to hash the routing key

Note using Java’s hashCode() method is a debatable choice as potential producers in other languages are unlikely to implement it, making the routing different between producers in different languages.

Resolving Routes with Bindings

Hashing the routing key to pick a partition is only one way to route messages to the appropriate streams. The stream Java client provides another way to resolve streams, based on the routing key and the bindings between the super stream exchange and the streams.

This routing strategy makes sense when the partitioning has a business meaning, e.g. with a partition for a region in the world, like in the diagram below:

A super stream with a partition for a region in a world
                 amer  +---------------+
               +------>+ invoices–amer |
               |       +---------------+
+----------+   |
| invoices |   | emea  +---------------+
|          +---+------>+ invoices–emea |
| exchange |   |       +---------------+
+----------+   |
               | apac  +---------------+
               +------>+ invoices–apac |
                       +---------------+

In such a case, the routing key will be a property of the message that represents the region:

Enabling the "key" routing strategy
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
  1. Extract the routing key

  2. Enable the "key" routing strategy

Internally the client will query the broker to resolve the destination streams for a given routing key, making the routing logic from any exchange type available to streams. Note the client caches results, it does not query the broker for every message.

Using a Custom Routing Strategy

The solution that provides the most control over routing is using a custom routing strategy. This should be needed only for specific cases.

Here is an excerpt of the RoutingStrategy interface:

The routing strategy interface
public interface RoutingStrategy {

  /** Where to route a message. */
  List<String> route(Message message, Metadata metadata);

  /** Metadata on the super stream. */
  interface Metadata {

    List<String> partitions();

    List<String> route(String routingKey);
  }
}

Note it is possible to route a message to several streams or even nowhere. The "hash" routing strategy always routes to 1 stream and the "key" routing strategy can route to several streams.

The following code sample shows how to implement a simplistic round-robin RoutingStrategy and use it in the producer. Note this implementation should not be used in production as the modulo operation is not sign-safe for simplicity’s sake.

Setting a round-robin routing strategy
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
  1. No need to set the routing key extraction logic

  2. Set the custom routing strategy

Deduplication

Deduplication for a super stream producer works the same way as with a single stream producer. The publishing ID values are spread across the streams but this does affect the mechanism.

Consuming From a Super Stream

A super stream consumer is a composite consumer: it will look up the super stream partitions and create a consumer for each or them. The programming model is the same as with regular consumers for the application developer: their main job is to provide the application code to process messages, that is a MessageHandler instance. The configuration is different though and this section covers its subtleties. But let’s focus on the behavior of a super stream consumer first.

Super Stream Consumer in Practice

Imagine you have a super stream made of 3 partitions (individual streams). You start an instance of your application, that itself creates a super stream consumer for this super stream. The super stream consumer will create 3 consumers internally, one for each partition, and messages will flow in your MessageHandler.

Imagine now that you start another instance of your application. It will do the exact same thing as previously and the 2 instances will process the exact same messages in parallel. This may be not what you want: the messages will be processed twice!

Having one instance of your application may be enough: the data are spread across several streams automatically and the messages from the different partitions are processed in parallel from a single OS process.

But if you want to scale the processing across several OS processes (or bare-metal machines, or virtual machines) and you don’t want your messages to be processed several times as illustrated above, you’ll have to enable the single active consumer feature on your super stream consumer.

The next subsections cover the basic settings of a super stream consumer and a dedicated section covers how super stream consumers and single active consumer play together.

Declaring a Super Stream Consumer

Declaring a super stream consumer is not much different from declaring a single stream consumer. The ConsumerBuilder#superStream(String) must be used to set the super stream to consume from:

Declaring a super stream consumer
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
  1. Set the super stream name

  2. Close the consumer when it is no longer necessary

That’s all. The super stream consumer will take of the details (partitions lookup, coordination of the single consumers, etc).

Offset Tracking

The semantic of offset tracking for a super stream consumer are roughly the same as for an individual stream consumer. There are still some subtle differences, so a good understanding of offset tracking in general and of the automatic and manual offset tracking strategies is recommended.

Here are the main differences for the automatic/manual offset tracking strategies between single and super stream consuming:

  • automatic offset tracking: internally, the client divides the messageCountBeforeStorage setting by the number of partitions for each individual consumer. Imagine a 3-partition super stream, messageCountBeforeStorage set to 10,000, and 10,000 messages coming in, perfectly balanced across the partitions (that is about 3,333 messages for each partition). In this case, the automatic offset tracking strategy will not kick in, because the expected count message has not been reached on any partition. Making the client divide messageCountBeforeStorage by the number of partitions can be considered "more accurate" if the message are well balanced across the partitions. A good rule of thumb is to then multiply the expected per-stream messageCountBeforeStorage by the number of partitions, to avoid storing offsets too often. So the default being 10,000, it can be set to 30,000 for a 3-partition super stream.

  • manual offset tracking: the MessageHandler.Context#storeOffset() method must be used, the Consumer#store(long) will fail, because an offset value has a meaning only in one stream, not in other streams. A call to MessageHandler.Context#storeOffset() will store the current message offset in its stream, but also the offset of the last dispatched message for the other streams of the super stream.

Single Active Consumer Support
Warning
Single Active Consumer requires RabbitMQ 3.11 or more.

As stated previously, super stream consumers and single active consumer provide scalability and the guarantee that messages of an individual stream are processed in order.

Let’s take an example with a 3-partition super stream:

  • You have an application that creates a super stream consumer instance with single active consumer enabled.

  • You start 3 instances of this application. An instance in this case is a JVM process, which can be in a Docker container, a virtual machine, or a bare-metal server.

  • As the super stream has 3 partitions, each application instance will create a super stream consumer that maintains internally 3 consumer instances. That is 9 Java instances of consumer overall. Such a super stream consumer is a composite consumer.

  • The broker and the different application instances coordinate so that only 1 consumer instance for a given partition receives messages at a time. So among these 9 consumer instances, only 3 are actually active, the other ones are idle or inactive.

  • If one of the application instances stops, the broker will rebalance its active consumer to one of the other instances.

The following figure illustrates how the client library supports the combination of the super stream and single active consumer features. It uses a composite consumer that creates an individual consumer for each partition of the super stream. If there is only one single active consumer instance with a given name for a super stream, each individual consumer is active.

A single active consumer on a super stream is a composite consumer that creates an individual consumer for each partition
              +--------------------+
              |                    |
              |cGRE invoices–0     |
              |                    |    +-------------------+
              +--------------------+    |+-----------------+|
                                        |+cGRE consumer    ||Active
                                        |+-----------------+|
  invoices    +--------------------+    |                   |
              |                    |    |+-----------------+|
              |cPNK invoices–1     |    |+cPNK consumer    ||Active
              |                    |    |+-----------------+|
super stream  +--------------------+    |                   |
                                        |+-----------------+|
                                        |+cBLU consumer    ||Active
              +--------------------+    |+-----------------+|
              |                    |    +-------------------+
              |cBLU invoices–2     |     Composite Consumer
              |                    |
              +--------------------+

Imagine now we start 3 instances of the consuming application to scale out the processing. The individual consumer instances spread out across the super stream partitions and only one is active for each partition, as illustrated in the following figure:

Consumer instances spread across the super stream partitions and are activated accordingly
                                        +-------------------+
                                        |+-----------------+|
                                        |+cGRE consumer    ||Active
                                        |+-----------------+|
                                        |                   |
                                        |+-----------------+|
                                        |+cPNK consumer    ||Inactive
                                        |+-----------------+|
                                        |                   |
                                        |+-----------------+|
                                        |+cBLU consumer    ||Inactive
                                        |+-----------------+|
              +--------------------+    +-------------------+
              |                    |     Composite Consumer
              |cGRE invoices–0     |
              |                    |    +-------------------+
              +--------------------+    |+-----------------+|
                                        |+cGRE consumer    ||Inactive
                                        |+-----------------+|
  invoices    +--------------------+    |                   |
              |                    |    |+-----------------+|
              |cPNK invoices–1     |    |+cPNK consumer    ||Active
              |                    |    |+-----------------+|
super stream  +--------------------+    |                   |
                                        |+-----------------+|
                                        |+cBLU consumer    ||Inactive
              +--------------------+    |+-----------------+|
              |                    |    +-------------------+
              |cBLU invoices–2     |     Composite Consumer
              |                    |
              +--------------------+    +-------------------+
                                        |+-----------------+|
                                        |+cGRE consumer    ||Inactive
                                        |+-----------------+|
                                        |                   |
                                        |+-----------------+|
                                        |+cPNK consumer    ||Inactive
                                        |+-----------------+|
                                        |                   |
                                        |+-----------------+|
                                        |+cBLU consumer    ||Active
                                        |+-----------------+|
                                        +-------------------+
                                         Composite Consumer

After this overview, let’s see the API and the configuration details.

The following snippet shows how to declare a single active consumer on a super stream with the ConsumerBuilder#superStream(String) and ConsumerBuilder#singleActiveConsumer() methods:

Enabling single active consumer on a super stream
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
  1. Set the super stream name

  2. Set the consumer name (mandatory to enable single active consumer)

  3. Enable single active consumer

Note it is mandatory to specify a name for the consumer. This name will be used to identify the group of consumer instances and make sure only one is active for each partition. The name is also the reference for offset tracking.

The example above uses by default automatic offset tracking. With this strategy, the client library takes care of offset tracking when consumers become active or inactive. It looks up the latest stored offset when a consumer becomes active to start consuming at the appropriate offset and it stores the last dispatched offset when a consumer becomes inactive.

The story is not the same with manual offset tracking as the client library does not know which offset it should store when a consumer becomes inactive. The application developer can use the ConsumerUpdateListener) callback to react appropriately when a consumer changes state. The following snippet illustrates the use of the ConsumerUpdateListener callback:

Using manual offset tracking for a super stream single active consumer
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
  1. Set the super stream name

  2. Set the consumer name (mandatory to enable single active consumer)

  3. Enable single active consumer

  4. Enable manual offset tracking strategy

  5. Set ConsumerUpdateListener

  6. Return stored offset + 1 or default when consumer becomes active

  7. Store last processed offset for the stream when consumer becomes inactive

  8. Store the current offset on some condition

The ConsumerUpdateListener callback must return the offset to start consuming from when a consumer becomes active. This is what the code above does: it checks if the consumer is active with ConsumerUpdateListener.Context#isActive() and looks up the last stored offset. If there is no stored offset yet, it returns a default value, OffsetSpecification#next() here.

When a consumer becomes inactive, it should store the last processed offset, as another consumer instance will take over elsewhere. It is expected this other consumer runs the exact same code, so it will execute the same sequence when it becomes active (looking up the stored offset, returning the value + 1).

Note the ConsumerUpdateListener is called for a partition, that is an individual stream. The application code should take care of maintaining a reference of the last processed offset for each partition of the super stream, e.g. with a Map<String, Long> (partition-to-offset map). To do so, the context parameter of the MessageHandler and ConsumerUpdateListener callbacks provide a stream() method.

RabbitMQ Stream provides server-side offset tracking, but it is possible to use an external store to track offsets for streams. The ConsumerUpdateListener callback is still your friend in this case. The following snippet shows how to leverage it when an external store is in use:

Using external offset tracking for a super stream single active consumer
link:../../test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java[role=include]
  1. Set the super stream name

  2. Set the consumer name (mandatory to enable single active consumer)

  3. Enable single active consumer

  4. Disable server-side offset tracking

  5. Set ConsumerUpdateListener

  6. Use external store for stored offset when consumer becomes active

  7. Assume offset already stored when consumer becomes inactive

  8. Use external store for offset tracking

Here are the takeaway points of this code:

  • Even though there is no server-side offset tracking to use it, the consumer must still have a name to identify the group it belongs to. The external offset tracking mechanism is free to use the same name or not.

  • Calling ConsumerBuilder#noTrackingStrategy() is necessary to disable server-side offset tracking, or the automatic tracking strategy will kick in.

  • The snippet does not provide the details, but the offset tracking mechanism seems to store the offset for each message. The external store must be able to cope with the message rate in a real-world scenario.

  • The ConsumerUpdateListener callback returns the last stored offset + 1 when the consumer becomes active. This way the broker will resume the dispatching at this location in the stream.

  • A well-behaved ConsumerUpdateListener must make sure the last processed offset is stored when the consumer becomes inactive, so that the consumer that will take over can look up the offset and resume consuming at the right location. Our ConsumerUpdateListener does not do anything when the consumer becomes inactive (it returns null): it can afford this because the offset is stored for each message. Make sure to store the last processed offset when the consumer becomes inactive to avoid duplicates when the consumption resumes elsewhere.