From 295a10d5f0e3bb045b284fcec146b90270ac245b Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 27 Jul 2021 18:25:19 +0200 Subject: [PATCH 1/5] Add producer names option to stream perf test If producer names are set, message de-duplication will be enabled. The default behaviour does not change: producer names are empty and de-duplication is therefore disabled. --- src/docs/asciidoc/api.adoc | 6 ++-- src/docs/asciidoc/overview.adoc | 2 +- src/docs/asciidoc/performance-tool.adoc | 8 +++++ .../rabbitmq/stream/perf/StreamPerfTest.java | 24 +++++++++++++-- .../java/com/rabbitmq/stream/perf/Utils.java | 8 ++--- .../stream/perf/StreamPerfTestTest.java | 29 +++++++++++++++++++ .../com/rabbitmq/stream/perf/UtilsTest.java | 4 +-- 7 files changed, 68 insertions(+), 13 deletions(-) diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index 2710a96804..8ab0040ea8 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -346,7 +346,7 @@ The following table sums up the main settings to create a `Producer`: |`name` |The logical name of the producer. Specify a name to enable -<>. +<>. |`null` (no de-duplication) |`batchSize` @@ -475,8 +475,8 @@ type system. It provides good interoperability, which allows streams to be accessed as AMQP 0-9-1 queues, without data loss. ==== -[[outbound-message-de-deduplication]] -===== Message De-deduplication +[[outbound-message-de-duplication]] +===== Message De-duplication RabbitMQ Stream provides publisher confirms to avoid losing messages: once the broker has persisted a message it sends a confirmation for this message. diff --git a/src/docs/asciidoc/overview.adoc b/src/docs/asciidoc/overview.adoc index 3dc82698fe..5da4ee6102 100644 --- a/src/docs/asciidoc/overview.adoc +++ b/src/docs/asciidoc/overview.adoc @@ -49,7 +49,7 @@ you need to use an AMQP 0-9-1 client library. RabbitMQ stream provides at-least-once guarantees thanks to the publisher confirm mechanism, which is supported by the stream Java client. -Message <> +Message <> is also supported on the publisher side. [[stream-client-overview]] diff --git a/src/docs/asciidoc/performance-tool.adoc b/src/docs/asciidoc/performance-tool.adoc index 01983df016..ebd306b825 100644 --- a/src/docs/asciidoc/performance-tool.adoc +++ b/src/docs/asciidoc/performance-tool.adoc @@ -404,6 +404,7 @@ The following command shows how to store the offset every 100,000 messages: java -jar stream-perf-test.jar --store-every 100000 ---- +[[consumer-names]] ===== Consumer Names When using `--store-every` (see above) for <>, @@ -448,6 +449,13 @@ force the offset they start consuming from. With consumer names that do not chan tracking consumers would ignore the specified offset and would start where they left off (this is the purpose of offset tracking). +===== Producer Names +You can use the `--producer-names` option to set the producer names pattern and therefore +enable <> (using the default +publishing sequence starting at 0 and incremented for each message). +The same naming options apply as above in <> with the only +difference that the default pattern is empty (i.e. no de-duplication). + === Building the Performance Tool To build the uber JAR: diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java index 4b37e7b624..5446f94b4f 100644 --- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java +++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java @@ -30,6 +30,7 @@ import com.rabbitmq.stream.EnvironmentBuilder.TlsConfiguration; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.Producer; +import com.rabbitmq.stream.ProducerBuilder; import com.rabbitmq.stream.StreamCreator; import com.rabbitmq.stream.StreamCreator.LeaderLocator; import com.rabbitmq.stream.StreamException; @@ -258,6 +259,16 @@ public class StreamPerfTest implements Callable { converter = Utils.OneTo255RangeIntegerTypeConverter.class) private int producersByConnection; + @CommandLine.Option( + names = {"--producer-names", "-pn"}, + description = + "naming strategy for producer names. Valid values are 'uuid' or a pattern with " + + "stream name and producer index as arguments. " + + "If set, a publishing ID is automatically assigned to each outbound message.", + defaultValue = "", + converter = Utils.NameStrategyConverter.class) + private BiFunction producerNameStrategy; + @CommandLine.Option( names = {"--tracking-consumers-by-connection", "-ccbc"}, description = "number of tracking consumers by connection. Value must be between 1 and 255.", @@ -284,7 +295,7 @@ public class StreamPerfTest implements Callable { "naming strategy for consumer names. Valid values are 'uuid' or a pattern with " + "stream name and consumer index as arguments.", defaultValue = "%s-%d", - converter = Utils.ConsumerNameStrategyConverter.class) + converter = Utils.NameStrategyConverter.class) private BiFunction consumerNameStrategy; @CommandLine.Option( @@ -597,10 +608,17 @@ public Integer call() throws Exception { } String stream = stream(); + ProducerBuilder producerBuilder = environment.producerBuilder(); + + String producerName = this.producerNameStrategy.apply(stream, i + 1); + if (producerName != "") { + producerBuilder = producerBuilder + .name(producerName) + .confirmTimeout(Duration.ZERO); + } Producer producer = - environment - .producerBuilder() + producerBuilder .subEntrySize(this.subEntrySize) .batchSize(this.batchSize) .compression( diff --git a/src/main/java/com/rabbitmq/stream/perf/Utils.java b/src/main/java/com/rabbitmq/stream/perf/Utils.java index 61a300766a..d7a3ee3b5f 100644 --- a/src/main/java/com/rabbitmq/stream/perf/Utils.java +++ b/src/main/java/com/rabbitmq/stream/perf/Utils.java @@ -151,7 +151,7 @@ public ByteCapacity convert(String value) { } } - static class ConsumerNameStrategyConverter + static class NameStrategyConverter implements CommandLine.ITypeConverter> { @Override @@ -159,7 +159,7 @@ public BiFunction convert(String input) { if ("uuid".equals(input)) { return (stream, index) -> UUID.randomUUID().toString(); } else { - return new PatternConsumerNameStrategy(input); + return new PatternNameStrategy(input); } } } @@ -429,11 +429,11 @@ public X509Certificate[] getAcceptedIssuers() { } } - static final class PatternConsumerNameStrategy implements BiFunction { + static final class PatternNameStrategy implements BiFunction { private final String pattern; - PatternConsumerNameStrategy(String pattern) { + PatternNameStrategy(String pattern) { this.pattern = pattern; } diff --git a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java index 77aaa35864..590ee61721 100644 --- a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java @@ -195,6 +195,30 @@ void offsetShouldNotBeStoredWhenOptionIsNotEnabled() throws Exception { waitRunEnds(); } + @Test + void publishingSequenceShouldBeStoredWhenProducerNamesAreSet() throws Exception { + Future run = run(builder().producerNames("producer-%2$d-on-stream-%1$s")); + waitUntilStreamExists(s); + String producerName = "producer-1-on-stream-" + s; + long seq = client.queryPublisherSequence(producerName, s); + waitOneSecond(); + waitAtMost(() -> client.queryPublisherSequence(producerName, s) > seq); + run.cancel(true); + waitRunEnds(); + } + + @Test + void publishingSequenceShouldNotBeStoredWhenProducerNamesAreNotSet() throws Exception { + Future run = run(builder()); + waitUntilStreamExists(s); + String producerName = s + "-0"; // convention + assertThat(client.queryPublisherSequence(producerName, s)).isZero(); + waitOneSecond(); + assertThat(client.queryPublisherSequence(producerName, s)).isZero(); + run.cancel(true); + waitRunEnds(); + } + @Test @DisabledIfTlsNotEnabled void shouldConnectWithTls() throws Exception { @@ -405,6 +429,11 @@ ArgumentsBuilder consumerNames(String pattern) { return this; } + ArgumentsBuilder producerNames(String pattern) { + arguments.put("producer-names", pattern); + return this; + } + String build() { return this.arguments.entrySet().stream() .map(e -> "--" + e.getKey() + (e.getValue().isEmpty() ? "" : (" " + e.getValue()))) diff --git a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java index d9769de05d..2f7baf49c6 100644 --- a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java @@ -22,7 +22,7 @@ import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.compression.Compression; import com.rabbitmq.stream.perf.Utils.CompressionTypeConverter; -import com.rabbitmq.stream.perf.Utils.PatternConsumerNameStrategy; +import com.rabbitmq.stream.perf.Utils.PatternNameStrategy; import com.rabbitmq.stream.perf.Utils.RangeTypeConverter; import com.rabbitmq.stream.perf.Utils.SniServerNamesConverter; import java.util.Arrays; @@ -143,7 +143,7 @@ void compressionTypeConverterKo(String value) { "consumer-%2$d-on-stream-%1$s,consumer-2-on-stream-s1" }) void consumerNameStrategy(String pattern, String expected) { - BiFunction strategy = new PatternConsumerNameStrategy(pattern); + BiFunction strategy = new PatternNameStrategy(pattern); assertThat(strategy.apply("s1", 2)).isEqualTo(expected); } From cc67d29e5e9dcb93de2ecb6c85fb619360a5c592 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 28 Jul 2021 09:01:37 +0200 Subject: [PATCH 2/5] Add tests for producer/consumer name strategy In performance tool. --- .../stream/perf/StreamPerfTestTest.java | 4 +-- .../com/rabbitmq/stream/perf/UtilsTest.java | 27 +++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java index 590ee61721..7d62d537c4 100644 --- a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java @@ -187,7 +187,7 @@ void offsetShouldBeStoredWhenOptionIsEnabled() throws Exception { void offsetShouldNotBeStoredWhenOptionIsNotEnabled() throws Exception { Future run = run(builder()); waitUntilStreamExists(s); - String consumerName = s + "-0"; // convention + String consumerName = s + "-0"; // default value when offset tracking is enabled assertThat(client.queryOffset(consumerName, s)).isZero(); waitOneSecond(); assertThat(client.queryOffset(consumerName, s)).isZero(); @@ -211,7 +211,7 @@ void publishingSequenceShouldBeStoredWhenProducerNamesAreSet() throws Exception void publishingSequenceShouldNotBeStoredWhenProducerNamesAreNotSet() throws Exception { Future run = run(builder()); waitUntilStreamExists(s); - String producerName = s + "-0"; // convention + String producerName = s + "-0"; // shooting in the dark here assertThat(client.queryPublisherSequence(producerName, s)).isZero(); waitOneSecond(); assertThat(client.queryPublisherSequence(producerName, s)).isZero(); diff --git a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java index 2f7baf49c6..904d85bcb7 100644 --- a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java @@ -22,6 +22,7 @@ import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.compression.Compression; import com.rabbitmq.stream.perf.Utils.CompressionTypeConverter; +import com.rabbitmq.stream.perf.Utils.NameStrategyConverter; import com.rabbitmq.stream.perf.Utils.PatternNameStrategy; import com.rabbitmq.stream.perf.Utils.RangeTypeConverter; import com.rabbitmq.stream.perf.Utils.SniServerNamesConverter; @@ -30,6 +31,7 @@ import java.util.List; import java.util.Locale; import java.util.Random; +import java.util.UUID; import java.util.function.BiFunction; import java.util.stream.IntStream; import java.util.stream.LongStream; @@ -147,6 +149,31 @@ void consumerNameStrategy(String pattern, String expected) { assertThat(strategy.apply("s1", 2)).isEqualTo(expected); } + @Test + void producerConsumerNameStrategyConverterShouldReturnUuidWhenAskedForUuid() { + NameStrategyConverter nameStrategyConverter = new NameStrategyConverter(); + BiFunction nameStrategy = nameStrategyConverter.convert("uuid"); + String name = nameStrategy.apply("stream", 1); + UUID.fromString(name); + assertThat(nameStrategy.apply("stream", 1)).isNotEqualTo(name); + } + + @Test + void producerConsumerNameStrategyConverterShouldReturnEmptyStringWhenPatternIsEmptyString() { + NameStrategyConverter nameStrategyConverter = new NameStrategyConverter(); + BiFunction nameStrategy = nameStrategyConverter.convert(""); + assertThat(nameStrategy.apply("stream", 1)).isEmpty(); + assertThat(nameStrategy.apply("stream", 2)).isEmpty(); + } + + @Test + void producerConsumerNameStrategyConverterShouldReturnPatternStrategyWhenAsked() { + NameStrategyConverter nameStrategyConverter = new NameStrategyConverter(); + BiFunction nameStrategy = nameStrategyConverter.convert("stream-%s-consumer-%d"); + assertThat(nameStrategy).isInstanceOf(PatternNameStrategy.class); + assertThat(nameStrategy.apply("s1", 2)).isEqualTo("stream-s1-consumer-2"); + } + @Test void sniServerNamesConverter() throws Exception { SniServerNamesConverter converter = new SniServerNamesConverter(); From 737e5622e4fe6859107e282499a8a1510d06b8bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 28 Jul 2021 09:06:30 +0200 Subject: [PATCH 3/5] Use "deduplication" instead of "de-duplication" Apparently "deduplication" is a word, so let's use it. --- src/docs/asciidoc/api.adoc | 30 ++++++++++++------------- src/docs/asciidoc/overview.adoc | 4 ++-- src/docs/asciidoc/performance-tool.adoc | 4 ++-- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index 8ab0040ea8..446fe1d530 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -346,8 +346,8 @@ The following table sums up the main settings to create a `Producer`: |`name` |The logical name of the producer. Specify a name to enable -<>. -|`null` (no de-duplication) +<>. +|`null` (no deduplication) |`batchSize` |The maximum number of messages to accumulate before sending them to the broker. @@ -357,7 +357,7 @@ The following table sums up the main settings to create a `Producer`: |[[producer-sub-entry-size-configuration-entry]]The number of messages to put in a sub-entry. A sub-entry is one "slot" in a publishing frame, meaning outbound messages are not only batched in publishing frames, but in sub-entries as well. Use this feature to increase throughput at the cost of increased latency and -potential duplicated messages even when de-duplication is enabled. +potential duplicated messages even when deduplication is enabled. |1 (meaning no use of sub-entry batching) |`maxUnconfirmedMessages` @@ -475,8 +475,8 @@ type system. It provides good interoperability, which allows streams to be accessed as AMQP 0-9-1 queues, without data loss. ==== -[[outbound-message-de-duplication]] -===== Message De-duplication +[[outbound-message-deduplication]] +===== Message Deduplication RabbitMQ Stream provides publisher confirms to avoid losing messages: once the broker has persisted a message it sends a confirmation for this message. @@ -490,9 +490,9 @@ Luckily RabbitMQ Stream can detect and filter out duplicated messages, based on 2 client-side elements: the _producer name_ and the _message publishing ID_. [WARNING] -.De-duplication is not guaranteed when using sub-entries batching +.Deduplication is not guaranteed when using sub-entries batching ==== -It is not possible to guarantee de-duplication when +It is not possible to guarantee deduplication when <> is in use. Sub-entry batching is disabled by default and it does not prevent from batching messages in a single publish frame, which can already provide @@ -502,9 +502,9 @@ very high throughput. ====== Setting the Name of a Producer The producer name is set when creating the producer instance, which automatically -enables de-duplication: +enables deduplication: -.Naming a producer to enable message de-duplication +.Naming a producer to enable message deduplication [source,java,indent=0] -------- include::{test-examples}/ProducerUsage.java[tag=producer-with-name] @@ -518,13 +518,13 @@ will automatically recover and retry outstanding messages. The broker will then filter out messages it has already received and persisted. No more duplicates! [IMPORTANT] -.Why setting `confirmTimeout` to 0 when using de-duplication? +.Why setting `confirmTimeout` to 0 when using deduplication? ==== -The point of de-duplication is to avoid duplicates when retrying unconfirmed messages. +The point of deduplication is to avoid duplicates when retrying unconfirmed messages. But why retrying in the first place? To avoid _losing_ messages, that is enforcing _at-least-once_ semantics. If the client does not stubbornly retry messages and gives up at some point, messages can be lost, which maps to _at-most-once_ semantics. This -is why the de-duplication examples set the +is why the deduplication examples set the <> to `Duration.ZERO`: to disable the background task that calls the confirmation callback for outstanding messages that time out. This way the client will do its best to retry messages @@ -539,11 +539,11 @@ stream at the same time. ====== Understanding Publishing ID -The producer name is only one part of the de-duplication mechanism, the other part +The producer name is only one part of the deduplication mechanism, the other part is the _message publishing ID_. If the producer has a name, the client automatically assigns a publishing ID to each outbound message for the producer. The publishing ID is a strictly increasing sequence, starting at 0 and incremented for each message. The default -publishing sequence is good enough for de-duplication, but it is possible to +publishing sequence is good enough for deduplication, but it is possible to assign a publishing ID to each message: .Using an explicit publishing ID @@ -570,7 +570,7 @@ properties (e.g. `messageId`). [IMPORTANT] .Do not mix client-assigned and custom publishing ID ==== -As soon as a producer name is set, message de-duplication is enabled. +As soon as a producer name is set, message deduplication is enabled. It is then possible to let the producer assign a publishing ID to each message or assign custom publishing IDs. *Do one or the other, not both!* ==== diff --git a/src/docs/asciidoc/overview.adoc b/src/docs/asciidoc/overview.adoc index 5da4ee6102..c7e59edba3 100644 --- a/src/docs/asciidoc/overview.adoc +++ b/src/docs/asciidoc/overview.adoc @@ -49,7 +49,7 @@ you need to use an AMQP 0-9-1 client library. RabbitMQ stream provides at-least-once guarantees thanks to the publisher confirm mechanism, which is supported by the stream Java client. -Message <> +Message <> is also supported on the publisher side. [[stream-client-overview]] @@ -63,7 +63,7 @@ to build fast, efficient, and robust client applications. * _administrate streams (creation/deletion) directly from applications._ This can also be useful for development and testing. * _adapt publishing throughput_ thanks to the configurable batch size and flow control. -* _avoid publishing duplicate messages_ thanks to message de-duplication. +* _avoid publishing duplicate messages_ thanks to message deduplication. * _consume asynchronously from streams and resume where left off_ thanks to automatic or manual offset tracking. * _enforce https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/[best practices] to create client connections_ – to stream leaders for publishers to minimize inter-node traffic and to stream replicas for consumers to offload leaders. diff --git a/src/docs/asciidoc/performance-tool.adoc b/src/docs/asciidoc/performance-tool.adoc index ebd306b825..859ed911ec 100644 --- a/src/docs/asciidoc/performance-tool.adoc +++ b/src/docs/asciidoc/performance-tool.adoc @@ -451,10 +451,10 @@ tracking consumers would ignore the specified offset and would start where they ===== Producer Names You can use the `--producer-names` option to set the producer names pattern and therefore -enable <> (using the default +enable <> (using the default publishing sequence starting at 0 and incremented for each message). The same naming options apply as above in <> with the only -difference that the default pattern is empty (i.e. no de-duplication). +difference that the default pattern is empty (i.e. no deduplication). === Building the Performance Tool From b35bb5f56a8241dc36159bde59641b67f4729333 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 28 Jul 2021 09:17:35 +0200 Subject: [PATCH 4/5] Add example of --producer-names option usage --- src/docs/asciidoc/performance-tool.adoc | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/docs/asciidoc/performance-tool.adoc b/src/docs/asciidoc/performance-tool.adoc index 859ed911ec..dc875ad858 100644 --- a/src/docs/asciidoc/performance-tool.adoc +++ b/src/docs/asciidoc/performance-tool.adoc @@ -456,6 +456,14 @@ publishing sequence starting at 0 and incremented for each message). The same naming options apply as above in <> with the only difference that the default pattern is empty (i.e. no deduplication). +Here is an example of the usage of the `--producer-names` option: + +---- +java -jar stream-perf-test.jar --producer-names %s-%d +---- + +The run will start one producer and will use the `stream-1` producer reference (default stream is `stream` and the number of the producer is 1.) + === Building the Performance Tool To build the uber JAR: From 18f12814c21f73843ae8a28b5e16abe4bb73c4f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 28 Jul 2021 09:18:39 +0200 Subject: [PATCH 5/5] Format code --- src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java | 5 ++--- src/test/java/com/rabbitmq/stream/perf/UtilsTest.java | 3 ++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java index 5446f94b4f..c07a4efa4f 100644 --- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java +++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java @@ -612,9 +612,8 @@ public Integer call() throws Exception { String producerName = this.producerNameStrategy.apply(stream, i + 1); if (producerName != "") { - producerBuilder = producerBuilder - .name(producerName) - .confirmTimeout(Duration.ZERO); + producerBuilder = + producerBuilder.name(producerName).confirmTimeout(Duration.ZERO); } Producer producer = diff --git a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java index 904d85bcb7..504d881576 100644 --- a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java @@ -169,7 +169,8 @@ void producerConsumerNameStrategyConverterShouldReturnEmptyStringWhenPatternIsEm @Test void producerConsumerNameStrategyConverterShouldReturnPatternStrategyWhenAsked() { NameStrategyConverter nameStrategyConverter = new NameStrategyConverter(); - BiFunction nameStrategy = nameStrategyConverter.convert("stream-%s-consumer-%d"); + BiFunction nameStrategy = + nameStrategyConverter.convert("stream-%s-consumer-%d"); assertThat(nameStrategy).isInstanceOf(PatternNameStrategy.class); assertThat(nameStrategy.apply("s1", 2)).isEqualTo("stream-s1-consumer-2"); }