
Super stream support #32


Merged · 7 commits · Sep 27, 2021
2 changes: 1 addition & 1 deletion src/docs/asciidoc/api.adoc
Original file line number Diff line number Diff line change
@@ -880,4 +880,4 @@ entry, which has its own offset.

This means one must be careful when basing some decision on offset values, like
a modulo to perform an operation every X messages. As the message offsets have
no guarantee to be contiguous, the operation may not happen exactly every X messages.
no guarantee to be contiguous, the operation may not happen exactly every X messages.
2 changes: 2 additions & 0 deletions src/docs/asciidoc/index.adoc
@@ -22,6 +22,8 @@ include::sample-application.adoc[]

include::api.adoc[]

include::super-streams.adoc[]

include::building.adoc[]

include::performance-tool.adoc[]
168 changes: 168 additions & 0 deletions src/docs/asciidoc/super-streams.adoc
@@ -0,0 +1,168 @@
:test-examples: ../../test/java/com/rabbitmq/stream/docs

[[super-streams]]
==== Super Streams (Partitioned Streams)

[WARNING]
.Experimental
====
Super streams are an experimental feature; they are subject to change.
====

A super stream is a logical stream made of several individual streams.
In essence, a super stream is a partitioned stream that brings scalability compared to a single stream.

The stream Java client uses the same programming model for super streams as for individual streams: the `Producer`, `Consumer`, `Message`, etc. APIs remain valid when super streams are in use.
Application code should not be impacted whether it uses individual streams or super streams.

===== Topology

A super stream is made of several individual streams, so it can be considered a logical entity rather than an actual physical entity.
The topology of a super stream is based on the https://www.rabbitmq.com/tutorials/amqp-concepts.html[AMQP 0.9.1 model], that is exchange, queues, and bindings between them.
This does not mean AMQP resources are used to transport or store stream messages, it means that they are used to _describe_ the super stream topology, that is the streams it is made of.

Let's take the example of an `invoices` super stream made of 3 streams (i.e. partitions):

* an `invoices` exchange represents the super stream
* the `invoices-0`, `invoices-1`, `invoices-2` streams are the partitions of the super stream (streams are also AMQP queues in RabbitMQ)
* 3 bindings between the exchange and the streams link the super stream to its partitions and represent _routing rules_

.The topology of a super stream is defined with bindings between an exchange and queues
[ditaa]
....
                   0 +------------+
              +----->+ invoices-0 |
              |      +------------+
+----------+  |
| invoices |  |    1 +------------+
|          +--+----->+ invoices-1 |
| exchange |  |      +------------+
+----------+  |
              |    2 +------------+
              +----->+ invoices-2 |
                     +------------+
....

When a super stream is in use, the stream Java client queries this information to find out about the partitions of a super stream and the routing rules.
From the application code point of view, using a super stream is mostly configuration-based.
Some logic must also be provided to extract routing information from messages.

===== Publishing to a Super Stream

When the topology of a super stream like the one described above has been set, creating a producer for it is straightforward:

.Creating a Producer for a Super Stream
[source,java,indent=0]
--------
include::{test-examples}/SuperStreamUsage.java[tag=producer-simple]
--------
<1> Use the super stream name
<2> Provide the logic to get the routing key from a message
<3> Create the producer instance
<4> Close the producer when it's no longer necessary

Note that even though the `invoices` super stream is not an actual stream, its name must be used to declare the producer.
Internally the client will figure out the streams that compose the super stream.
The application code must provide the logic to extract a routing key from a message as a `Function<Message, String>`.
The client hashes the routing key to determine the stream to send the message to (using the list of partitions and a modulo operation).

The client uses 32-bit https://en.wikipedia.org/wiki/MurmurHash[MurmurHash3] by default to hash the routing key.
This hash function provides good uniformity, performance, and portability, making it a good default choice, but it is possible to specify a custom hash function:

.Specifying a custom hash function
[source,java,indent=0]
--------
include::{test-examples}/SuperStreamUsage.java[tag=producer-custom-hash-function]
--------
<1> Use `String#hashCode()` to hash the routing key

Note that using Java's `hashCode()` method is a debatable choice, as producers in other languages are unlikely to implement it, making the routing inconsistent between producers written in different languages.
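As an illustration of the mechanics described above (and not the client's actual implementation), partition selection boils down to hashing the routing key and taking a modulo over the partition list. The class name, the hard-coded partition list, and the use of `String::hashCode` in place of MurmurHash3 are all assumptions made for the sketch:

```java
import java.util.List;
import java.util.function.ToIntFunction;

public class HashRoutingSketch {

    public static void main(String[] args) {
        // Hard-coded partition list, for illustration only
        List<String> partitions = List.of("invoices-0", "invoices-1", "invoices-2");
        // String::hashCode stands in for the client's default 32-bit MurmurHash3
        ToIntFunction<String> hash = String::hashCode;

        String routingKey = "order-12345";
        int hashValue = hash.applyAsInt(routingKey);
        // Mask the sign bit so the index is never negative, then take the modulo
        String target = partitions.get((hashValue & 0x7FFFFFFF) % partitions.size());
        System.out.println(target);
    }
}
```

The same routing key always maps to the same partition, which is what makes hash routing deterministic across restarts.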

====== Resolving Routes with Bindings

Hashing the routing key to pick a partition is only one way to route messages to the appropriate streams.
The stream Java client provides another way to resolve streams, based on the routing key _and_ the bindings between the super stream exchange and the streams.

This routing strategy makes sense when the partitioning has a business meaning, e.g. with a partition for a region in the world, like in the diagram below:

.A super stream with a partition for a region in the world
[ditaa]
....
                amer +---------------+
              +----->+ invoices-amer |
              |      +---------------+
+----------+  |
| invoices |  | emea +---------------+
|          +--+----->+ invoices-emea |
| exchange |  |      +---------------+
+----------+  |
              | apac +---------------+
              +----->+ invoices-apac |
                     +---------------+
....

In such a case, the routing key will be a property of the message that represents the region:

.Enabling the "key" routing strategy
[source,java,indent=0]
--------
include::{test-examples}/SuperStreamUsage.java[tag=producer-key-routing-strategy]
--------
<1> Extract the routing key
<2> Enable the "key" routing strategy

Internally the client will query the broker to resolve the destination streams for a given routing key, making the routing logic from any exchange type available to streams.
Note that the client caches results; it does not query the broker for every message.
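A hypothetical sketch of that caching behavior follows; the real client's internals differ, and `queryRoute` merely stands in for the broker round trip triggered by the "route" command:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: the real client's internals differ
public class RouteCacheSketch {

    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    // Stand-in for the broker round trip triggered by the "route" command
    private final Function<String, List<String>> queryRoute;

    public RouteCacheSketch(Function<String, List<String>> queryRoute) {
        this.queryRoute = queryRoute;
    }

    public List<String> route(String routingKey) {
        // The broker is queried only on a cache miss, not for every message
        return cache.computeIfAbsent(routingKey, queryRoute);
    }
}
```

With this shape, two messages carrying the same routing key trigger a single broker query; subsequent resolutions are served from the cache.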

====== Using a Custom Routing Strategy

The solution that provides the most control over routing is using a custom routing strategy.
This should be needed only for specific cases.

The following code sample shows how to implement a simplistic round-robin `RoutingStrategy` and use it in the producer.
Note this implementation should not be used in production: for simplicity's sake, the modulo operation is not sign-safe.

.Setting a round-robin routing strategy
[source,java,indent=0]
--------
include::{test-examples}/SuperStreamUsage.java[tag=producer-custom-routing-strategy]
--------
<1> No need to set the routing key extraction logic
<2> Set the custom routing strategy
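A sign-safe variant could use `Math.floorMod`, which stays non-negative even when the counter wraps around. The sketch below compiles against a simplified stand-in for the `RoutingStrategy`/`Metadata` contract rather than the real interfaces, and the message parameter is dropped since round-robin ignores it:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for the Metadata interface this PR introduces
interface Metadata {
    List<String> partitions();
}

// Hypothetical sign-safe round-robin strategy, for illustration only
class RoundRobinRoutingStrategy {

    private final AtomicInteger counter = new AtomicInteger(0);

    List<String> route(Metadata metadata) {
        List<String> partitions = metadata.partitions();
        // Math.floorMod stays non-negative even once the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), partitions.size());
        return Collections.singletonList(partitions.get(index));
    }
}
```

Successive calls cycle through `invoices-0`, `invoices-1`, `invoices-2`, then wrap back to `invoices-0`, with no negative index even after integer overflow of the counter.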

====== Deduplication

Deduplication for a super stream producer works the same way as with a <<api.adoc#outbound-message-deduplication, single stream producer>>.
The publishing ID values are spread across the streams, but this does not affect the mechanism.

===== Consuming From a Super Stream

A super stream consumer is not much different from a single stream consumer.
The `ConsumerBuilder#superStream(String)` method must be used to set the super stream to consume from:

.Declaring a super stream consumer
[source,java,indent=0]
--------
include::{test-examples}/SuperStreamUsage.java[tag=consumer-simple]
--------
<1> Set the super stream name
<2> Close the consumer when it is no longer necessary

A super stream consumer is a composite consumer: it looks up the super stream partitions and creates a consumer for each of them.

====== Offset Tracking

The semantics of offset tracking for a super stream consumer are roughly the same as for an individual stream consumer.
There are still some subtle differences, so a good understanding of <<api.adoc#consumer-offset-tracking, offset tracking>> in general and of the <<api.adoc#consumer-automatic-offset-tracking,automatic>> and <<api.adoc#consumer-manual-offset-tracking,manual>> offset tracking strategies is recommended.

Here are the main differences for the automatic/manual offset tracking strategies between single and super stream consuming:

* *automatic offset tracking*: internally, _the client divides the `messageCountBeforeStorage` setting by the number of partitions for each individual consumer_.
Imagine a 3-partition super stream, `messageCountBeforeStorage` set to 10,000, and 10,000 messages coming in, perfectly balanced across the partitions (that is, about 3,333 messages for each partition).
If the 10,000 value were used as-is for each individual consumer, the automatic offset tracking strategy would not kick in, because the expected message count would not be reached on any single partition.
Having the client divide `messageCountBeforeStorage` by the number of partitions can be considered "more accurate", as long as messages are well balanced across the partitions.
A good rule of thumb is then to multiply the expected per-stream `messageCountBeforeStorage` by the number of partitions, to avoid storing offsets too often. So, the default being 10,000, it can be set to 30,000 for a 3-partition super stream.
* *manual offset tracking*: the `MessageHandler.Context#storeOffset()` method must be used; `Consumer#store(long)` will fail, because an offset value has a meaning only in one stream, not in the other streams.
A call to `MessageHandler.Context#storeOffset()` will store the current message offset in _its_ stream, but also the offset of the last dispatched message for the other streams of the super stream.
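The division and the rule of thumb above amount to the following arithmetic (plain Java, using the partition count and default value stated in the text):

```java
public class OffsetTrackingMath {

    public static void main(String[] args) {
        int partitions = 3;
        int messageCountBeforeStorage = 10_000; // the default value

        // What the client applies to each individual partition consumer
        int perPartitionThreshold = messageCountBeforeStorage / partitions;
        System.out.println(perPartitionThreshold); // 3333

        // Rule of thumb: scale the desired per-stream threshold back up
        System.out.println(messageCountBeforeStorage * partitions); // 30000
    }
}
```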


1 change: 1 addition & 0 deletions src/main/java/com/rabbitmq/stream/Constants.java
@@ -39,6 +39,7 @@ public final class Constants {
public static final short CODE_PRODUCER_NOT_AVAILABLE = 10_002;
public static final short CODE_PRODUCER_CLOSED = 10_003;
public static final short CODE_PUBLISH_CONFIRM_TIMEOUT = 10_004;
public static final short CODE_NO_ROUTE_FOUND = 10_005;

public static final short COMMAND_DECLARE_PUBLISHER = 1;
public static final short COMMAND_PUBLISH = 2;
4 changes: 3 additions & 1 deletion src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
@@ -27,7 +27,9 @@ public interface ConsumerBuilder {
ConsumerBuilder stream(String stream);

/**
* Set the consumer to consume from a super stream (partitioned stream). Experimental!
* Set the consumer to consume from a super stream (partitioned stream).
*
* <p>This is an experimental API, subject to change.
*
* @param superStream the super stream to consume from
* @return this builder instance
90 changes: 69 additions & 21 deletions src/main/java/com/rabbitmq/stream/ProducerBuilder.java
@@ -108,37 +108,85 @@ public interface ProducerBuilder {
ProducerBuilder enqueueTimeout(Duration timeout);

/**
* Routing strategy for super streams. Experimental!
* Create the {@link Producer} instance.
*
* @param routingKeyExtractor
* @param routingType
* @return this builder instance
* @return the configured producer
*/
ProducerBuilder routing(Function<Message, String> routingKeyExtractor, RoutingType routingType);
Producer build();

/**
* Routing strategy for super streams. Experimental!
* Configure the routing for super streams (partitioned streams).
*
* @param routingKeyExtractor
* @param routingType
* @param hash
* @return this builder instance
* <p>This is an experimental API, subject to change.
*
* <p>The to-be-created producer will be a composite producer when this method is called. It will
* use the routing configuration to find out where a message should be routed. The application
* developer must provide the logic to extract a "routing key" from a message, which will decide
* the destination(s) of the message.
*
* <p>The default routing strategy hashes the routing key to choose the stream (partition) to send
* the message to.
*
* <p>Note the routing key extraction logic is required only when a built-in routing strategy
* is used. It can be set to <code>null</code> when a custom {@link RoutingStrategy} is set
* with {@link RoutingConfiguration#strategy(RoutingStrategy)}.
*
* @param routingKeyExtractor the logic to extract a routing key from a message
* @return the routing configuration instance
* @see RoutingConfiguration
*/
ProducerBuilder routing(
Function<Message, String> routingKeyExtractor,
RoutingType routingType,
ToIntFunction<String> hash);
RoutingConfiguration routing(Function<Message, String> routingKeyExtractor);

/**
* Create the {@link Producer} instance.
* Routing configuration for super streams (partitioned streams).
*
* @return the configured producer
* <p>This is an experimental API, subject to change.
*/
Producer build();
interface RoutingConfiguration {

/**
* Enable the "hash" routing strategy (the default).
*
* <p>The default hash algorithm is 32-bit MurmurHash3.
*
* @return the routing configuration instance
*/
RoutingConfiguration hash();

/**
* Enable the "hash" routing strategy with a specific hash algorithm.
*
* @param hash the hash algorithm to apply to the routing key
* @return the routing configuration instance
*/
RoutingConfiguration hash(ToIntFunction<String> hash);

/**
* Enable the "key" routing strategy.
*
* <p>It uses the "route" command of the RabbitMQ Stream protocol to determine the
* streams to send a message to.
*
* @return the routing configuration instance
*/
RoutingConfiguration key();

/**
* Set the routing strategy to use.
*
* <p>Providing a routing strategy gives full control over the streams a message is routed to
* (the routing key extraction logic if relevant, and the destination(s) decision).
*
* @param routingStrategy the routing strategy to use
* @return the routing configuration instance
*/
RoutingConfiguration strategy(RoutingStrategy routingStrategy);

/** Routing type when using super streams. Experimental! */
enum RoutingType {
HASH,
KEY
/**
* Go back to the producer builder.
*
* @return the producer builder
*/
ProducerBuilder producerBuilder();
}
}
@@ -11,12 +11,36 @@
//
// If you have any questions regarding licensing, please contact us at
// [email protected].
package com.rabbitmq.stream.impl;
package com.rabbitmq.stream;

import com.rabbitmq.stream.Message;
import java.util.List;
import java.util.function.Function;

interface RoutingStrategy {
/**
* Strategy to route outbound messages to appropriate streams.
*
* <p>This is an experimental API, subject to change.
*
* <p>Used for super streams (partitioned streams).
*
* @see ProducerBuilder#routing(Function)
*/
public interface RoutingStrategy {

List<String> route(Message message);
/**
* Where to route a message.
*
* @param message the message to route
* @param metadata the metadata of the super stream
* @return the streams to route the message to
*/
List<String> route(Message message, Metadata metadata);

/** Metadata on the super stream. */
interface Metadata {

List<String> partitions();

List<String> route(String routingKey);
}
}
23 changes: 5 additions & 18 deletions src/main/java/com/rabbitmq/stream/impl/HashRoutingStrategy.java
@@ -14,41 +14,28 @@
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.RoutingStrategy;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;

class HashRoutingStrategy implements RoutingStrategy {

private final Function<Message, String> routingKeyExtractor;

private final StreamEnvironment env;

private final List<List<String>> partitions;

private final ToIntFunction<String> hash;

HashRoutingStrategy(
String superStream,
Function<Message, String> routingKeyExtractor,
StreamEnvironment env,
ToIntFunction<String> hash) {
HashRoutingStrategy(Function<Message, String> routingKeyExtractor, ToIntFunction<String> hash) {
this.routingKeyExtractor = routingKeyExtractor;
this.env = env;
List<String> ps = this.env.locatorOperation(c -> c.partitions(superStream));
this.partitions =
new CopyOnWriteArrayList<>(
ps.stream().map(Collections::singletonList).collect(Collectors.toList()));
this.hash = hash;
}

@Override
public List<String> route(Message message) {
public List<String> route(Message message, Metadata metadata) {
String routingKey = routingKeyExtractor.apply(message);
int hashValue = hash.applyAsInt(routingKey);
return this.partitions.get((hashValue & 0x7FFFFFFF) % this.partitions.size());
List<String> partitions = metadata.partitions();
return Collections.singletonList(partitions.get((hashValue & 0x7FFFFFFF) % partitions.size()));
}
}
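The `hashValue & 0x7FFFFFFF` expression in the diff above is what makes the modulo sign-safe: Java's `%` operator can return a negative result when the dividend is negative, and hash codes routinely are. A standalone illustration:

```java
public class SignSafeModulo {

    public static void main(String[] args) {
        int negativeHash = -1_234_567; // hash codes can be negative in Java
        int partitionCount = 3;

        // Java's % keeps the sign of the dividend: this prints a negative value,
        // which would be an invalid index for List#get
        System.out.println(negativeHash % partitionCount);

        // Masking the sign bit first guarantees a non-negative index
        System.out.println((negativeHash & 0x7FFFFFFF) % partitionCount);

        // Math.floorMod gives the same non-negative guarantee (though it can
        // pick a different partition than the masking approach)
        System.out.println(Math.floorMod(negativeHash, partitionCount));
    }
}
```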