Skip to content

Add Consumer Flow Control Strategies #340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions src/main/java/com/rabbitmq/stream/CallbackStreamDataHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.rabbitmq.stream;

/**
* Exposes callbacks to handle events from a particular Stream subscription,
* with specific names for methods and no connection-oriented parameter.
*/
public interface CallbackStreamDataHandler {

default void handleChunk(long offset, long messageCount, long dataSize) {
// No-op by default
}

default void handleMessage(long offset, long chunkTimestamp, long committedChunkId, Message message) {
// No-op by default
}

default void handleCreditNotification(short responseCode) {
// No-op by default
}

default void handleConsumerUpdate(boolean active) {
// No-op by default
}

default void handleMetadata(short code) {
// No-op by default
}

/**
* Callback for handling a stream subscription.
*
* @param offsetSpecification The offset specification for this new subscription
* @param isInitialSubscription Whether this subscription is an initial subscription
* or a recovery for an existing subscription
*/
default void handleSubscribe(
OffsetSpecification offsetSpecification,
boolean isInitialSubscription
) {
// No-op by default
}

/**
* Callback for handling a stream unsubscription.
*/
default void handleUnsubscribe() {
if(this instanceof AutoCloseable) {
try {
((AutoCloseable) this).close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

}
93 changes: 57 additions & 36 deletions src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
// [email protected].
package com.rabbitmq.stream;

import com.rabbitmq.stream.flow.ConsumerFlowControlStrategy;
import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilder;
import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilderFactory;

import java.time.Duration;

/** API to configure and create a {@link Consumer}. */
Expand Down Expand Up @@ -59,6 +63,55 @@ public interface ConsumerBuilder {
*/
ConsumerBuilder messageHandler(MessageHandler messageHandler);

/**
* Configure prefetching parameters for synchronous flow control.
*
* <p>
* Treat the parameters as an abstract scale at the {@link Consumer} level.
* </p>
*
* @param initialPrefetchLevel The initial level of message pre-fetching.
* This may me implemented as the credits to initially
* ask for with each new subscription,
* but do not depend strongly on this aspect.
* @param prefetchLevelAfterDelivery The level of message pre-fetching after the initial batch.
* This may be implemented as the credits to ask for after a chunk
* is delivered on each subscription,
* but do not depend strongly on this aspect.
* <b>
* The recommended value is <code>1</code>.
* Higher values may cause excessive over-fetching.
* </b>
* @return this {@link ConsumerBuilder}
*/
ConsumerBuilder synchronousControlFlow(int initialPrefetchLevel, int prefetchLevelAfterDelivery);

/**
* Configure prefetching parameters for asynchronous flow control.
*
* <p>
* Treat the parameters as an abstract scale at the {@link Consumer} level.
* </p>
*
* @param prefetchLevel The desired level of message pre-fetching.
* This may be implemented as the maximum chunks to have in processing or pending arrival
* per subscription at a given moment, but do not depend strongly on this aspect.
* @return this {@link ConsumerBuilder}
*/
ConsumerBuilder asynchronousControlFlow(int prefetchLevel);

/**
* Factory for the flow control strategy to be used when consuming messages.
* When there is no need to use a custom strategy, which is the majority of cases,
* prefer using {@link ConsumerBuilder#synchronousControlFlow} or {@link ConsumerBuilder#asynchronousControlFlow} instead.
*
* @param consumerFlowControlStrategyBuilderFactory the factory
* @return a fluent configurable builder for the flow control strategy
* @param <T> The type of the builder for the provided factory
*/
<T extends ConsumerFlowControlStrategyBuilder<S>, S extends ConsumerFlowControlStrategy>
T customFlowControlStrategy(ConsumerFlowControlStrategyBuilderFactory<S, T> consumerFlowControlStrategyBuilderFactory);

/**
* The logical name of the {@link Consumer}.
*
Expand Down Expand Up @@ -151,13 +204,6 @@ public interface ConsumerBuilder {
*/
ConsumerBuilder noTrackingStrategy();

/**
* Configure flow of messages.
*
* @return the flow configuration
*/
FlowConfiguration flow();

/**
* Create the configured {@link Consumer}
*
Expand All @@ -166,7 +212,7 @@ public interface ConsumerBuilder {
Consumer build();

/** Manual tracking strategy. */
interface ManualTrackingStrategy {
interface ManualTrackingStrategy extends ConsumerBuilderAccessor {

/**
* Interval to check if the last requested stored offset has been actually stored.
Expand All @@ -177,17 +223,10 @@ interface ManualTrackingStrategy {
* @return the manual tracking strategy
*/
ManualTrackingStrategy checkInterval(Duration checkInterval);

/**
* Go back to the builder.
*
* @return the consumer builder
*/
ConsumerBuilder builder();
}

/** Auto-tracking strategy. */
interface AutoTrackingStrategy {
interface AutoTrackingStrategy extends ConsumerBuilderAccessor {

/**
* Number of messages before storing.
Expand All @@ -208,33 +247,15 @@ interface AutoTrackingStrategy {
* @return the auto-tracking strategy
*/
AutoTrackingStrategy flushInterval(Duration flushInterval);

/**
* Go back to the builder.
*
* @return the consumer builder
*/
ConsumerBuilder builder();
}

/** Message flow configuration. */
interface FlowConfiguration {

/**
* The number of initial credits for the subscription.
*
* <p>Default is 1.
*
* @param initialCredits the number of initial credits
* @return this configuration instance
*/
FlowConfiguration initialCredits(int initialCredits);

interface ConsumerBuilderAccessor {
/**
* Go back to the builder.
*
* @return the consumer builder
*/
ConsumerBuilder builder();
}

}
8 changes: 8 additions & 0 deletions src/main/java/com/rabbitmq/stream/MessageHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,13 @@ interface Context {
* @see Consumer#store(long)
*/
Consumer consumer();

/**
* Marks this message as handled
*
* @return Whether the message was marked as handled (returning {@code true})
* or was not found (either because it was already marked as handled, or wasn't tracked)
*/
boolean markHandled();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.rabbitmq.stream.flow;

import java.util.Objects;

/**
* Abstract class for Consumer Flow Control Strategies which keeps a
* {@link CreditAsker creditAsker} field ready for retrieval by its inheritors.
*/
public abstract class AbstractConsumerFlowControlStrategy implements ConsumerFlowControlStrategy {

private final String identifier;
private final CreditAsker creditAsker;

protected AbstractConsumerFlowControlStrategy(String identifier, CreditAsker creditAsker) {
this.identifier = identifier;
this.creditAsker = Objects.requireNonNull(creditAsker, "creditAsker");
}

public CreditAsker getCreditAsker() {
return creditAsker;
}

public String getIdentifier() {
return identifier;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AbstractConsumerFlowControlStrategy that = (AbstractConsumerFlowControlStrategy) o;
return Objects.equals(identifier, that.identifier) && Objects.equals(creditAsker, that.creditAsker);
}

@Override
public int hashCode() {
return Objects.hash(identifier, creditAsker);
}

@Override
public String toString() {
return "AbstractConsumerFlowControlStrategy{" +
"identifier='" + identifier + '\'' +
", creditAsker=" + creditAsker +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.rabbitmq.stream.flow;

/**
* Variant of {@link ConsumerFlowControlStrategy} that implements {@link MessageHandlingListener} to asynchronously
* mark messages as handled.
*/
public interface AsyncConsumerFlowControlStrategy extends ConsumerFlowControlStrategy, MessageHandlingListener {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.rabbitmq.stream.flow;

import com.rabbitmq.stream.CallbackStreamDataHandler;
import com.rabbitmq.stream.OffsetSpecification;

/**
* A built and configured flow control strategy for consumers.
* Implementations may freely implement reactions to the various client callbacks.
* When defined by each implementation, it may internally call {@link CreditAsker#credit} to ask for credits.
* One instance of this is expected to be built for each separate subscription.
* A {@link com.rabbitmq.stream.Consumer} may have multiple subscriptions, and thus multiple instances of this.
*/
public interface ConsumerFlowControlStrategy extends CallbackStreamDataHandler {

/**
* Callback for handling a stream subscription.
* Called right before the subscription is sent to the broker.
* <p>
* Either this variant or {@link CallbackStreamDataHandler#handleSubscribe} should be called, NOT both.
* </p>
*
* @param offsetSpecification The offset specification for this new subscription
* @param isInitialSubscription Whether this subscription is an initial subscription
* or a recovery for an existing subscription
* @return The initial credits that should be granted to this new subscription
*/
int handleSubscribeReturningInitialCredits(
OffsetSpecification offsetSpecification,
boolean isInitialSubscription
);

@Override
default void handleSubscribe(
OffsetSpecification offsetSpecification,
boolean isInitialSubscription) {
handleSubscribeReturningInitialCredits(
offsetSpecification,
isInitialSubscription
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.rabbitmq.stream.flow;

import com.rabbitmq.stream.ConsumerBuilder;

/**
* Fluent builder for a {@link ConsumerFlowControlStrategyBuilderFactory}.
* One instance of this is set per {@link com.rabbitmq.stream.Consumer}.
* A {@link com.rabbitmq.stream.Consumer} may have multiple subscriptions, and thus multiple instances built by this.
*
* @param <T> the type of {@link ConsumerFlowControlStrategy} to be built
*/
public interface ConsumerFlowControlStrategyBuilder<T extends ConsumerFlowControlStrategy> extends ConsumerBuilder.ConsumerBuilderAccessor {
/**
* Builds the actual FlowControlStrategy instance, for the Client with which it interoperates
*
* @param identifier A {@link String} to uniquely identify the built instance and/or its subscription.
* @param creditAsker {@link CreditAsker} for asking for credits.
* @return {@link T} the built {@link ConsumerFlowControlStrategy}
*/
T build(String identifier, CreditAsker creditAsker);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.rabbitmq.stream.flow;

import com.rabbitmq.stream.ConsumerBuilder;

/**
* A strategy for regulating consumer flow when consuming from a RabbitMQ Stream.
* @param <T> the type of {@link ConsumerFlowControlStrategy} to be built
* @param <C> the type of fluent builder exposed by this factory. Must subclass {@link ConsumerFlowControlStrategyBuilder}
*/
@FunctionalInterface
public interface ConsumerFlowControlStrategyBuilderFactory<T extends ConsumerFlowControlStrategy, C extends ConsumerFlowControlStrategyBuilder<T>> {
/**
* Accessor for configuration builder with settings specific to each implementing strategy
* @return {@link C} the specific consumer flow control strategy configuration builder
*/
C builder(ConsumerBuilder consumerBuilder);
}
13 changes: 13 additions & 0 deletions src/main/java/com/rabbitmq/stream/flow/CreditAsker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.rabbitmq.stream.flow;

@FunctionalInterface
public interface CreditAsker {

/**
* Asks for credits for a given subscription.
* @param credits How many credits to ask for
* @throws IllegalArgumentException If credits are below 0 or above {@link Short#MAX_VALUE}
*/
void credit(int credits);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.rabbitmq.stream.flow;

import com.rabbitmq.stream.MessageHandler;

@FunctionalInterface
public interface MessageHandlingListener {

/**
* Marks a message as handled
*
* @param messageContext The {@link MessageHandler.Context} of the handled message
* @return Whether the message was marked as handled (returning {@code true})
* or was not found (either because it was already marked as handled, or wasn't tracked)
*/
boolean markHandled(MessageHandler.Context messageContext);

}
Loading