Skip to content

Commit 409906a

Browse files
committed
Release buffer from outstanding connection tasks
A connection can accumulate delivery frames in its dedicated executor service. The executor service is shut down when the connection closes and it can contain tasks that have not commenced execution. These tasks will never get executed, so their respective buffer must be released to avoid a memory leak. The leak can occur with consumer churn combined witht one-consumer connections. Fixes #585
1 parent 61aee06 commit 409906a

File tree

1 file changed

+50
-3
lines changed

1 file changed

+50
-3
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

+50-3
Original file line numberDiff line numberDiff line change
@@ -352,9 +352,17 @@ public void initChannel(SocketChannel ch) {
352352
this.executorServiceClosing =
353353
Utils.makeIdempotent(
354354
() -> {
355-
this.dispatchingExecutorService.shutdownNow();
356355
if (dispatchingExecutorServiceFactory == null) {
357-
this.dispatchingExecutorService.shutdownNow();
356+
List<Runnable> outstandingTasks = this.dispatchingExecutorService.shutdownNow();
357+
for (Runnable outstandingTask : outstandingTasks) {
358+
try {
359+
outstandingTask.run();
360+
} catch (Exception e) {
361+
LOGGER.info(
362+
"Error while releasing buffer in outstanding connection tasks: {}",
363+
e.getMessage());
364+
}
365+
}
358366
} else {
359367
dispatchingExecutorServiceFactory.clientClosed(this.dispatchingExecutorService);
360368
}
@@ -2770,7 +2778,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
27702778
}
27712779
} else {
27722780
FrameHandler frameHandler = ServerFrameHandler.lookup(commandId, version, m);
2773-
task = () -> frameHandler.handle(Client.this, frameSize, ctx, m);
2781+
// task = () -> frameHandler.handle(Client.this, frameSize, ctx, m);
2782+
task = new FrameHandlerTask(frameHandler, Client.this, frameSize, ctx, m, closing);
27742783
}
27752784

27762785
if (task != null) {
@@ -2839,6 +2848,44 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
28392848
}
28402849
}
28412850

2851+
private static class FrameHandlerTask implements Runnable {
2852+
2853+
private final FrameHandler frameHandler;
2854+
private final Client client;
2855+
private final int frameSize;
2856+
private final ChannelHandlerContext ctx;
2857+
private final ByteBuf message;
2858+
private final AtomicBoolean closing;
2859+
2860+
private FrameHandlerTask(
2861+
FrameHandler frameHandler,
2862+
Client client,
2863+
int frameSize,
2864+
ChannelHandlerContext ctx,
2865+
ByteBuf message,
2866+
AtomicBoolean closing) {
2867+
this.frameHandler = frameHandler;
2868+
this.client = client;
2869+
this.frameSize = frameSize;
2870+
this.ctx = ctx;
2871+
this.message = message;
2872+
this.closing = closing;
2873+
}
2874+
2875+
@Override
2876+
public void run() {
2877+
if (this.closing.get()) {
2878+
try {
2879+
this.message.release();
2880+
} catch (Exception e) {
2881+
e.printStackTrace();
2882+
}
2883+
} else {
2884+
this.frameHandler.handle(this.client, this.frameSize, this.ctx, this.message);
2885+
}
2886+
}
2887+
}
2888+
28422889
private <T> OutstandingRequest<T> outstandingRequest() {
28432890
return new OutstandingRequest<>(this.rpcTimeout, this.host + ":" + this.port);
28442891
}

0 commit comments

Comments
 (0)