Skip to content

637: Configurable correlatorId generation for RPCClient #638

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.rabbitmq.client;

import java.util.function.Supplier;

public class IncrementingCorrelationIdGenerator implements Supplier<String> {

private final String _prefix;
private int _correlationId;

public IncrementingCorrelationIdGenerator(String _prefix) {
this._prefix = _prefix;
}

@Override
public String get() {
return _prefix + _correlationId++;
}

public int getCorrelationId() {
return _correlationId;
}
}
15 changes: 10 additions & 5 deletions src/main/java/com/rabbitmq/client/RpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Map.Entry;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

import com.rabbitmq.client.impl.MethodArgumentReader;
import com.rabbitmq.client.impl.MethodArgumentWriter;
Expand Down Expand Up @@ -84,7 +85,7 @@ public class RpcClient {
/** Map from request correlation ID to continuation BlockingCell */
private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<String, BlockingCell<Object>>();
/** Contains the most recently-used request correlation ID */
private int _correlationId;
private final Supplier<String> _correlationIdGenerator;

/** Consumer attached to our reply queue */
private DefaultConsumer _consumer;
Expand All @@ -109,7 +110,7 @@ public RpcClient(RpcClientParams params) throws
_timeout = params.getTimeout();
_useMandatory = params.shouldUseMandatory();
_replyHandler = params.getReplyHandler();
_correlationId = 0;
_correlationIdGenerator = params.getCorrelationIdGenerator();

_consumer = setupConsumer();
if (_useMandatory) {
Expand Down Expand Up @@ -208,8 +209,7 @@ public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
BlockingCell<Object> k = new BlockingCell<Object>();
String replyId;
synchronized (_continuationMap) {
_correlationId++;
replyId = "" + _correlationId;
replyId = _correlationIdGenerator.get();
props = ((props==null) ? new AMQP.BasicProperties.Builder() : props.builder())
.correlationId(replyId).replyTo(_replyTo).build();
_continuationMap.put(replyId, k);
Expand Down Expand Up @@ -392,9 +392,14 @@ public Map<String, BlockingCell<Object>> getContinuationMap() {
/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change. The last correlation ID must be kept in some way, likely by keeping the _correlationId property and update it in the synchronized block.

* Retrieve the correlation id.
* @return the most recently used correlation id
* @deprecated Only works for {@link IncrementingCorrelationIdGenerator}
*/
public int getCorrelationId() {
return _correlationId;
if (_correlationIdGenerator instanceof IncrementingCorrelationIdGenerator) {
return ((IncrementingCorrelationIdGenerator) _correlationIdGenerator).getCorrelationId();
} else {
throw new UnsupportedOperationException();
}
Comment on lines +398 to +402
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not super fan of this code TBH. Maybe keeping a reference to the last correlation ID (a string) and trying to convert it to an integer when getCorrelationId() is called would do the job as well. This is a best-effort approach anyway, but it avoids relying on some implementation.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is implementation is only there for BW compatiblity, So i don't think it's a problem that it only works for the 'default' id generator. I marked the method as deprecated anyway.

}

/**
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/rabbitmq/client/RpcClientParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.rabbitmq.client;

import java.util.function.Function;
import java.util.function.Supplier;

/**
* Holder class to configure a {@link RpcClient}.
Expand Down Expand Up @@ -54,6 +55,8 @@ public class RpcClientParams {
*/
private Function<Object, RpcClient.Response> replyHandler = RpcClient.DEFAULT_REPLY_HANDLER;

private Supplier<String> correlationIdGenerator = new IncrementingCorrelationIdGenerator("");

/**
* Set the channel to use for communication.
*
Expand Down Expand Up @@ -170,6 +173,15 @@ public boolean shouldUseMandatory() {
return useMandatory;
}

public RpcClientParams correlationIdGenerator(Supplier<String> correlationIdGenerator) {
this.correlationIdGenerator = correlationIdGenerator;
return this;
}

public Supplier<String> getCorrelationIdGenerator() {
return correlationIdGenerator;
}

public Function<Object, RpcClient.Response> getReplyHandler() {
return replyHandler;
}
Expand Down
25 changes: 21 additions & 4 deletions src/test/java/com/rabbitmq/client/test/RpcTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.rabbitmq.client.impl.recovery.RecordedQueue;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
import com.rabbitmq.tools.Host;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -39,9 +40,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import static org.awaitility.Awaitility.waitAtMost;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;

public class RpcTest {

Expand Down Expand Up @@ -138,6 +137,25 @@ public void rpcUnroutableWithMandatoryFlagShouldThrowUnroutableException() throw
client.close();
}

@Test
public void rpcCustomCorrelatorId() throws Exception {
rpcServer = new TestRpcServer(serverChannel, queue);
new Thread(() -> {
try {
rpcServer.mainloop();
} catch (Exception e) {
// safe to ignore when loops ends/server is canceled
}
}).start();
RpcClient client = new RpcClient(new RpcClientParams()
.channel(clientChannel).exchange("").routingKey(queue).timeout(1000)
.correlationIdGenerator(new IncrementingCorrelationIdGenerator("myPrefix-"))
);
RpcClient.Response response = client.doCall(null, "hello".getBytes());
assertThat(response.getProperties().getCorrelationId(), CoreMatchers.equalTo("myPrefix-0"));
client.close();
}

@Test
public void rpcCustomReplyHandler() throws Exception {
rpcServer = new TestRpcServer(serverChannel, queue);
Expand All @@ -156,7 +174,6 @@ public void rpcCustomReplyHandler() throws Exception {
return RpcClient.DEFAULT_REPLY_HANDLER.apply(reply);
})
);
assertEquals(0, replyHandlerCalls.get());
RpcClient.Response response = client.doCall(null, "hello".getBytes());
assertEquals(1, replyHandlerCalls.get());
assertEquals("*** hello ***", new String(response.getBody()));
Expand Down