Skip to content

Commit 04d8203

Browse files
committed
Add ProducerBuilder#superStream(String)
Not mandatory, but for consistency with ConsumerBuilder#superStream(String).
1 parent b807cf8 commit 04d8203

File tree

8 files changed

+83
-18
lines changed

8 files changed

+83
-18
lines changed

src/docs/asciidoc/super-streams.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ When the topology of a super stream like the one described above has been set, c
5757
--------
5858
include::{test-examples}/SuperStreamUsage.java[tag=producer-simple]
5959
--------
60-
<1> Use the super stream name
60+
<1> Set the super stream name
6161
<2> Provide the logic to get the routing key from a message
6262
<3> Create the producer instance
6363
<4> Close the producer when it's no longer necessary

src/main/java/com/rabbitmq/stream/ProducerBuilder.java

+11
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,17 @@ public interface ProducerBuilder {
3939
*/
4040
ProducerBuilder stream(String stream);
4141

42+
/**
43+
* The super stream to send messages to.
44+
*
45+
* <p>This is an experimental API, subject to change.
46+
*
47+
* @param superStream
48+
* @return this builder instance
49+
* @see #routing(Function)
50+
*/
51+
ProducerBuilder superStream(String superStream);
52+
4253
/**
4354
* The number of messages to put in a sub-entry of a publish frame.
4455
*

src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java

+32-3
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class StreamProducerBuilder implements ProducerBuilder {
3030

3131
private String name;
3232

33-
private String stream;
33+
private String stream, superStream;
3434

3535
private int subEntrySize = 1;
3636

@@ -63,6 +63,12 @@ public ProducerBuilder name(String name) {
6363
return this;
6464
}
6565

66+
@Override
67+
public ProducerBuilder superStream(String superStream) {
68+
this.superStream = superStream;
69+
return this;
70+
}
71+
6672
public StreamProducerBuilder batchSize(int batchSize) {
6773
if (batchSize <= 0) {
6874
throw new IllegalArgumentException("the batch size must be greater than 0");
@@ -134,6 +140,12 @@ void resetRouting() {
134140
}
135141

136142
public Producer build() {
143+
if (this.stream == null && this.superStream == null) {
144+
throw new IllegalArgumentException("A stream must be specified");
145+
}
146+
if (this.stream != null && this.superStream != null) {
147+
throw new IllegalArgumentException("Stream and superStream cannot be set at the same time");
148+
}
137149
if (subEntrySize == 1 && compression != null) {
138150
throw new IllegalArgumentException(
139151
"Sub-entry batching must be enabled to enable compression");
@@ -143,7 +155,23 @@ public Producer build() {
143155
}
144156
this.environment.maybeInitializeLocator();
145157
Producer producer;
146-
if (this.routingConfiguration == null) {
158+
159+
if (this.stream != null && this.routingConfiguration != null) {
160+
throw new IllegalArgumentException(
161+
"A super stream must be specified when a routing configuration is set");
162+
}
163+
164+
if (this.routingConfiguration != null && this.superStream == null) {
165+
throw new IllegalArgumentException(
166+
"A super stream must be specified when a routing configuration is set");
167+
}
168+
169+
if (this.routingConfiguration == null && this.superStream != null) {
170+
throw new IllegalArgumentException(
171+
"A routing configuration must specified when a super stream is set");
172+
}
173+
174+
if (this.stream != null) {
147175
producer =
148176
new StreamProducer(
149177
name,
@@ -170,7 +198,8 @@ public Producer build() {
170198
}
171199
}
172200
producer =
173-
new SuperStreamProducer(this, this.name, this.stream, routingStrategy, this.environment);
201+
new SuperStreamProducer(
202+
this, this.name, this.superStream, routingStrategy, this.environment);
174203
}
175204
return producer;
176205
}

src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ public void send(Message message, ConfirmationHandler confirmationHandler) {
101101
producers.computeIfAbsent(
102102
stream,
103103
stream1 -> {
104-
Producer p = producerBuilder.duplicate().stream(stream1).build();
104+
Producer p =
105+
producerBuilder.duplicate().superStream(null).stream(stream1).build();
105106
return p;
106107
});
107108
producer.send(message, confirmationHandler);

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -735,12 +735,15 @@ public Integer call() throws Exception {
735735

736736
java.util.function.Consumer<MessageBuilder> messageBuilderConsumer;
737737
if (this.superStreams) {
738-
producerBuilder.routing(msg -> msg.getProperties().getMessageIdAsString());
738+
producerBuilder
739+
.superStream(stream)
740+
.routing(msg -> msg.getProperties().getMessageIdAsString());
739741
AtomicLong messageIdSequence = new AtomicLong(0);
740742
messageBuilderConsumer =
741743
mg -> mg.properties().messageId(messageIdSequence.getAndIncrement());
742744
} else {
743745
messageBuilderConsumer = mg -> {};
746+
producerBuilder.stream(stream);
744747
}
745748

746749
Producer producer =
@@ -750,7 +753,6 @@ public Integer call() throws Exception {
750753
.compression(
751754
this.compression == Compression.NONE ? null : this.compression)
752755
.maxUnconfirmedMessages(this.confirms)
753-
.stream(stream)
754756
.build();
755757

756758
AtomicLong messageCount = new AtomicLong(0);

src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ void producerSimple() {
3030
Environment environment = Environment.builder().build();
3131
// tag::producer-simple[]
3232
Producer producer = environment.producerBuilder()
33-
.stream("invoices") // <1>
33+
.superStream("invoices") // <1>
3434
.routing(message -> message.getProperties().getMessageIdAsString()) // <2>
3535
.producerBuilder()
3636
.build(); // <3>
@@ -43,7 +43,7 @@ void producerCustomHashFunction() {
4343
Environment environment = Environment.builder().build();
4444
// tag::producer-custom-hash-function[]
4545
Producer producer = environment.producerBuilder()
46-
.stream("invoices")
46+
.superStream("invoices")
4747
.routing(message -> message.getProperties().getMessageIdAsString())
4848
.hash(rk -> rk.hashCode()) // <1>
4949
.producerBuilder()
@@ -55,7 +55,7 @@ void producerKeyRoutingStrategy() {
5555
Environment environment = Environment.builder().build();
5656
// tag::producer-key-routing-strategy[]
5757
Producer producer = environment.producerBuilder()
58-
.stream("invoices")
58+
.superStream("invoices")
5959
.routing(msg -> msg.getApplicationProperties().get("region").toString()) // <1>
6060
.key() // <2>
6161
.producerBuilder()
@@ -75,7 +75,7 @@ void producerCustomRoutingStrategy() {
7575
return Collections.singletonList(stream);
7676
};
7777
Producer producer = environment.producerBuilder()
78-
.stream("invoices")
78+
.superStream("invoices")
7979
.routing(null) // <1>
8080
.strategy(routingStrategy) // <2>
8181
.producerBuilder()

src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java

+20-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.rabbitmq.stream.impl.TestUtils.localhost;
2020
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
2121
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2223

2324
import com.rabbitmq.client.Connection;
2425
import com.rabbitmq.client.ConnectionFactory;
@@ -75,12 +76,20 @@ void tearDown() throws Exception {
7576
connection.close();
7677
}
7778

79+
@Test
80+
void exceptionShouldBeThrownWhenSuperStreamIsSetAndRoutingIsNotConfigured() {
81+
assertThatThrownBy(() -> environment.producerBuilder().superStream(superStream).build())
82+
.isInstanceOf(IllegalArgumentException.class);
83+
}
84+
7885
@Test
7986
void allMessagesSentToSuperStreamWithHashRoutingShouldBeThenConsumed() throws Exception {
8087
int messageCount = 10_000;
8188
declareSuperStreamTopology(connection, superStream, partitions);
8289
Producer producer =
83-
environment.producerBuilder().stream(superStream)
90+
environment
91+
.producerBuilder()
92+
.superStream(superStream)
8493
.routing(message -> message.getProperties().getMessageIdAsString())
8594
.producerBuilder()
8695
.build();
@@ -133,7 +142,9 @@ void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() thr
133142
routingKeys = new String[] {"amer", "emea", "apac"};
134143
declareSuperStreamTopology(connection, superStream, routingKeys);
135144
Producer producer =
136-
environment.producerBuilder().stream(superStream)
145+
environment
146+
.producerBuilder()
147+
.superStream(superStream)
137148
.routing(message -> message.getApplicationProperties().get("region").toString())
138149
.key()
139150
.producerBuilder()
@@ -183,7 +194,9 @@ void messageIsNackedIfNoRouteFound() throws Exception {
183194
routingKeys = new String[] {"amer", "emea", "apac"};
184195
declareSuperStreamTopology(connection, superStream, routingKeys);
185196
Producer producer =
186-
environment.producerBuilder().stream(superStream)
197+
environment
198+
.producerBuilder()
199+
.superStream(superStream)
187200
.routing(message -> message.getApplicationProperties().get("region").toString())
188201
.key()
189202
.producerBuilder()
@@ -216,7 +229,10 @@ void getLastPublishingIdShouldReturnLowestValue() throws Exception {
216229
declareSuperStreamTopology(connection, superStream, partitions);
217230
String producerName = "super-stream-application";
218231
Producer producer =
219-
environment.producerBuilder().name(producerName).stream(superStream)
232+
environment
233+
.producerBuilder()
234+
.name(producerName)
235+
.superStream(superStream)
220236
.routing(message -> message.getProperties().getMessageIdAsString())
221237
.producerBuilder()
222238
.build();

src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,9 @@ void allMessagesSentWithHashRoutingShouldBeThenConsumed() throws Exception {
8282
int messageCount = 10_000 * partitions;
8383
declareSuperStreamTopology(connection, superStream, partitions);
8484
Producer producer =
85-
environment.producerBuilder().stream(superStream)
85+
environment
86+
.producerBuilder()
87+
.superStream(superStream)
8688
.routing(message -> message.getProperties().getMessageIdAsString())
8789
.producerBuilder()
8890
.build();
@@ -125,7 +127,9 @@ void allMessagesSentWithRoutingKeyRoutingShouldBeThenConsumed() throws Exception
125127
routingKeys = new String[] {"amer", "emea", "apac"};
126128
declareSuperStreamTopology(connection, superStream, routingKeys);
127129
Producer producer =
128-
environment.producerBuilder().stream(superStream)
130+
environment
131+
.producerBuilder()
132+
.superStream(superStream)
129133
.routing(message -> message.getApplicationProperties().get("region").toString())
130134
.key()
131135
.producerBuilder()
@@ -208,7 +212,9 @@ void allMessagesForSameUserShouldEndUpInSamePartition() throws Exception {
208212
assertThat(latchAssert(consumersReadyLatch)).completes();
209213

210214
Producer producer =
211-
environment.producerBuilder().stream(superStream)
215+
environment
216+
.producerBuilder()
217+
.superStream(superStream)
212218
.routing(
213219
message -> new String(message.getProperties().getUserId(), StandardCharsets.UTF_8))
214220
.producerBuilder()

0 commit comments

Comments
 (0)