Skip to content

Commit d9108ec

Browse files
committed
Handle URI query parameters in chain of responsibility
With fallback hook, by default empty. References #672 (cherry picked from commit b3edb1e)
1 parent 7401a4b commit d9108ec

File tree

2 files changed

+68
-33
lines changed

2 files changed

+68
-33
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

+51-32
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
2626

27+
import java.util.Map.Entry;
28+
import java.util.function.BiConsumer;
2729
import javax.net.SocketFactory;
2830
import javax.net.ssl.SSLContext;
2931
import javax.net.ssl.SSLSocketFactory;
@@ -386,6 +388,36 @@ private static String uriDecode(String s) {
386388
}
387389
}
388390

391+
private static final Map<String, BiConsumer<String, ConnectionFactory>> URI_QUERY_PARAMETER_HANDLERS =
392+
new HashMap<String, BiConsumer<String, ConnectionFactory>>() {
393+
{
394+
put("heartbeat", (value, cf) -> {
395+
try {
396+
int heartbeatInt = Integer.parseInt(value);
397+
cf.setRequestedHeartbeat(heartbeatInt);
398+
} catch (NumberFormatException e) {
399+
throw new IllegalArgumentException("Requested heartbeat must an integer");
400+
}
401+
});
402+
put("connection_timeout", (value, cf) -> {
403+
try {
404+
int connectionTimeoutInt = Integer.parseInt(value);
405+
cf.setConnectionTimeout(connectionTimeoutInt);
406+
} catch (NumberFormatException e) {
407+
throw new IllegalArgumentException("TCP connection timeout must an integer");
408+
}
409+
});
410+
put("channel_max", (value, cf) -> {
411+
try {
412+
int channelMaxInt = Integer.parseInt(value);
413+
cf.setRequestedChannelMax(channelMaxInt);
414+
} catch (NumberFormatException e) {
415+
throw new IllegalArgumentException("Requested channel max must an integer");
416+
}
417+
});
418+
}
419+
};
420+
389421
/**
390422
* Convenience method for setting some fields from query parameters
391423
* Will handle only a subset of the query parameters supported by the
@@ -395,7 +427,6 @@ private static String uriDecode(String s) {
395427
*/
396428
private void setQuery(String rawQuery) {
397429
Map<String, String> parameters = new HashMap<>();
398-
399430
// parsing the query parameters
400431
try {
401432
for (String param : rawQuery.split("&")) {
@@ -408,43 +439,31 @@ private void setQuery(String rawQuery) {
408439
parameters.put(key, value);
409440
}
410441
} catch (IOException e) {
411-
throw new RuntimeException("Cannot parse the query parameters", e);
442+
throw new IllegalArgumentException("Cannot parse the query parameters", e);
412443
}
413444

414-
// heartbeat
415-
String heartbeat = parameters.get("heartbeat");
416-
if (heartbeat != null) {
417-
try {
418-
int heartbeatInt = Integer.parseInt(heartbeat);
419-
setRequestedHeartbeat(heartbeatInt);
420-
} catch (NumberFormatException e) {
421-
throw new IllegalArgumentException("Requested heartbeat must an integer");
422-
}
423-
}
424-
425-
// connection_timeout
426-
String connectionTimeout = parameters.get("connection_timeout");
427-
if (connectionTimeout != null) {
428-
try {
429-
int connectionTimeoutInt = Integer.parseInt(connectionTimeout);
430-
setConnectionTimeout(connectionTimeoutInt);
431-
} catch (NumberFormatException e) {
432-
throw new IllegalArgumentException("TCP connection timeout must an integer");
433-
}
434-
}
435-
436-
// channel_max
437-
String channelMax = parameters.get("channel_max");
438-
if (channelMax != null) {
439-
try {
440-
int channelMaxInt = Integer.parseInt(channelMax);
441-
setRequestedChannelMax(channelMaxInt);
442-
} catch (NumberFormatException e) {
443-
throw new IllegalArgumentException("Requested channel max must an integer");
445+
for (Entry<String, String> entry : parameters.entrySet()) {
446+
BiConsumer<String, ConnectionFactory> handler = URI_QUERY_PARAMETER_HANDLERS
447+
.get(entry.getKey());
448+
if (handler != null) {
449+
handler.accept(entry.getValue(), this);
450+
} else {
451+
processUriQueryParameter(entry.getKey(), entry.getValue());
444452
}
445453
}
446454
}
447455

456+
/**
457+
* Hook to process query parameters not handled natively.
458+
* Handled natively: <code>heartbeat</code>, <code>connection_timeout</code>,
459+
* <code>channel_max</code>.
460+
* @param key
461+
* @param value
462+
*/
463+
protected void processUriQueryParameter(String key, String value) {
464+
465+
}
466+
448467
/**
449468
* Retrieve the requested maximum channel number
450469
* @return the initially requested maximum channel number; zero for unlimited

src/test/java/com/rabbitmq/client/test/AmqpUriTest.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,21 @@
1414
1515
package com.rabbitmq.client.test;
1616

17+
import static org.assertj.core.api.Assertions.assertThat;
1718
import static org.junit.Assert.assertEquals;
1819
import static org.junit.Assert.fail;
1920

2021
import java.net.URISyntaxException;
2122
import java.security.KeyManagementException;
2223
import java.security.NoSuchAlgorithmException;
2324

25+
import java.util.HashMap;
26+
import java.util.Map;
2427
import org.junit.Test;
2528

2629
import com.rabbitmq.client.ConnectionFactory;
2730

28-
public class AmqpUriTest extends BrokerTestCase
31+
public class AmqpUriTest
2932
{
3033
@Test public void uriParsing()
3134
throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException
@@ -109,6 +112,19 @@ public class AmqpUriTest extends BrokerTestCase
109112
parseFail("amqp://user:pass@host:10000/vhost?heartbeat=342?connection_timeout=442");
110113
}
111114

115+
@Test
116+
public void processUriQueryParameterShouldBeCalledForNotHandledParameter() throws Exception {
117+
Map<String, String> processedParameters = new HashMap<>();
118+
ConnectionFactory cf = new ConnectionFactory() {
119+
@Override
120+
protected void processUriQueryParameter(String key, String value) {
121+
processedParameters.put(key, value);
122+
}
123+
};
124+
cf.setUri("amqp://user:pass@host:10000/vhost?heartbeat=60&key=value");
125+
assertThat(processedParameters).hasSize(1).containsEntry("key", "value");
126+
}
127+
112128
private void parseSuccess(String uri, String user, String password,
113129
String host, int port, String vhost, boolean secured)
114130
throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException

0 commit comments

Comments
 (0)