Skip to content

Add property file-based initialization #329

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 10, 2017
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,86 @@ protected AddressResolver createAddressResolver(List<Address> addresses) {
}
}

/**
* Load settings from a property file.
* Keys must be prefixed with <code>rabbitmq.</code>,
* use {@link ConnectionFactory#load(String, String)} to
* specify your own prefix.
* @param propertyFileLocation location of the property file to use
* @throws IOException when something goes wrong reading the file
* @since 4.4.0
* @see ConnectionFactoryConfigurator
*/
public ConnectionFactory load(String propertyFileLocation) throws IOException {
ConnectionFactoryConfigurator.load(this, propertyFileLocation);
return this;
}

/**
* Load settings from a property file.
* @param propertyFileLocation location of the property file to use
* @param prefix key prefix for the entries in the file
* @throws IOException when something goes wrong reading the file
* @since 4.4.0
* @see ConnectionFactoryConfigurator
*/
public ConnectionFactory load(String propertyFileLocation, String prefix) throws IOException {
ConnectionFactoryConfigurator.load(this, propertyFileLocation, prefix);
return this;
}

/**
* Load settings from a {@link Properties} instance.
* Keys must be prefixed with <code>rabbitmq.</code>,
* use {@link ConnectionFactory#load(Properties, String)} to
* specify your own prefix.
* @param properties source for settings
* @since 4.4.0
* @see ConnectionFactoryConfigurator
*/
public ConnectionFactory load(Properties properties) {
ConnectionFactoryConfigurator.load(this, properties);
return this;
}

/**
* Load settings from a {@link Properties} instance.
* @param properties source for settings
* @param prefix key prefix for properties entries
* @since 4.4.0
* @see ConnectionFactoryConfigurator
*/
public ConnectionFactory load(Properties properties, String prefix) {
ConnectionFactoryConfigurator.load(this, (Map) properties, prefix);
return this;
}

/**
* Load settings from a {@link Map} instance.
* Keys must be prefixed with <code>rabbitmq.</code>,
* use {@link ConnectionFactory#load(Map, String)} to
* specify your own prefix.
* @param properties source for settings
* @since 4.4.0
* @see ConnectionFactoryConfigurator
*/
public ConnectionFactory load(Map<String, String> properties) {
ConnectionFactoryConfigurator.load(this, properties);
return this;
}

/**
* Load settings from a {@link Map} instance.
* @param properties source for settings
* @param prefix key prefix for map entries
* @since 4.4.0
* @see ConnectionFactoryConfigurator
*/
public ConnectionFactory load(Map<String, String> properties, String prefix) {
ConnectionFactoryConfigurator.load(this, properties, prefix);
return this;
}

/**
* Returns automatic connection recovery interval in milliseconds.
* @return how long will automatic recovery wait before attempting to reconnect, in ms; default is 5000
Expand Down Expand Up @@ -1118,6 +1198,14 @@ public void setNioParams(NioParams nioParams) {
this.nioParams = nioParams;
}

/**
* Retrieve the parameters for NIO mode.
* @return
*/
public NioParams getNioParams() {
return nioParams;
}

/**
* Use non-blocking IO (NIO) for communication with the server.
* With NIO, several connections created from the same {@link ConnectionFactory}
Expand Down
252 changes: 252 additions & 0 deletions src/main/java/com/rabbitmq/client/ConnectionFactoryConfigurator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
// Copyright (c) 2017-Present Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].

package com.rabbitmq.client;

import com.rabbitmq.client.impl.AMQConnection;
import com.rabbitmq.client.impl.nio.NioParams;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
* Helper class to load {@link ConnectionFactory} settings from a property file.
*
* The authorised keys are the constants values in this class (e.g. USERNAME).
* The property file/properties instance/map instance keys can have
* a prefix, the default being <code>rabbitmq.</code>.
*
* Property files can be loaded from the file system (the default),
* but also from the classpath, by using the <code>classpath:</code> prefix
* in the location.
*
* If default client properties should be set, set the <code>use.default.client.properties</code>
* key to <code>true</code>. Custom client properties can be set by using
* the <code>client.properties.</code>, e.g. <code>client.properties.app.name</code>.
*
* @since 4.4.0
* @see ConnectionFactory#load(String, String)
*/
public class ConnectionFactoryConfigurator {

public static final String DEFAULT_PREFIX = "rabbitmq.";

public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String VIRTUAL_HOST = "virtual.host";
public static final String HOST = "host";
public static final String PORT = "port";
public static final String CONNECTION_CHANNEL_MAX = "connection.channel.max";
public static final String CONNECTION_FRAME_MAX = "connection.frame.max";
public static final String CONNECTION_HEARTBEAT = "connection.heartbeat";
public static final String CONNECTION_TIMEOUT = "connection.timeout";
public static final String HANDSHAKE_TIMEOUT = "handshake.timeout";
public static final String SHUTDOWN_TIMEOUT = "shutdown.timeout";
public static final String USE_DEFAULT_CLIENT_PROPERTIES = "use.default.client.properties";
public static final String CLIENT_PROPERTIES_PREFIX = "client.properties.";
public static final String AUTOMATIC_RECOVERY_ENABLED = "automatic.recovery.enabled";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can probably be connection.recovery.enabled. We already have topology.recovery.enabled.

public static final String TOPOLOGY_RECOVERY_ENABLED = "topology.recovery.enabled";
public static final String NETWORK_RECOVERY_INTERVAL = "network.recovery.interval";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

connection.recovery.interval?

public static final String CHANNEL_RPC_TIMEOUT = "channel.rpc.timeout";
public static final String CHANNEL_SHOULD_CHECK_RPC_RESPONSE_TYPE = "channel.should.check.rpc.response.type";
public static final String USE_NIO = "use.nio";
public static final String NIO_READ_BYTE_BUFFER_SIZE = "nio.read.byte.buffer.size";
public static final String NIO_WRITE_BYTE_BUFFER_SIZE = "nio.write.byte.buffer.size";
public static final String NIO_NB_IO_THREADS = "nio.nb.io.threads";
public static final String NIO_WRITE_ENQUEUING_TIMEOUT_IN_MS = "nio.write.enqueuing.timeout.in.ms";
public static final String NIO_WRITE_QUEUE_CAPACITY = "nio.write.queue.capacity";

public static void load(ConnectionFactory cf, String propertyFileLocation, String prefix) throws IOException {
if (propertyFileLocation == null || propertyFileLocation.isEmpty()) {
throw new IllegalArgumentException("Property file argument cannot be null or empty");
}
Properties properties = new Properties();
if (propertyFileLocation.startsWith("classpath:")) {
InputStream in = null;
try {
in = ConnectionFactoryConfigurator.class.getResourceAsStream(
propertyFileLocation.substring("classpath:".length())
);
properties.load(in);
} finally {
if (in != null) {
in.close();
}
}
} else {
Reader reader = null;
try {
reader = new BufferedReader(new FileReader(propertyFileLocation));
properties.load(reader);
} finally {
if (reader != null) {
reader.close();
}
}
}
load(cf, (Map) properties, prefix);
}

public static void load(ConnectionFactory cf, Map<String, String> properties, String prefix) {
prefix = prefix == null ? "" : prefix;
String uri = properties.get(prefix + "uri");
if (uri != null) {
try {
cf.setUri(uri);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Error while setting AMQP URI: "+uri, e);
} catch (NoSuchAlgorithmException e) {
throw new IllegalArgumentException("Error while setting AMQP URI: "+uri, e);
} catch (KeyManagementException e) {
throw new IllegalArgumentException("Error while setting AMQP URI: "+uri, e);
}
}
String username = properties.get(prefix + USERNAME);
if (username != null) {
cf.setUsername(username);
}
String password = properties.get(prefix + PASSWORD);
if (password != null) {
cf.setPassword(password);
}
String vhost = properties.get(prefix + VIRTUAL_HOST);
if (vhost != null) {
cf.setVirtualHost(vhost);
}
String host = properties.get(prefix + HOST);
if (host != null) {
cf.setHost(host);
}
String port = properties.get(prefix + PORT);
if (port != null) {
cf.setPort(Integer.valueOf(port));
}
String requestedChannelMax = properties.get(prefix + CONNECTION_CHANNEL_MAX);
if (requestedChannelMax != null) {
cf.setRequestedChannelMax(Integer.valueOf(requestedChannelMax));
}
String requestedFrameMax = properties.get(prefix + CONNECTION_FRAME_MAX);
if (requestedFrameMax != null) {
cf.setRequestedFrameMax(Integer.valueOf(requestedFrameMax));
}
String requestedHeartbeat = properties.get(prefix + CONNECTION_HEARTBEAT);
if (requestedHeartbeat != null) {
cf.setRequestedHeartbeat(Integer.valueOf(requestedHeartbeat));
}
String connectionTimeout = properties.get(prefix + CONNECTION_TIMEOUT);
if (connectionTimeout != null) {
cf.setConnectionTimeout(Integer.valueOf(connectionTimeout));
}
String handshakeTimeout = properties.get(prefix + HANDSHAKE_TIMEOUT);
if (handshakeTimeout != null) {
cf.setHandshakeTimeout(Integer.valueOf(handshakeTimeout));
}
String shutdownTimeout = properties.get(prefix + SHUTDOWN_TIMEOUT);
if (shutdownTimeout != null) {
cf.setShutdownTimeout(Integer.valueOf(shutdownTimeout));
}

Map<String, Object> clientProperties = new HashMap<String, Object>();
String useDefaultClientProperties = properties.get(prefix + USE_DEFAULT_CLIENT_PROPERTIES);
if (useDefaultClientProperties != null && Boolean.valueOf(useDefaultClientProperties)) {
clientProperties.putAll(AMQConnection.defaultClientProperties());
}

for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().startsWith(prefix + CLIENT_PROPERTIES_PREFIX)) {
clientProperties.put(
entry.getKey().substring((prefix + CLIENT_PROPERTIES_PREFIX).length()),
entry.getValue()
);
}
}
cf.setClientProperties(clientProperties);

String automaticRecovery = properties.get(prefix + AUTOMATIC_RECOVERY_ENABLED);
if (automaticRecovery != null) {
cf.setAutomaticRecoveryEnabled(Boolean.valueOf(automaticRecovery));
}
String topologyRecovery = properties.get(prefix + TOPOLOGY_RECOVERY_ENABLED);
if (topologyRecovery != null) {
cf.setTopologyRecoveryEnabled(Boolean.getBoolean(topologyRecovery));
}
String networkRecoveryInterval = properties.get(prefix + NETWORK_RECOVERY_INTERVAL);
if (networkRecoveryInterval != null) {
cf.setNetworkRecoveryInterval(Long.valueOf(networkRecoveryInterval));
}
String channelRpcTimeout = properties.get(prefix + CHANNEL_RPC_TIMEOUT);
if (channelRpcTimeout != null) {
cf.setChannelRpcTimeout(Integer.valueOf(channelRpcTimeout));
}
String channelShouldCheckRpcResponseType = properties.get(prefix + CHANNEL_SHOULD_CHECK_RPC_RESPONSE_TYPE);
if (channelShouldCheckRpcResponseType != null) {
cf.setChannelShouldCheckRpcResponseType(Boolean.valueOf(channelShouldCheckRpcResponseType));
}

String useNio = properties.get(prefix + USE_NIO);
if (useNio != null && Boolean.valueOf(useNio)) {
cf.useNio();

NioParams nioParams = new NioParams();

String readByteBufferSize = properties.get(prefix + NIO_READ_BYTE_BUFFER_SIZE);
if (readByteBufferSize != null) {
nioParams.setReadByteBufferSize(Integer.valueOf(readByteBufferSize));
}
String writeByteBufferSize = properties.get(prefix + NIO_WRITE_BYTE_BUFFER_SIZE);
if (writeByteBufferSize != null) {
nioParams.setWriteByteBufferSize(Integer.valueOf(writeByteBufferSize));
}
String nbIoThreads = properties.get(prefix + NIO_NB_IO_THREADS);
if (nbIoThreads != null) {
nioParams.setNbIoThreads(Integer.valueOf(nbIoThreads));
}
String writeEnqueuingTime = properties.get(prefix + NIO_WRITE_ENQUEUING_TIMEOUT_IN_MS);
if (writeEnqueuingTime != null) {
nioParams.setWriteEnqueuingTimeoutInMs(Integer.valueOf(writeEnqueuingTime));
}
String writeQueueCapacity = properties.get(prefix + NIO_WRITE_QUEUE_CAPACITY);
if (writeQueueCapacity != null) {
nioParams.setWriteQueueCapacity(Integer.valueOf(writeQueueCapacity));
}
cf.setNioParams(nioParams);
}
}

public static void load(ConnectionFactory connectionFactory, String propertyFileLocation) throws IOException {
load(connectionFactory, propertyFileLocation, DEFAULT_PREFIX);
}

public static void load(ConnectionFactory connectionFactory, Properties properties) {
load(connectionFactory, (Map) properties, DEFAULT_PREFIX);
}

public static void load(ConnectionFactory connectionFactory, Properties properties, String prefix) {
load(connectionFactory, (Map) properties, prefix);
}

public static void load(ConnectionFactory connectionFactory, Map<String, String> properties) {
load(connectionFactory, properties, DEFAULT_PREFIX);
}
}
3 changes: 2 additions & 1 deletion src/test/java/com/rabbitmq/client/test/ClientTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
ConnectionFactoryTest.class,
RecoveryAwareAMQConnectionFactoryTest.class,
RpcTest.class,
RecoveryDelayHandlerTest.class
RecoveryDelayHandlerTest.class,
PropertyFileInitialisationTest.class
})
public class ClientTests {

Expand Down
Loading