Skip to content

Commit 5c080c0

Browse files
committed
Handle heartbeat and shutdown
Limit also written frames to the (snapshot) size when iteration starts (instead of unqueuing during writing). Fixes #11
1 parent fe2ecff commit 5c080c0

File tree

5 files changed

+144
-35
lines changed

5 files changed

+144
-35
lines changed

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

+12
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ public void start()
283283

284284
// FIXME properly prepare the connection depending on IO implementation
285285
if(this._frameHandler instanceof SocketChannelFrameHandler) {
286+
// FIXME set the connection earlier
286287
((SocketChannelFrameHandler) _frameHandler).getState().setConnection(this);
287288
} else {
288289
// start the main loop going
@@ -617,6 +618,17 @@ private void readFrame(Frame frame) throws IOException {
617618
}
618619
}
619620

621+
void handleHeartbeatFailure() {
622+
Exception ex = new MissedHeartbeatException("Heartbeat missing with heartbeat = " +
623+
_heartbeat + " seconds");
624+
try {
625+
_exceptionHandler.handleUnexpectedConnectionDriverException(this, ex);
626+
shutdown(null, false, ex, true);
627+
} finally {
628+
doFinalShutdownInRead();
629+
}
630+
}
631+
620632
private void handleFailureInRead(Throwable ex) {
621633
if(ex instanceof EOFException) {
622634
if (!_brokerInitiatedShutdown)

src/main/java/com/rabbitmq/client/impl/Frame.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ public void writeTo(WritableByteChannel writableChannel, ByteBuffer buffer) thro
296296
buffer.put(b);
297297
}
298298

299-
buffer.put((byte) AMQP.FRAME_END);
299+
safePut(writableChannel, buffer, (byte) AMQP.FRAME_END);
300300
} else {
301301
safePut(writableChannel, buffer, (byte) type);
302302
safePut(writableChannel, buffer, (byte) ((channel >>> 8) & 0xFF));
@@ -337,7 +337,7 @@ private void safePut(WritableByteChannel channel, ByteBuffer buffer, byte conten
337337

338338
public static void drain(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
339339
buffer.flip();
340-
while(buffer.hasRemaining() && channel.write(buffer) != 0);
340+
while(buffer.hasRemaining() && channel.write(buffer) != -1);
341341
buffer.clear();
342342
}
343343

src/main/java/com/rabbitmq/client/impl/SocketChannelFrameHandler.java

-3
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ public class SocketChannelFrameHandler implements FrameHandler {
3232

3333
private final SocketChannelFrameHandlerState state;
3434

35-
private volatile int readTimeout = ConnectionFactory.DEFAULT_HEARTBEAT * 1000;
36-
3735
public SocketChannelFrameHandler(SocketChannelFrameHandlerState state) {
3836
this.state = state;
3937
}
@@ -61,7 +59,6 @@ public int getPort() {
6159
@Override
6260
public void setTimeout(int timeoutMs) throws SocketException {
6361
state.getChannel().socket().setSoTimeout(timeoutMs);
64-
this.readTimeout = timeoutMs;
6562
}
6663

6764
@Override

src/main/java/com/rabbitmq/client/impl/SocketChannelFrameHandlerFactory.java

+107-26
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,10 @@
3232
import java.util.Iterator;
3333
import java.util.Queue;
3434
import java.util.Set;
35-
import java.util.concurrent.ExecutorService;
36-
import java.util.concurrent.Executors;
37-
import java.util.concurrent.LinkedBlockingQueue;
35+
import java.util.concurrent.*;
36+
import java.util.concurrent.atomic.AtomicLong;
37+
import java.util.concurrent.locks.Lock;
38+
import java.util.concurrent.locks.ReentrantLock;
3839

3940
/**
4041
*
@@ -46,24 +47,35 @@ public class SocketChannelFrameHandlerFactory extends FrameHandlerFactory {
4647
private final SelectorState readSelectorState;
4748
private final SelectorState writeSelectorState;
4849

49-
private final ExecutorService executorService = Executors.newFixedThreadPool(2);
50+
private ExecutorService executorService;
51+
52+
private Future<?> readLoop;
53+
private Future<?> writeLoop;
54+
55+
private Lock loopsLock = new ReentrantLock();
56+
57+
private final ThreadFactory threadFactory = new ThreadFactory() {
58+
59+
AtomicLong counter = new AtomicLong();
60+
61+
@Override
62+
public Thread newThread(Runnable r) {
63+
return new Thread(r, "rabbitmq-nio-"+counter.getAndIncrement());
64+
}
65+
};
5066

5167
public SocketChannelFrameHandlerFactory(int connectionTimeout, SocketFactory factory, SocketConfigurator configurator,
5268
boolean ssl) throws IOException {
5369
super(connectionTimeout, factory, configurator, ssl);
5470
this.readSelectorState = new SelectorState(Selector.open());
5571
this.writeSelectorState = new SelectorState(Selector.open());
56-
this.executorService.submit(new ReadLoop(readSelectorState));
57-
this.executorService.submit(new WriteLoop(writeSelectorState));
5872
}
5973

6074
public SocketChannelFrameHandlerFactory(int connectionTimeout, SocketFactory factory, SocketConfigurator configurator, boolean ssl,
6175
ExecutorService shutdownExecutor) throws IOException {
6276
super(connectionTimeout, factory, configurator, ssl, shutdownExecutor);
6377
this.readSelectorState = new SelectorState(Selector.open());
6478
this.writeSelectorState = new SelectorState(Selector.open());
65-
this.executorService.submit(new ReadLoop(readSelectorState));
66-
this.executorService.submit(new WriteLoop(writeSelectorState));
6779
}
6880

6981
@Override
@@ -79,7 +91,25 @@ public FrameHandler create(Address addr) throws IOException {
7991

8092
SocketChannelFrameHandlerState state = new SocketChannelFrameHandlerState(channel, writeSelectorState);
8193

82-
readSelectorState.registerFrameHandlerState(state, SelectionKey.OP_READ);
94+
loopsLock.lock();
95+
try {
96+
readSelectorState.registerFrameHandlerState(state, SelectionKey.OP_READ);
97+
98+
if(this.executorService == null) {
99+
100+
this.executorService = Executors.newFixedThreadPool(2, threadFactory);
101+
}
102+
if(readLoop == null) {
103+
readLoop = this.executorService.submit(new ReadLoop(readSelectorState));
104+
}
105+
if(writeLoop == null) {
106+
writeLoop = this.executorService.submit(new WriteLoop(writeSelectorState));
107+
}
108+
} finally {
109+
loopsLock.unlock();
110+
}
111+
112+
83113

84114
SocketChannelFrameHandler frameHandler = new SocketChannelFrameHandler(state);
85115
return frameHandler;
@@ -102,7 +132,28 @@ public void registerFrameHandlerState(SocketChannelFrameHandlerState state, int
102132

103133
}
104134

105-
private static class ReadLoop implements Runnable {
135+
private boolean cleanLoopsOrKeepRunning() {
136+
loopsLock.lock();
137+
try {
138+
if(readSelectorState.statesToBeRegistered.isEmpty()) {
139+
boolean cancelled = writeLoop.cancel(true);
140+
if(!cancelled) {
141+
LOGGER.warn("Could not stop write loop");
142+
}
143+
this.writeLoop = null;
144+
this.readLoop = null;
145+
this.executorService.shutdownNow();
146+
return false;
147+
} else {
148+
// looks like someone is trying to connect, keep running
149+
return true;
150+
}
151+
} finally {
152+
loopsLock.unlock();
153+
}
154+
}
155+
156+
private class ReadLoop implements Runnable {
106157

107158
private final SelectorState state;
108159

@@ -117,16 +168,45 @@ public void run() {
117168
ByteBuffer buffer = ByteBuffer.allocate(8192);
118169
try {
119170
while(true) {
171+
172+
// if there's no read key anymore
173+
boolean someoneIsStillReading = false;
174+
for (SelectionKey selectionKey : selector.keys()) {
175+
SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState) selectionKey.attachment();
176+
// FIXME connection should always be here
177+
if(state.getConnection() != null && state.getConnection().getHeartbeat() > 0) {
178+
long now = System.currentTimeMillis();
179+
if((now - state.getLastActivity()) > state.getConnection().getHeartbeat() * 1000 * 2) {
180+
try {
181+
state.getConnection().handleHeartbeatFailure();
182+
} catch(Exception e) {
183+
LOGGER.warn("Error after heartbeat failure of connection {}", state.getConnection());
184+
} finally {
185+
selectionKey.cancel();
186+
}
187+
}
188+
}
189+
190+
if(!selectionKey.channel().isOpen()) {
191+
selectionKey.cancel();
192+
} else {
193+
someoneIsStillReading = true;
194+
}
195+
}
196+
197+
if(!someoneIsStillReading && state.statesToBeRegistered.isEmpty()) {
198+
boolean keepRunning = cleanLoopsOrKeepRunning();
199+
if(!keepRunning) {
200+
return;
201+
}
202+
}
203+
120204
int select;
121205
if(state.statesToBeRegistered.isEmpty()) {
122206
// we can block, registration will call Selector.wakeup()
123-
124-
// FIXME check the number of keys and stop the read and write loops
125-
// if there's no read key anymore
126-
127-
select = selector.select();
207+
select = selector.select(1000);
128208
} else {
129-
// we cannot block, we need to select and clean cancelled keys before registration
209+
// we don't have to block, we need to select and clean cancelled keys before registration
130210
select = selector.selectNow();
131211
}
132212

@@ -153,11 +233,12 @@ public void run() {
153233
buffer.flip();
154234
while(buffer.hasRemaining()) {
155235
Frame frame = Frame.readFrom(channel, buffer);
236+
156237
// FIXME the connection may not be there yet (to be checked)
157238
boolean handled = state.getConnection().handleReadFrame(frame);
158239

159240
// problem during frame processing, the connection triggered shutdown
160-
// we can stop
241+
// we can stop for this channel
161242
if(!handled) {
162243
break;
163244
}
@@ -169,9 +250,10 @@ public void run() {
169250
}
170251

171252
}
172-
253+
state.setLastActivity(System.currentTimeMillis());
173254
} catch (Exception e) {
174-
LOGGER.warn("Error during reading frames: "+e.getMessage());
255+
LOGGER.warn("Error during reading frames", e);
256+
key.cancel();
175257
} finally {
176258
buffer.clear();
177259
}
@@ -203,7 +285,7 @@ public void run() {
203285
ByteBuffer buffer = ByteBuffer.allocate(8192);
204286

205287
try {
206-
while(true) {
288+
while(true && !Thread.currentThread().isInterrupted()) {
207289
int select;
208290
if(state.statesToBeRegistered.isEmpty()) {
209291
// we can block, registration will call Selector.wakeup()
@@ -228,12 +310,9 @@ public void run() {
228310
SelectionKey key = iterator.next();
229311
iterator.remove();
230312
SocketChannel channel = (SocketChannel) key.channel();
231-
if (key.isConnectable()) {
232-
if (!channel.isConnected()) {
233-
channel.finishConnect();
234-
}
235-
} else if (key.isWritable()) {
313+
if (key.isWritable()) {
236314
SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState) key.attachment();
315+
int toBeWritten = state.getWriteQueue().size();
237316
// FIXME property handle header sending request
238317
if(state.isSendHeader()) {
239318
buffer.put("AMQP".getBytes("US-ASCII"));
@@ -247,9 +326,11 @@ public void run() {
247326
state.setSendHeader(false);
248327
}
249328

329+
int written = 0;
250330
Frame frame;
251-
while((frame = state.getWriteQueue().poll()) != null) {
331+
while(written <= toBeWritten && (frame = state.getWriteQueue().poll()) != null) {
252332
frame.writeTo(channel, buffer);
333+
written++;
253334
}
254335
Frame.drain(channel, buffer);
255336
}

src/main/java/com/rabbitmq/client/impl/SocketChannelFrameHandlerState.java

+23-4
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@
33
import org.slf4j.Logger;
44
import org.slf4j.LoggerFactory;
55

6+
import java.io.IOException;
67
import java.nio.channels.SelectionKey;
78
import java.nio.channels.SocketChannel;
89
import java.util.Queue;
910
import java.util.concurrent.ArrayBlockingQueue;
1011
import java.util.concurrent.BlockingQueue;
12+
import java.util.concurrent.LinkedBlockingQueue;
13+
import java.util.concurrent.TimeUnit;
1114

1215
/**
1316
*
@@ -19,12 +22,15 @@ public class SocketChannelFrameHandlerState {
1922
private final SocketChannel channel;
2023

2124
// FIXME find appropriate default for limit in write queue
22-
private final BlockingQueue<Frame> writeQueue = new ArrayBlockingQueue<Frame>(1000);
25+
private final BlockingQueue<Frame> writeQueue = new LinkedBlockingQueue<Frame>(10000);
2326

2427
private volatile AMQConnection connection;
2528

2629
private volatile boolean sendHeader = false;
2730

31+
/** should be used only in the NIO read thread */
32+
private long lastActivity;
33+
2834
private final SocketChannelFrameHandlerFactory.SelectorState selectorState;
2935

3036
public SocketChannelFrameHandlerState(SocketChannel channel, SocketChannelFrameHandlerFactory.SelectorState selectorState) {
@@ -51,10 +57,15 @@ public void setSendHeader(boolean sendHeader) {
5157
}
5258
}
5359

54-
public void write(Frame frame) {
60+
public void write(Frame frame) throws IOException {
5561
try {
56-
this.writeQueue.put(frame);
57-
this.selectorState.registerFrameHandlerState(this, SelectionKey.OP_WRITE);
62+
boolean offered = this.writeQueue.offer(frame, 10, TimeUnit.SECONDS);
63+
if(offered) {
64+
this.selectorState.registerFrameHandlerState(this, SelectionKey.OP_WRITE);
65+
} else {
66+
throw new IOException("Frame enqueuing failed");
67+
}
68+
5869
} catch (InterruptedException e) {
5970
LOGGER.warn("Thread interrupted during enqueuing frame in write queue");
6071
}
@@ -67,4 +78,12 @@ public AMQConnection getConnection() {
6778
public void setConnection(AMQConnection connection) {
6879
this.connection = connection;
6980
}
81+
82+
public void setLastActivity(long lastActivity) {
83+
this.lastActivity = lastActivity;
84+
}
85+
86+
public long getLastActivity() {
87+
return lastActivity;
88+
}
7089
}

0 commit comments

Comments
 (0)