Skip to content

Commit ebffa3e

Browse files
authored
Merge pull request #638 from janssk1/master
637: Configurable correlatorId generation for RPCClient
2 parents 12c085b + 59e9455 commit ebffa3e

File tree

4 files changed

+65
-9
lines changed

4 files changed

+65
-9
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.rabbitmq.client;
2+
3+
import java.util.function.Supplier;
4+
5+
public class IncrementingCorrelationIdGenerator implements Supplier<String> {
6+
7+
private final String _prefix;
8+
private int _correlationId;
9+
10+
public IncrementingCorrelationIdGenerator(String _prefix) {
11+
this._prefix = _prefix;
12+
}
13+
14+
@Override
15+
public String get() {
16+
return _prefix + _correlationId++;
17+
}
18+
19+
public int getCorrelationId() {
20+
return _correlationId;
21+
}
22+
}

src/main/java/com/rabbitmq/client/RpcClient.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Map.Entry;
2929
import java.util.concurrent.TimeoutException;
3030
import java.util.function.Function;
31+
import java.util.function.Supplier;
3132

3233
import com.rabbitmq.client.impl.MethodArgumentReader;
3334
import com.rabbitmq.client.impl.MethodArgumentWriter;
@@ -84,7 +85,7 @@ public class RpcClient {
8485
/** Map from request correlation ID to continuation BlockingCell */
8586
private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<String, BlockingCell<Object>>();
8687
/** Contains the most recently-used request correlation ID */
87-
private int _correlationId;
88+
private final Supplier<String> _correlationIdGenerator;
8889

8990
/** Consumer attached to our reply queue */
9091
private DefaultConsumer _consumer;
@@ -109,7 +110,7 @@ public RpcClient(RpcClientParams params) throws
109110
_timeout = params.getTimeout();
110111
_useMandatory = params.shouldUseMandatory();
111112
_replyHandler = params.getReplyHandler();
112-
_correlationId = 0;
113+
_correlationIdGenerator = params.getCorrelationIdGenerator();
113114

114115
_consumer = setupConsumer();
115116
if (_useMandatory) {
@@ -208,8 +209,7 @@ public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
208209
BlockingCell<Object> k = new BlockingCell<Object>();
209210
String replyId;
210211
synchronized (_continuationMap) {
211-
_correlationId++;
212-
replyId = "" + _correlationId;
212+
replyId = _correlationIdGenerator.get();
213213
props = ((props==null) ? new AMQP.BasicProperties.Builder() : props.builder())
214214
.correlationId(replyId).replyTo(_replyTo).build();
215215
_continuationMap.put(replyId, k);
@@ -392,9 +392,14 @@ public Map<String, BlockingCell<Object>> getContinuationMap() {
392392
/**
393393
* Retrieve the correlation id.
394394
* @return the most recently used correlation id
395+
* @deprecated Only works for {@link IncrementingCorrelationIdGenerator}
395396
*/
396397
public int getCorrelationId() {
397-
return _correlationId;
398+
if (_correlationIdGenerator instanceof IncrementingCorrelationIdGenerator) {
399+
return ((IncrementingCorrelationIdGenerator) _correlationIdGenerator).getCorrelationId();
400+
} else {
401+
throw new UnsupportedOperationException();
402+
}
398403
}
399404

400405
/**

src/main/java/com/rabbitmq/client/RpcClientParams.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.rabbitmq.client;
1717

1818
import java.util.function.Function;
19+
import java.util.function.Supplier;
1920

2021
/**
2122
* Holder class to configure a {@link RpcClient}.
@@ -54,6 +55,8 @@ public class RpcClientParams {
5455
*/
5556
private Function<Object, RpcClient.Response> replyHandler = RpcClient.DEFAULT_REPLY_HANDLER;
5657

58+
private Supplier<String> correlationIdGenerator = new IncrementingCorrelationIdGenerator("");
59+
5760
/**
5861
* Set the channel to use for communication.
5962
*
@@ -170,6 +173,15 @@ public boolean shouldUseMandatory() {
170173
return useMandatory;
171174
}
172175

176+
public RpcClientParams correlationIdGenerator(Supplier<String> correlationIdGenerator) {
177+
this.correlationIdGenerator = correlationIdGenerator;
178+
return this;
179+
}
180+
181+
public Supplier<String> getCorrelationIdGenerator() {
182+
return correlationIdGenerator;
183+
}
184+
173185
public Function<Object, RpcClient.Response> getReplyHandler() {
174186
return replyHandler;
175187
}

src/test/java/com/rabbitmq/client/test/RpcTest.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.rabbitmq.client.impl.recovery.RecordedQueue;
2525
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
2626
import com.rabbitmq.tools.Host;
27+
import org.hamcrest.CoreMatchers;
2728
import org.junit.After;
2829
import org.junit.Before;
2930
import org.junit.Test;
@@ -39,9 +40,7 @@
3940
import java.util.concurrent.atomic.AtomicInteger;
4041

4142
import static org.awaitility.Awaitility.waitAtMost;
42-
import static org.junit.Assert.assertEquals;
43-
import static org.junit.Assert.assertTrue;
44-
import static org.junit.Assert.fail;
43+
import static org.junit.Assert.*;
4544

4645
public class RpcTest {
4746

@@ -138,6 +137,25 @@ public void rpcUnroutableWithMandatoryFlagShouldThrowUnroutableException() throw
138137
client.close();
139138
}
140139

140+
@Test
141+
public void rpcCustomCorrelatorId() throws Exception {
142+
rpcServer = new TestRpcServer(serverChannel, queue);
143+
new Thread(() -> {
144+
try {
145+
rpcServer.mainloop();
146+
} catch (Exception e) {
147+
// safe to ignore when loops ends/server is canceled
148+
}
149+
}).start();
150+
RpcClient client = new RpcClient(new RpcClientParams()
151+
.channel(clientChannel).exchange("").routingKey(queue).timeout(1000)
152+
.correlationIdGenerator(new IncrementingCorrelationIdGenerator("myPrefix-"))
153+
);
154+
RpcClient.Response response = client.doCall(null, "hello".getBytes());
155+
assertThat(response.getProperties().getCorrelationId(), CoreMatchers.equalTo("myPrefix-0"));
156+
client.close();
157+
}
158+
141159
@Test
142160
public void rpcCustomReplyHandler() throws Exception {
143161
rpcServer = new TestRpcServer(serverChannel, queue);
@@ -156,7 +174,6 @@ public void rpcCustomReplyHandler() throws Exception {
156174
return RpcClient.DEFAULT_REPLY_HANDLER.apply(reply);
157175
})
158176
);
159-
assertEquals(0, replyHandlerCalls.get());
160177
RpcClient.Response response = client.doCall(null, "hello".getBytes());
161178
assertEquals(1, replyHandlerCalls.get());
162179
assertEquals("*** hello ***", new String(response.getBody()));

0 commit comments

Comments
 (0)