From a91380142a6946e3d58dea287ec0679cdb0ef3b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 9 Jun 2023 12:05:18 +0200 Subject: [PATCH 01/18] Start support for stream filtering Conflicts: .github/workflows/test-pr.yml --- .github/workflows/test-pr.yml | 3 + .../com/rabbitmq/stream/ConsumerBuilder.java | 12 ++ .../com/rabbitmq/stream/ProducerBuilder.java | 2 + .../java/com/rabbitmq/stream/impl/Client.java | 13 +- .../stream/impl/MessageAccumulator.java | 2 + .../stream/impl/SimpleMessageAccumulator.java | 16 ++ .../rabbitmq/stream/impl/StreamConsumer.java | 9 +- .../stream/impl/StreamConsumerBuilder.java | 104 ++++++++++--- .../rabbitmq/stream/impl/StreamProducer.java | 106 ++++++++++--- .../stream/impl/StreamProducerBuilder.java | 12 ++ .../impl/SubEntryMessageAccumulator.java | 7 +- .../impl/Amqp10InteroperabilityTest.java | 28 +++- .../com/rabbitmq/stream/impl/ClientTest.java | 142 ++++++++++++++++-- .../com/rabbitmq/stream/impl/FrameTest.java | 2 + .../rabbitmq/stream/impl/MicroStreamTest.java | 120 +++++++++++++++ .../stream/impl/StreamProducerUnitTest.java | 13 +- 16 files changed, 513 insertions(+), 78 deletions(-) create mode 100644 src/test/java/com/rabbitmq/stream/impl/MicroStreamTest.java diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index aaf664e873..b55305d67f 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -5,6 +5,9 @@ on: branches: - main +env: + RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:stream-chunk-filtering-otp-max-bazel' + jobs: build: runs-on: ubuntu-22.04 diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java index b86db48b2e..c80302e8c6 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java @@ -14,6 +14,7 @@ package com.rabbitmq.stream; import java.time.Duration; +import java.util.function.Predicate; /** API to configure and create a {@link Consumer}. */ public interface ConsumerBuilder { @@ -151,6 +152,8 @@ public interface ConsumerBuilder { */ ConsumerBuilder noTrackingStrategy(); + FilterConfiguration filter(); + /** * Configure flow of messages. * @@ -242,4 +245,13 @@ interface FlowConfiguration { */ ConsumerBuilder builder(); } + + interface FilterConfiguration { + + FilterConfiguration values(String... filterValues); + + FilterConfiguration filter(Predicate filter); + + ConsumerBuilder builder(); + } } diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java index be5314da16..82c92ea41c 100644 --- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java @@ -132,6 +132,8 @@ public interface ProducerBuilder { */ ProducerBuilder enqueueTimeout(Duration timeout); + ProducerBuilder filter(Function filterValueExtractor); + /** * Create the {@link Producer} instance. * diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index f85fa9a51a..4c3d20f698 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -855,6 +855,7 @@ public List publish( encodedMessages.add(encodedMessage); } return publishInternal( + VERSION_1, this.channel, publisherId, encodedMessages, @@ -881,6 +882,7 @@ public List publish( encodedMessages.add(wrapper); } return publishInternal( + VERSION_1, this.channel, publisherId, encodedMessages, @@ -911,6 +913,7 @@ public List publishBatches( encodedMessageBatches.add(encodedMessageBatch); } return publishInternal( + VERSION_1, this.channel, publisherId, encodedMessageBatches, @@ -947,6 +950,7 @@ public List publishBatches( encodedMessageBatches.add(wrapper); } return publishInternal( + VERSION_1, this.channel, publisherId, encodedMessageBatches, @@ -977,15 +981,17 @@ private void checkMessageBatchFitsInFrame(EncodedMessageBatch encodedMessageBatc } List publishInternal( + short version, byte publisherId, List encodedEntities, OutboundEntityWriteCallback callback, ToLongFunction publishSequenceFunction) { return this.publishInternal( - this.channel, publisherId, encodedEntities, callback, publishSequenceFunction); + version, this.channel, publisherId, encodedEntities, callback, publishSequenceFunction); } List publishInternal( + short version, Channel ch, byte publisherId, List encodedEntities, @@ -1002,6 +1008,7 @@ List publishInternal( // the current message/batch does not fit, we're sending the batch int frameLength = length - callback.fragmentLength(encodedEntity); sendEntityBatch( + version, ch, frameLength, publisherId, @@ -1017,6 +1024,7 @@ List publishInternal( currentIndex++; } sendEntityBatch( + version, ch, length, publisherId, @@ -1031,6 +1039,7 @@ List publishInternal( } private void sendEntityBatch( + short version, Channel ch, int frameLength, byte publisherId, @@ -1044,7 +1053,7 @@ private void sendEntityBatch( ByteBuf out = allocateNoCheck(ch.alloc(), frameLength + 4); out.writeInt(frameLength); out.writeShort(encodeRequestCode(COMMAND_PUBLISH)); - out.writeShort(VERSION_1); + out.writeShort(version); out.writeByte(publisherId); int messageCount = 0; out.writeInt(toExcluded - fromIncluded); diff --git a/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java index 25c37fd465..e40f6ada11 100644 --- a/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java @@ -32,6 +32,8 @@ interface AccumulatedEntity { long publishindId(); + String filterValue(); + Object encodedEntity(); StreamProducer.ConfirmationCallback confirmationCallback(); diff --git a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java index d701710ac0..5f01ea1416 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java @@ -21,28 +21,35 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.function.ToLongFunction; class SimpleMessageAccumulator implements MessageAccumulator { + private static final Function NULL_FILTER_VALUE_EXTRACTOR = m -> null; + protected final BlockingQueue messages; protected final Clock clock; private final int capacity; private final Codec codec; private final int maxFrameSize; private final ToLongFunction publishSequenceFunction; + private final Function filterValueExtractor; SimpleMessageAccumulator( int capacity, Codec codec, int maxFrameSize, ToLongFunction publishSequenceFunction, + Function filterValueExtractor, Clock clock) { this.capacity = capacity; this.messages = new LinkedBlockingQueue<>(capacity); this.codec = codec; this.maxFrameSize = maxFrameSize; this.publishSequenceFunction = publishSequenceFunction; + this.filterValueExtractor = + filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor; this.clock = clock; } @@ -56,6 +63,7 @@ public boolean add(Message message, ConfirmationHandler confirmationHandler) { new SimpleAccumulatedEntity( clock.time(), publishingId, + this.filterValueExtractor.apply(message), encodedMessage, new SimpleConfirmationCallback(message, confirmationHandler)), 60, @@ -88,17 +96,20 @@ private static final class SimpleAccumulatedEntity implements AccumulatedEntity private final long time; private final long publishingId; + private final String filterValue; private final Codec.EncodedMessage encodedMessage; private final StreamProducer.ConfirmationCallback confirmationCallback; private SimpleAccumulatedEntity( long time, long publishingId, + String filterValue, Codec.EncodedMessage encodedMessage, StreamProducer.ConfirmationCallback confirmationCallback) { this.time = time; this.publishingId = publishingId; this.encodedMessage = encodedMessage; + this.filterValue = filterValue; this.confirmationCallback = confirmationCallback; } @@ -107,6 +118,11 @@ public long publishindId() { return publishingId; } + @Override + public String filterValue() { + return filterValue; + } + @Override public Object encodedEntity() { return encodedMessage; diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java index 2e63b11e00..f1dab8cdde 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java @@ -18,15 +18,8 @@ import static com.rabbitmq.stream.impl.Utils.offsetBefore; import static java.time.Duration.ofMillis; -import com.rabbitmq.stream.Constants; -import com.rabbitmq.stream.Consumer; -import com.rabbitmq.stream.ConsumerUpdateListener; -import com.rabbitmq.stream.MessageHandler; +import com.rabbitmq.stream.*; import com.rabbitmq.stream.MessageHandler.Context; -import com.rabbitmq.stream.NoOffsetException; -import com.rabbitmq.stream.OffsetSpecification; -import com.rabbitmq.stream.StreamException; -import com.rabbitmq.stream.SubscriptionListener; import com.rabbitmq.stream.impl.Client.QueryOffsetResponse; import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration; import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration; diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java index 7d829e1f7f..9aa7ae463e 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java @@ -13,24 +13,23 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import com.rabbitmq.stream.Consumer; -import com.rabbitmq.stream.ConsumerBuilder; -import com.rabbitmq.stream.ConsumerUpdateListener; -import com.rabbitmq.stream.MessageHandler; -import com.rabbitmq.stream.OffsetSpecification; -import com.rabbitmq.stream.StreamException; -import com.rabbitmq.stream.SubscriptionListener; +import com.rabbitmq.stream.*; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.time.Duration; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; class StreamConsumerBuilder implements ConsumerBuilder { private static final int NAME_MAX_SIZE = 256; // server-side limitation + private static final TrackingConfiguration DISABLED_TRACKING_CONFIGURATION = + new TrackingConfiguration(false, false, -1, Duration.ZERO, Duration.ZERO); private final StreamEnvironment environment; - + private final Map subscriptionProperties = new ConcurrentHashMap<>(); private String stream, superStream; private OffsetSpecification offsetSpecification = null; private MessageHandler messageHandler; @@ -40,9 +39,9 @@ class StreamConsumerBuilder implements ConsumerBuilder { private boolean noTrackingStrategy = false; private boolean lazyInit = false; private SubscriptionListener subscriptionListener = subscriptionContext -> {}; - private final Map subscriptionProperties = new ConcurrentHashMap<>(); - private ConsumerUpdateListener consumerUpdateListener; private final DefaultFlowConfiguration flowConfiguration = new DefaultFlowConfiguration(this); + private ConsumerUpdateListener consumerUpdateListener; + private DefaultFilterConfiguration filterConfiguration; public StreamConsumerBuilder(StreamEnvironment environment) { this.environment = environment; @@ -141,6 +140,12 @@ StreamConsumerBuilder lazyInit(boolean lazyInit) { return this; } + @Override + public FilterConfiguration filter() { + this.filterConfiguration = new DefaultFilterConfiguration(this); + return this.filterConfiguration; + } + @Override public Consumer build() { if (this.stream == null && this.superStream == null) { @@ -185,13 +190,30 @@ public Consumer build() { trackingConfiguration = DISABLED_TRACKING_CONFIGURATION; } + MessageHandler handler; + if (this.filterConfiguration == null) { + handler = this.messageHandler; + } else { + this.filterConfiguration.validate(); + this.subscriptionProperties.put( + "filters", String.join(",", this.filterConfiguration.filterValues)); + final Predicate filter = this.filterConfiguration.filter; + final MessageHandler delegate = this.messageHandler; + handler = + (context, message) -> { + if (filter.test(message)) { + delegate.handle(context, message); + } + }; + } + Consumer consumer; if (this.stream != null) { consumer = new StreamConsumer( this.stream, this.offsetSpecification, - this.messageHandler, + handler, this.name, this.environment, trackingConfiguration, @@ -212,8 +234,21 @@ public Consumer build() { return consumer; } - private static final TrackingConfiguration DISABLED_TRACKING_CONFIGURATION = - new TrackingConfiguration(false, false, -1, Duration.ZERO, Duration.ZERO); + StreamConsumerBuilder duplicate() { + StreamConsumerBuilder duplicate = new StreamConsumerBuilder(this.environment); + for (Field field : StreamConsumerBuilder.class.getDeclaredFields()) { + if (Modifier.isStatic(field.getModifiers())) { + continue; + } + field.setAccessible(true); + try { + field.set(duplicate, field.get(this)); + } catch (IllegalAccessException e) { + throw new StreamException("Error while duplicating stream producer builder", e); + } + } + return duplicate; + } static class TrackingConfiguration { @@ -321,20 +356,39 @@ public ConsumerBuilder builder() { } } - StreamConsumerBuilder duplicate() { - StreamConsumerBuilder duplicate = new StreamConsumerBuilder(this.environment); - for (Field field : StreamConsumerBuilder.class.getDeclaredFields()) { - if (Modifier.isStatic(field.getModifiers())) { - continue; - } - field.setAccessible(true); - try { - field.set(duplicate, field.get(this)); - } catch (IllegalAccessException e) { - throw new StreamException("Error while duplicating stream producer builder", e); + private static final class DefaultFilterConfiguration implements FilterConfiguration { + + private final StreamConsumerBuilder builder; + private List filterValues; + private Predicate filter; + + private DefaultFilterConfiguration(StreamConsumerBuilder builder) { + this.builder = builder; + } + + @Override + public FilterConfiguration values(String... filterValues) { + // FIXME: check for ',' in values + this.filterValues = Arrays.asList(filterValues); + return this; + } + + @Override + public FilterConfiguration filter(Predicate filter) { + this.filter = filter; + return this; + } + + @Override + public ConsumerBuilder builder() { + return this.builder; + } + + private void validate() { + if (this.filterValues == null || this.filter == null) { + throw new IllegalArgumentException("Both filter values and the filter logic must be set"); } } - return duplicate; } private static class DefaultFlowConfiguration implements FlowConfiguration { diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 49919164eb..b3ce2a8b71 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -13,9 +13,7 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.Constants.CODE_MESSAGE_ENQUEUEING_FAILED; -import static com.rabbitmq.stream.Constants.CODE_PRODUCER_CLOSED; -import static com.rabbitmq.stream.Constants.CODE_PRODUCER_NOT_AVAILABLE; +import static com.rabbitmq.stream.Constants.*; import static com.rabbitmq.stream.impl.Utils.formatConstant; import static com.rabbitmq.stream.impl.Utils.namedRunnable; @@ -31,6 +29,7 @@ import com.rabbitmq.stream.impl.Client.Response; import com.rabbitmq.stream.impl.MessageAccumulator.AccumulatedEntity; import io.netty.buffer.ByteBuf; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.Iterator; @@ -48,6 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.function.ToLongFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +80,7 @@ class StreamProducer implements Producer { private volatile byte publisherId; private volatile Status status; private volatile ScheduledFuture confirmTimeoutFuture; + private final short publishVersion; StreamProducer( String name, @@ -91,6 +92,7 @@ class StreamProducer implements Producer { int maxUnconfirmedMessages, Duration confirmTimeout, Duration enqueueTimeout, + Function filterValueExtractor, StreamEnvironment environment) { this.id = ID_SEQUENCE.getAndIncrement(); this.environment = environment; @@ -116,8 +118,13 @@ class StreamProducer implements Producer { environment.codec(), client.maxFrameSize(), accumulatorPublishSequenceFunction, + filterValueExtractor, this.environment.clock()); - delegateWriteCallback = Client.OUTBOUND_MESSAGE_WRITE_CALLBACK; + if (filterValueExtractor == null) { + delegateWriteCallback = Client.OUTBOUND_MESSAGE_WRITE_CALLBACK; + } else { + delegateWriteCallback = OUTBOUND_MSG_FILTER_VALUE_WRITE_CALLBACK; + } } else { this.accumulator = new SubEntryMessageAccumulator( @@ -138,22 +145,44 @@ class StreamProducer implements Producer { this.unconfirmedMessagesSemaphore = new Semaphore(maxUnconfirmedMessages, true); this.unconfirmedMessages = new ConcurrentHashMap<>(this.maxUnconfirmedMessages, 0.75f, 2); - this.writeCallback = - new Client.OutboundEntityWriteCallback() { - @Override - public int write(ByteBuf bb, Object entity, long publishingId) { - MessageAccumulator.AccumulatedEntity accumulatedEntity = - (MessageAccumulator.AccumulatedEntity) entity; - unconfirmedMessages.put(publishingId, accumulatedEntity); - return delegateWriteCallback.write(bb, accumulatedEntity.encodedEntity(), publishingId); - } + if (filterValueExtractor == null) { + this.publishVersion = VERSION_1; + this.writeCallback = + new Client.OutboundEntityWriteCallback() { + @Override + public int write(ByteBuf bb, Object entity, long publishingId) { + MessageAccumulator.AccumulatedEntity accumulatedEntity = + (MessageAccumulator.AccumulatedEntity) entity; + unconfirmedMessages.put(publishingId, accumulatedEntity); + return delegateWriteCallback.write( + bb, accumulatedEntity.encodedEntity(), publishingId); + } + + @Override + public int fragmentLength(Object entity) { + return delegateWriteCallback.fragmentLength( + ((MessageAccumulator.AccumulatedEntity) entity).encodedEntity()); + } + }; + } else { + this.publishVersion = VERSION_2; + this.writeCallback = + new Client.OutboundEntityWriteCallback() { + @Override + public int write(ByteBuf bb, Object entity, long publishingId) { + MessageAccumulator.AccumulatedEntity accumulatedEntity = + (MessageAccumulator.AccumulatedEntity) entity; + unconfirmedMessages.put(publishingId, accumulatedEntity); + return delegateWriteCallback.write(bb, accumulatedEntity, publishingId); + } + + @Override + public int fragmentLength(Object entity) { + return delegateWriteCallback.fragmentLength(entity); + } + }; + } - @Override - public int fragmentLength(Object entity) { - return delegateWriteCallback.fragmentLength( - ((MessageAccumulator.AccumulatedEntity) entity).encodedEntity()); - } - }; if (!batchPublishingDelay.isNegative() && !batchPublishingDelay.isZero()) { AtomicReference taskReference = new AtomicReference<>(); Runnable task = @@ -445,7 +474,11 @@ private void publishBatch(boolean stateCheck) { batchCount++; } client.publishInternal( - this.publisherId, messages, this.writeCallback, this.publishSequenceFunction); + this.publishVersion, + this.publisherId, + messages, + this.writeCallback, + this.publishSequenceFunction); } } @@ -480,7 +513,11 @@ void running() { batchCount++; } client.publishInternal( - this.publisherId, messages, this.writeCallback, this.publishSequenceFunction); + this.publishVersion, + this.publisherId, + messages, + this.writeCallback, + this.publishSequenceFunction); } } publishBatch(false); @@ -558,4 +595,31 @@ private void checkNotClosed() { throw new IllegalStateException("This producer instance has been closed"); } } + + private static final Client.OutboundEntityWriteCallback OUTBOUND_MSG_FILTER_VALUE_WRITE_CALLBACK = + new OutboundMessageFilterValueWriterCallback(); + + private static final class OutboundMessageFilterValueWriterCallback + implements Client.OutboundEntityWriteCallback { + + @Override + public int write(ByteBuf bb, Object entity, long publishingId) { + AccumulatedEntity accumulatedEntity = (AccumulatedEntity) entity; + String filterValue = accumulatedEntity.filterValue(); + bb.writeShort(filterValue.length()); + bb.writeBytes(filterValue.getBytes(StandardCharsets.UTF_8)); + Codec.EncodedMessage messageToPublish = + (Codec.EncodedMessage) accumulatedEntity.encodedEntity(); + bb.writeInt(messageToPublish.getSize()); + bb.writeBytes(messageToPublish.getData(), 0, messageToPublish.getSize()); + return 1; + } + + @Override + public int fragmentLength(Object entity) { + AccumulatedEntity accumulatedEntity = (AccumulatedEntity) entity; + Codec.EncodedMessage message = (Codec.EncodedMessage) accumulatedEntity.encodedEntity(); + return 8 + 2 + accumulatedEntity.filterValue().length() + 4 + message.getSize(); + } + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java index 074a544a7d..bccaf2e231 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java @@ -48,6 +48,8 @@ class StreamProducerBuilder implements ProducerBuilder { private DefaultRoutingConfiguration routingConfiguration; + private Function filterValueExtractor; + StreamProducerBuilder(StreamEnvironment environment) { this.environment = environment; } @@ -128,6 +130,12 @@ public ProducerBuilder enqueueTimeout(Duration timeout) { return this; } + @Override + public ProducerBuilder filter(Function filterValueExtractor) { + this.filterValueExtractor = filterValueExtractor; + return this; + } + @Override public RoutingConfiguration routing(Function routingKeyExtractor) { this.routingConfiguration = new DefaultRoutingConfiguration(this); @@ -150,6 +158,9 @@ public Producer build() { throw new IllegalArgumentException( "Sub-entry batching must be enabled to enable compression"); } + if (subEntrySize > 1 && filterValueExtractor != null) { + throw new IllegalArgumentException("Filtering is not supported with sub-entry batching"); + } if (subEntrySize > 1 && compression == null) { compression = Compression.NONE; } @@ -183,6 +194,7 @@ public Producer build() { maxUnconfirmedMessages, confirmTimeout, enqueueTimeout, + filterValueExtractor, environment); this.environment.addProducer((StreamProducer) producer); } else { diff --git a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java index b2045c8df7..9533f2a922 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java @@ -40,7 +40,7 @@ public SubEntryMessageAccumulator( int maxFrameSize, ToLongFunction publishSequenceFunction, Clock clock) { - super(subEntrySize * batchSize, codec, maxFrameSize, publishSequenceFunction, clock); + super(subEntrySize * batchSize, codec, maxFrameSize, publishSequenceFunction, null, clock); this.subEntrySize = subEntrySize; this.compressionCodec = compressionCodec; this.compression = compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); @@ -111,6 +111,11 @@ public long publishindId() { return publishingId; } + @Override + public String filterValue() { + return null; + } + @Override public Object encodedEntity() { return encodedMessageBatch; diff --git a/src/test/java/com/rabbitmq/stream/impl/Amqp10InteroperabilityTest.java b/src/test/java/com/rabbitmq/stream/impl/Amqp10InteroperabilityTest.java index 166abb5f00..c49d9914cc 100644 --- a/src/test/java/com/rabbitmq/stream/impl/Amqp10InteroperabilityTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/Amqp10InteroperabilityTest.java @@ -14,7 +14,6 @@ package com.rabbitmq.stream.impl; import static com.rabbitmq.stream.impl.TestUtils.ClientFactory; -import static com.rabbitmq.stream.impl.TestUtils.StreamTestInfrastructureExtension; import static com.rabbitmq.stream.impl.TestUtils.latchAssert; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -27,12 +26,12 @@ import com.rabbitmq.stream.impl.Client.ClientParameters; import com.rabbitmq.stream.impl.TestUtils.BrokerVersion; import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; -import com.rabbitmq.stream.impl.TestUtils.DisabledIfAmqp10NotEnabled; import com.swiftmq.amqp.AMQPContext; import com.swiftmq.amqp.v100.client.Connection; import com.swiftmq.amqp.v100.client.Producer; import com.swiftmq.amqp.v100.client.QoS; import com.swiftmq.amqp.v100.client.Session; +import com.swiftmq.amqp.v100.generated.messaging.delivery_state.*; import com.swiftmq.amqp.v100.generated.messaging.message_format.AmqpSequence; import com.swiftmq.amqp.v100.generated.messaging.message_format.AmqpValue; import com.swiftmq.amqp.v100.generated.messaging.message_format.ApplicationProperties; @@ -59,10 +58,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -@ExtendWith(StreamTestInfrastructureExtension.class) -@DisabledIfAmqp10NotEnabled +// @ExtendWith(StreamTestInfrastructureExtension.class) +// @DisabledIfAmqp10NotEnabled public class Amqp10InteroperabilityTest { String stream; @@ -91,6 +89,26 @@ void tearDown() { connection.close(); } + @Test + void publish() throws Exception { + // Producer p = session.createProducer("/exchange/queue/dummy", QoS.AT_LEAST_ONCE); + Producer p = session.createProducer("/amq/queue/dummy", QoS.AT_LEAST_ONCE); + // Producer p = session.createProducer("/queue/dummy", QoS.AT_LEAST_ONCE); + // /amq/queue/dummy => parsed {amqqueue,"dummy"} + // /queue/dummy => parsed {queue,"dummy"} + // dummy => {queue,"dummy"} + AMQPMessage message = new AMQPMessage(); + + // Properties properties = new Properties(); + // properties.setSubject(new AMQPString("bar")); + // message.setProperties(properties); + + message.addData(new Data("hello".getBytes(StandardCharsets.UTF_8))); + + p.send(message); + p.close(); + } + @Test @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_9) void publishToStreamQueueConsumeFromStream() throws Exception { diff --git a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java index 22e80abd8a..be10744587 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java @@ -13,8 +13,7 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ko; -import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.responseCode; +import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.*; import static com.rabbitmq.stream.impl.TestUtils.b; import static com.rabbitmq.stream.impl.TestUtils.latchAssert; import static com.rabbitmq.stream.impl.TestUtils.streamName; @@ -43,6 +42,7 @@ import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo; import com.rabbitmq.stream.impl.TestUtils.BrokerVersion; import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; +import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator; import java.io.ByteArrayOutputStream; @@ -52,14 +52,7 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; @@ -68,6 +61,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.LongConsumer; +import java.util.function.ToLongFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.jupiter.api.Test; @@ -978,4 +972,132 @@ void streamStatsFirstOffsetShouldChangeAfterRetentionKickedIn(TestInfo info) { } } } + + @Test + void publishConsumeWithFilterValueShouldSendSubSetOfStream() throws Exception { + int messageCount = 10_000; + AtomicReference publishLatch = + new AtomicReference<>(new CountDownLatch(messageCount)); + Client publisher = + cf.get( + new ClientParameters() + .publishConfirmListener( + (publisherId, publishingId) -> publishLatch.get().countDown())); + AtomicLong sequence = new AtomicLong(0); + ToLongFunction sequenceFunction = msg -> sequence.getAndIncrement(); + Response response = publisher.declarePublisher(b(0), null, stream); + assertThat(response).is(ok()); + + // firt wave of messages with several filter values + List filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear")); + Map filterValueCount = new HashMap<>(); + Random random = new Random(); + class Entity { + private final Codec.EncodedMessage encodedMessage; + private final String filterValue; + + Entity(Codec.EncodedMessage encodedMessage, String filterValue) { + this.encodedMessage = encodedMessage; + this.filterValue = filterValue; + } + } + + Client.OutboundEntityWriteCallback writeCallback = + new Client.OutboundEntityWriteCallback() { + @Override + public int write(ByteBuf bb, Object obj, long publishingId) { + Entity msg = (Entity) obj; + bb.writeShort(msg.filterValue.length()); + bb.writeBytes(msg.filterValue.getBytes(StandardCharsets.UTF_8)); + Codec.EncodedMessage messageToPublish = msg.encodedMessage; + bb.writeInt(messageToPublish.getSize()); + bb.writeBytes(messageToPublish.getData(), 0, messageToPublish.getSize()); + return 1; + } + + @Override + public int fragmentLength(Object obj) { + Entity msg = (Entity) obj; + return 8 + 2 + msg.filterValue.length() + 4 + msg.encodedMessage.getSize(); + } + }; + int batchSize = 100; + List messages = new ArrayList<>(batchSize); + Runnable write = + () -> + publisher.publishInternal( + Constants.VERSION_2, b(0), messages, writeCallback, sequenceFunction); + Runnable insert = + () -> { + byte[] data = "hello".getBytes(StandardCharsets.UTF_8); + int publishedMessageCount = 0; + while (publishedMessageCount < messageCount) { + String filterValue = filterValues.get(random.nextInt(filterValues.size())); + filterValueCount + .computeIfAbsent(filterValue, k -> new AtomicInteger()) + .incrementAndGet(); + Message message = + publisher + .codec() + .messageBuilder() + .addData(data) + .properties() + .groupId(filterValue) + .messageBuilder() + .build(); + Entity entity = new Entity(publisher.codec().encode(message), filterValue); + messages.add(entity); + publishedMessageCount++; + + if (messages.size() == batchSize) { + write.run(); + messages.clear(); + } + } + if (!messages.isEmpty()) { + write.run(); + } + }; + + insert.run(); + assertThat(latchAssert(publishLatch)).completes(); + + // second wave of messages, with only one, new filter value + String newFilterValue = "orange"; + filterValues.clear(); + filterValues.add(newFilterValue); + publishLatch.set(new CountDownLatch(messageCount)); + insert.run(); + assertThat(latchAssert(publishLatch)).completes(); + + AtomicInteger consumedMessageCount = new AtomicInteger(0); + AtomicInteger filteredConsumedMessageCount = new AtomicInteger(0); + Client consumer = + cf.get( + new ClientParameters() + .chunkListener( + (client, subscriptionId, offset, messageCount1, dataSize) -> + client.credit(subscriptionId, 1)) + .messageListener( + (subscriptionId, offset, chunkTimestamp, committedChunkId, message) -> { + consumedMessageCount.incrementAndGet(); + String filterValue = message.getProperties().getGroupId(); + if (newFilterValue.equals(filterValue)) { + filteredConsumedMessageCount.incrementAndGet(); + } + })); + + // consume only messages with filter value from second wave + consumer.subscribe( + b(0), + stream, + OffsetSpecification.first(), + 1, + Collections.singletonMap("filters", newFilterValue)); + + int expectedCount = filterValueCount.get(newFilterValue).get(); + waitAtMost(() -> filteredConsumedMessageCount.get() == expectedCount); + // we should get messages only from the "second" part of the stream + assertThat(consumedMessageCount).hasValueLessThan(messageCount * 2); + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/FrameTest.java b/src/test/java/com/rabbitmq/stream/impl/FrameTest.java index c5319fd1a2..e2c9024731 100644 --- a/src/test/java/com/rabbitmq/stream/impl/FrameTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/FrameTest.java @@ -17,6 +17,7 @@ import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.anyShort; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -152,6 +153,7 @@ public TestDesc(String description, List sizes, int expectedCalls) { .thenReturn(Mockito.mock(ChannelFuture.class)); client.publishInternal( + anyShort(), channel, b(1), test.sizes.stream() diff --git a/src/test/java/com/rabbitmq/stream/impl/MicroStreamTest.java b/src/test/java/com/rabbitmq/stream/impl/MicroStreamTest.java new file mode 100644 index 0000000000..dd745a29e4 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/impl/MicroStreamTest.java @@ -0,0 +1,120 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import static com.rabbitmq.stream.impl.TestUtils.*; +import static org.assertj.core.api.Assertions.assertThat; + +import com.rabbitmq.stream.*; +import io.netty.channel.EventLoopGroup; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +public class MicroStreamTest { + + static final Charset UTF8 = StandardCharsets.UTF_8; + EventLoopGroup eventLoopGroup; + + Environment environment; + + String stream; + + @BeforeEach + void init() throws Exception { + EnvironmentBuilder environmentBuilder = + Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder(); + environmentBuilder.addressResolver(add -> localhost()); + environment = environmentBuilder.build(); + } + + @AfterEach + void tearDown() throws Exception { + environment.close(); + } + + @Test + void publishConsume() throws Exception { + int messageCount = 10_000; + Producer producer = + environment.producerBuilder().stream(stream) + .filter(m -> m.getProperties().getGroupId()) + .build(); + + List filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear")); + Map filterValueCount = new HashMap<>(); + Random random = new Random(); + + AtomicReference publishLatch = + new AtomicReference<>(new CountDownLatch(messageCount)); + ConfirmationHandler confirmationHandler = confirmationStatus -> publishLatch.get().countDown(); + Runnable insert = + () -> { + IntStream.range(0, messageCount) + .forEach( + i -> { + String filterValue = filterValues.get(random.nextInt(filterValues.size())); + filterValueCount + .computeIfAbsent(filterValue, k -> new AtomicInteger()) + .incrementAndGet(); + producer.send( + producer + .messageBuilder() + .properties() + .groupId(filterValue) + .messageBuilder() + .build(), + confirmationHandler); + }); + }; + insert.run(); + latchAssert(publishLatch).completes(); + + // second wave of messages, with only one, new filter value + String newFilterValue = "orange"; + filterValues.clear(); + filterValues.add(newFilterValue); + publishLatch.set(new CountDownLatch(messageCount)); + insert.run(); + assertThat(latchAssert(publishLatch)).completes(); + + AtomicInteger receivedMessageCount = new AtomicInteger(0); + AtomicInteger filteredConsumedMessageCount = new AtomicInteger(0); + environment.consumerBuilder().stream(stream) + .offset(OffsetSpecification.first()) + .filter() + .values(newFilterValue) + .filter( + m -> { + receivedMessageCount.incrementAndGet(); + return newFilterValue.equals(m.getProperties().getGroupId()); + }) + .builder() + .messageHandler((context, message) -> filteredConsumedMessageCount.incrementAndGet()) + .build(); + + int expectedCount = filterValueCount.get(newFilterValue).get(); + waitAtMost(() -> filteredConsumedMessageCount.get() == expectedCount); + assertThat(receivedMessageCount).hasValueLessThan(messageCount * 2); + } +} diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java index 8b9989870f..1f562e1339 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java @@ -16,12 +16,7 @@ import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyByte; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; @@ -90,6 +85,7 @@ void init() { }); when(client.maxFrameSize()).thenReturn(Integer.MAX_VALUE); when(client.publishInternal( + anyShort(), anyByte(), anyList(), any(OutboundEntityWriteCallback.class), @@ -97,6 +93,7 @@ void init() { .thenAnswer( invocation -> client.publishInternal( + Constants.VERSION_1, channel, invocation.getArgument(0), invocation.getArgument(1), @@ -104,6 +101,7 @@ void init() { invocation.getArgument(3))); when(client.publishInternal( + anyShort(), any(Channel.class), anyByte(), anyList(), @@ -176,6 +174,7 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout( messageCount * 10, confirmTimeout, Duration.ofSeconds(10), + null, env); IntStream.range(0, messageCount) @@ -217,6 +216,7 @@ void enqueueTimeoutMessageShouldBeFailedWhenEnqueueTimeoutIsReached(int subEntry 2, Duration.ofMinutes(1), enqueueTimeout, + null, env); AtomicBoolean confirmCalled = new AtomicBoolean(false); @@ -255,6 +255,7 @@ void enqueueTimeoutSendingShouldBlockWhenEnqueueTimeoutIsZero(int subEntrySize) 2, Duration.ofMinutes(1), enqueueTimeout, + null, env); AtomicBoolean confirmCalled = new AtomicBoolean(false); From bb27e31c8ee50eedb35b005a5c9d03e9979054d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 23 May 2023 15:03:17 +0200 Subject: [PATCH 02/18] Fix test --- .../impl/Amqp10InteroperabilityTest.java | 25 +++---------------- 1 file changed, 3 insertions(+), 22 deletions(-) diff --git a/src/test/java/com/rabbitmq/stream/impl/Amqp10InteroperabilityTest.java b/src/test/java/com/rabbitmq/stream/impl/Amqp10InteroperabilityTest.java index c49d9914cc..e60d105dbb 100644 --- a/src/test/java/com/rabbitmq/stream/impl/Amqp10InteroperabilityTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/Amqp10InteroperabilityTest.java @@ -58,9 +58,10 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; -// @ExtendWith(StreamTestInfrastructureExtension.class) -// @DisabledIfAmqp10NotEnabled +@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +@TestUtils.DisabledIfAmqp10NotEnabled public class Amqp10InteroperabilityTest { String stream; @@ -89,26 +90,6 @@ void tearDown() { connection.close(); } - @Test - void publish() throws Exception { - // Producer p = session.createProducer("/exchange/queue/dummy", QoS.AT_LEAST_ONCE); - Producer p = session.createProducer("/amq/queue/dummy", QoS.AT_LEAST_ONCE); - // Producer p = session.createProducer("/queue/dummy", QoS.AT_LEAST_ONCE); - // /amq/queue/dummy => parsed {amqqueue,"dummy"} - // /queue/dummy => parsed {queue,"dummy"} - // dummy => {queue,"dummy"} - AMQPMessage message = new AMQPMessage(); - - // Properties properties = new Properties(); - // properties.setSubject(new AMQPString("bar")); - // message.setProperties(properties); - - message.addData(new Data("hello".getBytes(StandardCharsets.UTF_8))); - - p.send(message); - p.close(); - } - @Test @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_9) void publishToStreamQueueConsumeFromStream() throws Exception { From e04015c345662da1288483cae5be3db46bbe9fab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 23 May 2023 15:28:05 +0200 Subject: [PATCH 03/18] Fix tests --- src/test/java/com/rabbitmq/stream/impl/FrameTest.java | 4 ++-- .../java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/test/java/com/rabbitmq/stream/impl/FrameTest.java b/src/test/java/com/rabbitmq/stream/impl/FrameTest.java index e2c9024731..13b5c4a22a 100644 --- a/src/test/java/com/rabbitmq/stream/impl/FrameTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/FrameTest.java @@ -17,11 +17,11 @@ import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.anyShort; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import com.rabbitmq.stream.Codec; +import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.Message; import com.rabbitmq.stream.Properties; import io.netty.buffer.ByteBuf; @@ -153,7 +153,7 @@ public TestDesc(String description, List sizes, int expectedCalls) { .thenReturn(Mockito.mock(ChannelFuture.class)); client.publishInternal( - anyShort(), + Constants.VERSION_1, channel, b(1), test.sizes.stream() diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java index 1f562e1339..7677d1984e 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java @@ -95,10 +95,10 @@ void init() { client.publishInternal( Constants.VERSION_1, channel, - invocation.getArgument(0), invocation.getArgument(1), invocation.getArgument(2), - invocation.getArgument(3))); + invocation.getArgument(3), + invocation.getArgument(4))); when(client.publishInternal( anyShort(), From 7b9eba56c884d013782a29d3ac3b2db23f95b32d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 26 May 2023 11:58:33 +0200 Subject: [PATCH 04/18] Use "filter."-like subscription properties And deal with null filter values. Conflicts: pom.xml --- pom.xml | 2 +- .../com/rabbitmq/stream/ConsumerBuilder.java | 4 + .../java/com/rabbitmq/stream/impl/Client.java | 68 +++--- .../rabbitmq/stream/impl/StreamConsumer.java | 4 +- .../stream/impl/StreamConsumerBuilder.java | 30 ++- .../stream/impl/StreamEnvironment.java | 4 + .../rabbitmq/stream/impl/StreamProducer.java | 18 +- .../java/com/rabbitmq/stream/impl/Utils.java | 14 ++ .../rabbitmq/stream/perf/StreamPerfTest.java | 2 + .../com/rabbitmq/stream/impl/ClientTest.java | 2 +- .../rabbitmq/stream/impl/FilteringTest.java | 197 ++++++++++++++++++ .../rabbitmq/stream/impl/MicroStreamTest.java | 120 ----------- .../com/rabbitmq/stream/impl/TestUtils.java | 32 ++- 13 files changed, 324 insertions(+), 173 deletions(-) create mode 100644 src/test/java/com/rabbitmq/stream/impl/FilteringTest.java delete mode 100644 src/test/java/com/rabbitmq/stream/impl/MicroStreamTest.java diff --git a/pom.xml b/pom.xml index 492a272e19..d6a446a1b0 100644 --- a/pom.xml +++ b/pom.xml @@ -92,7 +92,7 @@ 1.4 1.36 2.37.0 - 1.15.0 + 1.17.0 0.8.10 3.12 diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java index c80302e8c6..cb83c99071 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java @@ -252,6 +252,10 @@ interface FilterConfiguration { FilterConfiguration filter(Predicate filter); + FilterConfiguration matchUnfiltered(); + + FilterConfiguration matchUnfiltered(boolean matchUnfiltered); + ConsumerBuilder builder(); } } diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 4c3d20f698..60165af2aa 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -13,34 +13,7 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.Constants.COMMAND_CLOSE; -import static com.rabbitmq.stream.Constants.COMMAND_CONSUMER_UPDATE; -import static com.rabbitmq.stream.Constants.COMMAND_CREATE_STREAM; -import static com.rabbitmq.stream.Constants.COMMAND_CREDIT; -import static com.rabbitmq.stream.Constants.COMMAND_DECLARE_PUBLISHER; -import static com.rabbitmq.stream.Constants.COMMAND_DELETE_PUBLISHER; -import static com.rabbitmq.stream.Constants.COMMAND_DELETE_STREAM; -import static com.rabbitmq.stream.Constants.COMMAND_EXCHANGE_COMMAND_VERSIONS; -import static com.rabbitmq.stream.Constants.COMMAND_HEARTBEAT; -import static com.rabbitmq.stream.Constants.COMMAND_METADATA; -import static com.rabbitmq.stream.Constants.COMMAND_OPEN; -import static com.rabbitmq.stream.Constants.COMMAND_PARTITIONS; -import static com.rabbitmq.stream.Constants.COMMAND_PEER_PROPERTIES; -import static com.rabbitmq.stream.Constants.COMMAND_PUBLISH; -import static com.rabbitmq.stream.Constants.COMMAND_QUERY_OFFSET; -import static com.rabbitmq.stream.Constants.COMMAND_QUERY_PUBLISHER_SEQUENCE; -import static com.rabbitmq.stream.Constants.COMMAND_ROUTE; -import static com.rabbitmq.stream.Constants.COMMAND_SASL_AUTHENTICATE; -import static com.rabbitmq.stream.Constants.COMMAND_SASL_HANDSHAKE; -import static com.rabbitmq.stream.Constants.COMMAND_STORE_OFFSET; -import static com.rabbitmq.stream.Constants.COMMAND_STREAM_STATS; -import static com.rabbitmq.stream.Constants.COMMAND_SUBSCRIBE; -import static com.rabbitmq.stream.Constants.COMMAND_UNSUBSCRIBE; -import static com.rabbitmq.stream.Constants.RESPONSE_CODE_AUTHENTICATION_FAILURE; -import static com.rabbitmq.stream.Constants.RESPONSE_CODE_AUTHENTICATION_FAILURE_LOOPBACK; -import static com.rabbitmq.stream.Constants.RESPONSE_CODE_OK; -import static com.rabbitmq.stream.Constants.RESPONSE_CODE_SASL_CHALLENGE; -import static com.rabbitmq.stream.Constants.VERSION_1; +import static com.rabbitmq.stream.Constants.*; import static com.rabbitmq.stream.impl.Utils.encodeRequestCode; import static com.rabbitmq.stream.impl.Utils.encodeResponseCode; import static com.rabbitmq.stream.impl.Utils.extractResponseCode; @@ -216,6 +189,7 @@ public long applyAsLong(Object value) { private final Duration rpcTimeout; private volatile ShutdownReason shutdownReason = null; private final Runnable exchangeCommandVersionsCheck; + private final boolean filteringSupported; public Client() { this(new ClientParameters()); @@ -398,15 +372,25 @@ public void initChannel(SocketChannel ch) { tuneState.getHeartbeat()); this.connectionProperties = open(parameters.virtualHost); Set supportedCommands = maybeExchangeCommandVersions(); - if (supportedCommands.stream().anyMatch(i -> i.getKey() == COMMAND_STREAM_STATS)) { - this.exchangeCommandVersionsCheck = () -> {}; - } else { - this.exchangeCommandVersionsCheck = - () -> { - throw new UnsupportedOperationException( - "QueryStreamInfo is available only on RabbitMQ 3.11 or more."); - }; - } + AtomicReference exchangeCommandVersionsCheckReference = new AtomicReference<>(); + AtomicBoolean filteringSupportedReference = new AtomicBoolean(false); + supportedCommands.forEach( + c -> { + if (c.getKey() == COMMAND_STREAM_STATS) { + exchangeCommandVersionsCheckReference.set(() -> {}); + } + if (c.getKey() == COMMAND_PUBLISH && c.getMaxVersion() >= VERSION_2) { + filteringSupportedReference.set(true); + } + }); + this.exchangeCommandVersionsCheck = + exchangeCommandVersionsCheckReference.get() == null + ? () -> { + throw new UnsupportedOperationException( + "QueryStreamInfo is available only on RabbitMQ 3.11 or more."); + } + : exchangeCommandVersionsCheckReference.get(); + this.filteringSupported = filteringSupportedReference.get(); started.set(true); this.metricsCollector.openConnection(); } catch (RuntimeException e) { @@ -1411,6 +1395,10 @@ private String serverAddress() { } } + boolean filteringSupported() { + return this.filteringSupported; + } + public List route(String routingKey, String superStream) { if (routingKey == null || superStream == null) { throw new IllegalArgumentException("routing key and stream must not be null"); @@ -1612,11 +1600,7 @@ private Set maybeExchangeCommandVersions() { Set supported = new HashSet<>(); try { if (Utils.is3_11_OrMore(brokerVersion())) { - for (FrameHandlerInfo info : exchangeCommandVersions()) { - if (info.getKey() == COMMAND_STREAM_STATS) { - supported.add(info); - } - } + supported.addAll(exchangeCommandVersions()); } } catch (Exception e) { LOGGER.info("Error while exchanging command versions: {}", e.getMessage()); diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java index f1dab8cdde..f2368d3f94 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java @@ -76,7 +76,9 @@ class StreamConsumer implements Consumer { ConsumerUpdateListener consumerUpdateListener, int initialCredits, int additionalCredits) { - + if (Utils.filteringEnabled(subscriptionProperties) && !environment.filteringSupported()) { + throw new IllegalArgumentException("Filtering is not supported by the broker"); + } this.id = ID_SEQUENCE.getAndIncrement(); Runnable trackingClosingCallback; try { diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java index 9aa7ae463e..2f4d382a59 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java @@ -13,6 +13,9 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; +import static com.rabbitmq.stream.impl.Utils.SUBSCRIPTION_PROPERTY_FILTER_PREFIX; +import static com.rabbitmq.stream.impl.Utils.SUBSCRIPTION_PROPERTY_MATCH_UNFILTERED; + import com.rabbitmq.stream.*; import java.lang.reflect.Field; import java.lang.reflect.Modifier; @@ -21,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; class StreamConsumerBuilder implements ConsumerBuilder { @@ -87,7 +91,7 @@ public ConsumerBuilder name(String name) { @Override public ConsumerBuilder singleActiveConsumer() { - this.subscriptionProperties.put("single-active-consumer", "true"); + this.subscriptionProperties.put(Utils.SUBSCRIPTION_PROPERTY_SAC, "true"); return this; } @@ -195,8 +199,14 @@ public Consumer build() { handler = this.messageHandler; } else { this.filterConfiguration.validate(); + AtomicInteger i = new AtomicInteger(0); + this.filterConfiguration.filterValues.forEach( + v -> + this.subscriptionProperties.put( + SUBSCRIPTION_PROPERTY_FILTER_PREFIX + i.getAndIncrement(), v)); this.subscriptionProperties.put( - "filters", String.join(",", this.filterConfiguration.filterValues)); + SUBSCRIPTION_PROPERTY_MATCH_UNFILTERED, + this.filterConfiguration.matchUnfiltered ? "true" : "false"); final Predicate filter = this.filterConfiguration.filter; final MessageHandler delegate = this.messageHandler; handler = @@ -226,7 +236,7 @@ public Consumer build() { environment.addConsumer((StreamConsumer) consumer); } else { if (Utils.isSac(this.subscriptionProperties)) { - this.subscriptionProperties.put("super-stream", this.superStream); + this.subscriptionProperties.put(Utils.SUBSCRIPTION_PROPERTY_SUPER_STREAM, this.superStream); } consumer = new SuperStreamConsumer(this, this.superStream, this.environment, trackingConfiguration); @@ -361,6 +371,7 @@ private static final class DefaultFilterConfiguration implements FilterConfigura private final StreamConsumerBuilder builder; private List filterValues; private Predicate filter; + private boolean matchUnfiltered = false; private DefaultFilterConfiguration(StreamConsumerBuilder builder) { this.builder = builder; @@ -368,7 +379,6 @@ private DefaultFilterConfiguration(StreamConsumerBuilder builder) { @Override public FilterConfiguration values(String... filterValues) { - // FIXME: check for ',' in values this.filterValues = Arrays.asList(filterValues); return this; } @@ -379,6 +389,18 @@ public FilterConfiguration filter(Predicate filter) { return this; } + @Override + public FilterConfiguration matchUnfiltered() { + this.matchUnfiltered = true; + return this; + } + + @Override + public FilterConfiguration matchUnfiltered(boolean matchUnfiltered) { + this.matchUnfiltered = matchUnfiltered; + return this; + } + @Override public ConsumerBuilder builder() { return this.builder; diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index d40f070b56..c4ef75a190 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -736,6 +736,10 @@ static T locatorOperation( return result; } + boolean filteringSupported() { + return this.locatorOperation(Client::filteringSupported); + } + Clock clock() { return this.clock; } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index b3ce2a8b71..84ff8a6296 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -94,6 +94,9 @@ class StreamProducer implements Producer { Duration enqueueTimeout, Function filterValueExtractor, StreamEnvironment environment) { + if (filterValueExtractor != null && !environment.filteringSupported()) { + throw new IllegalArgumentException("Filtering is not supported by the broker"); + } this.id = ID_SEQUENCE.getAndIncrement(); this.environment = environment; this.name = name; @@ -606,8 +609,12 @@ private static final class OutboundMessageFilterValueWriterCallback public int write(ByteBuf bb, Object entity, long publishingId) { AccumulatedEntity accumulatedEntity = (AccumulatedEntity) entity; String filterValue = accumulatedEntity.filterValue(); - bb.writeShort(filterValue.length()); - bb.writeBytes(filterValue.getBytes(StandardCharsets.UTF_8)); + if (filterValue == null) { + bb.writeShort(-1); + } else { + bb.writeShort(filterValue.length()); + bb.writeBytes(filterValue.getBytes(StandardCharsets.UTF_8)); + } Codec.EncodedMessage messageToPublish = (Codec.EncodedMessage) accumulatedEntity.encodedEntity(); bb.writeInt(messageToPublish.getSize()); @@ -619,7 +626,12 @@ public int write(ByteBuf bb, Object entity, long publishingId) { public int fragmentLength(Object entity) { AccumulatedEntity accumulatedEntity = (AccumulatedEntity) entity; Codec.EncodedMessage message = (Codec.EncodedMessage) accumulatedEntity.encodedEntity(); - return 8 + 2 + accumulatedEntity.filterValue().length() + 4 + message.getSize(); + String filterValue = accumulatedEntity.filterValue(); + if (filterValue == null) { + return 8 + 2 + 4 + message.getSize(); + } else { + return 8 + 2 + accumulatedEntity.filterValue().length() + 4 + message.getSize(); + } } } } diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java index c5f258ab37..a21b23e413 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Utils.java +++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java @@ -65,6 +65,11 @@ final class Utils { private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class); private static final Map CONSTANT_LABELS; + static final String SUBSCRIPTION_PROPERTY_SAC = "single-active-consumer"; + static final String SUBSCRIPTION_PROPERTY_SUPER_STREAM = "super-stream"; + static final String SUBSCRIPTION_PROPERTY_FILTER_PREFIX = "filter."; + static final String SUBSCRIPTION_PROPERTY_MATCH_UNFILTERED = "match-unfiltered"; + static { Map labels = new HashMap<>(); Arrays.stream(Constants.class.getDeclaredFields()) @@ -119,6 +124,15 @@ static boolean isSac(Map properties) { } } + static boolean filteringEnabled(Map properties) { + if (properties == null || properties.isEmpty()) { + return false; + } else { + return properties.keySet().stream() + .anyMatch(k -> k.startsWith(SUBSCRIPTION_PROPERTY_FILTER_PREFIX)); + } + } + static short encodeRequestCode(Short code) { return code; } diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java index 73136cb0fd..a6f755beac 100644 --- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java +++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java @@ -122,6 +122,7 @@ public class StreamPerfTest implements Callable { @CommandLine.Mixin private final CommandLine.HelpCommand helpCommand = new CommandLine.HelpCommand(); + // for testing private final AddressResolver addressResolver; private final PrintWriter err, out; @@ -476,6 +477,7 @@ static class InstanceSyncOptions { private List monitorings; private volatile Environment environment; private volatile EventLoopGroup eventLoopGroup; + // constructor for completion script generation public StreamPerfTest() { this(null, null, null, null); diff --git a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java index be10744587..588b5ac290 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java @@ -1093,7 +1093,7 @@ public int fragmentLength(Object obj) { stream, OffsetSpecification.first(), 1, - Collections.singletonMap("filters", newFilterValue)); + Collections.singletonMap("filter.1", newFilterValue)); int expectedCount = filterValueCount.get(newFilterValue).get(); waitAtMost(() -> filteredConsumedMessageCount.get() == expectedCount); diff --git a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java new file mode 100644 index 0000000000..cf9107d552 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java @@ -0,0 +1,197 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import static com.rabbitmq.stream.impl.TestUtils.*; +import static org.assertj.core.api.Assertions.assertThat; + +import com.rabbitmq.stream.*; +import io.netty.channel.EventLoopGroup; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; +import java.util.stream.IntStream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.NullSource; +import org.junit.jupiter.params.provider.ValueSource; + +@DisabledIfFilteringNotSupported +@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +public class FilteringTest { + + static final int messageCount = 10_000; + + EventLoopGroup eventLoopGroup; + + Environment environment; + + String stream; + + @BeforeEach + void init() throws Exception { + EnvironmentBuilder environmentBuilder = + Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder(); + environmentBuilder.addressResolver(add -> localhost()); + environment = environmentBuilder.build(); + } + + @AfterEach + void tearDown() throws Exception { + environment.close(); + } + + @ParameterizedTest + @ValueSource(strings = "foo") + @NullSource + void publishConsume(String producerName) throws Exception { + List filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear")); + Map filterValueCount = new HashMap<>(); + Random random = new Random(); + + Runnable insert = + () -> + publish( + messageCount, + producerName, + () -> { + String filterValue = filterValues.get(random.nextInt(filterValues.size())); + filterValueCount + .computeIfAbsent(filterValue, k -> new AtomicInteger()) + .incrementAndGet(); + return filterValue; + }); + insert.run(); + + // second wave of messages, with only one, new filter value + String newFilterValue = "orange"; + filterValues.clear(); + filterValues.add(newFilterValue); + insert.run(); + + AtomicInteger receivedMessageCount = new AtomicInteger(0); + AtomicInteger filteredConsumedMessageCount = new AtomicInteger(0); + consumerBuilder() + .filter() + .values(newFilterValue) + .filter( + m -> { + receivedMessageCount.incrementAndGet(); + return newFilterValue.equals(m.getProperties().getGroupId()); + }) + .builder() + .messageHandler((context, message) -> filteredConsumedMessageCount.incrementAndGet()) + .build(); + + int expectedCount = filterValueCount.get(newFilterValue).get(); + waitAtMost(() -> filteredConsumedMessageCount.get() == expectedCount); + assertThat(receivedMessageCount).hasValueLessThan(messageCount * 2); + } + + @ParameterizedTest + @ValueSource(strings = "foo") + @NullSource + void publishWithNullFilterValuesShouldBePossible(String producerName) { + publish(messageCount, producerName, () -> null); + + CountDownLatch consumeLatch = new CountDownLatch(messageCount); + consumerBuilder().messageHandler((ctx, msg) -> consumeLatch.countDown()).build(); + latchAssert(consumeLatch).completes(); + } + + @ParameterizedTest + @CsvSource({"foo,true", "foo,false", ",true", ",false"}) + void matchUnfilteredShouldReturnNullFilteredValueAndFilteredValues( + String producerName, boolean matchUnfiltered) throws Exception { + publish(messageCount, producerName, () -> null); + + List filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear")); + Map filterValueCount = new HashMap<>(); + Random random = new Random(); + publish( + messageCount, + producerName, + () -> { + String filterValue = filterValues.get(random.nextInt(filterValues.size())); + filterValueCount.computeIfAbsent(filterValue, k -> new AtomicInteger()).incrementAndGet(); + return filterValue; + }); + + publish(messageCount, producerName, () -> null); + + AtomicInteger receivedMessageCount = new AtomicInteger(0); + Set receivedFilterValues = ConcurrentHashMap.newKeySet(); + consumerBuilder() + .filter() + .values(filterValues.get(0)) + .matchUnfiltered(matchUnfiltered) + .filter(m -> true) + .builder() + .messageHandler( + (ctx, msg) -> { + receivedFilterValues.add( + msg.getProperties().getGroupId() == null + ? "null" + : msg.getProperties().getGroupId()); + receivedMessageCount.incrementAndGet(); + }) + .build(); + + int expected; + if (matchUnfiltered) { + expected = messageCount * 2; + } else { + expected = messageCount; + } + waitAtMost(() -> receivedMessageCount.get() >= expected); + } + + private ProducerBuilder producerBuilder() { + return this.environment.producerBuilder().stream(stream); + } + + private ConsumerBuilder consumerBuilder() { + return this.environment.consumerBuilder().stream(stream).offset(OffsetSpecification.first()); + } + + private static final AtomicLong PUBLISHING_SEQUENCE = new AtomicLong(0); + + private void publish( + int messageCount, String producerName, Supplier filterValueSupplier) { + Producer producer = + producerBuilder().name(producerName).filter(m -> m.getProperties().getGroupId()).build(); + CountDownLatch latch = new CountDownLatch(messageCount); + ConfirmationHandler confirmationHandler = ctx -> latch.countDown(); + IntStream.range(0, messageCount) + .forEach( + ignored -> + producer.send( + producer + .messageBuilder() + .publishingId(PUBLISHING_SEQUENCE.getAndIncrement()) + .properties() + .groupId(filterValueSupplier.get()) + .messageBuilder() + .build(), + confirmationHandler)); + latchAssert(latch).completes(); + producer.close(); + } +} diff --git a/src/test/java/com/rabbitmq/stream/impl/MicroStreamTest.java b/src/test/java/com/rabbitmq/stream/impl/MicroStreamTest.java deleted file mode 100644 index dd745a29e4..0000000000 --- a/src/test/java/com/rabbitmq/stream/impl/MicroStreamTest.java +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. -// -// This software, the RabbitMQ Stream Java client library, is dual-licensed under the -// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). -// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. -package com.rabbitmq.stream.impl; - -import static com.rabbitmq.stream.impl.TestUtils.*; -import static org.assertj.core.api.Assertions.assertThat; - -import com.rabbitmq.stream.*; -import io.netty.channel.EventLoopGroup; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.*; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.IntStream; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - -@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) -public class MicroStreamTest { - - static final Charset UTF8 = StandardCharsets.UTF_8; - EventLoopGroup eventLoopGroup; - - Environment environment; - - String stream; - - @BeforeEach - void init() throws Exception { - EnvironmentBuilder environmentBuilder = - Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder(); - environmentBuilder.addressResolver(add -> localhost()); - environment = environmentBuilder.build(); - } - - @AfterEach - void tearDown() throws Exception { - environment.close(); - } - - @Test - void publishConsume() throws Exception { - int messageCount = 10_000; - Producer producer = - environment.producerBuilder().stream(stream) - .filter(m -> m.getProperties().getGroupId()) - .build(); - - List filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear")); - Map filterValueCount = new HashMap<>(); - Random random = new Random(); - - AtomicReference publishLatch = - new AtomicReference<>(new CountDownLatch(messageCount)); - ConfirmationHandler confirmationHandler = confirmationStatus -> publishLatch.get().countDown(); - Runnable insert = - () -> { - IntStream.range(0, messageCount) - .forEach( - i -> { - String filterValue = filterValues.get(random.nextInt(filterValues.size())); - filterValueCount - .computeIfAbsent(filterValue, k -> new AtomicInteger()) - .incrementAndGet(); - producer.send( - producer - .messageBuilder() - .properties() - .groupId(filterValue) - .messageBuilder() - .build(), - confirmationHandler); - }); - }; - insert.run(); - latchAssert(publishLatch).completes(); - - // second wave of messages, with only one, new filter value - String newFilterValue = "orange"; - filterValues.clear(); - filterValues.add(newFilterValue); - publishLatch.set(new CountDownLatch(messageCount)); - insert.run(); - assertThat(latchAssert(publishLatch)).completes(); - - AtomicInteger receivedMessageCount = new AtomicInteger(0); - AtomicInteger filteredConsumedMessageCount = new AtomicInteger(0); - environment.consumerBuilder().stream(stream) - .offset(OffsetSpecification.first()) - .filter() - .values(newFilterValue) - .filter( - m -> { - receivedMessageCount.incrementAndGet(); - return newFilterValue.equals(m.getProperties().getGroupId()); - }) - .builder() - .messageHandler((context, message) -> filteredConsumedMessageCount.incrementAndGet()) - .build(); - - int expectedCount = filterValueCount.get(newFilterValue).get(); - waitAtMost(() -> filteredConsumedMessageCount.get() == expectedCount); - assertThat(receivedMessageCount).hasValueLessThan(messageCount * 2); - } -} diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 873a6454cb..1e7550a7b5 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -458,6 +458,12 @@ static boolean atLeastVersion(String expectedVersion, String currentVersion) { } } + @Target({ElementType.TYPE, ElementType.METHOD}) + @Retention(RetentionPolicy.RUNTIME) + @Documented + @ExtendWith(DisabledIfFilteringNotSupportedCondition.class) + @interface DisabledIfFilteringNotSupported {} + @Target({ElementType.TYPE, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented @@ -601,6 +607,7 @@ public void beforeEach(ExtensionContext context) throws Exception { new Client(new Client.ClientParameters().eventLoopGroup(eventLoopGroup(context))); Client.Response response = client.create(stream); assertThat(response.isOk()).isTrue(); + store(context.getRoot()).put("filteringSupported", client.filteringSupported()); client.close(); store(context).put("testMethodStream", stream); } catch (NoSuchFieldException e) { @@ -671,7 +678,7 @@ private ExecutorServiceCloseableResourceWrapper() { } @Override - public void close() throws Throwable { + public void close() { this.executorService.shutdownNow(); } } @@ -703,6 +710,29 @@ private void close() { } } + static class DisabledIfFilteringNotSupportedCondition implements ExecutionCondition { + + @Override + public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) { + Boolean filteringSupported = + StreamTestInfrastructureExtension.store(context).get("filteringSupported", Boolean.class); + if (filteringSupported == null) { + EventLoopGroup eventLoop = StreamTestInfrastructureExtension.eventLoopGroup(context); + try (Client client = new Client(new ClientParameters().eventLoopGroup(eventLoop))) { + filteringSupported = client.filteringSupported(); + StreamTestInfrastructureExtension.store(context) + .put("filteringSupported", filteringSupported); + } + } + + if (filteringSupported) { + return ConditionEvaluationResult.enabled("filtering is supported"); + } else { + return ConditionEvaluationResult.disabled("filtering is not supported"); + } + } + } + static class DisabledIfRabbitMqCtlNotSetCondition implements ExecutionCondition { @Override From a2609ec38d4acc07afe0552617d762c659b1a563 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 26 May 2023 11:06:14 +0200 Subject: [PATCH 05/18] Document filtering --- src/docs/asciidoc/advanced-topics.adoc | 70 +++++++++++++++++++ .../com/rabbitmq/stream/ConsumerBuilder.java | 42 +++++++++++ .../com/rabbitmq/stream/ProducerBuilder.java | 6 ++ .../stream/impl/StreamConsumerBuilder.java | 3 + .../rabbitmq/stream/docs/FilteringUsage.java | 69 ++++++++++++++++++ 5 files changed, 190 insertions(+) create mode 100644 src/test/java/com/rabbitmq/stream/docs/FilteringUsage.java diff --git a/src/docs/asciidoc/advanced-topics.adoc b/src/docs/asciidoc/advanced-topics.adoc index 215f4bdf61..fbf006cb5d 100644 --- a/src/docs/asciidoc/advanced-topics.adoc +++ b/src/docs/asciidoc/advanced-topics.adoc @@ -2,6 +2,76 @@ === Advanced Topics +==== Filtering + +RabbitMQ Stream provides a server-side filtering feature that avoids reading all the messages of a stream and filtering only on the client side. +This helps to save network bandwidth when a consuming application needs only a subset of messages, e.g. the messages from a given geographical region. + +The filtering feature works as follows: + +* each message is published with an associated _filter value_ +* a consumer that wants to enable filtering must: +** define one or several filter values +** define some client-side filtering logic + +Why does the consumer need to define some client-side filtering logic? +Because the server-side filtering is probabilistic: messages that do not match the filter value(s) can still be sent to the consumer. +The server uses a https://en.wikipedia.org/wiki/Bloom_filter[bloom filter], _a space-efficient probabilistic data structure_, where false positives are possible. +Despite this, the filtering saves some bandwidth, which is its primary goal. + +===== Filtering on the Publishing Side + +Filtering on the publishing side consists in defining some logic to extract the filter value from a message. +The following snippet shows how to extract the filter value from an application property: + +.Declaring a producer with logic to extract a filter value from each message +[source,java,indent=0] +-------- +include::{test-examples}/FilteringUsage.java[tag=producer-simple] +-------- +<1> Get filter value from `state` application property + +Note the filter value can be null: the message is then published in a regular way. +It is called in this context an _unfiltered_ message. + +===== Filtering on the Consuming Side + +A consumer needs to set up one or several filter values and some filtering logic to enable filtering. +The filtering logic must be consistent with the filter values. +In the next snippet, the consumer wants to process only messages from the state of California. +It sets a filter value to `california` and a predicate that accepts a message only if the `state` application properties is `california`: + +.Declaring a consumer with a filter value and filtering logic +[source,java,indent=0] +-------- +include::{test-examples}/FilteringUsage.java[tag=consumer-simple] +-------- +<1> Set filter value +<2> Set filtering logic + +The filter logic is a `Predicate`. +It must return `true` if a message is accepted, following the same semantics as `java.util.stream.Stream#filter(Predicate)`. + +As stated above, not all messages must have an associated filter value. +Many applications may not need some filtering, so they can publish messages the regular way. +And a stream can contain messages with and without an associated filter value. + +By default, messages without a filter value (a.k.a _unfiltered_ messages) are not sent to a consumer that enabled filtering. + +But what if a consumer wants to process messages with a filter value and messages without any filter value as well? +It must use the `matchUnfiltered()` method in its declaration and also make sure to keep the filtering logic consistent: + +.Declaring a consumer with a filter value and filtering logic +[source,java,indent=0] +-------- +include::{test-examples}/FilteringUsage.java[tag=consumer-match-unfiltered] +-------- +<1> Request messages from California +<2> Request messages without a filter value as well +<3> Let both types of messages pass + +In the example above, the filtering logic has been adapted to let pass `california` messages _and_ messages without a state set as well. + ==== Using Native `epoll` The stream Java client uses the https://netty.io/[Netty] network framework and its Java NIO transport implementation by default. diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java index cb83c99071..1dfd6443bb 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java @@ -152,6 +152,11 @@ public interface ConsumerBuilder { */ ConsumerBuilder noTrackingStrategy(); + /** + * Configure the filtering. + * + * @return the filtering configuration + */ FilterConfiguration filter(); /** @@ -246,16 +251,53 @@ interface FlowConfiguration { ConsumerBuilder builder(); } + /** Filter configuration. */ interface FilterConfiguration { + /** + * Set the filter values. + * + * @param filterValues + * @return this filter configuration instance + */ FilterConfiguration values(String... filterValues); + /** + * Client-side filtering logic. + * + *

It must be consistent with the requested filter {@link #values( String...)} and the {@link + * #matchUnfiltered()} flag. + * + * @param filter a predicate that returns true if a message should go to the {@link + * MessageHandler} + * @return this filter configuration instance + */ FilterConfiguration filter(Predicate filter); + /** + * Whether messages without a filter value should be sent as well. + * + *

Default is false. + * + * @return this filter configuration instance + */ FilterConfiguration matchUnfiltered(); + /** + * Whether messages without a filter value should be sent as well. + * + *

Default is false. + * + * @param matchUnfiltered + * @return this filter configuration instance + */ FilterConfiguration matchUnfiltered(boolean matchUnfiltered); + /** + * Go back to the builder. + * + * @return the consumer builder + */ ConsumerBuilder builder(); } } diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java index 82c92ea41c..417f792825 100644 --- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java @@ -132,6 +132,12 @@ public interface ProducerBuilder { */ ProducerBuilder enqueueTimeout(Duration timeout); + /** + * Logic to extract a filter value from a message. + * + * @param filterValueExtractor + * @return + */ ProducerBuilder filter(Function filterValueExtractor); /** diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java index 2f4d382a59..651d515daf 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java @@ -379,6 +379,9 @@ private DefaultFilterConfiguration(StreamConsumerBuilder builder) { @Override public FilterConfiguration values(String... filterValues) { + if (filterValues == null || filterValues.length == 0) { + throw new IllegalArgumentException("At least one filter value must be specified"); + } this.filterValues = Arrays.asList(filterValues); return this; } diff --git a/src/test/java/com/rabbitmq/stream/docs/FilteringUsage.java b/src/test/java/com/rabbitmq/stream/docs/FilteringUsage.java new file mode 100644 index 0000000000..bb22187dc6 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/docs/FilteringUsage.java @@ -0,0 +1,69 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.stream.docs; + +import com.rabbitmq.stream.Consumer; +import com.rabbitmq.stream.Environment; +import com.rabbitmq.stream.Producer; + +public class FilteringUsage { + + void producerSimple() { + Environment environment = Environment.builder().build(); + // tag::producer-simple[] + Producer producer = environment.producerBuilder() + .stream("invoices") + .filter(msg -> + msg.getApplicationProperties().get("state").toString()) // <1> + .build(); + // end::producer-simple[] + } + + void consumerSimple() { + Environment environment = Environment.builder().build(); + // tag::consumer-simple[] + String filterValue = "california"; + Consumer consumer = environment.consumerBuilder() + .stream("invoices") + .filter() + .values(filterValue) // <1> + .filter(msg -> + filterValue.equals(msg.getApplicationProperties().get("state"))) // <2> + .builder() + .messageHandler((ctx, msg) -> { }) + .build(); + // end::consumer-simple[] + } + + void consumerMatchUnfiltered() { + Environment environment = Environment.builder().build(); + // tag::consumer-match-unfiltered[] + String filterValue = "california"; + Consumer consumer = environment.consumerBuilder() + .stream("invoices") + .filter() + .values(filterValue) // <1> + .matchUnfiltered() // <2> + .filter(msg -> + filterValue.equals(msg.getApplicationProperties().get("state")) + || !msg.getApplicationProperties().containsKey("state") // <3> + ) + .builder() + .messageHandler((ctx, msg) -> { }) + .build(); + // end::consumer-match-unfiltered[] + } + +} From 2ba3f07cb3a4d1cea9a1cc2f544b05a507feb5d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 26 May 2023 11:37:43 +0200 Subject: [PATCH 06/18] Document filtering considerations --- src/docs/asciidoc/advanced-topics.adoc | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/docs/asciidoc/advanced-topics.adoc b/src/docs/asciidoc/advanced-topics.adoc index fbf006cb5d..3642229a9d 100644 --- a/src/docs/asciidoc/advanced-topics.adoc +++ b/src/docs/asciidoc/advanced-topics.adoc @@ -72,6 +72,18 @@ include::{test-examples}/FilteringUsage.java[tag=consumer-match-unfiltered] In the example above, the filtering logic has been adapted to let pass `california` messages _and_ messages without a state set as well. +===== Considerations on Filtering + +As stated previously, the server can send messages that do not match the filter value(s) set by consumers. +This is why application developers must be very careful with the filtering logic they define to avoid processing unwanted messages. + +What are good candidates for filter values? +Unique identifiers are _not_: if you know a given message property will be unique in a stream, do not use it as a filter value. +A defined set of values shared across the messages is a good candidate: geographical locations (e.g. countries, states), document types in a stream that stores document information (e.g. payslip, invoice, order), categories of products (e.g. book, luggage, toy). + +Cardinality of filter values can be from a few to a few thousands. +Extreme cardinality (a couple or dozens of thousands) can make filtering less efficient. + ==== Using Native `epoll` The stream Java client uses the https://netty.io/[Netty] network framework and its Java NIO transport implementation by default. From efeb1b0a1e6a3cbdf6f31edb48066c9cfc9af062 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 26 May 2023 14:49:43 +0200 Subject: [PATCH 07/18] Refine filtering API As-per discussion with @kjnilsson and @ggreen. --- src/main/java/com/rabbitmq/stream/ConsumerBuilder.java | 4 ++-- src/main/java/com/rabbitmq/stream/ProducerBuilder.java | 4 ++-- .../com/rabbitmq/stream/impl/StreamConsumerBuilder.java | 2 +- .../com/rabbitmq/stream/impl/StreamProducerBuilder.java | 2 +- .../java/com/rabbitmq/stream/docs/FilteringUsage.java | 6 +++--- .../java/com/rabbitmq/stream/impl/FilteringTest.java | 9 ++++++--- 6 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java index 1dfd6443bb..f3ca83c0f4 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java @@ -263,7 +263,7 @@ interface FilterConfiguration { FilterConfiguration values(String... filterValues); /** - * Client-side filtering logic. + * Client-side filtering logic, occurring after the server-side filtering. * *

It must be consistent with the requested filter {@link #values( String...)} and the {@link * #matchUnfiltered()} flag. @@ -272,7 +272,7 @@ interface FilterConfiguration { * MessageHandler} * @return this filter configuration instance */ - FilterConfiguration filter(Predicate filter); + FilterConfiguration postFilter(Predicate filter); /** * Whether messages without a filter value should be sent as well. diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java index 417f792825..7983729718 100644 --- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java @@ -136,9 +136,9 @@ public interface ProducerBuilder { * Logic to extract a filter value from a message. * * @param filterValueExtractor - * @return + * @return this builder instance */ - ProducerBuilder filter(Function filterValueExtractor); + ProducerBuilder filterValue(Function filterValueExtractor); /** * Create the {@link Producer} instance. diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java index 651d515daf..f64b16f284 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java @@ -387,7 +387,7 @@ public FilterConfiguration values(String... filterValues) { } @Override - public FilterConfiguration filter(Predicate filter) { + public FilterConfiguration postFilter(Predicate filter) { this.filter = filter; return this; } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java index bccaf2e231..431f77ecee 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java @@ -131,7 +131,7 @@ public ProducerBuilder enqueueTimeout(Duration timeout) { } @Override - public ProducerBuilder filter(Function filterValueExtractor) { + public ProducerBuilder filterValue(Function filterValueExtractor) { this.filterValueExtractor = filterValueExtractor; return this; } diff --git a/src/test/java/com/rabbitmq/stream/docs/FilteringUsage.java b/src/test/java/com/rabbitmq/stream/docs/FilteringUsage.java index bb22187dc6..cd123f77db 100644 --- a/src/test/java/com/rabbitmq/stream/docs/FilteringUsage.java +++ b/src/test/java/com/rabbitmq/stream/docs/FilteringUsage.java @@ -25,7 +25,7 @@ void producerSimple() { // tag::producer-simple[] Producer producer = environment.producerBuilder() .stream("invoices") - .filter(msg -> + .filterValue(msg -> msg.getApplicationProperties().get("state").toString()) // <1> .build(); // end::producer-simple[] @@ -39,7 +39,7 @@ void consumerSimple() { .stream("invoices") .filter() .values(filterValue) // <1> - .filter(msg -> + .postFilter(msg -> filterValue.equals(msg.getApplicationProperties().get("state"))) // <2> .builder() .messageHandler((ctx, msg) -> { }) @@ -56,7 +56,7 @@ void consumerMatchUnfiltered() { .filter() .values(filterValue) // <1> .matchUnfiltered() // <2> - .filter(msg -> + .postFilter(msg -> filterValue.equals(msg.getApplicationProperties().get("state")) || !msg.getApplicationProperties().containsKey("state") // <3> ) diff --git a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java index cf9107d552..e0b198d9a2 100644 --- a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java @@ -91,7 +91,7 @@ void publishConsume(String producerName) throws Exception { consumerBuilder() .filter() .values(newFilterValue) - .filter( + .postFilter( m -> { receivedMessageCount.incrementAndGet(); return newFilterValue.equals(m.getProperties().getGroupId()); @@ -142,7 +142,7 @@ void matchUnfilteredShouldReturnNullFilteredValueAndFilteredValues( .filter() .values(filterValues.get(0)) .matchUnfiltered(matchUnfiltered) - .filter(m -> true) + .postFilter(m -> true) .builder() .messageHandler( (ctx, msg) -> { @@ -176,7 +176,10 @@ private ConsumerBuilder consumerBuilder() { private void publish( int messageCount, String producerName, Supplier filterValueSupplier) { Producer producer = - producerBuilder().name(producerName).filter(m -> m.getProperties().getGroupId()).build(); + producerBuilder() + .name(producerName) + .filterValue(m -> m.getProperties().getGroupId()) + .build(); CountDownLatch latch = new CountDownLatch(messageCount); ConfirmationHandler confirmationHandler = ctx -> latch.countDown(); IntStream.range(0, messageCount) From f3596c427a4dc98daef833478fe0ad42dfa75b43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 26 May 2023 15:20:53 +0200 Subject: [PATCH 08/18] Repeat filtering tests in case of failure The filtering is probabilistic because of the bloom filter on the server so we retry them up to 5 times in case of failure. --- .../rabbitmq/stream/impl/FilteringTest.java | 207 ++++++++++-------- 1 file changed, 121 insertions(+), 86 deletions(-) diff --git a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java index e0b198d9a2..8ec4eb2596 100644 --- a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java @@ -18,6 +18,7 @@ import com.rabbitmq.stream.*; import io.netty.channel.EventLoopGroup; +import java.time.Duration; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -37,6 +38,8 @@ @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) public class FilteringTest { + private static final Duration CONDITION_TIMEOUT = Duration.ofSeconds(5); + static final int messageCount = 10_000; EventLoopGroup eventLoopGroup; @@ -62,105 +65,122 @@ void tearDown() throws Exception { @ValueSource(strings = "foo") @NullSource void publishConsume(String producerName) throws Exception { - List filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear")); - Map filterValueCount = new HashMap<>(); - Random random = new Random(); - - Runnable insert = - () -> - publish( - messageCount, - producerName, - () -> { - String filterValue = filterValues.get(random.nextInt(filterValues.size())); - filterValueCount - .computeIfAbsent(filterValue, k -> new AtomicInteger()) - .incrementAndGet(); - return filterValue; - }); - insert.run(); - - // second wave of messages, with only one, new filter value - String newFilterValue = "orange"; - filterValues.clear(); - filterValues.add(newFilterValue); - insert.run(); - - AtomicInteger receivedMessageCount = new AtomicInteger(0); - AtomicInteger filteredConsumedMessageCount = new AtomicInteger(0); - consumerBuilder() - .filter() - .values(newFilterValue) - .postFilter( - m -> { - receivedMessageCount.incrementAndGet(); - return newFilterValue.equals(m.getProperties().getGroupId()); - }) - .builder() - .messageHandler((context, message) -> filteredConsumedMessageCount.incrementAndGet()) - .build(); - - int expectedCount = filterValueCount.get(newFilterValue).get(); - waitAtMost(() -> filteredConsumedMessageCount.get() == expectedCount); - assertThat(receivedMessageCount).hasValueLessThan(messageCount * 2); + repeatIfFailure( + () -> { + List filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear")); + Map filterValueCount = new HashMap<>(); + Random random = new Random(); + + Runnable insert = + () -> + publish( + messageCount, + producerName, + () -> { + String filterValue = filterValues.get(random.nextInt(filterValues.size())); + filterValueCount + .computeIfAbsent(filterValue, k -> new AtomicInteger()) + .incrementAndGet(); + return filterValue; + }); + insert.run(); + + // second wave of messages, with only one, new filter value + String newFilterValue = "orange"; + filterValues.clear(); + filterValues.add(newFilterValue); + insert.run(); + + AtomicInteger receivedMessageCount = new AtomicInteger(0); + AtomicInteger filteredConsumedMessageCount = new AtomicInteger(0); + try (Consumer ignored = + consumerBuilder() + .filter() + .values(newFilterValue) + .postFilter( + m -> { + receivedMessageCount.incrementAndGet(); + return newFilterValue.equals(m.getProperties().getGroupId()); + }) + .builder() + .messageHandler( + (context, message) -> filteredConsumedMessageCount.incrementAndGet()) + .build()) { + int expectedCount = filterValueCount.get(newFilterValue).get(); + waitAtMost( + CONDITION_TIMEOUT, () -> filteredConsumedMessageCount.get() == expectedCount); + assertThat(receivedMessageCount).hasValueLessThan(messageCount * 2); + } + }); } @ParameterizedTest @ValueSource(strings = "foo") @NullSource - void publishWithNullFilterValuesShouldBePossible(String producerName) { - publish(messageCount, producerName, () -> null); + void publishWithNullFilterValuesShouldBePossible(String producerName) throws Exception { + repeatIfFailure( + () -> { + publish(messageCount, producerName, () -> null); - CountDownLatch consumeLatch = new CountDownLatch(messageCount); - consumerBuilder().messageHandler((ctx, msg) -> consumeLatch.countDown()).build(); - latchAssert(consumeLatch).completes(); + CountDownLatch consumeLatch = new CountDownLatch(messageCount); + try (Consumer ignored = + consumerBuilder().messageHandler((ctx, msg) -> consumeLatch.countDown()).build()) { + latchAssert(consumeLatch).completes(CONDITION_TIMEOUT); + } + }); } @ParameterizedTest @CsvSource({"foo,true", "foo,false", ",true", ",false"}) void matchUnfilteredShouldReturnNullFilteredValueAndFilteredValues( String producerName, boolean matchUnfiltered) throws Exception { - publish(messageCount, producerName, () -> null); - - List filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear")); - Map filterValueCount = new HashMap<>(); - Random random = new Random(); - publish( - messageCount, - producerName, + repeatIfFailure( () -> { - String filterValue = filterValues.get(random.nextInt(filterValues.size())); - filterValueCount.computeIfAbsent(filterValue, k -> new AtomicInteger()).incrementAndGet(); - return filterValue; + publish(messageCount, producerName, () -> null); + + List filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear")); + Map filterValueCount = new HashMap<>(); + Random random = new Random(); + publish( + messageCount, + producerName, + () -> { + String filterValue = filterValues.get(random.nextInt(filterValues.size())); + filterValueCount + .computeIfAbsent(filterValue, k -> new AtomicInteger()) + .incrementAndGet(); + return filterValue; + }); + + publish(messageCount, producerName, () -> null); + + AtomicInteger receivedMessageCount = new AtomicInteger(0); + Set receivedFilterValues = ConcurrentHashMap.newKeySet(); + try (Consumer ignored = + consumerBuilder() + .filter() + .values(filterValues.get(0)) + .matchUnfiltered(matchUnfiltered) + .postFilter(m -> true) + .builder() + .messageHandler( + (ctx, msg) -> { + receivedFilterValues.add( + msg.getProperties().getGroupId() == null + ? "null" + : msg.getProperties().getGroupId()); + receivedMessageCount.incrementAndGet(); + }) + .build()) { + int expected; + if (matchUnfiltered) { + expected = messageCount * 2; + } else { + expected = messageCount; + } + waitAtMost(CONDITION_TIMEOUT, () -> receivedMessageCount.get() >= expected); + } }); - - publish(messageCount, producerName, () -> null); - - AtomicInteger receivedMessageCount = new AtomicInteger(0); - Set receivedFilterValues = ConcurrentHashMap.newKeySet(); - consumerBuilder() - .filter() - .values(filterValues.get(0)) - .matchUnfiltered(matchUnfiltered) - .postFilter(m -> true) - .builder() - .messageHandler( - (ctx, msg) -> { - receivedFilterValues.add( - msg.getProperties().getGroupId() == null - ? "null" - : msg.getProperties().getGroupId()); - receivedMessageCount.incrementAndGet(); - }) - .build(); - - int expected; - if (matchUnfiltered) { - expected = messageCount * 2; - } else { - expected = messageCount; - } - waitAtMost(() -> receivedMessageCount.get() >= expected); } private ProducerBuilder producerBuilder() { @@ -194,7 +214,22 @@ private void publish( .messageBuilder() .build(), confirmationHandler)); - latchAssert(latch).completes(); + latchAssert(latch).completes(CONDITION_TIMEOUT); producer.close(); } + + private static void repeatIfFailure(RunnableWithException test) throws Exception { + int executionCount = 0; + Exception lastException = null; + while (executionCount < 5) { + try { + test.run(); + return; + } catch (Exception e) { + executionCount++; + lastException = e; + } + } + throw lastException; + } } From d621a026fcb6ae86c134ebfb7b6dd6591eb51ca8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 9 Jun 2023 12:03:36 +0200 Subject: [PATCH 09/18] Support filter size on stream creation --- .../java/com/rabbitmq/stream/StreamCreator.java | 11 +++++++++++ src/main/java/com/rabbitmq/stream/impl/Client.java | 8 ++++++++ .../rabbitmq/stream/impl/StreamStreamCreator.java | 6 ++++++ .../com/rabbitmq/stream/impl/FilteringTest.java | 14 ++++++++++++++ 4 files changed, 39 insertions(+) diff --git a/src/main/java/com/rabbitmq/stream/StreamCreator.java b/src/main/java/com/rabbitmq/stream/StreamCreator.java index aa61e4ddd2..22250e40d8 100644 --- a/src/main/java/com/rabbitmq/stream/StreamCreator.java +++ b/src/main/java/com/rabbitmq/stream/StreamCreator.java @@ -14,6 +14,7 @@ package com.rabbitmq.stream; import java.time.Duration; +import java.util.function.Function; /** API to configure and create a stream. */ public interface StreamCreator { @@ -63,6 +64,16 @@ public interface StreamCreator { */ StreamCreator leaderLocator(LeaderLocator leaderLocator); + /** + * Set the size of the stream chunk filters. + * + * @param size + * @return this creator instance + * @see ProducerBuilder#filterValue( Function) + * @see ConsumerBuilder#filter() + */ + StreamCreator filterSize(int size); + /** * Create the stream. * diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 60165af2aa..5e3bcb79af 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -2582,6 +2582,14 @@ public StreamParametersBuilder leaderLocator(LeaderLocator leaderLocator) { return this; } + public StreamParametersBuilder filterSize(int size) { + if (size < 16 || size > 255) { + throw new IllegalArgumentException("Stream filter size must be between 16 and 255"); + } + this.parameters.put("stream-filter-size-bytes", String.valueOf(size)); + return this; + } + public StreamParametersBuilder put(String key, String value) { parameters.put(key, value); return this; diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java index 0b7f0fd559..ed48565ec1 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java @@ -67,6 +67,12 @@ public StreamCreator leaderLocator(LeaderLocator leaderLocator) { return this; } + @Override + public StreamCreator filterSize(int size) { + streamParametersBuilder.filterSize(size); + return this; + } + @Override public void create() { if (stream == null) { diff --git a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java index 8ec4eb2596..ad666a4fbe 100644 --- a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java @@ -15,6 +15,7 @@ import static com.rabbitmq.stream.impl.TestUtils.*; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.rabbitmq.stream.*; import io.netty.channel.EventLoopGroup; @@ -28,6 +29,8 @@ import java.util.stream.IntStream; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -183,6 +186,17 @@ void matchUnfilteredShouldReturnNullFilteredValueAndFilteredValues( }); } + @Test + void setFilterSizeOnCreation(TestInfo info) { + String s = streamName(info); + this.environment.streamCreator().stream(s).filterSize(128).create(); + this.environment.deleteStream(s); + assertThatThrownBy(() -> this.environment.streamCreator().filterSize(15)) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> this.environment.streamCreator().filterSize(256)) + .isInstanceOf(IllegalArgumentException.class); + } + private ProducerBuilder producerBuilder() { return this.environment.producerBuilder().stream(stream); } From 80ff851072ca99247377f0b64d4bb368049c893e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 9 Jun 2023 17:46:58 +0200 Subject: [PATCH 10/18] Add filtering benchmark --- .../stream/benchmark/FilteringBenchmark.java | 192 ++++++++++++++++++ 1 file changed, 192 insertions(+) create mode 100644 src/test/java/com/rabbitmq/stream/benchmark/FilteringBenchmark.java diff --git a/src/test/java/com/rabbitmq/stream/benchmark/FilteringBenchmark.java b/src/test/java/com/rabbitmq/stream/benchmark/FilteringBenchmark.java new file mode 100644 index 0000000000..e5343dc1ac --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/benchmark/FilteringBenchmark.java @@ -0,0 +1,192 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.benchmark; + +import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; + +import com.codahale.metrics.MetricRegistry; +import com.google.common.util.concurrent.RateLimiter; +import com.rabbitmq.stream.*; +import com.rabbitmq.stream.metrics.DropwizardMetricsCollector; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; + +public class FilteringBenchmark { + + static final String stream = "filtering"; + + public static void main(String[] args) throws Exception { + int filterValueCount = 100; + int filterValueSubsetCount = 40; + int rate = 100_000; + int filterSize = 255; + int batchSize = 1; + int maxUnconfirmedMessages = 1; + + Duration publishingDuration = Duration.ofSeconds(10); + Duration publishingCycle = Duration.ofSeconds(1); + + ScheduledExecutorService scheduledExecutorService = + Executors.newSingleThreadScheduledExecutor(); + try (Environment env = Environment.builder().build()) { + try { + env.deleteStream(stream); + } catch (StreamException e) { + // OK + } + env.streamCreator().stream(stream).filterSize(filterSize).create(); + + List filterValues = new ArrayList<>(filterValueCount); + IntStream.range(0, filterValueCount) + .forEach(i -> filterValues.add(UUID.randomUUID().toString())); + + AtomicLong publishedCount = new AtomicLong(0); + AtomicLong confirmedCount = new AtomicLong(0); + + Producer producer = + env.producerBuilder().stream(stream) + .batchSize(batchSize) + .maxUnconfirmedMessages(maxUnconfirmedMessages) + .filterValue(msg -> msg.getProperties().getTo()) + .build(); + + AtomicBoolean keepPublishing = new AtomicBoolean(true); + scheduledExecutorService.schedule( + () -> keepPublishing.set(false), publishingDuration.toMillis(), TimeUnit.MILLISECONDS); + + RateLimiter rateLimiter = RateLimiter.create(rate); + + Random random = new Random(); + ConfirmationHandler confirmationHandler = status -> confirmedCount.getAndIncrement(); + System.out.printf( + "Starting test, filter values %s, subset %s, filter size %d%n", + filterValueCount, filterValueSubsetCount, filterSize); + System.out.printf( + "Starting publishing for %d second(s) at rate %d, batch size %d, max unconfirmed messages %d...%n", + publishingDuration.getSeconds(), rate, batchSize, maxUnconfirmedMessages); + while (keepPublishing.get()) { + AtomicBoolean keepPublishingInCycle = new AtomicBoolean(true); + scheduledExecutorService.schedule( + () -> keepPublishingInCycle.set(false), + publishingCycle.toMillis(), + TimeUnit.MILLISECONDS); + Collections.shuffle(filterValues); + List filterValueSubset = filterValues.subList(0, filterValueSubsetCount); + System.out.printf( + "Starting publishing cycle for %d second(s)...%n", publishingCycle.getSeconds()); + while (keepPublishingInCycle.get()) { + rateLimiter.acquire(1); + String filterValue = filterValueSubset.get(random.nextInt(filterValueSubsetCount)); + producer.send( + producer.messageBuilder().properties().to(filterValue).messageBuilder().build(), + confirmationHandler); + publishedCount.getAndIncrement(); + } + } + System.out.println("Done publishing, waiting for all confirmations..."); + waitAtMost(() -> publishedCount.get() == confirmedCount.get()); + + System.out.println("Starting consuming..."); + + List values = filterValues.subList(0, 10); + for (String filterValue : values) { + Duration timeout = Duration.ofSeconds(30); + long start = System.nanoTime(); + System.out.printf("For filter value %s%n", filterValue); + MetricRegistry registry = new MetricRegistry(); + DropwizardMetricsCollector collector = new DropwizardMetricsCollector(registry); + AtomicLong unfilteredTargetMessageCount = new AtomicLong(0); + Duration unfilteredDuration; + try (Environment e = Environment.builder().metricsCollector(collector).build()) { + AtomicBoolean hasReceivedSomething = new AtomicBoolean(false); + AtomicLong lastReceived = new AtomicLong(0); + long s = System.nanoTime(); + e.consumerBuilder().stream(stream) + .offset(OffsetSpecification.first()) + .messageHandler( + (ctx, msg) -> { + hasReceivedSomething.set(true); + lastReceived.set(System.nanoTime()); + if (filterValue.equals(msg.getProperties().getTo())) { + unfilteredTargetMessageCount.getAndIncrement(); + } + }) + .build(); + waitAtMost( + timeout, + () -> + hasReceivedSomething.get() + && System.nanoTime() - lastReceived.get() > Duration.ofSeconds(1).toNanos()); + unfilteredDuration = Duration.ofNanos(System.nanoTime() - s); + } + + long unfilteredChunkCount = registry.getMeters().get("rabbitmq.stream.chunk").getCount(); + long unfilteredMessageCount = + registry.getMeters().get("rabbitmq.stream.consumed").getCount(); + + registry = new MetricRegistry(); + collector = new DropwizardMetricsCollector(registry); + AtomicLong filteredTargetMessageCount = new AtomicLong(0); + Duration filteredDuration; + try (Environment e = Environment.builder().metricsCollector(collector).build()) { + AtomicBoolean hasReceivedSomething = new AtomicBoolean(false); + AtomicLong lastReceived = new AtomicLong(0); + long s = System.nanoTime(); + e.consumerBuilder().stream(stream) + .offset(OffsetSpecification.first()) + .filter() + .values(filterValue) + .postFilter(msg -> filterValue.equals(msg.getProperties().getTo())) + .builder() + .messageHandler( + (ctx, msg) -> { + hasReceivedSomething.set(true); + lastReceived.set(System.nanoTime()); + filteredTargetMessageCount.getAndIncrement(); + }) + .build(); + waitAtMost( + timeout, + () -> + hasReceivedSomething.get() + && System.nanoTime() - lastReceived.get() > Duration.ofSeconds(1).toNanos()); + filteredDuration = Duration.ofNanos(System.nanoTime() - s); + } + long filteredChunkCount = registry.getMeters().get("rabbitmq.stream.chunk").getCount(); + long filteredMessageCount = registry.getMeters().get("rabbitmq.stream.consumed").getCount(); + System.out.printf( + "consumed in %d / %d ms, target messages %d / %d, chunk count %d / %d (%d %%), messages %d / %d (%d %%)%n", + unfilteredDuration.toMillis(), + filteredDuration.toMillis(), + unfilteredTargetMessageCount.get(), + filteredTargetMessageCount.get(), + unfilteredChunkCount, + filteredChunkCount, + (unfilteredChunkCount - filteredChunkCount) * 100 / unfilteredChunkCount, + unfilteredMessageCount, + filteredMessageCount, + (unfilteredMessageCount - filteredMessageCount) * 100 / unfilteredMessageCount); + } + + } finally { + scheduledExecutorService.shutdownNow(); + } + } +} From aca293f9e6c403738f5c8feadc55085b0ac03726 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 12 Jun 2023 15:48:44 +0200 Subject: [PATCH 11/18] Count false positive in filtering benchmark --- .../stream/benchmark/FilteringBenchmark.java | 110 ++++++++++++++++-- 1 file changed, 102 insertions(+), 8 deletions(-) diff --git a/src/test/java/com/rabbitmq/stream/benchmark/FilteringBenchmark.java b/src/test/java/com/rabbitmq/stream/benchmark/FilteringBenchmark.java index e5343dc1ac..f12b9c64dc 100644 --- a/src/test/java/com/rabbitmq/stream/benchmark/FilteringBenchmark.java +++ b/src/test/java/com/rabbitmq/stream/benchmark/FilteringBenchmark.java @@ -19,12 +19,14 @@ import com.google.common.util.concurrent.RateLimiter; import com.rabbitmq.stream.*; import com.rabbitmq.stream.metrics.DropwizardMetricsCollector; +import com.rabbitmq.stream.metrics.MetricsCollector; import java.time.Duration; import java.util.*; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; @@ -33,12 +35,12 @@ public class FilteringBenchmark { static final String stream = "filtering"; public static void main(String[] args) throws Exception { - int filterValueCount = 100; - int filterValueSubsetCount = 40; + int filterValueCount = 200; + int filterValueSubsetCount = 80; int rate = 100_000; - int filterSize = 255; - int batchSize = 1; - int maxUnconfirmedMessages = 1; + int filterSize = 64; + int batchSize = 100; + int maxUnconfirmedMessages = 10_000; Duration publishingDuration = Duration.ofSeconds(10); Duration publishingCycle = Duration.ofSeconds(1); @@ -108,10 +110,9 @@ public static void main(String[] args) throws Exception { List values = filterValues.subList(0, 10); for (String filterValue : values) { Duration timeout = Duration.ofSeconds(30); - long start = System.nanoTime(); System.out.printf("For filter value %s%n", filterValue); MetricRegistry registry = new MetricRegistry(); - DropwizardMetricsCollector collector = new DropwizardMetricsCollector(registry); + MetricsCollector collector = new DropwizardMetricsCollector(registry); AtomicLong unfilteredTargetMessageCount = new AtomicLong(0); Duration unfilteredDuration; try (Environment e = Environment.builder().metricsCollector(collector).build()) { @@ -141,22 +142,55 @@ public static void main(String[] args) throws Exception { long unfilteredMessageCount = registry.getMeters().get("rabbitmq.stream.consumed").getCount(); + AtomicInteger chunkFilteredMessages = new AtomicInteger(0); + AtomicInteger chunkMessageCount = new AtomicInteger(0); + AtomicInteger chunkWithNoMessagesCount = new AtomicInteger(0); + AtomicBoolean firstChunk = new AtomicBoolean(true); + AtomicLong droppedMessages = new AtomicLong(0); registry = new MetricRegistry(); collector = new DropwizardMetricsCollector(registry); + collector = + new DelegatingMetricsCollector(collector) { + + @Override + public void chunk(int entriesCount) { + if (firstChunk.get()) { + firstChunk.set(false); + } else { + if (chunkMessageCount.get() == chunkFilteredMessages.get()) { + chunkWithNoMessagesCount.incrementAndGet(); + } + chunkFilteredMessages.set(0); + chunkMessageCount.set(entriesCount); + } + super.chunk(entriesCount); + } + }; AtomicLong filteredTargetMessageCount = new AtomicLong(0); Duration filteredDuration; try (Environment e = Environment.builder().metricsCollector(collector).build()) { AtomicBoolean hasReceivedSomething = new AtomicBoolean(false); AtomicLong lastReceived = new AtomicLong(0); long s = System.nanoTime(); + AtomicLong chunkId = new AtomicLong(-1); e.consumerBuilder().stream(stream) .offset(OffsetSpecification.first()) .filter() .values(filterValue) - .postFilter(msg -> filterValue.equals(msg.getProperties().getTo())) + .postFilter( + msg -> { + boolean shouldPass = filterValue.equals(msg.getProperties().getTo()); + if (!shouldPass) { + droppedMessages.getAndIncrement(); + chunkFilteredMessages.getAndIncrement(); + } + return shouldPass; + }) .builder() .messageHandler( (ctx, msg) -> { + if (chunkId.get() == -1 || chunkId.get() != ctx.committedChunkId()) {} + hasReceivedSomething.set(true); lastReceived.set(System.nanoTime()); filteredTargetMessageCount.getAndIncrement(); @@ -183,10 +217,70 @@ public static void main(String[] args) throws Exception { unfilteredMessageCount, filteredMessageCount, (unfilteredMessageCount - filteredMessageCount) * 100 / unfilteredMessageCount); + System.out.printf( + "chunk without matching messages %d / %d, dropped messages %d / %d%n", + chunkWithNoMessagesCount.get(), + filteredChunkCount, + droppedMessages.getAndIncrement(), + filteredMessageCount); } } finally { scheduledExecutorService.shutdownNow(); } } + + private static class DelegatingMetricsCollector implements MetricsCollector { + + private final MetricsCollector delegate; + + private DelegatingMetricsCollector(MetricsCollector delegate) { + this.delegate = delegate; + } + + @Override + public void openConnection() { + this.delegate.openConnection(); + } + + @Override + public void closeConnection() { + this.delegate.closeConnection(); + } + + @Override + public void publish(int count) { + this.delegate.publish(count); + } + + @Override + public void publishConfirm(int count) { + this.delegate.publishConfirm(count); + } + + @Override + public void publishError(int count) { + this.delegate.publishError(count); + } + + @Override + public void chunk(int entriesCount) { + this.delegate.chunk(entriesCount); + } + + @Override + public void consume(long count) { + this.delegate.consume(count); + } + + @Override + public void writtenBytes(int writtenBytes) { + this.delegate.writtenBytes(writtenBytes); + } + + @Override + public void readBytes(int readBytes) { + this.delegate.readBytes(readBytes); + } + } } From 9bfb75ce55b89e8fce6c829933117297ca2e94a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 12 Jun 2023 16:37:31 +0200 Subject: [PATCH 12/18] Repeat test in case of assertion error --- .../java/com/rabbitmq/stream/impl/FilteringTest.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java index ad666a4fbe..10c63a28e8 100644 --- a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java @@ -234,16 +234,20 @@ private void publish( private static void repeatIfFailure(RunnableWithException test) throws Exception { int executionCount = 0; - Exception lastException = null; + Throwable lastException = null; while (executionCount < 5) { try { test.run(); return; - } catch (Exception e) { + } catch (Exception | AssertionError e) { executionCount++; lastException = e; } } - throw lastException; + if (lastException instanceof Error) { + throw new RuntimeException(lastException); + } else { + throw (Exception) lastException; + } } } From 98de9993f992152175a3827a31b3931539b355a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 13 Jun 2023 16:14:31 +0200 Subject: [PATCH 13/18] Fix code snippet title in documentation --- src/docs/asciidoc/advanced-topics.adoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/docs/asciidoc/advanced-topics.adoc b/src/docs/asciidoc/advanced-topics.adoc index 3642229a9d..5c91f51d6c 100644 --- a/src/docs/asciidoc/advanced-topics.adoc +++ b/src/docs/asciidoc/advanced-topics.adoc @@ -54,14 +54,14 @@ It must return `true` if a message is accepted, following the same semantics as As stated above, not all messages must have an associated filter value. Many applications may not need some filtering, so they can publish messages the regular way. -And a stream can contain messages with and without an associated filter value. +So a stream can contain messages with and without an associated filter value. By default, messages without a filter value (a.k.a _unfiltered_ messages) are not sent to a consumer that enabled filtering. But what if a consumer wants to process messages with a filter value and messages without any filter value as well? It must use the `matchUnfiltered()` method in its declaration and also make sure to keep the filtering logic consistent: -.Declaring a consumer with a filter value and filtering logic +.Getting unfiltered messages as well when enabling filtering [source,java,indent=0] -------- include::{test-examples}/FilteringUsage.java[tag=consumer-match-unfiltered] From ae93fc77ba2be014d0233396547a6e446b88e3c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 20 Jun 2023 10:19:21 +0200 Subject: [PATCH 14/18] Support filtering in performance tool With --filter-value-set and --filter-values flags. --- .../stream/impl/StreamConsumerBuilder.java | 4 +- .../rabbitmq/stream/perf/StreamPerfTest.java | 80 +++++++++++++++++-- .../java/com/rabbitmq/stream/perf/Utils.java | 47 +++++++++++ .../stream/perf/StreamPerfTestTest.java | 21 +++++ .../com/rabbitmq/stream/perf/UtilsTest.java | 21 +++++ 5 files changed, 167 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java index f64b16f284..0cefa2aa0f 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java @@ -146,7 +146,9 @@ StreamConsumerBuilder lazyInit(boolean lazyInit) { @Override public FilterConfiguration filter() { - this.filterConfiguration = new DefaultFilterConfiguration(this); + if (this.filterConfiguration == null) { + this.filterConfiguration = new DefaultFilterConfiguration(this); + } return this.filterConfiguration; } diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java index a6f755beac..cd4a1fa82a 100644 --- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java +++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java @@ -431,6 +431,18 @@ public class StreamPerfTest implements Callable { @ArgGroup(exclusive = false, multiplicity = "0..1") InstanceSyncOptions instanceSyncOptions; + @CommandLine.Option( + names = {"--filter-value-set", "-fvs"}, + description = "filter value set for publishers, range (e.g. 1..15) are accepted", + converter = Utils.FilterValueSetConverter.class) + private List filterValueSet; + + @CommandLine.Option( + names = {"--filter-values", "-fv"}, + description = "filter values for consumers", + split = ",") + private List filterValues; + static class InstanceSyncOptions { @CommandLine.Option( @@ -588,7 +600,6 @@ public Integer call() throws Exception { maybeDisplayVersion(); maybeDisplayEnvironmentVariablesHelp(); overridePropertiesWithEnvironmentVariables(); - Codec codec = createCodec(this.codecClass); ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT; @@ -875,19 +886,43 @@ public Integer call() throws Exception { producerBuilder.name(producerName).confirmTimeout(Duration.ZERO); } - java.util.function.Consumer messageBuilderConsumer; + java.util.function.Consumer messageBuilderConsumerTemp; if (this.superStreams) { producerBuilder .superStream(stream) .routing(msg -> msg.getProperties().getMessageIdAsString()); AtomicLong messageIdSequence = new AtomicLong(0); - messageBuilderConsumer = + messageBuilderConsumerTemp = mg -> mg.properties().messageId(messageIdSequence.getAndIncrement()); } else { - messageBuilderConsumer = mg -> {}; + messageBuilderConsumerTemp = mg -> {}; producerBuilder.stream(stream); } + if (this.filterValueSet != null && this.filterValueSet.size() > 0) { + producerBuilder = + producerBuilder.filterValue(msg -> msg.getProperties().getTo()); + List values = new ArrayList<>(this.filterValueSet); + AtomicInteger count = new AtomicInteger(); + int subSetSize = Utils.filteringSubSetSize(values.size()); + int messageCountCycle = Utils.filteringPublishingCycle(this.rate); + List subSet = new ArrayList<>(subSetSize); + java.util.function.Consumer filteringMessageBuilderConsumer = + b -> { + if (Integer.remainderUnsigned( + count.getAndIncrement(), messageCountCycle) + == 0) { + Collections.shuffle(values); + subSet.clear(); + subSet.addAll(values.subList(0, subSetSize)); + } + b.properties() + .to(subSet.get(Integer.remainderUnsigned(count.get(), subSetSize))); + }; + messageBuilderConsumerTemp = + messageBuilderConsumerTemp.andThen(filteringMessageBuilderConsumer); + } + Producer producer = producerBuilder .subEntrySize(this.subEntrySize) @@ -897,9 +932,9 @@ public Integer call() throws Exception { .maxUnconfirmedMessages(this.confirms) .build(); - AtomicLong messageCount = new AtomicLong(0); ConfirmationHandler confirmationHandler; if (this.confirmLatency) { + AtomicLong messageCount = new AtomicLong(0); final PerformanceMetrics metrics = this.performanceMetrics; final int divisor = Utils.downSamplingDivisor(this.rate); confirmationHandler = @@ -935,6 +970,8 @@ public Integer call() throws Exception { producers.add(producer); + java.util.function.Consumer messageBuilderConsumer = + messageBuilderConsumerTemp; return (Runnable) () -> { final int msgSize = this.messageSize; @@ -1050,6 +1087,8 @@ public Integer call() throws Exception { } }); + consumerBuilder = maybeConfigureForFiltering(consumerBuilder); + Consumer consumer = consumerBuilder.build(); return consumer; }) @@ -1126,6 +1165,37 @@ public Integer call() throws Exception { return 0; } + private ConsumerBuilder maybeConfigureForFiltering(ConsumerBuilder consumerBuilder) { + if (this.filterValues != null && this.filterValues.size() > 0) { + consumerBuilder = + consumerBuilder.filter().values(this.filterValues.toArray(new String[0])).builder(); + + if (this.filterValues.size() == 1) { + String filterValue = filterValues.get(0); + consumerBuilder = + consumerBuilder + .filter() + .postFilter(msg -> filterValue.equals(msg.getProperties().getTo())) + .builder(); + } else { + consumerBuilder = + consumerBuilder + .filter() + .postFilter( + msg -> { + for (String filterValue : this.filterValues) { + if (filterValue.equals(msg.getProperties().getTo())) { + return true; + } + } + return false; + }) + .builder(); + } + } + return consumerBuilder; + } + private void createStream(Environment environment, String stream) { StreamCreator streamCreator = environment.streamCreator().stream(stream) diff --git a/src/main/java/com/rabbitmq/stream/perf/Utils.java b/src/main/java/com/rabbitmq/stream/perf/Utils.java index f123134cac..1d38ab3cee 100644 --- a/src/main/java/com/rabbitmq/stream/perf/Utils.java +++ b/src/main/java/com/rabbitmq/stream/perf/Utils.java @@ -473,6 +473,33 @@ public BiFunction convert(String input) { } } + static class FilterValueSetConverter implements ITypeConverter> { + + @Override + public List convert(String value) { + if (value == null || value.trim().isEmpty()) { + return Collections.emptyList(); + } + if (value.contains("..")) { + String[] range = value.split("\\.\\."); + String errorMessage = "'" + value + "' is not valid, valid example values: 1..10, 1..20"; + if (range.length != 2) { + throw new CommandLine.TypeConversionException(errorMessage); + } + int start, end; + try { + start = Integer.parseInt(range[0]); + end = Integer.parseInt(range[1]) + 1; + return IntStream.range(start, end).mapToObj(String::valueOf).collect(Collectors.toList()); + } catch (NumberFormatException e) { + throw new CommandLine.TypeConversionException(errorMessage); + } + } else { + return Arrays.stream(value.split(",")).collect(Collectors.toList()); + } + } + } + static class SniServerNamesConverter implements ITypeConverter> { @Override @@ -859,4 +886,24 @@ static InstanceSynchronization defaultInstanceSynchronization( throw new RuntimeException(e); } } + + static int filteringPublishingCycle(int rate) { + if (rate == 0) { + return 100_000; + } else if (rate <= 10) { + return 10; + } else { + return rate / 10; + } + } + + static int filteringSubSetSize(int setSize) { + if (setSize <= 3) { + return 1; + } else if (setSize > 10) { + return (int) (setSize * 0.70); + } else { + return setSize - 3; + } + } } diff --git a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java index 673557d33e..b1e798caaf 100644 --- a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java @@ -511,6 +511,17 @@ void nativeEpollWorksOnLinux() throws Exception { assertThat(streamExists(s)).isTrue(); } + @Test + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_13_0) + void shouldNotFailWhenFilteringIsActivated() throws Exception { + Future run = run(builder().filterValueSet("1..15").filterValues("4")); + waitUntilStreamExists(s); + waitOneSecond(); + run.cancel(true); + waitRunEnds(); + assertThat(consoleOutput()).containsIgnoringCase("summary:"); + } + private static Consumer wrap(CallableConsumer action) { return t -> { try { @@ -728,6 +739,16 @@ ArgumentsBuilder nativeEpoll() { return this; } + ArgumentsBuilder filterValueSet(String... values) { + arguments.put("filter-value-set", String.join(",", values)); + return this; + } + + ArgumentsBuilder filterValues(String... values) { + arguments.put("filter-values", String.join(",", values)); + return this; + } + String build() { return this.arguments.entrySet().stream() .map(e -> "--" + e.getKey() + (e.getValue().isEmpty() ? "" : (" " + e.getValue()))) diff --git a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java index f3642b7586..a6e09fd0bd 100644 --- a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java @@ -368,6 +368,27 @@ void commandLineMetricsTest() { .isEqualTo("-x 1 -y 2"); } + @ParameterizedTest + @CsvSource({"0,100000", "100,10", "1000,100", "50,5", "10,10", "11,1", "5,10"}) + void filteringPublishingCycle(int rate, int expected) { + assertThat(Utils.filteringPublishingCycle(rate)).isEqualTo(expected); + } + + @ParameterizedTest + @CsvSource({"3,1", "2,1", "4,1", "7,4", "10,7", "15,10"}) + void filteringSubSetSize(int setSize, int expected) { + assertThat(Utils.filteringSubSetSize(setSize)).isEqualTo(expected); + } + + @Test + void filterValueSetConverter() throws Exception { + CommandLine.ITypeConverter> converter = new Utils.FilterValueSetConverter(); + assertThat(converter.convert("one")).containsExactly("one"); + assertThat(converter.convert("one,two,three")).containsExactly("one", "two", "three"); + assertThat(converter.convert("1..10")).hasSize(10).contains("1", "2", "10"); + assertThat(converter.convert("5..10")).hasSize(6).contains("5", "6", "10"); + } + @Command(name = "test-command") static class TestCommand { From 90b8f8be7cc6e1fad04929faeed9a0edbc09112f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Thu, 22 Jun 2023 16:53:04 +0200 Subject: [PATCH 15/18] Add AMQP-based filtering test --- .../rabbitmq/stream/impl/FilteringTest.java | 91 +++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java index 10c63a28e8..5abf9b2bd6 100644 --- a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java @@ -14,13 +14,19 @@ package com.rabbitmq.stream.impl; import static com.rabbitmq.stream.impl.TestUtils.*; +import static java.util.Collections.singletonMap; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import com.rabbitmq.client.*; +import com.rabbitmq.client.AMQP.BasicProperties.Builder; import com.rabbitmq.stream.*; +import com.rabbitmq.stream.Consumer; import io.netty.channel.EventLoopGroup; +import java.io.IOException; import java.time.Duration; import java.util.*; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; @@ -197,6 +203,74 @@ void setFilterSizeOnCreation(TestInfo info) { .isInstanceOf(IllegalArgumentException.class); } + @Test + void publishConsumeAmqp() throws Exception { + int messageCount = 1000; + repeatIfFailure( + () -> { + List filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear")); + Map filterValueCount = new HashMap<>(); + Random random = new Random(); + + try (Connection c = new ConnectionFactory().newConnection()) { + Callable insert = + () -> { + publishAmqp( + c, + messageCount, + () -> { + String filterValue = filterValues.get(random.nextInt(filterValues.size())); + filterValueCount + .computeIfAbsent(filterValue, k -> new AtomicInteger()) + .incrementAndGet(); + return filterValue; + }); + return null; + }; + insert.call(); + + // second wave of messages, with only one, new filter value + String newFilterValue = "orange"; + filterValues.clear(); + filterValues.add(newFilterValue); + insert.call(); + + AtomicInteger receivedMessageCount = new AtomicInteger(0); + AtomicInteger filteredConsumedMessageCount = new AtomicInteger(0); + Channel ch = c.createChannel(); + ch.basicQos(10); + Map arguments = new HashMap<>(); + arguments.put("x-stream-filter", newFilterValue); + arguments.put("x-stream-offset", 0); + ch.basicConsume( + stream, + false, + arguments, + new DefaultConsumer(ch) { + @Override + public void handleDelivery( + String consumerTag, + Envelope envelope, + AMQP.BasicProperties properties, + byte[] body) + throws IOException { + receivedMessageCount.incrementAndGet(); + String filterValue = + properties.getHeaders().get("x-stream-filter-value").toString(); + if (newFilterValue.equals(filterValue)) { + filteredConsumedMessageCount.incrementAndGet(); + } + ch.basicAck(envelope.getDeliveryTag(), false); + } + }); + int expectedCount = filterValueCount.get(newFilterValue).get(); + waitAtMost( + CONDITION_TIMEOUT, () -> filteredConsumedMessageCount.get() == expectedCount); + assertThat(receivedMessageCount).hasValueLessThan(messageCount * 2); + } + }); + } + private ProducerBuilder producerBuilder() { return this.environment.producerBuilder().stream(stream); } @@ -232,6 +306,23 @@ private void publish( producer.close(); } + private void publishAmqp(Connection c, int messageCount, Supplier filterValueSupplier) + throws Exception { + try (Channel ch = c.createChannel()) { + ch.confirmSelect(); + for (int i = 0; i < messageCount; i++) { + ch.basicPublish( + "", + stream, + new Builder() + .headers(singletonMap("x-stream-filter-value", filterValueSupplier.get())) + .build(), + null); + } + ch.waitForConfirmsOrDie(); + } + } + private static void repeatIfFailure(RunnableWithException test) throws Exception { int executionCount = 0; Throwable lastException = null; From a22d8ada622e3f5d11943914da45640bc1d12fe5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 26 Jun 2023 15:35:56 +0200 Subject: [PATCH 16/18] Small documention edit --- src/docs/asciidoc/advanced-topics.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/docs/asciidoc/advanced-topics.adoc b/src/docs/asciidoc/advanced-topics.adoc index 5c91f51d6c..5805f48b6f 100644 --- a/src/docs/asciidoc/advanced-topics.adoc +++ b/src/docs/asciidoc/advanced-topics.adoc @@ -16,7 +16,7 @@ The filtering feature works as follows: Why does the consumer need to define some client-side filtering logic? Because the server-side filtering is probabilistic: messages that do not match the filter value(s) can still be sent to the consumer. -The server uses a https://en.wikipedia.org/wiki/Bloom_filter[bloom filter], _a space-efficient probabilistic data structure_, where false positives are possible. +The server uses a https://en.wikipedia.org/wiki/Bloom_filter[Bloom filter], _a space-efficient probabilistic data structure_, where false positives are possible. Despite this, the filtering saves some bandwidth, which is its primary goal. ===== Filtering on the Publishing Side From 61c4e4c75dc1d06fd0dec1de86ac1fd15bd294ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 10 Jul 2023 15:53:27 +0200 Subject: [PATCH 17/18] Mention filtering requires RabbitMQ 3.13+ --- src/docs/asciidoc/advanced-topics.adoc | 2 ++ src/main/java/com/rabbitmq/stream/ConsumerBuilder.java | 8 +++++++- src/main/java/com/rabbitmq/stream/ProducerBuilder.java | 2 ++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/docs/asciidoc/advanced-topics.adoc b/src/docs/asciidoc/advanced-topics.adoc index 5805f48b6f..044c2661ef 100644 --- a/src/docs/asciidoc/advanced-topics.adoc +++ b/src/docs/asciidoc/advanced-topics.adoc @@ -4,6 +4,8 @@ ==== Filtering +WARNING: Filtering requires *RabbitMQ 3.13* or more. + RabbitMQ Stream provides a server-side filtering feature that avoids reading all the messages of a stream and filtering only on the client side. This helps to save network bandwidth when a consuming application needs only a subset of messages, e.g. the messages from a given geographical region. diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java index f3ca83c0f4..a7424e2f75 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java @@ -155,6 +155,8 @@ public interface ConsumerBuilder { /** * Configure the filtering. * + *

RabbitMQ 3.13 or more is required. + * * @return the filtering configuration */ FilterConfiguration filter(); @@ -251,7 +253,11 @@ interface FlowConfiguration { ConsumerBuilder builder(); } - /** Filter configuration. */ + /** + * Filter configuration. + * + *

RabbitMQ 3.13 or more is required. + */ interface FilterConfiguration { /** diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java index 7983729718..cdc942f03d 100644 --- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java @@ -135,6 +135,8 @@ public interface ProducerBuilder { /** * Logic to extract a filter value from a message. * + *

RabbitMQ 3.13 or more is required. + * * @param filterValueExtractor * @return this builder instance */ From 78b310888e41c6a7794c861c9ab41059bd43f19a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 10 Jul 2023 16:03:20 +0200 Subject: [PATCH 18/18] Use default broker version to test PR --- .github/workflows/test-pr.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index b55305d67f..aaf664e873 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -5,9 +5,6 @@ on: branches: - main -env: - RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:stream-chunk-filtering-otp-max-bazel' - jobs: build: runs-on: ubuntu-22.04