diff --git a/pom.xml b/pom.xml index 492a272e19..d6a446a1b0 100644 --- a/pom.xml +++ b/pom.xml @@ -92,7 +92,7 @@ 1.4 1.36 2.37.0 - 1.15.0 + 1.17.0 0.8.10 3.12 diff --git a/src/docs/asciidoc/advanced-topics.adoc b/src/docs/asciidoc/advanced-topics.adoc index 215f4bdf61..044c2661ef 100644 --- a/src/docs/asciidoc/advanced-topics.adoc +++ b/src/docs/asciidoc/advanced-topics.adoc @@ -2,6 +2,90 @@ === Advanced Topics +==== Filtering + +WARNING: Filtering requires *RabbitMQ 3.13* or more. + +RabbitMQ Stream provides a server-side filtering feature that avoids reading all the messages of a stream and filtering only on the client side. +This helps to save network bandwidth when a consuming application needs only a subset of messages, e.g. the messages from a given geographical region. + +The filtering feature works as follows: + +* each message is published with an associated _filter value_ +* a consumer that wants to enable filtering must: +** define one or several filter values +** define some client-side filtering logic + +Why does the consumer need to define some client-side filtering logic? +Because the server-side filtering is probabilistic: messages that do not match the filter value(s) can still be sent to the consumer. +The server uses a https://en.wikipedia.org/wiki/Bloom_filter[Bloom filter], _a space-efficient probabilistic data structure_, where false positives are possible. +Despite this, the filtering saves some bandwidth, which is its primary goal. + +===== Filtering on the Publishing Side + +Filtering on the publishing side consists in defining some logic to extract the filter value from a message. +The following snippet shows how to extract the filter value from an application property: + +.Declaring a producer with logic to extract a filter value from each message +[source,java,indent=0] +-------- +include::{test-examples}/FilteringUsage.java[tag=producer-simple] +-------- +<1> Get filter value from `state` application property + +Note the filter value can be null: the message is then published in a regular way. +It is called in this context an _unfiltered_ message. + +===== Filtering on the Consuming Side + +A consumer needs to set up one or several filter values and some filtering logic to enable filtering. +The filtering logic must be consistent with the filter values. +In the next snippet, the consumer wants to process only messages from the state of California. +It sets a filter value to `california` and a predicate that accepts a message only if the `state` application properties is `california`: + +.Declaring a consumer with a filter value and filtering logic +[source,java,indent=0] +-------- +include::{test-examples}/FilteringUsage.java[tag=consumer-simple] +-------- +<1> Set filter value +<2> Set filtering logic + +The filter logic is a `Predicate`. +It must return `true` if a message is accepted, following the same semantics as `java.util.stream.Stream#filter(Predicate)`. + +As stated above, not all messages must have an associated filter value. +Many applications may not need some filtering, so they can publish messages the regular way. +So a stream can contain messages with and without an associated filter value. + +By default, messages without a filter value (a.k.a _unfiltered_ messages) are not sent to a consumer that enabled filtering. + +But what if a consumer wants to process messages with a filter value and messages without any filter value as well? +It must use the `matchUnfiltered()` method in its declaration and also make sure to keep the filtering logic consistent: + +.Getting unfiltered messages as well when enabling filtering +[source,java,indent=0] +-------- +include::{test-examples}/FilteringUsage.java[tag=consumer-match-unfiltered] +-------- +<1> Request messages from California +<2> Request messages without a filter value as well +<3> Let both types of messages pass + +In the example above, the filtering logic has been adapted to let pass `california` messages _and_ messages without a state set as well. + +===== Considerations on Filtering + +As stated previously, the server can send messages that do not match the filter value(s) set by consumers. +This is why application developers must be very careful with the filtering logic they define to avoid processing unwanted messages. + +What are good candidates for filter values? +Unique identifiers are _not_: if you know a given message property will be unique in a stream, do not use it as a filter value. +A defined set of values shared across the messages is a good candidate: geographical locations (e.g. countries, states), document types in a stream that stores document information (e.g. payslip, invoice, order), categories of products (e.g. book, luggage, toy). + +Cardinality of filter values can be from a few to a few thousands. +Extreme cardinality (a couple or dozens of thousands) can make filtering less efficient. + ==== Using Native `epoll` The stream Java client uses the https://netty.io/[Netty] network framework and its Java NIO transport implementation by default. diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java index b86db48b2e..a7424e2f75 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java @@ -14,6 +14,7 @@ package com.rabbitmq.stream; import java.time.Duration; +import java.util.function.Predicate; /** API to configure and create a {@link Consumer}. */ public interface ConsumerBuilder { @@ -151,6 +152,15 @@ public interface ConsumerBuilder { */ ConsumerBuilder noTrackingStrategy(); + /** + * Configure the filtering. + * + *

RabbitMQ 3.13 or more is required. + * + * @return the filtering configuration + */ + FilterConfiguration filter(); + /** * Configure flow of messages. * @@ -242,4 +252,58 @@ interface FlowConfiguration { */ ConsumerBuilder builder(); } + + /** + * Filter configuration. + * + *

RabbitMQ 3.13 or more is required. + */ + interface FilterConfiguration { + + /** + * Set the filter values. + * + * @param filterValues + * @return this filter configuration instance + */ + FilterConfiguration values(String... filterValues); + + /** + * Client-side filtering logic, occurring after the server-side filtering. + * + *

It must be consistent with the requested filter {@link #values( String...)} and the {@link + * #matchUnfiltered()} flag. + * + * @param filter a predicate that returns true if a message should go to the {@link + * MessageHandler} + * @return this filter configuration instance + */ + FilterConfiguration postFilter(Predicate filter); + + /** + * Whether messages without a filter value should be sent as well. + * + *

Default is false. + * + * @return this filter configuration instance + */ + FilterConfiguration matchUnfiltered(); + + /** + * Whether messages without a filter value should be sent as well. + * + *

Default is false. + * + * @param matchUnfiltered + * @return this filter configuration instance + */ + FilterConfiguration matchUnfiltered(boolean matchUnfiltered); + + /** + * Go back to the builder. + * + * @return the consumer builder + */ + ConsumerBuilder builder(); + } } diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java index be5314da16..cdc942f03d 100644 --- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java @@ -132,6 +132,16 @@ public interface ProducerBuilder { */ ProducerBuilder enqueueTimeout(Duration timeout); + /** + * Logic to extract a filter value from a message. + * + *

RabbitMQ 3.13 or more is required. + * + * @param filterValueExtractor + * @return this builder instance + */ + ProducerBuilder filterValue(Function filterValueExtractor); + /** * Create the {@link Producer} instance. * diff --git a/src/main/java/com/rabbitmq/stream/StreamCreator.java b/src/main/java/com/rabbitmq/stream/StreamCreator.java index aa61e4ddd2..22250e40d8 100644 --- a/src/main/java/com/rabbitmq/stream/StreamCreator.java +++ b/src/main/java/com/rabbitmq/stream/StreamCreator.java @@ -14,6 +14,7 @@ package com.rabbitmq.stream; import java.time.Duration; +import java.util.function.Function; /** API to configure and create a stream. */ public interface StreamCreator { @@ -63,6 +64,16 @@ public interface StreamCreator { */ StreamCreator leaderLocator(LeaderLocator leaderLocator); + /** + * Set the size of the stream chunk filters. + * + * @param size + * @return this creator instance + * @see ProducerBuilder#filterValue( Function) + * @see ConsumerBuilder#filter() + */ + StreamCreator filterSize(int size); + /** * Create the stream. * diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index f85fa9a51a..5e3bcb79af 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -13,34 +13,7 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.Constants.COMMAND_CLOSE; -import static com.rabbitmq.stream.Constants.COMMAND_CONSUMER_UPDATE; -import static com.rabbitmq.stream.Constants.COMMAND_CREATE_STREAM; -import static com.rabbitmq.stream.Constants.COMMAND_CREDIT; -import static com.rabbitmq.stream.Constants.COMMAND_DECLARE_PUBLISHER; -import static com.rabbitmq.stream.Constants.COMMAND_DELETE_PUBLISHER; -import static com.rabbitmq.stream.Constants.COMMAND_DELETE_STREAM; -import static com.rabbitmq.stream.Constants.COMMAND_EXCHANGE_COMMAND_VERSIONS; -import static com.rabbitmq.stream.Constants.COMMAND_HEARTBEAT; -import static com.rabbitmq.stream.Constants.COMMAND_METADATA; -import static com.rabbitmq.stream.Constants.COMMAND_OPEN; -import static com.rabbitmq.stream.Constants.COMMAND_PARTITIONS; -import static com.rabbitmq.stream.Constants.COMMAND_PEER_PROPERTIES; -import static com.rabbitmq.stream.Constants.COMMAND_PUBLISH; -import static com.rabbitmq.stream.Constants.COMMAND_QUERY_OFFSET; -import static com.rabbitmq.stream.Constants.COMMAND_QUERY_PUBLISHER_SEQUENCE; -import static com.rabbitmq.stream.Constants.COMMAND_ROUTE; -import static com.rabbitmq.stream.Constants.COMMAND_SASL_AUTHENTICATE; -import static com.rabbitmq.stream.Constants.COMMAND_SASL_HANDSHAKE; -import static com.rabbitmq.stream.Constants.COMMAND_STORE_OFFSET; -import static com.rabbitmq.stream.Constants.COMMAND_STREAM_STATS; -import static com.rabbitmq.stream.Constants.COMMAND_SUBSCRIBE; -import static com.rabbitmq.stream.Constants.COMMAND_UNSUBSCRIBE; -import static com.rabbitmq.stream.Constants.RESPONSE_CODE_AUTHENTICATION_FAILURE; -import static com.rabbitmq.stream.Constants.RESPONSE_CODE_AUTHENTICATION_FAILURE_LOOPBACK; -import static com.rabbitmq.stream.Constants.RESPONSE_CODE_OK; -import static com.rabbitmq.stream.Constants.RESPONSE_CODE_SASL_CHALLENGE; -import static com.rabbitmq.stream.Constants.VERSION_1; +import static com.rabbitmq.stream.Constants.*; import static com.rabbitmq.stream.impl.Utils.encodeRequestCode; import static com.rabbitmq.stream.impl.Utils.encodeResponseCode; import static com.rabbitmq.stream.impl.Utils.extractResponseCode; @@ -216,6 +189,7 @@ public long applyAsLong(Object value) { private final Duration rpcTimeout; private volatile ShutdownReason shutdownReason = null; private final Runnable exchangeCommandVersionsCheck; + private final boolean filteringSupported; public Client() { this(new ClientParameters()); @@ -398,15 +372,25 @@ public void initChannel(SocketChannel ch) { tuneState.getHeartbeat()); this.connectionProperties = open(parameters.virtualHost); Set supportedCommands = maybeExchangeCommandVersions(); - if (supportedCommands.stream().anyMatch(i -> i.getKey() == COMMAND_STREAM_STATS)) { - this.exchangeCommandVersionsCheck = () -> {}; - } else { - this.exchangeCommandVersionsCheck = - () -> { - throw new UnsupportedOperationException( - "QueryStreamInfo is available only on RabbitMQ 3.11 or more."); - }; - } + AtomicReference exchangeCommandVersionsCheckReference = new AtomicReference<>(); + AtomicBoolean filteringSupportedReference = new AtomicBoolean(false); + supportedCommands.forEach( + c -> { + if (c.getKey() == COMMAND_STREAM_STATS) { + exchangeCommandVersionsCheckReference.set(() -> {}); + } + if (c.getKey() == COMMAND_PUBLISH && c.getMaxVersion() >= VERSION_2) { + filteringSupportedReference.set(true); + } + }); + this.exchangeCommandVersionsCheck = + exchangeCommandVersionsCheckReference.get() == null + ? () -> { + throw new UnsupportedOperationException( + "QueryStreamInfo is available only on RabbitMQ 3.11 or more."); + } + : exchangeCommandVersionsCheckReference.get(); + this.filteringSupported = filteringSupportedReference.get(); started.set(true); this.metricsCollector.openConnection(); } catch (RuntimeException e) { @@ -855,6 +839,7 @@ public List publish( encodedMessages.add(encodedMessage); } return publishInternal( + VERSION_1, this.channel, publisherId, encodedMessages, @@ -881,6 +866,7 @@ public List publish( encodedMessages.add(wrapper); } return publishInternal( + VERSION_1, this.channel, publisherId, encodedMessages, @@ -911,6 +897,7 @@ public List publishBatches( encodedMessageBatches.add(encodedMessageBatch); } return publishInternal( + VERSION_1, this.channel, publisherId, encodedMessageBatches, @@ -947,6 +934,7 @@ public List publishBatches( encodedMessageBatches.add(wrapper); } return publishInternal( + VERSION_1, this.channel, publisherId, encodedMessageBatches, @@ -977,15 +965,17 @@ private void checkMessageBatchFitsInFrame(EncodedMessageBatch encodedMessageBatc } List publishInternal( + short version, byte publisherId, List encodedEntities, OutboundEntityWriteCallback callback, ToLongFunction publishSequenceFunction) { return this.publishInternal( - this.channel, publisherId, encodedEntities, callback, publishSequenceFunction); + version, this.channel, publisherId, encodedEntities, callback, publishSequenceFunction); } List publishInternal( + short version, Channel ch, byte publisherId, List encodedEntities, @@ -1002,6 +992,7 @@ List publishInternal( // the current message/batch does not fit, we're sending the batch int frameLength = length - callback.fragmentLength(encodedEntity); sendEntityBatch( + version, ch, frameLength, publisherId, @@ -1017,6 +1008,7 @@ List publishInternal( currentIndex++; } sendEntityBatch( + version, ch, length, publisherId, @@ -1031,6 +1023,7 @@ List publishInternal( } private void sendEntityBatch( + short version, Channel ch, int frameLength, byte publisherId, @@ -1044,7 +1037,7 @@ private void sendEntityBatch( ByteBuf out = allocateNoCheck(ch.alloc(), frameLength + 4); out.writeInt(frameLength); out.writeShort(encodeRequestCode(COMMAND_PUBLISH)); - out.writeShort(VERSION_1); + out.writeShort(version); out.writeByte(publisherId); int messageCount = 0; out.writeInt(toExcluded - fromIncluded); @@ -1402,6 +1395,10 @@ private String serverAddress() { } } + boolean filteringSupported() { + return this.filteringSupported; + } + public List route(String routingKey, String superStream) { if (routingKey == null || superStream == null) { throw new IllegalArgumentException("routing key and stream must not be null"); @@ -1603,11 +1600,7 @@ private Set maybeExchangeCommandVersions() { Set supported = new HashSet<>(); try { if (Utils.is3_11_OrMore(brokerVersion())) { - for (FrameHandlerInfo info : exchangeCommandVersions()) { - if (info.getKey() == COMMAND_STREAM_STATS) { - supported.add(info); - } - } + supported.addAll(exchangeCommandVersions()); } } catch (Exception e) { LOGGER.info("Error while exchanging command versions: {}", e.getMessage()); @@ -2589,6 +2582,14 @@ public StreamParametersBuilder leaderLocator(LeaderLocator leaderLocator) { return this; } + public StreamParametersBuilder filterSize(int size) { + if (size < 16 || size > 255) { + throw new IllegalArgumentException("Stream filter size must be between 16 and 255"); + } + this.parameters.put("stream-filter-size-bytes", String.valueOf(size)); + return this; + } + public StreamParametersBuilder put(String key, String value) { parameters.put(key, value); return this; diff --git a/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java index 25c37fd465..e40f6ada11 100644 --- a/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java @@ -32,6 +32,8 @@ interface AccumulatedEntity { long publishindId(); + String filterValue(); + Object encodedEntity(); StreamProducer.ConfirmationCallback confirmationCallback(); diff --git a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java index d701710ac0..5f01ea1416 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java @@ -21,28 +21,35 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.function.ToLongFunction; class SimpleMessageAccumulator implements MessageAccumulator { + private static final Function NULL_FILTER_VALUE_EXTRACTOR = m -> null; + protected final BlockingQueue messages; protected final Clock clock; private final int capacity; private final Codec codec; private final int maxFrameSize; private final ToLongFunction publishSequenceFunction; + private final Function filterValueExtractor; SimpleMessageAccumulator( int capacity, Codec codec, int maxFrameSize, ToLongFunction publishSequenceFunction, + Function filterValueExtractor, Clock clock) { this.capacity = capacity; this.messages = new LinkedBlockingQueue<>(capacity); this.codec = codec; this.maxFrameSize = maxFrameSize; this.publishSequenceFunction = publishSequenceFunction; + this.filterValueExtractor = + filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor; this.clock = clock; } @@ -56,6 +63,7 @@ public boolean add(Message message, ConfirmationHandler confirmationHandler) { new SimpleAccumulatedEntity( clock.time(), publishingId, + this.filterValueExtractor.apply(message), encodedMessage, new SimpleConfirmationCallback(message, confirmationHandler)), 60, @@ -88,17 +96,20 @@ private static final class SimpleAccumulatedEntity implements AccumulatedEntity private final long time; private final long publishingId; + private final String filterValue; private final Codec.EncodedMessage encodedMessage; private final StreamProducer.ConfirmationCallback confirmationCallback; private SimpleAccumulatedEntity( long time, long publishingId, + String filterValue, Codec.EncodedMessage encodedMessage, StreamProducer.ConfirmationCallback confirmationCallback) { this.time = time; this.publishingId = publishingId; this.encodedMessage = encodedMessage; + this.filterValue = filterValue; this.confirmationCallback = confirmationCallback; } @@ -107,6 +118,11 @@ public long publishindId() { return publishingId; } + @Override + public String filterValue() { + return filterValue; + } + @Override public Object encodedEntity() { return encodedMessage; diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java index 2e63b11e00..f2368d3f94 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java @@ -18,15 +18,8 @@ import static com.rabbitmq.stream.impl.Utils.offsetBefore; import static java.time.Duration.ofMillis; -import com.rabbitmq.stream.Constants; -import com.rabbitmq.stream.Consumer; -import com.rabbitmq.stream.ConsumerUpdateListener; -import com.rabbitmq.stream.MessageHandler; +import com.rabbitmq.stream.*; import com.rabbitmq.stream.MessageHandler.Context; -import com.rabbitmq.stream.NoOffsetException; -import com.rabbitmq.stream.OffsetSpecification; -import com.rabbitmq.stream.StreamException; -import com.rabbitmq.stream.SubscriptionListener; import com.rabbitmq.stream.impl.Client.QueryOffsetResponse; import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration; import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration; @@ -83,7 +76,9 @@ class StreamConsumer implements Consumer { ConsumerUpdateListener consumerUpdateListener, int initialCredits, int additionalCredits) { - + if (Utils.filteringEnabled(subscriptionProperties) && !environment.filteringSupported()) { + throw new IllegalArgumentException("Filtering is not supported by the broker"); + } this.id = ID_SEQUENCE.getAndIncrement(); Runnable trackingClosingCallback; try { diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java index 7d829e1f7f..0cefa2aa0f 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java @@ -13,24 +13,27 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import com.rabbitmq.stream.Consumer; -import com.rabbitmq.stream.ConsumerBuilder; -import com.rabbitmq.stream.ConsumerUpdateListener; -import com.rabbitmq.stream.MessageHandler; -import com.rabbitmq.stream.OffsetSpecification; -import com.rabbitmq.stream.StreamException; -import com.rabbitmq.stream.SubscriptionListener; +import static com.rabbitmq.stream.impl.Utils.SUBSCRIPTION_PROPERTY_FILTER_PREFIX; +import static com.rabbitmq.stream.impl.Utils.SUBSCRIPTION_PROPERTY_MATCH_UNFILTERED; + +import com.rabbitmq.stream.*; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.time.Duration; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; class StreamConsumerBuilder implements ConsumerBuilder { private static final int NAME_MAX_SIZE = 256; // server-side limitation + private static final TrackingConfiguration DISABLED_TRACKING_CONFIGURATION = + new TrackingConfiguration(false, false, -1, Duration.ZERO, Duration.ZERO); private final StreamEnvironment environment; - + private final Map subscriptionProperties = new ConcurrentHashMap<>(); private String stream, superStream; private OffsetSpecification offsetSpecification = null; private MessageHandler messageHandler; @@ -40,9 +43,9 @@ class StreamConsumerBuilder implements ConsumerBuilder { private boolean noTrackingStrategy = false; private boolean lazyInit = false; private SubscriptionListener subscriptionListener = subscriptionContext -> {}; - private final Map subscriptionProperties = new ConcurrentHashMap<>(); - private ConsumerUpdateListener consumerUpdateListener; private final DefaultFlowConfiguration flowConfiguration = new DefaultFlowConfiguration(this); + private ConsumerUpdateListener consumerUpdateListener; + private DefaultFilterConfiguration filterConfiguration; public StreamConsumerBuilder(StreamEnvironment environment) { this.environment = environment; @@ -88,7 +91,7 @@ public ConsumerBuilder name(String name) { @Override public ConsumerBuilder singleActiveConsumer() { - this.subscriptionProperties.put("single-active-consumer", "true"); + this.subscriptionProperties.put(Utils.SUBSCRIPTION_PROPERTY_SAC, "true"); return this; } @@ -141,6 +144,14 @@ StreamConsumerBuilder lazyInit(boolean lazyInit) { return this; } + @Override + public FilterConfiguration filter() { + if (this.filterConfiguration == null) { + this.filterConfiguration = new DefaultFilterConfiguration(this); + } + return this.filterConfiguration; + } + @Override public Consumer build() { if (this.stream == null && this.superStream == null) { @@ -185,13 +196,36 @@ public Consumer build() { trackingConfiguration = DISABLED_TRACKING_CONFIGURATION; } + MessageHandler handler; + if (this.filterConfiguration == null) { + handler = this.messageHandler; + } else { + this.filterConfiguration.validate(); + AtomicInteger i = new AtomicInteger(0); + this.filterConfiguration.filterValues.forEach( + v -> + this.subscriptionProperties.put( + SUBSCRIPTION_PROPERTY_FILTER_PREFIX + i.getAndIncrement(), v)); + this.subscriptionProperties.put( + SUBSCRIPTION_PROPERTY_MATCH_UNFILTERED, + this.filterConfiguration.matchUnfiltered ? "true" : "false"); + final Predicate filter = this.filterConfiguration.filter; + final MessageHandler delegate = this.messageHandler; + handler = + (context, message) -> { + if (filter.test(message)) { + delegate.handle(context, message); + } + }; + } + Consumer consumer; if (this.stream != null) { consumer = new StreamConsumer( this.stream, this.offsetSpecification, - this.messageHandler, + handler, this.name, this.environment, trackingConfiguration, @@ -204,7 +238,7 @@ public Consumer build() { environment.addConsumer((StreamConsumer) consumer); } else { if (Utils.isSac(this.subscriptionProperties)) { - this.subscriptionProperties.put("super-stream", this.superStream); + this.subscriptionProperties.put(Utils.SUBSCRIPTION_PROPERTY_SUPER_STREAM, this.superStream); } consumer = new SuperStreamConsumer(this, this.superStream, this.environment, trackingConfiguration); @@ -212,8 +246,21 @@ public Consumer build() { return consumer; } - private static final TrackingConfiguration DISABLED_TRACKING_CONFIGURATION = - new TrackingConfiguration(false, false, -1, Duration.ZERO, Duration.ZERO); + StreamConsumerBuilder duplicate() { + StreamConsumerBuilder duplicate = new StreamConsumerBuilder(this.environment); + for (Field field : StreamConsumerBuilder.class.getDeclaredFields()) { + if (Modifier.isStatic(field.getModifiers())) { + continue; + } + field.setAccessible(true); + try { + field.set(duplicate, field.get(this)); + } catch (IllegalAccessException e) { + throw new StreamException("Error while duplicating stream producer builder", e); + } + } + return duplicate; + } static class TrackingConfiguration { @@ -321,20 +368,54 @@ public ConsumerBuilder builder() { } } - StreamConsumerBuilder duplicate() { - StreamConsumerBuilder duplicate = new StreamConsumerBuilder(this.environment); - for (Field field : StreamConsumerBuilder.class.getDeclaredFields()) { - if (Modifier.isStatic(field.getModifiers())) { - continue; + private static final class DefaultFilterConfiguration implements FilterConfiguration { + + private final StreamConsumerBuilder builder; + private List filterValues; + private Predicate filter; + private boolean matchUnfiltered = false; + + private DefaultFilterConfiguration(StreamConsumerBuilder builder) { + this.builder = builder; + } + + @Override + public FilterConfiguration values(String... filterValues) { + if (filterValues == null || filterValues.length == 0) { + throw new IllegalArgumentException("At least one filter value must be specified"); } - field.setAccessible(true); - try { - field.set(duplicate, field.get(this)); - } catch (IllegalAccessException e) { - throw new StreamException("Error while duplicating stream producer builder", e); + this.filterValues = Arrays.asList(filterValues); + return this; + } + + @Override + public FilterConfiguration postFilter(Predicate filter) { + this.filter = filter; + return this; + } + + @Override + public FilterConfiguration matchUnfiltered() { + this.matchUnfiltered = true; + return this; + } + + @Override + public FilterConfiguration matchUnfiltered(boolean matchUnfiltered) { + this.matchUnfiltered = matchUnfiltered; + return this; + } + + @Override + public ConsumerBuilder builder() { + return this.builder; + } + + private void validate() { + if (this.filterValues == null || this.filter == null) { + throw new IllegalArgumentException("Both filter values and the filter logic must be set"); } } - return duplicate; } private static class DefaultFlowConfiguration implements FlowConfiguration { diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index d40f070b56..c4ef75a190 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -736,6 +736,10 @@ static T locatorOperation( return result; } + boolean filteringSupported() { + return this.locatorOperation(Client::filteringSupported); + } + Clock clock() { return this.clock; } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 49919164eb..84ff8a6296 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -13,9 +13,7 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.Constants.CODE_MESSAGE_ENQUEUEING_FAILED; -import static com.rabbitmq.stream.Constants.CODE_PRODUCER_CLOSED; -import static com.rabbitmq.stream.Constants.CODE_PRODUCER_NOT_AVAILABLE; +import static com.rabbitmq.stream.Constants.*; import static com.rabbitmq.stream.impl.Utils.formatConstant; import static com.rabbitmq.stream.impl.Utils.namedRunnable; @@ -31,6 +29,7 @@ import com.rabbitmq.stream.impl.Client.Response; import com.rabbitmq.stream.impl.MessageAccumulator.AccumulatedEntity; import io.netty.buffer.ByteBuf; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.Iterator; @@ -48,6 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.function.ToLongFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +80,7 @@ class StreamProducer implements Producer { private volatile byte publisherId; private volatile Status status; private volatile ScheduledFuture confirmTimeoutFuture; + private final short publishVersion; StreamProducer( String name, @@ -91,7 +92,11 @@ class StreamProducer implements Producer { int maxUnconfirmedMessages, Duration confirmTimeout, Duration enqueueTimeout, + Function filterValueExtractor, StreamEnvironment environment) { + if (filterValueExtractor != null && !environment.filteringSupported()) { + throw new IllegalArgumentException("Filtering is not supported by the broker"); + } this.id = ID_SEQUENCE.getAndIncrement(); this.environment = environment; this.name = name; @@ -116,8 +121,13 @@ class StreamProducer implements Producer { environment.codec(), client.maxFrameSize(), accumulatorPublishSequenceFunction, + filterValueExtractor, this.environment.clock()); - delegateWriteCallback = Client.OUTBOUND_MESSAGE_WRITE_CALLBACK; + if (filterValueExtractor == null) { + delegateWriteCallback = Client.OUTBOUND_MESSAGE_WRITE_CALLBACK; + } else { + delegateWriteCallback = OUTBOUND_MSG_FILTER_VALUE_WRITE_CALLBACK; + } } else { this.accumulator = new SubEntryMessageAccumulator( @@ -138,22 +148,44 @@ class StreamProducer implements Producer { this.unconfirmedMessagesSemaphore = new Semaphore(maxUnconfirmedMessages, true); this.unconfirmedMessages = new ConcurrentHashMap<>(this.maxUnconfirmedMessages, 0.75f, 2); - this.writeCallback = - new Client.OutboundEntityWriteCallback() { - @Override - public int write(ByteBuf bb, Object entity, long publishingId) { - MessageAccumulator.AccumulatedEntity accumulatedEntity = - (MessageAccumulator.AccumulatedEntity) entity; - unconfirmedMessages.put(publishingId, accumulatedEntity); - return delegateWriteCallback.write(bb, accumulatedEntity.encodedEntity(), publishingId); - } + if (filterValueExtractor == null) { + this.publishVersion = VERSION_1; + this.writeCallback = + new Client.OutboundEntityWriteCallback() { + @Override + public int write(ByteBuf bb, Object entity, long publishingId) { + MessageAccumulator.AccumulatedEntity accumulatedEntity = + (MessageAccumulator.AccumulatedEntity) entity; + unconfirmedMessages.put(publishingId, accumulatedEntity); + return delegateWriteCallback.write( + bb, accumulatedEntity.encodedEntity(), publishingId); + } + + @Override + public int fragmentLength(Object entity) { + return delegateWriteCallback.fragmentLength( + ((MessageAccumulator.AccumulatedEntity) entity).encodedEntity()); + } + }; + } else { + this.publishVersion = VERSION_2; + this.writeCallback = + new Client.OutboundEntityWriteCallback() { + @Override + public int write(ByteBuf bb, Object entity, long publishingId) { + MessageAccumulator.AccumulatedEntity accumulatedEntity = + (MessageAccumulator.AccumulatedEntity) entity; + unconfirmedMessages.put(publishingId, accumulatedEntity); + return delegateWriteCallback.write(bb, accumulatedEntity, publishingId); + } + + @Override + public int fragmentLength(Object entity) { + return delegateWriteCallback.fragmentLength(entity); + } + }; + } - @Override - public int fragmentLength(Object entity) { - return delegateWriteCallback.fragmentLength( - ((MessageAccumulator.AccumulatedEntity) entity).encodedEntity()); - } - }; if (!batchPublishingDelay.isNegative() && !batchPublishingDelay.isZero()) { AtomicReference taskReference = new AtomicReference<>(); Runnable task = @@ -445,7 +477,11 @@ private void publishBatch(boolean stateCheck) { batchCount++; } client.publishInternal( - this.publisherId, messages, this.writeCallback, this.publishSequenceFunction); + this.publishVersion, + this.publisherId, + messages, + this.writeCallback, + this.publishSequenceFunction); } } @@ -480,7 +516,11 @@ void running() { batchCount++; } client.publishInternal( - this.publisherId, messages, this.writeCallback, this.publishSequenceFunction); + this.publishVersion, + this.publisherId, + messages, + this.writeCallback, + this.publishSequenceFunction); } } publishBatch(false); @@ -558,4 +598,40 @@ private void checkNotClosed() { throw new IllegalStateException("This producer instance has been closed"); } } + + private static final Client.OutboundEntityWriteCallback OUTBOUND_MSG_FILTER_VALUE_WRITE_CALLBACK = + new OutboundMessageFilterValueWriterCallback(); + + private static final class OutboundMessageFilterValueWriterCallback + implements Client.OutboundEntityWriteCallback { + + @Override + public int write(ByteBuf bb, Object entity, long publishingId) { + AccumulatedEntity accumulatedEntity = (AccumulatedEntity) entity; + String filterValue = accumulatedEntity.filterValue(); + if (filterValue == null) { + bb.writeShort(-1); + } else { + bb.writeShort(filterValue.length()); + bb.writeBytes(filterValue.getBytes(StandardCharsets.UTF_8)); + } + Codec.EncodedMessage messageToPublish = + (Codec.EncodedMessage) accumulatedEntity.encodedEntity(); + bb.writeInt(messageToPublish.getSize()); + bb.writeBytes(messageToPublish.getData(), 0, messageToPublish.getSize()); + return 1; + } + + @Override + public int fragmentLength(Object entity) { + AccumulatedEntity accumulatedEntity = (AccumulatedEntity) entity; + Codec.EncodedMessage message = (Codec.EncodedMessage) accumulatedEntity.encodedEntity(); + String filterValue = accumulatedEntity.filterValue(); + if (filterValue == null) { + return 8 + 2 + 4 + message.getSize(); + } else { + return 8 + 2 + accumulatedEntity.filterValue().length() + 4 + message.getSize(); + } + } + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java index 074a544a7d..431f77ecee 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java @@ -48,6 +48,8 @@ class StreamProducerBuilder implements ProducerBuilder { private DefaultRoutingConfiguration routingConfiguration; + private Function filterValueExtractor; + StreamProducerBuilder(StreamEnvironment environment) { this.environment = environment; } @@ -128,6 +130,12 @@ public ProducerBuilder enqueueTimeout(Duration timeout) { return this; } + @Override + public ProducerBuilder filterValue(Function filterValueExtractor) { + this.filterValueExtractor = filterValueExtractor; + return this; + } + @Override public RoutingConfiguration routing(Function routingKeyExtractor) { this.routingConfiguration = new DefaultRoutingConfiguration(this); @@ -150,6 +158,9 @@ public Producer build() { throw new IllegalArgumentException( "Sub-entry batching must be enabled to enable compression"); } + if (subEntrySize > 1 && filterValueExtractor != null) { + throw new IllegalArgumentException("Filtering is not supported with sub-entry batching"); + } if (subEntrySize > 1 && compression == null) { compression = Compression.NONE; } @@ -183,6 +194,7 @@ public Producer build() { maxUnconfirmedMessages, confirmTimeout, enqueueTimeout, + filterValueExtractor, environment); this.environment.addProducer((StreamProducer) producer); } else { diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java index 0b7f0fd559..ed48565ec1 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java @@ -67,6 +67,12 @@ public StreamCreator leaderLocator(LeaderLocator leaderLocator) { return this; } + @Override + public StreamCreator filterSize(int size) { + streamParametersBuilder.filterSize(size); + return this; + } + @Override public void create() { if (stream == null) { diff --git a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java index b2045c8df7..9533f2a922 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java @@ -40,7 +40,7 @@ public SubEntryMessageAccumulator( int maxFrameSize, ToLongFunction publishSequenceFunction, Clock clock) { - super(subEntrySize * batchSize, codec, maxFrameSize, publishSequenceFunction, clock); + super(subEntrySize * batchSize, codec, maxFrameSize, publishSequenceFunction, null, clock); this.subEntrySize = subEntrySize; this.compressionCodec = compressionCodec; this.compression = compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); @@ -111,6 +111,11 @@ public long publishindId() { return publishingId; } + @Override + public String filterValue() { + return null; + } + @Override public Object encodedEntity() { return encodedMessageBatch; diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java index c5f258ab37..a21b23e413 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Utils.java +++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java @@ -65,6 +65,11 @@ final class Utils { private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class); private static final Map CONSTANT_LABELS; + static final String SUBSCRIPTION_PROPERTY_SAC = "single-active-consumer"; + static final String SUBSCRIPTION_PROPERTY_SUPER_STREAM = "super-stream"; + static final String SUBSCRIPTION_PROPERTY_FILTER_PREFIX = "filter."; + static final String SUBSCRIPTION_PROPERTY_MATCH_UNFILTERED = "match-unfiltered"; + static { Map labels = new HashMap<>(); Arrays.stream(Constants.class.getDeclaredFields()) @@ -119,6 +124,15 @@ static boolean isSac(Map properties) { } } + static boolean filteringEnabled(Map properties) { + if (properties == null || properties.isEmpty()) { + return false; + } else { + return properties.keySet().stream() + .anyMatch(k -> k.startsWith(SUBSCRIPTION_PROPERTY_FILTER_PREFIX)); + } + } + static short encodeRequestCode(Short code) { return code; } diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java index 73136cb0fd..cd4a1fa82a 100644 --- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java +++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java @@ -122,6 +122,7 @@ public class StreamPerfTest implements Callable { @CommandLine.Mixin private final CommandLine.HelpCommand helpCommand = new CommandLine.HelpCommand(); + // for testing private final AddressResolver addressResolver; private final PrintWriter err, out; @@ -430,6 +431,18 @@ public class StreamPerfTest implements Callable { @ArgGroup(exclusive = false, multiplicity = "0..1") InstanceSyncOptions instanceSyncOptions; + @CommandLine.Option( + names = {"--filter-value-set", "-fvs"}, + description = "filter value set for publishers, range (e.g. 1..15) are accepted", + converter = Utils.FilterValueSetConverter.class) + private List filterValueSet; + + @CommandLine.Option( + names = {"--filter-values", "-fv"}, + description = "filter values for consumers", + split = ",") + private List filterValues; + static class InstanceSyncOptions { @CommandLine.Option( @@ -476,6 +489,7 @@ static class InstanceSyncOptions { private List monitorings; private volatile Environment environment; private volatile EventLoopGroup eventLoopGroup; + // constructor for completion script generation public StreamPerfTest() { this(null, null, null, null); @@ -586,7 +600,6 @@ public Integer call() throws Exception { maybeDisplayVersion(); maybeDisplayEnvironmentVariablesHelp(); overridePropertiesWithEnvironmentVariables(); - Codec codec = createCodec(this.codecClass); ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT; @@ -873,19 +886,43 @@ public Integer call() throws Exception { producerBuilder.name(producerName).confirmTimeout(Duration.ZERO); } - java.util.function.Consumer messageBuilderConsumer; + java.util.function.Consumer messageBuilderConsumerTemp; if (this.superStreams) { producerBuilder .superStream(stream) .routing(msg -> msg.getProperties().getMessageIdAsString()); AtomicLong messageIdSequence = new AtomicLong(0); - messageBuilderConsumer = + messageBuilderConsumerTemp = mg -> mg.properties().messageId(messageIdSequence.getAndIncrement()); } else { - messageBuilderConsumer = mg -> {}; + messageBuilderConsumerTemp = mg -> {}; producerBuilder.stream(stream); } + if (this.filterValueSet != null && this.filterValueSet.size() > 0) { + producerBuilder = + producerBuilder.filterValue(msg -> msg.getProperties().getTo()); + List values = new ArrayList<>(this.filterValueSet); + AtomicInteger count = new AtomicInteger(); + int subSetSize = Utils.filteringSubSetSize(values.size()); + int messageCountCycle = Utils.filteringPublishingCycle(this.rate); + List subSet = new ArrayList<>(subSetSize); + java.util.function.Consumer filteringMessageBuilderConsumer = + b -> { + if (Integer.remainderUnsigned( + count.getAndIncrement(), messageCountCycle) + == 0) { + Collections.shuffle(values); + subSet.clear(); + subSet.addAll(values.subList(0, subSetSize)); + } + b.properties() + .to(subSet.get(Integer.remainderUnsigned(count.get(), subSetSize))); + }; + messageBuilderConsumerTemp = + messageBuilderConsumerTemp.andThen(filteringMessageBuilderConsumer); + } + Producer producer = producerBuilder .subEntrySize(this.subEntrySize) @@ -895,9 +932,9 @@ public Integer call() throws Exception { .maxUnconfirmedMessages(this.confirms) .build(); - AtomicLong messageCount = new AtomicLong(0); ConfirmationHandler confirmationHandler; if (this.confirmLatency) { + AtomicLong messageCount = new AtomicLong(0); final PerformanceMetrics metrics = this.performanceMetrics; final int divisor = Utils.downSamplingDivisor(this.rate); confirmationHandler = @@ -933,6 +970,8 @@ public Integer call() throws Exception { producers.add(producer); + java.util.function.Consumer messageBuilderConsumer = + messageBuilderConsumerTemp; return (Runnable) () -> { final int msgSize = this.messageSize; @@ -1048,6 +1087,8 @@ public Integer call() throws Exception { } }); + consumerBuilder = maybeConfigureForFiltering(consumerBuilder); + Consumer consumer = consumerBuilder.build(); return consumer; }) @@ -1124,6 +1165,37 @@ public Integer call() throws Exception { return 0; } + private ConsumerBuilder maybeConfigureForFiltering(ConsumerBuilder consumerBuilder) { + if (this.filterValues != null && this.filterValues.size() > 0) { + consumerBuilder = + consumerBuilder.filter().values(this.filterValues.toArray(new String[0])).builder(); + + if (this.filterValues.size() == 1) { + String filterValue = filterValues.get(0); + consumerBuilder = + consumerBuilder + .filter() + .postFilter(msg -> filterValue.equals(msg.getProperties().getTo())) + .builder(); + } else { + consumerBuilder = + consumerBuilder + .filter() + .postFilter( + msg -> { + for (String filterValue : this.filterValues) { + if (filterValue.equals(msg.getProperties().getTo())) { + return true; + } + } + return false; + }) + .builder(); + } + } + return consumerBuilder; + } + private void createStream(Environment environment, String stream) { StreamCreator streamCreator = environment.streamCreator().stream(stream) diff --git a/src/main/java/com/rabbitmq/stream/perf/Utils.java b/src/main/java/com/rabbitmq/stream/perf/Utils.java index f123134cac..1d38ab3cee 100644 --- a/src/main/java/com/rabbitmq/stream/perf/Utils.java +++ b/src/main/java/com/rabbitmq/stream/perf/Utils.java @@ -473,6 +473,33 @@ public BiFunction convert(String input) { } } + static class FilterValueSetConverter implements ITypeConverter> { + + @Override + public List convert(String value) { + if (value == null || value.trim().isEmpty()) { + return Collections.emptyList(); + } + if (value.contains("..")) { + String[] range = value.split("\\.\\."); + String errorMessage = "'" + value + "' is not valid, valid example values: 1..10, 1..20"; + if (range.length != 2) { + throw new CommandLine.TypeConversionException(errorMessage); + } + int start, end; + try { + start = Integer.parseInt(range[0]); + end = Integer.parseInt(range[1]) + 1; + return IntStream.range(start, end).mapToObj(String::valueOf).collect(Collectors.toList()); + } catch (NumberFormatException e) { + throw new CommandLine.TypeConversionException(errorMessage); + } + } else { + return Arrays.stream(value.split(",")).collect(Collectors.toList()); + } + } + } + static class SniServerNamesConverter implements ITypeConverter> { @Override @@ -859,4 +886,24 @@ static InstanceSynchronization defaultInstanceSynchronization( throw new RuntimeException(e); } } + + static int filteringPublishingCycle(int rate) { + if (rate == 0) { + return 100_000; + } else if (rate <= 10) { + return 10; + } else { + return rate / 10; + } + } + + static int filteringSubSetSize(int setSize) { + if (setSize <= 3) { + return 1; + } else if (setSize > 10) { + return (int) (setSize * 0.70); + } else { + return setSize - 3; + } + } } diff --git a/src/test/java/com/rabbitmq/stream/benchmark/FilteringBenchmark.java b/src/test/java/com/rabbitmq/stream/benchmark/FilteringBenchmark.java new file mode 100644 index 0000000000..f12b9c64dc --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/benchmark/FilteringBenchmark.java @@ -0,0 +1,286 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.benchmark; + +import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; + +import com.codahale.metrics.MetricRegistry; +import com.google.common.util.concurrent.RateLimiter; +import com.rabbitmq.stream.*; +import com.rabbitmq.stream.metrics.DropwizardMetricsCollector; +import com.rabbitmq.stream.metrics.MetricsCollector; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; + +public class FilteringBenchmark { + + static final String stream = "filtering"; + + public static void main(String[] args) throws Exception { + int filterValueCount = 200; + int filterValueSubsetCount = 80; + int rate = 100_000; + int filterSize = 64; + int batchSize = 100; + int maxUnconfirmedMessages = 10_000; + + Duration publishingDuration = Duration.ofSeconds(10); + Duration publishingCycle = Duration.ofSeconds(1); + + ScheduledExecutorService scheduledExecutorService = + Executors.newSingleThreadScheduledExecutor(); + try (Environment env = Environment.builder().build()) { + try { + env.deleteStream(stream); + } catch (StreamException e) { + // OK + } + env.streamCreator().stream(stream).filterSize(filterSize).create(); + + List filterValues = new ArrayList<>(filterValueCount); + IntStream.range(0, filterValueCount) + .forEach(i -> filterValues.add(UUID.randomUUID().toString())); + + AtomicLong publishedCount = new AtomicLong(0); + AtomicLong confirmedCount = new AtomicLong(0); + + Producer producer = + env.producerBuilder().stream(stream) + .batchSize(batchSize) + .maxUnconfirmedMessages(maxUnconfirmedMessages) + .filterValue(msg -> msg.getProperties().getTo()) + .build(); + + AtomicBoolean keepPublishing = new AtomicBoolean(true); + scheduledExecutorService.schedule( + () -> keepPublishing.set(false), publishingDuration.toMillis(), TimeUnit.MILLISECONDS); + + RateLimiter rateLimiter = RateLimiter.create(rate); + + Random random = new Random(); + ConfirmationHandler confirmationHandler = status -> confirmedCount.getAndIncrement(); + System.out.printf( + "Starting test, filter values %s, subset %s, filter size %d%n", + filterValueCount, filterValueSubsetCount, filterSize); + System.out.printf( + "Starting publishing for %d second(s) at rate %d, batch size %d, max unconfirmed messages %d...%n", + publishingDuration.getSeconds(), rate, batchSize, maxUnconfirmedMessages); + while (keepPublishing.get()) { + AtomicBoolean keepPublishingInCycle = new AtomicBoolean(true); + scheduledExecutorService.schedule( + () -> keepPublishingInCycle.set(false), + publishingCycle.toMillis(), + TimeUnit.MILLISECONDS); + Collections.shuffle(filterValues); + List filterValueSubset = filterValues.subList(0, filterValueSubsetCount); + System.out.printf( + "Starting publishing cycle for %d second(s)...%n", publishingCycle.getSeconds()); + while (keepPublishingInCycle.get()) { + rateLimiter.acquire(1); + String filterValue = filterValueSubset.get(random.nextInt(filterValueSubsetCount)); + producer.send( + producer.messageBuilder().properties().to(filterValue).messageBuilder().build(), + confirmationHandler); + publishedCount.getAndIncrement(); + } + } + System.out.println("Done publishing, waiting for all confirmations..."); + waitAtMost(() -> publishedCount.get() == confirmedCount.get()); + + System.out.println("Starting consuming..."); + + List values = filterValues.subList(0, 10); + for (String filterValue : values) { + Duration timeout = Duration.ofSeconds(30); + System.out.printf("For filter value %s%n", filterValue); + MetricRegistry registry = new MetricRegistry(); + MetricsCollector collector = new DropwizardMetricsCollector(registry); + AtomicLong unfilteredTargetMessageCount = new AtomicLong(0); + Duration unfilteredDuration; + try (Environment e = Environment.builder().metricsCollector(collector).build()) { + AtomicBoolean hasReceivedSomething = new AtomicBoolean(false); + AtomicLong lastReceived = new AtomicLong(0); + long s = System.nanoTime(); + e.consumerBuilder().stream(stream) + .offset(OffsetSpecification.first()) + .messageHandler( + (ctx, msg) -> { + hasReceivedSomething.set(true); + lastReceived.set(System.nanoTime()); + if (filterValue.equals(msg.getProperties().getTo())) { + unfilteredTargetMessageCount.getAndIncrement(); + } + }) + .build(); + waitAtMost( + timeout, + () -> + hasReceivedSomething.get() + && System.nanoTime() - lastReceived.get() > Duration.ofSeconds(1).toNanos()); + unfilteredDuration = Duration.ofNanos(System.nanoTime() - s); + } + + long unfilteredChunkCount = registry.getMeters().get("rabbitmq.stream.chunk").getCount(); + long unfilteredMessageCount = + registry.getMeters().get("rabbitmq.stream.consumed").getCount(); + + AtomicInteger chunkFilteredMessages = new AtomicInteger(0); + AtomicInteger chunkMessageCount = new AtomicInteger(0); + AtomicInteger chunkWithNoMessagesCount = new AtomicInteger(0); + AtomicBoolean firstChunk = new AtomicBoolean(true); + AtomicLong droppedMessages = new AtomicLong(0); + registry = new MetricRegistry(); + collector = new DropwizardMetricsCollector(registry); + collector = + new DelegatingMetricsCollector(collector) { + + @Override + public void chunk(int entriesCount) { + if (firstChunk.get()) { + firstChunk.set(false); + } else { + if (chunkMessageCount.get() == chunkFilteredMessages.get()) { + chunkWithNoMessagesCount.incrementAndGet(); + } + chunkFilteredMessages.set(0); + chunkMessageCount.set(entriesCount); + } + super.chunk(entriesCount); + } + }; + AtomicLong filteredTargetMessageCount = new AtomicLong(0); + Duration filteredDuration; + try (Environment e = Environment.builder().metricsCollector(collector).build()) { + AtomicBoolean hasReceivedSomething = new AtomicBoolean(false); + AtomicLong lastReceived = new AtomicLong(0); + long s = System.nanoTime(); + AtomicLong chunkId = new AtomicLong(-1); + e.consumerBuilder().stream(stream) + .offset(OffsetSpecification.first()) + .filter() + .values(filterValue) + .postFilter( + msg -> { + boolean shouldPass = filterValue.equals(msg.getProperties().getTo()); + if (!shouldPass) { + droppedMessages.getAndIncrement(); + chunkFilteredMessages.getAndIncrement(); + } + return shouldPass; + }) + .builder() + .messageHandler( + (ctx, msg) -> { + if (chunkId.get() == -1 || chunkId.get() != ctx.committedChunkId()) {} + + hasReceivedSomething.set(true); + lastReceived.set(System.nanoTime()); + filteredTargetMessageCount.getAndIncrement(); + }) + .build(); + waitAtMost( + timeout, + () -> + hasReceivedSomething.get() + && System.nanoTime() - lastReceived.get() > Duration.ofSeconds(1).toNanos()); + filteredDuration = Duration.ofNanos(System.nanoTime() - s); + } + long filteredChunkCount = registry.getMeters().get("rabbitmq.stream.chunk").getCount(); + long filteredMessageCount = registry.getMeters().get("rabbitmq.stream.consumed").getCount(); + System.out.printf( + "consumed in %d / %d ms, target messages %d / %d, chunk count %d / %d (%d %%), messages %d / %d (%d %%)%n", + unfilteredDuration.toMillis(), + filteredDuration.toMillis(), + unfilteredTargetMessageCount.get(), + filteredTargetMessageCount.get(), + unfilteredChunkCount, + filteredChunkCount, + (unfilteredChunkCount - filteredChunkCount) * 100 / unfilteredChunkCount, + unfilteredMessageCount, + filteredMessageCount, + (unfilteredMessageCount - filteredMessageCount) * 100 / unfilteredMessageCount); + System.out.printf( + "chunk without matching messages %d / %d, dropped messages %d / %d%n", + chunkWithNoMessagesCount.get(), + filteredChunkCount, + droppedMessages.getAndIncrement(), + filteredMessageCount); + } + + } finally { + scheduledExecutorService.shutdownNow(); + } + } + + private static class DelegatingMetricsCollector implements MetricsCollector { + + private final MetricsCollector delegate; + + private DelegatingMetricsCollector(MetricsCollector delegate) { + this.delegate = delegate; + } + + @Override + public void openConnection() { + this.delegate.openConnection(); + } + + @Override + public void closeConnection() { + this.delegate.closeConnection(); + } + + @Override + public void publish(int count) { + this.delegate.publish(count); + } + + @Override + public void publishConfirm(int count) { + this.delegate.publishConfirm(count); + } + + @Override + public void publishError(int count) { + this.delegate.publishError(count); + } + + @Override + public void chunk(int entriesCount) { + this.delegate.chunk(entriesCount); + } + + @Override + public void consume(long count) { + this.delegate.consume(count); + } + + @Override + public void writtenBytes(int writtenBytes) { + this.delegate.writtenBytes(writtenBytes); + } + + @Override + public void readBytes(int readBytes) { + this.delegate.readBytes(readBytes); + } + } +} diff --git a/src/test/java/com/rabbitmq/stream/docs/FilteringUsage.java b/src/test/java/com/rabbitmq/stream/docs/FilteringUsage.java new file mode 100644 index 0000000000..cd123f77db --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/docs/FilteringUsage.java @@ -0,0 +1,69 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.stream.docs; + +import com.rabbitmq.stream.Consumer; +import com.rabbitmq.stream.Environment; +import com.rabbitmq.stream.Producer; + +public class FilteringUsage { + + void producerSimple() { + Environment environment = Environment.builder().build(); + // tag::producer-simple[] + Producer producer = environment.producerBuilder() + .stream("invoices") + .filterValue(msg -> + msg.getApplicationProperties().get("state").toString()) // <1> + .build(); + // end::producer-simple[] + } + + void consumerSimple() { + Environment environment = Environment.builder().build(); + // tag::consumer-simple[] + String filterValue = "california"; + Consumer consumer = environment.consumerBuilder() + .stream("invoices") + .filter() + .values(filterValue) // <1> + .postFilter(msg -> + filterValue.equals(msg.getApplicationProperties().get("state"))) // <2> + .builder() + .messageHandler((ctx, msg) -> { }) + .build(); + // end::consumer-simple[] + } + + void consumerMatchUnfiltered() { + Environment environment = Environment.builder().build(); + // tag::consumer-match-unfiltered[] + String filterValue = "california"; + Consumer consumer = environment.consumerBuilder() + .stream("invoices") + .filter() + .values(filterValue) // <1> + .matchUnfiltered() // <2> + .postFilter(msg -> + filterValue.equals(msg.getApplicationProperties().get("state")) + || !msg.getApplicationProperties().containsKey("state") // <3> + ) + .builder() + .messageHandler((ctx, msg) -> { }) + .build(); + // end::consumer-match-unfiltered[] + } + +} diff --git a/src/test/java/com/rabbitmq/stream/impl/Amqp10InteroperabilityTest.java b/src/test/java/com/rabbitmq/stream/impl/Amqp10InteroperabilityTest.java index 166abb5f00..e60d105dbb 100644 --- a/src/test/java/com/rabbitmq/stream/impl/Amqp10InteroperabilityTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/Amqp10InteroperabilityTest.java @@ -14,7 +14,6 @@ package com.rabbitmq.stream.impl; import static com.rabbitmq.stream.impl.TestUtils.ClientFactory; -import static com.rabbitmq.stream.impl.TestUtils.StreamTestInfrastructureExtension; import static com.rabbitmq.stream.impl.TestUtils.latchAssert; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -27,12 +26,12 @@ import com.rabbitmq.stream.impl.Client.ClientParameters; import com.rabbitmq.stream.impl.TestUtils.BrokerVersion; import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; -import com.rabbitmq.stream.impl.TestUtils.DisabledIfAmqp10NotEnabled; import com.swiftmq.amqp.AMQPContext; import com.swiftmq.amqp.v100.client.Connection; import com.swiftmq.amqp.v100.client.Producer; import com.swiftmq.amqp.v100.client.QoS; import com.swiftmq.amqp.v100.client.Session; +import com.swiftmq.amqp.v100.generated.messaging.delivery_state.*; import com.swiftmq.amqp.v100.generated.messaging.message_format.AmqpSequence; import com.swiftmq.amqp.v100.generated.messaging.message_format.AmqpValue; import com.swiftmq.amqp.v100.generated.messaging.message_format.ApplicationProperties; @@ -61,8 +60,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -@ExtendWith(StreamTestInfrastructureExtension.class) -@DisabledIfAmqp10NotEnabled +@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +@TestUtils.DisabledIfAmqp10NotEnabled public class Amqp10InteroperabilityTest { String stream; diff --git a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java index 22e80abd8a..588b5ac290 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java @@ -13,8 +13,7 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ko; -import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.responseCode; +import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.*; import static com.rabbitmq.stream.impl.TestUtils.b; import static com.rabbitmq.stream.impl.TestUtils.latchAssert; import static com.rabbitmq.stream.impl.TestUtils.streamName; @@ -43,6 +42,7 @@ import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo; import com.rabbitmq.stream.impl.TestUtils.BrokerVersion; import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; +import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator; import java.io.ByteArrayOutputStream; @@ -52,14 +52,7 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; @@ -68,6 +61,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.LongConsumer; +import java.util.function.ToLongFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.jupiter.api.Test; @@ -978,4 +972,132 @@ void streamStatsFirstOffsetShouldChangeAfterRetentionKickedIn(TestInfo info) { } } } + + @Test + void publishConsumeWithFilterValueShouldSendSubSetOfStream() throws Exception { + int messageCount = 10_000; + AtomicReference publishLatch = + new AtomicReference<>(new CountDownLatch(messageCount)); + Client publisher = + cf.get( + new ClientParameters() + .publishConfirmListener( + (publisherId, publishingId) -> publishLatch.get().countDown())); + AtomicLong sequence = new AtomicLong(0); + ToLongFunction sequenceFunction = msg -> sequence.getAndIncrement(); + Response response = publisher.declarePublisher(b(0), null, stream); + assertThat(response).is(ok()); + + // firt wave of messages with several filter values + List filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear")); + Map filterValueCount = new HashMap<>(); + Random random = new Random(); + class Entity { + private final Codec.EncodedMessage encodedMessage; + private final String filterValue; + + Entity(Codec.EncodedMessage encodedMessage, String filterValue) { + this.encodedMessage = encodedMessage; + this.filterValue = filterValue; + } + } + + Client.OutboundEntityWriteCallback writeCallback = + new Client.OutboundEntityWriteCallback() { + @Override + public int write(ByteBuf bb, Object obj, long publishingId) { + Entity msg = (Entity) obj; + bb.writeShort(msg.filterValue.length()); + bb.writeBytes(msg.filterValue.getBytes(StandardCharsets.UTF_8)); + Codec.EncodedMessage messageToPublish = msg.encodedMessage; + bb.writeInt(messageToPublish.getSize()); + bb.writeBytes(messageToPublish.getData(), 0, messageToPublish.getSize()); + return 1; + } + + @Override + public int fragmentLength(Object obj) { + Entity msg = (Entity) obj; + return 8 + 2 + msg.filterValue.length() + 4 + msg.encodedMessage.getSize(); + } + }; + int batchSize = 100; + List messages = new ArrayList<>(batchSize); + Runnable write = + () -> + publisher.publishInternal( + Constants.VERSION_2, b(0), messages, writeCallback, sequenceFunction); + Runnable insert = + () -> { + byte[] data = "hello".getBytes(StandardCharsets.UTF_8); + int publishedMessageCount = 0; + while (publishedMessageCount < messageCount) { + String filterValue = filterValues.get(random.nextInt(filterValues.size())); + filterValueCount + .computeIfAbsent(filterValue, k -> new AtomicInteger()) + .incrementAndGet(); + Message message = + publisher + .codec() + .messageBuilder() + .addData(data) + .properties() + .groupId(filterValue) + .messageBuilder() + .build(); + Entity entity = new Entity(publisher.codec().encode(message), filterValue); + messages.add(entity); + publishedMessageCount++; + + if (messages.size() == batchSize) { + write.run(); + messages.clear(); + } + } + if (!messages.isEmpty()) { + write.run(); + } + }; + + insert.run(); + assertThat(latchAssert(publishLatch)).completes(); + + // second wave of messages, with only one, new filter value + String newFilterValue = "orange"; + filterValues.clear(); + filterValues.add(newFilterValue); + publishLatch.set(new CountDownLatch(messageCount)); + insert.run(); + assertThat(latchAssert(publishLatch)).completes(); + + AtomicInteger consumedMessageCount = new AtomicInteger(0); + AtomicInteger filteredConsumedMessageCount = new AtomicInteger(0); + Client consumer = + cf.get( + new ClientParameters() + .chunkListener( + (client, subscriptionId, offset, messageCount1, dataSize) -> + client.credit(subscriptionId, 1)) + .messageListener( + (subscriptionId, offset, chunkTimestamp, committedChunkId, message) -> { + consumedMessageCount.incrementAndGet(); + String filterValue = message.getProperties().getGroupId(); + if (newFilterValue.equals(filterValue)) { + filteredConsumedMessageCount.incrementAndGet(); + } + })); + + // consume only messages with filter value from second wave + consumer.subscribe( + b(0), + stream, + OffsetSpecification.first(), + 1, + Collections.singletonMap("filter.1", newFilterValue)); + + int expectedCount = filterValueCount.get(newFilterValue).get(); + waitAtMost(() -> filteredConsumedMessageCount.get() == expectedCount); + // we should get messages only from the "second" part of the stream + assertThat(consumedMessageCount).hasValueLessThan(messageCount * 2); + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java new file mode 100644 index 0000000000..5abf9b2bd6 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java @@ -0,0 +1,344 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import static com.rabbitmq.stream.impl.TestUtils.*; +import static java.util.Collections.singletonMap; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.rabbitmq.client.*; +import com.rabbitmq.client.AMQP.BasicProperties.Builder; +import com.rabbitmq.stream.*; +import com.rabbitmq.stream.Consumer; +import io.netty.channel.EventLoopGroup; +import java.io.IOException; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; +import java.util.stream.IntStream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.NullSource; +import org.junit.jupiter.params.provider.ValueSource; + +@DisabledIfFilteringNotSupported +@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +public class FilteringTest { + + private static final Duration CONDITION_TIMEOUT = Duration.ofSeconds(5); + + static final int messageCount = 10_000; + + EventLoopGroup eventLoopGroup; + + Environment environment; + + String stream; + + @BeforeEach + void init() throws Exception { + EnvironmentBuilder environmentBuilder = + Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder(); + environmentBuilder.addressResolver(add -> localhost()); + environment = environmentBuilder.build(); + } + + @AfterEach + void tearDown() throws Exception { + environment.close(); + } + + @ParameterizedTest + @ValueSource(strings = "foo") + @NullSource + void publishConsume(String producerName) throws Exception { + repeatIfFailure( + () -> { + List filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear")); + Map filterValueCount = new HashMap<>(); + Random random = new Random(); + + Runnable insert = + () -> + publish( + messageCount, + producerName, + () -> { + String filterValue = filterValues.get(random.nextInt(filterValues.size())); + filterValueCount + .computeIfAbsent(filterValue, k -> new AtomicInteger()) + .incrementAndGet(); + return filterValue; + }); + insert.run(); + + // second wave of messages, with only one, new filter value + String newFilterValue = "orange"; + filterValues.clear(); + filterValues.add(newFilterValue); + insert.run(); + + AtomicInteger receivedMessageCount = new AtomicInteger(0); + AtomicInteger filteredConsumedMessageCount = new AtomicInteger(0); + try (Consumer ignored = + consumerBuilder() + .filter() + .values(newFilterValue) + .postFilter( + m -> { + receivedMessageCount.incrementAndGet(); + return newFilterValue.equals(m.getProperties().getGroupId()); + }) + .builder() + .messageHandler( + (context, message) -> filteredConsumedMessageCount.incrementAndGet()) + .build()) { + int expectedCount = filterValueCount.get(newFilterValue).get(); + waitAtMost( + CONDITION_TIMEOUT, () -> filteredConsumedMessageCount.get() == expectedCount); + assertThat(receivedMessageCount).hasValueLessThan(messageCount * 2); + } + }); + } + + @ParameterizedTest + @ValueSource(strings = "foo") + @NullSource + void publishWithNullFilterValuesShouldBePossible(String producerName) throws Exception { + repeatIfFailure( + () -> { + publish(messageCount, producerName, () -> null); + + CountDownLatch consumeLatch = new CountDownLatch(messageCount); + try (Consumer ignored = + consumerBuilder().messageHandler((ctx, msg) -> consumeLatch.countDown()).build()) { + latchAssert(consumeLatch).completes(CONDITION_TIMEOUT); + } + }); + } + + @ParameterizedTest + @CsvSource({"foo,true", "foo,false", ",true", ",false"}) + void matchUnfilteredShouldReturnNullFilteredValueAndFilteredValues( + String producerName, boolean matchUnfiltered) throws Exception { + repeatIfFailure( + () -> { + publish(messageCount, producerName, () -> null); + + List filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear")); + Map filterValueCount = new HashMap<>(); + Random random = new Random(); + publish( + messageCount, + producerName, + () -> { + String filterValue = filterValues.get(random.nextInt(filterValues.size())); + filterValueCount + .computeIfAbsent(filterValue, k -> new AtomicInteger()) + .incrementAndGet(); + return filterValue; + }); + + publish(messageCount, producerName, () -> null); + + AtomicInteger receivedMessageCount = new AtomicInteger(0); + Set receivedFilterValues = ConcurrentHashMap.newKeySet(); + try (Consumer ignored = + consumerBuilder() + .filter() + .values(filterValues.get(0)) + .matchUnfiltered(matchUnfiltered) + .postFilter(m -> true) + .builder() + .messageHandler( + (ctx, msg) -> { + receivedFilterValues.add( + msg.getProperties().getGroupId() == null + ? "null" + : msg.getProperties().getGroupId()); + receivedMessageCount.incrementAndGet(); + }) + .build()) { + int expected; + if (matchUnfiltered) { + expected = messageCount * 2; + } else { + expected = messageCount; + } + waitAtMost(CONDITION_TIMEOUT, () -> receivedMessageCount.get() >= expected); + } + }); + } + + @Test + void setFilterSizeOnCreation(TestInfo info) { + String s = streamName(info); + this.environment.streamCreator().stream(s).filterSize(128).create(); + this.environment.deleteStream(s); + assertThatThrownBy(() -> this.environment.streamCreator().filterSize(15)) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> this.environment.streamCreator().filterSize(256)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void publishConsumeAmqp() throws Exception { + int messageCount = 1000; + repeatIfFailure( + () -> { + List filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear")); + Map filterValueCount = new HashMap<>(); + Random random = new Random(); + + try (Connection c = new ConnectionFactory().newConnection()) { + Callable insert = + () -> { + publishAmqp( + c, + messageCount, + () -> { + String filterValue = filterValues.get(random.nextInt(filterValues.size())); + filterValueCount + .computeIfAbsent(filterValue, k -> new AtomicInteger()) + .incrementAndGet(); + return filterValue; + }); + return null; + }; + insert.call(); + + // second wave of messages, with only one, new filter value + String newFilterValue = "orange"; + filterValues.clear(); + filterValues.add(newFilterValue); + insert.call(); + + AtomicInteger receivedMessageCount = new AtomicInteger(0); + AtomicInteger filteredConsumedMessageCount = new AtomicInteger(0); + Channel ch = c.createChannel(); + ch.basicQos(10); + Map arguments = new HashMap<>(); + arguments.put("x-stream-filter", newFilterValue); + arguments.put("x-stream-offset", 0); + ch.basicConsume( + stream, + false, + arguments, + new DefaultConsumer(ch) { + @Override + public void handleDelivery( + String consumerTag, + Envelope envelope, + AMQP.BasicProperties properties, + byte[] body) + throws IOException { + receivedMessageCount.incrementAndGet(); + String filterValue = + properties.getHeaders().get("x-stream-filter-value").toString(); + if (newFilterValue.equals(filterValue)) { + filteredConsumedMessageCount.incrementAndGet(); + } + ch.basicAck(envelope.getDeliveryTag(), false); + } + }); + int expectedCount = filterValueCount.get(newFilterValue).get(); + waitAtMost( + CONDITION_TIMEOUT, () -> filteredConsumedMessageCount.get() == expectedCount); + assertThat(receivedMessageCount).hasValueLessThan(messageCount * 2); + } + }); + } + + private ProducerBuilder producerBuilder() { + return this.environment.producerBuilder().stream(stream); + } + + private ConsumerBuilder consumerBuilder() { + return this.environment.consumerBuilder().stream(stream).offset(OffsetSpecification.first()); + } + + private static final AtomicLong PUBLISHING_SEQUENCE = new AtomicLong(0); + + private void publish( + int messageCount, String producerName, Supplier filterValueSupplier) { + Producer producer = + producerBuilder() + .name(producerName) + .filterValue(m -> m.getProperties().getGroupId()) + .build(); + CountDownLatch latch = new CountDownLatch(messageCount); + ConfirmationHandler confirmationHandler = ctx -> latch.countDown(); + IntStream.range(0, messageCount) + .forEach( + ignored -> + producer.send( + producer + .messageBuilder() + .publishingId(PUBLISHING_SEQUENCE.getAndIncrement()) + .properties() + .groupId(filterValueSupplier.get()) + .messageBuilder() + .build(), + confirmationHandler)); + latchAssert(latch).completes(CONDITION_TIMEOUT); + producer.close(); + } + + private void publishAmqp(Connection c, int messageCount, Supplier filterValueSupplier) + throws Exception { + try (Channel ch = c.createChannel()) { + ch.confirmSelect(); + for (int i = 0; i < messageCount; i++) { + ch.basicPublish( + "", + stream, + new Builder() + .headers(singletonMap("x-stream-filter-value", filterValueSupplier.get())) + .build(), + null); + } + ch.waitForConfirmsOrDie(); + } + } + + private static void repeatIfFailure(RunnableWithException test) throws Exception { + int executionCount = 0; + Throwable lastException = null; + while (executionCount < 5) { + try { + test.run(); + return; + } catch (Exception | AssertionError e) { + executionCount++; + lastException = e; + } + } + if (lastException instanceof Error) { + throw new RuntimeException(lastException); + } else { + throw (Exception) lastException; + } + } +} diff --git a/src/test/java/com/rabbitmq/stream/impl/FrameTest.java b/src/test/java/com/rabbitmq/stream/impl/FrameTest.java index c5319fd1a2..13b5c4a22a 100644 --- a/src/test/java/com/rabbitmq/stream/impl/FrameTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/FrameTest.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.verify; import com.rabbitmq.stream.Codec; +import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.Message; import com.rabbitmq.stream.Properties; import io.netty.buffer.ByteBuf; @@ -152,6 +153,7 @@ public TestDesc(String description, List sizes, int expectedCalls) { .thenReturn(Mockito.mock(ChannelFuture.class)); client.publishInternal( + Constants.VERSION_1, channel, b(1), test.sizes.stream() diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java index 8b9989870f..7677d1984e 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java @@ -16,12 +16,7 @@ import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyByte; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; @@ -90,6 +85,7 @@ void init() { }); when(client.maxFrameSize()).thenReturn(Integer.MAX_VALUE); when(client.publishInternal( + anyShort(), anyByte(), anyList(), any(OutboundEntityWriteCallback.class), @@ -97,13 +93,15 @@ void init() { .thenAnswer( invocation -> client.publishInternal( + Constants.VERSION_1, channel, - invocation.getArgument(0), invocation.getArgument(1), invocation.getArgument(2), - invocation.getArgument(3))); + invocation.getArgument(3), + invocation.getArgument(4))); when(client.publishInternal( + anyShort(), any(Channel.class), anyByte(), anyList(), @@ -176,6 +174,7 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout( messageCount * 10, confirmTimeout, Duration.ofSeconds(10), + null, env); IntStream.range(0, messageCount) @@ -217,6 +216,7 @@ void enqueueTimeoutMessageShouldBeFailedWhenEnqueueTimeoutIsReached(int subEntry 2, Duration.ofMinutes(1), enqueueTimeout, + null, env); AtomicBoolean confirmCalled = new AtomicBoolean(false); @@ -255,6 +255,7 @@ void enqueueTimeoutSendingShouldBlockWhenEnqueueTimeoutIsZero(int subEntrySize) 2, Duration.ofMinutes(1), enqueueTimeout, + null, env); AtomicBoolean confirmCalled = new AtomicBoolean(false); diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 873a6454cb..1e7550a7b5 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -458,6 +458,12 @@ static boolean atLeastVersion(String expectedVersion, String currentVersion) { } } + @Target({ElementType.TYPE, ElementType.METHOD}) + @Retention(RetentionPolicy.RUNTIME) + @Documented + @ExtendWith(DisabledIfFilteringNotSupportedCondition.class) + @interface DisabledIfFilteringNotSupported {} + @Target({ElementType.TYPE, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented @@ -601,6 +607,7 @@ public void beforeEach(ExtensionContext context) throws Exception { new Client(new Client.ClientParameters().eventLoopGroup(eventLoopGroup(context))); Client.Response response = client.create(stream); assertThat(response.isOk()).isTrue(); + store(context.getRoot()).put("filteringSupported", client.filteringSupported()); client.close(); store(context).put("testMethodStream", stream); } catch (NoSuchFieldException e) { @@ -671,7 +678,7 @@ private ExecutorServiceCloseableResourceWrapper() { } @Override - public void close() throws Throwable { + public void close() { this.executorService.shutdownNow(); } } @@ -703,6 +710,29 @@ private void close() { } } + static class DisabledIfFilteringNotSupportedCondition implements ExecutionCondition { + + @Override + public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) { + Boolean filteringSupported = + StreamTestInfrastructureExtension.store(context).get("filteringSupported", Boolean.class); + if (filteringSupported == null) { + EventLoopGroup eventLoop = StreamTestInfrastructureExtension.eventLoopGroup(context); + try (Client client = new Client(new ClientParameters().eventLoopGroup(eventLoop))) { + filteringSupported = client.filteringSupported(); + StreamTestInfrastructureExtension.store(context) + .put("filteringSupported", filteringSupported); + } + } + + if (filteringSupported) { + return ConditionEvaluationResult.enabled("filtering is supported"); + } else { + return ConditionEvaluationResult.disabled("filtering is not supported"); + } + } + } + static class DisabledIfRabbitMqCtlNotSetCondition implements ExecutionCondition { @Override diff --git a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java index 673557d33e..b1e798caaf 100644 --- a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java @@ -511,6 +511,17 @@ void nativeEpollWorksOnLinux() throws Exception { assertThat(streamExists(s)).isTrue(); } + @Test + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_13_0) + void shouldNotFailWhenFilteringIsActivated() throws Exception { + Future run = run(builder().filterValueSet("1..15").filterValues("4")); + waitUntilStreamExists(s); + waitOneSecond(); + run.cancel(true); + waitRunEnds(); + assertThat(consoleOutput()).containsIgnoringCase("summary:"); + } + private static Consumer wrap(CallableConsumer action) { return t -> { try { @@ -728,6 +739,16 @@ ArgumentsBuilder nativeEpoll() { return this; } + ArgumentsBuilder filterValueSet(String... values) { + arguments.put("filter-value-set", String.join(",", values)); + return this; + } + + ArgumentsBuilder filterValues(String... values) { + arguments.put("filter-values", String.join(",", values)); + return this; + } + String build() { return this.arguments.entrySet().stream() .map(e -> "--" + e.getKey() + (e.getValue().isEmpty() ? "" : (" " + e.getValue()))) diff --git a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java index f3642b7586..a6e09fd0bd 100644 --- a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java @@ -368,6 +368,27 @@ void commandLineMetricsTest() { .isEqualTo("-x 1 -y 2"); } + @ParameterizedTest + @CsvSource({"0,100000", "100,10", "1000,100", "50,5", "10,10", "11,1", "5,10"}) + void filteringPublishingCycle(int rate, int expected) { + assertThat(Utils.filteringPublishingCycle(rate)).isEqualTo(expected); + } + + @ParameterizedTest + @CsvSource({"3,1", "2,1", "4,1", "7,4", "10,7", "15,10"}) + void filteringSubSetSize(int setSize, int expected) { + assertThat(Utils.filteringSubSetSize(setSize)).isEqualTo(expected); + } + + @Test + void filterValueSetConverter() throws Exception { + CommandLine.ITypeConverter> converter = new Utils.FilterValueSetConverter(); + assertThat(converter.convert("one")).containsExactly("one"); + assertThat(converter.convert("one,two,three")).containsExactly("one", "two", "three"); + assertThat(converter.convert("1..10")).hasSize(10).contains("1", "2", "10"); + assertThat(converter.convert("5..10")).hasSize(6).contains("5", "6", "10"); + } + @Command(name = "test-command") static class TestCommand {