Skip to content

Commit 68b9dd4

Browse files
authored
Merge pull request #1493 from rabbitmq/handle-nio-key-cancelled
Handle CancelledKeyException in NIO loop
2 parents 7ec9a56 + 63e4635 commit 68b9dd4

File tree

3 files changed

+39
-34
lines changed

3 files changed

+39
-34
lines changed

src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java

+24-32
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -73,9 +73,9 @@ public void run() {
7373

7474
for (SelectionKey selectionKey : selector.keys()) {
7575
SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState) selectionKey.attachment();
76-
if (state.getConnection() != null && state.getConnection().getHeartbeat() > 0) {
77-
long now = System.currentTimeMillis();
78-
if ((now - state.getLastActivity()) > state.getConnection().getHeartbeat() * 1000 * 2) {
76+
if (state.getConnection() != null && state.getHeartbeatNanoSeconds() > 0) {
77+
long now = System.nanoTime();
78+
if ((now - state.getLastActivity()) > state.getHeartbeatNanoSeconds() * 2) {
7979
try {
8080
handleHeartbeatFailure(state);
8181
} catch (Exception e) {
@@ -91,7 +91,7 @@ public void run() {
9191
if (!writeRegistered && registrations.isEmpty() && writeRegistrations.isEmpty()) {
9292
// we can block, registrations will call Selector.wakeup()
9393
select = selector.select(1000);
94-
if (selector.keys().size() == 0) {
94+
if (selector.keys().isEmpty()) {
9595
// we haven't been doing anything for a while, shutdown state
9696
boolean clean = context.cleanUp();
9797
if (clean) {
@@ -135,11 +135,9 @@ public void run() {
135135
if (!key.isValid()) {
136136
continue;
137137
}
138-
139-
if (key.isReadable()) {
140-
final SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState) key.attachment();
141-
142-
try {
138+
final SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState) key.attachment();
139+
try {
140+
if (key.isReadable()) {
143141
if (!state.getChannel().isOpen()) {
144142
key.cancel();
145143
continue;
@@ -175,14 +173,14 @@ public void run() {
175173
}
176174
}
177175

178-
state.setLastActivity(System.currentTimeMillis());
179-
} catch (final Exception e) {
180-
LOGGER.warn("Error during reading frames", e);
181-
handleIoError(state, e);
182-
key.cancel();
183-
} finally {
184-
buffer.clear();
176+
state.setLastActivity(System.nanoTime());
185177
}
178+
} catch (final Exception e) {
179+
LOGGER.warn("Error during reading frames", e);
180+
handleIoError(state, e);
181+
key.cancel();
182+
} finally {
183+
buffer.clear();
186184
}
187185
}
188186
}
@@ -222,9 +220,8 @@ public void run() {
222220
continue;
223221
}
224222

225-
if (key.isWritable()) {
226-
boolean cancelKey = true;
227-
try {
223+
try {
224+
if (key.isWritable()) {
228225
if (!state.getChannel().isOpen()) {
229226
key.cancel();
230227
continue;
@@ -243,17 +240,12 @@ public void run() {
243240
written++;
244241
}
245242
outputStream.flush();
246-
if (!state.getWriteQueue().isEmpty()) {
247-
cancelKey = true;
248-
}
249-
} catch (Exception e) {
250-
handleIoError(state, e);
251-
} finally {
252-
state.endWriteSequence();
253-
if (cancelKey) {
254-
key.cancel();
255-
}
256243
}
244+
} catch (Exception e) {
245+
handleIoError(state, e);
246+
} finally {
247+
state.endWriteSequence();
248+
key.cancel();
257249
}
258250
}
259251
}
@@ -269,7 +261,7 @@ protected void handleIoError(SocketChannelFrameHandlerState state, Throwable ex)
269261
} else {
270262
try {
271263
state.close();
272-
} catch (IOException e) {
264+
} catch (IOException ignored) {
273265

274266
}
275267
}
@@ -284,7 +276,7 @@ protected void handleHeartbeatFailure(SocketChannelFrameHandlerState state) {
284276
} else {
285277
try {
286278
state.close();
287-
} catch (IOException e) {
279+
} catch (IOException ignored) {
288280

289281
}
290282
}

src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandler.java

+4
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.io.IOException;
2525
import java.net.InetAddress;
2626
import java.net.SocketException;
27+
import java.time.Duration;
2728

2829
/**
2930
*
@@ -61,6 +62,9 @@ public int getPort() {
6162
@Override
6263
public void setTimeout(int timeoutMs) throws SocketException {
6364
state.getChannel().socket().setSoTimeout(timeoutMs);
65+
if (state.getConnection() != null) {
66+
state.setHeartbeat(Duration.ofSeconds(state.getConnection().getHeartbeat()));
67+
}
6468
}
6569

6670
@Override

src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -26,7 +26,7 @@
2626
import java.nio.ByteBuffer;
2727
import java.nio.channels.SelectionKey;
2828
import java.nio.channels.SocketChannel;
29-
29+
import java.time.Duration;
3030

3131
/**
3232
*
@@ -43,6 +43,7 @@ public class SocketChannelFrameHandlerState {
4343
private final NioQueue writeQueue;
4444

4545
private volatile AMQConnection connection;
46+
private volatile long heartbeatNanoSeconds = -1;
4647

4748
/** should be used only in the NIO read thread */
4849
private long lastActivity;
@@ -157,6 +158,10 @@ public void setConnection(AMQConnection connection) {
157158
this.connection = connection;
158159
}
159160

161+
void setHeartbeat(Duration ht) {
162+
this.heartbeatNanoSeconds = ht.toNanos();
163+
}
164+
160165
public void setLastActivity(long lastActivity) {
161166
this.lastActivity = lastActivity;
162167
}
@@ -165,6 +170,10 @@ public long getLastActivity() {
165170
return lastActivity;
166171
}
167172

173+
long getHeartbeatNanoSeconds() {
174+
return this.heartbeatNanoSeconds;
175+
}
176+
168177
void prepareForWriteSequence() {
169178
if(ssl) {
170179
plainOut.clear();

0 commit comments

Comments
 (0)