Skip to content

Support message de-duplication in stream-perf-test #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions src/docs/asciidoc/api.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,8 @@ The following table sums up the main settings to create a `Producer`:

|`name`
|The logical name of the producer. Specify a name to enable
<<outbound-message-de-deduplication,message de-deduplication>>.
|`null` (no de-duplication)
<<outbound-message-deduplication,message deduplication>>.
|`null` (no deduplication)

|`batchSize`
|The maximum number of messages to accumulate before sending them to the broker.
Expand All @@ -357,7 +357,7 @@ The following table sums up the main settings to create a `Producer`:
|[[producer-sub-entry-size-configuration-entry]]The number of messages to put in a sub-entry. A sub-entry is one "slot" in a publishing
frame, meaning outbound messages are not only batched in publishing frames, but in sub-entries
as well. Use this feature to increase throughput at the cost of increased latency and
potential duplicated messages even when de-duplication is enabled.
potential duplicated messages even when deduplication is enabled.
|1 (meaning no use of sub-entry batching)

|`maxUnconfirmedMessages`
Expand Down Expand Up @@ -475,8 +475,8 @@ type system. It provides good interoperability, which allows streams
to be accessed as AMQP 0-9-1 queues, without data loss.
====

[[outbound-message-de-deduplication]]
===== Message De-deduplication
[[outbound-message-deduplication]]
===== Message Deduplication

RabbitMQ Stream provides publisher confirms to avoid losing messages: once
the broker has persisted a message it sends a confirmation for this message.
Expand All @@ -490,9 +490,9 @@ Luckily RabbitMQ Stream can detect and filter out duplicated messages, based
on 2 client-side elements: the _producer name_ and the _message publishing ID_.

[WARNING]
.De-duplication is not guaranteed when using sub-entries batching
.Deduplication is not guaranteed when using sub-entries batching
====
It is not possible to guarantee de-duplication when
It is not possible to guarantee deduplication when
<<producer-sub-entry-size-configuration-entry, sub-entry batching>> is in use.
Sub-entry batching is disabled by default and it does not prevent from
batching messages in a single publish frame, which can already provide
Expand All @@ -502,9 +502,9 @@ very high throughput.
====== Setting the Name of a Producer

The producer name is set when creating the producer instance, which automatically
enables de-duplication:
enables deduplication:

.Naming a producer to enable message de-duplication
.Naming a producer to enable message deduplication
[source,java,indent=0]
--------
include::{test-examples}/ProducerUsage.java[tag=producer-with-name]
Expand All @@ -518,13 +518,13 @@ will automatically recover and retry outstanding messages. The broker will then
filter out messages it has already received and persisted. No more duplicates!

[IMPORTANT]
.Why setting `confirmTimeout` to 0 when using de-duplication?
.Why setting `confirmTimeout` to 0 when using deduplication?
====
The point of de-duplication is to avoid duplicates when retrying unconfirmed messages.
The point of deduplication is to avoid duplicates when retrying unconfirmed messages.
But why retrying in the first place? To avoid _losing_ messages, that is enforcing
_at-least-once_ semantics. If the client does not stubbornly retry messages and gives
up at some point, messages can be lost, which maps to _at-most-once_ semantics. This
is why the de-duplication examples set the
is why the deduplication examples set the
<<producer-confirm-timeout-configuration-entry,`confirmTimeout` setting>> to `Duration.ZERO`:
to disable the background task that calls the confirmation callback for outstanding
messages that time out. This way the client will do its best to retry messages
Expand All @@ -539,11 +539,11 @@ stream at the same time.

====== Understanding Publishing ID

The producer name is only one part of the de-duplication mechanism, the other part
The producer name is only one part of the deduplication mechanism, the other part
is the _message publishing ID_. If the producer has a name, the client automatically
assigns a publishing ID to each outbound message for the producer. The publishing ID
is a strictly increasing sequence, starting at 0 and incremented for each message. The default
publishing sequence is good enough for de-duplication, but it is possible to
publishing sequence is good enough for deduplication, but it is possible to
assign a publishing ID to each message:

.Using an explicit publishing ID
Expand All @@ -570,7 +570,7 @@ properties (e.g. `messageId`).
[IMPORTANT]
.Do not mix client-assigned and custom publishing ID
====
As soon as a producer name is set, message de-duplication is enabled.
As soon as a producer name is set, message deduplication is enabled.
It is then possible to let the producer assign a publishing ID to each
message or assign custom publishing IDs. *Do one or the other, not both!*
====
Expand Down
4 changes: 2 additions & 2 deletions src/docs/asciidoc/overview.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ you need to use an AMQP 0-9-1 client library.
RabbitMQ stream provides at-least-once guarantees thanks to the
publisher confirm mechanism, which is supported by the stream Java client.

Message <<api.adoc#outbound-message-de-deduplication,de-duplication>>
Message <<api.adoc#outbound-message-deduplication,deduplication>>
is also supported on the publisher side.

[[stream-client-overview]]
Expand All @@ -63,7 +63,7 @@ to build fast, efficient, and robust client applications.
* _administrate streams (creation/deletion) directly from applications._ This
can also be useful for development and testing.
* _adapt publishing throughput_ thanks to the configurable batch size and flow control.
* _avoid publishing duplicate messages_ thanks to message de-duplication.
* _avoid publishing duplicate messages_ thanks to message deduplication.
* _consume asynchronously from streams and resume where left off_ thanks to
automatic or manual offset tracking.
* _enforce https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/[best practices] to create client connections_ – to stream leaders for publishers to minimize inter-node traffic and to stream replicas for consumers to offload leaders.
Expand Down
16 changes: 16 additions & 0 deletions src/docs/asciidoc/performance-tool.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ The following command shows how to store the offset every 100,000 messages:
java -jar stream-perf-test.jar --store-every 100000
----

[[consumer-names]]
===== Consumer Names

When using `--store-every` (see above) for <<api.adoc#consumer-offset-tracking, offset tracking>>,
Expand Down Expand Up @@ -448,6 +449,21 @@ force the offset they start consuming from. With consumer names that do not chan
tracking consumers would ignore the specified offset and would start where they left off
(this is the purpose of offset tracking).

===== Producer Names
You can use the `--producer-names` option to set the producer names pattern and therefore
enable <<api.adoc#outbound-message-deduplication, message deduplication>> (using the default
publishing sequence starting at 0 and incremented for each message).
The same naming options apply as above in <<api.adoc#consumer-names, consumer names>> with the only
difference that the default pattern is empty (i.e. no deduplication).

Here is an example of the usage of the `--producer-names` option:

----
java -jar stream-perf-test.jar --producer-names %s-%d
----

The run will start one producer and will use the `stream-1` producer reference (default stream is `stream` and the number of the producer is 1.)

=== Building the Performance Tool

To build the uber JAR:
Expand Down
23 changes: 20 additions & 3 deletions src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.rabbitmq.stream.EnvironmentBuilder.TlsConfiguration;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.ProducerBuilder;
import com.rabbitmq.stream.StreamCreator;
import com.rabbitmq.stream.StreamCreator.LeaderLocator;
import com.rabbitmq.stream.StreamException;
Expand Down Expand Up @@ -258,6 +259,16 @@ public class StreamPerfTest implements Callable<Integer> {
converter = Utils.OneTo255RangeIntegerTypeConverter.class)
private int producersByConnection;

@CommandLine.Option(
names = {"--producer-names", "-pn"},
description =
"naming strategy for producer names. Valid values are 'uuid' or a pattern with "
+ "stream name and producer index as arguments. "
+ "If set, a publishing ID is automatically assigned to each outbound message.",
defaultValue = "",
converter = Utils.NameStrategyConverter.class)
private BiFunction<String, Integer, String> producerNameStrategy;

@CommandLine.Option(
names = {"--tracking-consumers-by-connection", "-ccbc"},
description = "number of tracking consumers by connection. Value must be between 1 and 255.",
Expand All @@ -284,7 +295,7 @@ public class StreamPerfTest implements Callable<Integer> {
"naming strategy for consumer names. Valid values are 'uuid' or a pattern with "
+ "stream name and consumer index as arguments.",
defaultValue = "%s-%d",
converter = Utils.ConsumerNameStrategyConverter.class)
converter = Utils.NameStrategyConverter.class)
private BiFunction<String, Integer, String> consumerNameStrategy;

@CommandLine.Option(
Expand Down Expand Up @@ -597,10 +608,16 @@ public Integer call() throws Exception {
}

String stream = stream();
ProducerBuilder producerBuilder = environment.producerBuilder();

String producerName = this.producerNameStrategy.apply(stream, i + 1);
if (producerName != "") {
producerBuilder =
producerBuilder.name(producerName).confirmTimeout(Duration.ZERO);
}

Producer producer =
environment
.producerBuilder()
producerBuilder
.subEntrySize(this.subEntrySize)
.batchSize(this.batchSize)
.compression(
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/rabbitmq/stream/perf/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,15 @@ public ByteCapacity convert(String value) {
}
}

static class ConsumerNameStrategyConverter
static class NameStrategyConverter
implements CommandLine.ITypeConverter<BiFunction<String, Integer, String>> {

@Override
public BiFunction<String, Integer, String> convert(String input) {
if ("uuid".equals(input)) {
return (stream, index) -> UUID.randomUUID().toString();
} else {
return new PatternConsumerNameStrategy(input);
return new PatternNameStrategy(input);
}
}
}
Expand Down Expand Up @@ -429,11 +429,11 @@ public X509Certificate[] getAcceptedIssuers() {
}
}

static final class PatternConsumerNameStrategy implements BiFunction<String, Integer, String> {
static final class PatternNameStrategy implements BiFunction<String, Integer, String> {

private final String pattern;

PatternConsumerNameStrategy(String pattern) {
PatternNameStrategy(String pattern) {
this.pattern = pattern;
}

Expand Down
31 changes: 30 additions & 1 deletion src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,38 @@ void offsetShouldBeStoredWhenOptionIsEnabled() throws Exception {
void offsetShouldNotBeStoredWhenOptionIsNotEnabled() throws Exception {
Future<?> run = run(builder());
waitUntilStreamExists(s);
String consumerName = s + "-0"; // convention
String consumerName = s + "-0"; // default value when offset tracking is enabled
assertThat(client.queryOffset(consumerName, s)).isZero();
waitOneSecond();
assertThat(client.queryOffset(consumerName, s)).isZero();
run.cancel(true);
waitRunEnds();
}

@Test
void publishingSequenceShouldBeStoredWhenProducerNamesAreSet() throws Exception {
Future<?> run = run(builder().producerNames("producer-%2$d-on-stream-%1$s"));
waitUntilStreamExists(s);
String producerName = "producer-1-on-stream-" + s;
long seq = client.queryPublisherSequence(producerName, s);
waitOneSecond();
waitAtMost(() -> client.queryPublisherSequence(producerName, s) > seq);
run.cancel(true);
waitRunEnds();
}

@Test
void publishingSequenceShouldNotBeStoredWhenProducerNamesAreNotSet() throws Exception {
Future<?> run = run(builder());
waitUntilStreamExists(s);
String producerName = s + "-0"; // shooting in the dark here
assertThat(client.queryPublisherSequence(producerName, s)).isZero();
waitOneSecond();
assertThat(client.queryPublisherSequence(producerName, s)).isZero();
run.cancel(true);
waitRunEnds();
}

@Test
@DisabledIfTlsNotEnabled
void shouldConnectWithTls() throws Exception {
Expand Down Expand Up @@ -405,6 +429,11 @@ ArgumentsBuilder consumerNames(String pattern) {
return this;
}

ArgumentsBuilder producerNames(String pattern) {
arguments.put("producer-names", pattern);
return this;
}

String build() {
return this.arguments.entrySet().stream()
.map(e -> "--" + e.getKey() + (e.getValue().isEmpty() ? "" : (" " + e.getValue())))
Expand Down
32 changes: 30 additions & 2 deletions src/test/java/com/rabbitmq/stream/perf/UtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.compression.Compression;
import com.rabbitmq.stream.perf.Utils.CompressionTypeConverter;
import com.rabbitmq.stream.perf.Utils.PatternConsumerNameStrategy;
import com.rabbitmq.stream.perf.Utils.NameStrategyConverter;
import com.rabbitmq.stream.perf.Utils.PatternNameStrategy;
import com.rabbitmq.stream.perf.Utils.RangeTypeConverter;
import com.rabbitmq.stream.perf.Utils.SniServerNamesConverter;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
Expand Down Expand Up @@ -143,10 +145,36 @@ void compressionTypeConverterKo(String value) {
"consumer-%2$d-on-stream-%1$s,consumer-2-on-stream-s1"
})
void consumerNameStrategy(String pattern, String expected) {
BiFunction<String, Integer, String> strategy = new PatternConsumerNameStrategy(pattern);
BiFunction<String, Integer, String> strategy = new PatternNameStrategy(pattern);
assertThat(strategy.apply("s1", 2)).isEqualTo(expected);
}

@Test
void producerConsumerNameStrategyConverterShouldReturnUuidWhenAskedForUuid() {
NameStrategyConverter nameStrategyConverter = new NameStrategyConverter();
BiFunction<String, Integer, String> nameStrategy = nameStrategyConverter.convert("uuid");
String name = nameStrategy.apply("stream", 1);
UUID.fromString(name);
assertThat(nameStrategy.apply("stream", 1)).isNotEqualTo(name);
}

@Test
void producerConsumerNameStrategyConverterShouldReturnEmptyStringWhenPatternIsEmptyString() {
NameStrategyConverter nameStrategyConverter = new NameStrategyConverter();
BiFunction<String, Integer, String> nameStrategy = nameStrategyConverter.convert("");
assertThat(nameStrategy.apply("stream", 1)).isEmpty();
assertThat(nameStrategy.apply("stream", 2)).isEmpty();
}

@Test
void producerConsumerNameStrategyConverterShouldReturnPatternStrategyWhenAsked() {
NameStrategyConverter nameStrategyConverter = new NameStrategyConverter();
BiFunction<String, Integer, String> nameStrategy =
nameStrategyConverter.convert("stream-%s-consumer-%d");
assertThat(nameStrategy).isInstanceOf(PatternNameStrategy.class);
assertThat(nameStrategy.apply("s1", 2)).isEqualTo("stream-s1-consumer-2");
}

@Test
void sniServerNamesConverter() throws Exception {
SniServerNamesConverter converter = new SniServerNamesConverter();
Expand Down