Skip to content

Commit ed3deab

Browse files
committed
Clean NIO state automatically
Use NIO by default (except in SSLTests). Fixes #11
1 parent 6335aa4 commit ed3deab

14 files changed

+318
-80
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,9 @@ public class ConnectionFactory implements Cloneable {
108108

109109
private MetricsCollector metricsCollector;
110110

111-
private boolean nio = false;
111+
private boolean nio = true;
112112
private FrameHandlerFactory frameHandlerFactory;
113+
private ExecutorService nioExecutor;
113114

114115
/** @return the default host to use for connections */
115116
public String getHost() {
@@ -647,7 +648,11 @@ public MetricsCollector getMetricsCollector() {
647648
protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IOException {
648649
if(nio) {
649650
if(this.frameHandlerFactory == null) {
650-
this.frameHandlerFactory = new SocketChannelFrameHandlerFactory(connectionTimeout, socketConf, isSSL());
651+
if(this.nioExecutor == null) {
652+
this.frameHandlerFactory = new SocketChannelFrameHandlerFactory(connectionTimeout, socketConf, isSSL(), this.threadFactory);
653+
} else {
654+
this.frameHandlerFactory = new SocketChannelFrameHandlerFactory(connectionTimeout, socketConf, isSSL(), this.nioExecutor);
655+
}
651656
}
652657
return this.frameHandlerFactory;
653658
} else {
@@ -1034,4 +1039,8 @@ public void setNetworkRecoveryInterval(long networkRecoveryInterval) {
10341039
public void setNio(boolean nio) {
10351040
this.nio = nio;
10361041
}
1042+
1043+
public void setNioExecutor(ExecutorService nioExecutor) {
1044+
this.nioExecutor = nioExecutor;
1045+
}
10371046
}

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -577,6 +577,14 @@ boolean handleReadFrame(Frame frame) {
577577
return false;
578578
}
579579

580+
public boolean isRunning() {
581+
return _running;
582+
}
583+
584+
public boolean hasBrokerInitiatedShutdown() {
585+
return _brokerInitiatedShutdown;
586+
}
587+
580588
private void readFrame(Frame frame) throws IOException {
581589
if (frame != null) {
582590
_missedHeartbeats = 0;
@@ -646,7 +654,7 @@ private void handleFailure(Throwable ex) {
646654
}
647655
}
648656

649-
private void doFinalShutdown() {
657+
void doFinalShutdown() {
650658
_frameHandler.close();
651659
_appContinuation.set(null);
652660
notifyListeners();
@@ -798,7 +806,7 @@ public void run() {
798806
// this releases the thread
799807
} finally {
800808
_running = false;
801-
_channel0.notifyOutstandingRpc(cause);
809+
_channel0.notifyOutstandingRpc(cause);
802810
}
803811
}
804812
}

src/main/java/com/rabbitmq/client/impl/ForgivingExceptionHandler.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.io.IOException;
2222
import java.net.ConnectException;
23-
import java.net.SocketException;
2423

2524
/**
2625
* An implementation of {@link com.rabbitmq.client.ExceptionHandler} that does not
@@ -120,7 +119,9 @@ protected void log(String message, Throwable e) {
120119
}
121120

122121
private boolean isSocketClosedOrConnectionReset(Throwable e) {
123-
return e instanceof SocketException &&
124-
("Connection reset".equals(e.getMessage()) || "Socket closed".equals(e.getMessage()));
122+
return e instanceof IOException &&
123+
("Connection reset".equals(e.getMessage()) || "Socket closed".equals(e.getMessage()) ||
124+
"Connection reset by peer".equals(e.getMessage())
125+
);
125126
}
126127
}

src/main/java/com/rabbitmq/client/impl/SocketChannelFrameHandlerFactory.java

+141-23
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@
3232
import java.util.Iterator;
3333
import java.util.Queue;
3434
import java.util.Set;
35-
import java.util.concurrent.ExecutorService;
36-
import java.util.concurrent.Executors;
37-
import java.util.concurrent.LinkedBlockingQueue;
38-
import java.util.concurrent.ThreadFactory;
35+
import java.util.concurrent.*;
36+
import java.util.concurrent.atomic.AtomicLong;
37+
import java.util.concurrent.locks.Lock;
38+
import java.util.concurrent.locks.ReentrantLock;
3939

4040
/**
4141
*
@@ -44,19 +44,31 @@ public class SocketChannelFrameHandlerFactory extends AbstractFrameHandlerFactor
4444

4545
private static final Logger LOGGER = LoggerFactory.getLogger(SocketChannelFrameHandlerFactory.class);
4646

47-
private final SelectorState readSelectorState;
48-
private final SelectorState writeSelectorState;
47+
private SelectorState readSelectorState;
48+
private SelectorState writeSelectorState;
4949

50-
// FIXME provide constructor with executorservice
51-
private final ExecutorService executorService = null;
50+
private final ExecutorService executorService;
5251

53-
private final ThreadFactory threadFactory = Executors.defaultThreadFactory();
52+
private final ThreadFactory threadFactory;
5453

55-
public SocketChannelFrameHandlerFactory(int connectionTimeout, SocketConfigurator configurator, boolean ssl) throws IOException {
54+
private final Lock stateLock = new ReentrantLock();
55+
56+
private final AtomicLong connectionCount = new AtomicLong();
57+
58+
private Thread readThread, writeThread;
59+
60+
private Future<?> readTask, writeTask;
61+
62+
public SocketChannelFrameHandlerFactory(int connectionTimeout, SocketConfigurator configurator, boolean ssl, ExecutorService executorService) throws IOException {
63+
super(connectionTimeout, configurator, ssl);
64+
this.executorService = executorService;
65+
this.threadFactory = null;
66+
}
67+
68+
public SocketChannelFrameHandlerFactory(int connectionTimeout, SocketConfigurator configurator, boolean ssl, ThreadFactory threadFactory) throws IOException {
5669
super(connectionTimeout, configurator, ssl);
57-
this.readSelectorState = new SelectorState(Selector.open());
58-
this.writeSelectorState = new SelectorState(Selector.open());
59-
startIoLoops();
70+
this.executorService = null;
71+
this.threadFactory = threadFactory;
6072
}
6173

6274
@Override
@@ -67,6 +79,15 @@ public FrameHandler create(Address addr) throws IOException {
6779
SocketChannel channel = SocketChannel.open();
6880
configurator.configure(channel.socket());
6981

82+
// lock
83+
stateLock.lock();
84+
try {
85+
connectionCount.incrementAndGet();
86+
initStateIfNecessary();
87+
} finally {
88+
stateLock.unlock();
89+
}
90+
7091
// FIXME handle connection failure
7192
channel.connect(address);
7293

@@ -78,15 +99,68 @@ public FrameHandler create(Address addr) throws IOException {
7899
return frameHandler;
79100
}
80101

102+
protected boolean cleanUp() {
103+
// get connection count
104+
// lock
105+
// if connection count has changed, do nothing
106+
// if connection count hasn't changed, clean
107+
long connectionCountNow = connectionCount.get();
108+
stateLock.lock();
109+
try {
110+
if(connectionCountNow != connectionCount.get()) {
111+
// a connection request has come in meanwhile, don't do anything
112+
return false;
113+
}
114+
115+
if(this.executorService == null) {
116+
this.writeThread.interrupt();
117+
} else {
118+
boolean canceled = this.writeTask.cancel(true);
119+
if(!canceled) {
120+
LOGGER.info("Could not stop write NIO task");
121+
}
122+
}
123+
124+
try {
125+
readSelectorState.selector.close();
126+
} catch (IOException e) {
127+
LOGGER.warn("Could not close read selector: {}", e.getMessage());
128+
}
129+
try {
130+
writeSelectorState.selector.close();
131+
} catch (IOException e) {
132+
LOGGER.warn("Could not close write selector: {}", e.getMessage());
133+
}
134+
135+
this.readSelectorState = null;
136+
this.writeSelectorState = null;
137+
138+
} finally {
139+
stateLock.unlock();
140+
}
141+
return true;
142+
}
143+
144+
protected void initStateIfNecessary() throws IOException {
145+
if(this.readSelectorState == null) {
146+
// create selectors
147+
this.readSelectorState = new SelectorState(Selector.open());
148+
this.writeSelectorState = new SelectorState(Selector.open());
149+
150+
// create threads/tasks
151+
startIoLoops();
152+
}
153+
}
154+
81155
protected void startIoLoops() {
82156
if(executorService == null) {
83-
Thread readThread = Environment.newThread(threadFactory, new ReadLoop(readSelectorState), "rabbitmq-nio-read");
84-
Thread writeThread = Environment.newThread(threadFactory, new WriteLoop(writeSelectorState), "rabbitmq-nio-write");
157+
this.readThread = Environment.newThread(threadFactory, new ReadLoop(this.readSelectorState), "rabbitmq-nio-read");
158+
this.writeThread = Environment.newThread(threadFactory, new WriteLoop(this.writeSelectorState), "rabbitmq-nio-write");
85159
readThread.start();
86160
writeThread.start();
87161
} else {
88-
this.executorService.submit(new ReadLoop(readSelectorState));
89-
this.executorService.submit(new WriteLoop(writeSelectorState));
162+
this.readTask = this.executorService.submit(new ReadLoop(this.readSelectorState));
163+
this.writeTask = this.executorService.submit(new WriteLoop(this.writeSelectorState));
90164
}
91165
}
92166

@@ -104,9 +178,9 @@ public void run() {
104178
// FIXME find a better default?
105179
ByteBuffer buffer = ByteBuffer.allocate(8192);
106180
try {
181+
int idlenessCount = 0;
107182
while(true && !Thread.currentThread().isInterrupted()) {
108183

109-
// if there's no read key anymore
110184
for (SelectionKey selectionKey : selector.keys()) {
111185
SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState) selectionKey.attachment();
112186
// FIXME connection should always be here
@@ -123,18 +197,30 @@ public void run() {
123197
}
124198
}
125199

200+
// FIXME really necessary? key are supposed to be removed when channel is closed
201+
/*
126202
if(!selectionKey.channel().isOpen()) {
127-
// FIXME maybe better to wait for an exception and to trigger AMQConnection shutdown?
128-
// or check if AMQConnection is closed and init shutdown if appropriate
129203
LOGGER.warn("Channel for connection {} closed, removing it from IO thread", state.getConnection());
130204
selectionKey.cancel();
131205
}
206+
*/
132207
}
133208

134209
int select;
135210
if(state.statesToBeRegistered.isEmpty()) {
136211
// we can block, registration will call Selector.wakeup()
137212
select = selector.select(1000);
213+
idlenessCount++;
214+
if(idlenessCount == 10 && selector.keys().size() == 0) {
215+
//if(false) {
216+
// we haven't been doing anything for a while, shutdown state
217+
boolean clean = cleanUp();
218+
if(clean) {
219+
// we stop this thread
220+
return;
221+
}
222+
// there may be incoming connections, keep going
223+
}
138224
} else {
139225
// we don't have to block, we need to select and clean cancelled keys before registration
140226
select = selector.selectNow();
@@ -149,6 +235,7 @@ public void run() {
149235
}
150236

151237
if (select > 0) {
238+
idlenessCount = 0;
152239
Set<SelectionKey> readyKeys = selector.selectedKeys();
153240
Iterator<SelectionKey> iterator = readyKeys.iterator();
154241
while (iterator.hasNext()) {
@@ -165,11 +252,19 @@ public void run() {
165252
Frame frame = Frame.readFrom(channel, buffer);
166253

167254
try {
168-
state.getConnection().handleReadFrame(frame);
255+
boolean noProblem = state.getConnection().handleReadFrame(frame);
256+
if(noProblem && (!state.getConnection().isRunning() || state.getConnection().hasBrokerInitiatedShutdown())) {
257+
// looks like the frame was Close-Ok or Close
258+
dispatchShutdownToConnection(state);
259+
key.cancel();
260+
break;
261+
}
262+
169263
} catch(Throwable ex) {
170264
// problem during frame processing, tell connection, and
171265
// we can stop for this channel
172266
handleIoError(state, ex);
267+
key.cancel();
173268
break;
174269
}
175270

@@ -231,7 +326,13 @@ public void run() {
231326
RegistrationState registration;
232327
while((registration = state.statesToBeRegistered.poll()) != null) {
233328
int operations = registration.operations;
234-
registration.state.getChannel().register(selector, operations, registration.state);
329+
try {
330+
registration.state.getChannel().register(selector, operations, registration.state);
331+
} catch(Exception e) {
332+
// can happen if the channel has been closed since the operation has been enqueued
333+
LOGGER.info("Error while registering socket channel for write: {}", e.getMessage());
334+
}
335+
235336
}
236337

237338
if(select > 0) {
@@ -264,10 +365,11 @@ public void run() {
264365
frame.writeTo(channel, buffer);
265366
written++;
266367
}
368+
Frame.drain(channel, buffer);
267369
} catch(Exception e) {
268370
handleIoError(state, e);
269371
} finally {
270-
Frame.drain(channel, buffer);
372+
buffer.clear();
271373
key.cancel();
272374
}
273375
}
@@ -310,6 +412,22 @@ public void run() {
310412
}
311413
}
312414

415+
protected void dispatchShutdownToConnection(final SocketChannelFrameHandlerState state) {
416+
Runnable shutdown = new Runnable() {
417+
@Override
418+
public void run() {
419+
state.getConnection().doFinalShutdown();
420+
}
421+
};
422+
if(this.executorService == null) {
423+
String name = "rabbitmq-connection-shutdown-" + state.getConnection();
424+
Thread shutdownThread = Environment.newThread(threadFactory, shutdown, name);
425+
shutdownThread.start();
426+
} else {
427+
this.executorService.submit(shutdown);
428+
}
429+
}
430+
313431
public static class RegistrationState {
314432

315433
private final SocketChannelFrameHandlerState state;

0 commit comments

Comments
 (0)