Skip to content

Commit 32a5651

Browse files
author
Simon MacMullen
committed
Merge bug24651
2 parents 9123637 + 0aa2205 commit 32a5651

File tree

2 files changed

+69
-18
lines changed

2 files changed

+69
-18
lines changed

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.IOException;
2121
import java.net.InetAddress;
2222
import java.net.SocketException;
23+
import java.net.SocketTimeoutException;
2324
import java.util.Collections;
2425
import java.util.HashMap;
2526
import java.util.Map;
@@ -108,6 +109,9 @@ public static final Map<String, Object> defaultClientProperties() {
108109
/** Flag indicating whether the client received Connection.Close message from the broker */
109110
private volatile boolean _brokerInitiatedShutdown;
110111

112+
/** Flag indicating we are still negotiating the connection in start */
113+
private volatile boolean _inConnectionNegotiation;
114+
111115
/** Manages heart-beat sending for this connection */
112116
private final HeartbeatSender _heartbeatSender;
113117

@@ -248,6 +252,8 @@ public AMQConnection(String username,
248252

249253
this._heartbeatSender = new HeartbeatSender(frameHandler);
250254
this._brokerInitiatedShutdown = false;
255+
256+
this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
251257
}
252258

253259
/**
@@ -385,6 +391,9 @@ public void start()
385391
throw AMQChannel.wrap(sse);
386392
}
387393

394+
// We can now respond to errors having finished tailoring the connection
395+
this._inConnectionNegotiation = false;
396+
388397
return;
389398
}
390399

@@ -538,7 +547,11 @@ private class MainLoop extends Thread {
538547
* Called when a frame-read operation times out
539548
* @throws MissedHeartbeatException if heart-beats have been missed
540549
*/
541-
private void handleSocketTimeout() throws MissedHeartbeatException {
550+
private void handleSocketTimeout() throws SocketTimeoutException {
551+
if (_inConnectionNegotiation) {
552+
throw new SocketTimeoutException("Timeout during Connection negotiation");
553+
}
554+
542555
if (_heartbeat == 0) { // No heart-beating
543556
return;
544557
}

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@
4444
public class AMQConnectionTest extends TestCase {
4545
// private static final String CLOSE_MESSAGE = "terminated by test";
4646

47-
/**
48-
* Build a suite of tests
47+
/**
48+
* Build a suite of tests
4949
* @return the test suite for this class
5050
*/
5151
public static TestSuite suite() {
@@ -79,7 +79,7 @@ public static TestSuite suite() {
7979
}
8080

8181
/** Check the AMQConnection does send exactly 1 initial header, and deal correctly with
82-
* the frame handler throwing an exception when we try to read data
82+
* the frame handler throwing an exception when we try to read data
8383
*/
8484
public void testConnectionSendsSingleHeaderAndTimesOut() {
8585
IOException exception = new SocketTimeoutException();
@@ -100,14 +100,14 @@ public void testConnectionSendsSingleHeaderAndTimesOut() {
100100
handler).start();
101101
fail("Connection should have thrown exception");
102102
} catch(IOException signal) {
103-
// As expected
103+
// As expected
104104
}
105105
assertEquals(1, _mockFrameHandler.countHeadersSent());
106106
// _connection.close(0, CLOSE_MESSAGE);
107107
List<Throwable> exceptionList = handler.getHandledExceptions();
108108
assertEquals(Collections.<Throwable>singletonList(exception), exceptionList);
109109
}
110-
110+
111111
/** Check we can open a connection once, but not twice.
112112
* @throws IOException */
113113
// public void testCanOpenConnectionOnceOnly() throws IOException {
@@ -121,18 +121,49 @@ public void testConnectionSendsSingleHeaderAndTimesOut() {
121121
// }
122122
// }
123123

124-
// add test that we time out if no initial Start command is received,
125-
// setting a timeout and having the FrameHandler return null
126-
124+
/**
125+
* Test that we catch timeout between connect and negotiation of the connection being finished.
126+
*/
127+
public void testConnectionHangInNegotiation() {
128+
this._mockFrameHandler.setTimeoutCount(10); // to limit hang
129+
MyExceptionHandler handler = new MyExceptionHandler();
130+
assertEquals(0, this._mockFrameHandler.countHeadersSent());
131+
try {
132+
new AMQConnection(factory.getUsername(),
133+
factory.getPassword(),
134+
this._mockFrameHandler,
135+
Executors.newFixedThreadPool(1),
136+
factory.getVirtualHost(),
137+
factory.getClientProperties(),
138+
factory.getRequestedFrameMax(),
139+
factory.getRequestedChannelMax(),
140+
factory.getRequestedHeartbeat(),
141+
factory.getSaslConfig(),
142+
handler).start();
143+
fail("Connection should have thrown exception");
144+
} catch(IOException signal) {
145+
// As expected
146+
}
147+
assertEquals(1, this._mockFrameHandler.countHeadersSent());
148+
// _connection.close(0, CLOSE_MESSAGE);
149+
List<Throwable> exceptionList = handler.getHandledExceptions();
150+
assertEquals("Only one exception expected", 1, exceptionList.size());
151+
assertEquals("Wrong type of exception returned.", SocketTimeoutException.class, exceptionList.get(0).getClass());
152+
}
153+
127154
/** Mock frame handler to facilitate testing. */
128155
private static class MockFrameHandler implements FrameHandler {
129156
/** How many times has sendHeader() been called? */
130157
private int _numHeadersSent;
131-
158+
159+
private int timeout;
160+
132161
/** An optional exception for us to throw on reading frames */
133162
private IOException _exceptionOnReadingFrames;
134163

135-
/** count how many headers we've sent
164+
private int timeoutCount = 0;
165+
166+
/** count how many headers we've sent
136167
* @return the number of sent headers
137168
*/
138169
public int countHeadersSent() {
@@ -143,20 +174,27 @@ public void setExceptionOnReadingFrames(IOException exception) {
143174
_exceptionOnReadingFrames = exception;
144175
}
145176

177+
public void setTimeoutCount(int timeoutCount) {
178+
this.timeoutCount = timeoutCount;
179+
}
180+
146181
public Frame readFrame() throws IOException {
147182
if (_exceptionOnReadingFrames != null) {
148183
throw _exceptionOnReadingFrames;
149184
}
150-
return null;
151-
// throw new SocketTimeoutException(); // simulate a socket timeout
185+
if (this.timeoutCount > 0) {
186+
if (--this.timeoutCount == 0)
187+
throw new IOException("Mock Framehandler: too many timeouts.");
188+
}
189+
return null; // simulate a socket timeout
152190
}
153191

154192
public void sendHeader() throws IOException {
155-
_numHeadersSent++;
193+
_numHeadersSent++;
156194
}
157195

158196
public void setTimeout(int timeoutMs) throws SocketException {
159-
// no need to implement this: don't bother changing the timeout
197+
this.timeout = timeoutMs;
160198
}
161199

162200
public void writeFrame(Frame frame) throws IOException {
@@ -168,7 +206,7 @@ public void close() {
168206
}
169207

170208
public int getTimeout() throws SocketException {
171-
return 0;
209+
return this.timeout;
172210
}
173211

174212
public InetAddress getAddress() {
@@ -180,7 +218,7 @@ public int getPort() {
180218
}
181219
}
182220

183-
/** Mock frame handler to facilitate testing. */
221+
/** Exception handler to facilitate testing. */
184222
private class MyExceptionHandler implements ExceptionHandler {
185223
private List<Throwable> _handledExceptions = new ArrayList<Throwable>();
186224

@@ -208,7 +246,7 @@ public void handleConsumerException(Channel ch,
208246
{
209247
fail("handleConsumerException " + consumerTag + " " + methodName + ": " + ex);
210248
}
211-
249+
212250
public List<Throwable> getHandledExceptions() {
213251
return _handledExceptions;
214252
}

0 commit comments

Comments
 (0)