-
Notifications
You must be signed in to change notification settings - Fork 583
Add property file-based initialization #329
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 5 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
e9b642f
Add property file-based initialization
acogoluegnes 6a9c1ab
Rename ConnectionFactoryConfigurer to ConnectionFactoryConfigurator
acogoluegnes a5b271d
Prefer connection prefix over requested
acogoluegnes cb03dee
Return connection factory in load methods
acogoluegnes 580677c
Polish Javadoc of ConnectionFactory#load
acogoluegnes c5c0f64
Use connection prefix for auto-recovery keys
acogoluegnes File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
252 changes: 252 additions & 0 deletions
252
src/main/java/com/rabbitmq/client/ConnectionFactoryConfigurator.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,252 @@ | ||
// Copyright (c) 2017-Present Pivotal Software, Inc. All rights reserved. | ||
// | ||
// This software, the RabbitMQ Java client library, is triple-licensed under the | ||
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 | ||
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see | ||
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, | ||
// please see LICENSE-APACHE2. | ||
// | ||
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, | ||
// either express or implied. See the LICENSE file for specific language governing | ||
// rights and limitations of this software. | ||
// | ||
// If you have any questions regarding licensing, please contact us at | ||
// [email protected]. | ||
|
||
package com.rabbitmq.client; | ||
|
||
import com.rabbitmq.client.impl.AMQConnection; | ||
import com.rabbitmq.client.impl.nio.NioParams; | ||
|
||
import java.io.BufferedReader; | ||
import java.io.FileReader; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.Reader; | ||
import java.net.URISyntaxException; | ||
import java.security.KeyManagementException; | ||
import java.security.NoSuchAlgorithmException; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Properties; | ||
|
||
/** | ||
* Helper class to load {@link ConnectionFactory} settings from a property file. | ||
* | ||
* The authorised keys are the constants values in this class (e.g. USERNAME). | ||
* The property file/properties instance/map instance keys can have | ||
* a prefix, the default being <code>rabbitmq.</code>. | ||
* | ||
* Property files can be loaded from the file system (the default), | ||
* but also from the classpath, by using the <code>classpath:</code> prefix | ||
* in the location. | ||
* | ||
* If default client properties should be set, set the <code>use.default.client.properties</code> | ||
* key to <code>true</code>. Custom client properties can be set by using | ||
* the <code>client.properties.</code>, e.g. <code>client.properties.app.name</code>. | ||
* | ||
* @since 4.4.0 | ||
* @see ConnectionFactory#load(String, String) | ||
*/ | ||
public class ConnectionFactoryConfigurator { | ||
|
||
public static final String DEFAULT_PREFIX = "rabbitmq."; | ||
|
||
public static final String USERNAME = "username"; | ||
public static final String PASSWORD = "password"; | ||
public static final String VIRTUAL_HOST = "virtual.host"; | ||
public static final String HOST = "host"; | ||
public static final String PORT = "port"; | ||
public static final String CONNECTION_CHANNEL_MAX = "connection.channel.max"; | ||
public static final String CONNECTION_FRAME_MAX = "connection.frame.max"; | ||
public static final String CONNECTION_HEARTBEAT = "connection.heartbeat"; | ||
public static final String CONNECTION_TIMEOUT = "connection.timeout"; | ||
public static final String HANDSHAKE_TIMEOUT = "handshake.timeout"; | ||
public static final String SHUTDOWN_TIMEOUT = "shutdown.timeout"; | ||
public static final String USE_DEFAULT_CLIENT_PROPERTIES = "use.default.client.properties"; | ||
public static final String CLIENT_PROPERTIES_PREFIX = "client.properties."; | ||
public static final String AUTOMATIC_RECOVERY_ENABLED = "automatic.recovery.enabled"; | ||
public static final String TOPOLOGY_RECOVERY_ENABLED = "topology.recovery.enabled"; | ||
public static final String NETWORK_RECOVERY_INTERVAL = "network.recovery.interval"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
public static final String CHANNEL_RPC_TIMEOUT = "channel.rpc.timeout"; | ||
public static final String CHANNEL_SHOULD_CHECK_RPC_RESPONSE_TYPE = "channel.should.check.rpc.response.type"; | ||
public static final String USE_NIO = "use.nio"; | ||
public static final String NIO_READ_BYTE_BUFFER_SIZE = "nio.read.byte.buffer.size"; | ||
public static final String NIO_WRITE_BYTE_BUFFER_SIZE = "nio.write.byte.buffer.size"; | ||
public static final String NIO_NB_IO_THREADS = "nio.nb.io.threads"; | ||
public static final String NIO_WRITE_ENQUEUING_TIMEOUT_IN_MS = "nio.write.enqueuing.timeout.in.ms"; | ||
public static final String NIO_WRITE_QUEUE_CAPACITY = "nio.write.queue.capacity"; | ||
|
||
public static void load(ConnectionFactory cf, String propertyFileLocation, String prefix) throws IOException { | ||
if (propertyFileLocation == null || propertyFileLocation.isEmpty()) { | ||
throw new IllegalArgumentException("Property file argument cannot be null or empty"); | ||
} | ||
Properties properties = new Properties(); | ||
if (propertyFileLocation.startsWith("classpath:")) { | ||
InputStream in = null; | ||
try { | ||
in = ConnectionFactoryConfigurator.class.getResourceAsStream( | ||
propertyFileLocation.substring("classpath:".length()) | ||
); | ||
properties.load(in); | ||
} finally { | ||
if (in != null) { | ||
in.close(); | ||
} | ||
} | ||
} else { | ||
Reader reader = null; | ||
try { | ||
reader = new BufferedReader(new FileReader(propertyFileLocation)); | ||
properties.load(reader); | ||
} finally { | ||
if (reader != null) { | ||
reader.close(); | ||
} | ||
} | ||
} | ||
load(cf, (Map) properties, prefix); | ||
} | ||
|
||
public static void load(ConnectionFactory cf, Map<String, String> properties, String prefix) { | ||
prefix = prefix == null ? "" : prefix; | ||
String uri = properties.get(prefix + "uri"); | ||
if (uri != null) { | ||
try { | ||
cf.setUri(uri); | ||
} catch (URISyntaxException e) { | ||
throw new IllegalArgumentException("Error while setting AMQP URI: "+uri, e); | ||
} catch (NoSuchAlgorithmException e) { | ||
throw new IllegalArgumentException("Error while setting AMQP URI: "+uri, e); | ||
} catch (KeyManagementException e) { | ||
throw new IllegalArgumentException("Error while setting AMQP URI: "+uri, e); | ||
} | ||
} | ||
String username = properties.get(prefix + USERNAME); | ||
if (username != null) { | ||
cf.setUsername(username); | ||
} | ||
String password = properties.get(prefix + PASSWORD); | ||
if (password != null) { | ||
cf.setPassword(password); | ||
} | ||
String vhost = properties.get(prefix + VIRTUAL_HOST); | ||
if (vhost != null) { | ||
cf.setVirtualHost(vhost); | ||
} | ||
String host = properties.get(prefix + HOST); | ||
if (host != null) { | ||
cf.setHost(host); | ||
} | ||
String port = properties.get(prefix + PORT); | ||
if (port != null) { | ||
cf.setPort(Integer.valueOf(port)); | ||
} | ||
String requestedChannelMax = properties.get(prefix + CONNECTION_CHANNEL_MAX); | ||
if (requestedChannelMax != null) { | ||
cf.setRequestedChannelMax(Integer.valueOf(requestedChannelMax)); | ||
} | ||
String requestedFrameMax = properties.get(prefix + CONNECTION_FRAME_MAX); | ||
if (requestedFrameMax != null) { | ||
cf.setRequestedFrameMax(Integer.valueOf(requestedFrameMax)); | ||
} | ||
String requestedHeartbeat = properties.get(prefix + CONNECTION_HEARTBEAT); | ||
if (requestedHeartbeat != null) { | ||
cf.setRequestedHeartbeat(Integer.valueOf(requestedHeartbeat)); | ||
} | ||
String connectionTimeout = properties.get(prefix + CONNECTION_TIMEOUT); | ||
if (connectionTimeout != null) { | ||
cf.setConnectionTimeout(Integer.valueOf(connectionTimeout)); | ||
} | ||
String handshakeTimeout = properties.get(prefix + HANDSHAKE_TIMEOUT); | ||
if (handshakeTimeout != null) { | ||
cf.setHandshakeTimeout(Integer.valueOf(handshakeTimeout)); | ||
} | ||
String shutdownTimeout = properties.get(prefix + SHUTDOWN_TIMEOUT); | ||
if (shutdownTimeout != null) { | ||
cf.setShutdownTimeout(Integer.valueOf(shutdownTimeout)); | ||
} | ||
|
||
Map<String, Object> clientProperties = new HashMap<String, Object>(); | ||
String useDefaultClientProperties = properties.get(prefix + USE_DEFAULT_CLIENT_PROPERTIES); | ||
if (useDefaultClientProperties != null && Boolean.valueOf(useDefaultClientProperties)) { | ||
clientProperties.putAll(AMQConnection.defaultClientProperties()); | ||
} | ||
|
||
for (Map.Entry<String, String> entry : properties.entrySet()) { | ||
if (entry.getKey().startsWith(prefix + CLIENT_PROPERTIES_PREFIX)) { | ||
clientProperties.put( | ||
entry.getKey().substring((prefix + CLIENT_PROPERTIES_PREFIX).length()), | ||
entry.getValue() | ||
); | ||
} | ||
} | ||
cf.setClientProperties(clientProperties); | ||
|
||
String automaticRecovery = properties.get(prefix + AUTOMATIC_RECOVERY_ENABLED); | ||
if (automaticRecovery != null) { | ||
cf.setAutomaticRecoveryEnabled(Boolean.valueOf(automaticRecovery)); | ||
} | ||
String topologyRecovery = properties.get(prefix + TOPOLOGY_RECOVERY_ENABLED); | ||
if (topologyRecovery != null) { | ||
cf.setTopologyRecoveryEnabled(Boolean.getBoolean(topologyRecovery)); | ||
} | ||
String networkRecoveryInterval = properties.get(prefix + NETWORK_RECOVERY_INTERVAL); | ||
if (networkRecoveryInterval != null) { | ||
cf.setNetworkRecoveryInterval(Long.valueOf(networkRecoveryInterval)); | ||
} | ||
String channelRpcTimeout = properties.get(prefix + CHANNEL_RPC_TIMEOUT); | ||
if (channelRpcTimeout != null) { | ||
cf.setChannelRpcTimeout(Integer.valueOf(channelRpcTimeout)); | ||
} | ||
String channelShouldCheckRpcResponseType = properties.get(prefix + CHANNEL_SHOULD_CHECK_RPC_RESPONSE_TYPE); | ||
if (channelShouldCheckRpcResponseType != null) { | ||
cf.setChannelShouldCheckRpcResponseType(Boolean.valueOf(channelShouldCheckRpcResponseType)); | ||
} | ||
|
||
String useNio = properties.get(prefix + USE_NIO); | ||
if (useNio != null && Boolean.valueOf(useNio)) { | ||
cf.useNio(); | ||
|
||
NioParams nioParams = new NioParams(); | ||
|
||
String readByteBufferSize = properties.get(prefix + NIO_READ_BYTE_BUFFER_SIZE); | ||
if (readByteBufferSize != null) { | ||
nioParams.setReadByteBufferSize(Integer.valueOf(readByteBufferSize)); | ||
} | ||
String writeByteBufferSize = properties.get(prefix + NIO_WRITE_BYTE_BUFFER_SIZE); | ||
if (writeByteBufferSize != null) { | ||
nioParams.setWriteByteBufferSize(Integer.valueOf(writeByteBufferSize)); | ||
} | ||
String nbIoThreads = properties.get(prefix + NIO_NB_IO_THREADS); | ||
if (nbIoThreads != null) { | ||
nioParams.setNbIoThreads(Integer.valueOf(nbIoThreads)); | ||
} | ||
String writeEnqueuingTime = properties.get(prefix + NIO_WRITE_ENQUEUING_TIMEOUT_IN_MS); | ||
if (writeEnqueuingTime != null) { | ||
nioParams.setWriteEnqueuingTimeoutInMs(Integer.valueOf(writeEnqueuingTime)); | ||
} | ||
String writeQueueCapacity = properties.get(prefix + NIO_WRITE_QUEUE_CAPACITY); | ||
if (writeQueueCapacity != null) { | ||
nioParams.setWriteQueueCapacity(Integer.valueOf(writeQueueCapacity)); | ||
} | ||
cf.setNioParams(nioParams); | ||
} | ||
} | ||
|
||
public static void load(ConnectionFactory connectionFactory, String propertyFileLocation) throws IOException { | ||
load(connectionFactory, propertyFileLocation, DEFAULT_PREFIX); | ||
} | ||
|
||
public static void load(ConnectionFactory connectionFactory, Properties properties) { | ||
load(connectionFactory, (Map) properties, DEFAULT_PREFIX); | ||
} | ||
|
||
public static void load(ConnectionFactory connectionFactory, Properties properties, String prefix) { | ||
load(connectionFactory, (Map) properties, prefix); | ||
} | ||
|
||
public static void load(ConnectionFactory connectionFactory, Map<String, String> properties) { | ||
load(connectionFactory, properties, DEFAULT_PREFIX); | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can probably be
connection.recovery.enabled
. We already havetopology.recovery.enabled
.