File tree 4 files changed +37
-0
lines changed
main/java/com/rabbitmq/stream
test/java/com/rabbitmq/stream/impl
4 files changed +37
-0
lines changed Original file line number Diff line number Diff line change @@ -92,6 +92,16 @@ public interface StreamCreator {
92
92
*/
93
93
StreamCreator filterSize (int size );
94
94
95
+ /**
96
+ * Set the number of initial members the stream should have.
97
+ *
98
+ * @param initialMemberCount initial number of nodes
99
+ * @return this creator instance
100
+ * @see <a href="https://www.rabbitmq.com/docs/streams#replication-factor">Initial Replication
101
+ * Factor</a>
102
+ */
103
+ StreamCreator initialMemberCount (int initialMemberCount );
104
+
95
105
/**
96
106
* Configure the super stream to create.
97
107
*
Original file line number Diff line number Diff line change @@ -2744,6 +2744,14 @@ public StreamParametersBuilder filterSize(int size) {
2744
2744
return this ;
2745
2745
}
2746
2746
2747
+ public StreamParametersBuilder initialMemberCount (int initialMemberCount ) {
2748
+ if (initialMemberCount <= 0 ) {
2749
+ throw new IllegalArgumentException ("The initial member count must be greater than 0" );
2750
+ }
2751
+ this .parameters .put ("initial-cluster-size" , String .valueOf (initialMemberCount ));
2752
+ return this ;
2753
+ }
2754
+
2747
2755
public StreamParametersBuilder put (String key , String value ) {
2748
2756
parameters .put (key , value );
2749
2757
return this ;
Original file line number Diff line number Diff line change @@ -86,6 +86,12 @@ public StreamCreator filterSize(int size) {
86
86
return this ;
87
87
}
88
88
89
+ @ Override
90
+ public StreamCreator initialMemberCount (int initialMemberCount ) {
91
+ streamParametersBuilder .initialMemberCount (initialMemberCount );
92
+ return this ;
93
+ }
94
+
89
95
@ Override
90
96
public SuperStreamConfiguration superStream () {
91
97
if (this .superStreamConfiguration == null ) {
Original file line number Diff line number Diff line change @@ -813,4 +813,17 @@ void enforceEntityPerConnectionLimits() {
813
813
executor .shutdownNow ();
814
814
}
815
815
}
816
+
817
+ @ Test
818
+ void brokerShouldAcceptInitialMemberCountArgument (TestInfo info ) {
819
+ String s = streamName (info );
820
+ Environment env = environmentBuilder .build ();
821
+ try {
822
+ env .streamCreator ().name (s ).initialMemberCount (1 ).create ();
823
+ assertThat (env .streamExists (s )).isTrue ();
824
+ } finally {
825
+ env .deleteStream (s );
826
+ env .close ();
827
+ }
828
+ }
816
829
}
You can’t perform that action at this time.
0 commit comments