diff --git a/src/main/java/com/rabbitmq/stream/StreamCreator.java b/src/main/java/com/rabbitmq/stream/StreamCreator.java index 73f1360592..b16de24b0a 100644 --- a/src/main/java/com/rabbitmq/stream/StreamCreator.java +++ b/src/main/java/com/rabbitmq/stream/StreamCreator.java @@ -92,6 +92,16 @@ public interface StreamCreator { */ StreamCreator filterSize(int size); + /** + * Set the number of initial members the stream should have. + * + * @param initialMemberCount initial number of nodes + * @return this creator instance + * @see Initial Replication + * Factor + */ + StreamCreator initialMemberCount(int initialMemberCount); + /** * Configure the super stream to create. * diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 1c9d53c123..aa27f430fa 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -2744,6 +2744,14 @@ public StreamParametersBuilder filterSize(int size) { return this; } + public StreamParametersBuilder initialMemberCount(int initialMemberCount) { + if (initialMemberCount <= 0) { + throw new IllegalArgumentException("The initial member count must be greater than 0"); + } + this.parameters.put("initial-cluster-size", String.valueOf(initialMemberCount)); + return this; + } + public StreamParametersBuilder put(String key, String value) { parameters.put(key, value); return this; diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java index f0928487ec..e639240cde 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java @@ -86,6 +86,12 @@ public StreamCreator filterSize(int size) { return this; } + @Override + public StreamCreator initialMemberCount(int initialMemberCount) { + streamParametersBuilder.initialMemberCount(initialMemberCount); + return this; + } + @Override public SuperStreamConfiguration superStream() { if (this.superStreamConfiguration == null) { diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java index 064f8f2e2c..6ef7e563cd 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java @@ -813,4 +813,17 @@ void enforceEntityPerConnectionLimits() { executor.shutdownNow(); } } + + @Test + void brokerShouldAcceptInitialMemberCountArgument(TestInfo info) { + String s = streamName(info); + Environment env = environmentBuilder.build(); + try { + env.streamCreator().name(s).initialMemberCount(1).create(); + assertThat(env.streamExists(s)).isTrue(); + } finally { + env.deleteStream(s); + env.close(); + } + } }