Advanced Topics

Filtering

Warning
Filtering requires RabbitMQ 3.13 or later.

RabbitMQ Stream provides a server-side filtering feature that avoids reading all the messages of a stream and filtering only on the client side. This helps to save network bandwidth when a consuming application needs only a subset of messages, e.g. the messages from a given geographical region.

The filtering feature works as follows:

  • each message is published with an associated filter value

  • a consumer that wants to enable filtering must:

    • define one or several filter values

    • define some client-side filtering logic

Why does the consumer need to define some client-side filtering logic? Because the server-side filtering is probabilistic: messages that do not match the filter value(s) can still be sent to the consumer. The server uses a Bloom filter, a space-efficient probabilistic data structure, where false positives are possible. Despite this, the filtering saves some bandwidth, which is its primary goal.
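To make the "probabilistic" part concrete, here is a minimal sketch of a Bloom filter in plain Java. It is a toy model for illustration only: the class name, the hash mixing, and the sizes are assumptions, not the server's actual implementation. It shows the two properties that matter here: a value that was added is always reported as possibly present, and a value that was never added can still be reported as possibly present.

```java
import java.util.BitSet;

/** Toy Bloom filter, illustrative only (not the RabbitMQ implementation). */
final class BloomFilterSketch {
  private final BitSet bits;
  private final int size;
  private final int hashCount;

  BloomFilterSketch(int size, int hashCount) {
    this.bits = new BitSet(size);
    this.size = size;
    this.hashCount = hashCount;
  }

  /** Sets one bit per hash function for the given value. */
  void add(String value) {
    for (int seed = 0; seed < hashCount; seed++) {
      bits.set(index(value, seed));
    }
  }

  /** false means "definitely absent", true means "possibly present". */
  boolean mightContain(String value) {
    for (int seed = 0; seed < hashCount; seed++) {
      if (!bits.get(index(value, seed))) {
        return false;
      }
    }
    return true;
  }

  /** Arbitrary seeded hash mixing, chosen only for this sketch. */
  private int index(String value, int seed) {
    int h = value.hashCode() ^ (seed * 0x9E3779B9);
    h ^= (h >>> 16);
    h *= 0x85EBCA6B;
    h ^= (h >>> 13);
    return Math.floorMod(h, size);
  }
}
```

With a deliberately tiny filter (1 bit), adding any value makes every lookup answer "possibly present". This is an extreme case of the false positives that the client-side logic has to discard.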

Filtering on the Publishing Side

Filtering on the publishing side consists of defining some logic to extract the filter value from a message. The following snippet shows how to extract the filter value from an application property:

Declaring a producer with logic to extract a filter value from each message
link:../../test/java/com/rabbitmq/stream/docs/FilteringUsage.java[role=include]
  1. Get filter value from state application property

Note the filter value can be null: the message is then published in a regular way. It is called in this context an unfiltered message.
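The extraction logic itself can be sketched in plain Java, independently of the client API. In this sketch a message is modeled as a map of application properties, which is a stand-in and not the client's message type. The class and constant names are hypothetical as well. The function returns null when the state property is absent, which corresponds to an unfiltered message.

```java
import java.util.Map;
import java.util.function.Function;

final class FilterValueExtractionSketch {

  /**
   * Extracts the filter value from the "state" application property.
   * The Map parameter is a stand-in for a message's application properties.
   */
  static final Function<Map<String, Object>, String> STATE_FILTER_VALUE =
      applicationProperties -> {
        Object state = applicationProperties.get("state");
        // null means "no filter value": the message is published the regular way
        return state == null ? null : state.toString();
      };
}
```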

Filtering on the Consuming Side

A consumer needs to set up one or several filter values and some filtering logic to enable filtering. The filtering logic must be consistent with the filter values. In the next snippet, the consumer wants to process only messages from the state of California. It sets a filter value to california and a predicate that accepts a message only if the state application property is california:

Declaring a consumer with a filter value and filtering logic
link:../../test/java/com/rabbitmq/stream/docs/FilteringUsage.java[role=include]
  1. Set filter value

  2. Set filtering logic

The filter logic is a Predicate<Message>. It must return true if a message is accepted, following the same semantics as java.util.stream.Stream#filter(Predicate).

As stated above, not all messages must have an associated filter value. Many applications may not need filtering, so they can publish messages the regular way. A stream can therefore contain messages with and without an associated filter value.

By default, messages without a filter value (a.k.a. unfiltered messages) are not sent to a consumer that enabled filtering.

But what if a consumer wants to process messages with a filter value and messages without any filter value as well? It must use the matchUnfiltered() method in its declaration and also make sure to keep the filtering logic consistent:

Getting unfiltered messages as well when enabling filtering
link:../../test/java/com/rabbitmq/stream/docs/FilteringUsage.java[role=include]
  1. Request messages from California

  2. Request messages without a filter value as well

  3. Let both types of messages pass

In the example above, the filtering logic has been adapted to let california messages pass, as well as messages without a state set.
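The difference between the two kinds of filtering logic can be sketched in plain Java. As an assumption for illustration, a message is modeled as a map of application properties rather than the client's message type, and the class, constant, and method names are hypothetical. The delivered list stands for what the server may send: a matching message, a false positive from another state, and an unfiltered message.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class PostFilterSketch {

  /** Strict logic: accept only messages whose state is california. */
  static final Predicate<Map<String, Object>> CALIFORNIA_ONLY =
      props -> "california".equals(props.get("state"));

  /** Adapted logic: also accept messages that carry no state at all. */
  static final Predicate<Map<String, Object>> CALIFORNIA_OR_UNFILTERED =
      props -> !props.containsKey("state") || "california".equals(props.get("state"));

  /** Applies the client-side logic with the semantics of Stream#filter. */
  static List<Map<String, Object>> process(
      List<Map<String, Object>> delivered, Predicate<Map<String, Object>> logic) {
    return delivered.stream().filter(logic).collect(Collectors.toList());
  }
}
```

Both predicates reject the message from another state, so a server-side false positive never reaches the application code. Only the adapted predicate keeps the message without a state.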

Considerations on Filtering

As stated previously, the server can send messages that do not match the filter value(s) set by consumers. This is why application developers must be very careful with the filtering logic they define to avoid processing unwanted messages.

What are good candidates for filter values? Unique identifiers are not: if you know a given message property will be unique in a stream, do not use it as a filter value. A defined set of values shared across the messages is a good candidate: geographical locations (e.g. countries, states), document types in a stream that stores document information (e.g. payslip, invoice, order), categories of products (e.g. book, luggage, toy).

Cardinality of filter values can range from a few to a few thousand. Extreme cardinality (tens of thousands) can make filtering less efficient.
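The effect of cardinality can be estimated with the standard approximation of a Bloom filter's false-positive rate, (1 - e^(-k*n/m))^k, where m is the number of bits, k the number of hash functions, and n the number of distinct values in the filter. The sketch below only evaluates this formula. The values used for m and k in the usage note are hypothetical, as the filter size and hash count the server actually uses are not stated here.

```java
final class FalsePositiveRateSketch {

  /**
   * Approximate false-positive rate of a Bloom filter.
   *
   * @param bits number of bits in the filter (m)
   * @param hashCount number of hash functions (k)
   * @param distinctValues number of distinct values added (n)
   */
  static double falsePositiveRate(int bits, int hashCount, int distinctValues) {
    double exponent = -((double) hashCount * distinctValues) / bits;
    return Math.pow(1.0 - Math.exp(exponent), hashCount);
  }
}
```

For a hypothetical 128-bit filter with 2 hash functions, `falsePositiveRate(128, 2, 5)` stays well below `falsePositiveRate(128, 2, 500)`. The more distinct values end up in the same filter, the more non-matching data passes the server-side check.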

Using Native epoll

The stream Java client uses the Netty network framework and its Java NIO transport implementation by default. This should be a reasonable default for most applications.

Netty also allows using JNI transports. They are less portable than Java NIO, but they can be more performant for some workloads (even though the RabbitMQ team has not seen any significant improvement in their own tests).

The Linux epoll transport is a popular choice, so we’ll see how to configure it with the stream Java client. Other JNI transports can be configured in the same way.

The native transport dependency must be added to the dependency manager. We must pull the native binaries compiled for our OS and architecture, in our example Linux x86-64, so we are using the linux-x86_64 classifier. Here is the declaration for Maven:

Declaring the Linux x86-64 native epoll transport dependency with Maven
<dependencies>

  <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-transport-native-epoll</artifactId>
    <version>{netty-version}</version>
    <classifier>linux-x86_64</classifier>
  </dependency>

</dependencies>

And for Gradle:

Declaring the Linux x86-64 native epoll transport dependency with Gradle
dependencies {
  implementation "io.netty:netty-transport-native-epoll:{netty-version}:linux-x86_64"
}

The native epoll transport is set up when the environment is configured:

Configuring the native epoll transport in the environment
link:../../test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java[role=include]
  1. Create the epoll event loop group (don’t forget to close it!)

  2. Use the Netty configuration helper

  3. Set the event loop group

  4. Set the channel class to use

Note the event loop group must be closed explicitly: the environment will not close it itself as it is provided externally.