From 3cc98f94b67a2fb39e1d1356277892fbd0036b3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 18 Oct 2024 09:12:53 +0200 Subject: [PATCH] Add initial member count to stream creator --- .../java/com/rabbitmq/stream/StreamCreator.java | 10 ++++++++++ src/main/java/com/rabbitmq/stream/impl/Client.java | 8 ++++++++ .../rabbitmq/stream/impl/StreamStreamCreator.java | 6 ++++++ .../rabbitmq/stream/impl/StreamEnvironmentTest.java | 13 +++++++++++++ 4 files changed, 37 insertions(+) diff --git a/src/main/java/com/rabbitmq/stream/StreamCreator.java b/src/main/java/com/rabbitmq/stream/StreamCreator.java index 73f1360592..b16de24b0a 100644 --- a/src/main/java/com/rabbitmq/stream/StreamCreator.java +++ b/src/main/java/com/rabbitmq/stream/StreamCreator.java @@ -92,6 +92,16 @@ public interface StreamCreator { */ StreamCreator filterSize(int size); + /** + * Set the number of initial members the stream should have. + * + * @param initialMemberCount initial number of nodes + * @return this creator instance + * @see Initial Replication + * Factor + */ + StreamCreator initialMemberCount(int initialMemberCount); + /** * Configure the super stream to create. * diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 1c9d53c123..aa27f430fa 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -2744,6 +2744,14 @@ public StreamParametersBuilder filterSize(int size) { return this; } + public StreamParametersBuilder initialMemberCount(int initialMemberCount) { + if (initialMemberCount <= 0) { + throw new IllegalArgumentException("The initial member count must be greater than 0"); + } + this.parameters.put("initial-cluster-size", String.valueOf(initialMemberCount)); + return this; + } + public StreamParametersBuilder put(String key, String value) { parameters.put(key, value); return this; diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java index f0928487ec..e639240cde 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java @@ -86,6 +86,12 @@ public StreamCreator filterSize(int size) { return this; } + @Override + public StreamCreator initialMemberCount(int initialMemberCount) { + streamParametersBuilder.initialMemberCount(initialMemberCount); + return this; + } + @Override public SuperStreamConfiguration superStream() { if (this.superStreamConfiguration == null) { diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java index 064f8f2e2c..6ef7e563cd 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java @@ -813,4 +813,17 @@ void enforceEntityPerConnectionLimits() { executor.shutdownNow(); } } + + @Test + void brokerShouldAcceptInitialMemberCountArgument(TestInfo info) { + String s = streamName(info); + Environment env = environmentBuilder.build(); + try { + env.streamCreator().name(s).initialMemberCount(1).create(); + assertThat(env.streamExists(s)).isTrue(); + } finally { + env.deleteStream(s); + env.close(); + } + } }