Skip to content

Commit a02846c

Browse files
committed
Support filter size on stream creation
1 parent 50dfe13 commit a02846c

File tree

4 files changed

+39
-0
lines changed

4 files changed

+39
-0
lines changed

src/main/java/com/rabbitmq/stream/StreamCreator.java

+11
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.rabbitmq.stream;
1515

1616
import java.time.Duration;
17+
import java.util.function.Function;
1718

1819
/** API to configure and create a stream. */
1920
public interface StreamCreator {
@@ -63,6 +64,16 @@ public interface StreamCreator {
6364
*/
6465
StreamCreator leaderLocator(LeaderLocator leaderLocator);
6566

67+
/**
68+
* Set the size of the stream chunk filters.
69+
*
70+
* @param size
71+
* @return this creator instance
72+
* @see ProducerBuilder#filterValue( Function)
73+
* @see ConsumerBuilder#filter()
74+
*/
75+
StreamCreator filterSize(int size);
76+
6677
/**
6778
* Create the stream.
6879
*

src/main/java/com/rabbitmq/stream/impl/Client.java

+8
Original file line numberDiff line numberDiff line change
@@ -2582,6 +2582,14 @@ public StreamParametersBuilder leaderLocator(LeaderLocator leaderLocator) {
25822582
return this;
25832583
}
25842584

2585+
public StreamParametersBuilder filterSize(int size) {
2586+
if (size < 16 || size > 255) {
2587+
throw new IllegalArgumentException("Stream filter size must be between 16 and 255");
2588+
}
2589+
this.parameters.put("stream-filter-size-bytes", String.valueOf(size));
2590+
return this;
2591+
}
2592+
25852593
public StreamParametersBuilder put(String key, String value) {
25862594
parameters.put(key, value);
25872595
return this;

src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java

+6
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,12 @@ public StreamCreator leaderLocator(LeaderLocator leaderLocator) {
6767
return this;
6868
}
6969

70+
@Override
71+
public StreamCreator filterSize(int size) {
72+
streamParametersBuilder.filterSize(size);
73+
return this;
74+
}
75+
7076
@Override
7177
public void create() {
7278
if (stream == null) {

src/test/java/com/rabbitmq/stream/impl/FilteringTest.java

+14
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import static com.rabbitmq.stream.impl.TestUtils.*;
1717
import static org.assertj.core.api.Assertions.assertThat;
18+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1819

1920
import com.rabbitmq.stream.*;
2021
import io.netty.channel.EventLoopGroup;
@@ -28,6 +29,8 @@
2829
import java.util.stream.IntStream;
2930
import org.junit.jupiter.api.AfterEach;
3031
import org.junit.jupiter.api.BeforeEach;
32+
import org.junit.jupiter.api.Test;
33+
import org.junit.jupiter.api.TestInfo;
3134
import org.junit.jupiter.api.extension.ExtendWith;
3235
import org.junit.jupiter.params.ParameterizedTest;
3336
import org.junit.jupiter.params.provider.CsvSource;
@@ -183,6 +186,17 @@ void matchUnfilteredShouldReturnNullFilteredValueAndFilteredValues(
183186
});
184187
}
185188

189+
@Test
190+
void setFilterSizeOnCreation(TestInfo info) {
191+
String s = streamName(info);
192+
this.environment.streamCreator().stream(s).filterSize(128).create();
193+
this.environment.deleteStream(s);
194+
assertThatThrownBy(() -> this.environment.streamCreator().filterSize(15))
195+
.isInstanceOf(IllegalArgumentException.class);
196+
assertThatThrownBy(() -> this.environment.streamCreator().filterSize(256))
197+
.isInstanceOf(IllegalArgumentException.class);
198+
}
199+
186200
private ProducerBuilder producerBuilder() {
187201
return this.environment.producerBuilder().stream(stream);
188202
}

0 commit comments

Comments
 (0)