Skip to content

Commit 4f84430

Browse files
committed
Use dedicated flag to dispatch frame on connection closing
References #585
1 parent 0017b36 commit 4f84430

File tree

2 files changed

+10
-8
lines changed

2 files changed

+10
-8
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ public class Client implements AutoCloseable {
158158
final ExecutorService dispatchingExecutorService;
159159
final TuneState tuneState;
160160
final AtomicBoolean closing = new AtomicBoolean(false);
161+
final AtomicBoolean shuttingDownDispatching = new AtomicBoolean(false);
161162
final ChunkChecksum chunkChecksum;
162163
final MetricsCollector metricsCollector;
163164
final CompressionCodecFactory compressionCodecFactory;
@@ -354,6 +355,7 @@ public void initChannel(SocketChannel ch) {
354355
() -> {
355356
if (dispatchingExecutorServiceFactory == null) {
356357
List<Runnable> outstandingTasks = this.dispatchingExecutorService.shutdownNow();
358+
this.shuttingDownDispatching.set(true);
357359
for (Runnable outstandingTask : outstandingTasks) {
358360
try {
359361
outstandingTask.run();
@@ -2778,7 +2780,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
27782780
}
27792781
} else {
27802782
FrameHandler frameHandler = ServerFrameHandler.lookup(commandId, version, m);
2781-
task = new FrameHandlerTask(frameHandler, Client.this, frameSize, ctx, m, closing);
2783+
task =
2784+
new FrameHandlerTask(
2785+
frameHandler, Client.this, frameSize, ctx, m, shuttingDownDispatching);
27822786
}
27832787

27842788
if (task != null) {
@@ -2854,26 +2858,26 @@ private static class FrameHandlerTask implements Runnable {
28542858
private final int frameSize;
28552859
private final ChannelHandlerContext ctx;
28562860
private final ByteBuf message;
2857-
private final AtomicBoolean closing;
2861+
private final AtomicBoolean shouldRelease;
28582862

28592863
private FrameHandlerTask(
28602864
FrameHandler frameHandler,
28612865
Client client,
28622866
int frameSize,
28632867
ChannelHandlerContext ctx,
28642868
ByteBuf message,
2865-
AtomicBoolean closing) {
2869+
AtomicBoolean shouldRelease) {
28662870
this.frameHandler = frameHandler;
28672871
this.client = client;
28682872
this.frameSize = frameSize;
28692873
this.ctx = ctx;
28702874
this.message = message;
2871-
this.closing = closing;
2875+
this.shouldRelease = shouldRelease;
28722876
}
28732877

28742878
@Override
28752879
public void run() {
2876-
if (this.closing.get()) {
2880+
if (this.shouldRelease.get()) {
28772881
try {
28782882
this.message.release();
28792883
} catch (Exception e) {

src/test/java/com/rabbitmq/stream/impl/AuthenticationTest.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,7 @@ public byte[] handleChallenge(
105105
}));
106106
} catch (StreamException e) {
107107
// there can be a timeout because the connection gets closed before returning the error
108-
assertThat(e.getMessage())
109-
.containsAnyOf(
110-
String.valueOf(Constants.RESPONSE_CODE_SASL_ERROR), "Could not get response in");
108+
assertThat(e).hasMessageContaining(String.valueOf(Constants.RESPONSE_CODE_SASL_ERROR));
111109
}
112110
}
113111

0 commit comments

Comments
 (0)