Skip to content

Commit 295a10d

Browse files
committed
Add producer names option to stream perf test
If producer names are set, message de-duplication will be enabled. The default behaviour does not change: producer names are empty and de-duplication is therefore disabled.
1 parent 0eb762d commit 295a10d

File tree

7 files changed

+68
-13
lines changed

7 files changed

+68
-13
lines changed

src/docs/asciidoc/api.adoc

+3-3
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ The following table sums up the main settings to create a `Producer`:
346346

347347
|`name`
348348
|The logical name of the producer. Specify a name to enable
349-
<<outbound-message-de-deduplication,message de-deduplication>>.
349+
<<outbound-message-de-duplication,message de-duplication>>.
350350
|`null` (no de-duplication)
351351

352352
|`batchSize`
@@ -475,8 +475,8 @@ type system. It provides good interoperability, which allows streams
475475
to be accessed as AMQP 0-9-1 queues, without data loss.
476476
====
477477

478-
[[outbound-message-de-deduplication]]
479-
===== Message De-deduplication
478+
[[outbound-message-de-duplication]]
479+
===== Message De-duplication
480480

481481
RabbitMQ Stream provides publisher confirms to avoid losing messages: once
482482
the broker has persisted a message it sends a confirmation for this message.

src/docs/asciidoc/overview.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ you need to use an AMQP 0-9-1 client library.
4949
RabbitMQ stream provides at-least-once guarantees thanks to the
5050
publisher confirm mechanism, which is supported by the stream Java client.
5151

52-
Message <<api.adoc#outbound-message-de-deduplication,de-duplication>>
52+
Message <<api.adoc#outbound-message-de-duplication,de-duplication>>
5353
is also supported on the publisher side.
5454

5555
[[stream-client-overview]]

src/docs/asciidoc/performance-tool.adoc

+8
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ The following command shows how to store the offset every 100,000 messages:
404404
java -jar stream-perf-test.jar --store-every 100000
405405
----
406406

407+
[[consumer-names]]
407408
===== Consumer Names
408409

409410
When using `--store-every` (see above) for <<api.adoc#consumer-offset-tracking, offset tracking>>,
@@ -448,6 +449,13 @@ force the offset they start consuming from. With consumer names that do not chan
448449
tracking consumers would ignore the specified offset and would start where they left off
449450
(this is the purpose of offset tracking).
450451

452+
===== Producer Names
453+
You can use the `--producer-names` option to set the producer names pattern and therefore
454+
enable <<api.adoc#outbound-message-de-duplication, message de-duplication>> (using the default
455+
publishing sequence starting at 0 and incremented for each message).
456+
The same naming options apply as above in <<api.adoc#consumer-names, consumer names>> with the only
457+
difference that the default pattern is empty (i.e. no de-duplication).
458+
451459
=== Building the Performance Tool
452460

453461
To build the uber JAR:

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

+21-3
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.rabbitmq.stream.EnvironmentBuilder.TlsConfiguration;
3131
import com.rabbitmq.stream.OffsetSpecification;
3232
import com.rabbitmq.stream.Producer;
33+
import com.rabbitmq.stream.ProducerBuilder;
3334
import com.rabbitmq.stream.StreamCreator;
3435
import com.rabbitmq.stream.StreamCreator.LeaderLocator;
3536
import com.rabbitmq.stream.StreamException;
@@ -258,6 +259,16 @@ public class StreamPerfTest implements Callable<Integer> {
258259
converter = Utils.OneTo255RangeIntegerTypeConverter.class)
259260
private int producersByConnection;
260261

262+
@CommandLine.Option(
263+
names = {"--producer-names", "-pn"},
264+
description =
265+
"naming strategy for producer names. Valid values are 'uuid' or a pattern with "
266+
+ "stream name and producer index as arguments. "
267+
+ "If set, a publishing ID is automatically assigned to each outbound message.",
268+
defaultValue = "",
269+
converter = Utils.NameStrategyConverter.class)
270+
private BiFunction<String, Integer, String> producerNameStrategy;
271+
261272
@CommandLine.Option(
262273
names = {"--tracking-consumers-by-connection", "-ccbc"},
263274
description = "number of tracking consumers by connection. Value must be between 1 and 255.",
@@ -284,7 +295,7 @@ public class StreamPerfTest implements Callable<Integer> {
284295
"naming strategy for consumer names. Valid values are 'uuid' or a pattern with "
285296
+ "stream name and consumer index as arguments.",
286297
defaultValue = "%s-%d",
287-
converter = Utils.ConsumerNameStrategyConverter.class)
298+
converter = Utils.NameStrategyConverter.class)
288299
private BiFunction<String, Integer, String> consumerNameStrategy;
289300

290301
@CommandLine.Option(
@@ -597,10 +608,17 @@ public Integer call() throws Exception {
597608
}
598609

599610
String stream = stream();
611+
ProducerBuilder producerBuilder = environment.producerBuilder();
612+
613+
String producerName = this.producerNameStrategy.apply(stream, i + 1);
614+
if (producerName != "") {
615+
producerBuilder = producerBuilder
616+
.name(producerName)
617+
.confirmTimeout(Duration.ZERO);
618+
}
600619

601620
Producer producer =
602-
environment
603-
.producerBuilder()
621+
producerBuilder
604622
.subEntrySize(this.subEntrySize)
605623
.batchSize(this.batchSize)
606624
.compression(

src/main/java/com/rabbitmq/stream/perf/Utils.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -151,15 +151,15 @@ public ByteCapacity convert(String value) {
151151
}
152152
}
153153

154-
static class ConsumerNameStrategyConverter
154+
static class NameStrategyConverter
155155
implements CommandLine.ITypeConverter<BiFunction<String, Integer, String>> {
156156

157157
@Override
158158
public BiFunction<String, Integer, String> convert(String input) {
159159
if ("uuid".equals(input)) {
160160
return (stream, index) -> UUID.randomUUID().toString();
161161
} else {
162-
return new PatternConsumerNameStrategy(input);
162+
return new PatternNameStrategy(input);
163163
}
164164
}
165165
}
@@ -429,11 +429,11 @@ public X509Certificate[] getAcceptedIssuers() {
429429
}
430430
}
431431

432-
static final class PatternConsumerNameStrategy implements BiFunction<String, Integer, String> {
432+
static final class PatternNameStrategy implements BiFunction<String, Integer, String> {
433433

434434
private final String pattern;
435435

436-
PatternConsumerNameStrategy(String pattern) {
436+
PatternNameStrategy(String pattern) {
437437
this.pattern = pattern;
438438
}
439439

src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java

+29
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,30 @@ void offsetShouldNotBeStoredWhenOptionIsNotEnabled() throws Exception {
195195
waitRunEnds();
196196
}
197197

198+
@Test
199+
void publishingSequenceShouldBeStoredWhenProducerNamesAreSet() throws Exception {
200+
Future<?> run = run(builder().producerNames("producer-%2$d-on-stream-%1$s"));
201+
waitUntilStreamExists(s);
202+
String producerName = "producer-1-on-stream-" + s;
203+
long seq = client.queryPublisherSequence(producerName, s);
204+
waitOneSecond();
205+
waitAtMost(() -> client.queryPublisherSequence(producerName, s) > seq);
206+
run.cancel(true);
207+
waitRunEnds();
208+
}
209+
210+
@Test
211+
void publishingSequenceShouldNotBeStoredWhenProducerNamesAreNotSet() throws Exception {
212+
Future<?> run = run(builder());
213+
waitUntilStreamExists(s);
214+
String producerName = s + "-0"; // convention
215+
assertThat(client.queryPublisherSequence(producerName, s)).isZero();
216+
waitOneSecond();
217+
assertThat(client.queryPublisherSequence(producerName, s)).isZero();
218+
run.cancel(true);
219+
waitRunEnds();
220+
}
221+
198222
@Test
199223
@DisabledIfTlsNotEnabled
200224
void shouldConnectWithTls() throws Exception {
@@ -405,6 +429,11 @@ ArgumentsBuilder consumerNames(String pattern) {
405429
return this;
406430
}
407431

432+
ArgumentsBuilder producerNames(String pattern) {
433+
arguments.put("producer-names", pattern);
434+
return this;
435+
}
436+
408437
String build() {
409438
return this.arguments.entrySet().stream()
410439
.map(e -> "--" + e.getKey() + (e.getValue().isEmpty() ? "" : (" " + e.getValue())))

src/test/java/com/rabbitmq/stream/perf/UtilsTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import com.rabbitmq.stream.OffsetSpecification;
2323
import com.rabbitmq.stream.compression.Compression;
2424
import com.rabbitmq.stream.perf.Utils.CompressionTypeConverter;
25-
import com.rabbitmq.stream.perf.Utils.PatternConsumerNameStrategy;
25+
import com.rabbitmq.stream.perf.Utils.PatternNameStrategy;
2626
import com.rabbitmq.stream.perf.Utils.RangeTypeConverter;
2727
import com.rabbitmq.stream.perf.Utils.SniServerNamesConverter;
2828
import java.util.Arrays;
@@ -143,7 +143,7 @@ void compressionTypeConverterKo(String value) {
143143
"consumer-%2$d-on-stream-%1$s,consumer-2-on-stream-s1"
144144
})
145145
void consumerNameStrategy(String pattern, String expected) {
146-
BiFunction<String, Integer, String> strategy = new PatternConsumerNameStrategy(pattern);
146+
BiFunction<String, Integer, String> strategy = new PatternNameStrategy(pattern);
147147
assertThat(strategy.apply("s1", 2)).isEqualTo(expected);
148148
}
149149

0 commit comments

Comments
 (0)