Warning: Filtering requires RabbitMQ 3.13 or later.
RabbitMQ Stream provides a server-side filtering feature that avoids reading all the messages of a stream and filtering only on the client side. This helps to save network bandwidth when a consuming application needs only a subset of messages, e.g. the messages from a given geographical region.
The filtering feature works as follows:
- each message is published with an associated filter value
- a consumer that wants to enable filtering must:
  - define one or several filter values
  - define some client-side filtering logic
Why does the consumer need to define some client-side filtering logic? Because the server-side filtering is probabilistic: messages that do not match the filter value(s) can still be sent to the consumer. The server uses a Bloom filter, a space-efficient probabilistic data structure, where false positives are possible. Despite this, the filtering saves some bandwidth, which is its primary goal.
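To make the false-positive behavior concrete, here is a toy Bloom filter. It is only an illustration of the data structure: the bit array size and the two hash functions are arbitrary assumptions for this sketch, and it is not the implementation the server uses.

```java
import java.util.BitSet;

/** Toy Bloom filter, for illustration only (not RabbitMQ's server-side code). */
final class ToyBloomFilter {
    private final BitSet bits;
    private final int size;

    ToyBloomFilter(int size) {
        this.size = size;
        this.bits = new BitSet(size);
    }

    // Two cheap hash functions derived from String#hashCode (arbitrary choice for this sketch).
    private int h1(String value) {
        return Math.floorMod(value.hashCode(), size);
    }

    private int h2(String value) {
        return Math.floorMod(value.hashCode() * 31 + 17, size);
    }

    /** Records a filter value by setting the bits it hashes to. */
    void add(String value) {
        bits.set(h1(value));
        bits.set(h2(value));
    }

    /** false means "definitely absent", true only means "possibly present". */
    boolean mightContain(String value) {
        return bits.get(h1(value)) && bits.get(h2(value));
    }

    public static void main(String[] args) {
        // With a single bit, every value collides: a guaranteed false positive.
        ToyBloomFilter tiny = new ToyBloomFilter(1);
        tiny.add("california");
        System.out.println("texas possibly present? " + tiny.mightContain("texas"));
    }
}
```

A value that was added is always reported as possibly present, so matching messages are never missed. A value that was never added can still be reported as possibly present when its bits collide with those of other values, which is why the consumer must re-check each message.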
Filtering on the publishing side consists of defining some logic to extract the filter value from a message. The following snippet shows how to extract the filter value from an application property:
link:../../test/java/com/rabbitmq/stream/docs/FilteringUsage.java[role=include]
- Get the filter value from the state application property
Note that the filter value can be null: the message is then published the regular way, without a filter value. Such a message is called an unfiltered message in this context.
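The extraction logic, including the null case, can be sketched with plain JDK types. In this sketch a Map stands in for the application properties of a message, and the class and method names are hypothetical; the client's own message type is deliberately not used.

```java
import java.util.Map;

/** Sketch of filter value extraction; a Map stands in for application properties. */
final class FilterValueSketch {

    /**
     * Returns the filter value for a message, or null when the "state"
     * property is absent, in which case the message is unfiltered.
     */
    static String stateFilterValue(Map<String, Object> applicationProperties) {
        Object state = applicationProperties.get("state");
        return state == null ? null : state.toString();
    }

    public static void main(String[] args) {
        System.out.println(stateFilterValue(Map.of("state", "california")));
        System.out.println(stateFilterValue(Map.of("type", "invoice")));
    }
}
```

Returning null rather than throwing keeps publishing working for messages that do not carry the property.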
A consumer needs to set up one or several filter values and some filtering logic to enable filtering.
The filtering logic must be consistent with the filter values.
In the next snippet, the consumer wants to process only messages from the state of California.
It sets a filter value to california and a predicate that accepts a message only if the state application property is california:
link:../../test/java/com/rabbitmq/stream/docs/FilteringUsage.java[role=include]
- Set filter value
- Set filtering logic
The filter logic is a Predicate<Message>. It must return true if a message is accepted, following the same semantics as java.util.stream.Stream#filter(Predicate).
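The predicate semantics can be checked in isolation with java.util.stream.Stream#filter. The sketch below is an assumption-laden stand-in: a Map plays the role of the application properties of a message, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Sketch of client-side filtering logic; a Map stands in for a message. */
final class PostFilterSketch {

    // Returns true when the message is accepted, like a Stream#filter predicate.
    static final Predicate<Map<String, Object>> CALIFORNIA_ONLY =
        props -> "california".equals(props.get("state"));

    /** Keeps only the delivered messages the predicate accepts. */
    static List<Map<String, Object>> accepted(List<Map<String, Object>> delivered) {
        return delivered.stream()
            .filter(CALIFORNIA_ONLY)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Object> california = Map.of("state", "california");
        // A false positive the server may still deliver.
        Map<String, Object> texas = Map.of("state", "texas");
        System.out.println(accepted(List.of(california, texas)));
    }
}
```

The texas message models a false positive from the server-side filter: it reaches the consumer but the predicate rejects it.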
As stated above, not all messages must have an associated filter value. Many applications do not need filtering, so they can publish messages the regular way. A stream can therefore contain messages with and without an associated filter value.
By default, messages without a filter value (a.k.a. unfiltered messages) are not sent to a consumer that enabled filtering.
But what if a consumer wants to process messages with a filter value and messages without any filter value as well?
It must use the matchUnfiltered() method in its declaration and also make sure to keep the filtering logic consistent:
link:../../test/java/com/rabbitmq/stream/docs/FilteringUsage.java[role=include]
- Request messages from California
- Request messages without a filter value as well
- Let both types of messages pass
In the example above, the filtering logic has been adapted to let through both california messages and messages without a state set.
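The adapted logic can be sketched as a predicate that treats a missing state as acceptable. As before, this is only a model under assumptions: a Map stands in for the application properties of a message, and the class and method names are hypothetical.

```java
import java.util.Map;

/** Sketch of filtering logic that also lets unfiltered messages pass. */
final class MatchUnfilteredSketch {

    /** Accepts messages without a state, and messages from California. */
    static boolean accept(Map<String, Object> applicationProperties) {
        Object state = applicationProperties.get("state");
        return state == null || "california".equals(state);
    }

    public static void main(String[] args) {
        System.out.println(accept(Map.of("state", "california")));
        System.out.println(accept(Map.of()));
        System.out.println(accept(Map.of("state", "texas")));
    }
}
```

Without the null check, the consumer would receive unfiltered messages from the server and then discard them all on the client side.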
As stated previously, the server can send messages that do not match the filter value(s) set by consumers. This is why application developers must be very careful with the filtering logic they define to avoid processing unwanted messages.
What are good candidates for filter values? Unique identifiers are not: if you know a given message property will be unique in a stream, do not use it as a filter value. A defined set of values shared across the messages is a good candidate: geographical locations (e.g. countries, states), document types in a stream that stores document information (e.g. payslip, invoice, order), categories of products (e.g. book, luggage, toy).
The cardinality of filter values can range from a few to a few thousand. Extreme cardinality (tens of thousands of values) can make filtering less efficient.
The stream Java client uses the Netty network framework and its Java NIO transport implementation by default. This should be a reasonable default for most applications.
Netty also allows using JNI transports. They are less portable than Java NIO, but they can be more performant for some workloads (even though the RabbitMQ team has not seen any significant improvement in their own tests).
The Linux epoll transport is a popular choice, so we’ll see how to configure it with the stream Java client.
Other JNI transports can be configured in the same way.
The native transport dependency must be added to the dependency manager.
We must pull the native binaries compiled for our OS and architecture, in our example Linux x86-64, so we are using the linux-x86_64 classifier.
Here is the declaration for Maven:
epoll transport dependency with Maven
<dependencies>
  <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-transport-native-epoll</artifactId>
    <version>{netty-version}</version>
    <classifier>linux-x86_64</classifier>
  </dependency>
</dependencies>
And for Gradle:
epoll transport dependency with Gradle
dependencies {
  implementation "io.netty:netty-transport-native-epoll:{netty-version}:linux-x86_64"
}
The native epoll transport is set up when the environment is configured:
epoll transport in the environment
link:../../test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java[role=include]
- Create the epoll event loop group (don’t forget to close it!)
- Use the Netty configuration helper
- Set the event loop group
- Set the channel class to use
Note the event loop group must be closed explicitly: the environment will not close it itself as it is provided externally.