Skip to content

Commit 5c6fe3c

Browse files
committed
Start super stream documentation
1 parent cf4bddb commit 5c6fe3c

File tree

5 files changed

+175
-2
lines changed

5 files changed

+175
-2
lines changed

src/docs/asciidoc/api.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -880,4 +880,4 @@ entry, which has its own offset.
880880

881881
This means one must be careful when basing some decision on offset values, like
882882
a modulo to perform an operation every X messages. As the message offsets have
883-
no guarantee to be contiguous, the operation may not happen exactly every X messages.
883+
no guarantee to be contiguous, the operation may not happen exactly every X messages.

src/docs/asciidoc/index.adoc

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ include::sample-application.adoc[]
2222

2323
include::api.adoc[]
2424

25+
include::super-streams.adoc[]
26+
2527
include::building.adoc[]
2628

2729
include::performance-tool.adoc[]

src/docs/asciidoc/super-streams.adoc

+114
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
:test-examples: ../../test/java/com/rabbitmq/stream/docs
2+
3+
==== Super Streams (Partitioned Streams)
4+
5+
[WARNING]
6+
.Experimental
7+
====
8+
Super streams are an experimental feature, they are subject to change.
9+
====
10+
11+
A super stream is a logical stream made of several individual streams.
12+
In essence, a super stream is a partitioned stream that brings scalability compared to a single stream.
13+
14+
The stream Java client uses the same programming model for super streams as with individual streams, that is the `Producer`, `Consumer`, `Message`, etc API are still valid when super streams are in use.
15+
Application code should not be impacted whether it uses individual or super streams.
16+
17+
==== Topology
18+
19+
A super stream is made of several individual streams, so it can be considered a logical entity rather than an actual physical entity.
20+
The topology of a super stream is based on the https://www.rabbitmq.com/tutorials/amqp-concepts.html[AMQP 0.9.1 model], that is exchange, queues, and bindings between them.
21+
This does not mean AMQP resources are used to transport or store stream messages, it means that they are used to _describe_ the super stream topology, that is the streams it is made of.
22+
23+
Let's take the example of an `invoices` super stream made of 3 streams (i.e. partitions):
24+
25+
* an `invoices` exchange represents the super stream
26+
* the `invoices-0`, `invoices-1`, `invoices-2` streams are the partitions of the super stream (streams are also AMQP queues in RabbitMQ)
27+
* 3 bindings between the exchange and the streams link the super stream to its partitions and represent _routing rules_
28+
29+
.The topology of a super stream is defined with bindings between an exchange and queues
30+
[ditaa]
31+
....
32+
0 +------------+
33+
+----->+ invoices–0 |
34+
| +------------+
35+
+----------+ |
36+
| invoices | | 1 +------------+
37+
| +---+----->+ invoices–1 |
38+
| exchange | | +------------+
39+
+----------+ |
40+
| 2 +------------+
41+
+----->+ invoices–2 |
42+
+------------+
43+
....
44+
45+
When a super stream is in use, the stream Java client queries this information to find out about the partitions of a super stream and the routing rules.
46+
From the application code point of view, using a super stream is mostly configuration-based.
47+
Some logic must also be provided to extract routing information from messages.
48+
49+
==== Publishing to a Super Stream
50+
51+
When the topology of a super stream like the one described above has been set, creating a producer for it is straightforward:
52+
53+
.Creating a Producer for a Super Stream
54+
[source,java,indent=0]
55+
--------
56+
include::{test-examples}/SuperStreamUsage.java[tag=producer-simple]
57+
--------
58+
<1> Use the super stream name
59+
<2> Provide the logic to get the routing key from a message
60+
<3> Create the producer instance
61+
<4> Close the producer when it's no longer necessary
62+
63+
Note that even though the `invoices` super stream is not an actual stream, its name must be used to declare the producer.
64+
Internally the client will figure out the streams that compose the super stream.
65+
The application code must provide the logic to extract a routing key from a message as a `Function<Message, String>`.
66+
The client will hash the routing key to determine the stream to send the message to (using partition list and a modulo operation).
67+
68+
The client uses 32-bit https://en.wikipedia.org/wiki/MurmurHash[MurmurHash3] by default to hash the routing key.
69+
This hash function provides good uniformity, performance, and portability, making it a good default choice, but it is possible to specify a custom hash function:
70+
71+
.Specifying a custom hash function
72+
[source,java,indent=0]
73+
--------
74+
include::{test-examples}/SuperStreamUsage.java[tag=producer-custom-hash-function]
75+
--------
76+
<1> Use `String#hashCode()` to hash the routing key
77+
78+
Note using Java's `hashCode()` method is a debatable choice as potential producers in other languages are unlikely to implement it, making the routing different between producers in different languages.
79+
80+
==== Resolving Routes with Bindings
81+
82+
Hashing the routing key to pick a partition is only one way to route messages to the appropriate streams.
83+
The stream Java client provides another way to resolve streams, based on the routing key _and_ the bindings between the super stream exchange and the streams.
84+
85+
This routing strategy makes sense when the partitioning has a business meaning, e.g. with a partition for a region in the world, like in the diagram below:
86+
87+
.A super stream with a partition for a region in a world
88+
[ditaa]
89+
....
90+
amer +---------------+
91+
+------>+ invoices–amer |
92+
| +---------------+
93+
+----------+ |
94+
| invoices | | emea +---------------+
95+
| +---+------>+ invoices–emea |
96+
| exchange | | +---------------+
97+
+----------+ |
98+
| apac +---------------+
99+
+------>+ invoices–apac |
100+
+---------------+
101+
....
102+
103+
In such a case, the routing key will be a property of the message that represents the region:
104+
105+
.Enabling the "key" routing strategy
106+
[source,java,indent=0]
107+
--------
108+
include::{test-examples}/SuperStreamUsage.java[tag=producer-key-routing-strategy]
109+
--------
110+
<1> Extract the routing key
111+
<2> Enable the "key" routing strategy
112+
113+
Internally the client will query the broker to resolve the destination streams for a given routing key, making the routing logic from any exchange type available to streams.
114+
Note the client caches results, it does not query the broker for every message.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
15+
package com.rabbitmq.stream.docs;
16+
17+
import com.rabbitmq.stream.Environment;
18+
import com.rabbitmq.stream.Producer;
19+
20+
public class SuperStreamUsage {
21+
22+
void producerSimple() {
23+
Environment environment = Environment.builder().build();
24+
// tag::producer-simple[]
25+
Producer producer = environment.producerBuilder()
26+
.stream("invoices") // <1>
27+
.routing(message -> message.getProperties().getMessageIdAsString()) // <2>
28+
.producerBuilder()
29+
.build(); // <3>
30+
// ...
31+
producer.close(); // <4>
32+
// end::producer-simple[]
33+
}
34+
35+
void producerCustomHashFunction() {
36+
Environment environment = Environment.builder().build();
37+
// tag::producer-custom-hash-function[]
38+
Producer producer = environment.producerBuilder()
39+
.stream("invoices")
40+
.routing(message -> message.getProperties().getMessageIdAsString())
41+
.hash(rk -> rk.hashCode()) // <1>
42+
.producerBuilder()
43+
.build();
44+
// end::producer-custom-hash-function[]
45+
}
46+
47+
void producerKeyRoutingStrategy() {
48+
Environment environment = Environment.builder().build();
49+
// tag::producer-key-routing-strategy[]
50+
Producer producer = environment.producerBuilder()
51+
.stream("invoices")
52+
.routing(msg -> msg.getApplicationProperties().get("region").toString()) // <1>
53+
.key() // <2>
54+
.producerBuilder()
55+
.build();
56+
// end::producer-key-routing-strategy[]
57+
}
58+
}

src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,6 @@ void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() thr
180180

181181
@Test
182182
void messageIsNackedIfNoRouteFound() throws Exception {
183-
int messageCount = 10_000;
184183
routingKeys = new String[] {"amer", "emea", "apac"};
185184
declareSuperStreamTopology(connection, superStream, routingKeys);
186185
Producer producer =

0 commit comments

Comments
 (0)