Commit 0eb0dab

Document filtering
1 parent 4c908f3 commit 0eb0dab

5 files changed: +190 -0 lines changed

src/docs/asciidoc/advanced-topics.adoc

+70
@@ -2,6 +2,76 @@

 === Advanced Topics

+==== Filtering
+
+RabbitMQ Stream provides a server-side filtering feature that avoids reading all the messages of a stream and filtering only on the client side.
+This helps to save network bandwidth when a consuming application needs only a subset of messages, e.g. the messages from a given geographical region.
+
+The filtering feature works as follows:
+
+* each message is published with an associated _filter value_
+* a consumer that wants to enable filtering must:
+** define one or several filter values
+** define some client-side filtering logic
+
+Why does the consumer need to define some client-side filtering logic?
+Because the server-side filtering is probabilistic: messages that do not match the filter value(s) can still be sent to the consumer.
+The server uses a https://en.wikipedia.org/wiki/Bloom_filter[Bloom filter], _a space-efficient probabilistic data structure_, where false positives are possible.
+Despite this, the filtering saves some bandwidth, which is its primary goal.
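
To make the "false positives are possible" point concrete, here is a minimal toy Bloom filter. It is only a sketch: the class name `ToyBloomFilter`, the bit-array size, and the two hash functions are assumptions made for illustration and are not taken from the RabbitMQ server implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

/** Toy Bloom filter for illustration only; not the server's actual data structure. */
class ToyBloomFilter {
  private final BitSet bits;
  private final int size;

  ToyBloomFilter(int size) {
    this.size = size;
    this.bits = new BitSet(size);
  }

  // Derives two bit positions from the value (hypothetical choice of hash functions).
  private int[] positions(String value) {
    int h1 = 17;
    int h2 = 31;
    for (byte b : value.getBytes(StandardCharsets.UTF_8)) {
      h1 = h1 * 31 + b;
      h2 = h2 * 131 + b;
    }
    return new int[] {Math.floorMod(h1, size), Math.floorMod(h2, size)};
  }

  /** Records a value by setting its bit positions. */
  void add(String value) {
    for (int p : positions(value)) {
      bits.set(p);
    }
  }

  /** Returns false if the value was definitely never added, true if it was possibly added. */
  boolean mightContain(String value) {
    for (int p : positions(value)) {
      if (!bits.get(p)) {
        return false;
      }
    }
    return true;
  }
}
```

A value that was added is always reported as possibly present, so no matching message is lost. A value that was never added can still be reported as present when its bits collide with those of other values: in the degenerate case `new ToyBloomFilter(1)`, every value maps to the single bit, so adding `california` makes `mightContain("oregon")` return `true`. This is why the consumer still needs its own exact check.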
+
+===== Filtering on the Publishing Side
+
+Filtering on the publishing side consists of defining some logic to extract the filter value from a message.
+The following snippet shows how to extract the filter value from an application property:
+
+.Declaring a producer with logic to extract a filter value from each message
+[source,java,indent=0]
+--------
+include::{test-examples}/FilteringUsage.java[tag=producer-simple]
+--------
+<1> Get filter value from `state` application property
+
+Note that the filter value can be null: the message is then published the regular way.
+In this context it is called an _unfiltered_ message.
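
Because a null filter value is allowed, an extractor may want to tolerate messages that have no `state` property instead of failing on them. The following is a minimal sketch under stated assumptions: a plain `Map<String, Object>` stands in for the message's application properties, and the class and constant names are hypothetical. With the real client, the equivalent logic would be passed to `ProducerBuilder#filter(Function<Message, String>)`.

```java
import java.util.Map;
import java.util.function.Function;

class FilterValueExtraction {
  // Assumption: the application properties are modelled as a plain map.
  // Returns the "state" property as a string, or null when it is absent,
  // in which case the message would be published unfiltered.
  static final Function<Map<String, Object>, String> STATE_EXTRACTOR =
      properties -> {
        if (properties == null) {
          return null;
        }
        Object state = properties.get("state");
        return state == null ? null : state.toString();
      };
}
```

The design choice here is to map "no property" to "no filter value" rather than to an exception, so that messages without a state can share the stream with filtered ones.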
+
+===== Filtering on the Consuming Side
+
+A consumer needs to set up one or several filter values and some filtering logic to enable filtering.
+The filtering logic must be consistent with the filter values.
+In the next snippet, the consumer wants to process only messages from the state of California.
+It sets the filter value to `california` and a predicate that accepts a message only if the `state` application property is `california`:
+
+.Declaring a consumer with a filter value and filtering logic
+[source,java,indent=0]
+--------
+include::{test-examples}/FilteringUsage.java[tag=consumer-simple]
+--------
+<1> Set filter value
+<2> Set filtering logic
+
+The filter logic is a `Predicate<Message>`.
+It must return `true` if a message is accepted, following the same semantics as `java.util.stream.Stream#filter(Predicate)`.
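
The `Stream#filter` semantics can be illustrated without a broker. In this sketch a `Map<String, Object>` again stands in for a message's application properties, and the class and method names are hypothetical. The predicate keeps the elements for which it returns `true`, which is how a false positive delivered by the server would be dropped on the client side.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class ClientSideFiltering {
  static final String FILTER_VALUE = "california";

  // Returns true for messages to keep, the same convention as Stream#filter.
  static final Predicate<Map<String, Object>> ACCEPT =
      properties -> FILTER_VALUE.equals(properties.get("state"));

  // Applies the predicate to what the server delivered (assumed to be a plain list here).
  static List<Map<String, Object>> accepted(List<Map<String, Object>> delivered) {
    return delivered.stream().filter(ACCEPT).collect(Collectors.toList());
  }
}
```

If the server delivers a `california` message together with a `texas` message that slipped through as a false positive, `accepted` returns only the `california` one.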
+
+As stated above, messages are not required to have an associated filter value.
+Many applications do not need filtering, so they can publish messages the regular way.
+A stream can therefore contain messages with and without an associated filter value.
+
+By default, messages without a filter value (a.k.a. _unfiltered_ messages) are not sent to a consumer that has enabled filtering.
+
+But what if a consumer wants to process messages with a filter value and messages without any filter value as well?
+It must use the `matchUnfiltered()` method in its declaration and also make sure to keep the filtering logic consistent:
+
+.Declaring a consumer that also accepts unfiltered messages
+[source,java,indent=0]
+--------
+include::{test-examples}/FilteringUsage.java[tag=consumer-match-unfiltered]
+--------
+<1> Request messages from California
+<2> Request messages without a filter value as well
+<3> Let both types of messages pass
+
+In the example above, the filtering logic has been adapted to let `california` messages _and_ messages without a state through.
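
The consistency requirement can be checked in isolation. The sketch below compares a strict predicate with one adapted for unfiltered messages. A `Map<String, Object>` stands in for the application properties, and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.function.Predicate;

class MatchUnfilteredLogic {
  // Accepts only messages whose "state" equals the filter value.
  static Predicate<Map<String, Object>> strict(String filterValue) {
    return properties -> filterValue.equals(properties.get("state"));
  }

  // Accepts messages with the filter value and messages with no "state" at all.
  static Predicate<Map<String, Object>> orUnfiltered(String filterValue) {
    return strict(filterValue).or(properties -> !properties.containsKey("state"));
  }
}
```

With the strict predicate, a message without a `state` property is rejected on the client even if the server sends it, so requesting unfiltered messages would have no visible effect. The adapted predicate accepts it while still rejecting a `texas` message.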
+
 ==== Using Native `epoll`

 The stream Java client uses the https://netty.io/[Netty] network framework and its Java NIO transport implementation by default.

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

+42
@@ -152,6 +152,11 @@ public interface ConsumerBuilder {
    */
   ConsumerBuilder noTrackingStrategy();

+  /**
+   * Configure the filtering.
+   *
+   * @return the filtering configuration
+   */
   FilterConfiguration filter();

   /**
@@ -213,16 +218,53 @@ interface AutoTrackingStrategy {
     ConsumerBuilder builder();
   }

+  /** Filter configuration. */
   interface FilterConfiguration {

+    /**
+     * Set the filter values.
+     *
+     * @param filterValues the filter values to request (at least one is required)
+     * @return this filter configuration instance
+     */
     FilterConfiguration values(String... filterValues);

+    /**
+     * Client-side filtering logic.
+     *
+     * <p>It must be consistent with the requested filter {@link #values(String...)} and the {@link
+     * #matchUnfiltered()} flag.
+     *
+     * @param filter a predicate that returns <code>true</code> if a message should go to the {@link
+     *     MessageHandler}
+     * @return this filter configuration instance
+     */
     FilterConfiguration filter(Predicate<Message> filter);

+    /**
+     * Whether messages without a filter value should be sent as well.
+     *
+     * <p>Default is false.
+     *
+     * @return this filter configuration instance
+     */
     FilterConfiguration matchUnfiltered();

+    /**
+     * Whether messages without a filter value should be sent as well.
+     *
+     * <p>Default is false.
+     *
+     * @param matchUnfiltered true to also receive messages without a filter value
+     * @return this filter configuration instance
+     */
     FilterConfiguration matchUnfiltered(boolean matchUnfiltered);

+    /**
+     * Go back to the builder.
+     *
+     * @return the consumer builder
+     */
     ConsumerBuilder builder();
   }
 }

src/main/java/com/rabbitmq/stream/ProducerBuilder.java

+6
@@ -132,6 +132,12 @@ public interface ProducerBuilder {
    */
   ProducerBuilder enqueueTimeout(Duration timeout);

+  /**
+   * Logic to extract a filter value from a message.
+   *
+   * @param filterValueExtractor function that returns the filter value of a message, or null to
+   *     publish the message unfiltered
+   * @return this builder instance
+   */
   ProducerBuilder filter(Function<Message, String> filterValueExtractor);

   /**

src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

+3
@@ -389,6 +389,9 @@ private DefaultFilterConfiguration(StreamConsumerBuilder builder) {

     @Override
     public FilterConfiguration values(String... filterValues) {
+      if (filterValues == null || filterValues.length == 0) {
+        throw new IllegalArgumentException("At least one filter value must be specified");
+      }
       this.filterValues = Arrays.asList(filterValues);
       return this;
     }
@@ -0,0 +1,69 @@
+// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
+//
+// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
+// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
+// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+
+
+package com.rabbitmq.stream.docs;
+
+import com.rabbitmq.stream.Consumer;
+import com.rabbitmq.stream.Environment;
+import com.rabbitmq.stream.Producer;
+
+public class FilteringUsage {
+
+  void producerSimple() {
+    Environment environment = Environment.builder().build();
+    // tag::producer-simple[]
+    Producer producer = environment.producerBuilder()
+      .stream("invoices")
+      .filter(msg ->
+        msg.getApplicationProperties().get("state").toString()) // <1>
+      .build();
+    // end::producer-simple[]
+  }
+
+  void consumerSimple() {
+    Environment environment = Environment.builder().build();
+    // tag::consumer-simple[]
+    String filterValue = "california";
+    Consumer consumer = environment.consumerBuilder()
+      .stream("invoices")
+      .filter()
+        .values(filterValue) // <1>
+        .filter(msg ->
+          filterValue.equals(msg.getApplicationProperties().get("state"))) // <2>
+      .builder()
+      .messageHandler((ctx, msg) -> { })
+      .build();
+    // end::consumer-simple[]
+  }
+
+  void consumerMatchUnfiltered() {
+    Environment environment = Environment.builder().build();
+    // tag::consumer-match-unfiltered[]
+    String filterValue = "california";
+    Consumer consumer = environment.consumerBuilder()
+      .stream("invoices")
+      .filter()
+        .values(filterValue) // <1>
+        .matchUnfiltered() // <2>
+        .filter(msg ->
+          filterValue.equals(msg.getApplicationProperties().get("state"))
+          || !msg.getApplicationProperties().containsKey("state") // <3>
+        )
+      .builder()
+      .messageHandler((ctx, msg) -> { })
+      .build();
+    // end::consumer-match-unfiltered[]
+  }
+
+}
