Skip to content

Support for stream filtering #343

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
<buildnumber.plugin.version>1.4</buildnumber.plugin.version>
<jmh.version>1.36</jmh.version>
<spotless.version>2.37.0</spotless.version>
<google-java-format.version>1.15.0</google-java-format.version>
<google-java-format.version>1.17.0</google-java-format.version>
<jacoco.version>0.8.10</jacoco.version>
<!-- for documentation -->
<broker.version>3.12</broker.version>
Expand Down
84 changes: 84 additions & 0 deletions src/docs/asciidoc/advanced-topics.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,90 @@

=== Advanced Topics

==== Filtering

WARNING: Filtering requires *RabbitMQ 3.13* or more.

RabbitMQ Stream provides a server-side filtering feature that avoids reading all the messages of a stream and filtering only on the client side.
This helps to save network bandwidth when a consuming application needs only a subset of messages, e.g. the messages from a given geographical region.

The filtering feature works as follows:

* each message is published with an associated _filter value_
* a consumer that wants to enable filtering must:
** define one or several filter values
** define some client-side filtering logic

Why does the consumer need to define some client-side filtering logic?
Because the server-side filtering is probabilistic: messages that do not match the filter value(s) can still be sent to the consumer.
The server uses a https://en.wikipedia.org/wiki/Bloom_filter[Bloom filter], _a space-efficient probabilistic data structure_, where false positives are possible.
Despite this, the filtering saves some bandwidth, which is its primary goal.

===== Filtering on the Publishing Side

Filtering on the publishing side consists in defining some logic to extract the filter value from a message.
The following snippet shows how to extract the filter value from an application property:

.Declaring a producer with logic to extract a filter value from each message
[source,java,indent=0]
--------
include::{test-examples}/FilteringUsage.java[tag=producer-simple]
--------
<1> Get filter value from `state` application property

Note the filter value can be null: the message is then published in a regular way.
It is called in this context an _unfiltered_ message.

===== Filtering on the Consuming Side

A consumer needs to set up one or several filter values and some filtering logic to enable filtering.
The filtering logic must be consistent with the filter values.
In the next snippet, the consumer wants to process only messages from the state of California.
It sets a filter value to `california` and a predicate that accepts a message only if the `state` application properties is `california`:

.Declaring a consumer with a filter value and filtering logic
[source,java,indent=0]
--------
include::{test-examples}/FilteringUsage.java[tag=consumer-simple]
--------
<1> Set filter value
<2> Set filtering logic

The filter logic is a `Predicate<Message>`.
It must return `true` if a message is accepted, following the same semantics as `java.util.stream.Stream#filter(Predicate)`.

As stated above, not all messages must have an associated filter value.
Many applications may not need some filtering, so they can publish messages the regular way.
So a stream can contain messages with and without an associated filter value.

By default, messages without a filter value (a.k.a _unfiltered_ messages) are not sent to a consumer that enabled filtering.

But what if a consumer wants to process messages with a filter value and messages without any filter value as well?
It must use the `matchUnfiltered()` method in its declaration and also make sure to keep the filtering logic consistent:

.Getting unfiltered messages as well when enabling filtering
[source,java,indent=0]
--------
include::{test-examples}/FilteringUsage.java[tag=consumer-match-unfiltered]
--------
<1> Request messages from California
<2> Request messages without a filter value as well
<3> Let both types of messages pass

In the example above, the filtering logic has been adapted to let pass `california` messages _and_ messages without a state set as well.

===== Considerations on Filtering

As stated previously, the server can send messages that do not match the filter value(s) set by consumers.
This is why application developers must be very careful with the filtering logic they define to avoid processing unwanted messages.

What are good candidates for filter values?
Unique identifiers are _not_: if you know a given message property will be unique in a stream, do not use it as a filter value.
A defined set of values shared across the messages is a good candidate: geographical locations (e.g. countries, states), document types in a stream that stores document information (e.g. payslip, invoice, order), categories of products (e.g. book, luggage, toy).

Cardinality of filter values can be from a few to a few thousands.
Extreme cardinality (a couple or dozens of thousands) can make filtering less efficient.

==== Using Native `epoll`

The stream Java client uses the https://netty.io/[Netty] network framework and its Java NIO transport implementation by default.
Expand Down
64 changes: 64 additions & 0 deletions src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.rabbitmq.stream;

import java.time.Duration;
import java.util.function.Predicate;

/** API to configure and create a {@link Consumer}. */
public interface ConsumerBuilder {
Expand Down Expand Up @@ -151,6 +152,15 @@ public interface ConsumerBuilder {
*/
ConsumerBuilder noTrackingStrategy();

/**
* Configure the filtering.
*
* <p>RabbitMQ 3.13 or more is required.
*
* @return the filtering configuration
*/
FilterConfiguration filter();

/**
* Configure flow of messages.
*
Expand Down Expand Up @@ -242,4 +252,58 @@ interface FlowConfiguration {
*/
ConsumerBuilder builder();
}

/**
* Filter configuration.
*
* <p>RabbitMQ 3.13 or more is required.
*/
interface FilterConfiguration {

/**
* Set the filter values.
*
* @param filterValues
* @return this filter configuration instance
*/
FilterConfiguration values(String... filterValues);

/**
* Client-side filtering logic, occurring after the server-side filtering.
*
* <p>It must be consistent with the requested filter {@link #values( String...)} and the {@link
* #matchUnfiltered()} flag.
*
* @param filter a predicate that returns <code>true</code> if a message should go to the {@link
* MessageHandler}
* @return this filter configuration instance
*/
FilterConfiguration postFilter(Predicate<Message> filter);

/**
* Whether messages without a filter value should be sent as well.
*
* <p>Default is false.
*
* @return this filter configuration instance
*/
FilterConfiguration matchUnfiltered();

/**
* Whether messages without a filter value should be sent as well.
*
* <p>Default is false.
*
* @param matchUnfiltered
* @return this filter configuration instance
*/
FilterConfiguration matchUnfiltered(boolean matchUnfiltered);

/**
* Go back to the builder.
*
* @return the consumer builder
*/
ConsumerBuilder builder();
}
}
10 changes: 10 additions & 0 deletions src/main/java/com/rabbitmq/stream/ProducerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@ public interface ProducerBuilder {
*/
ProducerBuilder enqueueTimeout(Duration timeout);

/**
* Logic to extract a filter value from a message.
*
* <p>RabbitMQ 3.13 or more is required.
*
* @param filterValueExtractor
* @return this builder instance
*/
ProducerBuilder filterValue(Function<Message, String> filterValueExtractor);

/**
* Create the {@link Producer} instance.
*
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/com/rabbitmq/stream/StreamCreator.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.rabbitmq.stream;

import java.time.Duration;
import java.util.function.Function;

/** API to configure and create a stream. */
public interface StreamCreator {
Expand Down Expand Up @@ -63,6 +64,16 @@ public interface StreamCreator {
*/
StreamCreator leaderLocator(LeaderLocator leaderLocator);

/**
* Set the size of the stream chunk filters.
*
* @param size
* @return this creator instance
* @see ProducerBuilder#filterValue( Function)
* @see ConsumerBuilder#filter()
*/
StreamCreator filterSize(int size);

/**
* Create the stream.
*
Expand Down
Loading