Skip to content

Commit 6dd42a0

Browse files
authored
Merge pull request #635 from rabbitmq/initial-member-count-in-stream-creator
Add initial member count to stream creator
2 parents 7c9f645 + 3cc98f9 commit 6dd42a0

File tree

4 files changed

+37
-0
lines changed

4 files changed

+37
-0
lines changed

src/main/java/com/rabbitmq/stream/StreamCreator.java

+10
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,16 @@ public interface StreamCreator {
9292
*/
9393
StreamCreator filterSize(int size);
9494

95+
/**
96+
* Set the number of initial members the stream should have.
97+
*
98+
* @param initialMemberCount initial number of nodes
99+
* @return this creator instance
100+
* @see <a href="https://www.rabbitmq.com/docs/streams#replication-factor">Initial Replication
101+
* Factor</a>
102+
*/
103+
StreamCreator initialMemberCount(int initialMemberCount);
104+
95105
/**
96106
* Configure the super stream to create.
97107
*

src/main/java/com/rabbitmq/stream/impl/Client.java

+8
Original file line numberDiff line numberDiff line change
@@ -2744,6 +2744,14 @@ public StreamParametersBuilder filterSize(int size) {
27442744
return this;
27452745
}
27462746

2747+
public StreamParametersBuilder initialMemberCount(int initialMemberCount) {
2748+
if (initialMemberCount <= 0) {
2749+
throw new IllegalArgumentException("The initial member count must be greater than 0");
2750+
}
2751+
this.parameters.put("initial-cluster-size", String.valueOf(initialMemberCount));
2752+
return this;
2753+
}
2754+
27472755
public StreamParametersBuilder put(String key, String value) {
27482756
parameters.put(key, value);
27492757
return this;

src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java

+6
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ public StreamCreator filterSize(int size) {
8686
return this;
8787
}
8888

89+
@Override
90+
public StreamCreator initialMemberCount(int initialMemberCount) {
91+
streamParametersBuilder.initialMemberCount(initialMemberCount);
92+
return this;
93+
}
94+
8995
@Override
9096
public SuperStreamConfiguration superStream() {
9197
if (this.superStreamConfiguration == null) {

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

+13
Original file line numberDiff line numberDiff line change
@@ -813,4 +813,17 @@ void enforceEntityPerConnectionLimits() {
813813
executor.shutdownNow();
814814
}
815815
}
816+
817+
@Test
818+
void brokerShouldAcceptInitialMemberCountArgument(TestInfo info) {
819+
String s = streamName(info);
820+
Environment env = environmentBuilder.build();
821+
try {
822+
env.streamCreator().name(s).initialMemberCount(1).create();
823+
assertThat(env.streamExists(s)).isTrue();
824+
} finally {
825+
env.deleteStream(s);
826+
env.close();
827+
}
828+
}
816829
}

0 commit comments

Comments
 (0)