diff --git a/src/main/java/com/rabbitmq/stream/CallbackStreamDataHandler.java b/src/main/java/com/rabbitmq/stream/CallbackStreamDataHandler.java new file mode 100644 index 0000000000..e548946f82 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/CallbackStreamDataHandler.java @@ -0,0 +1,56 @@ +package com.rabbitmq.stream; + +/** + * Exposes callbacks to handle events from a particular Stream subscription, + * with specific names for methods and no connection-oriented parameter. + */ +public interface CallbackStreamDataHandler { + + default void handleChunk(long offset, long messageCount, long dataSize) { + // No-op by default + } + + default void handleMessage(long offset, long chunkTimestamp, long committedChunkId, Message message) { + // No-op by default + } + + default void handleCreditNotification(short responseCode) { + // No-op by default + } + + default void handleConsumerUpdate(boolean active) { + // No-op by default + } + + default void handleMetadata(short code) { + // No-op by default + } + + /** + * Callback for handling a stream subscription. + * + * @param offsetSpecification The offset specification for this new subscription + * @param isInitialSubscription Whether this subscription is an initial subscription + * or a recovery for an existing subscription + */ + default void handleSubscribe( + OffsetSpecification offsetSpecification, + boolean isInitialSubscription + ) { + // No-op by default + } + + /** + * Callback for handling a stream unsubscription. + */ + default void handleUnsubscribe() { + if(this instanceof AutoCloseable) { + try { + ((AutoCloseable) this).close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + +} diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java index 7484ddc433..933ad569d9 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java @@ -13,6 +13,10 @@ // info@rabbitmq.com. package com.rabbitmq.stream; +import com.rabbitmq.stream.flow.ConsumerFlowControlStrategy; +import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilder; +import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilderFactory; + import java.time.Duration; /** API to configure and create a {@link Consumer}. */ @@ -59,6 +63,55 @@ public interface ConsumerBuilder { */ ConsumerBuilder messageHandler(MessageHandler messageHandler); + /** + * Configure prefetching parameters for synchronous flow control. + * + *

+ * Treat the parameters as an abstract scale at the {@link Consumer} level. + *

+ * + * @param initialPrefetchLevel The initial level of message pre-fetching. + * This may me implemented as the credits to initially + * ask for with each new subscription, + * but do not depend strongly on this aspect. + * @param prefetchLevelAfterDelivery The level of message pre-fetching after the initial batch. + * This may be implemented as the credits to ask for after a chunk + * is delivered on each subscription, + * but do not depend strongly on this aspect. + * + * The recommended value is 1. + * Higher values may cause excessive over-fetching. + * + * @return this {@link ConsumerBuilder} + */ + ConsumerBuilder synchronousControlFlow(int initialPrefetchLevel, int prefetchLevelAfterDelivery); + + /** + * Configure prefetching parameters for asynchronous flow control. + * + *

+ * Treat the parameters as an abstract scale at the {@link Consumer} level. + *

+ * + * @param prefetchLevel The desired level of message pre-fetching. + * This may be implemented as the maximum chunks to have in processing or pending arrival + * per subscription at a given moment, but do not depend strongly on this aspect. + * @return this {@link ConsumerBuilder} + */ + ConsumerBuilder asynchronousControlFlow(int prefetchLevel); + + /** + * Factory for the flow control strategy to be used when consuming messages. + * When there is no need to use a custom strategy, which is the majority of cases, + * prefer using {@link ConsumerBuilder#synchronousControlFlow} or {@link ConsumerBuilder#asynchronousControlFlow} instead. + * + * @param consumerFlowControlStrategyBuilderFactory the factory + * @return a fluent configurable builder for the flow control strategy + * @param The type of the builder for the provided factory + */ + , S extends ConsumerFlowControlStrategy> + T customFlowControlStrategy(ConsumerFlowControlStrategyBuilderFactory consumerFlowControlStrategyBuilderFactory); + /** * The logical name of the {@link Consumer}. * @@ -151,13 +204,6 @@ public interface ConsumerBuilder { */ ConsumerBuilder noTrackingStrategy(); - /** - * Configure flow of messages. - * - * @return the flow configuration - */ - FlowConfiguration flow(); - /** * Create the configured {@link Consumer} * @@ -166,7 +212,7 @@ public interface ConsumerBuilder { Consumer build(); /** Manual tracking strategy. */ - interface ManualTrackingStrategy { + interface ManualTrackingStrategy extends ConsumerBuilderAccessor { /** * Interval to check if the last requested stored offset has been actually stored. @@ -177,17 +223,10 @@ interface ManualTrackingStrategy { * @return the manual tracking strategy */ ManualTrackingStrategy checkInterval(Duration checkInterval); - - /** - * Go back to the builder. - * - * @return the consumer builder - */ - ConsumerBuilder builder(); } /** Auto-tracking strategy. */ - interface AutoTrackingStrategy { + interface AutoTrackingStrategy extends ConsumerBuilderAccessor { /** * Number of messages before storing. @@ -208,28 +247,9 @@ interface AutoTrackingStrategy { * @return the auto-tracking strategy */ AutoTrackingStrategy flushInterval(Duration flushInterval); - - /** - * Go back to the builder. - * - * @return the consumer builder - */ - ConsumerBuilder builder(); } - /** Message flow configuration. */ - interface FlowConfiguration { - - /** - * The number of initial credits for the subscription. - * - *

Default is 1. - * - * @param initialCredits the number of initial credits - * @return this configuration instance - */ - FlowConfiguration initialCredits(int initialCredits); - + interface ConsumerBuilderAccessor { /** * Go back to the builder. * @@ -237,4 +257,5 @@ interface FlowConfiguration { */ ConsumerBuilder builder(); } + } diff --git a/src/main/java/com/rabbitmq/stream/MessageHandler.java b/src/main/java/com/rabbitmq/stream/MessageHandler.java index d9d39fdb63..907ba62d61 100644 --- a/src/main/java/com/rabbitmq/stream/MessageHandler.java +++ b/src/main/java/com/rabbitmq/stream/MessageHandler.java @@ -84,5 +84,13 @@ interface Context { * @see Consumer#store(long) */ Consumer consumer(); + + /** + * Marks this message as handled + * + * @return Whether the message was marked as handled (returning {@code true}) + * or was not found (either because it was already marked as handled, or wasn't tracked) + */ + boolean markHandled(); } } diff --git a/src/main/java/com/rabbitmq/stream/flow/AbstractConsumerFlowControlStrategy.java b/src/main/java/com/rabbitmq/stream/flow/AbstractConsumerFlowControlStrategy.java new file mode 100644 index 0000000000..b810ed6904 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/flow/AbstractConsumerFlowControlStrategy.java @@ -0,0 +1,47 @@ +package com.rabbitmq.stream.flow; + +import java.util.Objects; + +/** + * Abstract class for Consumer Flow Control Strategies which keeps a + * {@link CreditAsker creditAsker} field ready for retrieval by its inheritors. + */ +public abstract class AbstractConsumerFlowControlStrategy implements ConsumerFlowControlStrategy { + + private final String identifier; + private final CreditAsker creditAsker; + + protected AbstractConsumerFlowControlStrategy(String identifier, CreditAsker creditAsker) { + this.identifier = identifier; + this.creditAsker = Objects.requireNonNull(creditAsker, "creditAsker"); + } + + public CreditAsker getCreditAsker() { + return creditAsker; + } + + public String getIdentifier() { + return identifier; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AbstractConsumerFlowControlStrategy that = (AbstractConsumerFlowControlStrategy) o; + return Objects.equals(identifier, that.identifier) && Objects.equals(creditAsker, that.creditAsker); + } + + @Override + public int hashCode() { + return Objects.hash(identifier, creditAsker); + } + + @Override + public String toString() { + return "AbstractConsumerFlowControlStrategy{" + + "identifier='" + identifier + '\'' + + ", creditAsker=" + creditAsker + + '}'; + } +} diff --git a/src/main/java/com/rabbitmq/stream/flow/AsyncConsumerFlowControlStrategy.java b/src/main/java/com/rabbitmq/stream/flow/AsyncConsumerFlowControlStrategy.java new file mode 100644 index 0000000000..14a27ae9b5 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/flow/AsyncConsumerFlowControlStrategy.java @@ -0,0 +1,9 @@ +package com.rabbitmq.stream.flow; + +/** + * Variant of {@link ConsumerFlowControlStrategy} that implements {@link MessageHandlingListener} to asynchronously + * mark messages as handled. + */ +public interface AsyncConsumerFlowControlStrategy extends ConsumerFlowControlStrategy, MessageHandlingListener { + +} diff --git a/src/main/java/com/rabbitmq/stream/flow/ConsumerFlowControlStrategy.java b/src/main/java/com/rabbitmq/stream/flow/ConsumerFlowControlStrategy.java new file mode 100644 index 0000000000..33113294cf --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/flow/ConsumerFlowControlStrategy.java @@ -0,0 +1,42 @@ +package com.rabbitmq.stream.flow; + +import com.rabbitmq.stream.CallbackStreamDataHandler; +import com.rabbitmq.stream.OffsetSpecification; + +/** + * A built and configured flow control strategy for consumers. + * Implementations may freely implement reactions to the various client callbacks. + * When defined by each implementation, it may internally call {@link CreditAsker#credit} to ask for credits. + * One instance of this is expected to be built for each separate subscription. + * A {@link com.rabbitmq.stream.Consumer} may have multiple subscriptions, and thus multiple instances of this. + */ +public interface ConsumerFlowControlStrategy extends CallbackStreamDataHandler { + + /** + * Callback for handling a stream subscription. + * Called right before the subscription is sent to the broker. + *

+ * Either this variant or {@link CallbackStreamDataHandler#handleSubscribe} should be called, NOT both. + *

+ * + * @param offsetSpecification The offset specification for this new subscription + * @param isInitialSubscription Whether this subscription is an initial subscription + * or a recovery for an existing subscription + * @return The initial credits that should be granted to this new subscription + */ + int handleSubscribeReturningInitialCredits( + OffsetSpecification offsetSpecification, + boolean isInitialSubscription + ); + + @Override + default void handleSubscribe( + OffsetSpecification offsetSpecification, + boolean isInitialSubscription) { + handleSubscribeReturningInitialCredits( + offsetSpecification, + isInitialSubscription + ); + } + +} diff --git a/src/main/java/com/rabbitmq/stream/flow/ConsumerFlowControlStrategyBuilder.java b/src/main/java/com/rabbitmq/stream/flow/ConsumerFlowControlStrategyBuilder.java new file mode 100644 index 0000000000..53ead20daf --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/flow/ConsumerFlowControlStrategyBuilder.java @@ -0,0 +1,21 @@ +package com.rabbitmq.stream.flow; + +import com.rabbitmq.stream.ConsumerBuilder; + +/** + * Fluent builder for a {@link ConsumerFlowControlStrategyBuilderFactory}. + * One instance of this is set per {@link com.rabbitmq.stream.Consumer}. + * A {@link com.rabbitmq.stream.Consumer} may have multiple subscriptions, and thus multiple instances built by this. + * + * @param the type of {@link ConsumerFlowControlStrategy} to be built + */ +public interface ConsumerFlowControlStrategyBuilder extends ConsumerBuilder.ConsumerBuilderAccessor { + /** + * Builds the actual FlowControlStrategy instance, for the Client with which it interoperates + * + * @param identifier A {@link String} to uniquely identify the built instance and/or its subscription. + * @param creditAsker {@link CreditAsker} for asking for credits. + * @return {@link T} the built {@link ConsumerFlowControlStrategy} + */ + T build(String identifier, CreditAsker creditAsker); +} diff --git a/src/main/java/com/rabbitmq/stream/flow/ConsumerFlowControlStrategyBuilderFactory.java b/src/main/java/com/rabbitmq/stream/flow/ConsumerFlowControlStrategyBuilderFactory.java new file mode 100644 index 0000000000..28450f54bd --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/flow/ConsumerFlowControlStrategyBuilderFactory.java @@ -0,0 +1,17 @@ +package com.rabbitmq.stream.flow; + +import com.rabbitmq.stream.ConsumerBuilder; + +/** + * A strategy for regulating consumer flow when consuming from a RabbitMQ Stream. + * @param the type of {@link ConsumerFlowControlStrategy} to be built + * @param the type of fluent builder exposed by this factory. Must subclass {@link ConsumerFlowControlStrategyBuilder} + */ +@FunctionalInterface +public interface ConsumerFlowControlStrategyBuilderFactory> { + /** + * Accessor for configuration builder with settings specific to each implementing strategy + * @return {@link C} the specific consumer flow control strategy configuration builder + */ + C builder(ConsumerBuilder consumerBuilder); +} diff --git a/src/main/java/com/rabbitmq/stream/flow/CreditAsker.java b/src/main/java/com/rabbitmq/stream/flow/CreditAsker.java new file mode 100644 index 0000000000..7437053b83 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/flow/CreditAsker.java @@ -0,0 +1,13 @@ +package com.rabbitmq.stream.flow; + +@FunctionalInterface +public interface CreditAsker { + + /** + * Asks for credits for a given subscription. + * @param credits How many credits to ask for + * @throws IllegalArgumentException If credits are below 0 or above {@link Short#MAX_VALUE} + */ + void credit(int credits); + +} diff --git a/src/main/java/com/rabbitmq/stream/flow/MessageHandlingListener.java b/src/main/java/com/rabbitmq/stream/flow/MessageHandlingListener.java new file mode 100644 index 0000000000..7884e52727 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/flow/MessageHandlingListener.java @@ -0,0 +1,17 @@ +package com.rabbitmq.stream.flow; + +import com.rabbitmq.stream.MessageHandler; + +@FunctionalInterface +public interface MessageHandlingListener { + + /** + * Marks a message as handled + * + * @param messageContext The {@link MessageHandler.Context} of the handled message + * @return Whether the message was marked as handled (returning {@code true}) + * or was not found (either because it was already marked as handled, or wasn't tracked) + */ + boolean markHandled(MessageHandler.Context messageContext); + +} diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index f85fa9a51a..d3a78b1b61 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -13,43 +13,6 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.Constants.COMMAND_CLOSE; -import static com.rabbitmq.stream.Constants.COMMAND_CONSUMER_UPDATE; -import static com.rabbitmq.stream.Constants.COMMAND_CREATE_STREAM; -import static com.rabbitmq.stream.Constants.COMMAND_CREDIT; -import static com.rabbitmq.stream.Constants.COMMAND_DECLARE_PUBLISHER; -import static com.rabbitmq.stream.Constants.COMMAND_DELETE_PUBLISHER; -import static com.rabbitmq.stream.Constants.COMMAND_DELETE_STREAM; -import static com.rabbitmq.stream.Constants.COMMAND_EXCHANGE_COMMAND_VERSIONS; -import static com.rabbitmq.stream.Constants.COMMAND_HEARTBEAT; -import static com.rabbitmq.stream.Constants.COMMAND_METADATA; -import static com.rabbitmq.stream.Constants.COMMAND_OPEN; -import static com.rabbitmq.stream.Constants.COMMAND_PARTITIONS; -import static com.rabbitmq.stream.Constants.COMMAND_PEER_PROPERTIES; -import static com.rabbitmq.stream.Constants.COMMAND_PUBLISH; -import static com.rabbitmq.stream.Constants.COMMAND_QUERY_OFFSET; -import static com.rabbitmq.stream.Constants.COMMAND_QUERY_PUBLISHER_SEQUENCE; -import static com.rabbitmq.stream.Constants.COMMAND_ROUTE; -import static com.rabbitmq.stream.Constants.COMMAND_SASL_AUTHENTICATE; -import static com.rabbitmq.stream.Constants.COMMAND_SASL_HANDSHAKE; -import static com.rabbitmq.stream.Constants.COMMAND_STORE_OFFSET; -import static com.rabbitmq.stream.Constants.COMMAND_STREAM_STATS; -import static com.rabbitmq.stream.Constants.COMMAND_SUBSCRIBE; -import static com.rabbitmq.stream.Constants.COMMAND_UNSUBSCRIBE; -import static com.rabbitmq.stream.Constants.RESPONSE_CODE_AUTHENTICATION_FAILURE; -import static com.rabbitmq.stream.Constants.RESPONSE_CODE_AUTHENTICATION_FAILURE_LOOPBACK; -import static com.rabbitmq.stream.Constants.RESPONSE_CODE_OK; -import static com.rabbitmq.stream.Constants.RESPONSE_CODE_SASL_CHALLENGE; -import static com.rabbitmq.stream.Constants.VERSION_1; -import static com.rabbitmq.stream.impl.Utils.encodeRequestCode; -import static com.rabbitmq.stream.impl.Utils.encodeResponseCode; -import static com.rabbitmq.stream.impl.Utils.extractResponseCode; -import static com.rabbitmq.stream.impl.Utils.formatConstant; -import static com.rabbitmq.stream.impl.Utils.noOpConsumer; -import static java.lang.String.format; -import static java.lang.String.join; -import static java.util.concurrent.TimeUnit.SECONDS; - import com.rabbitmq.stream.AuthenticationFailureException; import com.rabbitmq.stream.ByteCapacity; import com.rabbitmq.stream.ChunkChecksum; @@ -69,7 +32,6 @@ import com.rabbitmq.stream.impl.Client.ShutdownContext.ShutdownReason; import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandler; import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo; -import com.rabbitmq.stream.impl.Utils.NamedThreadFactory; import com.rabbitmq.stream.metrics.MetricsCollector; import com.rabbitmq.stream.metrics.NoOpMetricsCollector; import com.rabbitmq.stream.sasl.CredentialsProvider; @@ -103,6 +65,12 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLHandshakeException; +import javax.net.ssl.SSLParameters; import java.io.IOException; import java.io.OutputStream; import java.lang.reflect.Field; @@ -134,11 +102,12 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.ToLongFunction; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLHandshakeException; -import javax.net.ssl.SSLParameters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static com.rabbitmq.stream.Constants.*; +import static com.rabbitmq.stream.impl.Utils.*; +import static java.lang.String.format; +import static java.lang.String.join; +import static java.util.concurrent.TimeUnit.SECONDS; /** * This is low-level client API to communicate with the broker. diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumerStatisticRecorder.java b/src/main/java/com/rabbitmq/stream/impl/ConsumerStatisticRecorder.java new file mode 100644 index 0000000000..aac9466367 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/ConsumerStatisticRecorder.java @@ -0,0 +1,354 @@ +package com.rabbitmq.stream.impl; + +import com.rabbitmq.stream.CallbackStreamDataHandler; +import com.rabbitmq.stream.Message; +import com.rabbitmq.stream.MessageHandler; +import com.rabbitmq.stream.OffsetSpecification; +import com.rabbitmq.stream.flow.MessageHandlingListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +public class ConsumerStatisticRecorder implements CallbackStreamDataHandler, MessageHandlingListener { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerStatisticRecorder.class); + + private final String identifier; + private final AtomicReference subscriptionStatistics = new AtomicReference<>(); + + public ConsumerStatisticRecorder(String identifier) { + this.identifier = identifier; + } + + @Override + public void handleSubscribe( + OffsetSpecification offsetSpecification, + boolean isInitialSubscription + ) { + SubscriptionStatistics localSubscriptionStatistics = this.subscriptionStatistics.get(); + if(localSubscriptionStatistics == null) { + this.subscriptionStatistics.set(new SubscriptionStatistics(offsetSpecification)); + return; + } + if(isInitialSubscription) { + LOGGER.warn( + "handleSubscribe called for stream that already had same associated subscription! " + + "identifier={} offsetSpecification={}", + this.identifier, + offsetSpecification + ); + } + localSubscriptionStatistics.offsetSpecification = offsetSpecification; + localSubscriptionStatistics.pendingChunks.set(0); + cleanupOldTrackingData(localSubscriptionStatistics); + } + + private void cleanupOldTrackingData(SubscriptionStatistics subscriptionStatistics) { + if (!subscriptionStatistics.offsetSpecification.isOffset()) { + LOGGER.debug("Can't cleanup old tracking data: offsetSpecification is not an offset! " + + "identifier={} offsetSpecification={}", + this.identifier, + subscriptionStatistics.offsetSpecification + ); + return; + } + // Mark messages before the initial offset as handled + long newSubscriptionInitialOffset = subscriptionStatistics.offsetSpecification.getOffset(); + NavigableMap chunksHeadMap = subscriptionStatistics.unprocessedChunksByOffset.headMap(newSubscriptionInitialOffset, false); + Iterator> chunksHeadMapEntryIterator = chunksHeadMap.entrySet().iterator(); + while(chunksHeadMapEntryIterator.hasNext()) { + Map.Entry chunksHeadMapEntry = chunksHeadMapEntryIterator.next(); + ChunkStatistics chunkStatistics = chunksHeadMapEntry.getValue(); + Iterator> chunkMessagesIterator = chunkStatistics.unprocessedMessagesByOffset.entrySet().iterator(); + while(chunkMessagesIterator.hasNext()) { + Map.Entry chunkMessageEntry = chunkMessagesIterator.next(); + long messageOffset = chunkMessageEntry.getKey(); + if(messageOffset < newSubscriptionInitialOffset) { + chunkMessagesIterator.remove(); + chunkStatistics.processedMessages.incrementAndGet(); + } + } + if(chunkStatistics.isDone()) { + chunksHeadMapEntryIterator.remove(); + } + } + } + + @Override + public void handleChunk(long offset, long messageCount, long dataSize) { + SubscriptionStatistics localSubscriptionStatistics = this.subscriptionStatistics.get(); + if(localSubscriptionStatistics == null) { + LOGGER.warn( + "handleChunk called for subscription that does not exist! " + + "identifier={} offset={} messageCount={} dataSize={}", + this.identifier, + offset, + messageCount, + dataSize + ); + return; + } + localSubscriptionStatistics.pendingChunks.decrementAndGet(); + localSubscriptionStatistics.unprocessedChunksByOffset.put(offset, new ChunkStatistics(offset, messageCount, dataSize)); + } + + @Override + public void handleMessage( + long offset, + long chunkTimestamp, + long committedChunkId, + Message message + ) { + SubscriptionStatistics localSubscriptionStatistics = this.subscriptionStatistics.get(); + if(localSubscriptionStatistics == null) { + LOGGER.warn( + "handleMessage called for subscription that does not exist! " + + "identifier={} offset={} chunkTimestamp={} committedChunkId={}", + this.identifier, + offset, + chunkTimestamp, + committedChunkId + ); + return; + } + NavigableMap subHeadMapByOffset = localSubscriptionStatistics.unprocessedChunksByOffset.headMap(offset, true); + Map.Entry lastOffsetToChunkEntry = subHeadMapByOffset.lastEntry(); + if(lastOffsetToChunkEntry == null) { + LOGGER.warn( + "handleMessage called but chunk was not found! " + + "identifier={} offset={} chunkTimestamp={} committedChunkId={}", + this.identifier, + offset, + chunkTimestamp, + committedChunkId + ); + return; + } + ChunkStatistics chunkStatistics = lastOffsetToChunkEntry.getValue(); + chunkStatistics.unprocessedMessagesByOffset.put(offset, message); + } + + @Override + public void handleUnsubscribe() { + ConsumerStatisticRecorder.SubscriptionStatistics localSubscriptionStatistics = this.subscriptionStatistics.getAndSet(null); + if(localSubscriptionStatistics == null) { + LOGGER.warn( + "handleUnsubscribe called for subscriptionId that does not exist! Identifier: {}", identifier + ); + } + } + + /** + * Marks a message as handled, changing internal statistics. + * + * @param messageContext The {@link MessageHandler.Context} of the handled message + * @return Whether the message was marked as handled (returning {@code true}) + * or was not found (either because it was already marked as handled, or wasn't tracked) + */ + @Override + public boolean markHandled(MessageHandler.Context messageContext) { + AggregatedMessageStatistics entry = retrieveStatistics(messageContext); + if (entry == null) { + return false; + } + return markHandled(entry); + } + + /** + * Marks a message as handled, changing internal statistics. + * + * @param aggregatedMessageStatistics The {@link AggregatedMessageStatistics} of the handled message + * @return Whether the message was marked as handled (returning {@code true}) + * or was not found (either because it was already marked as handled, or wasn't tracked) + */ + public boolean markHandled(AggregatedMessageStatistics aggregatedMessageStatistics) { + // Can't remove, not enough information + if (aggregatedMessageStatistics.chunkStatistics == null + || aggregatedMessageStatistics.messageEntry == null + || aggregatedMessageStatistics.chunkHeadMap == null) { + return false; + } + if(aggregatedMessageStatistics.subscriptionStatistics.offsetSpecification.isOffset()) { + long initialOffset = aggregatedMessageStatistics.subscriptionStatistics.offsetSpecification.getOffset(); + // Old tracked message, should already be handled, probably a late acknowledgment of a defunct connection. + if(aggregatedMessageStatistics.offset < initialOffset) { + LOGGER.debug("Old message registered as consumed. Identifier={} Message Offset: {}, Start Offset: {}", + this.identifier, + aggregatedMessageStatistics.offset, + initialOffset); + return true; + } + } + Message removedMessage = aggregatedMessageStatistics.chunkStatistics + .unprocessedMessagesByOffset + .remove(aggregatedMessageStatistics.offset); + if (removedMessage == null) { + return false; + } + // Remove chunk from list of unprocessed chunks if all its messages have been processed + aggregatedMessageStatistics.chunkStatistics.processedMessages.incrementAndGet(); + if (aggregatedMessageStatistics.chunkStatistics.isDone()) { + aggregatedMessageStatistics.chunkHeadMap.remove(aggregatedMessageStatistics.messageEntry.getKey(), aggregatedMessageStatistics.chunkStatistics); + } + return true; + } + + public AggregatedMessageStatistics retrieveStatistics(long offset) { + SubscriptionStatistics localSubscriptionStatistics = this.subscriptionStatistics.get(); + if (localSubscriptionStatistics == null) { + return null; + } + NavigableMap chunkStatisticsHeadMap = localSubscriptionStatistics.unprocessedChunksByOffset.headMap(offset, true); + Map.Entry messageEntry = chunkStatisticsHeadMap.lastEntry(); + ChunkStatistics chunkStatistics = messageEntry == null ? null : messageEntry.getValue(); + return new AggregatedMessageStatistics(offset, localSubscriptionStatistics, chunkStatisticsHeadMap, chunkStatistics, messageEntry); + } + + public AggregatedMessageStatistics retrieveStatistics(MessageHandler.Context messageContext) { + return retrieveStatistics(messageContext.offset()); + } + + public static class AggregatedMessageStatistics { + + private final long offset; + private final SubscriptionStatistics subscriptionStatistics; + private final NavigableMap chunkHeadMap; + private final ChunkStatistics chunkStatistics; + private final Map.Entry messageEntry; + + public AggregatedMessageStatistics( + long offset, + @Nonnull SubscriptionStatistics subscriptionStatistics, + @Nullable NavigableMap chunkHeadMap, + @Nullable ChunkStatistics chunkStatistics, + @Nullable Map.Entry messageEntry) { + this.subscriptionStatistics = subscriptionStatistics; + this.chunkStatistics = chunkStatistics; + this.chunkHeadMap = chunkHeadMap; + this.messageEntry = messageEntry; + this.offset = offset; + } + + @Nonnull + public SubscriptionStatistics getSubscriptionStatistics() { + return subscriptionStatistics; + } + + @Nullable + public ChunkStatistics getChunkStatistics() { + return chunkStatistics; + } + + @Nullable + public NavigableMap getChunkHeadMap() { + return chunkHeadMap; + } + + @Nullable + public Map.Entry getMessageEntry() { + return messageEntry; + } + + public long getOffset() { + return offset; + } + + } + + public static class SubscriptionStatistics { + + private final AtomicInteger pendingChunks = new AtomicInteger(0); + private OffsetSpecification offsetSpecification; + private final NavigableMap unprocessedChunksByOffset; + + public SubscriptionStatistics(OffsetSpecification offsetSpecification) { + this(offsetSpecification, new ConcurrentSkipListMap<>()); + } + + public SubscriptionStatistics(OffsetSpecification offsetSpecification, + NavigableMap unprocessedChunksByOffset) { + this.offsetSpecification = offsetSpecification; + this.unprocessedChunksByOffset = unprocessedChunksByOffset; + } + + public AtomicInteger getPendingChunks() { + return pendingChunks; + } + + public OffsetSpecification getOffsetSpecification() { + return offsetSpecification; + } + + public NavigableMap getUnprocessedChunksByOffset() { + return Collections.unmodifiableNavigableMap(unprocessedChunksByOffset); + } + + } + + public static class ChunkStatistics { + + private final long offset; + private final AtomicLong processedMessages = new AtomicLong(); + private final long messageCount; + private final long dataSize; + private final Map unprocessedMessagesByOffset; + + public ChunkStatistics(long offset, long messageCount, long dataSize) { + this(offset, messageCount, dataSize, new ConcurrentHashMap<>()); + } + + public ChunkStatistics(long offset, long messageCount, long dataSize, Map unprocessedMessagesByOffset) { + this.offset = offset; + this.messageCount = messageCount; + this.dataSize = dataSize; + this.unprocessedMessagesByOffset = unprocessedMessagesByOffset; + } + + public long getOffset() { + return offset; + } + + public long getMessageCount() { + return messageCount; + } + + public long getDataSize() { + return dataSize; + } + + public Map getUnprocessedMessagesByOffset() { + return Collections.unmodifiableMap(unprocessedMessagesByOffset); + } + + public boolean isDone() { + return processedMessages.get() == messageCount && unprocessedMessagesByOffset.isEmpty(); + } + } + + public String getIdentifier() { + return identifier; + } + + public SubscriptionStatistics getSubscriptionStatistics() { + return subscriptionStatistics.get(); + } + + @Override + public String toString() { + return "ConsumerStatisticRecorder{" + + "identifier='" + identifier + '\'' + + ", subscriptionStatistics=" + subscriptionStatistics.get() + + '}'; + } +} diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java index 5be323db01..0f7f603dc6 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java @@ -13,14 +13,6 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.impl.Utils.convertCodeToException; -import static com.rabbitmq.stream.impl.Utils.formatConstant; -import static com.rabbitmq.stream.impl.Utils.isSac; -import static com.rabbitmq.stream.impl.Utils.jsonField; -import static com.rabbitmq.stream.impl.Utils.namedFunction; -import static com.rabbitmq.stream.impl.Utils.namedRunnable; -import static com.rabbitmq.stream.impl.Utils.quote; - import com.rabbitmq.stream.BackOffDelayPolicy; import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.Consumer; @@ -32,8 +24,10 @@ import com.rabbitmq.stream.StreamNotAvailableException; import com.rabbitmq.stream.SubscriptionListener; import com.rabbitmq.stream.SubscriptionListener.SubscriptionContext; +import com.rabbitmq.stream.flow.ConsumerFlowControlStrategy; +import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilder; +import com.rabbitmq.stream.flow.MessageHandlingListener; import com.rabbitmq.stream.impl.Client.Broker; -import com.rabbitmq.stream.impl.Client.ChunkListener; import com.rabbitmq.stream.impl.Client.ClientParameters; import com.rabbitmq.stream.impl.Client.ConsumerUpdateListener; import com.rabbitmq.stream.impl.Client.CreditNotification; @@ -41,9 +35,9 @@ import com.rabbitmq.stream.impl.Client.MetadataListener; import com.rabbitmq.stream.impl.Client.QueryOffsetResponse; import com.rabbitmq.stream.impl.Client.ShutdownListener; -import com.rabbitmq.stream.impl.Utils.ClientConnectionType; -import com.rabbitmq.stream.impl.Utils.ClientFactory; -import com.rabbitmq.stream.impl.Utils.ClientFactoryContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -65,8 +59,8 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static com.rabbitmq.stream.impl.Utils.*; class ConsumersCoordinator { @@ -121,9 +115,8 @@ Runnable subscribe( SubscriptionListener subscriptionListener, Runnable trackingClosingCallback, MessageHandler messageHandler, - Map subscriptionProperties, - int initialCredits, - int additionalCredits) { + ConsumerFlowControlStrategyBuilder consumerFlowControlStrategyBuilder, + Map subscriptionProperties) { List candidates = findBrokersForStream(stream); Client.Broker newNode = pickBroker(candidates); if (newNode == null) { @@ -142,10 +135,8 @@ Runnable subscribe( subscriptionListener, trackingClosingCallback, messageHandler, - subscriptionProperties, - initialCredits, - additionalCredits); - + consumerFlowControlStrategyBuilder, + subscriptionProperties); try { addToManager(newNode, subscriptionTracker, offsetSpecification, true); } catch (ConnectionStreamException e) { @@ -169,10 +160,10 @@ Runnable subscribe( } private void addToManager( - Broker node, - SubscriptionTracker tracker, - OffsetSpecification offsetSpecification, - boolean isInitialSubscription) { + Broker node, + SubscriptionTracker tracker, + OffsetSpecification offsetSpecification, + boolean isInitialSubscription) { ClientParameters clientParameters = environment .clientParametersCopy() @@ -202,6 +193,7 @@ private void addToManager( pickedManager = new ClientSubscriptionsManager(node, clientParameters); LOGGER.debug("Created subscription manager on {}, id {}", name, pickedManager.id); } + tracker.clientReference.set(pickedManager.client); try { pickedManager.add(tracker, offsetSpecification, isInitialSubscription); LOGGER.debug( @@ -213,22 +205,22 @@ private void addToManager( tracker.subscriptionIdInClient); this.managers.add(pickedManager); } catch (IllegalStateException e) { + tracker.clientReference.set(null); pickedManager = null; } catch (ConnectionStreamException | ClientClosedException | StreamNotAvailableException e) { + tracker.clientReference.set(null); // manager connection is dead or stream not available // scheduling manager closing if necessary in another thread to avoid blocking this one if (pickedManager.isEmpty()) { - ClientSubscriptionsManager manager = pickedManager; - ConsumersCoordinator.this.environment.execute( - () -> { - manager.closeIfEmpty(); - }, + ConsumersCoordinator.this.environment.execute( + pickedManager::closeIfEmpty, "Consumer manager closing after timeout, consumer %d on stream '%s'", tracker.consumer.id(), tracker.stream); } throw e; } catch (RuntimeException e) { + tracker.clientReference.set(null); if (pickedManager != null) { pickedManager.closeIfEmpty(); } @@ -393,6 +385,8 @@ private static class SubscriptionTracker { private final OffsetSpecification initialOffsetSpecification; private final String offsetTrackingReference; private final MessageHandler messageHandler; + private final ConsumerFlowControlStrategy consumerFlowControlStrategy; + private final AtomicReference clientReference; private final StreamConsumer consumer; private final SubscriptionListener subscriptionListener; private final Runnable trackingClosingCallback; @@ -403,21 +397,18 @@ private static class SubscriptionTracker { private volatile ClientSubscriptionsManager manager; private volatile AtomicReference state = new AtomicReference<>(SubscriptionState.OPENING); - private final int initialCredits; - private final int additionalCredits; private SubscriptionTracker( - long id, - StreamConsumer consumer, - String stream, - OffsetSpecification initialOffsetSpecification, - String offsetTrackingReference, - SubscriptionListener subscriptionListener, - Runnable trackingClosingCallback, - MessageHandler messageHandler, - Map subscriptionProperties, - int initialCredits, - int additionalCredits) { + long id, + StreamConsumer consumer, + String stream, + OffsetSpecification initialOffsetSpecification, + String offsetTrackingReference, + SubscriptionListener subscriptionListener, + Runnable trackingClosingCallback, + MessageHandler messageHandler, + ConsumerFlowControlStrategyBuilder consumerFlowControlStrategyBuilder, + Map subscriptionProperties) { this.id = id; this.consumer = consumer; this.stream = stream; @@ -426,8 +417,20 @@ private SubscriptionTracker( this.subscriptionListener = subscriptionListener; this.trackingClosingCallback = trackingClosingCallback; this.messageHandler = messageHandler; - this.initialCredits = initialCredits; - this.additionalCredits = additionalCredits; + this.clientReference = new AtomicReference<>(); + String identifier = String.format("[stream=%s, subscriptionId=%s]", stream, String.valueOf(subscriptionIdInClient)); + this.consumerFlowControlStrategy = consumerFlowControlStrategyBuilder.build( + identifier, + credits -> { + Client retrievedClient = this.clientReference.get(); + if(retrievedClient == null) { + LOGGER.error("Client is not initialized, cannot ask for credits! " + + "Asked for {} credits. Identifier: {}", credits, identifier); + return; + } + retrievedClient.credit(this.subscriptionIdInClient, credits); + } + ); if (this.offsetTrackingReference == null) { this.subscriptionProperties = subscriptionProperties; } else { @@ -497,13 +500,15 @@ private static final class MessageHandlerContext implements Context { private final long timestamp; private final long committedOffset; private final StreamConsumer consumer; + private final MessageHandlingListener handledCallback; private MessageHandlerContext( - long offset, long timestamp, long committedOffset, StreamConsumer consumer) { + long offset, long timestamp, long committedOffset, StreamConsumer consumer, MessageHandlingListener handledCallback) { this.offset = offset; this.timestamp = timestamp; this.committedOffset = committedOffset; this.consumer = consumer; + this.handledCallback = handledCallback; } @Override @@ -534,6 +539,12 @@ public String stream() { public Consumer consumer() { return this.consumer; } + + @Override + public boolean markHandled() { + return this.handledCallback.markHandled(this); + } + } /** @@ -554,30 +565,18 @@ private class ClientSubscriptionsManager implements Comparable subscriptionTrackers = new ArrayList<>(maxConsumersByConnection); - private volatile int trackerCount = 0; + private volatile int trackerCount; private final AtomicBoolean closed = new AtomicBoolean(false); - private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientParameters) { + private ClientSubscriptionsManager( + Broker node, + Client.ClientParameters clientParameters + ) { this.id = managerIdSequence.getAndIncrement(); this.node = node; this.name = keyForClientSubscription(node); LOGGER.debug("creating subscription manager on {}", name); IntStream.range(0, maxConsumersByConnection).forEach(i -> subscriptionTrackers.add(null)); - this.trackerCount = 0; - AtomicBoolean clientInitializedInManager = new AtomicBoolean(false); - ChunkListener chunkListener = - (client, subscriptionId, offset, messageCount, dataSize) -> { - SubscriptionTracker subscriptionTracker = - subscriptionTrackers.get(subscriptionId & 0xFF); - if (subscriptionTracker != null && subscriptionTracker.consumer.isOpen()) { - client.credit(subscriptionId, subscriptionTracker.additionalCredits); - } else { - LOGGER.debug( - "Could not find stream subscription {} or subscription closing, not providing credits", - subscriptionId & 0xFF); - } - }; - CreditNotification creditNotification = (subscriptionId, responseCode) -> { SubscriptionTracker subscriptionTracker = @@ -597,9 +596,22 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa if (subscriptionTracker != null) { subscriptionTracker.offset = offset; subscriptionTracker.hasReceivedSomething = true; + subscriptionTracker.consumerFlowControlStrategy.handleMessage( + offset, + chunkTimestamp, + committedOffset, + message + ); subscriptionTracker.messageHandler.handle( new MessageHandlerContext( - offset, chunkTimestamp, committedOffset, subscriptionTracker.consumer), + offset, + chunkTimestamp, + committedOffset, + subscriptionTracker.consumer, + subscriptionTracker.consumerFlowControlStrategy instanceof MessageHandlingListener + ? (MessageHandlingListener) subscriptionTracker.consumerFlowControlStrategy + : c -> true + ), message); // FIXME set offset here as well, best effort to avoid duplicates? } else { @@ -733,6 +745,14 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa } return result; }; + Client.ChunkListener chunkListener = (client, subscriptionId, offset, messageCount, dataSize) -> { + SubscriptionTracker subscriptionTracker = subscriptionTrackers.get(subscriptionId & 0xFF); + if(subscriptionTracker == null) { + LOGGER.warn("Could not find stream subscription {} for chunk listener", subscriptionId); + return; + } + subscriptionTracker.consumerFlowControlStrategy.handleChunk(offset, messageCount, dataSize); + }; String connectionName = connectionNamingStrategy.apply(ClientConnectionType.CONSUMER); ClientFactoryContext clientFactoryContext = ClientFactoryContext.fromParameters( @@ -747,7 +767,6 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa .key(name); this.client = clientFactory.client(clientFactoryContext); LOGGER.debug("Created consumer connection '{}'", connectionName); - clientInitializedInManager.set(true); } private void assignConsumersToStream( @@ -961,15 +980,19 @@ synchronized void add( subscriptionContext.offsetSpecification()); checkNotClosed(); - byte subId = subscriptionId; + int initialCredits = subscriptionTracker.consumerFlowControlStrategy.handleSubscribeReturningInitialCredits( + subscriptionContext.offsetSpecification(), + isInitialSubscription + ); + final byte finalSubscriptionId = subscriptionId; Client.Response subscribeResponse = Utils.callAndMaybeRetry( () -> client.subscribe( - subId, + finalSubscriptionId, subscriptionTracker.stream, subscriptionContext.offsetSpecification(), - subscriptionTracker.initialCredits, + initialCredits, subscriptionTracker.subscriptionProperties), RETRY_ON_TIMEOUT, "Subscribe request for consumer %s on stream '%s'", @@ -1033,7 +1056,6 @@ synchronized void remove(SubscriptionTracker subscriptionTracker) { subscriptionTracker.consumer.id(), subscriptionTracker.stream); } - this.setSubscriptionTrackers(update(this.subscriptionTrackers, subscriptionIdInClient, null)); streamToStreamSubscriptions.compute( subscriptionTracker.stream, diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java index 2e63b11e00..4472f42742 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java @@ -13,11 +13,6 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.BackOffDelayPolicy.fixedWithInitialDelay; -import static com.rabbitmq.stream.impl.AsyncRetry.asyncRetry; -import static com.rabbitmq.stream.impl.Utils.offsetBefore; -import static java.time.Duration.ofMillis; - import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.Consumer; import com.rabbitmq.stream.ConsumerUpdateListener; @@ -27,10 +22,14 @@ import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.StreamException; import com.rabbitmq.stream.SubscriptionListener; +import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilder; import com.rabbitmq.stream.impl.Client.QueryOffsetResponse; import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration; import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration; import com.rabbitmq.stream.impl.Utils.CompositeConsumerUpdateListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -44,8 +43,11 @@ import java.util.function.LongConsumer; import java.util.function.LongSupplier; import java.util.function.Supplier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static com.rabbitmq.stream.BackOffDelayPolicy.fixedWithInitialDelay; +import static com.rabbitmq.stream.impl.AsyncRetry.asyncRetry; +import static com.rabbitmq.stream.impl.Utils.offsetBefore; +import static java.time.Duration.ofMillis; class StreamConsumer implements Consumer { @@ -74,15 +76,14 @@ class StreamConsumer implements Consumer { String stream, OffsetSpecification offsetSpecification, MessageHandler messageHandler, + ConsumerFlowControlStrategyBuilder consumerFlowControlStrategyBuilder, String name, StreamEnvironment environment, TrackingConfiguration trackingConfiguration, boolean lazyInit, SubscriptionListener subscriptionListener, Map subscriptionProperties, - ConsumerUpdateListener consumerUpdateListener, - int initialCredits, - int additionalCredits) { + ConsumerUpdateListener consumerUpdateListener) { this.id = ID_SEQUENCE.getAndIncrement(); Runnable trackingClosingCallback; @@ -255,9 +256,9 @@ class StreamConsumer implements Consumer { subscriptionListener, trackingClosingCallback, closedAwareMessageHandler, - Collections.unmodifiableMap(subscriptionProperties), - initialCredits, - additionalCredits); + consumerFlowControlStrategyBuilder, + Collections.unmodifiableMap(subscriptionProperties) + ); this.status = Status.RUNNING; }; diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java index 7d829e1f7f..26a44e3dd8 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java @@ -20,6 +20,12 @@ import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.StreamException; import com.rabbitmq.stream.SubscriptionListener; +import com.rabbitmq.stream.flow.ConsumerFlowControlStrategy; +import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilder; +import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilderFactory; +import com.rabbitmq.stream.impl.flow.MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy; +import com.rabbitmq.stream.impl.flow.SynchronousConsumerFlowControlStrategy; + import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.time.Duration; @@ -42,7 +48,7 @@ class StreamConsumerBuilder implements ConsumerBuilder { private SubscriptionListener subscriptionListener = subscriptionContext -> {}; private final Map subscriptionProperties = new ConcurrentHashMap<>(); private ConsumerUpdateListener consumerUpdateListener; - private final DefaultFlowConfiguration flowConfiguration = new DefaultFlowConfiguration(this); + private ConsumerFlowControlStrategyBuilder consumerFlowControlStrategyBuilder = SynchronousConsumerFlowControlStrategy.builder(this); public StreamConsumerBuilder(StreamEnvironment environment) { this.environment = environment; @@ -72,6 +78,15 @@ public ConsumerBuilder messageHandler(MessageHandler messageHandler) { return this; } + @Override + public + , S extends ConsumerFlowControlStrategy> + T customFlowControlStrategy(ConsumerFlowControlStrategyBuilderFactory consumerFlowControlStrategyBuilderFactory) { + T localConsumerFlowControlStrategyBuilder = consumerFlowControlStrategyBuilderFactory.builder(this); + this.consumerFlowControlStrategyBuilder = localConsumerFlowControlStrategyBuilder; + return localConsumerFlowControlStrategyBuilder; + } + MessageHandler messageHandler() { return this.messageHandler; } @@ -131,9 +146,35 @@ public ConsumerBuilder noTrackingStrategy() { return this; } + /** + * Configure credit parameters for flow control. + * Implies usage of a traditional {@link SynchronousConsumerFlowControlStrategy}. + * + * @param initialPrefetchLevel Credits to ask for with each new subscription + * @param prefetchLevelAfterDelivery Credits to ask for after a chunk is delivered + * @return this {@link StreamConsumerBuilder} + */ @Override - public FlowConfiguration flow() { - return this.flowConfiguration; + public StreamConsumerBuilder synchronousControlFlow(int initialPrefetchLevel, int prefetchLevelAfterDelivery) { + if (initialPrefetchLevel <= 0 || prefetchLevelAfterDelivery <= 0) { + throw new IllegalArgumentException("Credits must be positive"); + } + this.consumerFlowControlStrategyBuilder = SynchronousConsumerFlowControlStrategy + .builder(this) + .initialCredits(initialPrefetchLevel) + .additionalCredits(prefetchLevelAfterDelivery); + return this; + } + + @Override + public ConsumerBuilder asynchronousControlFlow(int prefetchLevel) { + if (prefetchLevel <= 0) { + throw new IllegalArgumentException("ConcurrencyLevel must be positive"); + } + this.consumerFlowControlStrategyBuilder = MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy + .builder(this) + .maximumInflightChunksPerSubscription(prefetchLevel); + return this; } StreamConsumerBuilder lazyInit(boolean lazyInit) { @@ -192,15 +233,15 @@ public Consumer build() { this.stream, this.offsetSpecification, this.messageHandler, + this.consumerFlowControlStrategyBuilder, this.name, this.environment, trackingConfiguration, this.lazyInit, this.subscriptionListener, this.subscriptionProperties, - this.consumerUpdateListener, - this.flowConfiguration.initialCredits, - this.flowConfiguration.additionalCredits); + this.consumerUpdateListener + ); environment.addConsumer((StreamConsumer) consumer); } else { if (Utils.isSac(this.subscriptionProperties)) { @@ -337,32 +378,6 @@ StreamConsumerBuilder duplicate() { return duplicate; } - private static class DefaultFlowConfiguration implements FlowConfiguration { - - private final ConsumerBuilder consumerBuilder; - - private DefaultFlowConfiguration(ConsumerBuilder consumerBuilder) { - this.consumerBuilder = consumerBuilder; - } - - private int initialCredits = 1; - private final int additionalCredits = 1; - - @Override - public FlowConfiguration initialCredits(int initialCredits) { - if (initialCredits <= 0) { - throw new IllegalArgumentException("Credits must be positive"); - } - this.initialCredits = initialCredits; - return this; - } - - @Override - public ConsumerBuilder builder() { - return this.consumerBuilder; - } - } - // to help testing public ConsumerUpdateListener consumerUpdateListener() { return consumerUpdateListener; diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index d40f070b56..0bf613b35e 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -13,6 +13,13 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; +import com.rabbitmq.stream.Address; +import com.rabbitmq.stream.AddressResolver; +import com.rabbitmq.stream.BackOffDelayPolicy; +import com.rabbitmq.stream.Codec; +import com.rabbitmq.stream.ConsumerBuilder; +import com.rabbitmq.stream.Environment; +import com.rabbitmq.stream.MessageHandler; import static com.rabbitmq.stream.impl.Utils.convertCodeToException; import static com.rabbitmq.stream.impl.Utils.exceptionMessage; import static com.rabbitmq.stream.impl.Utils.formatConstant; @@ -23,18 +30,22 @@ import com.rabbitmq.stream.*; import com.rabbitmq.stream.MessageHandler.Context; import com.rabbitmq.stream.compression.CompressionCodecFactory; +import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilder; import com.rabbitmq.stream.impl.Client.ClientParameters; import com.rabbitmq.stream.impl.Client.ShutdownListener; import com.rabbitmq.stream.impl.Client.StreamStatsResponse; import com.rabbitmq.stream.impl.OffsetTrackingCoordinator.Registration; import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration; import com.rabbitmq.stream.impl.StreamEnvironmentBuilder.DefaultTlsConfiguration; -import com.rabbitmq.stream.impl.Utils.ClientConnectionType; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLException; import java.io.IOException; import java.net.URI; import java.net.URLDecoder; @@ -58,9 +69,9 @@ import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.stream.Collectors; -import javax.net.ssl.SSLException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static com.rabbitmq.stream.impl.Utils.*; +import static java.util.concurrent.TimeUnit.SECONDS; class StreamEnvironment implements Environment { @@ -650,22 +661,19 @@ Runnable registerConsumer( SubscriptionListener subscriptionListener, Runnable trackingClosingCallback, MessageHandler messageHandler, - Map subscriptionProperties, - int initialCredits, - int additionalCredits) { - Runnable closingCallback = - this.consumersCoordinator.subscribe( - consumer, - stream, - offsetSpecification, - trackingReference, - subscriptionListener, - trackingClosingCallback, - messageHandler, - subscriptionProperties, - initialCredits, - additionalCredits); - return closingCallback; + ConsumerFlowControlStrategyBuilder consumerFlowControlStrategyBuilder, + Map subscriptionProperties) { + return this.consumersCoordinator.subscribe( + consumer, + stream, + offsetSpecification, + trackingReference, + subscriptionListener, + trackingClosingCallback, + messageHandler, + consumerFlowControlStrategyBuilder, + subscriptionProperties + ); } Runnable registerProducer(StreamProducer producer, String reference, String stream) { diff --git a/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java index c884da7e3c..014f6a33b8 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java @@ -13,20 +13,21 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.impl.Utils.namedFunction; - import com.rabbitmq.stream.Consumer; import com.rabbitmq.stream.Message; import com.rabbitmq.stream.MessageHandler; import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration; import com.rabbitmq.stream.impl.Utils.CompositeConsumerUpdateListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static com.rabbitmq.stream.impl.Utils.namedFunction; class SuperStreamConsumer implements Consumer { @@ -126,6 +127,11 @@ private ManualOffsetTrackingMessageHandler( public void handle(Context context, Message message) { Context ctx = new Context() { + @Override + public boolean markHandled() { + return context.markHandled(); + } + @Override public long offset() { return context.offset(); @@ -145,7 +151,7 @@ public long committedChunkId() { public void storeOffset() { for (ConsumerState state : consumerStates) { if (ManualOffsetTrackingMessageHandler.this.consumerState == state) { - maybeStoreOffset(state, () -> context.storeOffset()); + maybeStoreOffset(state, context::storeOffset); } else if (state.offset != 0) { maybeStoreOffset(state, () -> state.consumer.store(state.offset)); } @@ -153,9 +159,7 @@ public void storeOffset() { } private void maybeStoreOffset(ConsumerState state, Runnable storeAction) { - if (state.consumer.isSac() && !state.consumer.sacActive()) { - // do nothing - } else { + if (!state.consumer.isSac() || state.consumer.sacActive()) { storeAction.run(); } } diff --git a/src/main/java/com/rabbitmq/stream/impl/flow/AbstractStatisticRecordingConsumerFlowControlStrategy.java b/src/main/java/com/rabbitmq/stream/impl/flow/AbstractStatisticRecordingConsumerFlowControlStrategy.java new file mode 100644 index 0000000000..8c48e71e79 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/flow/AbstractStatisticRecordingConsumerFlowControlStrategy.java @@ -0,0 +1,134 @@ +package com.rabbitmq.stream.impl.flow; + +import com.rabbitmq.stream.Message; +import com.rabbitmq.stream.MessageHandler; +import com.rabbitmq.stream.OffsetSpecification; +import com.rabbitmq.stream.flow.AbstractConsumerFlowControlStrategy; +import com.rabbitmq.stream.flow.AsyncConsumerFlowControlStrategy; +import com.rabbitmq.stream.flow.ConsumerFlowControlStrategy; +import com.rabbitmq.stream.flow.CreditAsker; +import com.rabbitmq.stream.impl.ConsumerStatisticRecorder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntUnaryOperator; + +/** + * Abstract class that calls an instance of {@link ConsumerStatisticRecorder} and exposes it to child implementations + * that may use its statistics to control flow as they see fit. + */ +public abstract class AbstractStatisticRecordingConsumerFlowControlStrategy + extends AbstractConsumerFlowControlStrategy + implements AsyncConsumerFlowControlStrategy { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStatisticRecordingConsumerFlowControlStrategy.class); + + protected final ConsumerStatisticRecorder consumerStatisticRecorder; + + protected AbstractStatisticRecordingConsumerFlowControlStrategy(String identifier, CreditAsker creditAsker) { + super(identifier, creditAsker); + this.consumerStatisticRecorder = new ConsumerStatisticRecorder(identifier); + } + + /** + * Note for implementors: This method MUST be called from the implementation of + * {@link ConsumerFlowControlStrategy#handleSubscribeReturningInitialCredits}, + * otherwise statistics will not be registered! + *

+ * {@inheritDoc} + */ + @Override + public void handleSubscribe( + OffsetSpecification offsetSpecification, + boolean isInitialSubscription + ) { + this.consumerStatisticRecorder.handleSubscribe( + offsetSpecification, + isInitialSubscription + ); + } + + @Override + public void handleChunk(long offset, long messageCount, long dataSize) { + super.handleChunk(offset, messageCount, dataSize); + this.consumerStatisticRecorder.handleChunk( + offset, + messageCount, + dataSize + ); + } + + @Override + public void handleMessage( + long offset, + long chunkTimestamp, + long committedChunkId, + Message message + ) { + super.handleMessage(offset, chunkTimestamp, committedChunkId, message); + this.consumerStatisticRecorder.handleMessage( + offset, + chunkTimestamp, + committedChunkId, + message + ); + } + + @Override + public void handleCreditNotification(short responseCode) { + super.handleCreditNotification(responseCode); + this.consumerStatisticRecorder.handleCreditNotification(responseCode); + } + + @Override + public void handleUnsubscribe() { + super.handleUnsubscribe(); + this.consumerStatisticRecorder.handleUnsubscribe(); + } + + protected int registerCredits(IntUnaryOperator askedToAsk, boolean askForCredits) { + AtomicInteger outerCreditsToAsk = new AtomicInteger(); + ConsumerStatisticRecorder.SubscriptionStatistics subscriptionStatistics = this.consumerStatisticRecorder + .getSubscriptionStatistics(); + if(subscriptionStatistics == null) { + LOGGER.warn("Lost subscription, returning no credits. askForCredits={}", askForCredits); + return 0; + } + subscriptionStatistics.getPendingChunks().updateAndGet(credits -> { + int creditsToAsk = askedToAsk.applyAsInt(credits); + outerCreditsToAsk.set(creditsToAsk); + return credits + creditsToAsk; + }); + int finalCreditsToAsk = outerCreditsToAsk.get(); + if(askForCredits && finalCreditsToAsk > 0) { + LOGGER.debug("Asking for {} credits", finalCreditsToAsk); + getCreditAsker().credit(finalCreditsToAsk); + } + LOGGER.debug("Returning {} credits with askForCredits={}", finalCreditsToAsk, askForCredits); + return finalCreditsToAsk; + } + + @Override + public boolean markHandled(MessageHandler.Context messageContext) { + ConsumerStatisticRecorder.AggregatedMessageStatistics messageStatistics = this.consumerStatisticRecorder + .retrieveStatistics(messageContext); + if(messageStatistics == null) { + LOGGER.warn("Message statistics not found for offset {} on stream '{}'", messageContext.offset(), messageContext.stream()); + return false; + } + boolean markedAsHandled = this.consumerStatisticRecorder.markHandled(messageStatistics); + if(!markedAsHandled) { + LOGGER.warn("Message not marked as handled for offset {} on stream '{}'", messageContext.offset(), messageContext.stream()); + return false; + } + afterMarkHandledStateChanged(messageContext, messageStatistics); + return true; + } + + protected void afterMarkHandledStateChanged( + MessageHandler.Context messageContext, + ConsumerStatisticRecorder.AggregatedMessageStatistics messageStatistics) { + // Default no-op callback + } +} diff --git a/src/main/java/com/rabbitmq/stream/impl/flow/MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy.java b/src/main/java/com/rabbitmq/stream/impl/flow/MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy.java new file mode 100644 index 0000000000..c16929a314 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/flow/MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy.java @@ -0,0 +1,112 @@ +package com.rabbitmq.stream.impl.flow; + +import com.rabbitmq.stream.ConsumerBuilder; +import com.rabbitmq.stream.MessageHandler; +import com.rabbitmq.stream.OffsetSpecification; +import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilder; +import com.rabbitmq.stream.flow.CreditAsker; +import com.rabbitmq.stream.flow.MessageHandlingListener; +import com.rabbitmq.stream.impl.ConsumerStatisticRecorder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.function.IntUnaryOperator; + +/** + * A flow control strategy that enforces a maximum amount of Inflight chunks per registered subscription. + * Based on {@link MessageHandlingListener message acknowledgement}, asking for the maximum number of chunks possible, given the limit. + */ +public class MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy extends AbstractStatisticRecordingConsumerFlowControlStrategy { + + private static final Logger LOGGER = LoggerFactory.getLogger(MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy.class); + + private final int maximumSimultaneousChunksPerSubscription; + + public MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy( + String identifier, + CreditAsker creditAsker, + int maximumSimultaneousChunksPerSubscription + ) { + super(identifier, creditAsker); + if(maximumSimultaneousChunksPerSubscription <= 0) { + throw new IllegalArgumentException( + "maximumSimultaneousChunksPerSubscription must be greater than 0. Was: " + maximumSimultaneousChunksPerSubscription + ); + } + this.maximumSimultaneousChunksPerSubscription = maximumSimultaneousChunksPerSubscription; + } + + @Override + public int handleSubscribeReturningInitialCredits( + OffsetSpecification offsetSpecification, + boolean isInitialSubscription) { + this.handleSubscribe( + offsetSpecification, + isInitialSubscription + ); + return registerCredits(getCreditRegistererFunction(), false); + } + + @Override + protected void afterMarkHandledStateChanged( + MessageHandler.Context messageContext, + ConsumerStatisticRecorder.AggregatedMessageStatistics messageStatistics) { + registerCredits(getCreditRegistererFunction(), true); + } + + private IntUnaryOperator getCreditRegistererFunction() { + return pendingChunks -> { + int inProcessingChunks = extractInProcessingChunks(); + return Math.max(0, this.maximumSimultaneousChunksPerSubscription - (pendingChunks + inProcessingChunks)); + }; + } + + private int extractInProcessingChunks() { + int inProcessingChunks; + ConsumerStatisticRecorder.SubscriptionStatistics subscriptionStats = this.consumerStatisticRecorder + .getSubscriptionStatistics(); + if(subscriptionStats == null) { + LOGGER.warn("Subscription data not found while calculating credits to ask! Identifier: {}", this.getIdentifier()); + inProcessingChunks = 0; + } else { + inProcessingChunks = subscriptionStats.getUnprocessedChunksByOffset().size(); + } + return inProcessingChunks; + } + + public static Builder builder(ConsumerBuilder consumerBuilder) { + return new Builder(consumerBuilder); + } + + public static class Builder implements ConsumerFlowControlStrategyBuilder, ConsumerBuilder.ConsumerBuilderAccessor { + + private final ConsumerBuilder consumerBuilder; + + private int maximumInflightChunksPerSubscription = 1; + + public Builder(ConsumerBuilder consumerBuilder) { + this.consumerBuilder = consumerBuilder; + } + + @Override + public MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy build(String identifier, CreditAsker creditAsker) { + return new MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy( + identifier, + creditAsker, + this.maximumInflightChunksPerSubscription + ); + } + + @Override + public ConsumerBuilder builder() { + return this.consumerBuilder; + } + + public Builder maximumInflightChunksPerSubscription(int maximumInflightChunksPerSubscription) { + this.maximumInflightChunksPerSubscription = maximumInflightChunksPerSubscription; + return this; + } + + } + +} diff --git a/src/main/java/com/rabbitmq/stream/impl/flow/SynchronousConsumerFlowControlStrategy.java b/src/main/java/com/rabbitmq/stream/impl/flow/SynchronousConsumerFlowControlStrategy.java new file mode 100644 index 0000000000..f03849f53a --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/flow/SynchronousConsumerFlowControlStrategy.java @@ -0,0 +1,86 @@ +package com.rabbitmq.stream.impl.flow; + +import com.rabbitmq.stream.ConsumerBuilder; +import com.rabbitmq.stream.OffsetSpecification; +import com.rabbitmq.stream.flow.AbstractConsumerFlowControlStrategy; +import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilder; +import com.rabbitmq.stream.flow.CreditAsker; + +/** + * The default flow control strategy. + * Requests a set amount of credits after each chunk arrives. + * Ideal for usage when the message is consumed synchronously inside the message handler, + * which is the case for most Consumers. + */ +public class SynchronousConsumerFlowControlStrategy extends AbstractConsumerFlowControlStrategy { + + private final int initialCredits; + private final int additionalCredits; + + public SynchronousConsumerFlowControlStrategy( + String identifier, + CreditAsker creditAsker, + int initialCredits, + int additionalCredits) { + super(identifier, creditAsker); + this.initialCredits = initialCredits; + this.additionalCredits = additionalCredits; + } + + @Override + public int handleSubscribeReturningInitialCredits( + OffsetSpecification offsetSpecification, + boolean isInitialSubscription + ) { + return this.initialCredits; + } + + @Override + public void handleChunk(long offset, long messageCount, long dataSize) { + getCreditAsker().credit(this.additionalCredits); + } + + public static SynchronousConsumerFlowControlStrategy.Builder builder(ConsumerBuilder consumerBuilder) { + return new SynchronousConsumerFlowControlStrategy.Builder(consumerBuilder); + } + + public static class Builder implements ConsumerFlowControlStrategyBuilder { + + private final ConsumerBuilder consumerBuilder; + + private int initialCredits = 1; + + private int additionalCredits = 1; + + public Builder(ConsumerBuilder consumerBuilder) { + this.consumerBuilder = consumerBuilder; + } + + @Override + public SynchronousConsumerFlowControlStrategy build(String identifier, CreditAsker creditAsker) { + return new SynchronousConsumerFlowControlStrategy( + identifier, + creditAsker, + this.initialCredits, + this.additionalCredits + ); + } + + @Override + public ConsumerBuilder builder() { + return this.consumerBuilder; + } + + public Builder additionalCredits(int additionalCredits) { + this.additionalCredits = additionalCredits; + return this; + } + + public Builder initialCredits(int initialCredits) { + this.initialCredits = initialCredits; + return this; + } + + } + +} diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java index 73136cb0fd..d365156745 100644 --- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java +++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java @@ -13,12 +13,6 @@ // info@rabbitmq.com. package com.rabbitmq.stream.perf; -import static com.rabbitmq.stream.perf.Utils.ENVIRONMENT_VARIABLE_LOOKUP; -import static com.rabbitmq.stream.perf.Utils.ENVIRONMENT_VARIABLE_PREFIX; -import static com.rabbitmq.stream.perf.Utils.OPTION_TO_ENVIRONMENT_VARIABLE; -import static java.lang.String.format; -import static java.time.Duration.ofMillis; - import com.google.common.util.concurrent.RateLimiter; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -46,8 +40,7 @@ import com.rabbitmq.stream.impl.Client; import com.rabbitmq.stream.metrics.MetricsCollector; import com.rabbitmq.stream.perf.ShutdownService.CloseCallback; -import com.rabbitmq.stream.perf.Utils.NamedThreadFactory; -import com.rabbitmq.stream.perf.Utils.PerformanceMicrometerMetricsCollector; +import com.rabbitmq.stream.perf.Utils.*; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Tag; @@ -64,6 +57,16 @@ import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; import io.netty.util.internal.PlatformDependent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import picocli.CommandLine; +import picocli.CommandLine.ArgGroup; +import picocli.CommandLine.Model.CommandSpec; +import picocli.CommandLine.ParameterException; +import picocli.CommandLine.Spec; + +import javax.net.ssl.SNIServerName; +import javax.net.ssl.SSLParameters; import java.io.IOException; import java.io.PrintStream; import java.io.PrintWriter; @@ -93,15 +96,10 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; -import javax.net.ssl.SNIServerName; -import javax.net.ssl.SSLParameters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import picocli.CommandLine; -import picocli.CommandLine.ArgGroup; -import picocli.CommandLine.Model.CommandSpec; -import picocli.CommandLine.ParameterException; -import picocli.CommandLine.Spec; + +import static com.rabbitmq.stream.perf.Utils.*; +import static java.lang.String.format; +import static java.time.Duration.ofMillis; @CommandLine.Command( name = "stream-perf-test", @@ -996,9 +994,7 @@ public Integer call() throws Exception { environment .consumerBuilder() .offset(this.offset) - .flow() - .initialCredits(this.initialCredits) - .builder(); + .synchronousControlFlow(this.initialCredits, 1); if (this.superStreams) { consumerBuilder.superStream(stream); diff --git a/src/test/java/com/rabbitmq/stream/Host.java b/src/test/java/com/rabbitmq/stream/Host.java index 3c6c51db9f..a44f56d794 100644 --- a/src/test/java/com/rabbitmq/stream/Host.java +++ b/src/test/java/com/rabbitmq/stream/Host.java @@ -47,7 +47,6 @@ private static Process executeCommand(String command) throws IOException { private static Process executeCommand(String command, boolean ignoreError) throws IOException { Process pr = executeCommandProcess(command); - int ev = waitForExitValue(pr); if (ev != 0 && !ignoreError) { String stdout = capture(pr.getInputStream()); @@ -87,10 +86,10 @@ private static Process executeCommandProcess(String command) throws IOException String[] finalCommand; if (System.getProperty("os.name").toLowerCase().contains("windows")) { finalCommand = new String[4]; - finalCommand[0] = "C:\\winnt\\system32\\cmd.exe"; + finalCommand[0] = "C:\\Windows\\system32\\cmd.exe"; finalCommand[1] = "/y"; finalCommand[2] = "/c"; - finalCommand[3] = command; + finalCommand[3] = command.replaceAll("\"", "\"\"\"").replaceAll("'", "\""); } else { finalCommand = new String[3]; finalCommand[0] = "/bin/sh"; diff --git a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java index a7634e13ea..9b9837d048 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java @@ -13,38 +13,33 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.BackOffDelayPolicy.fixedWithInitialDelay; -import static com.rabbitmq.stream.impl.TestUtils.b; -import static com.rabbitmq.stream.impl.TestUtils.latchAssert; -import static com.rabbitmq.stream.impl.TestUtils.metadata; -import static com.rabbitmq.stream.impl.TestUtils.namedConsumer; -import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; -import static java.lang.String.format; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyByte; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyMap; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.isNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import com.rabbitmq.stream.BackOffDelayPolicy; import com.rabbitmq.stream.Constants; +import com.rabbitmq.stream.Message; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.StreamDoesNotExistException; import com.rabbitmq.stream.StreamException; import com.rabbitmq.stream.SubscriptionListener; import com.rabbitmq.stream.codec.WrapperMessageBuilder; +import com.rabbitmq.stream.flow.ConsumerFlowControlStrategy; +import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilder; import com.rabbitmq.stream.impl.Client.MessageListener; import com.rabbitmq.stream.impl.Client.QueryOffsetResponse; import com.rabbitmq.stream.impl.Client.Response; import com.rabbitmq.stream.impl.MonitoringTestUtils.ConsumerCoordinatorInfo; import com.rabbitmq.stream.impl.Utils.ClientFactory; +import com.rabbitmq.stream.impl.flow.SynchronousConsumerFlowControlStrategy; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -62,22 +57,20 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; + +import static com.rabbitmq.stream.BackOffDelayPolicy.fixedWithInitialDelay; +import static com.rabbitmq.stream.impl.TestUtils.*; +import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; public class ConsumersCoordinatorTest { private static final SubscriptionListener NO_OP_SUBSCRIPTION_LISTENER = subscriptionContext -> {}; private static final Runnable NO_OP_TRACKING_CLOSING_CALLBACK = () -> {}; - private int initialCredits = 10; - private int additionalCredits = 1; + private final int initialCredits = 10; @Mock StreamEnvironment environment; @Mock StreamConsumer consumer; @@ -205,9 +198,9 @@ void tearDown() throws Exception { NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(clientFactory, times(2)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -247,9 +240,9 @@ void shouldGetExactNodeImmediatelyWithAdvertisedHostNameClientFactoryAndExactNod NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -278,9 +271,9 @@ void shouldSubscribeWithEmptyPropertiesWithUnamedConsumer() { NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -300,9 +293,9 @@ void subscribeShouldThrowExceptionWhenNoMetadataForTheStream() { NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits)) + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + )) .isInstanceOf(StreamDoesNotExistException.class); } @@ -320,9 +313,9 @@ void subscribeShouldThrowExceptionWhenStreamDoesNotExist() { NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits)) + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + )) .isInstanceOf(StreamDoesNotExistException.class); } @@ -350,9 +343,9 @@ void subscribePropagateExceptionWhenClientSubscriptionFails() { NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits)) + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + )) .isInstanceOf(StreamException.class) .hasMessage(exceptionMessage); assertThat(MonitoringTestUtils.extract(coordinator).isEmpty()).isTrue(); @@ -372,9 +365,9 @@ void subscribeShouldThrowExceptionWhenMetadataResponseIsNotOk() { NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits)) + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + )) .isInstanceOf(IllegalStateException.class); } @@ -391,9 +384,9 @@ void subscribeShouldThrowExceptionIfNoNodeAvailableForStream() { NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits)) + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + )) .isInstanceOf(IllegalStateException.class); } @@ -433,9 +426,9 @@ void subscribeShouldSubscribeToStreamAndDispatchMessage_UnsubscribeShouldUnsubsc NO_OP_SUBSCRIPTION_LISTENER, () -> trackingClosingCallbackCalls.incrementAndGet(), (offset, message) -> messageHandlerCalls.incrementAndGet(), - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -484,9 +477,9 @@ void subscribeShouldSubscribeToStreamAndDispatchMessageWithManySubscriptions() { NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> messageHandlerCalls.compute(subId, (k, v) -> (v == null) ? 1 : ++v), - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); closingRunnables.add(closingRunnable); } @@ -560,9 +553,9 @@ void shouldRedistributeConsumerIfConnectionIsLost() throws Exception { NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> messageHandlerCalls.incrementAndGet(), - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -580,9 +573,9 @@ void shouldRedistributeConsumerIfConnectionIsLost() throws Exception { NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(client, times(1 + 1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -661,9 +654,9 @@ void shouldSkipRecoveryIfRecoveryIsAlreadyInProgress() throws Exception { NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> messageHandlerCalls.incrementAndGet(), - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -717,9 +710,9 @@ void shouldRedistributeConsumerOnMetadataUpdate() throws Exception { NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> messageHandlerCalls.incrementAndGet(), - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -732,9 +725,9 @@ void shouldRedistributeConsumerOnMetadataUpdate() throws Exception { NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(client, times(1 + 1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -821,9 +814,9 @@ void shouldRetryRedistributionIfMetadataIsNotUpdatedImmediately() throws Excepti NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> messageHandlerCalls.incrementAndGet(), - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -887,9 +880,9 @@ void metadataUpdate_shouldCloseConsumerIfStreamIsDeleted() throws Exception { NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> messageHandlerCalls.incrementAndGet(), - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -941,9 +934,9 @@ void metadataUpdate_shouldCloseConsumerIfRetryTimeoutIsReached() throws Exceptio NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> messageHandlerCalls.incrementAndGet(), - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -996,9 +989,9 @@ void shouldUseNewClientsForMoreThanMaxSubscriptionsAndCloseClientAfterUnsubscrip NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits)) + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + )) .collect(Collectors.toList()); verify(clientFactory, times(2)).client(any()); @@ -1058,9 +1051,9 @@ void shouldRemoveClientSubscriptionManagerFromPoolAfterConnectionDies() throws E NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); }); // the extra is allocated on another client from the same pool verify(clientFactory, times(2)).client(any()); @@ -1084,9 +1077,9 @@ void shouldRemoveClientSubscriptionManagerFromPoolAfterConnectionDies() throws E NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(clientFactory, times(2 + 1)).client(any()); verify(client, times(subscriptionCount + ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT + 1)) @@ -1125,9 +1118,9 @@ void shouldRemoveClientSubscriptionManagerFromPoolIfEmptyAfterMetadataUpdate() t NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); }); // the extra is allocated on another client from the same pool verify(clientFactory, times(2)).client(any()); @@ -1157,9 +1150,9 @@ void shouldRemoveClientSubscriptionManagerFromPoolIfEmptyAfterMetadataUpdate() t NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); // no more client creation verify(clientFactory, times(2)).client(any()); @@ -1208,9 +1201,9 @@ void shouldRestartWhereItLeftOffAfterDisruption(Consumer {}, - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -1281,9 +1274,9 @@ void shouldReUseInitialOffsetSpecificationAfterDisruptionIfNoMessagesReceived( NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -1356,9 +1349,9 @@ void shouldUseStoredOffsetOnRecovery(Consumer configur NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -1443,9 +1436,9 @@ void shouldRetryAssignmentOnRecoveryTimeout() throws Exception { NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -1458,9 +1451,9 @@ void shouldRetryAssignmentOnRecoveryTimeout() throws Exception { NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(clientFactory, times(1)).client(any()); verify(client, times(1 + 1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -1524,9 +1517,9 @@ void shouldRetryAssignmentOnRecoveryStreamNotAvailableFailure() throws Exception NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -1590,9 +1583,9 @@ void shouldRetryAssignmentOnRecoveryCandidateLookupFailure() throws Exception { NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -1633,9 +1626,9 @@ void subscribeUnsubscribeInDifferentThreadsShouldNotDeadlock() { NO_OP_SUBSCRIPTION_LISTENER, NO_OP_TRACKING_CLOSING_CALLBACK, (offset, message) -> {}, - Collections.emptyMap(), - initialCredits, - additionalCredits); + SynchronousConsumerFlowControlStrategy.builder(null), + Collections.emptyMap() + ); closingRunnable.run(); }; @@ -1663,6 +1656,114 @@ void subscribeUnsubscribeInDifferentThreadsShouldNotDeadlock() { } } + @ParameterizedTest + @MethodSource("disruptionArguments") + @SuppressWarnings("unchecked") + void shouldCallConsumerFlowControlHandlers(Consumer configurator) + throws Exception { + + scheduledExecutorService = createScheduledExecutorService(); + when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService); + Duration retryDelay = Duration.ofMillis(100); + when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay)); + when(environment.topologyUpdateBackOffDelayPolicy()) + .thenReturn(BackOffDelayPolicy.fixed(retryDelay)); + when(consumer.isOpen()).thenReturn(true); + when(locator.metadata("stream")) + .thenReturn(metadata(null, replicas())) + .thenReturn(metadata(null, Collections.emptyList())) + .thenReturn(metadata(null, replicas())); + + when(clientFactory.client(any())).thenReturn(client); + + String consumerName = "consumer-name"; + long lastStoredOffset = 5; + long lastReceivedOffset = 10; + when(client.queryOffset(consumerName, "stream")) + .thenReturn(new QueryOffsetResponse(Constants.RESPONSE_CODE_OK, 0L)) + .thenReturn(new QueryOffsetResponse(Constants.RESPONSE_CODE_OK, lastStoredOffset)); + + ArgumentCaptor offsetSpecificationArgumentCaptor = + ArgumentCaptor.forClass(OffsetSpecification.class); + ArgumentCaptor> subscriptionPropertiesArgumentCaptor = + ArgumentCaptor.forClass(Map.class); + when(client.subscribe( + subscriptionIdCaptor.capture(), + anyString(), + offsetSpecificationArgumentCaptor.capture(), + anyInt(), + subscriptionPropertiesArgumentCaptor.capture())) + .thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK)); + + ConsumerFlowControlStrategy mockedConsumerFlowControlStrategy = Mockito.mock(ConsumerFlowControlStrategy.class); + + int numberOfInitialCreditsOnSubscribe = 7; + + when(mockedConsumerFlowControlStrategy.handleSubscribeReturningInitialCredits(any(), anyBoolean())) + .thenReturn(numberOfInitialCreditsOnSubscribe); + + ConsumerFlowControlStrategyBuilder mockedConsumerFlowControlStrategyBuilder = Mockito.mock(ConsumerFlowControlStrategyBuilder.class); + when(mockedConsumerFlowControlStrategyBuilder.build(any(), any())).thenReturn(mockedConsumerFlowControlStrategy); + + Runnable closingRunnable = + coordinator.subscribe( + consumer, + "stream", + null, + consumerName, + NO_OP_SUBSCRIPTION_LISTENER, + NO_OP_TRACKING_CLOSING_CALLBACK, + (offset, message) -> {}, + mockedConsumerFlowControlStrategyBuilder, + Collections.emptyMap() + ); + verify(clientFactory, times(1)).client(any()); + verify(client, times(1)) + .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), eq(numberOfInitialCreditsOnSubscribe), anyMap()); + verify(mockedConsumerFlowControlStrategy, times(1)) + .handleSubscribeReturningInitialCredits(any(OffsetSpecification.class), anyBoolean()); + assertThat(offsetSpecificationArgumentCaptor.getAllValues()) + .element(0) + .isEqualTo(OffsetSpecification.next()); + assertThat(subscriptionPropertiesArgumentCaptor.getAllValues()) + .element(0) + .isEqualTo(Collections.singletonMap("name", "consumer-name")); + + Message message = new WrapperMessageBuilder().build(); + + messageListener.handle( + subscriptionIdCaptor.getValue(), + lastReceivedOffset, + 0, + 0, + message); + + verify(mockedConsumerFlowControlStrategy).handleMessage( + lastReceivedOffset, + 0, + 0, + message + ); + + configurator.accept(this); + + Thread.sleep(retryDelay.toMillis() * 5); + + verify(client, times(2)) + .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); + + verify(mockedConsumerFlowControlStrategy, times(2)) + .handleSubscribeReturningInitialCredits(any(OffsetSpecification.class), anyBoolean()); + + assertThat(offsetSpecificationArgumentCaptor.getAllValues()) + .element(1) + .isEqualTo(OffsetSpecification.offset(lastStoredOffset + 1)) + .isNotEqualTo(OffsetSpecification.offset(lastReceivedOffset)); + assertThat(subscriptionPropertiesArgumentCaptor.getAllValues()) + .element(1) + .isEqualTo(Collections.singletonMap("name", "consumer-name")); + } + Client.Broker leader() { return new Client.Broker("leader", -1); } diff --git a/src/test/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinatorTest.java index 8a7af3ece6..701ff883b2 100644 --- a/src/test/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinatorTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinatorTest.java @@ -13,14 +13,16 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.impl.TestUtils.answer; -import static com.rabbitmq.stream.impl.TestUtils.latchAssert; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.*; - import com.rabbitmq.stream.MessageHandler.Context; import com.rabbitmq.stream.impl.OffsetTrackingCoordinator.Registration; import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -29,12 +31,11 @@ import java.util.function.Consumer; import java.util.function.LongConsumer; import java.util.stream.IntStream; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; + +import static com.rabbitmq.stream.impl.TestUtils.answer; +import static com.rabbitmq.stream.impl.TestUtils.latchAssert; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.*; public class OffsetTrackingCoordinatorTest { @@ -334,6 +335,11 @@ void manualShouldStoreIfRequestedStoredOffsetIsBehind() { Context context(long offset, Runnable action) { return new Context() { + @Override + public boolean markHandled() { + return true; + } + @Override public long offset() { return offset; diff --git a/src/test/java/com/rabbitmq/stream/impl/RetentionClientTest.java b/src/test/java/com/rabbitmq/stream/impl/RetentionClientTest.java index 252d0b4522..abf5c6fb63 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RetentionClientTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RetentionClientTest.java @@ -80,7 +80,7 @@ static RetentionTestConfig[] retention() { // small retention in policy String policyCommand = String.format( - "set_policy stream-retention-test \"%s\" " + "set_policy stream-retention-test '%s' " + "'{\"max-length-bytes\":%d,\"stream-max-segment-size-bytes\":%d }' " + "--priority 1 --apply-to queues", stream, maxLengthBytes, maxSegmentSizeBytes); diff --git a/src/test/java/com/rabbitmq/stream/impl/SacClientTest.java b/src/test/java/com/rabbitmq/stream/impl/SacClientTest.java index b915375448..85db4f670c 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SacClientTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SacClientTest.java @@ -13,19 +13,6 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.impl.TestUtils.BrokerVersion.RABBITMQ_3_11_14; -import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ko; -import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ok; -import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.responseCode; -import static com.rabbitmq.stream.impl.TestUtils.b; -import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology; -import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology; -import static com.rabbitmq.stream.impl.TestUtils.latchAssert; -import static com.rabbitmq.stream.impl.TestUtils.streamName; -import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; -import static java.util.stream.Collectors.toList; -import static org.assertj.core.api.Assertions.assertThat; - import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.stream.Constants; @@ -36,9 +23,11 @@ import com.rabbitmq.stream.impl.Client.CreditNotification; import com.rabbitmq.stream.impl.Client.MessageListener; import com.rabbitmq.stream.impl.Client.Response; -import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; -import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast311Condition; -import com.rabbitmq.stream.impl.TestUtils.DisabledIfRabbitMqCtlNotSet; +import com.rabbitmq.stream.impl.TestUtils.*; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; + import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Collections; @@ -53,9 +42,12 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.IntStream; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.api.extension.ExtendWith; + +import static com.rabbitmq.stream.impl.TestUtils.BrokerVersion.RABBITMQ_3_11_14; +import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.*; +import static com.rabbitmq.stream.impl.TestUtils.*; +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; @ExtendWith({ TestUtils.StreamTestInfrastructureExtension.class, @@ -522,7 +514,7 @@ void superStreamRebalancingShouldWorkWhilePublishing(TestInfo info) throws Excep // we keep track of credit errors // with the amount of initial credit and the rebalancing, // the first subscriber is likely to have in-flight credit commands - // when it becomes inactive. The server should then sends some credit + // when it becomes inactive. The server should then send some credit // notifications to tell the client it's not supposed to ask for credits // for this subscription. CreditNotification creditNotification = @@ -572,7 +564,7 @@ void superStreamRebalancingShouldWorkWhilePublishing(TestInfo info) throws Excep waitAtMost( () -> - creditNotificationResponseCode.get() == Constants.RESPONSE_CODE_PRECONDITION_FAILED); + creditNotificationResponseCode.get() == Constants.RESPONSE_CODE_PRECONDITION_FAILED, () -> "Code was actually: " + creditNotificationResponseCode.get()); Response response = client1.unsubscribe(b(0)); assertThat(response).is(ok()); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java index 96f0f51eb9..8d1cf36b41 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java @@ -13,25 +13,35 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.impl.TestUtils.b; -import static com.rabbitmq.stream.impl.TestUtils.latchAssert; -import static com.rabbitmq.stream.impl.TestUtils.localhost; -import static com.rabbitmq.stream.impl.TestUtils.streamName; -import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; -import static com.rabbitmq.stream.impl.TestUtils.waitMs; -import static java.lang.String.format; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -import com.rabbitmq.stream.*; +import com.rabbitmq.stream.Address; +import com.rabbitmq.stream.BackOffDelayPolicy; +import com.rabbitmq.stream.ConfirmationHandler; +import com.rabbitmq.stream.Consumer; +import com.rabbitmq.stream.ConsumerBuilder; +import com.rabbitmq.stream.Environment; +import com.rabbitmq.stream.EnvironmentBuilder; +import com.rabbitmq.stream.Host; +import com.rabbitmq.stream.MessageHandler; +import com.rabbitmq.stream.NoOffsetException; +import com.rabbitmq.stream.OffsetSpecification; +import com.rabbitmq.stream.Producer; +import com.rabbitmq.stream.StreamDoesNotExistException; import com.rabbitmq.stream.impl.Client.QueryOffsetResponse; import com.rabbitmq.stream.impl.MonitoringTestUtils.ConsumerInfo; -import com.rabbitmq.stream.impl.TestUtils.BrokerVersion; -import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; -import com.rabbitmq.stream.impl.TestUtils.DisabledIfRabbitMqCtlNotSet; +import com.rabbitmq.stream.impl.flow.MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy; import io.netty.channel.EventLoopGroup; +import org.assertj.core.api.ThrowableAssert.ThrowingCallable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + import java.net.UnknownHostException; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -48,14 +58,11 @@ import java.util.function.UnaryOperator; import java.util.stream.IntStream; import java.util.stream.Stream; -import org.assertj.core.api.ThrowableAssert.ThrowingCallable; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; + +import static com.rabbitmq.stream.impl.TestUtils.*; +import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) public class StreamConsumerTest { @@ -68,7 +75,7 @@ public class StreamConsumerTest { TestUtils.ClientFactory cf; Environment environment; - static Stream> consumerShouldKeepConsumingAfterDisruption() { + static Stream> consumerDisruptionTasks() { return Stream.of( TestUtils.namedTask( o -> { @@ -200,6 +207,140 @@ void consume() throws Exception { consumer.close(); } + @Test + void consumeWithAsyncConsumerFlowControl() throws Exception { + int messageCount = 100_000; + CountDownLatch publishLatch = new CountDownLatch(messageCount); + Client client = + cf.get( + new Client.ClientParameters() + .publishConfirmListener((publisherId, publishingId) -> publishLatch.countDown())); + + client.declarePublisher(b(1), null, stream); + IntStream.range(0, messageCount) + .forEach( + i -> + client.publish( + b(1), + Collections.singletonList( + client.messageBuilder().addData("".getBytes()).build()))); + + assertThat(publishLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + CountDownLatch firstConsumeLatch = new CountDownLatch(1); + CountDownLatch consumeLatch = new CountDownLatch(messageCount); + + AtomicLong chunkTimestamp = new AtomicLong(); + + ConsumerBuilder consumerBuilder = environment.consumerBuilder().stream(stream) + .offset(OffsetSpecification.first()) + .asynchronousControlFlow(5); + + List messageContexts = new ArrayList<>(); + + AtomicBoolean shouldInstaConsume = new AtomicBoolean(false); + AtomicBoolean unhandledOnInstaConsume = new AtomicBoolean(false); + + consumerBuilder = consumerBuilder + .messageHandler( + (context, message) -> { + if(shouldInstaConsume.get()) { + if(!context.markHandled()) { + unhandledOnInstaConsume.set(true); + } + } else { + messageContexts.add(context); + } + firstConsumeLatch.countDown(); + chunkTimestamp.set(context.timestamp()); + consumeLatch.countDown(); + }); + Consumer consumer = consumerBuilder.build(); + + assertThat(firstConsumeLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isFalse(); + assertThat(chunkTimestamp.get()).isNotZero(); + + shouldInstaConsume.set(true); + boolean allMarkedHandled = messageContexts.parallelStream().allMatch(MessageHandler.Context::markHandled); + assertThat(allMarkedHandled).isTrue(); + + assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + assertThat(unhandledOnInstaConsume.get()).isFalse(); + + consumer.close(); + } + + @Test + void consumeWithCustomAsyncConsumerFlowControl() throws Exception { + int messageCount = 100_000; + CountDownLatch publishLatch = new CountDownLatch(messageCount); + Client client = + cf.get( + new Client.ClientParameters() + .publishConfirmListener((publisherId, publishingId) -> publishLatch.countDown())); + + client.declarePublisher(b(1), null, stream); + IntStream.range(0, messageCount) + .forEach( + i -> + client.publish( + b(1), + Collections.singletonList( + client.messageBuilder().addData("".getBytes()).build()))); + + assertThat(publishLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + CountDownLatch firstConsumeLatch = new CountDownLatch(1); + CountDownLatch consumeLatch = new CountDownLatch(messageCount); + + AtomicLong chunkTimestamp = new AtomicLong(); + + ConsumerBuilder consumerBuilder = environment.consumerBuilder().stream(stream) + .offset(OffsetSpecification.first()); + + MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy.Builder flowControlStrategyBuilder = consumerBuilder + .customFlowControlStrategy(MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy::builder) + .maximumInflightChunksPerSubscription(5); + + List messageContexts = new ArrayList<>(); + + AtomicBoolean shouldInstaConsume = new AtomicBoolean(false); + AtomicBoolean unhandledOnInstaConsume = new AtomicBoolean(false); + + consumerBuilder = flowControlStrategyBuilder + .builder() + .messageHandler( + (context, message) -> { + if(shouldInstaConsume.get()) { + if(!context.markHandled()) { + unhandledOnInstaConsume.set(true); + } + } else { + messageContexts.add(context); + } + firstConsumeLatch.countDown(); + chunkTimestamp.set(context.timestamp()); + consumeLatch.countDown(); + }); + Consumer consumer = consumerBuilder.build(); + + assertThat(firstConsumeLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isFalse(); + assertThat(chunkTimestamp.get()).isNotZero(); + + shouldInstaConsume.set(true); + boolean allMarkedHandled = messageContexts.parallelStream().allMatch(MessageHandler.Context::markHandled); + assertThat(allMarkedHandled).isTrue(); + + assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + assertThat(unhandledOnInstaConsume.get()).isFalse(); + + consumer.close(); + } + @Test void closeOnCondition() throws Exception { int messageCount = 50_000; @@ -471,7 +612,7 @@ void consumerShouldReUseInitialOffsetSpecificationAfterDisruptionIfNoMessagesRec } @ParameterizedTest - @MethodSource + @MethodSource("consumerDisruptionTasks") @TestUtils.DisabledIfRabbitMqCtlNotSet void consumerShouldKeepConsumingAfterDisruption( java.util.function.Consumer disruption, TestInfo info) throws Exception { @@ -546,6 +687,110 @@ void consumerShouldKeepConsumingAfterDisruption( } } + @ParameterizedTest + @MethodSource("consumerDisruptionTasks") + @TestUtils.DisabledIfRabbitMqCtlNotSet + void consumerWithAsyncFlowControlShouldKeepConsumingAfterDisruption( + java.util.function.Consumer disruption, TestInfo info) throws Exception { + String s = streamName(info); + environment.streamCreator().stream(s).create(); + StreamConsumer consumer = null; + try { + int messageCount = 10_000; + CountDownLatch publishLatch = new CountDownLatch(messageCount); + Producer producer = environment.producerBuilder().stream(s).build(); + IntStream.range(0, messageCount) + .forEach( + i -> + producer.send( + producer.messageBuilder().addData("".getBytes()).build(), + confirmationStatus -> publishLatch.countDown())); + + assertThat(publishLatch.await(10, TimeUnit.SECONDS)).isTrue(); + producer.close(); + + AtomicInteger receivedMessageCount = new AtomicInteger(0); + CountDownLatch firstConsumeLatch = new CountDownLatch(1); + CountDownLatch consumeLatch = new CountDownLatch(messageCount); + CountDownLatch consumeLatchSecondWave = new CountDownLatch(messageCount * 2); + + ConsumerBuilder consumerBuilder = environment.consumerBuilder() + .stream(s) + .asynchronousControlFlow(5); + + List messageContexts = new ArrayList<>(); + + AtomicBoolean shouldInstaConsume = new AtomicBoolean(false); + AtomicBoolean unhandledOnInstaConsume = new AtomicBoolean(false); + + consumer = + (StreamConsumer) + consumerBuilder + .offset(OffsetSpecification.first()) + .messageHandler( + (context, message) -> { + if(shouldInstaConsume.get()) { + if(!context.markHandled()) { + unhandledOnInstaConsume.set(true); + } + } else { + messageContexts.add(context); + } + receivedMessageCount.incrementAndGet(); + firstConsumeLatch.countDown(); + consumeLatch.countDown(); + consumeLatchSecondWave.countDown(); + }) + .build(); + + assertThat(firstConsumeLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isFalse(); + + assertThat(consumer.isOpen()).isTrue(); + + shouldInstaConsume.set(true); + boolean allMarkedHandled = messageContexts.parallelStream().allMatch(MessageHandler.Context::markHandled); + assertThat(allMarkedHandled).isTrue(); + + assertThat(consumeLatch.await(20, TimeUnit.SECONDS)).isTrue(); + + assertThat(unhandledOnInstaConsume.get()).isFalse(); + + disruption.accept(s); + + Client client = cf.get(); + TestUtils.waitAtMost( + recoveryInitialDelay.plusSeconds(2), + () -> { + Client.StreamMetadata metadata = client.metadata(s).get(s); + return metadata.getLeader() != null || !metadata.getReplicas().isEmpty(); + }); + + CountDownLatch publishLatchSecondWave = new CountDownLatch(messageCount); + Producer producerSecondWave = environment.producerBuilder().stream(s).build(); + IntStream.range(0, messageCount) + .forEach( + i -> + producerSecondWave.send( + producerSecondWave.messageBuilder().addData("".getBytes()).build(), + confirmationStatus -> publishLatchSecondWave.countDown())); + + assertThat(publishLatchSecondWave.await(20, TimeUnit.SECONDS)).isTrue(); + producerSecondWave.close(); + + latchAssert(consumeLatchSecondWave).completes(recoveryInitialDelay.plusSeconds(30)); + assertThat(receivedMessageCount.get()) + .isBetween(messageCount * 2, messageCount * 2 + 1); // there can be a duplicate + assertThat(consumer.isOpen()).isTrue(); + + } finally { + if (consumer != null) { + consumer.close(); + } + environment.deleteStream(s); + } + } + @Test void autoTrackingShouldStorePeriodicallyAndAfterInactivity() throws Exception { AtomicInteger messageCount = new AtomicInteger(0); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java index 0203c8f536..14467c9772 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java @@ -150,8 +150,7 @@ void environmentCreationShouldFailWithUrlUsingWrongPort() { .build() .close()) .isInstanceOf(StreamException.class) - .hasCauseInstanceOf(ConnectException.class) - .hasRootCauseMessage("Connection refused"); + .hasCauseInstanceOf(ConnectException.class); } @Test