Skip to content

Commit 34204ec

Browse files
committed
Stop dispatching as soon as Consumer#close() is called
References #142
1 parent 80ae339 commit 34204ec

File tree

3 files changed

+54
-8
lines changed

3 files changed

+54
-8
lines changed

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

+1-7
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,6 @@ private static class SubscriptionTracker {
236236
private volatile boolean hasReceivedSomething = false;
237237
private volatile byte subscriptionIdInClient;
238238
private volatile ClientSubscriptionsManager manager;
239-
private volatile boolean closing = false;
240239

241240
private SubscriptionTracker(
242241
StreamConsumer consumer,
@@ -254,7 +253,6 @@ private SubscriptionTracker(
254253
}
255254

256255
synchronized void cancel() {
257-
this.closing = true;
258256
if (this.manager != null) {
259257
LOGGER.debug("Removing consumer from manager " + this.consumer);
260258
this.manager.remove(this);
@@ -263,10 +261,6 @@ synchronized void cancel() {
263261
}
264262
}
265263

266-
boolean isClosing() {
267-
return this.closing;
268-
}
269-
270264
synchronized void assign(byte subscriptionIdInClient, ClientSubscriptionsManager manager) {
271265
this.subscriptionIdInClient = subscriptionIdInClient;
272266
this.manager = manager;
@@ -420,7 +414,7 @@ private ClientSubscriptionsManager(
420414
(client, subscriptionId, offset, messageCount, dataSize) -> {
421415
SubscriptionTracker subscriptionTracker =
422416
subscriptionTrackers.get(subscriptionId & 0xFF);
423-
if (subscriptionTracker != null && !subscriptionTracker.isClosing()) {
417+
if (subscriptionTracker != null && subscriptionTracker.consumer.isOpen()) {
424418
client.credit(subscriptionId, 1);
425419
} else {
426420
LOGGER.debug(

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,13 @@ class StreamConsumer implements Consumer {
9595
messageHandlerWithOrWithoutTracking = messageHandler;
9696
}
9797

98+
MessageHandler closedAwareMessageHandler =
99+
(context, message) -> {
100+
if (!closed.get()) {
101+
messageHandlerWithOrWithoutTracking.handle(context, message);
102+
}
103+
};
104+
98105
Runnable init =
99106
() -> {
100107
this.closingCallback =
@@ -104,7 +111,7 @@ class StreamConsumer implements Consumer {
104111
offsetSpecification,
105112
this.name,
106113
subscriptionListener,
107-
messageHandlerWithOrWithoutTracking);
114+
closedAwareMessageHandler);
108115

109116
this.status = Status.RUNNING;
110117
};

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

+45
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,51 @@ void consume() throws Exception {
185185
consumer.close();
186186
}
187187

188+
@Test
189+
void closeOnCondition() throws Exception {
190+
int messageCount = 50_000;
191+
CountDownLatch publishLatch = new CountDownLatch(messageCount);
192+
Client client =
193+
cf.get(
194+
new Client.ClientParameters()
195+
.publishConfirmListener((publisherId, publishingId) -> publishLatch.countDown()));
196+
197+
client.declarePublisher(b(1), null, stream);
198+
IntStream.range(0, messageCount)
199+
.forEach(
200+
i ->
201+
client.publish(
202+
b(1),
203+
Collections.singletonList(
204+
client.messageBuilder().addData("".getBytes()).build())));
205+
206+
assertThat(publishLatch.await(10, TimeUnit.SECONDS)).isTrue();
207+
208+
int messagesToProcess = 20_000;
209+
210+
CountDownLatch consumeLatch = new CountDownLatch(1);
211+
AtomicInteger receivedMessages = new AtomicInteger();
212+
AtomicInteger processedMessages = new AtomicInteger();
213+
214+
Consumer consumer =
215+
environment.consumerBuilder().stream(stream)
216+
.offset(OffsetSpecification.first())
217+
.messageHandler(
218+
(context, message) -> {
219+
if (receivedMessages.incrementAndGet() <= messagesToProcess) {
220+
processedMessages.incrementAndGet();
221+
}
222+
if (receivedMessages.get() == messagesToProcess) {
223+
consumeLatch.countDown();
224+
}
225+
})
226+
.build();
227+
228+
assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
229+
consumer.close();
230+
assertThat(processedMessages).hasValue(messagesToProcess);
231+
}
232+
188233
@Test
189234
void creatingConsumerOnNonExistingStreamShouldThrowException() {
190235
String nonExistingStream = UUID.randomUUID().toString();

0 commit comments

Comments
 (0)