Skip to content

Commit 067c15d

Browse files
authored
Merge pull request #32 from rabbitmq/super-stream-consumer
Super stream support
2 parents 05477a1 + 9ff897b commit 067c15d

19 files changed

+808
-151
lines changed

src/docs/asciidoc/api.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -880,4 +880,4 @@ entry, which has its own offset.
880880

881881
This means one must be careful when basing some decision on offset values, like
882882
a modulo to perform an operation every X messages. As the message offsets have
883-
no guarantee to be contiguous, the operation may not happen exactly every X messages.
883+
no guarantee to be contiguous, the operation may not happen exactly every X messages.

src/docs/asciidoc/index.adoc

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ include::sample-application.adoc[]
2222

2323
include::api.adoc[]
2424

25+
include::super-streams.adoc[]
26+
2527
include::building.adoc[]
2628

2729
include::performance-tool.adoc[]

src/docs/asciidoc/super-streams.adoc

+168
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
:test-examples: ../../test/java/com/rabbitmq/stream/docs
2+
3+
[[super-streams]]
4+
==== Super Streams (Partitioned Streams)
5+
6+
[WARNING]
7+
.Experimental
8+
====
9+
Super streams are an experimental feature, they are subject to change.
10+
====
11+
12+
A super stream is a logical stream made of several individual streams.
13+
In essence, a super stream is a partitioned stream that brings scalability compared to a single stream.
14+
15+
The stream Java client uses the same programming model for super streams as with individual streams, that is the `Producer`, `Consumer`, `Message`, etc API are still valid when super streams are in use.
16+
Application code should not be impacted whether it uses individual or super streams.
17+
18+
===== Topology
19+
20+
A super stream is made of several individual streams, so it can be considered a logical entity rather than an actual physical entity.
21+
The topology of a super stream is based on the https://www.rabbitmq.com/tutorials/amqp-concepts.html[AMQP 0.9.1 model], that is exchange, queues, and bindings between them.
22+
This does not mean AMQP resources are used to transport or store stream messages, it means that they are used to _describe_ the super stream topology, that is the streams it is made of.
23+
24+
Let's take the example of an `invoices` super stream made of 3 streams (i.e. partitions):
25+
26+
* an `invoices` exchange represents the super stream
27+
* the `invoices-0`, `invoices-1`, `invoices-2` streams are the partitions of the super stream (streams are also AMQP queues in RabbitMQ)
28+
* 3 bindings between the exchange and the streams link the super stream to its partitions and represent _routing rules_
29+
30+
.The topology of a super stream is defined with bindings between an exchange and queues
31+
[ditaa]
32+
....
33+
0 +------------+
34+
+----->+ invoices–0 |
35+
| +------------+
36+
+----------+ |
37+
| invoices | | 1 +------------+
38+
| +---+----->+ invoices–1 |
39+
| exchange | | +------------+
40+
+----------+ |
41+
| 2 +------------+
42+
+----->+ invoices–2 |
43+
+------------+
44+
....
45+
46+
When a super stream is in use, the stream Java client queries this information to find out about the partitions of a super stream and the routing rules.
47+
From the application code point of view, using a super stream is mostly configuration-based.
48+
Some logic must also be provided to extract routing information from messages.
49+
50+
===== Publishing to a Super Stream
51+
52+
When the topology of a super stream like the one described above has been set, creating a producer for it is straightforward:
53+
54+
.Creating a Producer for a Super Stream
55+
[source,java,indent=0]
56+
--------
57+
include::{test-examples}/SuperStreamUsage.java[tag=producer-simple]
58+
--------
59+
<1> Use the super stream name
60+
<2> Provide the logic to get the routing key from a message
61+
<3> Create the producer instance
62+
<4> Close the producer when it's no longer necessary
63+
64+
Note that even though the `invoices` super stream is not an actual stream, its name must be used to declare the producer.
65+
Internally the client will figure out the streams that compose the super stream.
66+
The application code must provide the logic to extract a routing key from a message as a `Function<Message, String>`.
67+
The client will hash the routing key to determine the stream to send the message to (using partition list and a modulo operation).
68+
69+
The client uses 32-bit https://en.wikipedia.org/wiki/MurmurHash[MurmurHash3] by default to hash the routing key.
70+
This hash function provides good uniformity, performance, and portability, making it a good default choice, but it is possible to specify a custom hash function:
71+
72+
.Specifying a custom hash function
73+
[source,java,indent=0]
74+
--------
75+
include::{test-examples}/SuperStreamUsage.java[tag=producer-custom-hash-function]
76+
--------
77+
<1> Use `String#hashCode()` to hash the routing key
78+
79+
Note using Java's `hashCode()` method is a debatable choice as potential producers in other languages are unlikely to implement it, making the routing different between producers in different languages.
80+
81+
====== Resolving Routes with Bindings
82+
83+
Hashing the routing key to pick a partition is only one way to route messages to the appropriate streams.
84+
The stream Java client provides another way to resolve streams, based on the routing key _and_ the bindings between the super stream exchange and the streams.
85+
86+
This routing strategy makes sense when the partitioning has a business meaning, e.g. with a partition for a region in the world, like in the diagram below:
87+
88+
.A super stream with a partition for a region in a world
89+
[ditaa]
90+
....
91+
amer +---------------+
92+
+------>+ invoices–amer |
93+
| +---------------+
94+
+----------+ |
95+
| invoices | | emea +---------------+
96+
| +---+------>+ invoices–emea |
97+
| exchange | | +---------------+
98+
+----------+ |
99+
| apac +---------------+
100+
+------>+ invoices–apac |
101+
+---------------+
102+
....
103+
104+
In such a case, the routing key will be a property of the message that represents the region:
105+
106+
.Enabling the "key" routing strategy
107+
[source,java,indent=0]
108+
--------
109+
include::{test-examples}/SuperStreamUsage.java[tag=producer-key-routing-strategy]
110+
--------
111+
<1> Extract the routing key
112+
<2> Enable the "key" routing strategy
113+
114+
Internally the client will query the broker to resolve the destination streams for a given routing key, making the routing logic from any exchange type available to streams.
115+
Note the client caches results, it does not query the broker for every message.
116+
117+
====== Using a Custom Routing Strategy
118+
119+
The solution that provides the most control over routing is using a custom routing strategy.
120+
This should be needed only for specific cases.
121+
122+
The following code sample shows how to implement a simplistic round-robin `RoutingStrategy` and use it in the producer.
123+
Note this implementation should not be used in production as the modulo operation is not sign-safe for simplicity's sake.
124+
125+
.Setting a round-robin routing strategy
126+
[source,java,indent=0]
127+
--------
128+
include::{test-examples}/SuperStreamUsage.java[tag=producer-custom-routing-strategy]
129+
--------
130+
<1> No need to set the routing key extraction logic
131+
<2> Set the custom routing strategy
132+
133+
====== Deduplication
134+
135+
Deduplication for a super stream producer works the same way as with a <<api.adoc#outbound-message-deduplication, single stream producer>>.
136+
The publishing ID values are spread across the streams but this does affect the mechanism.
137+
138+
===== Consuming From a Super Stream
139+
140+
A super stream consumer is not much different from a single stream consumer.
141+
The `ConsumerBuilder#superStream(String)` must be used to set the super stream to consume from:
142+
143+
.Declaring a super stream consumer
144+
[source,java,indent=0]
145+
--------
146+
include::{test-examples}/SuperStreamUsage.java[tag=consumer-simple]
147+
--------
148+
<1> Set the super stream name
149+
<2> Close the consumer when it is no longer necessary
150+
151+
A super stream consumer is a composite consumer: it will look up the super stream partitions and create a consumer for each or them.
152+
153+
====== Offset Tracking
154+
155+
The semantic of offset tracking for a super stream consumer are roughly the same as for an individual stream consumer.
156+
There are still some subtle differences, so a good understanding of <<api.adoc#consumer-offset-tracking, offset tracking>> in general and of the <<api.adoc#consumer-automatic-offset-tracking,automatic>> and <<api.adoc#consumer-manual-offset-tracking,manual>> offset tracking strategies is recommended.
157+
158+
Here are the main differences for the automatic/manual offset tracking strategies between single and super stream consuming:
159+
160+
* *automatic offset tracking*: internally, _the client divides the `messageCountBeforeStorage` setting by the number of partitions for each individual consumer_.
161+
Imagine a 3-partition super stream, `messageCountBeforeStorage` set to 10,000, and 10,000 messages coming in, perfectly balanced across the partitions (that is about 3,333 messages for each partition).
162+
In this case, the automatic offset tracking strategy will not kick in, because the expected count message has not been reached on any partition.
163+
Making the client divide `messageCountBeforeStorage` by the number of partitions can be considered "more accurate" if the message are well balanced across the partitions.
164+
A good rule of thumb is to then multiply the expected per-stream `messageCountBeforeStorage` by the number of partitions, to avoid storing offsets too often. So the default being 10,000, it can be set to 30,000 for a 3-partition super stream.
165+
* *manual offset tracking*: the `MessageHandler.Context#storeOffset()` method must be used, the `Consumer#store(long)` will fail, because an offset value has a meaning only in one stream, not in other streams.
166+
A call to `MessageHandler.Context#storeOffset()` will store the current message offset in _its_ stream, but also the offset of the last dispatched message for the other streams of the super stream.
167+
168+

src/main/java/com/rabbitmq/stream/Constants.java

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public final class Constants {
3939
public static final short CODE_PRODUCER_NOT_AVAILABLE = 10_002;
4040
public static final short CODE_PRODUCER_CLOSED = 10_003;
4141
public static final short CODE_PUBLISH_CONFIRM_TIMEOUT = 10_004;
42+
public static final short CODE_NO_ROUTE_FOUND = 10_005;
4243

4344
public static final short COMMAND_DECLARE_PUBLISHER = 1;
4445
public static final short COMMAND_PUBLISH = 2;

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ public interface ConsumerBuilder {
2727
ConsumerBuilder stream(String stream);
2828

2929
/**
30-
* Set the consumer to consume from a super stream (partitioned stream). Experimental!
30+
* Set the consumer to consume from a super stream (partitioned stream).
31+
*
32+
* <p>This is an experimental API, subject to change.
3133
*
3234
* @param superStream
3335
* @return this builder instance

src/main/java/com/rabbitmq/stream/ProducerBuilder.java

+69-21
Original file line numberDiff line numberDiff line change
@@ -108,37 +108,85 @@ public interface ProducerBuilder {
108108
ProducerBuilder enqueueTimeout(Duration timeout);
109109

110110
/**
111-
* Routing strategy for super streams. Experimental!
111+
* Create the {@link Producer} instance.
112112
*
113-
* @param routingKeyExtractor
114-
* @param routingType
115-
* @return this builder instance
113+
* @return the configured producer
116114
*/
117-
ProducerBuilder routing(Function<Message, String> routingKeyExtractor, RoutingType routingType);
115+
Producer build();
118116

119117
/**
120-
* Routing strategy for super streams. Experimental!
118+
* Configure the routing for super streams (partitioned streams).
121119
*
122-
* @param routingKeyExtractor
123-
* @param routingType
124-
* @param hash
125-
* @return this builder instance
120+
* <p>This is an experimental API, subject to change.
121+
*
122+
* <p>The to-be-created producer will be a composite producer when this method is called. It will
123+
* use the routing configuration to find out where a message should be routed. The application
124+
* developer must provide the logic to extract a "routing key" from a message, which will decide
125+
* the destination(s) of the message.
126+
*
127+
* <p>The default routing strategy hashes the routing key to choose the stream (partition) to send
128+
* the message to.
129+
*
130+
* Note the routing key extraction logic is required only when the built-in routing strategies
131+
* are used. It can set to <code>null</code> when a custom {@link RoutingStrategy} is set
132+
* with {@link #routing(Function)}.
133+
*
134+
* @param routingKeyExtractor the logic to extract a routing key from a message
135+
* @return the routing configuration instance
136+
* @see RoutingConfiguration
126137
*/
127-
ProducerBuilder routing(
128-
Function<Message, String> routingKeyExtractor,
129-
RoutingType routingType,
130-
ToIntFunction<String> hash);
138+
RoutingConfiguration routing(Function<Message, String> routingKeyExtractor);
131139

132140
/**
133-
* Create the {@link Producer} instance.
141+
* Routing configuration for super streams (partitioned streams).
134142
*
135-
* @return the configured producer
143+
* <p>This is an experimental API, subject to change.
136144
*/
137-
Producer build();
145+
interface RoutingConfiguration {
146+
147+
/**
148+
* Enable the "hash" routing strategy (the default).
149+
*
150+
* <p>The default hash algorithm is 32-bit MurmurHash3.
151+
*
152+
* @return the routing configuration instance
153+
*/
154+
RoutingConfiguration hash();
155+
156+
/**
157+
* Enable the "hash" routing strategy with a specific hash algorithm.
158+
*
159+
* @param hash
160+
* @return
161+
*/
162+
RoutingConfiguration hash(ToIntFunction<String> hash);
163+
164+
/**
165+
* Enable the "key" routing strategy.
166+
*
167+
* <p>It consists in using the "route" command of the RabbitMQ Stream protocol to determine the
168+
* streams to send a message to.
169+
*
170+
* @return the routing configuration instance
171+
*/
172+
RoutingConfiguration key();
173+
174+
/**
175+
* Set the routing strategy to use.
176+
*
177+
* <p>Providing the routing strategy provides control over the streams a message is routed to
178+
* (routing key extraction logic if relevant and destination(s) decision).
179+
*
180+
* @param routingStrategy
181+
* @return the routing configuration instance
182+
*/
183+
RoutingConfiguration strategy(RoutingStrategy routingStrategy);
138184

139-
/** Routing type when using super streams. Experimental! */
140-
enum RoutingType {
141-
HASH,
142-
KEY
185+
/**
186+
* Go back to the producer builder.
187+
*
188+
* @return the producer builder
189+
*/
190+
ProducerBuilder producerBuilder();
143191
}
144192
}

src/main/java/com/rabbitmq/stream/impl/RoutingStrategy.java renamed to src/main/java/com/rabbitmq/stream/RoutingStrategy.java

+28-4
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,36 @@
1111
//
1212
// If you have any questions regarding licensing, please contact us at
1313
14-
package com.rabbitmq.stream.impl;
14+
package com.rabbitmq.stream;
1515

16-
import com.rabbitmq.stream.Message;
1716
import java.util.List;
17+
import java.util.function.Function;
1818

19-
interface RoutingStrategy {
19+
/**
20+
* Strategy to route outbound messages to appropriate streams.
21+
*
22+
* <p>This is an experimental API, subject to change.
23+
*
24+
* <p>Used for super streams (partitioned stream).
25+
*
26+
* @see ProducerBuilder#routing(Function)
27+
*/
28+
public interface RoutingStrategy {
2029

21-
List<String> route(Message message);
30+
/**
31+
* Where to route a message.
32+
*
33+
* @param message
34+
* @param metadata
35+
* @return
36+
*/
37+
List<String> route(Message message, Metadata metadata);
38+
39+
/** Metadata on the super stream. */
40+
interface Metadata {
41+
42+
List<String> partitions();
43+
44+
List<String> route(String routingKey);
45+
}
2246
}

src/main/java/com/rabbitmq/stream/impl/HashRoutingStrategy.java

+5-18
Original file line numberDiff line numberDiff line change
@@ -14,41 +14,28 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import com.rabbitmq.stream.Message;
17+
import com.rabbitmq.stream.RoutingStrategy;
1718
import java.util.Collections;
1819
import java.util.List;
19-
import java.util.concurrent.CopyOnWriteArrayList;
2020
import java.util.function.Function;
2121
import java.util.function.ToIntFunction;
22-
import java.util.stream.Collectors;
2322

2423
class HashRoutingStrategy implements RoutingStrategy {
2524

2625
private final Function<Message, String> routingKeyExtractor;
2726

28-
private final StreamEnvironment env;
29-
30-
private final List<List<String>> partitions;
31-
3227
private final ToIntFunction<String> hash;
3328

34-
HashRoutingStrategy(
35-
String superStream,
36-
Function<Message, String> routingKeyExtractor,
37-
StreamEnvironment env,
38-
ToIntFunction<String> hash) {
29+
HashRoutingStrategy(Function<Message, String> routingKeyExtractor, ToIntFunction<String> hash) {
3930
this.routingKeyExtractor = routingKeyExtractor;
40-
this.env = env;
41-
List<String> ps = this.env.locatorOperation(c -> c.partitions(superStream));
42-
this.partitions =
43-
new CopyOnWriteArrayList<>(
44-
ps.stream().map(Collections::singletonList).collect(Collectors.toList()));
4531
this.hash = hash;
4632
}
4733

4834
@Override
49-
public List<String> route(Message message) {
35+
public List<String> route(Message message, Metadata metadata) {
5036
String routingKey = routingKeyExtractor.apply(message);
5137
int hashValue = hash.applyAsInt(routingKey);
52-
return this.partitions.get((hashValue & 0x7FFFFFFF) % this.partitions.size());
38+
List<String> partitions = metadata.partitions();
39+
return Collections.singletonList(partitions.get((hashValue & 0x7FFFFFFF) % partitions.size()));
5340
}
5441
}

0 commit comments

Comments
 (0)