Commit d143e10

Merge pull request #343 from rabbitmq/stream-filtering
Support for stream filtering
2 parents 6df7fda + 78b3108 commit d143e10

28 files changed, +1525 -130 lines changed

pom.xml

+1 -1
@@ -92,7 +92,7 @@
     <buildnumber.plugin.version>1.4</buildnumber.plugin.version>
     <jmh.version>1.36</jmh.version>
     <spotless.version>2.37.0</spotless.version>
-    <google-java-format.version>1.15.0</google-java-format.version>
+    <google-java-format.version>1.17.0</google-java-format.version>
     <jacoco.version>0.8.10</jacoco.version>
     <!-- for documentation -->
     <broker.version>3.12</broker.version>

src/docs/asciidoc/advanced-topics.adoc

+84
@@ -2,6 +2,90 @@

 === Advanced Topics

+==== Filtering
+
+WARNING: Filtering requires *RabbitMQ 3.13* or later.
+
+RabbitMQ Stream provides a server-side filtering feature, so a consumer does not have to read all the messages of a stream and filter them only on the client side.
+This saves network bandwidth when a consuming application needs only a subset of the messages, e.g. the messages from a given geographical region.
+
+The filtering feature works as follows:
+
+* each message is published with an associated _filter value_
+* a consumer that wants to enable filtering must:
+** define one or several filter values
+** define some client-side filtering logic
+
+Why does the consumer need to define some client-side filtering logic?
+Because the server-side filtering is probabilistic: messages that do not match the filter value(s) can still be sent to the consumer.
+The server uses a https://en.wikipedia.org/wiki/Bloom_filter[Bloom filter], _a space-efficient probabilistic data structure_, where false positives are possible.
+Despite this, the filtering saves some bandwidth, which is its primary goal.
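To make the "false positives are possible" point concrete, here is a minimal, self-contained sketch of a toy Bloom filter. It is not the broker's implementation: the class name `BloomFilterSketch`, the two-hash scheme, and the bit sizes are assumptions chosen for illustration only.

```java
import java.util.BitSet;

/** Toy Bloom filter: answers "definitely absent" or "possibly present". */
final class BloomFilterSketch {
  private final BitSet bits;
  private final int sizeInBits;

  BloomFilterSketch(int sizeInBits) {
    this.sizeInBits = sizeInBits;
    this.bits = new BitSet(sizeInBits);
  }

  // Double hashing: the i-th bit position for a value (hypothetical scheme).
  private int position(String value, int i) {
    int h1 = value.hashCode();
    int h2 = Integer.rotateLeft(h1 * 0x9E3779B9, 15) | 1;
    return Math.floorMod(h1 + i * h2, sizeInBits);
  }

  void add(String value) {
    bits.set(position(value, 0));
    bits.set(position(value, 1));
  }

  // true means "maybe present"; false means "certainly never added".
  boolean mightContain(String value) {
    return bits.get(position(value, 0)) && bits.get(position(value, 1));
  }

  public static void main(String[] args) {
    BloomFilterSketch filter = new BloomFilterSketch(128);
    filter.add("california");
    // An added value is always reported: there are no false negatives.
    System.out.println(filter.mightContain("california"));

    // A deliberately tiny filter saturates at once, so a value that was
    // never added is reported as possibly present: a false positive.
    BloomFilterSketch tiny = new BloomFilterSketch(1);
    tiny.add("california");
    System.out.println(tiny.mightContain("texas"));
  }
}
```

The one-bit filter is an extreme case, but it shows why a consumer cannot rely on server-side filtering alone: a "maybe present" answer still has to be checked on the client.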
+
+===== Filtering on the Publishing Side
+
+Filtering on the publishing side consists of defining some logic to extract the filter value from a message.
+The following snippet shows how to extract the filter value from an application property:
+
+.Declaring a producer with logic to extract a filter value from each message
+[source,java,indent=0]
+--------
+include::{test-examples}/FilteringUsage.java[tag=producer-simple]
+--------
+<1> Get filter value from `state` application property
+
+Note that the filter value can be null: the message is then published the regular way.
+Such a message is called an _unfiltered_ message in this context.
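The extraction logic, including the null case, can be sketched without the client library. In the sketch below a plain `Map` stands in for a message's application properties; the class `FilterValueExtractorSketch` and its `filterValue` method are hypothetical names, and only the shape of the function (message in, `String` or null out) mirrors `ProducerBuilder#filterValue(Function<Message, String>)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class FilterValueExtractorSketch {
  // Stand-in for a message: only its application properties
  // (hypothetical type, not the client's Message interface).
  static String filterValue(Map<String, ?> applicationProperties) {
    Object state = applicationProperties.get("state");
    // null means "no filter value": the message is published as unfiltered
    return state == null ? null : state.toString();
  }

  public static void main(String[] args) {
    // Same shape as the extractor passed to the producer builder.
    Function<Map<String, ?>, String> extractor = FilterValueExtractorSketch::filterValue;

    Map<String, Object> invoice = new HashMap<>();
    invoice.put("state", "california");
    Map<String, Object> heartbeat = new HashMap<>(); // no "state" property

    System.out.println(extractor.apply(invoice)); // prints "california"
    System.out.println(extractor.apply(heartbeat)); // prints "null"
  }
}
```

Guarding against a missing property matters here: calling `toString()` directly on an absent value would throw instead of producing an unfiltered message.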
+
+===== Filtering on the Consuming Side
+
+A consumer needs to set up one or several filter values and some filtering logic to enable filtering.
+The filtering logic must be consistent with the filter values.
+In the next snippet, the consumer wants to process only messages from the state of California.
+It sets a filter value to `california` and a predicate that accepts a message only if the `state` application property is `california`:
+
+.Declaring a consumer with a filter value and filtering logic
+[source,java,indent=0]
+--------
+include::{test-examples}/FilteringUsage.java[tag=consumer-simple]
+--------
+<1> Set filter value
+<2> Set filtering logic
+
+The filtering logic is a `Predicate<Message>`.
+It must return `true` if a message is accepted, following the same semantics as `java.util.stream.Stream#filter(Predicate)`.
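The role of the client-side predicate can be sketched as follows. This is only an illustration under stated assumptions: a `Map<String, String>` stands in for a message's application properties, and `PostFilterSketch` and `handle` are hypothetical names, not part of the client API. The delivered batch deliberately contains a `texas` message to play the part of a server-side false positive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class PostFilterSketch {
  // Client-side logic, consistent with the requested filter value "california".
  static final Predicate<Map<String, String>> IS_CALIFORNIA =
      props -> "california".equals(props.get("state"));

  // Keeps only the messages the predicate accepts (true means "accepted").
  static List<Map<String, String>> handle(List<Map<String, String>> delivered) {
    List<Map<String, String>> handled = new ArrayList<>();
    for (Map<String, String> message : delivered) {
      if (IS_CALIFORNIA.test(message)) {
        handled.add(message);
      }
    }
    return handled;
  }

  public static void main(String[] args) {
    // What the server might deliver: one match and one false positive.
    List<Map<String, String>> delivered =
        Arrays.asList(
            Collections.singletonMap("state", "california"),
            Collections.singletonMap("state", "texas"));
    System.out.println(handle(delivered).size()); // prints 1
  }
}
```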
+
+As stated above, messages are not required to have an associated filter value.
+Many applications do not need filtering, so they can publish messages the regular way.
+A stream can therefore contain messages with and without an associated filter value.
+
+By default, messages without a filter value (a.k.a. _unfiltered_ messages) are not sent to a consumer that enabled filtering.
+
+But what if a consumer wants to process messages with a filter value and messages without any filter value as well?
+It must call the `matchUnfiltered()` method in its declaration and keep the filtering logic consistent:
+
+.Getting unfiltered messages as well when enabling filtering
+[source,java,indent=0]
+--------
+include::{test-examples}/FilteringUsage.java[tag=consumer-match-unfiltered]
+--------
+<1> Request messages from California
+<2> Request messages without a filter value as well
+<3> Let both types of messages pass
+
+In the example above, the filtering logic has been adapted to let through both `california` messages _and_ messages without a state set.
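A predicate that stays consistent with `matchUnfiltered()` might look like the sketch below. As before, a `Map<String, String>` stands in for the application properties, and the class and constant names are hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Predicate;

final class MatchUnfilteredSketch {
  // Accepts "california" messages and messages with no state at all.
  static final Predicate<Map<String, String>> CALIFORNIA_OR_UNFILTERED =
      props -> {
        String state = props.get("state");
        return state == null || "california".equals(state);
      };

  public static void main(String[] args) {
    Map<String, String> california = Collections.singletonMap("state", "california");
    Map<String, String> texas = Collections.singletonMap("state", "texas");
    Map<String, String> noState = Collections.emptyMap();

    System.out.println(CALIFORNIA_OR_UNFILTERED.test(california)); // prints true
    System.out.println(CALIFORNIA_OR_UNFILTERED.test(noState)); // prints true
    System.out.println(CALIFORNIA_OR_UNFILTERED.test(texas)); // prints false
  }
}
```

A predicate that only tested for `california` would silently drop the unfiltered messages the consumer just asked the server to send.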
+
+===== Considerations on Filtering
+
+As stated previously, the server can send messages that do not match the filter value(s) set by consumers.
+This is why application developers must be very careful with the filtering logic they define to avoid processing unwanted messages.
+
+What are good candidates for filter values?
+Unique identifiers are _not_: if you know a given message property will be unique in a stream, do not use it as a filter value.
+A defined set of values shared across the messages is a good candidate: geographical locations (e.g. countries, states), document types in a stream that stores document information (e.g. payslip, invoice, order), categories of products (e.g. book, luggage, toy).
+
+The cardinality of filter values can range from a few to a few thousand.
+Extreme cardinality (several thousand to tens of thousands of values) can make filtering less efficient.
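The effect of cardinality can be illustrated with a toy experiment: add a growing number of distinct values to a fixed-size Bloom filter and measure how often values that were never added are reported as possibly present. Everything in this sketch is an assumption made for illustration (the class `FilterSaturationSketch`, the 128-bit size, the two-hash scheme, the synthetic value names); the broker's actual filter layout and hashing may differ.

```java
import java.util.BitSet;

final class FilterSaturationSketch {
  // Double hashing: the i-th bit position for a value (hypothetical scheme).
  static int position(String value, int i, int sizeInBits) {
    int h1 = value.hashCode();
    int h2 = Integer.rotateLeft(h1 * 0x9E3779B9, 15) | 1;
    return Math.floorMod(h1 + i * h2, sizeInBits);
  }

  // Fraction of never-added probe values that the filter reports as "maybe present".
  static double falsePositiveRate(int sizeInBits, int distinctValues) {
    BitSet bits = new BitSet(sizeInBits);
    for (int n = 0; n < distinctValues; n++) {
      String value = "value-" + n;
      bits.set(position(value, 0, sizeInBits));
      bits.set(position(value, 1, sizeInBits));
    }
    int probes = 1000;
    int falsePositives = 0;
    for (int n = 0; n < probes; n++) {
      String probe = "probe-" + n; // never added to the filter
      if (bits.get(position(probe, 0, sizeInBits))
          && bits.get(position(probe, 1, sizeInBits))) {
        falsePositives++;
      }
    }
    return falsePositives / (double) probes;
  }

  public static void main(String[] args) {
    for (int cardinality : new int[] {10, 100, 1000, 10000}) {
      System.out.println(cardinality + " distinct values -> false positive rate "
          + falsePositiveRate(128, cardinality));
    }
  }
}
```

Once nearly every bit is set, the filter matches almost anything and the server can no longer skip data, which is one plausible reading of "less efficient" above. Raising the filter size (see `StreamCreator#filterSize(int)`) trades space for a lower false-positive rate.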
+
 ==== Using Native `epoll`

 The stream Java client uses the https://netty.io/[Netty] network framework and its Java NIO transport implementation by default.

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

+64
@@ -14,6 +14,7 @@
 package com.rabbitmq.stream;

 import java.time.Duration;
+import java.util.function.Predicate;

 /** API to configure and create a {@link Consumer}. */
 public interface ConsumerBuilder {
@@ -151,6 +152,15 @@ public interface ConsumerBuilder {
    */
   ConsumerBuilder noTrackingStrategy();

+  /**
+   * Configure the filtering.
+   *
+   * <p>RabbitMQ 3.13 or later is required.
+   *
+   * @return the filtering configuration
+   */
+  FilterConfiguration filter();
+
   /**
    * Configure flow of messages.
    *
@@ -242,4 +252,58 @@ interface FlowConfiguration {
      */
     ConsumerBuilder builder();
   }
+
+  /**
+   * Filter configuration.
+   *
+   * <p>RabbitMQ 3.13 or later is required.
+   */
+  interface FilterConfiguration {
+
+    /**
+     * Set the filter values.
+     *
+     * @param filterValues the filter values to match
+     * @return this filter configuration instance
+     */
+    FilterConfiguration values(String... filterValues);
+
+    /**
+     * Client-side filtering logic, occurring after the server-side filtering.
+     *
+     * <p>It must be consistent with the requested filter {@link #values(String...)} and the {@link
+     * #matchUnfiltered()} flag.
+     *
+     * @param filter a predicate that returns <code>true</code> if a message should go to the {@link
+     *     MessageHandler}
+     * @return this filter configuration instance
+     */
+    FilterConfiguration postFilter(Predicate<Message> filter);
+
+    /**
+     * Whether messages without a filter value should be sent as well.
+     *
+     * <p>Default is false.
+     *
+     * @return this filter configuration instance
+     */
+    FilterConfiguration matchUnfiltered();
+
+    /**
+     * Whether messages without a filter value should be sent as well.
+     *
+     * <p>Default is false.
+     *
+     * @param matchUnfiltered true to also receive messages without a filter value
+     * @return this filter configuration instance
+     */
+    FilterConfiguration matchUnfiltered(boolean matchUnfiltered);
+
+    /**
+     * Go back to the builder.
+     *
+     * @return the consumer builder
+     */
+    ConsumerBuilder builder();
+  }
 }

src/main/java/com/rabbitmq/stream/ProducerBuilder.java

+10
@@ -132,6 +132,16 @@ public interface ProducerBuilder {
    */
   ProducerBuilder enqueueTimeout(Duration timeout);

+  /**
+   * Logic to extract a filter value from a message.
+   *
+   * <p>RabbitMQ 3.13 or later is required.
+   *
+   * @param filterValueExtractor function that returns the filter value of a message
+   * @return this builder instance
+   */
+  ProducerBuilder filterValue(Function<Message, String> filterValueExtractor);
+
   /**
    * Create the {@link Producer} instance.
    *

src/main/java/com/rabbitmq/stream/StreamCreator.java

+11
@@ -14,6 +14,7 @@
 package com.rabbitmq.stream;

 import java.time.Duration;
+import java.util.function.Function;

 /** API to configure and create a stream. */
 public interface StreamCreator {
@@ -63,6 +64,16 @@ public interface StreamCreator {
    */
   StreamCreator leaderLocator(LeaderLocator leaderLocator);

+  /**
+   * Set the size of the stream chunk filters.
+   *
+   * @param size the size of the chunk filters
+   * @return this creator instance
+   * @see ProducerBuilder#filterValue(Function)
+   * @see ConsumerBuilder#filter()
+   */
+  StreamCreator filterSize(int size);
+
   /**
    * Create the stream.
    *
