diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index 2710a96804..446fe1d530 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -346,8 +346,8 @@ The following table sums up the main settings to create a `Producer`: |`name` |The logical name of the producer. Specify a name to enable -<>. -|`null` (no de-duplication) +<>. +|`null` (no deduplication) |`batchSize` |The maximum number of messages to accumulate before sending them to the broker. @@ -357,7 +357,7 @@ The following table sums up the main settings to create a `Producer`: |[[producer-sub-entry-size-configuration-entry]]The number of messages to put in a sub-entry. A sub-entry is one "slot" in a publishing frame, meaning outbound messages are not only batched in publishing frames, but in sub-entries as well. Use this feature to increase throughput at the cost of increased latency and -potential duplicated messages even when de-duplication is enabled. +potential duplicated messages even when deduplication is enabled. |1 (meaning no use of sub-entry batching) |`maxUnconfirmedMessages` @@ -475,8 +475,8 @@ type system. It provides good interoperability, which allows streams to be accessed as AMQP 0-9-1 queues, without data loss. ==== -[[outbound-message-de-deduplication]] -===== Message De-deduplication +[[outbound-message-deduplication]] +===== Message Deduplication RabbitMQ Stream provides publisher confirms to avoid losing messages: once the broker has persisted a message it sends a confirmation for this message. @@ -490,9 +490,9 @@ Luckily RabbitMQ Stream can detect and filter out duplicated messages, based on 2 client-side elements: the _producer name_ and the _message publishing ID_. [WARNING] -.De-duplication is not guaranteed when using sub-entries batching +.Deduplication is not guaranteed when using sub-entries batching ==== -It is not possible to guarantee de-duplication when +It is not possible to guarantee deduplication when <> is in use. 
Sub-entry batching is disabled by default and it does not prevent from batching messages in a single publish frame, which can already provide @@ -502,9 +502,9 @@ very high throughput. ====== Setting the Name of a Producer The producer name is set when creating the producer instance, which automatically -enables de-duplication: +enables deduplication: -.Naming a producer to enable message de-duplication +.Naming a producer to enable message deduplication [source,java,indent=0] -------- include::{test-examples}/ProducerUsage.java[tag=producer-with-name] @@ -518,13 +518,13 @@ will automatically recover and retry outstanding messages. The broker will then filter out messages it has already received and persisted. No more duplicates! [IMPORTANT] -.Why setting `confirmTimeout` to 0 when using de-duplication? +.Why setting `confirmTimeout` to 0 when using deduplication? ==== -The point of de-duplication is to avoid duplicates when retrying unconfirmed messages. +The point of deduplication is to avoid duplicates when retrying unconfirmed messages. But why retrying in the first place? To avoid _losing_ messages, that is enforcing _at-least-once_ semantics. If the client does not stubbornly retry messages and gives up at some point, messages can be lost, which maps to _at-most-once_ semantics. This -is why the de-duplication examples set the +is why the deduplication examples set the <> to `Duration.ZERO`: to disable the background task that calls the confirmation callback for outstanding messages that time out. This way the client will do its best to retry messages @@ -539,11 +539,11 @@ stream at the same time. ====== Understanding Publishing ID -The producer name is only one part of the de-duplication mechanism, the other part +The producer name is only one part of the deduplication mechanism, the other part is the _message publishing ID_. If the producer has a name, the client automatically assigns a publishing ID to each outbound message for the producer. 
The publishing ID is a strictly increasing sequence, starting at 0 and incremented for each message. The default -publishing sequence is good enough for de-duplication, but it is possible to +publishing sequence is good enough for deduplication, but it is possible to assign a publishing ID to each message: .Using an explicit publishing ID @@ -570,7 +570,7 @@ properties (e.g. `messageId`). [IMPORTANT] .Do not mix client-assigned and custom publishing ID ==== -As soon as a producer name is set, message de-duplication is enabled. +As soon as a producer name is set, message deduplication is enabled. It is then possible to let the producer assign a publishing ID to each message or assign custom publishing IDs. *Do one or the other, not both!* ==== diff --git a/src/docs/asciidoc/overview.adoc b/src/docs/asciidoc/overview.adoc index 3dc82698fe..c7e59edba3 100644 --- a/src/docs/asciidoc/overview.adoc +++ b/src/docs/asciidoc/overview.adoc @@ -49,7 +49,7 @@ you need to use an AMQP 0-9-1 client library. RabbitMQ stream provides at-least-once guarantees thanks to the publisher confirm mechanism, which is supported by the stream Java client. -Message <> +Message <> is also supported on the publisher side. [[stream-client-overview]] @@ -63,7 +63,7 @@ to build fast, efficient, and robust client applications. * _administrate streams (creation/deletion) directly from applications._ This can also be useful for development and testing. * _adapt publishing throughput_ thanks to the configurable batch size and flow control. -* _avoid publishing duplicate messages_ thanks to message de-duplication. +* _avoid publishing duplicate messages_ thanks to message deduplication. * _consume asynchronously from streams and resume where left off_ thanks to automatic or manual offset tracking. 
* _enforce https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/[best practices] to create client connections_ – to stream leaders for publishers to minimize inter-node traffic and to stream replicas for consumers to offload leaders. diff --git a/src/docs/asciidoc/performance-tool.adoc b/src/docs/asciidoc/performance-tool.adoc index 01983df016..dc875ad858 100644 --- a/src/docs/asciidoc/performance-tool.adoc +++ b/src/docs/asciidoc/performance-tool.adoc @@ -404,6 +404,7 @@ The following command shows how to store the offset every 100,000 messages: java -jar stream-perf-test.jar --store-every 100000 ---- +[[consumer-names]] ===== Consumer Names When using `--store-every` (see above) for <>, @@ -448,6 +449,21 @@ force the offset they start consuming from. With consumer names that do not chan tracking consumers would ignore the specified offset and would start where they left off (this is the purpose of offset tracking). +===== Producer Names +You can use the `--producer-names` option to set the producer name pattern and therefore +enable <> (using the default +publishing sequence starting at 0 and incremented for each message). +The same naming options apply as in <> above, the only +difference being that the default pattern is empty (i.e. no deduplication). + +Here is an example that uses the `--producer-names` option: + +---- +java -jar stream-perf-test.jar --producer-names %s-%d +---- + +The run will start one producer and will use the `stream-1` producer reference (the default stream is `stream` and the producer index is 1).
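To make the expansion of the pattern more concrete, here is a minimal sketch. It assumes the pattern is expanded with `String.format`, with the stream name as the first argument and the producer index as the second, which is consistent with the unit tests in this change but is not shown in the diff itself. The `ProducerNamePatternSketch` class and its `pattern` helper are hypothetical names used for illustration only.

```java
import java.util.function.BiFunction;

// Hypothetical illustration class, not part of the performance tool.
class ProducerNamePatternSketch {

  // Assumption: the naming pattern is expanded with String.format, where
  // argument 1 is the stream name and argument 2 is the producer index.
  static BiFunction<String, Integer, String> pattern(String pattern) {
    return (stream, index) -> String.format(pattern, stream, index);
  }

  public static void main(String[] args) {
    // Positional pattern, as in the command line example: prints "stream-1"
    System.out.println(pattern("%s-%d").apply("stream", 1));

    // Explicit argument indexes allow reordering: prints "producer-1-on-stream-stream"
    System.out.println(pattern("producer-%2$d-on-stream-%1$s").apply("stream", 1));

    // An empty pattern yields an empty name (no deduplication): prints "true"
    System.out.println(pattern("").apply("stream", 1).isEmpty());
  }
}
```

Under this assumption, an empty pattern always produces an empty name, so the caller has to compare the content of the string (for example with `isEmpty()`) to decide whether to name the producer.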
+ === Building the Performance Tool To build the uber JAR: diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java index 4b37e7b624..c07a4efa4f 100644 --- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java +++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java @@ -30,6 +30,7 @@ import com.rabbitmq.stream.EnvironmentBuilder.TlsConfiguration; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.Producer; +import com.rabbitmq.stream.ProducerBuilder; import com.rabbitmq.stream.StreamCreator; import com.rabbitmq.stream.StreamCreator.LeaderLocator; import com.rabbitmq.stream.StreamException; @@ -258,6 +259,16 @@ public class StreamPerfTest implements Callable<Integer> { converter = Utils.OneTo255RangeIntegerTypeConverter.class) private int producersByConnection; + @CommandLine.Option( + names = {"--producer-names", "-pn"}, + description = + "naming strategy for producer names. Valid values are 'uuid' or a pattern with " + + "stream name and producer index as arguments. " + + "If set, a publishing ID is automatically assigned to each outbound message.", + defaultValue = "", + converter = Utils.NameStrategyConverter.class) + private BiFunction<String, Integer, String> producerNameStrategy; + @CommandLine.Option( names = {"--tracking-consumers-by-connection", "-ccbc"}, description = "number of tracking consumers by connection. Value must be between 1 and 255.", @@ -284,7 +295,7 @@ public class StreamPerfTest implements Callable<Integer> { "naming strategy for consumer names. 
Valid values are 'uuid' or a pattern with " + "stream name and consumer index as arguments.", defaultValue = "%s-%d", - converter = Utils.ConsumerNameStrategyConverter.class) + converter = Utils.NameStrategyConverter.class) private BiFunction<String, Integer, String> consumerNameStrategy; @CommandLine.Option( @@ -597,10 +608,16 @@ public Integer call() throws Exception { } String stream = stream(); + ProducerBuilder producerBuilder = environment.producerBuilder(); + + String producerName = this.producerNameStrategy.apply(stream, i + 1); + if (!producerName.isEmpty()) { + producerBuilder = + producerBuilder.name(producerName).confirmTimeout(Duration.ZERO); + } Producer producer = - environment - .producerBuilder() + producerBuilder .subEntrySize(this.subEntrySize) .batchSize(this.batchSize) .compression( diff --git a/src/main/java/com/rabbitmq/stream/perf/Utils.java b/src/main/java/com/rabbitmq/stream/perf/Utils.java index 61a300766a..d7a3ee3b5f 100644 --- a/src/main/java/com/rabbitmq/stream/perf/Utils.java +++ b/src/main/java/com/rabbitmq/stream/perf/Utils.java @@ -151,7 +151,7 @@ public ByteCapacity convert(String value) { } } - static class ConsumerNameStrategyConverter + static class NameStrategyConverter implements CommandLine.ITypeConverter<BiFunction<String, Integer, String>> { @Override @@ -159,7 +159,7 @@ public BiFunction<String, Integer, String> convert(String input) { if ("uuid".equals(input)) { return (stream, index) -> UUID.randomUUID().toString(); } else { - return new PatternConsumerNameStrategy(input); + return new PatternNameStrategy(input); } } } @@ -429,11 +429,11 @@ public X509Certificate[] getAcceptedIssuers() { } } - static final class PatternConsumerNameStrategy implements BiFunction<String, Integer, String> { + static final class PatternNameStrategy implements BiFunction<String, Integer, String> { private final String pattern; - PatternConsumerNameStrategy(String pattern) { + PatternNameStrategy(String pattern) { this.pattern = pattern; } diff --git a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java 
index 77aaa35864..7d62d537c4 100644 --- a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java @@ -187,7 +187,7 @@ void offsetShouldBeStoredWhenOptionIsEnabled() throws Exception { void offsetShouldNotBeStoredWhenOptionIsNotEnabled() throws Exception { Future run = run(builder()); waitUntilStreamExists(s); - String consumerName = s + "-0"; // convention + String consumerName = s + "-0"; // default value when offset tracking is enabled assertThat(client.queryOffset(consumerName, s)).isZero(); waitOneSecond(); assertThat(client.queryOffset(consumerName, s)).isZero(); @@ -195,6 +195,30 @@ void offsetShouldNotBeStoredWhenOptionIsNotEnabled() throws Exception { waitRunEnds(); } + @Test + void publishingSequenceShouldBeStoredWhenProducerNamesAreSet() throws Exception { + Future run = run(builder().producerNames("producer-%2$d-on-stream-%1$s")); + waitUntilStreamExists(s); + String producerName = "producer-1-on-stream-" + s; + long seq = client.queryPublisherSequence(producerName, s); + waitOneSecond(); + waitAtMost(() -> client.queryPublisherSequence(producerName, s) > seq); + run.cancel(true); + waitRunEnds(); + } + + @Test + void publishingSequenceShouldNotBeStoredWhenProducerNamesAreNotSet() throws Exception { + Future run = run(builder()); + waitUntilStreamExists(s); + String producerName = s + "-0"; // shooting in the dark here + assertThat(client.queryPublisherSequence(producerName, s)).isZero(); + waitOneSecond(); + assertThat(client.queryPublisherSequence(producerName, s)).isZero(); + run.cancel(true); + waitRunEnds(); + } + @Test @DisabledIfTlsNotEnabled void shouldConnectWithTls() throws Exception { @@ -405,6 +429,11 @@ ArgumentsBuilder consumerNames(String pattern) { return this; } + ArgumentsBuilder producerNames(String pattern) { + arguments.put("producer-names", pattern); + return this; + } + String build() { return this.arguments.entrySet().stream() .map(e -> "--" + e.getKey() + 
(e.getValue().isEmpty() ? "" : (" " + e.getValue()))) diff --git a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java index d9769de05d..504d881576 100644 --- a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java @@ -22,7 +22,8 @@ import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.compression.Compression; import com.rabbitmq.stream.perf.Utils.CompressionTypeConverter; -import com.rabbitmq.stream.perf.Utils.PatternConsumerNameStrategy; +import com.rabbitmq.stream.perf.Utils.NameStrategyConverter; +import com.rabbitmq.stream.perf.Utils.PatternNameStrategy; import com.rabbitmq.stream.perf.Utils.RangeTypeConverter; import com.rabbitmq.stream.perf.Utils.SniServerNamesConverter; import java.util.Arrays; @@ -30,6 +31,7 @@ import java.util.List; import java.util.Locale; import java.util.Random; +import java.util.UUID; import java.util.function.BiFunction; import java.util.stream.IntStream; import java.util.stream.LongStream; @@ -143,10 +145,36 @@ void compressionTypeConverterKo(String value) { "consumer-%2$d-on-stream-%1$s,consumer-2-on-stream-s1" }) void consumerNameStrategy(String pattern, String expected) { - BiFunction<String, Integer, String> strategy = new PatternConsumerNameStrategy(pattern); + BiFunction<String, Integer, String> strategy = new PatternNameStrategy(pattern); assertThat(strategy.apply("s1", 2)).isEqualTo(expected); } + @Test + void producerConsumerNameStrategyConverterShouldReturnUuidWhenAskedForUuid() { + NameStrategyConverter nameStrategyConverter = new NameStrategyConverter(); + BiFunction<String, Integer, String> nameStrategy = nameStrategyConverter.convert("uuid"); + String name = nameStrategy.apply("stream", 1); + UUID.fromString(name); + assertThat(nameStrategy.apply("stream", 1)).isNotEqualTo(name); + } + + @Test + void producerConsumerNameStrategyConverterShouldReturnEmptyStringWhenPatternIsEmptyString() { + NameStrategyConverter nameStrategyConverter = new 
NameStrategyConverter(); + BiFunction<String, Integer, String> nameStrategy = nameStrategyConverter.convert(""); + assertThat(nameStrategy.apply("stream", 1)).isEmpty(); + assertThat(nameStrategy.apply("stream", 2)).isEmpty(); + } + + @Test + void producerConsumerNameStrategyConverterShouldReturnPatternStrategyWhenAsked() { + NameStrategyConverter nameStrategyConverter = new NameStrategyConverter(); + BiFunction<String, Integer, String> nameStrategy = + nameStrategyConverter.convert("stream-%s-consumer-%d"); + assertThat(nameStrategy).isInstanceOf(PatternNameStrategy.class); + assertThat(nameStrategy.apply("s1", 2)).isEqualTo("stream-s1-consumer-2"); + } + @Test void sniServerNamesConverter() throws Exception { SniServerNamesConverter converter = new SniServerNamesConverter();