Skip to content

Commit 7add606

Browse files
committed
Add flow control strategy mechanism
Added also a single default flow control strategy, named legacy, which is set by default. This change should be transparent by all API consumers, but a new method to configure flow control strategies has been added.
1 parent a499214 commit 7add606

16 files changed

+463
-171
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.rabbitmq.stream;
2+
3+
import com.rabbitmq.stream.impl.Client;
4+
5+
import java.util.Objects;
6+
import java.util.function.Supplier;
7+
8+
public abstract class AbstractFlowControlStrategy implements ConsumerFlowControlStrategy {
9+
10+
private final Supplier<Client> clientSupplier;
11+
private volatile Client client;
12+
13+
protected AbstractFlowControlStrategy(Supplier<Client> clientSupplier) {
14+
this.clientSupplier = Objects.requireNonNull(clientSupplier, "clientSupplier");
15+
}
16+
17+
protected Client mandatoryClient() {
18+
Client localClient = this.client;
19+
if(localClient != null) {
20+
return localClient;
21+
}
22+
localClient = clientSupplier.get();
23+
if(localClient == null) {
24+
throw new IllegalStateException("Requested client, but client is not yet available! Supplier: " + this.clientSupplier);
25+
}
26+
this.client = localClient;
27+
return localClient;
28+
}
29+
30+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package com.rabbitmq.stream;
2+
3+
import com.rabbitmq.stream.impl.Client;
4+
5+
/**
6+
* Exposes callbacks to handle events from a particular {@link Client},
7+
* with specific names for methods and no {@link Client} parameter.
8+
*/
9+
public interface ClientDataHandler extends
10+
Client.PublishConfirmListener,
11+
Client.PublishErrorListener,
12+
Client.ChunkListener,
13+
Client.MessageListener,
14+
Client.CreditNotification,
15+
Client.ConsumerUpdateListener,
16+
Client.ShutdownListener,
17+
Client.MetadataListener {
18+
19+
@Override
20+
default void handle(byte publisherId, long publishingId) {
21+
this.handlePublishConfirm(publisherId, publishingId);
22+
}
23+
24+
default void handlePublishConfirm(byte publisherId, long publishingId) {
25+
// No-op by default
26+
}
27+
28+
@Override
29+
default void handle(byte publisherId, long publishingId, short errorCode) {
30+
this.handlePublishError(publisherId, publishingId, errorCode);
31+
}
32+
33+
default void handlePublishError(byte publisherId, long publishingId, short errorCode) {
34+
// No-op by default
35+
}
36+
37+
@Override
38+
default void handle(Client client, byte subscriptionId, long offset, long messageCount, long dataSize) {
39+
this.handleChunk(subscriptionId, offset, messageCount, dataSize);
40+
}
41+
42+
default void handleChunk(byte subscriptionId, long offset, long messageCount, long dataSize) {
43+
// No-op by default
44+
}
45+
46+
@Override
47+
default void handle(byte subscriptionId, long offset, long chunkTimestamp, long committedChunkId, Message message) {
48+
this.handleMessage(subscriptionId, offset, chunkTimestamp, committedChunkId, message);
49+
}
50+
51+
default void handleMessage(byte subscriptionId, long offset, long chunkTimestamp, long committedChunkId, Message message) {
52+
// No-op by default
53+
}
54+
55+
@Override
56+
default void handle(byte subscriptionId, short responseCode) {
57+
this.handleCreditNotification(subscriptionId, responseCode);
58+
}
59+
60+
default void handleCreditNotification(byte subscriptionId, short responseCode) {
61+
// No-op by default
62+
}
63+
64+
@Override
65+
default OffsetSpecification handle(Client client, byte subscriptionId, boolean active) {
66+
this.handleConsumerUpdate(subscriptionId, active);
67+
return null;
68+
}
69+
70+
default void handleConsumerUpdate(byte subscriptionId, boolean active) {
71+
// No-op by default
72+
}
73+
74+
@Override
75+
default void handle(Client.ShutdownContext shutdownContext) {
76+
this.handleShutdown(shutdownContext);
77+
}
78+
79+
default void handleShutdown(Client.ShutdownContext shutdownContext) {
80+
// No-op by default
81+
}
82+
83+
@Override
84+
default void handle(String stream, short code) {
85+
this.handleMetadata(stream, code);
86+
}
87+
88+
default void handleMetadata(String stream, short code) {
89+
// No-op by default
90+
}
91+
92+
}

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,14 @@ public interface ConsumerBuilder {
5959
*/
6060
ConsumerBuilder messageHandler(MessageHandler messageHandler);
6161

62+
/**
63+
* Factory for the flow control strategy to be used when consuming messages.
64+
* @param consumerFlowControlStrategyBuilderFactory the factory
65+
* @return a fluent configurable builder for the flow control strategy
66+
* @param <T>
67+
*/
68+
<T extends ConsumerFlowControlStrategyBuilder<?>> T flowControlStrategy(ConsumerFlowControlStrategyBuilderFactory<?, T> consumerFlowControlStrategyBuilderFactory);
69+
6270
/**
6371
* The logical name of the {@link Consumer}.
6472
*
@@ -159,7 +167,7 @@ public interface ConsumerBuilder {
159167
Consumer build();
160168

161169
/** Manual tracking strategy. */
162-
interface ManualTrackingStrategy {
170+
interface ManualTrackingStrategy extends ConsumerBuilderAccessor {
163171

164172
/**
165173
* Interval to check if the last requested stored offset has been actually stored.
@@ -170,17 +178,10 @@ interface ManualTrackingStrategy {
170178
* @return the manual tracking strategy
171179
*/
172180
ManualTrackingStrategy checkInterval(Duration checkInterval);
173-
174-
/**
175-
* Go back to the builder.
176-
*
177-
* @return the consumer builder
178-
*/
179-
ConsumerBuilder builder();
180181
}
181182

182183
/** Auto-tracking strategy. */
183-
interface AutoTrackingStrategy {
184+
interface AutoTrackingStrategy extends ConsumerBuilderAccessor {
184185

185186
/**
186187
* Number of messages before storing.
@@ -201,12 +202,15 @@ interface AutoTrackingStrategy {
201202
* @return the auto-tracking strategy
202203
*/
203204
AutoTrackingStrategy flushInterval(Duration flushInterval);
205+
}
204206

207+
interface ConsumerBuilderAccessor {
205208
/**
206209
* Go back to the builder.
207210
*
208211
* @return the consumer builder
209212
*/
210213
ConsumerBuilder builder();
211214
}
215+
212216
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.rabbitmq.stream;
2+
3+
import com.rabbitmq.stream.impl.Client;
4+
5+
import java.util.Map;
6+
7+
/**
8+
* A built and configured flow control strategy for consumers.
9+
* Implementations may freely implement reactions to the various client callbacks.
10+
* When defined by each implementation, it may internally call {@link Client#credit} to ask for credits.
11+
*/
12+
public interface ConsumerFlowControlStrategy extends ClientDataHandler, AutoCloseable {
13+
14+
/**
15+
* Callback for handling a new stream subscription.
16+
* Called right before the subscription is sent to the actual client.
17+
*
18+
* @param subscriptionId The subscriptionId as specified by the Stream Protocol
19+
* @param stream The name of the stream being subscribed to
20+
* @param offsetSpecification The offset specification for this new subscription
21+
* @param subscriptionProperties The subscription properties for this new subscription
22+
* @return The initial credits that should be granted to this new subscription
23+
*/
24+
int handleSubscribe(
25+
byte subscriptionId,
26+
String stream,
27+
OffsetSpecification offsetSpecification,
28+
Map<String, String> subscriptionProperties
29+
);
30+
31+
/**
32+
* Callback for handling a stream unsubscription.
33+
* @param subscriptionId The subscriptionId as specified by the Stream Protocol
34+
*/
35+
default void handleUnsubscribe(byte subscriptionId) {
36+
// No-op by default
37+
}
38+
39+
@Override
40+
default void handleShutdown(Client.ShutdownContext shutdownContext) {
41+
this.close();
42+
}
43+
44+
@Override
45+
default void close() {
46+
// Override with cleanup logic, if applicable
47+
}
48+
49+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.rabbitmq.stream;
2+
3+
import com.rabbitmq.stream.impl.Client;
4+
5+
import java.util.function.Supplier;
6+
7+
/**
8+
* Fluent builder for a {@link ConsumerFlowControlStrategyBuilderFactory}.
9+
*
10+
* @param <T> the type of {@link ConsumerFlowControlStrategy} to be built
11+
*/
12+
public interface ConsumerFlowControlStrategyBuilder<T extends ConsumerFlowControlStrategy> extends ConsumerBuilder.ConsumerBuilderAccessor {
13+
/**
14+
* Builds the actual FlowControlStrategy instance, for the Client with which it interoperates
15+
*
16+
* @param clientSupplier {@link Supplier <Client>} for retrieving the {@link Client}.
17+
* Is not a {@link Client} instance because the {@link Client} may be lazily initialized.
18+
* @return the FlowControlStrategy
19+
*/
20+
T build(Supplier<Client> clientSupplier);
21+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.rabbitmq.stream;
2+
3+
/**
4+
* A strategy for regulating consumer flow when consuming from a RabbitMQ Stream.
5+
* @param <T> the type of {@link ConsumerFlowControlStrategy} to be built
6+
* @param <C> the type of fluent builder exposed by this factory. Must subclass {@link ConsumerFlowControlStrategyBuilder}
7+
*/
8+
public interface ConsumerFlowControlStrategyBuilderFactory<T extends ConsumerFlowControlStrategy, C extends ConsumerFlowControlStrategyBuilder<T>> {
9+
/**
10+
* Accessor for configuration builder with settings specific to each implementing strategy
11+
* @return {@link C} the specific consumer flow control strategy configuration builder
12+
*/
13+
C builder(ConsumerBuilder consumerBuilder);
14+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.rabbitmq.stream;
2+
3+
import com.rabbitmq.stream.impl.Client;
4+
5+
import java.util.Map;
6+
import java.util.function.Supplier;
7+
8+
/**
9+
* The flow control strategy that was always applied before the flow control strategy mechanism existed in the codebase.
10+
* Requests a set amount of credits after each chunk arrives.
11+
*/
12+
public class LegacyFlowControlStrategy extends AbstractFlowControlStrategy {
13+
14+
private final int initialCredits;
15+
private final int additionalCredits;
16+
17+
public LegacyFlowControlStrategy(Supplier<Client> clientSupplier) {
18+
this(clientSupplier, 1);
19+
}
20+
21+
public LegacyFlowControlStrategy(Supplier<Client> clientSupplier, int initialCredits) {
22+
this(clientSupplier, initialCredits, 1);
23+
}
24+
25+
public LegacyFlowControlStrategy(Supplier<Client> clientSupplier, int initialCredits, int additionalCredits) {
26+
super(clientSupplier);
27+
this.initialCredits = initialCredits;
28+
this.additionalCredits = additionalCredits;
29+
}
30+
31+
@Override
32+
public int handleSubscribe(
33+
byte subscriptionId,
34+
String stream,
35+
OffsetSpecification offsetSpecification,
36+
Map<String, String> subscriptionProperties
37+
) {
38+
return this.initialCredits;
39+
}
40+
41+
@Override
42+
public void handleChunk(byte subscriptionId, long offset, long messageCount, long dataSize) {
43+
mandatoryClient().credit(subscriptionId, this.additionalCredits);
44+
}
45+
46+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.rabbitmq.stream;
2+
3+
import com.rabbitmq.stream.impl.Client;
4+
5+
import java.util.function.Supplier;
6+
7+
public class LegacyFlowControlStrategyBuilderFactory implements ConsumerFlowControlStrategyBuilderFactory<LegacyFlowControlStrategy, LegacyFlowControlStrategyBuilderFactory.LegacyFlowControlStrategyBuilder> {
8+
9+
public static final LegacyFlowControlStrategyBuilderFactory INSTANCE = new LegacyFlowControlStrategyBuilderFactory();
10+
11+
@Override
12+
public LegacyFlowControlStrategyBuilder builder(ConsumerBuilder consumerBuilder) {
13+
return new LegacyFlowControlStrategyBuilder(consumerBuilder);
14+
}
15+
16+
public static class LegacyFlowControlStrategyBuilder implements ConsumerFlowControlStrategyBuilder<LegacyFlowControlStrategy> {
17+
18+
private final ConsumerBuilder consumerBuilder;
19+
20+
private int initialCredits = 1;
21+
22+
private int additionalCredits = 1;
23+
24+
public LegacyFlowControlStrategyBuilder(ConsumerBuilder consumerBuilder) {
25+
this.consumerBuilder = consumerBuilder;
26+
}
27+
28+
@Override
29+
public LegacyFlowControlStrategy build(Supplier<Client> clientSupplier) {
30+
return new LegacyFlowControlStrategy(clientSupplier, this.initialCredits, this.additionalCredits);
31+
}
32+
33+
@Override
34+
public ConsumerBuilder builder() {
35+
return this.consumerBuilder;
36+
}
37+
38+
public LegacyFlowControlStrategyBuilder additionalCredits(int additionalCredits) {
39+
this.additionalCredits = additionalCredits;
40+
return this;
41+
}
42+
43+
public LegacyFlowControlStrategyBuilder initialCredits(int initialCredits) {
44+
this.initialCredits = initialCredits;
45+
return this;
46+
}
47+
}
48+
49+
}

0 commit comments

Comments
 (0)