Skip to content

Handle CancelledKeyException in NIO loop #1493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 24 additions & 32 deletions src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -73,9 +73,9 @@ public void run() {

for (SelectionKey selectionKey : selector.keys()) {
SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState) selectionKey.attachment();
if (state.getConnection() != null && state.getConnection().getHeartbeat() > 0) {
long now = System.currentTimeMillis();
if ((now - state.getLastActivity()) > state.getConnection().getHeartbeat() * 1000 * 2) {
if (state.getConnection() != null && state.getHeartbeatNanoSeconds() > 0) {
long now = System.nanoTime();
if ((now - state.getLastActivity()) > state.getHeartbeatNanoSeconds() * 2) {
try {
handleHeartbeatFailure(state);
} catch (Exception e) {
Expand All @@ -91,7 +91,7 @@ public void run() {
if (!writeRegistered && registrations.isEmpty() && writeRegistrations.isEmpty()) {
// we can block, registrations will call Selector.wakeup()
select = selector.select(1000);
if (selector.keys().size() == 0) {
if (selector.keys().isEmpty()) {
// we haven't been doing anything for a while, shutdown state
boolean clean = context.cleanUp();
if (clean) {
Expand Down Expand Up @@ -135,11 +135,9 @@ public void run() {
if (!key.isValid()) {
continue;
}

if (key.isReadable()) {
final SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState) key.attachment();

try {
final SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState) key.attachment();
try {
if (key.isReadable()) {
if (!state.getChannel().isOpen()) {
key.cancel();
continue;
Expand Down Expand Up @@ -175,14 +173,14 @@ public void run() {
}
}

state.setLastActivity(System.currentTimeMillis());
} catch (final Exception e) {
LOGGER.warn("Error during reading frames", e);
handleIoError(state, e);
key.cancel();
} finally {
buffer.clear();
state.setLastActivity(System.nanoTime());
}
} catch (final Exception e) {
LOGGER.warn("Error during reading frames", e);
handleIoError(state, e);
key.cancel();
} finally {
buffer.clear();
}
}
}
Expand Down Expand Up @@ -222,9 +220,8 @@ public void run() {
continue;
}

if (key.isWritable()) {
boolean cancelKey = true;
try {
try {
if (key.isWritable()) {
if (!state.getChannel().isOpen()) {
key.cancel();
continue;
Expand All @@ -243,17 +240,12 @@ public void run() {
written++;
}
outputStream.flush();
if (!state.getWriteQueue().isEmpty()) {
cancelKey = true;
}
} catch (Exception e) {
handleIoError(state, e);
} finally {
state.endWriteSequence();
if (cancelKey) {
key.cancel();
}
}
} catch (Exception e) {
handleIoError(state, e);
} finally {
state.endWriteSequence();
key.cancel();
}
}
}
Expand All @@ -269,7 +261,7 @@ protected void handleIoError(SocketChannelFrameHandlerState state, Throwable ex)
} else {
try {
state.close();
} catch (IOException e) {
} catch (IOException ignored) {

}
}
Expand All @@ -284,7 +276,7 @@ protected void handleHeartbeatFailure(SocketChannelFrameHandlerState state) {
} else {
try {
state.close();
} catch (IOException e) {
} catch (IOException ignored) {

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.SocketException;
import java.time.Duration;

/**
*
Expand Down Expand Up @@ -61,6 +62,9 @@ public int getPort() {
@Override
public void setTimeout(int timeoutMs) throws SocketException {
state.getChannel().socket().setSoTimeout(timeoutMs);
if (state.getConnection() != null) {
state.setHeartbeat(Duration.ofSeconds(state.getConnection().getHeartbeat()));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -26,7 +26,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

import java.time.Duration;

/**
*
Expand All @@ -43,6 +43,7 @@ public class SocketChannelFrameHandlerState {
private final NioQueue writeQueue;

private volatile AMQConnection connection;
private volatile long heartbeatNanoSeconds = -1;

/** should be used only in the NIO read thread */
private long lastActivity;
Expand Down Expand Up @@ -157,6 +158,10 @@ public void setConnection(AMQConnection connection) {
this.connection = connection;
}

void setHeartbeat(Duration ht) {
this.heartbeatNanoSeconds = ht.toNanos();
}

public void setLastActivity(long lastActivity) {
this.lastActivity = lastActivity;
}
Expand All @@ -165,6 +170,10 @@ public long getLastActivity() {
return lastActivity;
}

long getHeartbeatNanoSeconds() {
return this.heartbeatNanoSeconds;
}

void prepareForWriteSequence() {
if(ssl) {
plainOut.clear();
Expand Down