Skip to content

Commit 80052a7

Browse files
committed
Polish correlation ID supplier support in RPC client
References #637 (cherry picked from commit 96b24b3) Conflicts: src/main/java/com/rabbitmq/client/RpcClient.java src/main/java/com/rabbitmq/client/RpcClientParams.java src/test/java/com/rabbitmq/client/test/RpcTest.java
1 parent a264044 commit 80052a7

File tree

4 files changed

+87
-52
lines changed

4 files changed

+87
-52
lines changed

src/main/java/com/rabbitmq/client/IncrementingCorrelationIdGenerator.java

Lines changed: 0 additions & 22 deletions
This file was deleted.

src/main/java/com/rabbitmq/client/RpcClient.java

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
1+
// Copyright (c) 2007-2020 Pivotal Software, Inc. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
@@ -81,17 +81,20 @@ public Response handle(Object reply) {
8181
}
8282
};
8383

84-
<<<<<<< HEAD
8584
private final RpcClientReplyHandler _replyHandler;
86-
=======
87-
private final Function<Object, Response> _replyHandler;
88-
>>>>>>> 102cbbde1... 637: Default generator should not be static
8985

9086
/** Map from request correlation ID to continuation BlockingCell */
9187
private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<String, BlockingCell<Object>>();
92-
/** Contains the most recently-used request correlation ID */
88+
89+
/**
90+
* Generates correlation ID for each request.
91+
*
92+
* @since 5.9.0
93+
*/
9394
private final Supplier<String> _correlationIdGenerator;
9495

96+
private String lastCorrelationId = "0";
97+
9598
/** Consumer attached to our reply queue */
9699
private DefaultConsumer _consumer;
97100

@@ -115,7 +118,7 @@ public RpcClient(RpcClientParams params) throws
115118
_timeout = params.getTimeout();
116119
_useMandatory = params.shouldUseMandatory();
117120
_replyHandler = params.getReplyHandler();
118-
_correlationIdGenerator = params.getCorrelationIdGenerator();
121+
_correlationIdGenerator = params.getCorrelationIdSupplier();
119122

120123
_consumer = setupConsumer();
121124
if (_useMandatory) {
@@ -302,6 +305,7 @@ public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
302305
String replyId;
303306
synchronized (_continuationMap) {
304307
replyId = _correlationIdGenerator.get();
308+
lastCorrelationId = replyId;
305309
props = ((props==null) ? new AMQP.BasicProperties.Builder() : props.builder())
306310
.correlationId(replyId).replyTo(_replyTo).build();
307311
_continuationMap.put(replyId, k);
@@ -482,16 +486,21 @@ public Map<String, BlockingCell<Object>> getContinuationMap() {
482486
}
483487

484488
/**
485-
* Retrieve the correlation id.
489+
* Retrieve the last correlation id used.
490+
* <p>
491+
* Note as of 5.9.0, correlation IDs may not always be integers
492+
* (by default, they are).
493+
* This method will try to parse the last correlation ID string
494+
* as an integer, so this may result in {@link NumberFormatException}
495+
* if the correlation ID supplier provided by
496+
* {@link RpcClientParams#correlationIdSupplier(Supplier)}
497+
* does not generate appropriate IDs.
498+
*
486499
* @return the most recently used correlation id
487-
* @deprecated Only works for {@link IncrementingCorrelationIdGenerator}
500+
* @see RpcClientParams#correlationIdSupplier(Supplier)
488501
*/
489502
public int getCorrelationId() {
490-
if (_correlationIdGenerator instanceof IncrementingCorrelationIdGenerator) {
491-
return ((IncrementingCorrelationIdGenerator) _correlationIdGenerator).getCorrelationId();
492-
} else {
493-
throw new UnsupportedOperationException();
494-
}
503+
return Integer.valueOf(this.lastCorrelationId);
495504
}
496505

497506
/**
@@ -559,5 +568,47 @@ public interface RpcClientReplyHandler {
559568
Response handle(Object reply);
560569

561570
}
571+
572+
/**
573+
* Creates generation IDs as a sequence of integers.
574+
*
575+
* @return
576+
* @see RpcClientParams#correlationIdSupplier(Supplier)
577+
* @since 5.9.0
578+
*/
579+
public static Supplier<String> incrementingCorrelationIdSupplier() {
580+
return incrementingCorrelationIdSupplier("");
581+
}
582+
583+
/**
584+
* Creates generation IDs as a sequence of integers, with the provided prefix.
585+
*
586+
* @param prefix
587+
* @return
588+
* @see RpcClientParams#correlationIdSupplier(Supplier)
589+
* @since 5.9.0
590+
*/
591+
public static Supplier<String> incrementingCorrelationIdSupplier(String prefix) {
592+
return new IncrementingCorrelationIdSupplier(prefix);
593+
}
594+
595+
/**
596+
* @since 5.9.0
597+
*/
598+
private static class IncrementingCorrelationIdSupplier implements Supplier<String> {
599+
600+
private final String prefix;
601+
private int correlationId;
602+
603+
public IncrementingCorrelationIdSupplier(String prefix) {
604+
this.prefix = prefix;
605+
}
606+
607+
@Override
608+
public String get() {
609+
return prefix + ++correlationId;
610+
}
611+
612+
}
562613
}
563614

src/main/java/com/rabbitmq/client/RpcClientParams.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
1+
// Copyright (c) 2007-2020 Pivotal Software, Inc. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
@@ -52,7 +52,10 @@ public class RpcClientParams {
5252
*/
5353
private RpcClient.RpcClientReplyHandler replyHandler = RpcClient.DEFAULT_REPLY_HANDLER;
5454

55-
private Supplier<String> correlationIdGenerator = new IncrementingCorrelationIdGenerator("");
55+
/**
56+
* Logic to generate correlation IDs.
57+
*/
58+
private Supplier<String> correlationIdSupplier = RpcClient.incrementingCorrelationIdSupplier();
5659

5760
/**
5861
* Set the channel to use for communication.
@@ -146,7 +149,7 @@ public RpcClientParams timeout(int timeout) {
146149
*
147150
* @param useMandatory
148151
* @return
149-
* @see #replyHandler(RpcClient.RpcClientReplyHandler)
152+
* @see #replyHandler(Function)
150153
*/
151154
public RpcClientParams useMandatory(boolean useMandatory) {
152155
this.useMandatory = useMandatory;
@@ -170,17 +173,13 @@ public boolean shouldUseMandatory() {
170173
return useMandatory;
171174
}
172175

173-
public RpcClient.RpcClientReplyHandler getReplyHandler() {
174-
return replyHandler;
175-
}
176-
177-
public RpcClientParams correlationIdGenerator(Supplier<String> correlationIdGenerator) {
178-
this.correlationIdGenerator = correlationIdGenerator;
176+
public RpcClientParams correlationIdSupplier(Supplier<String> correlationIdGenerator) {
177+
this.correlationIdSupplier = correlationIdGenerator;
179178
return this;
180179
}
181180

182-
public Supplier<String> getCorrelationIdGenerator() {
183-
return correlationIdGenerator;
181+
public Supplier<String> getCorrelationIdSupplier() {
182+
return correlationIdSupplier;
184183
}
185184

186185
/**
@@ -199,4 +198,8 @@ public RpcClientParams replyHandler(RpcClient.RpcClientReplyHandler replyHandler
199198
this.replyHandler = replyHandler;
200199
return this;
201200
}
201+
202+
public RpcClient.RpcClientReplyHandler getReplyHandler() {
203+
return replyHandler;
204+
}
202205
}

src/test/java/com/rabbitmq/client/test/RpcTest.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
1+
// Copyright (c) 2017-2020 Pivotal Software, Inc. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
@@ -34,7 +34,7 @@
3434
import com.rabbitmq.client.impl.recovery.RecordedQueue;
3535
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
3636
import com.rabbitmq.tools.Host;
37-
import org.hamcrest.CoreMatchers;
37+
import org.assertj.core.api.Assertions;
3838
import org.junit.After;
3939
import org.junit.Before;
4040
import org.junit.Test;
@@ -100,6 +100,9 @@ public void run() {
100100
assertEquals("*** hello ***", new String(response.getBody()));
101101
assertEquals("pre-hello", response.getProperties().getHeaders().get("pre").toString());
102102
assertEquals("post-hello", response.getProperties().getHeaders().get("post").toString());
103+
104+
Assertions.assertThat(client.getCorrelationId()).isEqualTo(Integer.valueOf(response.getProperties().getCorrelationId()));
105+
103106
client.close();
104107
}
105108

@@ -158,7 +161,7 @@ public void run() {
158161
}
159162

160163
@Test
161-
public void rpcCustomCorrelatorId() throws Exception {
164+
public void rpcCustomCorrelationId() throws Exception {
162165
rpcServer = new TestRpcServer(serverChannel, queue);
163166
new Thread(() -> {
164167
try {
@@ -169,10 +172,10 @@ public void rpcCustomCorrelatorId() throws Exception {
169172
}).start();
170173
RpcClient client = new RpcClient(new RpcClientParams()
171174
.channel(clientChannel).exchange("").routingKey(queue).timeout(1000)
172-
.correlationIdGenerator(new IncrementingCorrelationIdGenerator("myPrefix-"))
175+
.correlationIdSupplier(RpcClient.incrementingCorrelationIdSupplier("myPrefix-"))
173176
);
174177
RpcClient.Response response = client.doCall(null, "hello".getBytes());
175-
assertThat(response.getProperties().getCorrelationId(), CoreMatchers.equalTo("myPrefix-0"));
178+
Assertions.assertThat(response.getProperties().getCorrelationId()).isEqualTo("myPrefix-1");
176179
client.close();
177180
}
178181

0 commit comments

Comments
 (0)