Skip to content

Commit 4362048

Browse files
Merge branch 'master' into rabbitmq-java-client-215
Conflicts: src/test/java/com/rabbitmq/client/test/ClientTests.java
2 parents d9f5aec + ce3a04c commit 4362048

16 files changed

+512
-49
lines changed

src/main/java/com/rabbitmq/client/Channel.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
* @see <a href="http://www.rabbitmq.com/getstarted.html">RabbitMQ tutorials</a>
5656
* @see <a href="http://www.rabbitmq.com/api-guide.html">RabbitMQ Java Client User Guide</a>
5757
*/
58-
public interface Channel extends ShutdownNotifier {
58+
public interface Channel extends ShutdownNotifier, AutoCloseable {
5959
/**
6060
* Retrieve this channel's channel number.
6161
* @return the channel number
@@ -74,6 +74,7 @@ public interface Channel extends ShutdownNotifier {
7474
*
7575
* @throws java.io.IOException if an error is encountered
7676
*/
77+
@Override
7778
void close() throws IOException, TimeoutException;
7879

7980
/**

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 63 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@
3636
import static java.util.concurrent.TimeUnit.*;
3737

3838
/**
39-
* Convenience "factory" class to facilitate opening a {@link Connection} to an AMQP broker.
39+
* Convenience factory class to facilitate opening a {@link Connection} to a RabbitMQ node.
40+
*
41+
* Most connection and socket settings are configured using this factory.
42+
* Some settings that apply to connections can also be configured here
43+
* and will apply to all connections produced by this factory.
4044
*/
4145

4246
public class ConnectionFactory implements Cloneable {
@@ -94,7 +98,7 @@ public class ConnectionFactory implements Cloneable {
9498
private int handshakeTimeout = DEFAULT_HANDSHAKE_TIMEOUT;
9599
private int shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
96100
private Map<String, Object> _clientProperties = AMQConnection.defaultClientProperties();
97-
private SocketFactory factory = SocketFactory.getDefault();
101+
private SocketFactory socketFactory = null;
98102
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
99103
private ExecutorService sharedExecutor;
100104
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
@@ -119,7 +123,7 @@ public class ConnectionFactory implements Cloneable {
119123
private FrameHandlerFactory frameHandlerFactory;
120124
private NioParams nioParams = new NioParams();
121125

122-
private SSLContext sslContext;
126+
private SslContextFactory sslContextFactory;
123127

124128
/**
125129
* Continuation timeout on RPC calls.
@@ -442,18 +446,19 @@ public void setSaslConfig(SaslConfig saslConfig) {
442446
* Retrieve the socket factory used to make connections with.
443447
*/
444448
public SocketFactory getSocketFactory() {
445-
return this.factory;
449+
return this.socketFactory;
446450
}
447451

448452
/**
449-
* Set the socket factory used to make connections with. Can be
450-
* used to enable SSL connections by passing in a
453+
* Set the socket factory used to create sockets for new connections. Can be
454+
* used to customize TLS-related settings by passing in a
451455
* javax.net.ssl.SSLSocketFactory instance.
452-
*
456+
* Note this applies only to blocking IO, not to
457+
* NIO, as the NIO API doesn't use the SocketFactory API.
453458
* @see #useSslProtocol
454459
*/
455460
public void setSocketFactory(SocketFactory factory) {
456-
this.factory = factory;
461+
this.socketFactory = factory;
457462
}
458463

459464
/**
@@ -556,7 +561,7 @@ public void setExceptionHandler(ExceptionHandler exceptionHandler) {
556561
}
557562

558563
public boolean isSSL(){
559-
return getSocketFactory() instanceof SSLSocketFactory;
564+
return getSocketFactory() instanceof SSLSocketFactory || sslContextFactory != null;
560565
}
561566

562567
/**
@@ -572,6 +577,10 @@ public void useSslProtocol()
572577
/**
573578
* Convenience method for setting up a SSL socket factory/engine, using
574579
* the supplied protocol and a very trusting TrustManager.
580+
* The produced {@link SSLContext} instance will be shared by all
581+
* the connections created by this connection factory. Use
582+
* {@link #setSslContextFactory(SslContextFactory)} for more flexibility.
583+
* @see #setSslContextFactory(SslContextFactory)
575584
*/
576585
public void useSslProtocol(String protocol)
577586
throws NoSuchAlgorithmException, KeyManagementException
@@ -582,8 +591,11 @@ public void useSslProtocol(String protocol)
582591
/**
583592
* Convenience method for setting up an SSL socket factory/engine.
584593
* Pass in the SSL protocol to use, e.g. "TLSv1" or "TLSv1.2".
585-
*
594+
* The produced {@link SSLContext} instance will be shared with all
595+
* the connections created by this connection factory. Use
596+
* {@link #setSslContextFactory(SslContextFactory)} for more flexibility.
586597
* @param protocol SSL protocol to use.
598+
* @see #setSslContextFactory(SslContextFactory)
587599
*/
588600
public void useSslProtocol(String protocol, TrustManager trustManager)
589601
throws NoSuchAlgorithmException, KeyManagementException
@@ -594,14 +606,17 @@ public void useSslProtocol(String protocol, TrustManager trustManager)
594606
}
595607

596608
/**
597-
* Convenience method for setting up an SSL socket factory/engine.
609+
* Convenience method for setting up an SSL socket socketFactory/engine.
598610
* Pass in an initialized SSLContext.
599-
*
611+
* The {@link SSLContext} instance will be shared with all
612+
* the connections created by this connection factory. Use
613+
* {@link #setSslContextFactory(SslContextFactory)} for more flexibility.
600614
* @param context An initialized SSLContext
615+
* @see #setSslContextFactory(SslContextFactory)
601616
*/
602617
public void useSslProtocol(SSLContext context) {
618+
this.sslContextFactory = name -> context;
603619
setSocketFactory(context.getSocketFactory());
604-
this.sslContext = context;
605620
}
606621

607622
public static String computeDefaultTlsProcotol(String[] supportedProtocols) {
@@ -667,11 +682,11 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IO
667682
if(this.nioParams.getNioExecutor() == null && this.nioParams.getThreadFactory() == null) {
668683
this.nioParams.setThreadFactory(getThreadFactory());
669684
}
670-
this.frameHandlerFactory = new SocketChannelFrameHandlerFactory(connectionTimeout, nioParams, isSSL(), sslContext);
685+
this.frameHandlerFactory = new SocketChannelFrameHandlerFactory(connectionTimeout, nioParams, isSSL(), sslContextFactory);
671686
}
672687
return this.frameHandlerFactory;
673688
} else {
674-
return new SocketFrameHandlerFactory(connectionTimeout, factory, socketConf, isSSL(), this.shutdownExecutor);
689+
return new SocketFrameHandlerFactory(connectionTimeout, socketFactory, socketConf, isSSL(), this.shutdownExecutor, sslContextFactory);
675690
}
676691

677692
}
@@ -912,19 +927,28 @@ public Connection newConnection(ExecutorService executor, AddressResolver addres
912927
return conn;
913928
} else {
914929
List<Address> addrs = addressResolver.getAddresses();
915-
IOException lastException = null;
930+
Exception lastException = null;
916931
for (Address addr : addrs) {
917932
try {
918-
FrameHandler handler = fhFactory.create(addr);
919-
AMQConnection conn = new AMQConnection(params, handler, metricsCollector);
933+
FrameHandler handler = fhFactory.create(addr, clientProvidedName);
934+
AMQConnection conn = createConnection(params, handler, metricsCollector);
920935
conn.start();
921936
this.metricsCollector.newConnection(conn);
922937
return conn;
923938
} catch (IOException e) {
924939
lastException = e;
940+
} catch (TimeoutException te) {
941+
lastException = te;
942+
}
943+
}
944+
if (lastException != null) {
945+
if (lastException instanceof IOException) {
946+
throw (IOException) lastException;
947+
} else if (lastException instanceof TimeoutException) {
948+
throw (TimeoutException) lastException;
925949
}
926950
}
927-
throw (lastException != null) ? lastException : new IOException("failed to connect");
951+
throw new IOException("failed to connect");
928952
}
929953
}
930954

@@ -952,6 +976,10 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
952976
return result;
953977
}
954978

979+
protected AMQConnection createConnection(ConnectionParams params, FrameHandler frameHandler, MetricsCollector metricsCollector) {
980+
return new AMQConnection(params, frameHandler, metricsCollector);
981+
}
982+
955983
/**
956984
* Create a new broker connection.
957985
*
@@ -1111,4 +1139,20 @@ public void setChannelRpcTimeout(int channelRpcTimeout) {
11111139
public int getChannelRpcTimeout() {
11121140
return channelRpcTimeout;
11131141
}
1142+
1143+
/**
1144+
* The factory to create SSL contexts.
1145+
* This provides more flexibility to create {@link SSLContext}s
1146+
* for different connections than sharing the {@link SSLContext}
1147+
* with all the connections produced by the connection factory
1148+
* (which is the case with the {@link #useSslProtocol()} methods).
1149+
* This way, different connections with a different certificate
1150+
* for each of them is a possible scenario.
1151+
* @param sslContextFactory
1152+
* @see #useSslProtocol(SSLContext)
1153+
* @since 5.0.0
1154+
*/
1155+
public void setSslContextFactory(SslContextFactory sslContextFactory) {
1156+
this.sslContextFactory = sslContextFactory;
1157+
}
11141158
}

src/main/java/com/rabbitmq/client/RpcServer.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,11 @@ public void processRequest(Delivery request)
148148
String replyTo = requestProperties.getReplyTo();
149149
if (correlationId != null && replyTo != null)
150150
{
151-
AMQP.BasicProperties replyProperties
152-
= new AMQP.BasicProperties.Builder().correlationId(correlationId).build();
151+
AMQP.BasicProperties.Builder replyPropertiesBuilder
152+
= new AMQP.BasicProperties.Builder().correlationId(correlationId);
153+
AMQP.BasicProperties replyProperties = preprocessReplyProperties(request, replyPropertiesBuilder);
153154
byte[] replyBody = handleCall(request, replyProperties);
155+
replyProperties = postprocessReplyProperties(request, replyProperties.builder());
154156
_channel.basicPublish("", replyTo, replyProperties, replyBody);
155157
} else {
156158
handleCast(request);
@@ -191,6 +193,27 @@ public byte[] handleCall(byte[] requestBody,
191193
return new byte[0];
192194
}
193195

196+
/**
197+
* Gives a chance to set/modify reply properties before handling call.
198+
* Note the correlationId property is already set.
199+
* @param request the inbound message
200+
* @param builder the reply properties builder
201+
* @return the properties to pass in to the handling call
202+
*/
203+
protected AMQP.BasicProperties preprocessReplyProperties(Delivery request, AMQP.BasicProperties.Builder builder) {
204+
return builder.build();
205+
}
206+
207+
/**
208+
* Gives a chance to set/modify reply properties after the handling call
209+
* @param request the inbound message
210+
* @param builder the reply properties builder
211+
* @return the properties to pass in to the response message
212+
*/
213+
protected AMQP.BasicProperties postprocessReplyProperties(Delivery request, AMQP.BasicProperties.Builder builder) {
214+
return builder.build();
215+
}
216+
194217
/**
195218
* Lowest-level handler method. Calls
196219
* handleCast(AMQP.BasicProperties,byte[]).
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright (c) 2017-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client;
17+
18+
import javax.net.ssl.SSLContext;
19+
20+
/**
21+
* A factory to create {@link SSLContext}s.
22+
*
23+
* @see ConnectionFactory#setSslContextFactory(SslContextFactory)
24+
* @since 5.0.0
25+
*/
26+
public interface SslContextFactory {
27+
28+
/**
29+
* Create a {@link SSLContext} for a given name.
30+
* The name is typically the name of the connection.
31+
* @param name name of the connection the SSLContext is used for
32+
* @return the SSLContext for this name
33+
* @see ConnectionFactory#newConnection(String)
34+
*/
35+
SSLContext create(String name);
36+
37+
}

src/main/java/com/rabbitmq/client/impl/FrameHandlerFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,6 @@
99
*/
1010
public interface FrameHandlerFactory {
1111

12-
FrameHandler create(Address addr) throws IOException;
12+
FrameHandler create(Address addr, String connectionName) throws IOException;
1313

1414
}

src/main/java/com/rabbitmq/client/impl/SocketFrameHandlerFactory.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.rabbitmq.client.Address;
1919
import com.rabbitmq.client.ConnectionFactory;
2020
import com.rabbitmq.client.SocketConfigurator;
21+
import com.rabbitmq.client.SslContextFactory;
2122

2223
import javax.net.SocketFactory;
2324
import java.io.IOException;
@@ -27,25 +28,34 @@
2728

2829
public class SocketFrameHandlerFactory extends AbstractFrameHandlerFactory {
2930

30-
private final SocketFactory factory;
31+
private final SocketFactory socketFactory;
3132
private final ExecutorService shutdownExecutor;
33+
private final SslContextFactory sslContextFactory;
3234

33-
public SocketFrameHandlerFactory(int connectionTimeout, SocketFactory factory, SocketConfigurator configurator, boolean ssl) {
34-
this(connectionTimeout, factory, configurator, ssl, null);
35+
public SocketFrameHandlerFactory(int connectionTimeout, SocketFactory socketFactory, SocketConfigurator configurator,
36+
boolean ssl) {
37+
this(connectionTimeout, socketFactory, configurator, ssl, null);
3538
}
3639

37-
public SocketFrameHandlerFactory(int connectionTimeout, SocketFactory factory, SocketConfigurator configurator, boolean ssl, ExecutorService shutdownExecutor) {
40+
public SocketFrameHandlerFactory(int connectionTimeout, SocketFactory socketFactory, SocketConfigurator configurator,
41+
boolean ssl, ExecutorService shutdownExecutor) {
42+
this(connectionTimeout, socketFactory, configurator, ssl, shutdownExecutor, null);
43+
}
44+
45+
public SocketFrameHandlerFactory(int connectionTimeout, SocketFactory socketFactory, SocketConfigurator configurator,
46+
boolean ssl, ExecutorService shutdownExecutor, SslContextFactory sslContextFactory) {
3847
super(connectionTimeout, configurator, ssl);
39-
this.factory = factory;
48+
this.socketFactory = socketFactory;
4049
this.shutdownExecutor = shutdownExecutor;
50+
this.sslContextFactory = sslContextFactory;
4151
}
4252

43-
public FrameHandler create(Address addr) throws IOException {
53+
public FrameHandler create(Address addr, String connectionName) throws IOException {
4454
String hostName = addr.getHost();
4555
int portNumber = ConnectionFactory.portOrDefault(addr.getPort(), ssl);
4656
Socket socket = null;
4757
try {
48-
socket = factory.createSocket();
58+
socket = createSocket(connectionName);
4959
configurator.configure(socket);
5060
socket.connect(new InetSocketAddress(hostName, portNumber),
5161
connectionTimeout);
@@ -56,6 +66,19 @@ public FrameHandler create(Address addr) throws IOException {
5666
}
5767
}
5868

69+
protected Socket createSocket(String connectionName) throws IOException {
70+
// SocketFactory takes precedence if specified
71+
if (socketFactory != null) {
72+
return socketFactory.createSocket();
73+
} else {
74+
if (ssl) {
75+
return sslContextFactory.create(connectionName).getSocketFactory().createSocket();
76+
} else {
77+
return SocketFactory.getDefault().createSocket();
78+
}
79+
}
80+
}
81+
5982
public FrameHandler create(Socket sock) throws IOException
6083
{
6184
return new SocketFrameHandler(sock, this.shutdownExecutor);

0 commit comments

Comments
 (0)