Skip to content

Commit c5691d9

Browse files
janssk1acogoluegnes
janssk1
authored andcommitted
637: Configurable correlatorId generation for RPCClient
Change-Id: I17d7a214d3336ad6e5fe890857a48b457ea599b6 (cherry picked from commit e554e4b) Conflicts: src/main/java/com/rabbitmq/client/RpcClient.java src/main/java/com/rabbitmq/client/RpcClientParams.java
1 parent e517bf0 commit c5691d9

File tree

4 files changed

+53
-16
lines changed

4 files changed

+53
-16
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.rabbitmq.client;
2+
3+
import java.util.function.Supplier;
4+
5+
public class IncrementingCorrelationIdGenerator implements Supplier<String> {
6+
7+
private final String _prefix;
8+
private int _correlationId;
9+
10+
public IncrementingCorrelationIdGenerator(String _prefix) {
11+
this._prefix = _prefix;
12+
}
13+
14+
@Override
15+
public String get() {
16+
return _prefix + _correlationId++;
17+
}
18+
}

src/main/java/com/rabbitmq/client/RpcClient.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public Response handle(Object reply) {
8686
/** Map from request correlation ID to continuation BlockingCell */
8787
private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<String, BlockingCell<Object>>();
8888
/** Contains the most recently-used request correlation ID */
89-
private int _correlationId;
89+
private final Supplier<String> _correlationIdGenerator;
9090

9191
/** Consumer attached to our reply queue */
9292
private DefaultConsumer _consumer;
@@ -111,7 +111,7 @@ public RpcClient(RpcClientParams params) throws
111111
_timeout = params.getTimeout();
112112
_useMandatory = params.shouldUseMandatory();
113113
_replyHandler = params.getReplyHandler();
114-
_correlationId = 0;
114+
_correlationIdGenerator = params.getCorrelationIdGenerator();
115115

116116
_consumer = setupConsumer();
117117
if (_useMandatory) {
@@ -297,8 +297,7 @@ public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
297297
BlockingCell<Object> k = new BlockingCell<Object>();
298298
String replyId;
299299
synchronized (_continuationMap) {
300-
_correlationId++;
301-
replyId = "" + _correlationId;
300+
replyId = _correlationIdGenerator.get();
302301
props = ((props==null) ? new AMQP.BasicProperties.Builder() : props.builder())
303302
.correlationId(replyId).replyTo(_replyTo).build();
304303
_continuationMap.put(replyId, k);
@@ -478,14 +477,6 @@ public Map<String, BlockingCell<Object>> getContinuationMap() {
478477
return _continuationMap;
479478
}
480479

481-
/**
482-
* Retrieve the correlation id.
483-
* @return the most recently used correlation id
484-
*/
485-
public int getCorrelationId() {
486-
return _correlationId;
487-
}
488-
489480
/**
490481
* Retrieve the consumer.
491482
* @return an interface to the client's consumer object

src/main/java/com/rabbitmq/client/RpcClientParams.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public class RpcClientParams {
5252
*/
5353
private RpcClient.RpcClientReplyHandler replyHandler = RpcClient.DEFAULT_REPLY_HANDLER;
5454

55+
private Supplier<String> correlationIdGenerator = RpcClient.DEFAULT_CORRELATION_ID_GENERATOR;
56+
5557
/**
5658
* Set the channel to use for communication.
5759
*
@@ -172,6 +174,15 @@ public RpcClient.RpcClientReplyHandler getReplyHandler() {
172174
return replyHandler;
173175
}
174176

177+
public RpcClientParams correlationIdGenerator(Supplier<String> correlationIdGenerator) {
178+
this.correlationIdGenerator = correlationIdGenerator;
179+
return this;
180+
}
181+
182+
public Supplier<String> getCorrelationIdGenerator() {
183+
return correlationIdGenerator;
184+
}
185+
175186
/**
176187
* Set the behavior to use when receiving replies.
177188
* <p>

src/test/java/com/rabbitmq/client/test/RpcTest.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.rabbitmq.client.impl.recovery.RecordedQueue;
3535
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
3636
import com.rabbitmq.tools.Host;
37+
import org.hamcrest.CoreMatchers;
3738
import org.junit.After;
3839
import org.junit.Before;
3940
import org.junit.Test;
@@ -49,9 +50,7 @@
4950
import java.util.concurrent.atomic.AtomicInteger;
5051

5152
import static org.awaitility.Awaitility.waitAtMost;
52-
import static org.junit.Assert.assertEquals;
53-
import static org.junit.Assert.assertTrue;
54-
import static org.junit.Assert.fail;
53+
import static org.junit.Assert.*;
5554

5655
public class RpcTest {
5756

@@ -158,6 +157,25 @@ public void run() {
158157
client.close();
159158
}
160159

160+
@Test
161+
public void rpcCustomCorrelatorId() throws Exception {
162+
rpcServer = new TestRpcServer(serverChannel, queue);
163+
new Thread(() -> {
164+
try {
165+
rpcServer.mainloop();
166+
} catch (Exception e) {
167+
// safe to ignore when loops ends/server is canceled
168+
}
169+
}).start();
170+
RpcClient client = new RpcClient(new RpcClientParams()
171+
.channel(clientChannel).exchange("").routingKey(queue).timeout(1000)
172+
.correlationIdGenerator(new IncrementingCorrelationIdGenerator("myPrefix-"))
173+
);
174+
RpcClient.Response response = client.doCall(null, "hello".getBytes());
175+
assertThat(response.getProperties().getCorrelationId(), CoreMatchers.equalTo("myPrefix-0"));
176+
client.close();
177+
}
178+
161179
@Test
162180
public void rpcCustomReplyHandler() throws Exception {
163181
rpcServer = new TestRpcServer(serverChannel, queue);
@@ -182,7 +200,6 @@ public RpcClient.Response handle(Object reply) {
182200
}
183201
})
184202
);
185-
assertEquals(0, replyHandlerCalls.get());
186203
RpcClient.Response response = client.doCall(null, "hello".getBytes());
187204
assertEquals(1, replyHandlerCalls.get());
188205
assertEquals("*** hello ***", new String(response.getBody()));

0 commit comments

Comments
 (0)