Skip to content

Commit 96b24b3

Browse files
committed
Polish correlation ID supplier support in RPC client
References #637
1 parent ebffa3e commit 96b24b3

File tree

4 files changed

+90
-44
lines changed

4 files changed

+90
-44
lines changed

src/main/java/com/rabbitmq/client/IncrementingCorrelationIdGenerator.java

Lines changed: 0 additions & 22 deletions
This file was deleted.

src/main/java/com/rabbitmq/client/RpcClient.java

Lines changed: 65 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
1+
// Copyright (c) 2007-2020 Pivotal Software, Inc. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
@@ -84,9 +84,16 @@ public class RpcClient {
8484

8585
/** Map from request correlation ID to continuation BlockingCell */
8686
private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<String, BlockingCell<Object>>();
87-
/** Contains the most recently-used request correlation ID */
87+
88+
/**
89+
* Generates correlation ID for each request.
90+
*
91+
* @since 5.9.0
92+
*/
8893
private final Supplier<String> _correlationIdGenerator;
8994

95+
private String lastCorrelationId = "0";
96+
9097
/** Consumer attached to our reply queue */
9198
private DefaultConsumer _consumer;
9299

@@ -110,7 +117,7 @@ public RpcClient(RpcClientParams params) throws
110117
_timeout = params.getTimeout();
111118
_useMandatory = params.shouldUseMandatory();
112119
_replyHandler = params.getReplyHandler();
113-
_correlationIdGenerator = params.getCorrelationIdGenerator();
120+
_correlationIdGenerator = params.getCorrelationIdSupplier();
114121

115122
_consumer = setupConsumer();
116123
if (_useMandatory) {
@@ -210,6 +217,7 @@ public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
210217
String replyId;
211218
synchronized (_continuationMap) {
212219
replyId = _correlationIdGenerator.get();
220+
lastCorrelationId = replyId;
213221
props = ((props==null) ? new AMQP.BasicProperties.Builder() : props.builder())
214222
.correlationId(replyId).replyTo(_replyTo).build();
215223
_continuationMap.put(replyId, k);
@@ -390,16 +398,21 @@ public Map<String, BlockingCell<Object>> getContinuationMap() {
390398
}
391399

392400
/**
393-
* Retrieve the correlation id.
401+
* Retrieve the last correlation id used.
402+
* <p>
403+
* Note as of 5.9.0, correlation IDs may not always be integers
404+
* (by default, they are).
405+
* This method will try to parse the last correlation ID string
406+
* as an integer, so this may result in {@link NumberFormatException}
407+
* if the correlation ID supplier provided by
408+
* {@link RpcClientParams#correlationIdSupplier(Supplier)}
409+
* does not generate appropriate IDs.
410+
*
394411
* @return the most recently used correlation id
395-
* @deprecated Only works for {@link IncrementingCorrelationIdGenerator}
412+
* @see RpcClientParams#correlationIdSupplier(Supplier)
396413
*/
397414
public int getCorrelationId() {
398-
if (_correlationIdGenerator instanceof IncrementingCorrelationIdGenerator) {
399-
return ((IncrementingCorrelationIdGenerator) _correlationIdGenerator).getCorrelationId();
400-
} else {
401-
throw new UnsupportedOperationException();
402-
}
415+
return Integer.valueOf(this.lastCorrelationId);
403416
}
404417

405418
/**
@@ -447,5 +460,47 @@ public byte[] getBody() {
447460
return body;
448461
}
449462
}
463+
464+
/**
465+
* Creates generation IDs as a sequence of integers.
466+
*
467+
* @return
468+
* @see RpcClientParams#correlationIdSupplier(Supplier)
469+
* @since 5.9.0
470+
*/
471+
public static Supplier<String> incrementingCorrelationIdSupplier() {
472+
return incrementingCorrelationIdSupplier("");
473+
}
474+
475+
/**
476+
* Creates generation IDs as a sequence of integers, with the provided prefix.
477+
*
478+
* @param prefix
479+
* @return
480+
* @see RpcClientParams#correlationIdSupplier(Supplier)
481+
* @since 5.9.0
482+
*/
483+
public static Supplier<String> incrementingCorrelationIdSupplier(String prefix) {
484+
return new IncrementingCorrelationIdSupplier(prefix);
485+
}
486+
487+
/**
488+
* @since 5.9.0
489+
*/
490+
private static class IncrementingCorrelationIdSupplier implements Supplier<String> {
491+
492+
private final String prefix;
493+
private int correlationId;
494+
495+
public IncrementingCorrelationIdSupplier(String prefix) {
496+
this.prefix = prefix;
497+
}
498+
499+
@Override
500+
public String get() {
501+
return prefix + ++correlationId;
502+
}
503+
504+
}
450505
}
451506

src/main/java/com/rabbitmq/client/RpcClientParams.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
1+
// Copyright (c) 2007-2020 Pivotal Software, Inc. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
@@ -55,7 +55,10 @@ public class RpcClientParams {
5555
*/
5656
private Function<Object, RpcClient.Response> replyHandler = RpcClient.DEFAULT_REPLY_HANDLER;
5757

58-
private Supplier<String> correlationIdGenerator = new IncrementingCorrelationIdGenerator("");
58+
/**
59+
* Logic to generate correlation IDs.
60+
*/
61+
private Supplier<String> correlationIdSupplier = RpcClient.incrementingCorrelationIdSupplier();
5962

6063
/**
6164
* Set the channel to use for communication.
@@ -149,7 +152,7 @@ public RpcClientParams timeout(int timeout) {
149152
*
150153
* @param useMandatory
151154
* @return
152-
* @see #replyHandler(RpcClient.RpcClientReplyHandler)
155+
* @see #replyHandler(Function)
153156
*/
154157
public RpcClientParams useMandatory(boolean useMandatory) {
155158
this.useMandatory = useMandatory;
@@ -173,13 +176,20 @@ public boolean shouldUseMandatory() {
173176
return useMandatory;
174177
}
175178

176-
public RpcClientParams correlationIdGenerator(Supplier<String> correlationIdGenerator) {
177-
this.correlationIdGenerator = correlationIdGenerator;
179+
/**
180+
* Logic to generate correlation IDs.
181+
*
182+
* @param correlationIdGenerator
183+
* @return
184+
* @since 5.9.0
185+
*/
186+
public RpcClientParams correlationIdSupplier(Supplier<String> correlationIdGenerator) {
187+
this.correlationIdSupplier = correlationIdGenerator;
178188
return this;
179189
}
180190

181-
public Supplier<String> getCorrelationIdGenerator() {
182-
return correlationIdGenerator;
191+
public Supplier<String> getCorrelationIdSupplier() {
192+
return correlationIdSupplier;
183193
}
184194

185195
public Function<Object, RpcClient.Response> getReplyHandler() {

src/test/java/com/rabbitmq/client/test/RpcTest.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2017-2019 Pivotal Software, Inc. All rights reserved.
1+
// Copyright (c) 2017-2020 Pivotal Software, Inc. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
@@ -24,7 +24,7 @@
2424
import com.rabbitmq.client.impl.recovery.RecordedQueue;
2525
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
2626
import com.rabbitmq.tools.Host;
27-
import org.hamcrest.CoreMatchers;
27+
import org.assertj.core.api.Assertions;
2828
import org.junit.After;
2929
import org.junit.Before;
3030
import org.junit.Test;
@@ -86,6 +86,9 @@ public void rpc() throws Exception {
8686
assertEquals("*** hello ***", new String(response.getBody()));
8787
assertEquals("pre-hello", response.getProperties().getHeaders().get("pre").toString());
8888
assertEquals("post-hello", response.getProperties().getHeaders().get("post").toString());
89+
90+
Assertions.assertThat(client.getCorrelationId()).isEqualTo(Integer.valueOf(response.getProperties().getCorrelationId()));
91+
8992
client.close();
9093
}
9194

@@ -138,7 +141,7 @@ public void rpcUnroutableWithMandatoryFlagShouldThrowUnroutableException() throw
138141
}
139142

140143
@Test
141-
public void rpcCustomCorrelatorId() throws Exception {
144+
public void rpcCustomCorrelationId() throws Exception {
142145
rpcServer = new TestRpcServer(serverChannel, queue);
143146
new Thread(() -> {
144147
try {
@@ -149,10 +152,10 @@ public void rpcCustomCorrelatorId() throws Exception {
149152
}).start();
150153
RpcClient client = new RpcClient(new RpcClientParams()
151154
.channel(clientChannel).exchange("").routingKey(queue).timeout(1000)
152-
.correlationIdGenerator(new IncrementingCorrelationIdGenerator("myPrefix-"))
155+
.correlationIdSupplier(RpcClient.incrementingCorrelationIdSupplier("myPrefix-"))
153156
);
154157
RpcClient.Response response = client.doCall(null, "hello".getBytes());
155-
assertThat(response.getProperties().getCorrelationId(), CoreMatchers.equalTo("myPrefix-0"));
158+
Assertions.assertThat(response.getProperties().getCorrelationId()).isEqualTo("myPrefix-1");
156159
client.close();
157160
}
158161

0 commit comments

Comments
 (0)