Skip to content

Commit 31fe4ca

Browse files
committed
Refine filtering API
As-per discussion with @kjnilsson and @ggreen.
1 parent 090d47c commit 31fe4ca

File tree

6 files changed

+15
-12
lines changed

6 files changed

+15
-12
lines changed

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ interface FilterConfiguration {
230230
FilterConfiguration values(String... filterValues);
231231

232232
/**
233-
* Client-side filtering logic.
233+
* Client-side filtering logic, occurring after the server-side filtering.
234234
*
235235
* <p>It must be consistent with the requested filter {@link #values( String...)} and the {@link
236236
* #matchUnfiltered()} flag.
@@ -239,7 +239,7 @@ interface FilterConfiguration {
239239
* MessageHandler}
240240
* @return this filter configuration instance
241241
*/
242-
FilterConfiguration filter(Predicate<Message> filter);
242+
FilterConfiguration postFilter(Predicate<Message> filter);
243243

244244
/**
245245
* Whether messages without a filter value should be sent as well.

src/main/java/com/rabbitmq/stream/ProducerBuilder.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,9 @@ public interface ProducerBuilder {
136136
* Logic to extract a filter value from a message.
137137
*
138138
* @param filterValueExtractor
139-
* @return
139+
* @return this builder instance
140140
*/
141-
ProducerBuilder filter(Function<Message, String> filterValueExtractor);
141+
ProducerBuilder filterValue(Function<Message, String> filterValueExtractor);
142142

143143
/**
144144
* Create the {@link Producer} instance.

src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,7 @@ public FilterConfiguration values(String... filterValues) {
397397
}
398398

399399
@Override
400-
public FilterConfiguration filter(Predicate<Message> filter) {
400+
public FilterConfiguration postFilter(Predicate<Message> filter) {
401401
this.filter = filter;
402402
return this;
403403
}

src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public ProducerBuilder enqueueTimeout(Duration timeout) {
131131
}
132132

133133
@Override
134-
public ProducerBuilder filter(Function<Message, String> filterValueExtractor) {
134+
public ProducerBuilder filterValue(Function<Message, String> filterValueExtractor) {
135135
this.filterValueExtractor = filterValueExtractor;
136136
return this;
137137
}

src/test/java/com/rabbitmq/stream/docs/FilteringUsage.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ void producerSimple() {
2525
// tag::producer-simple[]
2626
Producer producer = environment.producerBuilder()
2727
.stream("invoices")
28-
.filter(msg ->
28+
.filterValue(msg ->
2929
msg.getApplicationProperties().get("state").toString()) // <1>
3030
.build();
3131
// end::producer-simple[]
@@ -39,7 +39,7 @@ void consumerSimple() {
3939
.stream("invoices")
4040
.filter()
4141
.values(filterValue) // <1>
42-
.filter(msg ->
42+
.postFilter(msg ->
4343
filterValue.equals(msg.getApplicationProperties().get("state"))) // <2>
4444
.builder()
4545
.messageHandler((ctx, msg) -> { })
@@ -56,7 +56,7 @@ void consumerMatchUnfiltered() {
5656
.filter()
5757
.values(filterValue) // <1>
5858
.matchUnfiltered() // <2>
59-
.filter(msg ->
59+
.postFilter(msg ->
6060
filterValue.equals(msg.getApplicationProperties().get("state"))
6161
|| !msg.getApplicationProperties().containsKey("state") // <3>
6262
)

src/test/java/com/rabbitmq/stream/impl/FilteringTest.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ void publishConsume(String producerName) throws Exception {
9191
consumerBuilder()
9292
.filter()
9393
.values(newFilterValue)
94-
.filter(
94+
.postFilter(
9595
m -> {
9696
receivedMessageCount.incrementAndGet();
9797
return newFilterValue.equals(m.getProperties().getGroupId());
@@ -142,7 +142,7 @@ void matchUnfilteredShouldReturnNullFilteredValueAndFilteredValues(
142142
.filter()
143143
.values(filterValues.get(0))
144144
.matchUnfiltered(matchUnfiltered)
145-
.filter(m -> true)
145+
.postFilter(m -> true)
146146
.builder()
147147
.messageHandler(
148148
(ctx, msg) -> {
@@ -176,7 +176,10 @@ private ConsumerBuilder consumerBuilder() {
176176
private void publish(
177177
int messageCount, String producerName, Supplier<String> filterValueSupplier) {
178178
Producer producer =
179-
producerBuilder().name(producerName).filter(m -> m.getProperties().getGroupId()).build();
179+
producerBuilder()
180+
.name(producerName)
181+
.filterValue(m -> m.getProperties().getGroupId())
182+
.build();
180183
CountDownLatch latch = new CountDownLatch(messageCount);
181184
ConfirmationHandler confirmationHandler = ctx -> latch.countDown();
182185
IntStream.range(0, messageCount)

0 commit comments

Comments
 (0)