diff --git a/src/main/java/com/rabbitmq/stream/CallbackStreamDataHandler.java b/src/main/java/com/rabbitmq/stream/CallbackStreamDataHandler.java
new file mode 100644
index 0000000000..e548946f82
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/CallbackStreamDataHandler.java
@@ -0,0 +1,56 @@
+package com.rabbitmq.stream;
+
+/**
+ * Exposes callbacks to handle events from a particular Stream subscription,
+ * with specific names for methods and no connection-oriented parameter.
+ */
+public interface CallbackStreamDataHandler {
+
+ default void handleChunk(long offset, long messageCount, long dataSize) {
+ // No-op by default
+ }
+
+ default void handleMessage(long offset, long chunkTimestamp, long committedChunkId, Message message) {
+ // No-op by default
+ }
+
+ default void handleCreditNotification(short responseCode) {
+ // No-op by default
+ }
+
+ default void handleConsumerUpdate(boolean active) {
+ // No-op by default
+ }
+
+ default void handleMetadata(short code) {
+ // No-op by default
+ }
+
+ /**
+ * Callback for handling a stream subscription.
+ *
+ * @param offsetSpecification The offset specification for this new subscription
+ * @param isInitialSubscription Whether this subscription is an initial subscription
+ * or a recovery for an existing subscription
+ */
+ default void handleSubscribe(
+ OffsetSpecification offsetSpecification,
+ boolean isInitialSubscription
+ ) {
+ // No-op by default
+ }
+
+ /**
+ * Callback for handling a stream unsubscription.
+ */
+ default void handleUnsubscribe() {
+ if(this instanceof AutoCloseable) {
+ try {
+ ((AutoCloseable) this).close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
index 7484ddc433..933ad569d9 100644
--- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
@@ -13,6 +13,10 @@
// info@rabbitmq.com.
package com.rabbitmq.stream;
+import com.rabbitmq.stream.flow.ConsumerFlowControlStrategy;
+import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilder;
+import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilderFactory;
+
import java.time.Duration;
/** API to configure and create a {@link Consumer}. */
@@ -59,6 +63,55 @@ public interface ConsumerBuilder {
*/
ConsumerBuilder messageHandler(MessageHandler messageHandler);
+ /**
+ * Configure prefetching parameters for synchronous flow control.
+ *
+ *
+ * Treat the parameters as an abstract scale at the {@link Consumer} level.
+ *
+ *
+ * @param initialPrefetchLevel The initial level of message pre-fetching.
+ * This may me implemented as the credits to initially
+ * ask for with each new subscription,
+ * but do not depend strongly on this aspect.
+ * @param prefetchLevelAfterDelivery The level of message pre-fetching after the initial batch.
+ * This may be implemented as the credits to ask for after a chunk
+ * is delivered on each subscription,
+ * but do not depend strongly on this aspect.
+ *
+ * The recommended value is 1.
+ * Higher values may cause excessive over-fetching.
+ *
+ * @return this {@link ConsumerBuilder}
+ */
+ ConsumerBuilder synchronousControlFlow(int initialPrefetchLevel, int prefetchLevelAfterDelivery);
+
+ /**
+ * Configure prefetching parameters for asynchronous flow control.
+ *
+ *
+ * Treat the parameters as an abstract scale at the {@link Consumer} level.
+ *
+ *
+ * @param prefetchLevel The desired level of message pre-fetching.
+ * This may be implemented as the maximum chunks to have in processing or pending arrival
+ * per subscription at a given moment, but do not depend strongly on this aspect.
+ * @return this {@link ConsumerBuilder}
+ */
+ ConsumerBuilder asynchronousControlFlow(int prefetchLevel);
+
+ /**
+ * Factory for the flow control strategy to be used when consuming messages.
+ * When there is no need to use a custom strategy, which is the majority of cases,
+ * prefer using {@link ConsumerBuilder#synchronousControlFlow} or {@link ConsumerBuilder#asynchronousControlFlow} instead.
+ *
+ * @param consumerFlowControlStrategyBuilderFactory the factory
+ * @return a fluent configurable builder for the flow control strategy
+ * @param The type of the builder for the provided factory
+ */
+ , S extends ConsumerFlowControlStrategy>
+ T customFlowControlStrategy(ConsumerFlowControlStrategyBuilderFactory consumerFlowControlStrategyBuilderFactory);
+
/**
* The logical name of the {@link Consumer}.
*
@@ -151,13 +204,6 @@ public interface ConsumerBuilder {
*/
ConsumerBuilder noTrackingStrategy();
- /**
- * Configure flow of messages.
- *
- * @return the flow configuration
- */
- FlowConfiguration flow();
-
/**
* Create the configured {@link Consumer}
*
@@ -166,7 +212,7 @@ public interface ConsumerBuilder {
Consumer build();
/** Manual tracking strategy. */
- interface ManualTrackingStrategy {
+ interface ManualTrackingStrategy extends ConsumerBuilderAccessor {
/**
* Interval to check if the last requested stored offset has been actually stored.
@@ -177,17 +223,10 @@ interface ManualTrackingStrategy {
* @return the manual tracking strategy
*/
ManualTrackingStrategy checkInterval(Duration checkInterval);
-
- /**
- * Go back to the builder.
- *
- * @return the consumer builder
- */
- ConsumerBuilder builder();
}
/** Auto-tracking strategy. */
- interface AutoTrackingStrategy {
+ interface AutoTrackingStrategy extends ConsumerBuilderAccessor {
/**
* Number of messages before storing.
@@ -208,28 +247,9 @@ interface AutoTrackingStrategy {
* @return the auto-tracking strategy
*/
AutoTrackingStrategy flushInterval(Duration flushInterval);
-
- /**
- * Go back to the builder.
- *
- * @return the consumer builder
- */
- ConsumerBuilder builder();
}
- /** Message flow configuration. */
- interface FlowConfiguration {
-
- /**
- * The number of initial credits for the subscription.
- *
- *
Default is 1.
- *
- * @param initialCredits the number of initial credits
- * @return this configuration instance
- */
- FlowConfiguration initialCredits(int initialCredits);
-
+ interface ConsumerBuilderAccessor {
/**
* Go back to the builder.
*
@@ -237,4 +257,5 @@ interface FlowConfiguration {
*/
ConsumerBuilder builder();
}
+
}
diff --git a/src/main/java/com/rabbitmq/stream/MessageHandler.java b/src/main/java/com/rabbitmq/stream/MessageHandler.java
index d9d39fdb63..907ba62d61 100644
--- a/src/main/java/com/rabbitmq/stream/MessageHandler.java
+++ b/src/main/java/com/rabbitmq/stream/MessageHandler.java
@@ -84,5 +84,13 @@ interface Context {
* @see Consumer#store(long)
*/
Consumer consumer();
+
+ /**
+ * Marks this message as handled
+ *
+ * @return Whether the message was marked as handled (returning {@code true})
+ * or was not found (either because it was already marked as handled, or wasn't tracked)
+ */
+ boolean markHandled();
}
}
diff --git a/src/main/java/com/rabbitmq/stream/flow/AbstractConsumerFlowControlStrategy.java b/src/main/java/com/rabbitmq/stream/flow/AbstractConsumerFlowControlStrategy.java
new file mode 100644
index 0000000000..b810ed6904
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/flow/AbstractConsumerFlowControlStrategy.java
@@ -0,0 +1,47 @@
+package com.rabbitmq.stream.flow;
+
+import java.util.Objects;
+
+/**
+ * Abstract class for Consumer Flow Control Strategies which keeps a
+ * {@link CreditAsker creditAsker} field ready for retrieval by its inheritors.
+ */
+public abstract class AbstractConsumerFlowControlStrategy implements ConsumerFlowControlStrategy {
+
+ private final String identifier;
+ private final CreditAsker creditAsker;
+
+ protected AbstractConsumerFlowControlStrategy(String identifier, CreditAsker creditAsker) {
+ this.identifier = identifier;
+ this.creditAsker = Objects.requireNonNull(creditAsker, "creditAsker");
+ }
+
+ public CreditAsker getCreditAsker() {
+ return creditAsker;
+ }
+
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ AbstractConsumerFlowControlStrategy that = (AbstractConsumerFlowControlStrategy) o;
+ return Objects.equals(identifier, that.identifier) && Objects.equals(creditAsker, that.creditAsker);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(identifier, creditAsker);
+ }
+
+ @Override
+ public String toString() {
+ return "AbstractConsumerFlowControlStrategy{" +
+ "identifier='" + identifier + '\'' +
+ ", creditAsker=" + creditAsker +
+ '}';
+ }
+}
diff --git a/src/main/java/com/rabbitmq/stream/flow/AsyncConsumerFlowControlStrategy.java b/src/main/java/com/rabbitmq/stream/flow/AsyncConsumerFlowControlStrategy.java
new file mode 100644
index 0000000000..14a27ae9b5
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/flow/AsyncConsumerFlowControlStrategy.java
@@ -0,0 +1,9 @@
+package com.rabbitmq.stream.flow;
+
+/**
+ * Variant of {@link ConsumerFlowControlStrategy} that implements {@link MessageHandlingListener} to asynchronously
+ * mark messages as handled.
+ */
+public interface AsyncConsumerFlowControlStrategy extends ConsumerFlowControlStrategy, MessageHandlingListener {
+
+}
diff --git a/src/main/java/com/rabbitmq/stream/flow/ConsumerFlowControlStrategy.java b/src/main/java/com/rabbitmq/stream/flow/ConsumerFlowControlStrategy.java
new file mode 100644
index 0000000000..33113294cf
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/flow/ConsumerFlowControlStrategy.java
@@ -0,0 +1,42 @@
+package com.rabbitmq.stream.flow;
+
+import com.rabbitmq.stream.CallbackStreamDataHandler;
+import com.rabbitmq.stream.OffsetSpecification;
+
+/**
+ * A built and configured flow control strategy for consumers.
+ * Implementations may freely implement reactions to the various client callbacks.
+ * When defined by each implementation, it may internally call {@link CreditAsker#credit} to ask for credits.
+ * One instance of this is expected to be built for each separate subscription.
+ * A {@link com.rabbitmq.stream.Consumer} may have multiple subscriptions, and thus multiple instances of this.
+ */
+public interface ConsumerFlowControlStrategy extends CallbackStreamDataHandler {
+
+ /**
+ * Callback for handling a stream subscription.
+ * Called right before the subscription is sent to the broker.
+ *
+ * Either this variant or {@link CallbackStreamDataHandler#handleSubscribe} should be called, NOT both.
+ *
+ *
+ * @param offsetSpecification The offset specification for this new subscription
+ * @param isInitialSubscription Whether this subscription is an initial subscription
+ * or a recovery for an existing subscription
+ * @return The initial credits that should be granted to this new subscription
+ */
+ int handleSubscribeReturningInitialCredits(
+ OffsetSpecification offsetSpecification,
+ boolean isInitialSubscription
+ );
+
+ @Override
+ default void handleSubscribe(
+ OffsetSpecification offsetSpecification,
+ boolean isInitialSubscription) {
+ handleSubscribeReturningInitialCredits(
+ offsetSpecification,
+ isInitialSubscription
+ );
+ }
+
+}
diff --git a/src/main/java/com/rabbitmq/stream/flow/ConsumerFlowControlStrategyBuilder.java b/src/main/java/com/rabbitmq/stream/flow/ConsumerFlowControlStrategyBuilder.java
new file mode 100644
index 0000000000..53ead20daf
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/flow/ConsumerFlowControlStrategyBuilder.java
@@ -0,0 +1,21 @@
+package com.rabbitmq.stream.flow;
+
+import com.rabbitmq.stream.ConsumerBuilder;
+
+/**
+ * Fluent builder for a {@link ConsumerFlowControlStrategyBuilderFactory}.
+ * One instance of this is set per {@link com.rabbitmq.stream.Consumer}.
+ * A {@link com.rabbitmq.stream.Consumer} may have multiple subscriptions, and thus multiple instances built by this.
+ *
+ * @param the type of {@link ConsumerFlowControlStrategy} to be built
+ */
+public interface ConsumerFlowControlStrategyBuilder extends ConsumerBuilder.ConsumerBuilderAccessor {
+ /**
+ * Builds the actual FlowControlStrategy instance, for the Client with which it interoperates
+ *
+ * @param identifier A {@link String} to uniquely identify the built instance and/or its subscription.
+ * @param creditAsker {@link CreditAsker} for asking for credits.
+ * @return {@link T} the built {@link ConsumerFlowControlStrategy}
+ */
+ T build(String identifier, CreditAsker creditAsker);
+}
diff --git a/src/main/java/com/rabbitmq/stream/flow/ConsumerFlowControlStrategyBuilderFactory.java b/src/main/java/com/rabbitmq/stream/flow/ConsumerFlowControlStrategyBuilderFactory.java
new file mode 100644
index 0000000000..28450f54bd
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/flow/ConsumerFlowControlStrategyBuilderFactory.java
@@ -0,0 +1,17 @@
+package com.rabbitmq.stream.flow;
+
+import com.rabbitmq.stream.ConsumerBuilder;
+
+/**
+ * A strategy for regulating consumer flow when consuming from a RabbitMQ Stream.
+ * @param the type of {@link ConsumerFlowControlStrategy} to be built
+ * @param the type of fluent builder exposed by this factory. Must subclass {@link ConsumerFlowControlStrategyBuilder}
+ */
+@FunctionalInterface
+public interface ConsumerFlowControlStrategyBuilderFactory> {
+ /**
+ * Accessor for configuration builder with settings specific to each implementing strategy
+ * @return {@link C} the specific consumer flow control strategy configuration builder
+ */
+ C builder(ConsumerBuilder consumerBuilder);
+}
diff --git a/src/main/java/com/rabbitmq/stream/flow/CreditAsker.java b/src/main/java/com/rabbitmq/stream/flow/CreditAsker.java
new file mode 100644
index 0000000000..7437053b83
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/flow/CreditAsker.java
@@ -0,0 +1,13 @@
+package com.rabbitmq.stream.flow;
+
+@FunctionalInterface
+public interface CreditAsker {
+
+ /**
+ * Asks for credits for a given subscription.
+ * @param credits How many credits to ask for
+ * @throws IllegalArgumentException If credits are below 0 or above {@link Short#MAX_VALUE}
+ */
+ void credit(int credits);
+
+}
diff --git a/src/main/java/com/rabbitmq/stream/flow/MessageHandlingListener.java b/src/main/java/com/rabbitmq/stream/flow/MessageHandlingListener.java
new file mode 100644
index 0000000000..7884e52727
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/flow/MessageHandlingListener.java
@@ -0,0 +1,17 @@
+package com.rabbitmq.stream.flow;
+
+import com.rabbitmq.stream.MessageHandler;
+
+@FunctionalInterface
+public interface MessageHandlingListener {
+
+ /**
+ * Marks a message as handled
+ *
+ * @param messageContext The {@link MessageHandler.Context} of the handled message
+ * @return Whether the message was marked as handled (returning {@code true})
+ * or was not found (either because it was already marked as handled, or wasn't tracked)
+ */
+ boolean markHandled(MessageHandler.Context messageContext);
+
+}
diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java
index f85fa9a51a..d3a78b1b61 100644
--- a/src/main/java/com/rabbitmq/stream/impl/Client.java
+++ b/src/main/java/com/rabbitmq/stream/impl/Client.java
@@ -13,43 +13,6 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
-import static com.rabbitmq.stream.Constants.COMMAND_CLOSE;
-import static com.rabbitmq.stream.Constants.COMMAND_CONSUMER_UPDATE;
-import static com.rabbitmq.stream.Constants.COMMAND_CREATE_STREAM;
-import static com.rabbitmq.stream.Constants.COMMAND_CREDIT;
-import static com.rabbitmq.stream.Constants.COMMAND_DECLARE_PUBLISHER;
-import static com.rabbitmq.stream.Constants.COMMAND_DELETE_PUBLISHER;
-import static com.rabbitmq.stream.Constants.COMMAND_DELETE_STREAM;
-import static com.rabbitmq.stream.Constants.COMMAND_EXCHANGE_COMMAND_VERSIONS;
-import static com.rabbitmq.stream.Constants.COMMAND_HEARTBEAT;
-import static com.rabbitmq.stream.Constants.COMMAND_METADATA;
-import static com.rabbitmq.stream.Constants.COMMAND_OPEN;
-import static com.rabbitmq.stream.Constants.COMMAND_PARTITIONS;
-import static com.rabbitmq.stream.Constants.COMMAND_PEER_PROPERTIES;
-import static com.rabbitmq.stream.Constants.COMMAND_PUBLISH;
-import static com.rabbitmq.stream.Constants.COMMAND_QUERY_OFFSET;
-import static com.rabbitmq.stream.Constants.COMMAND_QUERY_PUBLISHER_SEQUENCE;
-import static com.rabbitmq.stream.Constants.COMMAND_ROUTE;
-import static com.rabbitmq.stream.Constants.COMMAND_SASL_AUTHENTICATE;
-import static com.rabbitmq.stream.Constants.COMMAND_SASL_HANDSHAKE;
-import static com.rabbitmq.stream.Constants.COMMAND_STORE_OFFSET;
-import static com.rabbitmq.stream.Constants.COMMAND_STREAM_STATS;
-import static com.rabbitmq.stream.Constants.COMMAND_SUBSCRIBE;
-import static com.rabbitmq.stream.Constants.COMMAND_UNSUBSCRIBE;
-import static com.rabbitmq.stream.Constants.RESPONSE_CODE_AUTHENTICATION_FAILURE;
-import static com.rabbitmq.stream.Constants.RESPONSE_CODE_AUTHENTICATION_FAILURE_LOOPBACK;
-import static com.rabbitmq.stream.Constants.RESPONSE_CODE_OK;
-import static com.rabbitmq.stream.Constants.RESPONSE_CODE_SASL_CHALLENGE;
-import static com.rabbitmq.stream.Constants.VERSION_1;
-import static com.rabbitmq.stream.impl.Utils.encodeRequestCode;
-import static com.rabbitmq.stream.impl.Utils.encodeResponseCode;
-import static com.rabbitmq.stream.impl.Utils.extractResponseCode;
-import static com.rabbitmq.stream.impl.Utils.formatConstant;
-import static com.rabbitmq.stream.impl.Utils.noOpConsumer;
-import static java.lang.String.format;
-import static java.lang.String.join;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
import com.rabbitmq.stream.AuthenticationFailureException;
import com.rabbitmq.stream.ByteCapacity;
import com.rabbitmq.stream.ChunkChecksum;
@@ -69,7 +32,6 @@
import com.rabbitmq.stream.impl.Client.ShutdownContext.ShutdownReason;
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandler;
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo;
-import com.rabbitmq.stream.impl.Utils.NamedThreadFactory;
import com.rabbitmq.stream.metrics.MetricsCollector;
import com.rabbitmq.stream.metrics.NoOpMetricsCollector;
import com.rabbitmq.stream.sasl.CredentialsProvider;
@@ -103,6 +65,12 @@
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLParameters;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
@@ -134,11 +102,12 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLHandshakeException;
-import javax.net.ssl.SSLParameters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import static com.rabbitmq.stream.Constants.*;
+import static com.rabbitmq.stream.impl.Utils.*;
+import static java.lang.String.format;
+import static java.lang.String.join;
+import static java.util.concurrent.TimeUnit.SECONDS;
/**
* This is low-level client API to communicate with the broker.
diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumerStatisticRecorder.java b/src/main/java/com/rabbitmq/stream/impl/ConsumerStatisticRecorder.java
new file mode 100644
index 0000000000..aac9466367
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/impl/ConsumerStatisticRecorder.java
@@ -0,0 +1,354 @@
+package com.rabbitmq.stream.impl;
+
+import com.rabbitmq.stream.CallbackStreamDataHandler;
+import com.rabbitmq.stream.Message;
+import com.rabbitmq.stream.MessageHandler;
+import com.rabbitmq.stream.OffsetSpecification;
+import com.rabbitmq.stream.flow.MessageHandlingListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ConsumerStatisticRecorder implements CallbackStreamDataHandler, MessageHandlingListener {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerStatisticRecorder.class);
+
+ private final String identifier;
+ private final AtomicReference subscriptionStatistics = new AtomicReference<>();
+
+ public ConsumerStatisticRecorder(String identifier) {
+ this.identifier = identifier;
+ }
+
+ @Override
+ public void handleSubscribe(
+ OffsetSpecification offsetSpecification,
+ boolean isInitialSubscription
+ ) {
+ SubscriptionStatistics localSubscriptionStatistics = this.subscriptionStatistics.get();
+ if(localSubscriptionStatistics == null) {
+ this.subscriptionStatistics.set(new SubscriptionStatistics(offsetSpecification));
+ return;
+ }
+ if(isInitialSubscription) {
+ LOGGER.warn(
+ "handleSubscribe called for stream that already had same associated subscription! "
+ + "identifier={} offsetSpecification={}",
+ this.identifier,
+ offsetSpecification
+ );
+ }
+ localSubscriptionStatistics.offsetSpecification = offsetSpecification;
+ localSubscriptionStatistics.pendingChunks.set(0);
+ cleanupOldTrackingData(localSubscriptionStatistics);
+ }
+
+ private void cleanupOldTrackingData(SubscriptionStatistics subscriptionStatistics) {
+ if (!subscriptionStatistics.offsetSpecification.isOffset()) {
+ LOGGER.debug("Can't cleanup old tracking data: offsetSpecification is not an offset! "
+ + "identifier={} offsetSpecification={}",
+ this.identifier,
+ subscriptionStatistics.offsetSpecification
+ );
+ return;
+ }
+ // Mark messages before the initial offset as handled
+ long newSubscriptionInitialOffset = subscriptionStatistics.offsetSpecification.getOffset();
+ NavigableMap chunksHeadMap = subscriptionStatistics.unprocessedChunksByOffset.headMap(newSubscriptionInitialOffset, false);
+ Iterator> chunksHeadMapEntryIterator = chunksHeadMap.entrySet().iterator();
+ while(chunksHeadMapEntryIterator.hasNext()) {
+ Map.Entry chunksHeadMapEntry = chunksHeadMapEntryIterator.next();
+ ChunkStatistics chunkStatistics = chunksHeadMapEntry.getValue();
+ Iterator> chunkMessagesIterator = chunkStatistics.unprocessedMessagesByOffset.entrySet().iterator();
+ while(chunkMessagesIterator.hasNext()) {
+ Map.Entry chunkMessageEntry = chunkMessagesIterator.next();
+ long messageOffset = chunkMessageEntry.getKey();
+ if(messageOffset < newSubscriptionInitialOffset) {
+ chunkMessagesIterator.remove();
+ chunkStatistics.processedMessages.incrementAndGet();
+ }
+ }
+ if(chunkStatistics.isDone()) {
+ chunksHeadMapEntryIterator.remove();
+ }
+ }
+ }
+
+ @Override
+ public void handleChunk(long offset, long messageCount, long dataSize) {
+ SubscriptionStatistics localSubscriptionStatistics = this.subscriptionStatistics.get();
+ if(localSubscriptionStatistics == null) {
+ LOGGER.warn(
+ "handleChunk called for subscription that does not exist! "
+ + "identifier={} offset={} messageCount={} dataSize={}",
+ this.identifier,
+ offset,
+ messageCount,
+ dataSize
+ );
+ return;
+ }
+ localSubscriptionStatistics.pendingChunks.decrementAndGet();
+ localSubscriptionStatistics.unprocessedChunksByOffset.put(offset, new ChunkStatistics(offset, messageCount, dataSize));
+ }
+
+ @Override
+ public void handleMessage(
+ long offset,
+ long chunkTimestamp,
+ long committedChunkId,
+ Message message
+ ) {
+ SubscriptionStatistics localSubscriptionStatistics = this.subscriptionStatistics.get();
+ if(localSubscriptionStatistics == null) {
+ LOGGER.warn(
+ "handleMessage called for subscription that does not exist! "
+ + "identifier={} offset={} chunkTimestamp={} committedChunkId={}",
+ this.identifier,
+ offset,
+ chunkTimestamp,
+ committedChunkId
+ );
+ return;
+ }
+ NavigableMap subHeadMapByOffset = localSubscriptionStatistics.unprocessedChunksByOffset.headMap(offset, true);
+ Map.Entry lastOffsetToChunkEntry = subHeadMapByOffset.lastEntry();
+ if(lastOffsetToChunkEntry == null) {
+ LOGGER.warn(
+ "handleMessage called but chunk was not found! "
+ + "identifier={} offset={} chunkTimestamp={} committedChunkId={}",
+ this.identifier,
+ offset,
+ chunkTimestamp,
+ committedChunkId
+ );
+ return;
+ }
+ ChunkStatistics chunkStatistics = lastOffsetToChunkEntry.getValue();
+ chunkStatistics.unprocessedMessagesByOffset.put(offset, message);
+ }
+
+ @Override
+ public void handleUnsubscribe() {
+ ConsumerStatisticRecorder.SubscriptionStatistics localSubscriptionStatistics = this.subscriptionStatistics.getAndSet(null);
+ if(localSubscriptionStatistics == null) {
+ LOGGER.warn(
+ "handleUnsubscribe called for subscriptionId that does not exist! Identifier: {}", identifier
+ );
+ }
+ }
+
+ /**
+ * Marks a message as handled, changing internal statistics.
+ *
+ * @param messageContext The {@link MessageHandler.Context} of the handled message
+ * @return Whether the message was marked as handled (returning {@code true})
+ * or was not found (either because it was already marked as handled, or wasn't tracked)
+ */
+ @Override
+ public boolean markHandled(MessageHandler.Context messageContext) {
+ AggregatedMessageStatistics entry = retrieveStatistics(messageContext);
+ if (entry == null) {
+ return false;
+ }
+ return markHandled(entry);
+ }
+
+ /**
+ * Marks a message as handled, changing internal statistics.
+ *
+ * @param aggregatedMessageStatistics The {@link AggregatedMessageStatistics} of the handled message
+ * @return Whether the message was marked as handled (returning {@code true})
+ * or was not found (either because it was already marked as handled, or wasn't tracked)
+ */
+ public boolean markHandled(AggregatedMessageStatistics aggregatedMessageStatistics) {
+ // Can't remove, not enough information
+ if (aggregatedMessageStatistics.chunkStatistics == null
+ || aggregatedMessageStatistics.messageEntry == null
+ || aggregatedMessageStatistics.chunkHeadMap == null) {
+ return false;
+ }
+ if(aggregatedMessageStatistics.subscriptionStatistics.offsetSpecification.isOffset()) {
+ long initialOffset = aggregatedMessageStatistics.subscriptionStatistics.offsetSpecification.getOffset();
+ // Old tracked message, should already be handled, probably a late acknowledgment of a defunct connection.
+ if(aggregatedMessageStatistics.offset < initialOffset) {
+ LOGGER.debug("Old message registered as consumed. Identifier={} Message Offset: {}, Start Offset: {}",
+ this.identifier,
+ aggregatedMessageStatistics.offset,
+ initialOffset);
+ return true;
+ }
+ }
+ Message removedMessage = aggregatedMessageStatistics.chunkStatistics
+ .unprocessedMessagesByOffset
+ .remove(aggregatedMessageStatistics.offset);
+ if (removedMessage == null) {
+ return false;
+ }
+ // Remove chunk from list of unprocessed chunks if all its messages have been processed
+ aggregatedMessageStatistics.chunkStatistics.processedMessages.incrementAndGet();
+ if (aggregatedMessageStatistics.chunkStatistics.isDone()) {
+ aggregatedMessageStatistics.chunkHeadMap.remove(aggregatedMessageStatistics.messageEntry.getKey(), aggregatedMessageStatistics.chunkStatistics);
+ }
+ return true;
+ }
+
+ public AggregatedMessageStatistics retrieveStatistics(long offset) {
+ SubscriptionStatistics localSubscriptionStatistics = this.subscriptionStatistics.get();
+ if (localSubscriptionStatistics == null) {
+ return null;
+ }
+ NavigableMap chunkStatisticsHeadMap = localSubscriptionStatistics.unprocessedChunksByOffset.headMap(offset, true);
+ Map.Entry messageEntry = chunkStatisticsHeadMap.lastEntry();
+ ChunkStatistics chunkStatistics = messageEntry == null ? null : messageEntry.getValue();
+ return new AggregatedMessageStatistics(offset, localSubscriptionStatistics, chunkStatisticsHeadMap, chunkStatistics, messageEntry);
+ }
+
+ public AggregatedMessageStatistics retrieveStatistics(MessageHandler.Context messageContext) {
+ return retrieveStatistics(messageContext.offset());
+ }
+
+ public static class AggregatedMessageStatistics {
+
+ private final long offset;
+ private final SubscriptionStatistics subscriptionStatistics;
+ private final NavigableMap chunkHeadMap;
+ private final ChunkStatistics chunkStatistics;
+ private final Map.Entry messageEntry;
+
+ public AggregatedMessageStatistics(
+ long offset,
+ @Nonnull SubscriptionStatistics subscriptionStatistics,
+ @Nullable NavigableMap chunkHeadMap,
+ @Nullable ChunkStatistics chunkStatistics,
+ @Nullable Map.Entry messageEntry) {
+ this.subscriptionStatistics = subscriptionStatistics;
+ this.chunkStatistics = chunkStatistics;
+ this.chunkHeadMap = chunkHeadMap;
+ this.messageEntry = messageEntry;
+ this.offset = offset;
+ }
+
+ @Nonnull
+ public SubscriptionStatistics getSubscriptionStatistics() {
+ return subscriptionStatistics;
+ }
+
+ @Nullable
+ public ChunkStatistics getChunkStatistics() {
+ return chunkStatistics;
+ }
+
+ @Nullable
+ public NavigableMap getChunkHeadMap() {
+ return chunkHeadMap;
+ }
+
+ @Nullable
+ public Map.Entry getMessageEntry() {
+ return messageEntry;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ }
+
+ public static class SubscriptionStatistics {
+
+ private final AtomicInteger pendingChunks = new AtomicInteger(0);
+ private OffsetSpecification offsetSpecification;
+ private final NavigableMap unprocessedChunksByOffset;
+
+ public SubscriptionStatistics(OffsetSpecification offsetSpecification) {
+ this(offsetSpecification, new ConcurrentSkipListMap<>());
+ }
+
+ public SubscriptionStatistics(OffsetSpecification offsetSpecification,
+ NavigableMap unprocessedChunksByOffset) {
+ this.offsetSpecification = offsetSpecification;
+ this.unprocessedChunksByOffset = unprocessedChunksByOffset;
+ }
+
+ public AtomicInteger getPendingChunks() {
+ return pendingChunks;
+ }
+
+ public OffsetSpecification getOffsetSpecification() {
+ return offsetSpecification;
+ }
+
+ public NavigableMap getUnprocessedChunksByOffset() {
+ return Collections.unmodifiableNavigableMap(unprocessedChunksByOffset);
+ }
+
+ }
+
+ public static class ChunkStatistics {
+
+ private final long offset;
+ private final AtomicLong processedMessages = new AtomicLong();
+ private final long messageCount;
+ private final long dataSize;
+ private final Map unprocessedMessagesByOffset;
+
+ public ChunkStatistics(long offset, long messageCount, long dataSize) {
+ this(offset, messageCount, dataSize, new ConcurrentHashMap<>());
+ }
+
+ public ChunkStatistics(long offset, long messageCount, long dataSize, Map unprocessedMessagesByOffset) {
+ this.offset = offset;
+ this.messageCount = messageCount;
+ this.dataSize = dataSize;
+ this.unprocessedMessagesByOffset = unprocessedMessagesByOffset;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public long getMessageCount() {
+ return messageCount;
+ }
+
+ public long getDataSize() {
+ return dataSize;
+ }
+
+ public Map getUnprocessedMessagesByOffset() {
+ return Collections.unmodifiableMap(unprocessedMessagesByOffset);
+ }
+
+ public boolean isDone() {
+ return processedMessages.get() == messageCount && unprocessedMessagesByOffset.isEmpty();
+ }
+ }
+
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ public SubscriptionStatistics getSubscriptionStatistics() {
+ return subscriptionStatistics.get();
+ }
+
+ @Override
+ public String toString() {
+ return "ConsumerStatisticRecorder{" +
+ "identifier='" + identifier + '\'' +
+ ", subscriptionStatistics=" + subscriptionStatistics.get() +
+ '}';
+ }
+}
diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
index 5be323db01..0f7f603dc6 100644
--- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
+++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
@@ -13,14 +13,6 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
-import static com.rabbitmq.stream.impl.Utils.convertCodeToException;
-import static com.rabbitmq.stream.impl.Utils.formatConstant;
-import static com.rabbitmq.stream.impl.Utils.isSac;
-import static com.rabbitmq.stream.impl.Utils.jsonField;
-import static com.rabbitmq.stream.impl.Utils.namedFunction;
-import static com.rabbitmq.stream.impl.Utils.namedRunnable;
-import static com.rabbitmq.stream.impl.Utils.quote;
-
import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.Constants;
import com.rabbitmq.stream.Consumer;
@@ -32,8 +24,10 @@
import com.rabbitmq.stream.StreamNotAvailableException;
import com.rabbitmq.stream.SubscriptionListener;
import com.rabbitmq.stream.SubscriptionListener.SubscriptionContext;
+import com.rabbitmq.stream.flow.ConsumerFlowControlStrategy;
+import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilder;
+import com.rabbitmq.stream.flow.MessageHandlingListener;
import com.rabbitmq.stream.impl.Client.Broker;
-import com.rabbitmq.stream.impl.Client.ChunkListener;
import com.rabbitmq.stream.impl.Client.ClientParameters;
import com.rabbitmq.stream.impl.Client.ConsumerUpdateListener;
import com.rabbitmq.stream.impl.Client.CreditNotification;
@@ -41,9 +35,9 @@
import com.rabbitmq.stream.impl.Client.MetadataListener;
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
import com.rabbitmq.stream.impl.Client.ShutdownListener;
-import com.rabbitmq.stream.impl.Utils.ClientConnectionType;
-import com.rabbitmq.stream.impl.Utils.ClientFactory;
-import com.rabbitmq.stream.impl.Utils.ClientFactoryContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -65,8 +59,8 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import static com.rabbitmq.stream.impl.Utils.*;
class ConsumersCoordinator {
@@ -121,9 +115,8 @@ Runnable subscribe(
SubscriptionListener subscriptionListener,
Runnable trackingClosingCallback,
MessageHandler messageHandler,
- Map subscriptionProperties,
- int initialCredits,
- int additionalCredits) {
+ ConsumerFlowControlStrategyBuilder> consumerFlowControlStrategyBuilder,
+ Map subscriptionProperties) {
List candidates = findBrokersForStream(stream);
Client.Broker newNode = pickBroker(candidates);
if (newNode == null) {
@@ -142,10 +135,8 @@ Runnable subscribe(
subscriptionListener,
trackingClosingCallback,
messageHandler,
- subscriptionProperties,
- initialCredits,
- additionalCredits);
-
+ consumerFlowControlStrategyBuilder,
+ subscriptionProperties);
try {
addToManager(newNode, subscriptionTracker, offsetSpecification, true);
} catch (ConnectionStreamException e) {
@@ -169,10 +160,10 @@ Runnable subscribe(
}
private void addToManager(
- Broker node,
- SubscriptionTracker tracker,
- OffsetSpecification offsetSpecification,
- boolean isInitialSubscription) {
+ Broker node,
+ SubscriptionTracker tracker,
+ OffsetSpecification offsetSpecification,
+ boolean isInitialSubscription) {
ClientParameters clientParameters =
environment
.clientParametersCopy()
@@ -202,6 +193,7 @@ private void addToManager(
pickedManager = new ClientSubscriptionsManager(node, clientParameters);
LOGGER.debug("Created subscription manager on {}, id {}", name, pickedManager.id);
}
+ tracker.clientReference.set(pickedManager.client);
try {
pickedManager.add(tracker, offsetSpecification, isInitialSubscription);
LOGGER.debug(
@@ -213,22 +205,22 @@ private void addToManager(
tracker.subscriptionIdInClient);
this.managers.add(pickedManager);
} catch (IllegalStateException e) {
+ tracker.clientReference.set(null);
pickedManager = null;
} catch (ConnectionStreamException | ClientClosedException | StreamNotAvailableException e) {
+ tracker.clientReference.set(null);
// manager connection is dead or stream not available
// scheduling manager closing if necessary in another thread to avoid blocking this one
if (pickedManager.isEmpty()) {
- ClientSubscriptionsManager manager = pickedManager;
- ConsumersCoordinator.this.environment.execute(
- () -> {
- manager.closeIfEmpty();
- },
+ ConsumersCoordinator.this.environment.execute(
+ pickedManager::closeIfEmpty,
"Consumer manager closing after timeout, consumer %d on stream '%s'",
tracker.consumer.id(),
tracker.stream);
}
throw e;
} catch (RuntimeException e) {
+ tracker.clientReference.set(null);
if (pickedManager != null) {
pickedManager.closeIfEmpty();
}
@@ -393,6 +385,8 @@ private static class SubscriptionTracker {
private final OffsetSpecification initialOffsetSpecification;
private final String offsetTrackingReference;
private final MessageHandler messageHandler;
+ private final ConsumerFlowControlStrategy consumerFlowControlStrategy;
+ private final AtomicReference clientReference;
private final StreamConsumer consumer;
private final SubscriptionListener subscriptionListener;
private final Runnable trackingClosingCallback;
@@ -403,21 +397,18 @@ private static class SubscriptionTracker {
private volatile ClientSubscriptionsManager manager;
private volatile AtomicReference state =
new AtomicReference<>(SubscriptionState.OPENING);
- private final int initialCredits;
- private final int additionalCredits;
private SubscriptionTracker(
- long id,
- StreamConsumer consumer,
- String stream,
- OffsetSpecification initialOffsetSpecification,
- String offsetTrackingReference,
- SubscriptionListener subscriptionListener,
- Runnable trackingClosingCallback,
- MessageHandler messageHandler,
- Map subscriptionProperties,
- int initialCredits,
- int additionalCredits) {
+ long id,
+ StreamConsumer consumer,
+ String stream,
+ OffsetSpecification initialOffsetSpecification,
+ String offsetTrackingReference,
+ SubscriptionListener subscriptionListener,
+ Runnable trackingClosingCallback,
+ MessageHandler messageHandler,
+ ConsumerFlowControlStrategyBuilder> consumerFlowControlStrategyBuilder,
+ Map subscriptionProperties) {
this.id = id;
this.consumer = consumer;
this.stream = stream;
@@ -426,8 +417,20 @@ private SubscriptionTracker(
this.subscriptionListener = subscriptionListener;
this.trackingClosingCallback = trackingClosingCallback;
this.messageHandler = messageHandler;
- this.initialCredits = initialCredits;
- this.additionalCredits = additionalCredits;
+ this.clientReference = new AtomicReference<>();
+ String identifier = String.format("[stream=%s, subscriptionId=%s]", stream, String.valueOf(subscriptionIdInClient));
+ this.consumerFlowControlStrategy = consumerFlowControlStrategyBuilder.build(
+ identifier,
+ credits -> {
+ Client retrievedClient = this.clientReference.get();
+ if(retrievedClient == null) {
+ LOGGER.error("Client is not initialized, cannot ask for credits! "
+ + "Asked for {} credits. Identifier: {}", credits, identifier);
+ return;
+ }
+ retrievedClient.credit(this.subscriptionIdInClient, credits);
+ }
+ );
if (this.offsetTrackingReference == null) {
this.subscriptionProperties = subscriptionProperties;
} else {
@@ -497,13 +500,15 @@ private static final class MessageHandlerContext implements Context {
private final long timestamp;
private final long committedOffset;
private final StreamConsumer consumer;
+ private final MessageHandlingListener handledCallback;
private MessageHandlerContext(
- long offset, long timestamp, long committedOffset, StreamConsumer consumer) {
+ long offset, long timestamp, long committedOffset, StreamConsumer consumer, MessageHandlingListener handledCallback) {
this.offset = offset;
this.timestamp = timestamp;
this.committedOffset = committedOffset;
this.consumer = consumer;
+ this.handledCallback = handledCallback;
}
@Override
@@ -534,6 +539,12 @@ public String stream() {
public Consumer consumer() {
return this.consumer;
}
+
+ @Override
+ public boolean markHandled() {
+ return this.handledCallback.markHandled(this);
+ }
+
}
/**
@@ -554,30 +565,18 @@ private class ClientSubscriptionsManager implements Comparable subscriptionTrackers =
new ArrayList<>(maxConsumersByConnection);
- private volatile int trackerCount = 0;
+ private volatile int trackerCount;
private final AtomicBoolean closed = new AtomicBoolean(false);
- private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientParameters) {
+ private ClientSubscriptionsManager(
+ Broker node,
+ Client.ClientParameters clientParameters
+ ) {
this.id = managerIdSequence.getAndIncrement();
this.node = node;
this.name = keyForClientSubscription(node);
LOGGER.debug("creating subscription manager on {}", name);
IntStream.range(0, maxConsumersByConnection).forEach(i -> subscriptionTrackers.add(null));
- this.trackerCount = 0;
- AtomicBoolean clientInitializedInManager = new AtomicBoolean(false);
- ChunkListener chunkListener =
- (client, subscriptionId, offset, messageCount, dataSize) -> {
- SubscriptionTracker subscriptionTracker =
- subscriptionTrackers.get(subscriptionId & 0xFF);
- if (subscriptionTracker != null && subscriptionTracker.consumer.isOpen()) {
- client.credit(subscriptionId, subscriptionTracker.additionalCredits);
- } else {
- LOGGER.debug(
- "Could not find stream subscription {} or subscription closing, not providing credits",
- subscriptionId & 0xFF);
- }
- };
-
CreditNotification creditNotification =
(subscriptionId, responseCode) -> {
SubscriptionTracker subscriptionTracker =
@@ -597,9 +596,22 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
if (subscriptionTracker != null) {
subscriptionTracker.offset = offset;
subscriptionTracker.hasReceivedSomething = true;
+ subscriptionTracker.consumerFlowControlStrategy.handleMessage(
+ offset,
+ chunkTimestamp,
+ committedOffset,
+ message
+ );
subscriptionTracker.messageHandler.handle(
new MessageHandlerContext(
- offset, chunkTimestamp, committedOffset, subscriptionTracker.consumer),
+ offset,
+ chunkTimestamp,
+ committedOffset,
+ subscriptionTracker.consumer,
+ subscriptionTracker.consumerFlowControlStrategy instanceof MessageHandlingListener
+ ? (MessageHandlingListener) subscriptionTracker.consumerFlowControlStrategy
+ : c -> true
+ ),
message);
// FIXME set offset here as well, best effort to avoid duplicates?
} else {
@@ -733,6 +745,14 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
}
return result;
};
+ Client.ChunkListener chunkListener = (client, subscriptionId, offset, messageCount, dataSize) -> {
+ SubscriptionTracker subscriptionTracker = subscriptionTrackers.get(subscriptionId & 0xFF);
+ if(subscriptionTracker == null) {
+ LOGGER.warn("Could not find stream subscription {} for chunk listener", subscriptionId);
+ return;
+ }
+ subscriptionTracker.consumerFlowControlStrategy.handleChunk(offset, messageCount, dataSize);
+ };
String connectionName = connectionNamingStrategy.apply(ClientConnectionType.CONSUMER);
ClientFactoryContext clientFactoryContext =
ClientFactoryContext.fromParameters(
@@ -747,7 +767,6 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
.key(name);
this.client = clientFactory.client(clientFactoryContext);
LOGGER.debug("Created consumer connection '{}'", connectionName);
- clientInitializedInManager.set(true);
}
private void assignConsumersToStream(
@@ -961,15 +980,19 @@ synchronized void add(
subscriptionContext.offsetSpecification());
checkNotClosed();
- byte subId = subscriptionId;
+ int initialCredits = subscriptionTracker.consumerFlowControlStrategy.handleSubscribeReturningInitialCredits(
+ subscriptionContext.offsetSpecification(),
+ isInitialSubscription
+ );
+ final byte finalSubscriptionId = subscriptionId;
Client.Response subscribeResponse =
Utils.callAndMaybeRetry(
() ->
client.subscribe(
- subId,
+ finalSubscriptionId,
subscriptionTracker.stream,
subscriptionContext.offsetSpecification(),
- subscriptionTracker.initialCredits,
+ initialCredits,
subscriptionTracker.subscriptionProperties),
RETRY_ON_TIMEOUT,
"Subscribe request for consumer %s on stream '%s'",
@@ -1033,7 +1056,6 @@ synchronized void remove(SubscriptionTracker subscriptionTracker) {
subscriptionTracker.consumer.id(),
subscriptionTracker.stream);
}
-
this.setSubscriptionTrackers(update(this.subscriptionTrackers, subscriptionIdInClient, null));
streamToStreamSubscriptions.compute(
subscriptionTracker.stream,
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
index 2e63b11e00..4472f42742 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
@@ -13,11 +13,6 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
-import static com.rabbitmq.stream.BackOffDelayPolicy.fixedWithInitialDelay;
-import static com.rabbitmq.stream.impl.AsyncRetry.asyncRetry;
-import static com.rabbitmq.stream.impl.Utils.offsetBefore;
-import static java.time.Duration.ofMillis;
-
import com.rabbitmq.stream.Constants;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.ConsumerUpdateListener;
@@ -27,10 +22,14 @@
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.SubscriptionListener;
+import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilder;
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration;
import com.rabbitmq.stream.impl.Utils.CompositeConsumerUpdateListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
@@ -44,8 +43,11 @@
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import static com.rabbitmq.stream.BackOffDelayPolicy.fixedWithInitialDelay;
+import static com.rabbitmq.stream.impl.AsyncRetry.asyncRetry;
+import static com.rabbitmq.stream.impl.Utils.offsetBefore;
+import static java.time.Duration.ofMillis;
class StreamConsumer implements Consumer {
@@ -74,15 +76,14 @@ class StreamConsumer implements Consumer {
String stream,
OffsetSpecification offsetSpecification,
MessageHandler messageHandler,
+ ConsumerFlowControlStrategyBuilder> consumerFlowControlStrategyBuilder,
String name,
StreamEnvironment environment,
TrackingConfiguration trackingConfiguration,
boolean lazyInit,
SubscriptionListener subscriptionListener,
Map subscriptionProperties,
- ConsumerUpdateListener consumerUpdateListener,
- int initialCredits,
- int additionalCredits) {
+ ConsumerUpdateListener consumerUpdateListener) {
this.id = ID_SEQUENCE.getAndIncrement();
Runnable trackingClosingCallback;
@@ -255,9 +256,9 @@ class StreamConsumer implements Consumer {
subscriptionListener,
trackingClosingCallback,
closedAwareMessageHandler,
- Collections.unmodifiableMap(subscriptionProperties),
- initialCredits,
- additionalCredits);
+ consumerFlowControlStrategyBuilder,
+ Collections.unmodifiableMap(subscriptionProperties)
+ );
this.status = Status.RUNNING;
};
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java
index 7d829e1f7f..26a44e3dd8 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java
@@ -20,6 +20,12 @@
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.SubscriptionListener;
+import com.rabbitmq.stream.flow.ConsumerFlowControlStrategy;
+import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilder;
+import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilderFactory;
+import com.rabbitmq.stream.impl.flow.MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy;
+import com.rabbitmq.stream.impl.flow.SynchronousConsumerFlowControlStrategy;
+
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.time.Duration;
@@ -42,7 +48,7 @@ class StreamConsumerBuilder implements ConsumerBuilder {
private SubscriptionListener subscriptionListener = subscriptionContext -> {};
private final Map subscriptionProperties = new ConcurrentHashMap<>();
private ConsumerUpdateListener consumerUpdateListener;
- private final DefaultFlowConfiguration flowConfiguration = new DefaultFlowConfiguration(this);
+ private ConsumerFlowControlStrategyBuilder> consumerFlowControlStrategyBuilder = SynchronousConsumerFlowControlStrategy.builder(this);
public StreamConsumerBuilder(StreamEnvironment environment) {
this.environment = environment;
@@ -72,6 +78,15 @@ public ConsumerBuilder messageHandler(MessageHandler messageHandler) {
return this;
}
+ @Override
+ public
+ , S extends ConsumerFlowControlStrategy>
+ T customFlowControlStrategy(ConsumerFlowControlStrategyBuilderFactory consumerFlowControlStrategyBuilderFactory) {
+ T localConsumerFlowControlStrategyBuilder = consumerFlowControlStrategyBuilderFactory.builder(this);
+ this.consumerFlowControlStrategyBuilder = localConsumerFlowControlStrategyBuilder;
+ return localConsumerFlowControlStrategyBuilder;
+ }
+
MessageHandler messageHandler() {
return this.messageHandler;
}
@@ -131,9 +146,35 @@ public ConsumerBuilder noTrackingStrategy() {
return this;
}
+ /**
+ * Configure credit parameters for flow control.
+ * Implies usage of a traditional {@link SynchronousConsumerFlowControlStrategy}.
+ *
+ * @param initialPrefetchLevel Credits to ask for with each new subscription
+ * @param prefetchLevelAfterDelivery Credits to ask for after a chunk is delivered
+ * @return this {@link StreamConsumerBuilder}
+ */
@Override
- public FlowConfiguration flow() {
- return this.flowConfiguration;
+ public StreamConsumerBuilder synchronousControlFlow(int initialPrefetchLevel, int prefetchLevelAfterDelivery) {
+ if (initialPrefetchLevel <= 0 || prefetchLevelAfterDelivery <= 0) {
+ throw new IllegalArgumentException("Credits must be positive");
+ }
+ this.consumerFlowControlStrategyBuilder = SynchronousConsumerFlowControlStrategy
+ .builder(this)
+ .initialCredits(initialPrefetchLevel)
+ .additionalCredits(prefetchLevelAfterDelivery);
+ return this;
+ }
+
+ @Override
+ public ConsumerBuilder asynchronousControlFlow(int prefetchLevel) {
+ if (prefetchLevel <= 0) {
+ throw new IllegalArgumentException("ConcurrencyLevel must be positive");
+ }
+ this.consumerFlowControlStrategyBuilder = MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy
+ .builder(this)
+ .maximumInflightChunksPerSubscription(prefetchLevel);
+ return this;
}
StreamConsumerBuilder lazyInit(boolean lazyInit) {
@@ -192,15 +233,15 @@ public Consumer build() {
this.stream,
this.offsetSpecification,
this.messageHandler,
+ this.consumerFlowControlStrategyBuilder,
this.name,
this.environment,
trackingConfiguration,
this.lazyInit,
this.subscriptionListener,
this.subscriptionProperties,
- this.consumerUpdateListener,
- this.flowConfiguration.initialCredits,
- this.flowConfiguration.additionalCredits);
+ this.consumerUpdateListener
+ );
environment.addConsumer((StreamConsumer) consumer);
} else {
if (Utils.isSac(this.subscriptionProperties)) {
@@ -337,32 +378,6 @@ StreamConsumerBuilder duplicate() {
return duplicate;
}
- private static class DefaultFlowConfiguration implements FlowConfiguration {
-
- private final ConsumerBuilder consumerBuilder;
-
- private DefaultFlowConfiguration(ConsumerBuilder consumerBuilder) {
- this.consumerBuilder = consumerBuilder;
- }
-
- private int initialCredits = 1;
- private final int additionalCredits = 1;
-
- @Override
- public FlowConfiguration initialCredits(int initialCredits) {
- if (initialCredits <= 0) {
- throw new IllegalArgumentException("Credits must be positive");
- }
- this.initialCredits = initialCredits;
- return this;
- }
-
- @Override
- public ConsumerBuilder builder() {
- return this.consumerBuilder;
- }
- }
-
// to help testing
public ConsumerUpdateListener consumerUpdateListener() {
return consumerUpdateListener;
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
index d40f070b56..0bf613b35e 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
@@ -13,6 +13,13 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
+import com.rabbitmq.stream.Address;
+import com.rabbitmq.stream.AddressResolver;
+import com.rabbitmq.stream.BackOffDelayPolicy;
+import com.rabbitmq.stream.Codec;
+import com.rabbitmq.stream.ConsumerBuilder;
+import com.rabbitmq.stream.Environment;
+import com.rabbitmq.stream.MessageHandler;
import static com.rabbitmq.stream.impl.Utils.convertCodeToException;
import static com.rabbitmq.stream.impl.Utils.exceptionMessage;
import static com.rabbitmq.stream.impl.Utils.formatConstant;
@@ -23,18 +30,22 @@
import com.rabbitmq.stream.*;
import com.rabbitmq.stream.MessageHandler.Context;
import com.rabbitmq.stream.compression.CompressionCodecFactory;
+import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilder;
import com.rabbitmq.stream.impl.Client.ClientParameters;
import com.rabbitmq.stream.impl.Client.ShutdownListener;
import com.rabbitmq.stream.impl.Client.StreamStatsResponse;
import com.rabbitmq.stream.impl.OffsetTrackingCoordinator.Registration;
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
import com.rabbitmq.stream.impl.StreamEnvironmentBuilder.DefaultTlsConfiguration;
-import com.rabbitmq.stream.impl.Utils.ClientConnectionType;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLException;
import java.io.IOException;
import java.net.URI;
import java.net.URLDecoder;
@@ -58,9 +69,9 @@
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import javax.net.ssl.SSLException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import static com.rabbitmq.stream.impl.Utils.*;
+import static java.util.concurrent.TimeUnit.SECONDS;
class StreamEnvironment implements Environment {
@@ -650,22 +661,19 @@ Runnable registerConsumer(
SubscriptionListener subscriptionListener,
Runnable trackingClosingCallback,
MessageHandler messageHandler,
- Map subscriptionProperties,
- int initialCredits,
- int additionalCredits) {
- Runnable closingCallback =
- this.consumersCoordinator.subscribe(
- consumer,
- stream,
- offsetSpecification,
- trackingReference,
- subscriptionListener,
- trackingClosingCallback,
- messageHandler,
- subscriptionProperties,
- initialCredits,
- additionalCredits);
- return closingCallback;
+ ConsumerFlowControlStrategyBuilder> consumerFlowControlStrategyBuilder,
+ Map subscriptionProperties) {
+ return this.consumersCoordinator.subscribe(
+ consumer,
+ stream,
+ offsetSpecification,
+ trackingReference,
+ subscriptionListener,
+ trackingClosingCallback,
+ messageHandler,
+ consumerFlowControlStrategyBuilder,
+ subscriptionProperties
+ );
}
Runnable registerProducer(StreamProducer producer, String reference, String stream) {
diff --git a/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java
index c884da7e3c..014f6a33b8 100644
--- a/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java
+++ b/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java
@@ -13,20 +13,21 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
-import static com.rabbitmq.stream.impl.Utils.namedFunction;
-
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
import com.rabbitmq.stream.impl.Utils.CompositeConsumerUpdateListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import static com.rabbitmq.stream.impl.Utils.namedFunction;
class SuperStreamConsumer implements Consumer {
@@ -126,6 +127,11 @@ private ManualOffsetTrackingMessageHandler(
public void handle(Context context, Message message) {
Context ctx =
new Context() {
+ @Override
+ public boolean markHandled() {
+ return context.markHandled();
+ }
+
@Override
public long offset() {
return context.offset();
@@ -145,7 +151,7 @@ public long committedChunkId() {
public void storeOffset() {
for (ConsumerState state : consumerStates) {
if (ManualOffsetTrackingMessageHandler.this.consumerState == state) {
- maybeStoreOffset(state, () -> context.storeOffset());
+ maybeStoreOffset(state, context::storeOffset);
} else if (state.offset != 0) {
maybeStoreOffset(state, () -> state.consumer.store(state.offset));
}
@@ -153,9 +159,7 @@ public void storeOffset() {
}
private void maybeStoreOffset(ConsumerState state, Runnable storeAction) {
- if (state.consumer.isSac() && !state.consumer.sacActive()) {
- // do nothing
- } else {
+ if (!state.consumer.isSac() || state.consumer.sacActive()) {
storeAction.run();
}
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/flow/AbstractStatisticRecordingConsumerFlowControlStrategy.java b/src/main/java/com/rabbitmq/stream/impl/flow/AbstractStatisticRecordingConsumerFlowControlStrategy.java
new file mode 100644
index 0000000000..8c48e71e79
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/impl/flow/AbstractStatisticRecordingConsumerFlowControlStrategy.java
@@ -0,0 +1,134 @@
+package com.rabbitmq.stream.impl.flow;
+
+import com.rabbitmq.stream.Message;
+import com.rabbitmq.stream.MessageHandler;
+import com.rabbitmq.stream.OffsetSpecification;
+import com.rabbitmq.stream.flow.AbstractConsumerFlowControlStrategy;
+import com.rabbitmq.stream.flow.AsyncConsumerFlowControlStrategy;
+import com.rabbitmq.stream.flow.ConsumerFlowControlStrategy;
+import com.rabbitmq.stream.flow.CreditAsker;
+import com.rabbitmq.stream.impl.ConsumerStatisticRecorder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntUnaryOperator;
+
+/**
+ * Abstract class that calls an instance of {@link ConsumerStatisticRecorder} and exposes it to child implementations
+ * that may use its statistics to control flow as they see fit.
+ */
+public abstract class AbstractStatisticRecordingConsumerFlowControlStrategy
+ extends AbstractConsumerFlowControlStrategy
+ implements AsyncConsumerFlowControlStrategy {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStatisticRecordingConsumerFlowControlStrategy.class);
+
+ protected final ConsumerStatisticRecorder consumerStatisticRecorder;
+
+ protected AbstractStatisticRecordingConsumerFlowControlStrategy(String identifier, CreditAsker creditAsker) {
+ super(identifier, creditAsker);
+ this.consumerStatisticRecorder = new ConsumerStatisticRecorder(identifier);
+ }
+
+ /**
+ * Note for implementors: This method MUST be called from the implementation of
+ * {@link ConsumerFlowControlStrategy#handleSubscribeReturningInitialCredits},
+ * otherwise statistics will not be registered!
+ *