Skip to content

Commit a7ba4d1

Browse files
committed
Make RoutingStratey a public API
The metadata of the super stream are now provided in the route method. This abstraction hides the infrastructure needed to find out about the super stream topology. The user can now have full control over the routing strategy, even though the provided implementations ("hash" and "key") should be enough most of the time. The configuration of the routing strategy has been also refactored to reflect the change and to be simpler.
1 parent 2be8bc7 commit a7ba4d1

9 files changed

+216
-107
lines changed

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ public interface ConsumerBuilder {
2727
ConsumerBuilder stream(String stream);
2828

2929
/**
30-
* Set the consumer to consume from a super stream (partitioned stream). Experimental!
30+
* Set the consumer to consume from a super stream (partitioned stream).
31+
*
32+
* <p>This is an experimental API, subject to change.
3133
*
3234
* @param superStream
3335
* @return this builder instance

src/main/java/com/rabbitmq/stream/ProducerBuilder.java

+65-21
Original file line numberDiff line numberDiff line change
@@ -108,37 +108,81 @@ public interface ProducerBuilder {
108108
ProducerBuilder enqueueTimeout(Duration timeout);
109109

110110
/**
111-
* Routing strategy for super streams. Experimental!
111+
* Create the {@link Producer} instance.
112112
*
113-
* @param routingKeyExtractor
114-
* @param routingType
115-
* @return this builder instance
113+
* @return the configured producer
116114
*/
117-
ProducerBuilder routing(Function<Message, String> routingKeyExtractor, RoutingType routingType);
115+
Producer build();
118116

119117
/**
120-
* Routing strategy for super streams. Experimental!
118+
* Configure the routing for super streams (partitioned streams).
121119
*
122-
* @param routingKeyExtractor
123-
* @param routingType
124-
* @param hash
125-
* @return this builder instance
120+
* <p>This is an experimental API, subject to change.
121+
*
122+
* <p>The to-be-created producer will be a composite producer when this method is called. It will
123+
* use the routing configuration to find out where a message should be routed. The application
124+
* developer must provide the logic to extract a "routing key" from a message, which will decide
125+
* the destination(s) of the message.
126+
*
127+
* <p>The default routing strategy hashes the routing key to choose the stream (partition) to send
128+
* the message to.
129+
*
130+
* @param routingKeyExtractor the logic to extract a routing key from a message
131+
* @return the routing configuration instance
132+
* @see RoutingConfiguration
126133
*/
127-
ProducerBuilder routing(
128-
Function<Message, String> routingKeyExtractor,
129-
RoutingType routingType,
130-
ToIntFunction<String> hash);
134+
RoutingConfiguration routing(Function<Message, String> routingKeyExtractor);
131135

132136
/**
133-
* Create the {@link Producer} instance.
137+
* Routing configuration for super streams (partitioned streams).
134138
*
135-
* @return the configured producer
139+
* <p>This is an experimental API, subject to change.
136140
*/
137-
Producer build();
141+
interface RoutingConfiguration {
142+
143+
/**
144+
* Enable the "hash" routing strategy (the default).
145+
*
146+
* <p>The default hash algorithm is 32-bit MurmurHash3.
147+
*
148+
* @return the routing configuration instance
149+
*/
150+
RoutingConfiguration hash();
151+
152+
/**
153+
* Enable the "hash" routing strategy with a specific hash algorithm.
154+
*
155+
* @param hash
156+
* @return
157+
*/
158+
RoutingConfiguration hash(ToIntFunction<String> hash);
159+
160+
/**
161+
* Enable the "key" routing strategy.
162+
*
163+
* <p>It consists in using the "route" command of the RabbitMQ Stream protocol to determine the
164+
* streams to send a message to.
165+
*
166+
* @return the routing configuration instance
167+
*/
168+
RoutingConfiguration key();
169+
170+
/**
171+
* Set the routing strategy to use.
172+
*
173+
* <p>Providing the routing strategy provides control over the streams a message is routed to
174+
* (routing key extraction logic if relevant and destination(s) decision).
175+
*
176+
* @param routingStrategy
177+
* @return the routing configuration instance
178+
*/
179+
RoutingConfiguration strategy(RoutingStrategy routingStrategy);
138180

139-
/** Routing type when using super streams. Experimental! */
140-
enum RoutingType {
141-
HASH,
142-
KEY
181+
/**
182+
* Go back to the producer builder.
183+
*
184+
* @return the producer builder
185+
*/
186+
ProducerBuilder producerBuilder();
143187
}
144188
}

src/main/java/com/rabbitmq/stream/impl/RoutingStrategy.java renamed to src/main/java/com/rabbitmq/stream/RoutingStrategy.java

+28-4
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,36 @@
1111
//
1212
// If you have any questions regarding licensing, please contact us at
1313
14-
package com.rabbitmq.stream.impl;
14+
package com.rabbitmq.stream;
1515

16-
import com.rabbitmq.stream.Message;
1716
import java.util.List;
17+
import java.util.function.Function;
1818

19-
interface RoutingStrategy {
19+
/**
20+
* Strategy to route outbound messages to appropriate streams.
21+
*
22+
* <p>This is an experimental API, subject to change.
23+
*
24+
* <p>Used for super streams (partitioned stream).
25+
*
26+
* @see ProducerBuilder#routing(Function)
27+
*/
28+
public interface RoutingStrategy {
2029

21-
List<String> route(Message message);
30+
/**
31+
* Where to route a message.
32+
*
33+
* @param message
34+
* @param metadata
35+
* @return
36+
*/
37+
List<String> route(Message message, Metadata metadata);
38+
39+
/** Metadata on the super stream. */
40+
interface Metadata {
41+
42+
List<String> partitions();
43+
44+
List<String> route(String routingKey);
45+
}
2246
}

src/main/java/com/rabbitmq/stream/impl/HashRoutingStrategy.java

+5-18
Original file line numberDiff line numberDiff line change
@@ -14,41 +14,28 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import com.rabbitmq.stream.Message;
17+
import com.rabbitmq.stream.RoutingStrategy;
1718
import java.util.Collections;
1819
import java.util.List;
19-
import java.util.concurrent.CopyOnWriteArrayList;
2020
import java.util.function.Function;
2121
import java.util.function.ToIntFunction;
22-
import java.util.stream.Collectors;
2322

2423
class HashRoutingStrategy implements RoutingStrategy {
2524

2625
private final Function<Message, String> routingKeyExtractor;
2726

28-
private final StreamEnvironment env;
29-
30-
private final List<List<String>> partitions;
31-
3227
private final ToIntFunction<String> hash;
3328

34-
HashRoutingStrategy(
35-
String superStream,
36-
Function<Message, String> routingKeyExtractor,
37-
StreamEnvironment env,
38-
ToIntFunction<String> hash) {
29+
HashRoutingStrategy(Function<Message, String> routingKeyExtractor, ToIntFunction<String> hash) {
3930
this.routingKeyExtractor = routingKeyExtractor;
40-
this.env = env;
41-
List<String> ps = this.env.locatorOperation(c -> c.partitions(superStream));
42-
this.partitions =
43-
new CopyOnWriteArrayList<>(
44-
ps.stream().map(Collections::singletonList).collect(Collectors.toList()));
4531
this.hash = hash;
4632
}
4733

4834
@Override
49-
public List<String> route(Message message) {
35+
public List<String> route(Message message, Metadata metadata) {
5036
String routingKey = routingKeyExtractor.apply(message);
5137
int hashValue = hash.applyAsInt(routingKey);
52-
return this.partitions.get((hashValue & 0x7FFFFFFF) % this.partitions.size());
38+
List<String> partitions = metadata.partitions();
39+
return Collections.singletonList(partitions.get((hashValue & 0x7FFFFFFF) % partitions.size()));
5340
}
5441
}

src/main/java/com/rabbitmq/stream/impl/RoutingKeyRoutingStrategy.java

+4-14
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import com.rabbitmq.stream.Message;
17+
import com.rabbitmq.stream.RoutingStrategy;
1718
import java.util.List;
1819
import java.util.Map;
1920
import java.util.concurrent.ConcurrentHashMap;
@@ -25,27 +26,16 @@ class RoutingKeyRoutingStrategy implements RoutingStrategy {
2526

2627
private final Map<String, List<String>> routingKeysToStreams = new ConcurrentHashMap<>();
2728

28-
private final StreamEnvironment env;
29-
30-
private final String superStream;
31-
32-
RoutingKeyRoutingStrategy(
33-
String superStream, Function<Message, String> routingKeyExtractor, StreamEnvironment env) {
29+
RoutingKeyRoutingStrategy(Function<Message, String> routingKeyExtractor) {
3430
this.routingKeyExtractor = routingKeyExtractor;
35-
this.env = env;
36-
this.superStream = superStream;
3731
}
3832

3933
@Override
40-
public List<String> route(Message message) {
34+
public List<String> route(Message message, Metadata metadata) {
4135
String routingKey = this.routingKeyExtractor.apply(message);
4236
List<String> streams =
4337
routingKeysToStreams.computeIfAbsent(
44-
routingKey,
45-
routingKey1 -> {
46-
// TODO retry on locator lookup
47-
return env.locatorOperation(c -> c.route(routingKey1, superStream));
48-
});
38+
routingKey, routingKey1 -> metadata.route(routingKey1));
4939
return streams;
5040
}
5141
}

src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java

+68-37
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.rabbitmq.stream.Message;
1717
import com.rabbitmq.stream.Producer;
1818
import com.rabbitmq.stream.ProducerBuilder;
19+
import com.rabbitmq.stream.RoutingStrategy;
1920
import com.rabbitmq.stream.StreamException;
2021
import com.rabbitmq.stream.compression.Compression;
2122
import java.lang.reflect.Field;
@@ -45,11 +46,7 @@ class StreamProducerBuilder implements ProducerBuilder {
4546

4647
private Duration enqueueTimeout = Duration.ofSeconds(10);
4748

48-
private Function<Message, String> routingKeyExtractor;
49-
50-
private RoutingType routingType;
51-
52-
private ToIntFunction<String> hash = HashUtils.MURMUR3;
49+
private DefaultRoutingConfiguration routingConfiguration;
5350

5451
StreamProducerBuilder(StreamEnvironment environment) {
5552
this.environment = environment;
@@ -126,32 +123,14 @@ public ProducerBuilder enqueueTimeout(Duration timeout) {
126123
}
127124

128125
@Override
129-
public ProducerBuilder routing(
130-
Function<Message, String> routingKeyExtractor, RoutingType routingType) {
131-
return this.routing(routingKeyExtractor, routingType, HashUtils.MURMUR3);
132-
}
133-
134-
@Override
135-
public ProducerBuilder routing(
136-
Function<Message, String> routingKeyExtractor,
137-
RoutingType routingType,
138-
ToIntFunction<String> hash) {
139-
if (routingKeyExtractor == null || routingType == null) {
140-
throw new IllegalArgumentException(
141-
"both routing key extractor and routing type must be non-null");
142-
}
143-
this.routingKeyExtractor = routingKeyExtractor;
144-
this.routingType = routingType;
145-
if (hash != null) {
146-
this.hash = hash;
147-
}
148-
return this;
126+
public RoutingConfiguration routing(Function<Message, String> routingKeyExtractor) {
127+
this.routingConfiguration = new DefaultRoutingConfiguration(this);
128+
this.routingConfiguration.routingKeyExtractor = routingKeyExtractor;
129+
return this.routingConfiguration;
149130
}
150131

151132
void resetRouting() {
152-
this.routingKeyExtractor = null;
153-
this.routingType = null;
154-
this.hash = null;
133+
this.routingConfiguration = null;
155134
}
156135

157136
public Producer build() {
@@ -164,7 +143,7 @@ public Producer build() {
164143
}
165144
this.environment.maybeInitializeLocator();
166145
Producer producer;
167-
if (this.routingKeyExtractor == null) {
146+
if (this.routingConfiguration == null) {
168147
producer =
169148
new StreamProducer(
170149
name,
@@ -179,14 +158,17 @@ public Producer build() {
179158
environment);
180159
this.environment.addProducer((StreamProducer) producer);
181160
} else {
182-
// FIXME propagate compression to super stream producer
183-
ToIntFunction<String> hashFunction = this.hash == null ? HashUtils.MURMUR3 : this.hash;
184-
RoutingStrategy routingStrategy =
185-
this.routingType == RoutingType.HASH
186-
? new HashRoutingStrategy(
187-
this.stream, this.routingKeyExtractor, this.environment, hashFunction)
188-
: new RoutingKeyRoutingStrategy(
189-
this.stream, this.routingKeyExtractor, this.environment);
161+
RoutingStrategy routingStrategy = this.routingConfiguration.routingStrategy;
162+
if (routingStrategy == null) {
163+
if (this.routingConfiguration.hash == null) {
164+
routingStrategy =
165+
new RoutingKeyRoutingStrategy(this.routingConfiguration.routingKeyExtractor);
166+
} else {
167+
routingStrategy =
168+
new HashRoutingStrategy(
169+
this.routingConfiguration.routingKeyExtractor, this.routingConfiguration.hash);
170+
}
171+
}
190172
producer =
191173
new SuperStreamProducer(this, this.name, this.stream, routingStrategy, this.environment);
192174
}
@@ -205,4 +187,53 @@ StreamProducerBuilder duplicate() {
205187
}
206188
return duplicate;
207189
}
190+
191+
static final class DefaultRoutingConfiguration implements RoutingConfiguration {
192+
193+
private final StreamProducerBuilder producerBuilder;
194+
195+
private Function<Message, String> routingKeyExtractor;
196+
197+
private RoutingStrategy routingStrategy;
198+
199+
private ToIntFunction<String> hash = HashUtils.MURMUR3;
200+
201+
DefaultRoutingConfiguration(StreamProducerBuilder producerBuilder) {
202+
this.producerBuilder = producerBuilder;
203+
}
204+
205+
@Override
206+
public RoutingConfiguration hash() {
207+
if (this.hash == null) {
208+
this.hash = HashUtils.MURMUR3;
209+
}
210+
this.routingStrategy = null;
211+
return this;
212+
}
213+
214+
@Override
215+
public RoutingConfiguration hash(ToIntFunction<String> hash) {
216+
this.hash = hash;
217+
this.routingStrategy = null;
218+
return this;
219+
}
220+
221+
@Override
222+
public RoutingConfiguration key() {
223+
this.hash = null;
224+
this.routingStrategy = null;
225+
return this;
226+
}
227+
228+
@Override
229+
public RoutingConfiguration strategy(RoutingStrategy routingStrategy) {
230+
this.routingStrategy = routingStrategy;
231+
return this;
232+
}
233+
234+
@Override
235+
public ProducerBuilder producerBuilder() {
236+
return this.producerBuilder;
237+
}
238+
}
208239
}

0 commit comments

Comments
 (0)