Commit 9ff897b

committed
Write super stream consumer documentation
1 parent 5c6fe3c commit 9ff897b

3 files changed

+103
-4
lines changed

src/docs/asciidoc/super-streams.adoc

+58-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
:test-examples: ../../test/java/com/rabbitmq/stream/docs
22

3+
[[super-streams]]
34
==== Super Streams (Partitioned Streams)
45

56
[WARNING]
@@ -14,7 +15,7 @@ In essence, a super stream is a partitioned stream that brings scalability compa
1415
The stream Java client uses the same programming model for super streams as for individual streams: the `Producer`, `Consumer`, `Message`, and other APIs remain valid when super streams are in use.
1516
Application code should not be impacted whether it uses individual or super streams.
1617

17-
==== Topology
18+
===== Topology
1819

1920
A super stream is made of several individual streams, so it can be considered a logical entity rather than an actual physical entity.
2021
The topology of a super stream is based on the https://www.rabbitmq.com/tutorials/amqp-concepts.html[AMQP 0.9.1 model], that is exchange, queues, and bindings between them.
@@ -46,7 +47,7 @@ When a super stream is in use, the stream Java client queries this information t
4647
From the application code point of view, using a super stream is mostly configuration-based.
4748
Some logic must also be provided to extract routing information from messages.
4849

49-
==== Publishing to a Super Stream
50+
===== Publishing to a Super Stream
5051

5152
When the topology of a super stream like the one described above has been set, creating a producer for it is straightforward:
5253

@@ -77,7 +78,7 @@ include::{test-examples}/SuperStreamUsage.java[tag=producer-custom-hash-function
7778

7879
Note using Java's `hashCode()` method is a debatable choice as potential producers in other languages are unlikely to implement it, making the routing different between producers in different languages.
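To illustrate the portability concern, here is a minimal sketch of a hash that producers in other languages could reproduce. It assumes CRC-32 over the UTF-8 bytes of the routing key, which is an illustrative choice and not the client's default hash function. The `PortableHashRouting` class, the `customer-42` routing key, and the `invoices-N` partition names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical helper, not part of the stream Java client.
final class PortableHashRouting {

    // CRC-32 of the UTF-8 bytes of the routing key, as an unsigned 32-bit value.
    static long hash(String routingKey) {
        CRC32 crc = new CRC32();
        crc.update(routingKey.getBytes(StandardCharsets.UTF_8));
        return crc.getValue(); // always in [0, 2^32)
    }

    // Maps a routing key to a partition index in [0, partitionCount).
    static int partitionIndex(String routingKey, int partitionCount) {
        return (int) (hash(routingKey) % partitionCount);
    }

    public static void main(String[] args) {
        // Assumed partition names, for illustration only.
        List<String> partitions = Arrays.asList("invoices-0", "invoices-1", "invoices-2");
        String routingKey = "customer-42";
        int index = partitionIndex(routingKey, partitions.size());
        System.out.println(routingKey + " -> " + partitions.get(index));
    }
}
```

Because CRC-32 is specified independently of any language, a producer written in another language can compute the same value from the same bytes, which is not the case for `hashCode()`.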
7980

80-
==== Resolving Routes with Bindings
81+
====== Resolving Routes with Bindings
8182

8283
Hashing the routing key to pick a partition is only one way to route messages to the appropriate streams.
8384
The stream Java client provides another way to resolve streams, based on the routing key _and_ the bindings between the super stream exchange and the streams.
@@ -111,4 +112,57 @@ include::{test-examples}/SuperStreamUsage.java[tag=producer-key-routing-strategy
111112
<2> Enable the "key" routing strategy
112113

113114
Internally the client will query the broker to resolve the destination streams for a given routing key, making the routing logic from any exchange type available to streams.
114-
Note the client caches results, it does not query the broker for every message.
115+
Note the client caches results, it does not query the broker for every message.
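The caching behavior can be pictured with the following sketch. It is a conceptual model only and makes no claim about how the client implements its cache: the `CachingRouteResolver` class is hypothetical, the broker query is stubbed with a plain function, and the `emea` routing key and `invoices-emea` stream name are made up for the example.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical model of "resolve once, then reuse", not the client's code.
final class CachingRouteResolver {

    private final Function<String, List<String>> brokerLookup;
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();

    CachingRouteResolver(Function<String, List<String>> brokerLookup) {
        this.brokerLookup = brokerLookup;
    }

    // Calls the lookup only the first time a routing key is seen.
    List<String> resolve(String routingKey) {
        return cache.computeIfAbsent(routingKey, brokerLookup);
    }

    public static void main(String[] args) {
        AtomicInteger brokerQueries = new AtomicInteger();
        // Stub standing in for the broker query (assumption).
        CachingRouteResolver resolver = new CachingRouteResolver(key -> {
            brokerQueries.incrementAndGet();
            return Collections.singletonList("invoices-" + key);
        });
        resolver.resolve("emea");
        resolver.resolve("emea");
        System.out.println("broker queries: " + brokerQueries.get()); // prints "broker queries: 1"
    }
}
```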
116+
117+
====== Using a Custom Routing Strategy
118+
119+
The solution that provides the most control over routing is using a custom routing strategy.
120+
This should be needed only for specific cases.
121+
122+
The following code sample shows how to implement a simplistic round-robin `RoutingStrategy` and use it in the producer.
123+
Note that this implementation should not be used in production: for simplicity's sake, its modulo operation is not sign-safe.
124+
125+
.Setting a round-robin routing strategy
126+
[source,java,indent=0]
127+
--------
128+
include::{test-examples}/SuperStreamUsage.java[tag=producer-custom-routing-strategy]
129+
--------
130+
<1> No need to set the routing key extraction logic
131+
<2> Set the custom routing strategy
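As a complement to the round-robin sample referenced above, here is a sketch of a sign-safe index computation. In that sample the counter is cast to `int` before the modulo, so the index would presumably turn negative once the counter exceeds `Integer.MAX_VALUE`. The sketch below uses `Math.floorMod` on the `long` counter instead. It is standalone and does not implement the client's `RoutingStrategy` interface; the `SignSafeRoundRobin` class and the `invoices-N` partition names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical standalone helper illustrating a sign-safe round-robin pick.
final class SignSafeRoundRobin {

    private final AtomicLong messageCount;

    SignSafeRoundRobin(long initialCount) {
        this.messageCount = new AtomicLong(initialCount);
    }

    // Returns the next partition; floorMod keeps the index non-negative
    // even when the counter is negative or has wrapped around.
    String pick(List<String> partitions) {
        long count = messageCount.getAndIncrement();
        int index = (int) Math.floorMod(count, (long) partitions.size());
        return partitions.get(index);
    }

    public static void main(String[] args) {
        // Assumed partition names, for illustration only.
        List<String> partitions = Arrays.asList("invoices-0", "invoices-1", "invoices-2");
        // Start from a negative count to show the index stays valid.
        SignSafeRoundRobin roundRobin = new SignSafeRoundRobin(-1L);
        System.out.println(roundRobin.pick(partitions)); // prints "invoices-2"
        System.out.println(roundRobin.pick(partitions)); // prints "invoices-0"
    }
}
```

The same computation could be placed in the body of a custom routing strategy in place of the cast-then-modulo expression.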
132+
133+
====== Deduplication
134+
135+
Deduplication for a super stream producer works the same way as with a <<api.adoc#outbound-message-deduplication, single stream producer>>.
136+
The publishing ID values are spread across the streams but this does not affect the mechanism.
137+
138+
===== Consuming From a Super Stream
139+
140+
A super stream consumer is not much different from a single stream consumer.
141+
The `ConsumerBuilder#superStream(String)` method must be used to set the super stream to consume from:
142+
143+
.Declaring a super stream consumer
144+
[source,java,indent=0]
145+
--------
146+
include::{test-examples}/SuperStreamUsage.java[tag=consumer-simple]
147+
--------
148+
<1> Set the super stream name
149+
<2> Close the consumer when it is no longer necessary
150+
151+
A super stream consumer is a composite consumer: it will look up the super stream partitions and create a consumer for each of them.
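The composite idea can be sketched as follows. This is a conceptual model under stated assumptions, not the client's implementation: the `CompositeConsumerSketch` class is hypothetical, each partition consumer is reduced to a close action, and the `invoices-N` partition names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical model: one underlying consumer per partition, closed together.
final class CompositeConsumerSketch implements AutoCloseable {

    // Each entry is the close action of one partition consumer.
    private final List<Runnable> partitionConsumers = new ArrayList<>();

    CompositeConsumerSketch(List<String> partitions, Function<String, Runnable> consumerFactory) {
        for (String partition : partitions) {
            partitionConsumers.add(consumerFactory.apply(partition));
        }
    }

    int partitionCount() {
        return partitionConsumers.size();
    }

    // Closing the composite closes every partition consumer.
    @Override
    public void close() {
        for (Runnable closeAction : partitionConsumers) {
            closeAction.run();
        }
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("invoices-0", "invoices-1", "invoices-2");
        CompositeConsumerSketch consumer = new CompositeConsumerSketch(partitions, partition -> {
            System.out.println("consuming from " + partition);
            return () -> System.out.println("closed consumer of " + partition);
        });
        consumer.close();
    }
}
```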
152+
153+
====== Offset Tracking
154+
155+
The semantics of offset tracking for a super stream consumer are roughly the same as for an individual stream consumer.
156+
There are still some subtle differences, so a good understanding of <<api.adoc#consumer-offset-tracking, offset tracking>> in general and of the <<api.adoc#consumer-automatic-offset-tracking,automatic>> and <<api.adoc#consumer-manual-offset-tracking,manual>> offset tracking strategies is recommended.
157+
158+
Here are the main differences for the automatic/manual offset tracking strategies between single and super stream consuming:
159+
160+
* *automatic offset tracking*: internally, _the client divides the `messageCountBeforeStorage` setting by the number of partitions for each individual consumer_.
161+
Imagine a 3-partition super stream, `messageCountBeforeStorage` set to 10,000, and 10,000 messages coming in, perfectly balanced across the partitions (that is about 3,333 messages for each partition).
162+
In this case, if each partition consumer used the full 10,000 setting, the automatic offset tracking strategy would not kick in, because the expected message count would not be reached on any partition.
163+
Making the client divide `messageCountBeforeStorage` by the number of partitions can be considered "more accurate" if the messages are well balanced across the partitions.
164+
A good rule of thumb is to then multiply the expected per-stream `messageCountBeforeStorage` by the number of partitions, to avoid storing offsets too often. So the default being 10,000, it can be set to 30,000 for a 3-partition super stream.
165+
* *manual offset tracking*: the `MessageHandler.Context#storeOffset()` method must be used. `Consumer#store(long)` will fail, because an offset value has a meaning only in one stream, not in other streams.
166+
A call to `MessageHandler.Context#storeOffset()` will store the current message offset in _its_ stream, but also the offset of the last dispatched message for the other streams of the super stream.
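The two behaviors described above can be modeled with a short sketch. It is a simplified model, not the client's code: the `SuperStreamOffsetSketch` class is hypothetical, the per-partition threshold is assumed to be a plain integer division, and the `invoices-N` stream names and offset values are made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the automatic and manual tracking rules described above.
final class SuperStreamOffsetSketch {

    // Automatic tracking: threshold applied to each partition consumer,
    // assuming the client uses plain integer division.
    static int perPartitionThreshold(int messageCountBeforeStorage, int partitionCount) {
        return messageCountBeforeStorage / partitionCount;
    }

    private final Map<String, Long> lastDispatched = new LinkedHashMap<>();
    private final Map<String, Long> stored = new LinkedHashMap<>();

    // Records that a message at this offset was handed to the message handler.
    void dispatch(String stream, long offset) {
        lastDispatched.put(stream, offset);
    }

    // Models a storeOffset() call made while handling a message of 'stream':
    // stores that message's offset for its stream, and the last dispatched
    // offset for every other stream.
    void storeOffset(String stream, long offset) {
        stored.putAll(lastDispatched);
        stored.put(stream, offset);
    }

    Map<String, Long> storedOffsets() {
        return stored;
    }

    public static void main(String[] args) {
        System.out.println(perPartitionThreshold(10_000, 3)); // prints 3333
        System.out.println(perPartitionThreshold(30_000, 3)); // prints 10000

        SuperStreamOffsetSketch tracking = new SuperStreamOffsetSketch();
        tracking.dispatch("invoices-0", 41L);
        tracking.dispatch("invoices-1", 17L);
        tracking.dispatch("invoices-0", 42L);
        tracking.storeOffset("invoices-0", 42L);
        System.out.println(tracking.storedOffsets()); // prints {invoices-0=42, invoices-1=17}
    }
}
```

With the rule of thumb from the text, a setting of 30,000 on a 3-partition super stream gives each partition consumer the same 10,000 threshold a single stream consumer would have by default.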
167+
168+

src/main/java/com/rabbitmq/stream/ProducerBuilder.java

+4
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@ public interface ProducerBuilder {
127127
* <p>The default routing strategy hashes the routing key to choose the stream (partition) to send
128128
* the message to.
129129
*
130+
* Note the routing key extraction logic is required only when the built-in routing strategies
131+
* are used. It can be set to <code>null</code> when a custom {@link RoutingStrategy} is set
132+
* with {@link #routing(Function)}.
133+
*
130134
* @param routingKeyExtractor the logic to extract a routing key from a message
131135
* @return the routing configuration instance
132136
* @see RoutingConfiguration

src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java

+41
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,15 @@
1414

1515
package com.rabbitmq.stream.docs;
1616

17+
import com.rabbitmq.stream.Consumer;
1718
import com.rabbitmq.stream.Environment;
19+
import com.rabbitmq.stream.Message;
20+
import com.rabbitmq.stream.MessageHandler;
1821
import com.rabbitmq.stream.Producer;
22+
import com.rabbitmq.stream.RoutingStrategy;
23+
import java.util.Collections;
24+
import java.util.List;
25+
import java.util.concurrent.atomic.AtomicLong;
1926

2027
public class SuperStreamUsage {
2128

@@ -55,4 +62,38 @@ void producerKeyRoutingStrategy() {
5562
.build();
5663
// end::producer-key-routing-strategy[]
5764
}
65+
66+
void producerCustomRoutingStrategy() {
67+
Environment environment = Environment.builder().build();
68+
// tag::producer-custom-routing-strategy[]
69+
AtomicLong messageCount = new AtomicLong(0);
70+
RoutingStrategy routingStrategy = (message, metadata) -> {
71+
List<String> partitions = metadata.partitions();
72+
String stream = partitions.get(
73+
(int) messageCount.getAndIncrement() % partitions.size()
74+
);
75+
return Collections.singletonList(stream);
76+
};
77+
Producer producer = environment.producerBuilder()
78+
.stream("invoices")
79+
.routing(null) // <1>
80+
.strategy(routingStrategy) // <2>
81+
.producerBuilder()
82+
.build();
83+
// end::producer-custom-routing-strategy[]
84+
}
85+
86+
void consumerSimple() {
87+
Environment environment = Environment.builder().build();
88+
// tag::consumer-simple[]
89+
Consumer consumer = environment.consumerBuilder()
90+
.superStream("invoices") // <1>
91+
.messageHandler((context, message) -> {
92+
// message processing
93+
})
94+
.build();
95+
// ...
96+
consumer.close(); // <2>
97+
// end::consumer-simple[]
98+
}
5899
}
