Skip to content

Commit a156eb0

Browse files
committed
Remove MessageHandlingListenerConsumerBuilderAccessor
1 parent 70486f4 commit a156eb0

File tree

5 files changed

+51
-73
lines changed

5 files changed

+51
-73
lines changed

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
import com.rabbitmq.stream.flow.ConsumerFlowControlStrategy;
1717
import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilder;
1818
import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilderFactory;
19-
import com.rabbitmq.stream.flow.MessageHandlingListener;
20-
import com.rabbitmq.stream.flow.MessageHandlingListenerConsumerBuilderAccessor;
2119

2220
import java.time.Duration;
2321

@@ -66,22 +64,41 @@ public interface ConsumerBuilder {
6664
ConsumerBuilder messageHandler(MessageHandler messageHandler);
6765

6866
/**
69-
* Configure credit parameters for synchronous flow control.
70-
*
71-
* @param initial Credits to ask for with each new subscription
72-
* @param onChunkDelivery Credits to ask for after a chunk is delivered
67+
* Configure prefetching parameters for synchronous flow control.
68+
*
69+
* <p>
70+
* Treat the parameters as an abstract scale at the {@link Consumer} level.
71+
* </p>
72+
*
73+
* @param initialPrefetchLevel The initial level of message pre-fetching.
74+
* This may me implemented as the credits to initially
75+
* ask for with each new subscription,
76+
* but do not depend strongly on this aspect.
77+
* @param prefetchLevelAfterDelivery The level of message pre-fetching after the initial batch.
78+
* This may be implemented as the credits to ask for after a chunk
79+
* is delivered on each subscription,
80+
* but do not depend strongly on this aspect.
81+
* <b>
82+
* The recommended value is <code>1</code>.
83+
* Higher values may cause excessive over-fetching.
84+
* </b>
7385
* @return this {@link ConsumerBuilder}
7486
*/
75-
ConsumerBuilder synchronousControlFlow(int initial, int onChunkDelivery);
87+
ConsumerBuilder synchronousControlFlow(int initialPrefetchLevel, int prefetchLevelAfterDelivery);
7688

7789
/**
78-
* Configure credit parameters for asynchronous flow control.
90+
* Configure prefetching parameters for asynchronous flow control.
91+
*
92+
* <p>
93+
* Treat the parameters as an abstract scale at the {@link Consumer} level.
94+
* </p>
7995
*
80-
* @param concurrencyLevel Maximum chunks to have in-processing at a given moment
81-
* @return A {@link MessageHandlingListenerConsumerBuilderAccessor} for obtaining the {@link MessageHandlingListener}
82-
* and navigating fluently back to this {@link ConsumerBuilder}
96+
* @param prefetchLevel The desired level of message pre-fetching.
97+
* This may be implemented as the maximum chunks to have in processing or pending arrival
98+
* per subscription at a given moment, but do not depend strongly on this aspect.
99+
* @return this {@link ConsumerBuilder}
83100
*/
84-
MessageHandlingListenerConsumerBuilderAccessor asynchronousControlFlow(int concurrencyLevel);
101+
ConsumerBuilder asynchronousControlFlow(int prefetchLevel);
85102

86103
/**
87104
* Factory for the flow control strategy to be used when consuming messages.

src/main/java/com/rabbitmq/stream/flow/MessageHandlingListenerConsumerBuilderAccessor.java

Lines changed: 0 additions & 7 deletions
This file was deleted.

src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.rabbitmq.stream.flow.ConsumerFlowControlStrategy;
2424
import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilder;
2525
import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilderFactory;
26-
import com.rabbitmq.stream.flow.MessageHandlingListenerConsumerBuilderAccessor;
2726
import com.rabbitmq.stream.impl.flow.MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy;
2827
import com.rabbitmq.stream.impl.flow.SynchronousConsumerFlowControlStrategy;
2928

@@ -151,32 +150,31 @@ public ConsumerBuilder noTrackingStrategy() {
151150
* Configure credit parameters for flow control.
152151
* Implies usage of a traditional {@link SynchronousConsumerFlowControlStrategy}.
153152
*
154-
* @param initial Credits to ask for with each new subscription
155-
* @param onChunkDelivery Credits to ask for after a chunk is delivered
153+
* @param initialPrefetchLevel Credits to ask for with each new subscription
154+
* @param prefetchLevelAfterDelivery Credits to ask for after a chunk is delivered
156155
* @return this {@link StreamConsumerBuilder}
157156
*/
158157
@Override
159-
public StreamConsumerBuilder synchronousControlFlow(int initial, int onChunkDelivery) {
160-
if (initial <= 0 || onChunkDelivery <= 0) {
158+
public StreamConsumerBuilder synchronousControlFlow(int initialPrefetchLevel, int prefetchLevelAfterDelivery) {
159+
if (initialPrefetchLevel <= 0 || prefetchLevelAfterDelivery <= 0) {
161160
throw new IllegalArgumentException("Credits must be positive");
162161
}
163162
this.consumerFlowControlStrategyBuilder = SynchronousConsumerFlowControlStrategy
164163
.builder(this)
165-
.initialCredits(initial)
166-
.additionalCredits(onChunkDelivery);
164+
.initialCredits(initialPrefetchLevel)
165+
.additionalCredits(prefetchLevelAfterDelivery);
167166
return this;
168167
}
169168

170169
@Override
171-
public MessageHandlingListenerConsumerBuilderAccessor asynchronousControlFlow(int concurrencyLevel) {
172-
if (concurrencyLevel <= 0) {
170+
public ConsumerBuilder asynchronousControlFlow(int prefetchLevel) {
171+
if (prefetchLevel <= 0) {
173172
throw new IllegalArgumentException("ConcurrencyLevel must be positive");
174173
}
175-
MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy.Builder localBuilder = MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy
174+
this.consumerFlowControlStrategyBuilder = MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy
176175
.builder(this)
177-
.maximumInflightChunksPerSubscription(concurrencyLevel);
178-
this.consumerFlowControlStrategyBuilder = localBuilder;
179-
return localBuilder;
176+
.maximumInflightChunksPerSubscription(prefetchLevel);
177+
return this;
180178
}
181179

182180
StreamConsumerBuilder lazyInit(boolean lazyInit) {

src/main/java/com/rabbitmq/stream/impl/flow/MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,11 @@
66
import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilder;
77
import com.rabbitmq.stream.flow.CreditAsker;
88
import com.rabbitmq.stream.flow.MessageHandlingListener;
9-
import com.rabbitmq.stream.flow.MessageHandlingListenerConsumerBuilderAccessor;
109
import com.rabbitmq.stream.impl.ConsumerStatisticRecorder;
1110
import org.slf4j.Logger;
1211
import org.slf4j.LoggerFactory;
1312

14-
import java.util.Collections;
1513
import java.util.Map;
16-
import java.util.Set;
17-
import java.util.WeakHashMap;
1814
import java.util.function.IntUnaryOperator;
1915
import java.util.function.Supplier;
2016

@@ -91,26 +87,22 @@ public static Builder builder(ConsumerBuilder consumerBuilder) {
9187
return new Builder(consumerBuilder);
9288
}
9389

94-
public static class Builder implements ConsumerFlowControlStrategyBuilder<MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy>, MessageHandlingListenerConsumerBuilderAccessor {
90+
public static class Builder implements ConsumerFlowControlStrategyBuilder<MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy>, ConsumerBuilder.ConsumerBuilderAccessor {
9591

9692
private final ConsumerBuilder consumerBuilder;
9793

9894
private int maximumInflightChunksPerSubscription = 1;
9995

100-
private final Set<MessageHandlingListener> instances = Collections.newSetFromMap(new WeakHashMap<>());
101-
10296
public Builder(ConsumerBuilder consumerBuilder) {
10397
this.consumerBuilder = consumerBuilder;
10498
}
10599

106100
@Override
107101
public MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy build(Supplier<CreditAsker> creditAskerSupplier) {
108-
MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy built = new MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy(
102+
return new MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy(
109103
creditAskerSupplier,
110104
this.maximumInflightChunksPerSubscription
111105
);
112-
instances.add(built);
113-
return built;
114106
}
115107

116108
@Override
@@ -123,18 +115,6 @@ public Builder maximumInflightChunksPerSubscription(int maximumInflightChunksPer
123115
return this;
124116
}
125117

126-
@Override
127-
public MessageHandlingListener messageHandlingListener() {
128-
return this::messageHandlingMulticaster;
129-
}
130-
131-
private boolean messageHandlingMulticaster(MessageHandler.Context context) {
132-
boolean changed = false;
133-
for(MessageHandlingListener instance : instances) {
134-
changed = changed || instance.markHandled(context);
135-
}
136-
return changed;
137-
}
138118
}
139119

140120
}

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626
import com.rabbitmq.stream.OffsetSpecification;
2727
import com.rabbitmq.stream.Producer;
2828
import com.rabbitmq.stream.StreamDoesNotExistException;
29-
import com.rabbitmq.stream.flow.MessageHandlingListener;
30-
import com.rabbitmq.stream.flow.MessageHandlingListenerConsumerBuilderAccessor;
3129
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
3230
import com.rabbitmq.stream.impl.MonitoringTestUtils.ConsumerInfo;
3331
import com.rabbitmq.stream.impl.flow.MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy;
@@ -235,19 +233,15 @@ void consumeWithAsyncConsumerFlowControl() throws Exception {
235233
AtomicLong chunkTimestamp = new AtomicLong();
236234

237235
ConsumerBuilder consumerBuilder = environment.consumerBuilder().stream(stream)
238-
.offset(OffsetSpecification.first());
239-
240-
MessageHandlingListenerConsumerBuilderAccessor messageHandlingListenerConsumerBuilderAccessor = consumerBuilder
236+
.offset(OffsetSpecification.first())
241237
.asynchronousControlFlow(5);
242-
MessageHandlingListener messageHandlingListener = messageHandlingListenerConsumerBuilderAccessor.messageHandlingListener();
243238

244239
List<MessageHandler.Context> messageContexts = new ArrayList<>();
245240

246241
AtomicBoolean shouldInstaConsume = new AtomicBoolean(false);
247242
AtomicBoolean unhandledOnInstaConsume = new AtomicBoolean(false);
248243

249-
consumerBuilder = messageHandlingListenerConsumerBuilderAccessor
250-
.builder()
244+
consumerBuilder = consumerBuilder
251245
.messageHandler(
252246
(context, message) -> {
253247
if(shouldInstaConsume.get()) {
@@ -268,7 +262,7 @@ void consumeWithAsyncConsumerFlowControl() throws Exception {
268262
assertThat(chunkTimestamp.get()).isNotZero();
269263

270264
shouldInstaConsume.set(true);
271-
boolean allMarkedHandled = messageContexts.parallelStream().allMatch(messageHandlingListener::markHandled);
265+
boolean allMarkedHandled = messageContexts.parallelStream().allMatch(MessageHandler.Context::markHandled);
272266
assertThat(allMarkedHandled).isTrue();
273267

274268
assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
@@ -309,7 +303,6 @@ void consumeWithCustomAsyncConsumerFlowControl() throws Exception {
309303
MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy.Builder flowControlStrategyBuilder = consumerBuilder
310304
.customFlowControlStrategy(MaximumChunksPerSubscriptionAsyncConsumerFlowControlStrategy::builder)
311305
.maximumInflightChunksPerSubscription(5);
312-
MessageHandlingListener messageHandlingListener = flowControlStrategyBuilder.messageHandlingListener();
313306

314307
List<MessageHandler.Context> messageContexts = new ArrayList<>();
315308

@@ -321,7 +314,7 @@ void consumeWithCustomAsyncConsumerFlowControl() throws Exception {
321314
.messageHandler(
322315
(context, message) -> {
323316
if(shouldInstaConsume.get()) {
324-
if(!messageHandlingListener.markHandled(context)) {
317+
if(!context.markHandled()) {
325318
unhandledOnInstaConsume.set(true);
326319
}
327320
} else {
@@ -338,7 +331,7 @@ void consumeWithCustomAsyncConsumerFlowControl() throws Exception {
338331
assertThat(chunkTimestamp.get()).isNotZero();
339332

340333
shouldInstaConsume.set(true);
341-
boolean allMarkedHandled = messageContexts.parallelStream().allMatch(messageHandlingListener::markHandled);
334+
boolean allMarkedHandled = messageContexts.parallelStream().allMatch(MessageHandler.Context::markHandled);
342335
assertThat(allMarkedHandled).isTrue();
343336

344337
assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
@@ -721,12 +714,9 @@ void consumerWithAsyncFlowControlShouldKeepConsumingAfterDisruption(
721714
CountDownLatch consumeLatch = new CountDownLatch(messageCount);
722715
CountDownLatch consumeLatchSecondWave = new CountDownLatch(messageCount * 2);
723716

724-
ConsumerBuilder consumerBuilder = environment.consumerBuilder().stream(s);
725-
726-
MessageHandlingListenerConsumerBuilderAccessor messageHandlingListenerConsumerBuilderAccessor = consumerBuilder
717+
ConsumerBuilder consumerBuilder = environment.consumerBuilder()
718+
.stream(s)
727719
.asynchronousControlFlow(5);
728-
MessageHandlingListener messageHandlingListener = messageHandlingListenerConsumerBuilderAccessor.messageHandlingListener();
729-
consumerBuilder = messageHandlingListenerConsumerBuilderAccessor.builder();
730720

731721
List<MessageHandler.Context> messageContexts = new ArrayList<>();
732722

@@ -740,7 +730,7 @@ void consumerWithAsyncFlowControlShouldKeepConsumingAfterDisruption(
740730
.messageHandler(
741731
(context, message) -> {
742732
if(shouldInstaConsume.get()) {
743-
if(!messageHandlingListener.markHandled(context)) {
733+
if(!context.markHandled()) {
744734
unhandledOnInstaConsume.set(true);
745735
}
746736
} else {
@@ -759,7 +749,7 @@ void consumerWithAsyncFlowControlShouldKeepConsumingAfterDisruption(
759749
assertThat(consumer.isOpen()).isTrue();
760750

761751
shouldInstaConsume.set(true);
762-
boolean allMarkedHandled = messageContexts.parallelStream().allMatch(messageHandlingListener::markHandled);
752+
boolean allMarkedHandled = messageContexts.parallelStream().allMatch(MessageHandler.Context::markHandled);
763753
assertThat(allMarkedHandled).isTrue();
764754

765755
assertThat(consumeLatch.await(20, TimeUnit.SECONDS)).isTrue();

0 commit comments

Comments
 (0)