`.
+It must return `true` if a message is accepted, following the same semantics as `java.util.stream.Stream#filter(Predicate)`.
+
+As stated above, not all messages must have an associated filter value.
+Many applications may not need some filtering, so they can publish messages the regular way.
+So a stream can contain messages with and without an associated filter value.
+
+By default, messages without a filter value (a.k.a _unfiltered_ messages) are not sent to a consumer that enabled filtering.
+
+But what if a consumer wants to process messages with a filter value and messages without any filter value as well?
+It must use the `matchUnfiltered()` method in its declaration and also make sure to keep the filtering logic consistent:
+
+.Getting unfiltered messages as well when enabling filtering
+[source,java,indent=0]
+--------
+include::{test-examples}/FilteringUsage.java[tag=consumer-match-unfiltered]
+--------
+<1> Request messages from California
+<2> Request messages without a filter value as well
+<3> Let both types of messages pass
+
+In the example above, the filtering logic has been adapted to let pass `california` messages _and_ messages without a state set as well.
+
+===== Considerations on Filtering
+
+As stated previously, the server can send messages that do not match the filter value(s) set by consumers.
+This is why application developers must be very careful with the filtering logic they define to avoid processing unwanted messages.
+
+What are good candidates for filter values?
+Unique identifiers are _not_: if you know a given message property will be unique in a stream, do not use it as a filter value.
+A defined set of values shared across the messages is a good candidate: geographical locations (e.g. countries, states), document types in a stream that stores document information (e.g. payslip, invoice, order), categories of products (e.g. book, luggage, toy).
+
+Cardinality of filter values can be from a few to a few thousands.
+Extreme cardinality (a couple or dozens of thousands) can make filtering less efficient.
+
==== Using Native `epoll`
The stream Java client uses the https://netty.io/[Netty] network framework and its Java NIO transport implementation by default.
diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
index b86db48b2e..a7424e2f75 100644
--- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
@@ -14,6 +14,7 @@
package com.rabbitmq.stream;
import java.time.Duration;
+import java.util.function.Predicate;
/** API to configure and create a {@link Consumer}. */
public interface ConsumerBuilder {
@@ -151,6 +152,15 @@ public interface ConsumerBuilder {
*/
ConsumerBuilder noTrackingStrategy();
+ /**
+ * Configure the filtering.
+ *
+ * RabbitMQ 3.13 or more is required.
+ *
+ * @return the filtering configuration
+ */
+ FilterConfiguration filter();
+
/**
* Configure flow of messages.
*
@@ -242,4 +252,58 @@ interface FlowConfiguration {
*/
ConsumerBuilder builder();
}
+
+ /**
+ * Filter configuration.
+ *
+ *
RabbitMQ 3.13 or more is required.
+ */
+ interface FilterConfiguration {
+
+ /**
+ * Set the filter values.
+ *
+ * @param filterValues
+ * @return this filter configuration instance
+ */
+ FilterConfiguration values(String... filterValues);
+
+ /**
+ * Client-side filtering logic, occurring after the server-side filtering.
+ *
+ *
It must be consistent with the requested filter {@link #values( String...)} and the {@link
+ * #matchUnfiltered()} flag.
+ *
+ * @param filter a predicate that returns true
if a message should go to the {@link
+ * MessageHandler}
+ * @return this filter configuration instance
+ */
+ FilterConfiguration postFilter(Predicate filter);
+
+ /**
+ * Whether messages without a filter value should be sent as well.
+ *
+ * Default is false.
+ *
+ * @return this filter configuration instance
+ */
+ FilterConfiguration matchUnfiltered();
+
+ /**
+ * Whether messages without a filter value should be sent as well.
+ *
+ *
Default is false.
+ *
+ * @param matchUnfiltered
+ * @return this filter configuration instance
+ */
+ FilterConfiguration matchUnfiltered(boolean matchUnfiltered);
+
+ /**
+ * Go back to the builder.
+ *
+ * @return the consumer builder
+ */
+ ConsumerBuilder builder();
+ }
}
diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java
index be5314da16..cdc942f03d 100644
--- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java
@@ -132,6 +132,16 @@ public interface ProducerBuilder {
*/
ProducerBuilder enqueueTimeout(Duration timeout);
+ /**
+ * Logic to extract a filter value from a message.
+ *
+ *
RabbitMQ 3.13 or more is required.
+ *
+ * @param filterValueExtractor
+ * @return this builder instance
+ */
+ ProducerBuilder filterValue(Function filterValueExtractor);
+
/**
* Create the {@link Producer} instance.
*
diff --git a/src/main/java/com/rabbitmq/stream/StreamCreator.java b/src/main/java/com/rabbitmq/stream/StreamCreator.java
index aa61e4ddd2..22250e40d8 100644
--- a/src/main/java/com/rabbitmq/stream/StreamCreator.java
+++ b/src/main/java/com/rabbitmq/stream/StreamCreator.java
@@ -14,6 +14,7 @@
package com.rabbitmq.stream;
import java.time.Duration;
+import java.util.function.Function;
/** API to configure and create a stream. */
public interface StreamCreator {
@@ -63,6 +64,16 @@ public interface StreamCreator {
*/
StreamCreator leaderLocator(LeaderLocator leaderLocator);
+ /**
+ * Set the size of the stream chunk filters.
+ *
+ * @param size
+ * @return this creator instance
+ * @see ProducerBuilder#filterValue( Function)
+ * @see ConsumerBuilder#filter()
+ */
+ StreamCreator filterSize(int size);
+
/**
* Create the stream.
*
diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java
index f85fa9a51a..5e3bcb79af 100644
--- a/src/main/java/com/rabbitmq/stream/impl/Client.java
+++ b/src/main/java/com/rabbitmq/stream/impl/Client.java
@@ -13,34 +13,7 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
-import static com.rabbitmq.stream.Constants.COMMAND_CLOSE;
-import static com.rabbitmq.stream.Constants.COMMAND_CONSUMER_UPDATE;
-import static com.rabbitmq.stream.Constants.COMMAND_CREATE_STREAM;
-import static com.rabbitmq.stream.Constants.COMMAND_CREDIT;
-import static com.rabbitmq.stream.Constants.COMMAND_DECLARE_PUBLISHER;
-import static com.rabbitmq.stream.Constants.COMMAND_DELETE_PUBLISHER;
-import static com.rabbitmq.stream.Constants.COMMAND_DELETE_STREAM;
-import static com.rabbitmq.stream.Constants.COMMAND_EXCHANGE_COMMAND_VERSIONS;
-import static com.rabbitmq.stream.Constants.COMMAND_HEARTBEAT;
-import static com.rabbitmq.stream.Constants.COMMAND_METADATA;
-import static com.rabbitmq.stream.Constants.COMMAND_OPEN;
-import static com.rabbitmq.stream.Constants.COMMAND_PARTITIONS;
-import static com.rabbitmq.stream.Constants.COMMAND_PEER_PROPERTIES;
-import static com.rabbitmq.stream.Constants.COMMAND_PUBLISH;
-import static com.rabbitmq.stream.Constants.COMMAND_QUERY_OFFSET;
-import static com.rabbitmq.stream.Constants.COMMAND_QUERY_PUBLISHER_SEQUENCE;
-import static com.rabbitmq.stream.Constants.COMMAND_ROUTE;
-import static com.rabbitmq.stream.Constants.COMMAND_SASL_AUTHENTICATE;
-import static com.rabbitmq.stream.Constants.COMMAND_SASL_HANDSHAKE;
-import static com.rabbitmq.stream.Constants.COMMAND_STORE_OFFSET;
-import static com.rabbitmq.stream.Constants.COMMAND_STREAM_STATS;
-import static com.rabbitmq.stream.Constants.COMMAND_SUBSCRIBE;
-import static com.rabbitmq.stream.Constants.COMMAND_UNSUBSCRIBE;
-import static com.rabbitmq.stream.Constants.RESPONSE_CODE_AUTHENTICATION_FAILURE;
-import static com.rabbitmq.stream.Constants.RESPONSE_CODE_AUTHENTICATION_FAILURE_LOOPBACK;
-import static com.rabbitmq.stream.Constants.RESPONSE_CODE_OK;
-import static com.rabbitmq.stream.Constants.RESPONSE_CODE_SASL_CHALLENGE;
-import static com.rabbitmq.stream.Constants.VERSION_1;
+import static com.rabbitmq.stream.Constants.*;
import static com.rabbitmq.stream.impl.Utils.encodeRequestCode;
import static com.rabbitmq.stream.impl.Utils.encodeResponseCode;
import static com.rabbitmq.stream.impl.Utils.extractResponseCode;
@@ -216,6 +189,7 @@ public long applyAsLong(Object value) {
private final Duration rpcTimeout;
private volatile ShutdownReason shutdownReason = null;
private final Runnable exchangeCommandVersionsCheck;
+ private final boolean filteringSupported;
public Client() {
this(new ClientParameters());
@@ -398,15 +372,25 @@ public void initChannel(SocketChannel ch) {
tuneState.getHeartbeat());
this.connectionProperties = open(parameters.virtualHost);
Set supportedCommands = maybeExchangeCommandVersions();
- if (supportedCommands.stream().anyMatch(i -> i.getKey() == COMMAND_STREAM_STATS)) {
- this.exchangeCommandVersionsCheck = () -> {};
- } else {
- this.exchangeCommandVersionsCheck =
- () -> {
- throw new UnsupportedOperationException(
- "QueryStreamInfo is available only on RabbitMQ 3.11 or more.");
- };
- }
+ AtomicReference exchangeCommandVersionsCheckReference = new AtomicReference<>();
+ AtomicBoolean filteringSupportedReference = new AtomicBoolean(false);
+ supportedCommands.forEach(
+ c -> {
+ if (c.getKey() == COMMAND_STREAM_STATS) {
+ exchangeCommandVersionsCheckReference.set(() -> {});
+ }
+ if (c.getKey() == COMMAND_PUBLISH && c.getMaxVersion() >= VERSION_2) {
+ filteringSupportedReference.set(true);
+ }
+ });
+ this.exchangeCommandVersionsCheck =
+ exchangeCommandVersionsCheckReference.get() == null
+ ? () -> {
+ throw new UnsupportedOperationException(
+ "QueryStreamInfo is available only on RabbitMQ 3.11 or more.");
+ }
+ : exchangeCommandVersionsCheckReference.get();
+ this.filteringSupported = filteringSupportedReference.get();
started.set(true);
this.metricsCollector.openConnection();
} catch (RuntimeException e) {
@@ -855,6 +839,7 @@ public List publish(
encodedMessages.add(encodedMessage);
}
return publishInternal(
+ VERSION_1,
this.channel,
publisherId,
encodedMessages,
@@ -881,6 +866,7 @@ public List publish(
encodedMessages.add(wrapper);
}
return publishInternal(
+ VERSION_1,
this.channel,
publisherId,
encodedMessages,
@@ -911,6 +897,7 @@ public List publishBatches(
encodedMessageBatches.add(encodedMessageBatch);
}
return publishInternal(
+ VERSION_1,
this.channel,
publisherId,
encodedMessageBatches,
@@ -947,6 +934,7 @@ public List publishBatches(
encodedMessageBatches.add(wrapper);
}
return publishInternal(
+ VERSION_1,
this.channel,
publisherId,
encodedMessageBatches,
@@ -977,15 +965,17 @@ private void checkMessageBatchFitsInFrame(EncodedMessageBatch encodedMessageBatc
}
List publishInternal(
+ short version,
byte publisherId,
List encodedEntities,
OutboundEntityWriteCallback callback,
ToLongFunction publishSequenceFunction) {
return this.publishInternal(
- this.channel, publisherId, encodedEntities, callback, publishSequenceFunction);
+ version, this.channel, publisherId, encodedEntities, callback, publishSequenceFunction);
}
List publishInternal(
+ short version,
Channel ch,
byte publisherId,
List encodedEntities,
@@ -1002,6 +992,7 @@ List publishInternal(
// the current message/batch does not fit, we're sending the batch
int frameLength = length - callback.fragmentLength(encodedEntity);
sendEntityBatch(
+ version,
ch,
frameLength,
publisherId,
@@ -1017,6 +1008,7 @@ List publishInternal(
currentIndex++;
}
sendEntityBatch(
+ version,
ch,
length,
publisherId,
@@ -1031,6 +1023,7 @@ List publishInternal(
}
private void sendEntityBatch(
+ short version,
Channel ch,
int frameLength,
byte publisherId,
@@ -1044,7 +1037,7 @@ private void sendEntityBatch(
ByteBuf out = allocateNoCheck(ch.alloc(), frameLength + 4);
out.writeInt(frameLength);
out.writeShort(encodeRequestCode(COMMAND_PUBLISH));
- out.writeShort(VERSION_1);
+ out.writeShort(version);
out.writeByte(publisherId);
int messageCount = 0;
out.writeInt(toExcluded - fromIncluded);
@@ -1402,6 +1395,10 @@ private String serverAddress() {
}
}
+ boolean filteringSupported() {
+ return this.filteringSupported;
+ }
+
public List route(String routingKey, String superStream) {
if (routingKey == null || superStream == null) {
throw new IllegalArgumentException("routing key and stream must not be null");
@@ -1603,11 +1600,7 @@ private Set maybeExchangeCommandVersions() {
Set supported = new HashSet<>();
try {
if (Utils.is3_11_OrMore(brokerVersion())) {
- for (FrameHandlerInfo info : exchangeCommandVersions()) {
- if (info.getKey() == COMMAND_STREAM_STATS) {
- supported.add(info);
- }
- }
+ supported.addAll(exchangeCommandVersions());
}
} catch (Exception e) {
LOGGER.info("Error while exchanging command versions: {}", e.getMessage());
@@ -2589,6 +2582,14 @@ public StreamParametersBuilder leaderLocator(LeaderLocator leaderLocator) {
return this;
}
+ public StreamParametersBuilder filterSize(int size) {
+ if (size < 16 || size > 255) {
+ throw new IllegalArgumentException("Stream filter size must be between 16 and 255");
+ }
+ this.parameters.put("stream-filter-size-bytes", String.valueOf(size));
+ return this;
+ }
+
public StreamParametersBuilder put(String key, String value) {
parameters.put(key, value);
return this;
diff --git a/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java
index 25c37fd465..e40f6ada11 100644
--- a/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java
+++ b/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java
@@ -32,6 +32,8 @@ interface AccumulatedEntity {
long publishindId();
+ String filterValue();
+
Object encodedEntity();
StreamProducer.ConfirmationCallback confirmationCallback();
diff --git a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java
index d701710ac0..5f01ea1416 100644
--- a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java
+++ b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java
@@ -21,28 +21,35 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import java.util.function.ToLongFunction;
class SimpleMessageAccumulator implements MessageAccumulator {
+ private static final Function NULL_FILTER_VALUE_EXTRACTOR = m -> null;
+
protected final BlockingQueue messages;
protected final Clock clock;
private final int capacity;
private final Codec codec;
private final int maxFrameSize;
private final ToLongFunction publishSequenceFunction;
+ private final Function filterValueExtractor;
SimpleMessageAccumulator(
int capacity,
Codec codec,
int maxFrameSize,
ToLongFunction publishSequenceFunction,
+ Function filterValueExtractor,
Clock clock) {
this.capacity = capacity;
this.messages = new LinkedBlockingQueue<>(capacity);
this.codec = codec;
this.maxFrameSize = maxFrameSize;
this.publishSequenceFunction = publishSequenceFunction;
+ this.filterValueExtractor =
+ filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor;
this.clock = clock;
}
@@ -56,6 +63,7 @@ public boolean add(Message message, ConfirmationHandler confirmationHandler) {
new SimpleAccumulatedEntity(
clock.time(),
publishingId,
+ this.filterValueExtractor.apply(message),
encodedMessage,
new SimpleConfirmationCallback(message, confirmationHandler)),
60,
@@ -88,17 +96,20 @@ private static final class SimpleAccumulatedEntity implements AccumulatedEntity
private final long time;
private final long publishingId;
+ private final String filterValue;
private final Codec.EncodedMessage encodedMessage;
private final StreamProducer.ConfirmationCallback confirmationCallback;
private SimpleAccumulatedEntity(
long time,
long publishingId,
+ String filterValue,
Codec.EncodedMessage encodedMessage,
StreamProducer.ConfirmationCallback confirmationCallback) {
this.time = time;
this.publishingId = publishingId;
this.encodedMessage = encodedMessage;
+ this.filterValue = filterValue;
this.confirmationCallback = confirmationCallback;
}
@@ -107,6 +118,11 @@ public long publishindId() {
return publishingId;
}
+ @Override
+ public String filterValue() {
+ return filterValue;
+ }
+
@Override
public Object encodedEntity() {
return encodedMessage;
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
index 2e63b11e00..f2368d3f94 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
@@ -18,15 +18,8 @@
import static com.rabbitmq.stream.impl.Utils.offsetBefore;
import static java.time.Duration.ofMillis;
-import com.rabbitmq.stream.Constants;
-import com.rabbitmq.stream.Consumer;
-import com.rabbitmq.stream.ConsumerUpdateListener;
-import com.rabbitmq.stream.MessageHandler;
+import com.rabbitmq.stream.*;
import com.rabbitmq.stream.MessageHandler.Context;
-import com.rabbitmq.stream.NoOffsetException;
-import com.rabbitmq.stream.OffsetSpecification;
-import com.rabbitmq.stream.StreamException;
-import com.rabbitmq.stream.SubscriptionListener;
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration;
@@ -83,7 +76,9 @@ class StreamConsumer implements Consumer {
ConsumerUpdateListener consumerUpdateListener,
int initialCredits,
int additionalCredits) {
-
+ if (Utils.filteringEnabled(subscriptionProperties) && !environment.filteringSupported()) {
+ throw new IllegalArgumentException("Filtering is not supported by the broker");
+ }
this.id = ID_SEQUENCE.getAndIncrement();
Runnable trackingClosingCallback;
try {
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java
index 7d829e1f7f..0cefa2aa0f 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java
@@ -13,24 +13,27 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
-import com.rabbitmq.stream.Consumer;
-import com.rabbitmq.stream.ConsumerBuilder;
-import com.rabbitmq.stream.ConsumerUpdateListener;
-import com.rabbitmq.stream.MessageHandler;
-import com.rabbitmq.stream.OffsetSpecification;
-import com.rabbitmq.stream.StreamException;
-import com.rabbitmq.stream.SubscriptionListener;
+import static com.rabbitmq.stream.impl.Utils.SUBSCRIPTION_PROPERTY_FILTER_PREFIX;
+import static com.rabbitmq.stream.impl.Utils.SUBSCRIPTION_PROPERTY_MATCH_UNFILTERED;
+
+import com.rabbitmq.stream.*;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
class StreamConsumerBuilder implements ConsumerBuilder {
private static final int NAME_MAX_SIZE = 256; // server-side limitation
+ private static final TrackingConfiguration DISABLED_TRACKING_CONFIGURATION =
+ new TrackingConfiguration(false, false, -1, Duration.ZERO, Duration.ZERO);
private final StreamEnvironment environment;
-
+ private final Map subscriptionProperties = new ConcurrentHashMap<>();
private String stream, superStream;
private OffsetSpecification offsetSpecification = null;
private MessageHandler messageHandler;
@@ -40,9 +43,9 @@ class StreamConsumerBuilder implements ConsumerBuilder {
private boolean noTrackingStrategy = false;
private boolean lazyInit = false;
private SubscriptionListener subscriptionListener = subscriptionContext -> {};
- private final Map subscriptionProperties = new ConcurrentHashMap<>();
- private ConsumerUpdateListener consumerUpdateListener;
private final DefaultFlowConfiguration flowConfiguration = new DefaultFlowConfiguration(this);
+ private ConsumerUpdateListener consumerUpdateListener;
+ private DefaultFilterConfiguration filterConfiguration;
public StreamConsumerBuilder(StreamEnvironment environment) {
this.environment = environment;
@@ -88,7 +91,7 @@ public ConsumerBuilder name(String name) {
@Override
public ConsumerBuilder singleActiveConsumer() {
- this.subscriptionProperties.put("single-active-consumer", "true");
+ this.subscriptionProperties.put(Utils.SUBSCRIPTION_PROPERTY_SAC, "true");
return this;
}
@@ -141,6 +144,14 @@ StreamConsumerBuilder lazyInit(boolean lazyInit) {
return this;
}
+ @Override
+ public FilterConfiguration filter() {
+ if (this.filterConfiguration == null) {
+ this.filterConfiguration = new DefaultFilterConfiguration(this);
+ }
+ return this.filterConfiguration;
+ }
+
@Override
public Consumer build() {
if (this.stream == null && this.superStream == null) {
@@ -185,13 +196,36 @@ public Consumer build() {
trackingConfiguration = DISABLED_TRACKING_CONFIGURATION;
}
+ MessageHandler handler;
+ if (this.filterConfiguration == null) {
+ handler = this.messageHandler;
+ } else {
+ this.filterConfiguration.validate();
+ AtomicInteger i = new AtomicInteger(0);
+ this.filterConfiguration.filterValues.forEach(
+ v ->
+ this.subscriptionProperties.put(
+ SUBSCRIPTION_PROPERTY_FILTER_PREFIX + i.getAndIncrement(), v));
+ this.subscriptionProperties.put(
+ SUBSCRIPTION_PROPERTY_MATCH_UNFILTERED,
+ this.filterConfiguration.matchUnfiltered ? "true" : "false");
+ final Predicate filter = this.filterConfiguration.filter;
+ final MessageHandler delegate = this.messageHandler;
+ handler =
+ (context, message) -> {
+ if (filter.test(message)) {
+ delegate.handle(context, message);
+ }
+ };
+ }
+
Consumer consumer;
if (this.stream != null) {
consumer =
new StreamConsumer(
this.stream,
this.offsetSpecification,
- this.messageHandler,
+ handler,
this.name,
this.environment,
trackingConfiguration,
@@ -204,7 +238,7 @@ public Consumer build() {
environment.addConsumer((StreamConsumer) consumer);
} else {
if (Utils.isSac(this.subscriptionProperties)) {
- this.subscriptionProperties.put("super-stream", this.superStream);
+ this.subscriptionProperties.put(Utils.SUBSCRIPTION_PROPERTY_SUPER_STREAM, this.superStream);
}
consumer =
new SuperStreamConsumer(this, this.superStream, this.environment, trackingConfiguration);
@@ -212,8 +246,21 @@ public Consumer build() {
return consumer;
}
- private static final TrackingConfiguration DISABLED_TRACKING_CONFIGURATION =
- new TrackingConfiguration(false, false, -1, Duration.ZERO, Duration.ZERO);
+ StreamConsumerBuilder duplicate() {
+ StreamConsumerBuilder duplicate = new StreamConsumerBuilder(this.environment);
+ for (Field field : StreamConsumerBuilder.class.getDeclaredFields()) {
+ if (Modifier.isStatic(field.getModifiers())) {
+ continue;
+ }
+ field.setAccessible(true);
+ try {
+ field.set(duplicate, field.get(this));
+ } catch (IllegalAccessException e) {
+ throw new StreamException("Error while duplicating stream producer builder", e);
+ }
+ }
+ return duplicate;
+ }
static class TrackingConfiguration {
@@ -321,20 +368,54 @@ public ConsumerBuilder builder() {
}
}
- StreamConsumerBuilder duplicate() {
- StreamConsumerBuilder duplicate = new StreamConsumerBuilder(this.environment);
- for (Field field : StreamConsumerBuilder.class.getDeclaredFields()) {
- if (Modifier.isStatic(field.getModifiers())) {
- continue;
+ private static final class DefaultFilterConfiguration implements FilterConfiguration {
+
+ private final StreamConsumerBuilder builder;
+ private List filterValues;
+ private Predicate filter;
+ private boolean matchUnfiltered = false;
+
+ private DefaultFilterConfiguration(StreamConsumerBuilder builder) {
+ this.builder = builder;
+ }
+
+ @Override
+ public FilterConfiguration values(String... filterValues) {
+ if (filterValues == null || filterValues.length == 0) {
+ throw new IllegalArgumentException("At least one filter value must be specified");
}
- field.setAccessible(true);
- try {
- field.set(duplicate, field.get(this));
- } catch (IllegalAccessException e) {
- throw new StreamException("Error while duplicating stream producer builder", e);
+ this.filterValues = Arrays.asList(filterValues);
+ return this;
+ }
+
+ @Override
+ public FilterConfiguration postFilter(Predicate filter) {
+ this.filter = filter;
+ return this;
+ }
+
+ @Override
+ public FilterConfiguration matchUnfiltered() {
+ this.matchUnfiltered = true;
+ return this;
+ }
+
+ @Override
+ public FilterConfiguration matchUnfiltered(boolean matchUnfiltered) {
+ this.matchUnfiltered = matchUnfiltered;
+ return this;
+ }
+
+ @Override
+ public ConsumerBuilder builder() {
+ return this.builder;
+ }
+
+ private void validate() {
+ if (this.filterValues == null || this.filter == null) {
+ throw new IllegalArgumentException("Both filter values and the filter logic must be set");
}
}
- return duplicate;
}
private static class DefaultFlowConfiguration implements FlowConfiguration {
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
index d40f070b56..c4ef75a190 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
@@ -736,6 +736,10 @@ static T locatorOperation(
return result;
}
+ boolean filteringSupported() {
+ return this.locatorOperation(Client::filteringSupported);
+ }
+
Clock clock() {
return this.clock;
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
index 49919164eb..84ff8a6296 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
@@ -13,9 +13,7 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
-import static com.rabbitmq.stream.Constants.CODE_MESSAGE_ENQUEUEING_FAILED;
-import static com.rabbitmq.stream.Constants.CODE_PRODUCER_CLOSED;
-import static com.rabbitmq.stream.Constants.CODE_PRODUCER_NOT_AVAILABLE;
+import static com.rabbitmq.stream.Constants.*;
import static com.rabbitmq.stream.impl.Utils.formatConstant;
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
@@ -31,6 +29,7 @@
import com.rabbitmq.stream.impl.Client.Response;
import com.rabbitmq.stream.impl.MessageAccumulator.AccumulatedEntity;
import io.netty.buffer.ByteBuf;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
@@ -48,6 +47,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.function.ToLongFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +80,7 @@ class StreamProducer implements Producer {
private volatile byte publisherId;
private volatile Status status;
private volatile ScheduledFuture> confirmTimeoutFuture;
+ private final short publishVersion;
StreamProducer(
String name,
@@ -91,7 +92,11 @@ class StreamProducer implements Producer {
int maxUnconfirmedMessages,
Duration confirmTimeout,
Duration enqueueTimeout,
+ Function filterValueExtractor,
StreamEnvironment environment) {
+ if (filterValueExtractor != null && !environment.filteringSupported()) {
+ throw new IllegalArgumentException("Filtering is not supported by the broker");
+ }
this.id = ID_SEQUENCE.getAndIncrement();
this.environment = environment;
this.name = name;
@@ -116,8 +121,13 @@ class StreamProducer implements Producer {
environment.codec(),
client.maxFrameSize(),
accumulatorPublishSequenceFunction,
+ filterValueExtractor,
this.environment.clock());
- delegateWriteCallback = Client.OUTBOUND_MESSAGE_WRITE_CALLBACK;
+ if (filterValueExtractor == null) {
+ delegateWriteCallback = Client.OUTBOUND_MESSAGE_WRITE_CALLBACK;
+ } else {
+ delegateWriteCallback = OUTBOUND_MSG_FILTER_VALUE_WRITE_CALLBACK;
+ }
} else {
this.accumulator =
new SubEntryMessageAccumulator(
@@ -138,22 +148,44 @@ class StreamProducer implements Producer {
this.unconfirmedMessagesSemaphore = new Semaphore(maxUnconfirmedMessages, true);
this.unconfirmedMessages = new ConcurrentHashMap<>(this.maxUnconfirmedMessages, 0.75f, 2);
- this.writeCallback =
- new Client.OutboundEntityWriteCallback() {
- @Override
- public int write(ByteBuf bb, Object entity, long publishingId) {
- MessageAccumulator.AccumulatedEntity accumulatedEntity =
- (MessageAccumulator.AccumulatedEntity) entity;
- unconfirmedMessages.put(publishingId, accumulatedEntity);
- return delegateWriteCallback.write(bb, accumulatedEntity.encodedEntity(), publishingId);
- }
+ if (filterValueExtractor == null) {
+ this.publishVersion = VERSION_1;
+ this.writeCallback =
+ new Client.OutboundEntityWriteCallback() {
+ @Override
+ public int write(ByteBuf bb, Object entity, long publishingId) {
+ MessageAccumulator.AccumulatedEntity accumulatedEntity =
+ (MessageAccumulator.AccumulatedEntity) entity;
+ unconfirmedMessages.put(publishingId, accumulatedEntity);
+ return delegateWriteCallback.write(
+ bb, accumulatedEntity.encodedEntity(), publishingId);
+ }
+
+ @Override
+ public int fragmentLength(Object entity) {
+ return delegateWriteCallback.fragmentLength(
+ ((MessageAccumulator.AccumulatedEntity) entity).encodedEntity());
+ }
+ };
+ } else {
+ this.publishVersion = VERSION_2;
+ this.writeCallback =
+ new Client.OutboundEntityWriteCallback() {
+ @Override
+ public int write(ByteBuf bb, Object entity, long publishingId) {
+ MessageAccumulator.AccumulatedEntity accumulatedEntity =
+ (MessageAccumulator.AccumulatedEntity) entity;
+ unconfirmedMessages.put(publishingId, accumulatedEntity);
+ return delegateWriteCallback.write(bb, accumulatedEntity, publishingId);
+ }
+
+ @Override
+ public int fragmentLength(Object entity) {
+ return delegateWriteCallback.fragmentLength(entity);
+ }
+ };
+ }
- @Override
- public int fragmentLength(Object entity) {
- return delegateWriteCallback.fragmentLength(
- ((MessageAccumulator.AccumulatedEntity) entity).encodedEntity());
- }
- };
if (!batchPublishingDelay.isNegative() && !batchPublishingDelay.isZero()) {
AtomicReference taskReference = new AtomicReference<>();
Runnable task =
@@ -445,7 +477,11 @@ private void publishBatch(boolean stateCheck) {
batchCount++;
}
client.publishInternal(
- this.publisherId, messages, this.writeCallback, this.publishSequenceFunction);
+ this.publishVersion,
+ this.publisherId,
+ messages,
+ this.writeCallback,
+ this.publishSequenceFunction);
}
}
@@ -480,7 +516,11 @@ void running() {
batchCount++;
}
client.publishInternal(
- this.publisherId, messages, this.writeCallback, this.publishSequenceFunction);
+ this.publishVersion,
+ this.publisherId,
+ messages,
+ this.writeCallback,
+ this.publishSequenceFunction);
}
}
publishBatch(false);
@@ -558,4 +598,40 @@ private void checkNotClosed() {
throw new IllegalStateException("This producer instance has been closed");
}
}
+
+ private static final Client.OutboundEntityWriteCallback OUTBOUND_MSG_FILTER_VALUE_WRITE_CALLBACK =
+ new OutboundMessageFilterValueWriterCallback();
+
+ private static final class OutboundMessageFilterValueWriterCallback
+ implements Client.OutboundEntityWriteCallback {
+
+ @Override
+ public int write(ByteBuf bb, Object entity, long publishingId) {
+ AccumulatedEntity accumulatedEntity = (AccumulatedEntity) entity;
+ String filterValue = accumulatedEntity.filterValue();
+ if (filterValue == null) {
+ bb.writeShort(-1);
+ } else {
+ bb.writeShort(filterValue.length());
+ bb.writeBytes(filterValue.getBytes(StandardCharsets.UTF_8));
+ }
+ Codec.EncodedMessage messageToPublish =
+ (Codec.EncodedMessage) accumulatedEntity.encodedEntity();
+ bb.writeInt(messageToPublish.getSize());
+ bb.writeBytes(messageToPublish.getData(), 0, messageToPublish.getSize());
+ return 1;
+ }
+
+ @Override
+ public int fragmentLength(Object entity) {
+ AccumulatedEntity accumulatedEntity = (AccumulatedEntity) entity;
+ Codec.EncodedMessage message = (Codec.EncodedMessage) accumulatedEntity.encodedEntity();
+ String filterValue = accumulatedEntity.filterValue();
+ if (filterValue == null) {
+ return 8 + 2 + 4 + message.getSize();
+ } else {
+ return 8 + 2 + accumulatedEntity.filterValue().length() + 4 + message.getSize();
+ }
+ }
+ }
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java
index 074a544a7d..431f77ecee 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java
@@ -48,6 +48,8 @@ class StreamProducerBuilder implements ProducerBuilder {
private DefaultRoutingConfiguration routingConfiguration;
+ private Function filterValueExtractor;
+
StreamProducerBuilder(StreamEnvironment environment) {
this.environment = environment;
}
@@ -128,6 +130,12 @@ public ProducerBuilder enqueueTimeout(Duration timeout) {
return this;
}
+ @Override
+ public ProducerBuilder filterValue(Function filterValueExtractor) {
+ this.filterValueExtractor = filterValueExtractor;
+ return this;
+ }
+
@Override
public RoutingConfiguration routing(Function routingKeyExtractor) {
this.routingConfiguration = new DefaultRoutingConfiguration(this);
@@ -150,6 +158,9 @@ public Producer build() {
throw new IllegalArgumentException(
"Sub-entry batching must be enabled to enable compression");
}
+ if (subEntrySize > 1 && filterValueExtractor != null) {
+ throw new IllegalArgumentException("Filtering is not supported with sub-entry batching");
+ }
if (subEntrySize > 1 && compression == null) {
compression = Compression.NONE;
}
@@ -183,6 +194,7 @@ public Producer build() {
maxUnconfirmedMessages,
confirmTimeout,
enqueueTimeout,
+ filterValueExtractor,
environment);
this.environment.addProducer((StreamProducer) producer);
} else {
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java
index 0b7f0fd559..ed48565ec1 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java
@@ -67,6 +67,12 @@ public StreamCreator leaderLocator(LeaderLocator leaderLocator) {
return this;
}
+ @Override
+ public StreamCreator filterSize(int size) {
+ streamParametersBuilder.filterSize(size);
+ return this;
+ }
+
@Override
public void create() {
if (stream == null) {
diff --git a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java
index b2045c8df7..9533f2a922 100644
--- a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java
+++ b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java
@@ -40,7 +40,7 @@ public SubEntryMessageAccumulator(
int maxFrameSize,
ToLongFunction publishSequenceFunction,
Clock clock) {
- super(subEntrySize * batchSize, codec, maxFrameSize, publishSequenceFunction, clock);
+ super(subEntrySize * batchSize, codec, maxFrameSize, publishSequenceFunction, null, clock);
this.subEntrySize = subEntrySize;
this.compressionCodec = compressionCodec;
this.compression = compressionCodec == null ? Compression.NONE.code() : compressionCodec.code();
@@ -111,6 +111,11 @@ public long publishindId() {
return publishingId;
}
+ @Override
+ public String filterValue() {
+ return null;
+ }
+
@Override
public Object encodedEntity() {
return encodedMessageBatch;
diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java
index c5f258ab37..a21b23e413 100644
--- a/src/main/java/com/rabbitmq/stream/impl/Utils.java
+++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java
@@ -65,6 +65,11 @@ final class Utils {
private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class);
private static final Map CONSTANT_LABELS;
+ static final String SUBSCRIPTION_PROPERTY_SAC = "single-active-consumer";
+ static final String SUBSCRIPTION_PROPERTY_SUPER_STREAM = "super-stream";
+ static final String SUBSCRIPTION_PROPERTY_FILTER_PREFIX = "filter.";
+ static final String SUBSCRIPTION_PROPERTY_MATCH_UNFILTERED = "match-unfiltered";
+
static {
Map labels = new HashMap<>();
Arrays.stream(Constants.class.getDeclaredFields())
@@ -119,6 +124,15 @@ static boolean isSac(Map properties) {
}
}
+ static boolean filteringEnabled(Map properties) {
+ if (properties == null || properties.isEmpty()) {
+ return false;
+ } else {
+ return properties.keySet().stream()
+ .anyMatch(k -> k.startsWith(SUBSCRIPTION_PROPERTY_FILTER_PREFIX));
+ }
+ }
+
static short encodeRequestCode(Short code) {
return code;
}
diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
index 73136cb0fd..cd4a1fa82a 100644
--- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
+++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
@@ -122,6 +122,7 @@ public class StreamPerfTest implements Callable {
@CommandLine.Mixin
private final CommandLine.HelpCommand helpCommand = new CommandLine.HelpCommand();
+
// for testing
private final AddressResolver addressResolver;
private final PrintWriter err, out;
@@ -430,6 +431,18 @@ public class StreamPerfTest implements Callable {
@ArgGroup(exclusive = false, multiplicity = "0..1")
InstanceSyncOptions instanceSyncOptions;
+ @CommandLine.Option(
+ names = {"--filter-value-set", "-fvs"},
+ description = "filter value set for publishers, range (e.g. 1..15) are accepted",
+ converter = Utils.FilterValueSetConverter.class)
+ private List filterValueSet;
+
+ @CommandLine.Option(
+ names = {"--filter-values", "-fv"},
+ description = "filter values for consumers",
+ split = ",")
+ private List filterValues;
+
static class InstanceSyncOptions {
@CommandLine.Option(
@@ -476,6 +489,7 @@ static class InstanceSyncOptions {
private List monitorings;
private volatile Environment environment;
private volatile EventLoopGroup eventLoopGroup;
+
// constructor for completion script generation
public StreamPerfTest() {
this(null, null, null, null);
@@ -586,7 +600,6 @@ public Integer call() throws Exception {
maybeDisplayVersion();
maybeDisplayEnvironmentVariablesHelp();
overridePropertiesWithEnvironmentVariables();
-
Codec codec = createCodec(this.codecClass);
ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT;
@@ -873,19 +886,43 @@ public Integer call() throws Exception {
producerBuilder.name(producerName).confirmTimeout(Duration.ZERO);
}
- java.util.function.Consumer messageBuilderConsumer;
+ java.util.function.Consumer messageBuilderConsumerTemp;
if (this.superStreams) {
producerBuilder
.superStream(stream)
.routing(msg -> msg.getProperties().getMessageIdAsString());
AtomicLong messageIdSequence = new AtomicLong(0);
- messageBuilderConsumer =
+ messageBuilderConsumerTemp =
mg -> mg.properties().messageId(messageIdSequence.getAndIncrement());
} else {
- messageBuilderConsumer = mg -> {};
+ messageBuilderConsumerTemp = mg -> {};
producerBuilder.stream(stream);
}
+ if (this.filterValueSet != null && this.filterValueSet.size() > 0) {
+ producerBuilder =
+ producerBuilder.filterValue(msg -> msg.getProperties().getTo());
+ List values = new ArrayList<>(this.filterValueSet);
+ AtomicInteger count = new AtomicInteger();
+ int subSetSize = Utils.filteringSubSetSize(values.size());
+ int messageCountCycle = Utils.filteringPublishingCycle(this.rate);
+ List subSet = new ArrayList<>(subSetSize);
+ java.util.function.Consumer filteringMessageBuilderConsumer =
+ b -> {
+ if (Integer.remainderUnsigned(
+ count.getAndIncrement(), messageCountCycle)
+ == 0) {
+ Collections.shuffle(values);
+ subSet.clear();
+ subSet.addAll(values.subList(0, subSetSize));
+ }
+ b.properties()
+ .to(subSet.get(Integer.remainderUnsigned(count.get(), subSetSize)));
+ };
+ messageBuilderConsumerTemp =
+ messageBuilderConsumerTemp.andThen(filteringMessageBuilderConsumer);
+ }
+
Producer producer =
producerBuilder
.subEntrySize(this.subEntrySize)
@@ -895,9 +932,9 @@ public Integer call() throws Exception {
.maxUnconfirmedMessages(this.confirms)
.build();
- AtomicLong messageCount = new AtomicLong(0);
ConfirmationHandler confirmationHandler;
if (this.confirmLatency) {
+ AtomicLong messageCount = new AtomicLong(0);
final PerformanceMetrics metrics = this.performanceMetrics;
final int divisor = Utils.downSamplingDivisor(this.rate);
confirmationHandler =
@@ -933,6 +970,8 @@ public Integer call() throws Exception {
producers.add(producer);
+ java.util.function.Consumer messageBuilderConsumer =
+ messageBuilderConsumerTemp;
return (Runnable)
() -> {
final int msgSize = this.messageSize;
@@ -1048,6 +1087,8 @@ public Integer call() throws Exception {
}
});
+ consumerBuilder = maybeConfigureForFiltering(consumerBuilder);
+
Consumer consumer = consumerBuilder.build();
return consumer;
})
@@ -1124,6 +1165,37 @@ public Integer call() throws Exception {
return 0;
}
+ private ConsumerBuilder maybeConfigureForFiltering(ConsumerBuilder consumerBuilder) {
+ if (this.filterValues != null && this.filterValues.size() > 0) {
+ consumerBuilder =
+ consumerBuilder.filter().values(this.filterValues.toArray(new String[0])).builder();
+
+ if (this.filterValues.size() == 1) {
+ String filterValue = filterValues.get(0);
+ consumerBuilder =
+ consumerBuilder
+ .filter()
+ .postFilter(msg -> filterValue.equals(msg.getProperties().getTo()))
+ .builder();
+ } else {
+ consumerBuilder =
+ consumerBuilder
+ .filter()
+ .postFilter(
+ msg -> {
+ for (String filterValue : this.filterValues) {
+ if (filterValue.equals(msg.getProperties().getTo())) {
+ return true;
+ }
+ }
+ return false;
+ })
+ .builder();
+ }
+ }
+ return consumerBuilder;
+ }
+
private void createStream(Environment environment, String stream) {
StreamCreator streamCreator =
environment.streamCreator().stream(stream)
diff --git a/src/main/java/com/rabbitmq/stream/perf/Utils.java b/src/main/java/com/rabbitmq/stream/perf/Utils.java
index f123134cac..1d38ab3cee 100644
--- a/src/main/java/com/rabbitmq/stream/perf/Utils.java
+++ b/src/main/java/com/rabbitmq/stream/perf/Utils.java
@@ -473,6 +473,33 @@ public BiFunction convert(String input) {
}
}
+ static class FilterValueSetConverter implements ITypeConverter> {
+
+ @Override
+ public List convert(String value) {
+ if (value == null || value.trim().isEmpty()) {
+ return Collections.emptyList();
+ }
+ if (value.contains("..")) {
+ String[] range = value.split("\\.\\.");
+ String errorMessage = "'" + value + "' is not valid, valid example values: 1..10, 1..20";
+ if (range.length != 2) {
+ throw new CommandLine.TypeConversionException(errorMessage);
+ }
+ int start, end;
+ try {
+ start = Integer.parseInt(range[0]);
+ end = Integer.parseInt(range[1]) + 1;
+ return IntStream.range(start, end).mapToObj(String::valueOf).collect(Collectors.toList());
+ } catch (NumberFormatException e) {
+ throw new CommandLine.TypeConversionException(errorMessage);
+ }
+ } else {
+ return Arrays.stream(value.split(",")).collect(Collectors.toList());
+ }
+ }
+ }
+
static class SniServerNamesConverter implements ITypeConverter> {
@Override
@@ -859,4 +886,24 @@ static InstanceSynchronization defaultInstanceSynchronization(
throw new RuntimeException(e);
}
}
+
+ static int filteringPublishingCycle(int rate) {
+ if (rate == 0) {
+ return 100_000;
+ } else if (rate <= 10) {
+ return 10;
+ } else {
+ return rate / 10;
+ }
+ }
+
+ static int filteringSubSetSize(int setSize) {
+ if (setSize <= 3) {
+ return 1;
+ } else if (setSize > 10) {
+ return (int) (setSize * 0.70);
+ } else {
+ return setSize - 3;
+ }
+ }
}
diff --git a/src/test/java/com/rabbitmq/stream/benchmark/FilteringBenchmark.java b/src/test/java/com/rabbitmq/stream/benchmark/FilteringBenchmark.java
new file mode 100644
index 0000000000..f12b9c64dc
--- /dev/null
+++ b/src/test/java/com/rabbitmq/stream/benchmark/FilteringBenchmark.java
@@ -0,0 +1,286 @@
+// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
+//
+// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
+// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
+// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.stream.benchmark;
+
+import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.util.concurrent.RateLimiter;
+import com.rabbitmq.stream.*;
+import com.rabbitmq.stream.metrics.DropwizardMetricsCollector;
+import com.rabbitmq.stream.metrics.MetricsCollector;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+
+public class FilteringBenchmark {
+
+ static final String stream = "filtering";
+
+ public static void main(String[] args) throws Exception {
+ int filterValueCount = 200;
+ int filterValueSubsetCount = 80;
+ int rate = 100_000;
+ int filterSize = 64;
+ int batchSize = 100;
+ int maxUnconfirmedMessages = 10_000;
+
+ Duration publishingDuration = Duration.ofSeconds(10);
+ Duration publishingCycle = Duration.ofSeconds(1);
+
+ ScheduledExecutorService scheduledExecutorService =
+ Executors.newSingleThreadScheduledExecutor();
+ try (Environment env = Environment.builder().build()) {
+ try {
+ env.deleteStream(stream);
+ } catch (StreamException e) {
+ // OK
+ }
+ env.streamCreator().stream(stream).filterSize(filterSize).create();
+
+ List filterValues = new ArrayList<>(filterValueCount);
+ IntStream.range(0, filterValueCount)
+ .forEach(i -> filterValues.add(UUID.randomUUID().toString()));
+
+ AtomicLong publishedCount = new AtomicLong(0);
+ AtomicLong confirmedCount = new AtomicLong(0);
+
+ Producer producer =
+ env.producerBuilder().stream(stream)
+ .batchSize(batchSize)
+ .maxUnconfirmedMessages(maxUnconfirmedMessages)
+ .filterValue(msg -> msg.getProperties().getTo())
+ .build();
+
+ AtomicBoolean keepPublishing = new AtomicBoolean(true);
+ scheduledExecutorService.schedule(
+ () -> keepPublishing.set(false), publishingDuration.toMillis(), TimeUnit.MILLISECONDS);
+
+ RateLimiter rateLimiter = RateLimiter.create(rate);
+
+ Random random = new Random();
+ ConfirmationHandler confirmationHandler = status -> confirmedCount.getAndIncrement();
+ System.out.printf(
+ "Starting test, filter values %s, subset %s, filter size %d%n",
+ filterValueCount, filterValueSubsetCount, filterSize);
+ System.out.printf(
+ "Starting publishing for %d second(s) at rate %d, batch size %d, max unconfirmed messages %d...%n",
+ publishingDuration.getSeconds(), rate, batchSize, maxUnconfirmedMessages);
+ while (keepPublishing.get()) {
+ AtomicBoolean keepPublishingInCycle = new AtomicBoolean(true);
+ scheduledExecutorService.schedule(
+ () -> keepPublishingInCycle.set(false),
+ publishingCycle.toMillis(),
+ TimeUnit.MILLISECONDS);
+ Collections.shuffle(filterValues);
+ List filterValueSubset = filterValues.subList(0, filterValueSubsetCount);
+ System.out.printf(
+ "Starting publishing cycle for %d second(s)...%n", publishingCycle.getSeconds());
+ while (keepPublishingInCycle.get()) {
+ rateLimiter.acquire(1);
+ String filterValue = filterValueSubset.get(random.nextInt(filterValueSubsetCount));
+ producer.send(
+ producer.messageBuilder().properties().to(filterValue).messageBuilder().build(),
+ confirmationHandler);
+ publishedCount.getAndIncrement();
+ }
+ }
+ System.out.println("Done publishing, waiting for all confirmations...");
+ waitAtMost(() -> publishedCount.get() == confirmedCount.get());
+
+ System.out.println("Starting consuming...");
+
+ List values = filterValues.subList(0, 10);
+ for (String filterValue : values) {
+ Duration timeout = Duration.ofSeconds(30);
+ System.out.printf("For filter value %s%n", filterValue);
+ MetricRegistry registry = new MetricRegistry();
+ MetricsCollector collector = new DropwizardMetricsCollector(registry);
+ AtomicLong unfilteredTargetMessageCount = new AtomicLong(0);
+ Duration unfilteredDuration;
+ try (Environment e = Environment.builder().metricsCollector(collector).build()) {
+ AtomicBoolean hasReceivedSomething = new AtomicBoolean(false);
+ AtomicLong lastReceived = new AtomicLong(0);
+ long s = System.nanoTime();
+ e.consumerBuilder().stream(stream)
+ .offset(OffsetSpecification.first())
+ .messageHandler(
+ (ctx, msg) -> {
+ hasReceivedSomething.set(true);
+ lastReceived.set(System.nanoTime());
+ if (filterValue.equals(msg.getProperties().getTo())) {
+ unfilteredTargetMessageCount.getAndIncrement();
+ }
+ })
+ .build();
+ waitAtMost(
+ timeout,
+ () ->
+ hasReceivedSomething.get()
+ && System.nanoTime() - lastReceived.get() > Duration.ofSeconds(1).toNanos());
+ unfilteredDuration = Duration.ofNanos(System.nanoTime() - s);
+ }
+
+ long unfilteredChunkCount = registry.getMeters().get("rabbitmq.stream.chunk").getCount();
+ long unfilteredMessageCount =
+ registry.getMeters().get("rabbitmq.stream.consumed").getCount();
+
+ AtomicInteger chunkFilteredMessages = new AtomicInteger(0);
+ AtomicInteger chunkMessageCount = new AtomicInteger(0);
+ AtomicInteger chunkWithNoMessagesCount = new AtomicInteger(0);
+ AtomicBoolean firstChunk = new AtomicBoolean(true);
+ AtomicLong droppedMessages = new AtomicLong(0);
+ registry = new MetricRegistry();
+ collector = new DropwizardMetricsCollector(registry);
+ collector =
+ new DelegatingMetricsCollector(collector) {
+
+ @Override
+ public void chunk(int entriesCount) {
+ if (firstChunk.get()) {
+ firstChunk.set(false);
+ } else {
+ if (chunkMessageCount.get() == chunkFilteredMessages.get()) {
+ chunkWithNoMessagesCount.incrementAndGet();
+ }
+ chunkFilteredMessages.set(0);
+ chunkMessageCount.set(entriesCount);
+ }
+ super.chunk(entriesCount);
+ }
+ };
+ AtomicLong filteredTargetMessageCount = new AtomicLong(0);
+ Duration filteredDuration;
+ try (Environment e = Environment.builder().metricsCollector(collector).build()) {
+ AtomicBoolean hasReceivedSomething = new AtomicBoolean(false);
+ AtomicLong lastReceived = new AtomicLong(0);
+ long s = System.nanoTime();
+ AtomicLong chunkId = new AtomicLong(-1);
+ e.consumerBuilder().stream(stream)
+ .offset(OffsetSpecification.first())
+ .filter()
+ .values(filterValue)
+ .postFilter(
+ msg -> {
+ boolean shouldPass = filterValue.equals(msg.getProperties().getTo());
+ if (!shouldPass) {
+ droppedMessages.getAndIncrement();
+ chunkFilteredMessages.getAndIncrement();
+ }
+ return shouldPass;
+ })
+ .builder()
+ .messageHandler(
+ (ctx, msg) -> {
+ if (chunkId.get() == -1 || chunkId.get() != ctx.committedChunkId()) {}
+
+ hasReceivedSomething.set(true);
+ lastReceived.set(System.nanoTime());
+ filteredTargetMessageCount.getAndIncrement();
+ })
+ .build();
+ waitAtMost(
+ timeout,
+ () ->
+ hasReceivedSomething.get()
+ && System.nanoTime() - lastReceived.get() > Duration.ofSeconds(1).toNanos());
+ filteredDuration = Duration.ofNanos(System.nanoTime() - s);
+ }
+ long filteredChunkCount = registry.getMeters().get("rabbitmq.stream.chunk").getCount();
+ long filteredMessageCount = registry.getMeters().get("rabbitmq.stream.consumed").getCount();
+ System.out.printf(
+ "consumed in %d / %d ms, target messages %d / %d, chunk count %d / %d (%d %%), messages %d / %d (%d %%)%n",
+ unfilteredDuration.toMillis(),
+ filteredDuration.toMillis(),
+ unfilteredTargetMessageCount.get(),
+ filteredTargetMessageCount.get(),
+ unfilteredChunkCount,
+ filteredChunkCount,
+ (unfilteredChunkCount - filteredChunkCount) * 100 / unfilteredChunkCount,
+ unfilteredMessageCount,
+ filteredMessageCount,
+ (unfilteredMessageCount - filteredMessageCount) * 100 / unfilteredMessageCount);
+ System.out.printf(
+ "chunk without matching messages %d / %d, dropped messages %d / %d%n",
+ chunkWithNoMessagesCount.get(),
+ filteredChunkCount,
+ droppedMessages.getAndIncrement(),
+ filteredMessageCount);
+ }
+
+ } finally {
+ scheduledExecutorService.shutdownNow();
+ }
+ }
+
+ private static class DelegatingMetricsCollector implements MetricsCollector {
+
+ private final MetricsCollector delegate;
+
+ private DelegatingMetricsCollector(MetricsCollector delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void openConnection() {
+ this.delegate.openConnection();
+ }
+
+ @Override
+ public void closeConnection() {
+ this.delegate.closeConnection();
+ }
+
+ @Override
+ public void publish(int count) {
+ this.delegate.publish(count);
+ }
+
+ @Override
+ public void publishConfirm(int count) {
+ this.delegate.publishConfirm(count);
+ }
+
+ @Override
+ public void publishError(int count) {
+ this.delegate.publishError(count);
+ }
+
+ @Override
+ public void chunk(int entriesCount) {
+ this.delegate.chunk(entriesCount);
+ }
+
+ @Override
+ public void consume(long count) {
+ this.delegate.consume(count);
+ }
+
+ @Override
+ public void writtenBytes(int writtenBytes) {
+ this.delegate.writtenBytes(writtenBytes);
+ }
+
+ @Override
+ public void readBytes(int readBytes) {
+ this.delegate.readBytes(readBytes);
+ }
+ }
+}
diff --git a/src/test/java/com/rabbitmq/stream/docs/FilteringUsage.java b/src/test/java/com/rabbitmq/stream/docs/FilteringUsage.java
new file mode 100644
index 0000000000..cd123f77db
--- /dev/null
+++ b/src/test/java/com/rabbitmq/stream/docs/FilteringUsage.java
@@ -0,0 +1,69 @@
+// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
+//
+// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
+// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
+// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+
+package com.rabbitmq.stream.docs;
+
+import com.rabbitmq.stream.Consumer;
+import com.rabbitmq.stream.Environment;
+import com.rabbitmq.stream.Producer;
+
+public class FilteringUsage {
+
+ void producerSimple() {
+ Environment environment = Environment.builder().build();
+ // tag::producer-simple[]
+ Producer producer = environment.producerBuilder()
+ .stream("invoices")
+ .filterValue(msg ->
+ msg.getApplicationProperties().get("state").toString()) // <1>
+ .build();
+ // end::producer-simple[]
+ }
+
+ void consumerSimple() {
+ Environment environment = Environment.builder().build();
+ // tag::consumer-simple[]
+ String filterValue = "california";
+ Consumer consumer = environment.consumerBuilder()
+ .stream("invoices")
+ .filter()
+ .values(filterValue) // <1>
+ .postFilter(msg ->
+ filterValue.equals(msg.getApplicationProperties().get("state"))) // <2>
+ .builder()
+ .messageHandler((ctx, msg) -> { })
+ .build();
+ // end::consumer-simple[]
+ }
+
+ void consumerMatchUnfiltered() {
+ Environment environment = Environment.builder().build();
+ // tag::consumer-match-unfiltered[]
+ String filterValue = "california";
+ Consumer consumer = environment.consumerBuilder()
+ .stream("invoices")
+ .filter()
+ .values(filterValue) // <1>
+ .matchUnfiltered() // <2>
+ .postFilter(msg ->
+ filterValue.equals(msg.getApplicationProperties().get("state"))
+ || !msg.getApplicationProperties().containsKey("state") // <3>
+ )
+ .builder()
+ .messageHandler((ctx, msg) -> { })
+ .build();
+ // end::consumer-match-unfiltered[]
+ }
+
+}
diff --git a/src/test/java/com/rabbitmq/stream/impl/Amqp10InteroperabilityTest.java b/src/test/java/com/rabbitmq/stream/impl/Amqp10InteroperabilityTest.java
index 166abb5f00..e60d105dbb 100644
--- a/src/test/java/com/rabbitmq/stream/impl/Amqp10InteroperabilityTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/Amqp10InteroperabilityTest.java
@@ -14,7 +14,6 @@
package com.rabbitmq.stream.impl;
import static com.rabbitmq.stream.impl.TestUtils.ClientFactory;
-import static com.rabbitmq.stream.impl.TestUtils.StreamTestInfrastructureExtension;
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -27,12 +26,12 @@
import com.rabbitmq.stream.impl.Client.ClientParameters;
import com.rabbitmq.stream.impl.TestUtils.BrokerVersion;
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
-import com.rabbitmq.stream.impl.TestUtils.DisabledIfAmqp10NotEnabled;
import com.swiftmq.amqp.AMQPContext;
import com.swiftmq.amqp.v100.client.Connection;
import com.swiftmq.amqp.v100.client.Producer;
import com.swiftmq.amqp.v100.client.QoS;
import com.swiftmq.amqp.v100.client.Session;
+import com.swiftmq.amqp.v100.generated.messaging.delivery_state.*;
import com.swiftmq.amqp.v100.generated.messaging.message_format.AmqpSequence;
import com.swiftmq.amqp.v100.generated.messaging.message_format.AmqpValue;
import com.swiftmq.amqp.v100.generated.messaging.message_format.ApplicationProperties;
@@ -61,8 +60,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-@ExtendWith(StreamTestInfrastructureExtension.class)
-@DisabledIfAmqp10NotEnabled
+@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
+@TestUtils.DisabledIfAmqp10NotEnabled
public class Amqp10InteroperabilityTest {
String stream;
diff --git a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java
index 22e80abd8a..588b5ac290 100644
--- a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java
@@ -13,8 +13,7 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
-import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ko;
-import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.responseCode;
+import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.*;
import static com.rabbitmq.stream.impl.TestUtils.b;
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
import static com.rabbitmq.stream.impl.TestUtils.streamName;
@@ -43,6 +42,7 @@
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo;
import com.rabbitmq.stream.impl.TestUtils.BrokerVersion;
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
+import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.ByteArrayOutputStream;
@@ -52,14 +52,7 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
@@ -68,6 +61,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.LongConsumer;
+import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
@@ -978,4 +972,132 @@ void streamStatsFirstOffsetShouldChangeAfterRetentionKickedIn(TestInfo info) {
}
}
}
+
+ @Test
+ void publishConsumeWithFilterValueShouldSendSubSetOfStream() throws Exception {
+ int messageCount = 10_000;
+ AtomicReference publishLatch =
+ new AtomicReference<>(new CountDownLatch(messageCount));
+ Client publisher =
+ cf.get(
+ new ClientParameters()
+ .publishConfirmListener(
+ (publisherId, publishingId) -> publishLatch.get().countDown()));
+ AtomicLong sequence = new AtomicLong(0);
+ ToLongFunction sequenceFunction = msg -> sequence.getAndIncrement();
+ Response response = publisher.declarePublisher(b(0), null, stream);
+ assertThat(response).is(ok());
+
+ // firt wave of messages with several filter values
+ List filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear"));
+ Map filterValueCount = new HashMap<>();
+ Random random = new Random();
+ class Entity {
+ private final Codec.EncodedMessage encodedMessage;
+ private final String filterValue;
+
+ Entity(Codec.EncodedMessage encodedMessage, String filterValue) {
+ this.encodedMessage = encodedMessage;
+ this.filterValue = filterValue;
+ }
+ }
+
+ Client.OutboundEntityWriteCallback writeCallback =
+ new Client.OutboundEntityWriteCallback() {
+ @Override
+ public int write(ByteBuf bb, Object obj, long publishingId) {
+ Entity msg = (Entity) obj;
+ bb.writeShort(msg.filterValue.length());
+ bb.writeBytes(msg.filterValue.getBytes(StandardCharsets.UTF_8));
+ Codec.EncodedMessage messageToPublish = msg.encodedMessage;
+ bb.writeInt(messageToPublish.getSize());
+ bb.writeBytes(messageToPublish.getData(), 0, messageToPublish.getSize());
+ return 1;
+ }
+
+ @Override
+ public int fragmentLength(Object obj) {
+ Entity msg = (Entity) obj;
+ return 8 + 2 + msg.filterValue.length() + 4 + msg.encodedMessage.getSize();
+ }
+ };
+ int batchSize = 100;
+ List messages = new ArrayList<>(batchSize);
+ Runnable write =
+ () ->
+ publisher.publishInternal(
+ Constants.VERSION_2, b(0), messages, writeCallback, sequenceFunction);
+ Runnable insert =
+ () -> {
+ byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
+ int publishedMessageCount = 0;
+ while (publishedMessageCount < messageCount) {
+ String filterValue = filterValues.get(random.nextInt(filterValues.size()));
+ filterValueCount
+ .computeIfAbsent(filterValue, k -> new AtomicInteger())
+ .incrementAndGet();
+ Message message =
+ publisher
+ .codec()
+ .messageBuilder()
+ .addData(data)
+ .properties()
+ .groupId(filterValue)
+ .messageBuilder()
+ .build();
+ Entity entity = new Entity(publisher.codec().encode(message), filterValue);
+ messages.add(entity);
+ publishedMessageCount++;
+
+ if (messages.size() == batchSize) {
+ write.run();
+ messages.clear();
+ }
+ }
+ if (!messages.isEmpty()) {
+ write.run();
+ }
+ };
+
+ insert.run();
+ assertThat(latchAssert(publishLatch)).completes();
+
+ // second wave of messages, with only one, new filter value
+ String newFilterValue = "orange";
+ filterValues.clear();
+ filterValues.add(newFilterValue);
+ publishLatch.set(new CountDownLatch(messageCount));
+ insert.run();
+ assertThat(latchAssert(publishLatch)).completes();
+
+ AtomicInteger consumedMessageCount = new AtomicInteger(0);
+ AtomicInteger filteredConsumedMessageCount = new AtomicInteger(0);
+ Client consumer =
+ cf.get(
+ new ClientParameters()
+ .chunkListener(
+ (client, subscriptionId, offset, messageCount1, dataSize) ->
+ client.credit(subscriptionId, 1))
+ .messageListener(
+ (subscriptionId, offset, chunkTimestamp, committedChunkId, message) -> {
+ consumedMessageCount.incrementAndGet();
+ String filterValue = message.getProperties().getGroupId();
+ if (newFilterValue.equals(filterValue)) {
+ filteredConsumedMessageCount.incrementAndGet();
+ }
+ }));
+
+ // consume only messages with filter value from second wave
+ consumer.subscribe(
+ b(0),
+ stream,
+ OffsetSpecification.first(),
+ 1,
+ Collections.singletonMap("filter.1", newFilterValue));
+
+ int expectedCount = filterValueCount.get(newFilterValue).get();
+ waitAtMost(() -> filteredConsumedMessageCount.get() == expectedCount);
+ // we should get messages only from the "second" part of the stream
+ assertThat(consumedMessageCount).hasValueLessThan(messageCount * 2);
+ }
}
diff --git a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java
new file mode 100644
index 0000000000..5abf9b2bd6
--- /dev/null
+++ b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java
@@ -0,0 +1,344 @@
+// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
+//
+// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
+// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
+// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.stream.impl;
+
+import static com.rabbitmq.stream.impl.TestUtils.*;
+import static java.util.Collections.singletonMap;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.rabbitmq.client.*;
+import com.rabbitmq.client.AMQP.BasicProperties.Builder;
+import com.rabbitmq.stream.*;
+import com.rabbitmq.stream.Consumer;
+import io.netty.channel.EventLoopGroup;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.NullSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@DisabledIfFilteringNotSupported
+@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
+public class FilteringTest {
+
+ private static final Duration CONDITION_TIMEOUT = Duration.ofSeconds(5);
+
+ static final int messageCount = 10_000;
+
+ EventLoopGroup eventLoopGroup;
+
+ Environment environment;
+
+ String stream;
+
+ @BeforeEach
+ void init() throws Exception {
+ EnvironmentBuilder environmentBuilder =
+ Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
+ environmentBuilder.addressResolver(add -> localhost());
+ environment = environmentBuilder.build();
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ environment.close();
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = "foo")
+ @NullSource
+ void publishConsume(String producerName) throws Exception {
+ repeatIfFailure(
+ () -> {
+ List filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear"));
+ Map filterValueCount = new HashMap<>();
+ Random random = new Random();
+
+ Runnable insert =
+ () ->
+ publish(
+ messageCount,
+ producerName,
+ () -> {
+ String filterValue = filterValues.get(random.nextInt(filterValues.size()));
+ filterValueCount
+ .computeIfAbsent(filterValue, k -> new AtomicInteger())
+ .incrementAndGet();
+ return filterValue;
+ });
+ insert.run();
+
+ // second wave of messages, with only one, new filter value
+ String newFilterValue = "orange";
+ filterValues.clear();
+ filterValues.add(newFilterValue);
+ insert.run();
+
+ AtomicInteger receivedMessageCount = new AtomicInteger(0);
+ AtomicInteger filteredConsumedMessageCount = new AtomicInteger(0);
+ try (Consumer ignored =
+ consumerBuilder()
+ .filter()
+ .values(newFilterValue)
+ .postFilter(
+ m -> {
+ receivedMessageCount.incrementAndGet();
+ return newFilterValue.equals(m.getProperties().getGroupId());
+ })
+ .builder()
+ .messageHandler(
+ (context, message) -> filteredConsumedMessageCount.incrementAndGet())
+ .build()) {
+ int expectedCount = filterValueCount.get(newFilterValue).get();
+ waitAtMost(
+ CONDITION_TIMEOUT, () -> filteredConsumedMessageCount.get() == expectedCount);
+ assertThat(receivedMessageCount).hasValueLessThan(messageCount * 2);
+ }
+ });
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = "foo")
+ @NullSource
+ void publishWithNullFilterValuesShouldBePossible(String producerName) throws Exception {
+ repeatIfFailure(
+ () -> {
+ publish(messageCount, producerName, () -> null);
+
+ CountDownLatch consumeLatch = new CountDownLatch(messageCount);
+ try (Consumer ignored =
+ consumerBuilder().messageHandler((ctx, msg) -> consumeLatch.countDown()).build()) {
+ latchAssert(consumeLatch).completes(CONDITION_TIMEOUT);
+ }
+ });
+ }
+
+ @ParameterizedTest
+ @CsvSource({"foo,true", "foo,false", ",true", ",false"})
+ void matchUnfilteredShouldReturnNullFilteredValueAndFilteredValues(
+ String producerName, boolean matchUnfiltered) throws Exception {
+ repeatIfFailure(
+ () -> {
+ publish(messageCount, producerName, () -> null);
+
+ List filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear"));
+ Map filterValueCount = new HashMap<>();
+ Random random = new Random();
+ publish(
+ messageCount,
+ producerName,
+ () -> {
+ String filterValue = filterValues.get(random.nextInt(filterValues.size()));
+ filterValueCount
+ .computeIfAbsent(filterValue, k -> new AtomicInteger())
+ .incrementAndGet();
+ return filterValue;
+ });
+
+ publish(messageCount, producerName, () -> null);
+
+ AtomicInteger receivedMessageCount = new AtomicInteger(0);
+ Set receivedFilterValues = ConcurrentHashMap.newKeySet();
+ try (Consumer ignored =
+ consumerBuilder()
+ .filter()
+ .values(filterValues.get(0))
+ .matchUnfiltered(matchUnfiltered)
+ .postFilter(m -> true)
+ .builder()
+ .messageHandler(
+ (ctx, msg) -> {
+ receivedFilterValues.add(
+ msg.getProperties().getGroupId() == null
+ ? "null"
+ : msg.getProperties().getGroupId());
+ receivedMessageCount.incrementAndGet();
+ })
+ .build()) {
+ int expected;
+ if (matchUnfiltered) {
+ expected = messageCount * 2;
+ } else {
+ expected = messageCount;
+ }
+ waitAtMost(CONDITION_TIMEOUT, () -> receivedMessageCount.get() >= expected);
+ }
+ });
+ }
+
+ @Test
+ void setFilterSizeOnCreation(TestInfo info) {
+ String s = streamName(info);
+ this.environment.streamCreator().stream(s).filterSize(128).create();
+ this.environment.deleteStream(s);
+ assertThatThrownBy(() -> this.environment.streamCreator().filterSize(15))
+ .isInstanceOf(IllegalArgumentException.class);
+ assertThatThrownBy(() -> this.environment.streamCreator().filterSize(256))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void publishConsumeAmqp() throws Exception {
+ int messageCount = 1000;
+ repeatIfFailure(
+ () -> {
+ List filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear"));
+ Map filterValueCount = new HashMap<>();
+ Random random = new Random();
+
+ try (Connection c = new ConnectionFactory().newConnection()) {
+ Callable insert =
+ () -> {
+ publishAmqp(
+ c,
+ messageCount,
+ () -> {
+ String filterValue = filterValues.get(random.nextInt(filterValues.size()));
+ filterValueCount
+ .computeIfAbsent(filterValue, k -> new AtomicInteger())
+ .incrementAndGet();
+ return filterValue;
+ });
+ return null;
+ };
+ insert.call();
+
+ // second wave of messages, with only one, new filter value
+ String newFilterValue = "orange";
+ filterValues.clear();
+ filterValues.add(newFilterValue);
+ insert.call();
+
+ AtomicInteger receivedMessageCount = new AtomicInteger(0);
+ AtomicInteger filteredConsumedMessageCount = new AtomicInteger(0);
+ Channel ch = c.createChannel();
+ ch.basicQos(10);
+ Map arguments = new HashMap<>();
+ arguments.put("x-stream-filter", newFilterValue);
+ arguments.put("x-stream-offset", 0);
+ ch.basicConsume(
+ stream,
+ false,
+ arguments,
+ new DefaultConsumer(ch) {
+ @Override
+ public void handleDelivery(
+ String consumerTag,
+ Envelope envelope,
+ AMQP.BasicProperties properties,
+ byte[] body)
+ throws IOException {
+ receivedMessageCount.incrementAndGet();
+ String filterValue =
+ properties.getHeaders().get("x-stream-filter-value").toString();
+ if (newFilterValue.equals(filterValue)) {
+ filteredConsumedMessageCount.incrementAndGet();
+ }
+ ch.basicAck(envelope.getDeliveryTag(), false);
+ }
+ });
+ int expectedCount = filterValueCount.get(newFilterValue).get();
+ waitAtMost(
+ CONDITION_TIMEOUT, () -> filteredConsumedMessageCount.get() == expectedCount);
+ assertThat(receivedMessageCount).hasValueLessThan(messageCount * 2);
+ }
+ });
+ }
+
+ private ProducerBuilder producerBuilder() {
+ return this.environment.producerBuilder().stream(stream);
+ }
+
+ private ConsumerBuilder consumerBuilder() {
+ return this.environment.consumerBuilder().stream(stream).offset(OffsetSpecification.first());
+ }
+
+ private static final AtomicLong PUBLISHING_SEQUENCE = new AtomicLong(0);
+
+ private void publish(
+ int messageCount, String producerName, Supplier filterValueSupplier) {
+ Producer producer =
+ producerBuilder()
+ .name(producerName)
+ .filterValue(m -> m.getProperties().getGroupId())
+ .build();
+ CountDownLatch latch = new CountDownLatch(messageCount);
+ ConfirmationHandler confirmationHandler = ctx -> latch.countDown();
+ IntStream.range(0, messageCount)
+ .forEach(
+ ignored ->
+ producer.send(
+ producer
+ .messageBuilder()
+ .publishingId(PUBLISHING_SEQUENCE.getAndIncrement())
+ .properties()
+ .groupId(filterValueSupplier.get())
+ .messageBuilder()
+ .build(),
+ confirmationHandler));
+ latchAssert(latch).completes(CONDITION_TIMEOUT);
+ producer.close();
+ }
+
+ private void publishAmqp(Connection c, int messageCount, Supplier filterValueSupplier)
+ throws Exception {
+ try (Channel ch = c.createChannel()) {
+ ch.confirmSelect();
+ for (int i = 0; i < messageCount; i++) {
+ ch.basicPublish(
+ "",
+ stream,
+ new Builder()
+ .headers(singletonMap("x-stream-filter-value", filterValueSupplier.get()))
+ .build(),
+ null);
+ }
+ ch.waitForConfirmsOrDie();
+ }
+ }
+
+ private static void repeatIfFailure(RunnableWithException test) throws Exception {
+ int executionCount = 0;
+ Throwable lastException = null;
+ while (executionCount < 5) {
+ try {
+ test.run();
+ return;
+ } catch (Exception | AssertionError e) {
+ executionCount++;
+ lastException = e;
+ }
+ }
+ if (lastException instanceof Error) {
+ throw new RuntimeException(lastException);
+ } else {
+ throw (Exception) lastException;
+ }
+ }
+}
diff --git a/src/test/java/com/rabbitmq/stream/impl/FrameTest.java b/src/test/java/com/rabbitmq/stream/impl/FrameTest.java
index c5319fd1a2..13b5c4a22a 100644
--- a/src/test/java/com/rabbitmq/stream/impl/FrameTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/FrameTest.java
@@ -21,6 +21,7 @@
import static org.mockito.Mockito.verify;
import com.rabbitmq.stream.Codec;
+import com.rabbitmq.stream.Constants;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.Properties;
import io.netty.buffer.ByteBuf;
@@ -152,6 +153,7 @@ public TestDesc(String description, List sizes, int expectedCalls) {
.thenReturn(Mockito.mock(ChannelFuture.class));
client.publishInternal(
+ Constants.VERSION_1,
channel,
b(1),
test.sizes.stream()
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java
index 8b9989870f..7677d1984e 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java
@@ -16,12 +16,7 @@
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyByte;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@@ -90,6 +85,7 @@ void init() {
});
when(client.maxFrameSize()).thenReturn(Integer.MAX_VALUE);
when(client.publishInternal(
+ anyShort(),
anyByte(),
anyList(),
any(OutboundEntityWriteCallback.class),
@@ -97,13 +93,15 @@ void init() {
.thenAnswer(
invocation ->
client.publishInternal(
+ Constants.VERSION_1,
channel,
- invocation.getArgument(0),
invocation.getArgument(1),
invocation.getArgument(2),
- invocation.getArgument(3)));
+ invocation.getArgument(3),
+ invocation.getArgument(4)));
when(client.publishInternal(
+ anyShort(),
any(Channel.class),
anyByte(),
anyList(),
@@ -176,6 +174,7 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout(
messageCount * 10,
confirmTimeout,
Duration.ofSeconds(10),
+ null,
env);
IntStream.range(0, messageCount)
@@ -217,6 +216,7 @@ void enqueueTimeoutMessageShouldBeFailedWhenEnqueueTimeoutIsReached(int subEntry
2,
Duration.ofMinutes(1),
enqueueTimeout,
+ null,
env);
AtomicBoolean confirmCalled = new AtomicBoolean(false);
@@ -255,6 +255,7 @@ void enqueueTimeoutSendingShouldBlockWhenEnqueueTimeoutIsZero(int subEntrySize)
2,
Duration.ofMinutes(1),
enqueueTimeout,
+ null,
env);
AtomicBoolean confirmCalled = new AtomicBoolean(false);
diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java
index 873a6454cb..1e7550a7b5 100644
--- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java
+++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java
@@ -458,6 +458,12 @@ static boolean atLeastVersion(String expectedVersion, String currentVersion) {
}
}
+ @Target({ElementType.TYPE, ElementType.METHOD})
+ @Retention(RetentionPolicy.RUNTIME)
+ @Documented
+ @ExtendWith(DisabledIfFilteringNotSupportedCondition.class)
+ @interface DisabledIfFilteringNotSupported {}
+
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@@ -601,6 +607,7 @@ public void beforeEach(ExtensionContext context) throws Exception {
new Client(new Client.ClientParameters().eventLoopGroup(eventLoopGroup(context)));
Client.Response response = client.create(stream);
assertThat(response.isOk()).isTrue();
+ store(context.getRoot()).put("filteringSupported", client.filteringSupported());
client.close();
store(context).put("testMethodStream", stream);
} catch (NoSuchFieldException e) {
@@ -671,7 +678,7 @@ private ExecutorServiceCloseableResourceWrapper() {
}
@Override
- public void close() throws Throwable {
+ public void close() {
this.executorService.shutdownNow();
}
}
@@ -703,6 +710,29 @@ private void close() {
}
}
+ static class DisabledIfFilteringNotSupportedCondition implements ExecutionCondition {
+
+ @Override
+ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) {
+ Boolean filteringSupported =
+ StreamTestInfrastructureExtension.store(context).get("filteringSupported", Boolean.class);
+ if (filteringSupported == null) {
+ EventLoopGroup eventLoop = StreamTestInfrastructureExtension.eventLoopGroup(context);
+ try (Client client = new Client(new ClientParameters().eventLoopGroup(eventLoop))) {
+ filteringSupported = client.filteringSupported();
+ StreamTestInfrastructureExtension.store(context)
+ .put("filteringSupported", filteringSupported);
+ }
+ }
+
+ if (filteringSupported) {
+ return ConditionEvaluationResult.enabled("filtering is supported");
+ } else {
+ return ConditionEvaluationResult.disabled("filtering is not supported");
+ }
+ }
+ }
+
static class DisabledIfRabbitMqCtlNotSetCondition implements ExecutionCondition {
@Override
diff --git a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java
index 673557d33e..b1e798caaf 100644
--- a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java
+++ b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java
@@ -511,6 +511,17 @@ void nativeEpollWorksOnLinux() throws Exception {
assertThat(streamExists(s)).isTrue();
}
+ @Test
+ @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_13_0)
+ void shouldNotFailWhenFilteringIsActivated() throws Exception {
+ Future> run = run(builder().filterValueSet("1..15").filterValues("4"));
+ waitUntilStreamExists(s);
+ waitOneSecond();
+ run.cancel(true);
+ waitRunEnds();
+ assertThat(consoleOutput()).containsIgnoringCase("summary:");
+ }
+
private static Consumer wrap(CallableConsumer action) {
return t -> {
try {
@@ -728,6 +739,16 @@ ArgumentsBuilder nativeEpoll() {
return this;
}
+ ArgumentsBuilder filterValueSet(String... values) {
+ arguments.put("filter-value-set", String.join(",", values));
+ return this;
+ }
+
+ ArgumentsBuilder filterValues(String... values) {
+ arguments.put("filter-values", String.join(",", values));
+ return this;
+ }
+
String build() {
return this.arguments.entrySet().stream()
.map(e -> "--" + e.getKey() + (e.getValue().isEmpty() ? "" : (" " + e.getValue())))
diff --git a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java
index f3642b7586..a6e09fd0bd 100644
--- a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java
+++ b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java
@@ -368,6 +368,27 @@ void commandLineMetricsTest() {
.isEqualTo("-x 1 -y 2");
}
+ @ParameterizedTest
+ @CsvSource({"0,100000", "100,10", "1000,100", "50,5", "10,10", "11,1", "5,10"})
+ void filteringPublishingCycle(int rate, int expected) {
+ assertThat(Utils.filteringPublishingCycle(rate)).isEqualTo(expected);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"3,1", "2,1", "4,1", "7,4", "10,7", "15,10"})
+ void filteringSubSetSize(int setSize, int expected) {
+ assertThat(Utils.filteringSubSetSize(setSize)).isEqualTo(expected);
+ }
+
+ @Test
+ void filterValueSetConverter() throws Exception {
+ CommandLine.ITypeConverter> converter = new Utils.FilterValueSetConverter();
+ assertThat(converter.convert("one")).containsExactly("one");
+ assertThat(converter.convert("one,two,three")).containsExactly("one", "two", "three");
+ assertThat(converter.convert("1..10")).hasSize(10).contains("1", "2", "10");
+ assertThat(converter.convert("5..10")).hasSize(6).contains("5", "6", "10");
+ }
+
@Command(name = "test-command")
static class TestCommand {