Skip to content

Commit 177c4ea

Browse files
committed
Polish correlation ID supplier support in RPC client
References #637 (cherry picked from commit 96b24b3)
1 parent 5ee2a77 commit 177c4ea

File tree

4 files changed

+90
-44
lines changed

4 files changed

+90
-44
lines changed

src/main/java/com/rabbitmq/client/IncrementingCorrelationIdGenerator.java

Lines changed: 0 additions & 22 deletions
This file was deleted.

src/main/java/com/rabbitmq/client/RpcClient.java

Lines changed: 65 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
1+
// Copyright (c) 2007-2020 Pivotal Software, Inc. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
@@ -84,9 +84,16 @@ public class RpcClient {
8484

8585
/** Map from request correlation ID to continuation BlockingCell */
8686
private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<String, BlockingCell<Object>>();
87-
/** Contains the most recently-used request correlation ID */
87+
88+
/**
89+
* Generates correlation ID for each request.
90+
*
91+
* @since 5.9.0
92+
*/
8893
private final Supplier<String> _correlationIdGenerator;
8994

95+
private String lastCorrelationId = "0";
96+
9097
/** Consumer attached to our reply queue */
9198
private DefaultConsumer _consumer;
9299

@@ -110,7 +117,7 @@ public RpcClient(RpcClientParams params) throws
110117
_timeout = params.getTimeout();
111118
_useMandatory = params.shouldUseMandatory();
112119
_replyHandler = params.getReplyHandler();
113-
_correlationIdGenerator = params.getCorrelationIdGenerator();
120+
_correlationIdGenerator = params.getCorrelationIdSupplier();
114121

115122
_consumer = setupConsumer();
116123
if (_useMandatory) {
@@ -295,6 +302,7 @@ public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
295302
String replyId;
296303
synchronized (_continuationMap) {
297304
replyId = _correlationIdGenerator.get();
305+
lastCorrelationId = replyId;
298306
props = ((props==null) ? new AMQP.BasicProperties.Builder() : props.builder())
299307
.correlationId(replyId).replyTo(_replyTo).build();
300308
_continuationMap.put(replyId, k);
@@ -475,16 +483,21 @@ public Map<String, BlockingCell<Object>> getContinuationMap() {
475483
}
476484

477485
/**
478-
* Retrieve the correlation id.
486+
* Retrieve the last correlation id used.
487+
* <p>
488+
* Note as of 5.9.0, correlation IDs may not always be integers
489+
* (by default, they are).
490+
* This method will try to parse the last correlation ID string
491+
* as an integer, so this may result in {@link NumberFormatException}
492+
* if the correlation ID supplier provided by
493+
* {@link RpcClientParams#correlationIdSupplier(Supplier)}
494+
* does not generate appropriate IDs.
495+
*
479496
* @return the most recently used correlation id
480-
* @deprecated Only works for {@link IncrementingCorrelationIdGenerator}
497+
* @see RpcClientParams#correlationIdSupplier(Supplier)
481498
*/
482499
public int getCorrelationId() {
483-
if (_correlationIdGenerator instanceof IncrementingCorrelationIdGenerator) {
484-
return ((IncrementingCorrelationIdGenerator) _correlationIdGenerator).getCorrelationId();
485-
} else {
486-
throw new UnsupportedOperationException();
487-
}
500+
return Integer.valueOf(this.lastCorrelationId);
488501
}
489502

490503
/**
@@ -532,5 +545,47 @@ public byte[] getBody() {
532545
return body;
533546
}
534547
}
548+
549+
/**
550+
* Creates generation IDs as a sequence of integers.
551+
*
552+
* @return
553+
* @see RpcClientParams#correlationIdSupplier(Supplier)
554+
* @since 5.9.0
555+
*/
556+
public static Supplier<String> incrementingCorrelationIdSupplier() {
557+
return incrementingCorrelationIdSupplier("");
558+
}
559+
560+
/**
561+
* Creates generation IDs as a sequence of integers, with the provided prefix.
562+
*
563+
* @param prefix
564+
* @return
565+
* @see RpcClientParams#correlationIdSupplier(Supplier)
566+
* @since 5.9.0
567+
*/
568+
public static Supplier<String> incrementingCorrelationIdSupplier(String prefix) {
569+
return new IncrementingCorrelationIdSupplier(prefix);
570+
}
571+
572+
/**
573+
* @since 5.9.0
574+
*/
575+
private static class IncrementingCorrelationIdSupplier implements Supplier<String> {
576+
577+
private final String prefix;
578+
private int correlationId;
579+
580+
public IncrementingCorrelationIdSupplier(String prefix) {
581+
this.prefix = prefix;
582+
}
583+
584+
@Override
585+
public String get() {
586+
return prefix + ++correlationId;
587+
}
588+
589+
}
535590
}
536591

src/main/java/com/rabbitmq/client/RpcClientParams.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
1+
// Copyright (c) 2007-2020 Pivotal Software, Inc. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
@@ -55,7 +55,10 @@ public class RpcClientParams {
5555
*/
5656
private Function<Object, RpcClient.Response> replyHandler = RpcClient.DEFAULT_REPLY_HANDLER;
5757

58-
private Supplier<String> correlationIdGenerator = new IncrementingCorrelationIdGenerator("");
58+
/**
59+
* Logic to generate correlation IDs.
60+
*/
61+
private Supplier<String> correlationIdSupplier = RpcClient.incrementingCorrelationIdSupplier();
5962

6063
/**
6164
* Set the channel to use for communication.
@@ -149,7 +152,7 @@ public RpcClientParams timeout(int timeout) {
149152
*
150153
* @param useMandatory
151154
* @return
152-
* @see #replyHandler(RpcClient.RpcClientReplyHandler)
155+
* @see #replyHandler(Function)
153156
*/
154157
public RpcClientParams useMandatory(boolean useMandatory) {
155158
this.useMandatory = useMandatory;
@@ -173,13 +176,20 @@ public boolean shouldUseMandatory() {
173176
return useMandatory;
174177
}
175178

176-
public RpcClientParams correlationIdGenerator(Supplier<String> correlationIdGenerator) {
177-
this.correlationIdGenerator = correlationIdGenerator;
179+
/**
180+
* Logic to generate correlation IDs.
181+
*
182+
* @param correlationIdGenerator
183+
* @return
184+
* @since 5.9.0
185+
*/
186+
public RpcClientParams correlationIdSupplier(Supplier<String> correlationIdGenerator) {
187+
this.correlationIdSupplier = correlationIdGenerator;
178188
return this;
179189
}
180190

181-
public Supplier<String> getCorrelationIdGenerator() {
182-
return correlationIdGenerator;
191+
public Supplier<String> getCorrelationIdSupplier() {
192+
return correlationIdSupplier;
183193
}
184194

185195
public Function<Object, RpcClient.Response> getReplyHandler() {

src/test/java/com/rabbitmq/client/test/RpcTest.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2017-2019 Pivotal Software, Inc. All rights reserved.
1+
// Copyright (c) 2017-2020 Pivotal Software, Inc. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
@@ -24,7 +24,7 @@
2424
import com.rabbitmq.client.impl.recovery.RecordedQueue;
2525
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
2626
import com.rabbitmq.tools.Host;
27-
import org.hamcrest.CoreMatchers;
27+
import org.assertj.core.api.Assertions;
2828
import org.junit.After;
2929
import org.junit.Before;
3030
import org.junit.Test;
@@ -86,6 +86,9 @@ public void rpc() throws Exception {
8686
assertEquals("*** hello ***", new String(response.getBody()));
8787
assertEquals("pre-hello", response.getProperties().getHeaders().get("pre").toString());
8888
assertEquals("post-hello", response.getProperties().getHeaders().get("post").toString());
89+
90+
Assertions.assertThat(client.getCorrelationId()).isEqualTo(Integer.valueOf(response.getProperties().getCorrelationId()));
91+
8992
client.close();
9093
}
9194

@@ -138,7 +141,7 @@ public void rpcUnroutableWithMandatoryFlagShouldThrowUnroutableException() throw
138141
}
139142

140143
@Test
141-
public void rpcCustomCorrelatorId() throws Exception {
144+
public void rpcCustomCorrelationId() throws Exception {
142145
rpcServer = new TestRpcServer(serverChannel, queue);
143146
new Thread(() -> {
144147
try {
@@ -149,10 +152,10 @@ public void rpcCustomCorrelatorId() throws Exception {
149152
}).start();
150153
RpcClient client = new RpcClient(new RpcClientParams()
151154
.channel(clientChannel).exchange("").routingKey(queue).timeout(1000)
152-
.correlationIdGenerator(new IncrementingCorrelationIdGenerator("myPrefix-"))
155+
.correlationIdSupplier(RpcClient.incrementingCorrelationIdSupplier("myPrefix-"))
153156
);
154157
RpcClient.Response response = client.doCall(null, "hello".getBytes());
155-
assertThat(response.getProperties().getCorrelationId(), CoreMatchers.equalTo("myPrefix-0"));
158+
Assertions.assertThat(response.getProperties().getCorrelationId()).isEqualTo("myPrefix-1");
156159
client.close();
157160
}
158161

0 commit comments

Comments
 (0)