Skip to content

Commit 996c2ee

Browse files
author
Emile Joubert
committed
Merged bug24545 into default
2 parents 82492e1 + ebd1719 commit 996c2ee

26 files changed

+249
-199
lines changed

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.security.NoSuchAlgorithmException;
2222
import java.util.Map;
2323
import java.util.concurrent.ExecutorService;
24-
import java.util.concurrent.Executors;
2524

2625
import java.net.InetSocketAddress;
2726
import java.net.Socket;
@@ -43,39 +42,40 @@
4342
*/
4443

4544
public class ConnectionFactory implements Cloneable {
46-
45+
4746
/** Default Executor threads */
48-
private static final int DEFAULT_NUM_CONSUMER_THREADS = 5;
47+
@Deprecated
48+
public static final int DEFAULT_NUM_CONSUMER_THREADS = 5;
4949
/** Default user name */
50-
private static final String DEFAULT_USER = "guest";
50+
public static final String DEFAULT_USER = "guest";
5151
/** Default password */
52-
private static final String DEFAULT_PASS = "guest";
52+
public static final String DEFAULT_PASS = "guest";
5353
/** Default virtual host */
54-
private static final String DEFAULT_VHOST = "/";
54+
public static final String DEFAULT_VHOST = "/";
5555
/** Default maximum channel number;
5656
* zero for unlimited */
57-
private static final int DEFAULT_CHANNEL_MAX = 0;
57+
public static final int DEFAULT_CHANNEL_MAX = 0;
5858
/** Default maximum frame size;
5959
* zero means no limit */
60-
private static final int DEFAULT_FRAME_MAX = 0;
60+
public static final int DEFAULT_FRAME_MAX = 0;
6161
/** Default heart-beat interval;
6262
* zero means no heart-beats */
63-
private static final int DEFAULT_HEARTBEAT = 0;
63+
public static final int DEFAULT_HEARTBEAT = 0;
6464
/** The default host */
65-
private static final String DEFAULT_HOST = "localhost";
65+
public static final String DEFAULT_HOST = "localhost";
6666
/** 'Use the default port' port */
67-
private static final int USE_DEFAULT_PORT = -1;
67+
public static final int USE_DEFAULT_PORT = -1;
6868
/** The default non-ssl port */
69-
private static final int DEFAULT_AMQP_PORT = AMQP.PROTOCOL.PORT;
69+
public static final int DEFAULT_AMQP_PORT = AMQP.PROTOCOL.PORT;
7070
/** The default ssl port */
71-
private static final int DEFAULT_AMQP_OVER_SSL_PORT = 5671;
71+
public static final int DEFAULT_AMQP_OVER_SSL_PORT = 5671;
7272
/** The default connection timeout;
7373
* zero means wait indefinitely */
74-
private static final int DEFAULT_CONNECTION_TIMEOUT = 0;
74+
public static final int DEFAULT_CONNECTION_TIMEOUT = 0;
75+
7576
/** The default SSL protocol */
7677
private static final String DEFAULT_SSL_PROTOCOL = "SSLv3";
7778

78-
private int numConsumerThreads = DEFAULT_NUM_CONSUMER_THREADS;
7979
private String username = DEFAULT_USER;
8080
private String password = DEFAULT_PASS;
8181
private String virtualHost = DEFAULT_VHOST;
@@ -90,13 +90,15 @@ public class ConnectionFactory implements Cloneable {
9090
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
9191

9292
/** @return number of consumer threads in default {@link ExecutorService} */
93+
@Deprecated
9394
public int getNumConsumerThreads() {
94-
return numConsumerThreads;
95+
return DEFAULT_NUM_CONSUMER_THREADS;
9596
}
9697

9798
/** @param numConsumerThreads threads in created private executor service */
99+
@Deprecated
98100
public void setNumConsumerThreads(int numConsumerThreads) {
99-
this.numConsumerThreads = numConsumerThreads;
101+
throw new IllegalArgumentException("setNumConsumerThreads not supported -- create explicit ExecutorService instead.");
100102
}
101103

102104
/** @return the default host to use for connections */
@@ -436,10 +438,22 @@ protected FrameHandler createFrameHandler(Address addr)
436438

437439
String hostName = addr.getHost();
438440
int portNumber = portOrDefault(addr.getPort());
439-
Socket socket = factory.createSocket();
440-
configureSocket(socket);
441-
socket.connect(new InetSocketAddress(hostName, portNumber), connectionTimeout);
442-
return createFrameHandler(socket);
441+
Socket socket = null;
442+
try {
443+
socket = factory.createSocket();
444+
configureSocket(socket);
445+
socket.connect(new InetSocketAddress(hostName, portNumber),
446+
connectionTimeout);
447+
return createFrameHandler(socket);
448+
} catch (IOException ioe) {
449+
quietTrySocketClose(socket);
450+
throw ioe;
451+
}
452+
}
453+
454+
private static void quietTrySocketClose(Socket socket) {
455+
if (socket != null)
456+
try { socket.close(); } catch (Exception _) {/*ignore exceptions*/}
443457
}
444458

445459
protected FrameHandler createFrameHandler(Socket sock)
@@ -464,21 +478,31 @@ protected void configureSocket(Socket socket) throws IOException{
464478
socket.setTcpNoDelay(true);
465479
}
466480

481+
/**
482+
* Create a new broker connection
483+
* @param addrs an array of known broker addresses (hostname/port pairs) to try in order
484+
* @return an interface to the connection
485+
* @throws IOException if it encounters a problem
486+
*/
487+
public Connection newConnection(Address[] addrs) throws IOException {
488+
return newConnection(null, addrs);
489+
}
490+
467491
/**
468492
* Create a new broker connection
469493
* @param executor thread execution service for consumers on the connection
470494
* @param addrs an array of known broker addresses (hostname/port pairs) to try in order
471495
* @return an interface to the connection
472496
* @throws IOException if it encounters a problem
473497
*/
474-
private Connection newConnection(ExecutorService executor, Address[] addrs)
498+
public Connection newConnection(ExecutorService executor, Address[] addrs)
475499
throws IOException
476500
{
477501
IOException lastException = null;
478502
for (Address addr : addrs) {
479503
try {
480504
FrameHandler frameHandler = createFrameHandler(addr);
481-
AMQConnection conn =
505+
AMQConnection conn =
482506
new AMQConnection(username,
483507
password,
484508
frameHandler,
@@ -506,7 +530,7 @@ private Connection newConnection(ExecutorService executor, Address[] addrs)
506530
* @throws IOException if it encounters a problem
507531
*/
508532
public Connection newConnection() throws IOException {
509-
return newConnection(Executors.newFixedThreadPool(this.numConsumerThreads),
533+
return newConnection(null,
510534
new Address[] {new Address(getHost(), getPort())}
511535
);
512536
}

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,7 @@ public void handleFrame(Frame frame) throws IOException {
9999
* @return the wrapped exception
100100
*/
101101
public static IOException wrap(ShutdownSignalException ex) {
102-
IOException ioe = new IOException();
103-
ioe.initCause(ex);
104-
return ioe;
102+
return wrap(ex, null);
105103
}
106104

107105
public static IOException wrap(ShutdownSignalException ex, String message) {
@@ -153,13 +151,17 @@ public void handleCompleteInboundCommand(AMQCommand command) throws IOException
153151
public void enqueueRpc(RpcContinuation k)
154152
{
155153
synchronized (_channelMutex) {
154+
boolean waitClearedInterruptStatus = false;
156155
while (_activeRpc != null) {
157156
try {
158157
_channelMutex.wait();
159158
} catch (InterruptedException e) {
160-
Thread.currentThread().interrupt();
159+
waitClearedInterruptStatus = true;
161160
}
162161
}
162+
if (waitClearedInterruptStatus) {
163+
Thread.currentThread().interrupt();
164+
}
163165
_activeRpc = k;
164166
}
165167
}

0 commit comments

Comments
 (0)