Skip to content

637: Configurable correlatorId generation for RPCClient #638

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 17, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.rabbitmq.client;

import java.util.function.Supplier;

public class IncrementingCorrelationIdGenerator implements Supplier<String> {

private final String _prefix;
private int _correlationId;

public IncrementingCorrelationIdGenerator(String _prefix) {
this._prefix = _prefix;
}

@Override
public String get() {
return _prefix + _correlationId++;
}
}
18 changes: 6 additions & 12 deletions src/main/java/com/rabbitmq/client/RpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Map.Entry;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

import com.rabbitmq.client.impl.MethodArgumentReader;
import com.rabbitmq.client.impl.MethodArgumentWriter;
Expand Down Expand Up @@ -79,12 +80,14 @@ public class RpcClient {
}
};

public static Supplier<String> DEFAULT_CORRELATION_ID_GENERATOR = new IncrementingCorrelationIdGenerator("");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As-is this static instance may be used by more than one RpcClient instance. In that case Supplier#get may be invoked concurrently, as the synchronized (_continuationMap) block only ensures exclusive access for doCall invocations on the same client. This is a problem because as-is IncrementingCorrelationIdGenerator is not thread-safe.

One option would be to replace int _correlationId with an AtomicInteger (or perhaps more appropriately, an AtomicLong.)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. This should not be static. Will remove that.


private final Function<Object, Response> _replyHandler;

/** Map from request correlation ID to continuation BlockingCell */
private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<String, BlockingCell<Object>>();
/** Contains the most recently-used request correlation ID */
private int _correlationId;
private final Supplier<String> _correlationIdGenerator;

/** Consumer attached to our reply queue */
private DefaultConsumer _consumer;
Expand All @@ -109,7 +112,7 @@ public RpcClient(RpcClientParams params) throws
_timeout = params.getTimeout();
_useMandatory = params.shouldUseMandatory();
_replyHandler = params.getReplyHandler();
_correlationId = 0;
_correlationIdGenerator = params.getCorrelationIdGenerator();

_consumer = setupConsumer();
if (_useMandatory) {
Expand Down Expand Up @@ -208,8 +211,7 @@ public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
BlockingCell<Object> k = new BlockingCell<Object>();
String replyId;
synchronized (_continuationMap) {
_correlationId++;
replyId = "" + _correlationId;
replyId = _correlationIdGenerator.get();
props = ((props==null) ? new AMQP.BasicProperties.Builder() : props.builder())
.correlationId(replyId).replyTo(_replyTo).build();
_continuationMap.put(replyId, k);
Expand Down Expand Up @@ -389,14 +391,6 @@ public Map<String, BlockingCell<Object>> getContinuationMap() {
return _continuationMap;
}

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change. The last correlation ID must be kept in some way, likely by keeping the _correlationId property and update it in the synchronized block.

* Retrieve the correlation id.
* @return the most recently used correlation id
*/
public int getCorrelationId() {
return _correlationId;
}

/**
* Retrieve the consumer.
* @return an interface to the client's consumer object
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/rabbitmq/client/RpcClientParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.rabbitmq.client;

import java.util.function.Function;
import java.util.function.Supplier;

/**
* Holder class to configure a {@link RpcClient}.
Expand Down Expand Up @@ -54,6 +55,8 @@ public class RpcClientParams {
*/
private Function<Object, RpcClient.Response> replyHandler = RpcClient.DEFAULT_REPLY_HANDLER;

private Supplier<String> correlationIdGenerator = RpcClient.DEFAULT_CORRELATION_ID_GENERATOR;

/**
* Set the channel to use for communication.
*
Expand Down Expand Up @@ -170,6 +173,15 @@ public boolean shouldUseMandatory() {
return useMandatory;
}

public RpcClientParams correlationIdGenerator(Supplier<String> correlationIdGenerator) {
this.correlationIdGenerator = correlationIdGenerator;
return this;
}

public Supplier<String> getCorrelationIdGenerator() {
return correlationIdGenerator;
}

public Function<Object, RpcClient.Response> getReplyHandler() {
return replyHandler;
}
Expand Down
25 changes: 21 additions & 4 deletions src/test/java/com/rabbitmq/client/test/RpcTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.rabbitmq.client.impl.recovery.RecordedQueue;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
import com.rabbitmq.tools.Host;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -39,9 +40,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import static org.awaitility.Awaitility.waitAtMost;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;

public class RpcTest {

Expand Down Expand Up @@ -138,6 +137,25 @@ public void rpcUnroutableWithMandatoryFlagShouldThrowUnroutableException() throw
client.close();
}

@Test
public void rpcCustomCorrelatorId() throws Exception {
rpcServer = new TestRpcServer(serverChannel, queue);
new Thread(() -> {
try {
rpcServer.mainloop();
} catch (Exception e) {
// safe to ignore when loops ends/server is canceled
}
}).start();
RpcClient client = new RpcClient(new RpcClientParams()
.channel(clientChannel).exchange("").routingKey(queue).timeout(1000)
.correlationIdGenerator(new IncrementingCorrelationIdGenerator("myPrefix-"))
);
RpcClient.Response response = client.doCall(null, "hello".getBytes());
assertThat(response.getProperties().getCorrelationId(), CoreMatchers.equalTo("myPrefix-0"));
client.close();
}

@Test
public void rpcCustomReplyHandler() throws Exception {
rpcServer = new TestRpcServer(serverChannel, queue);
Expand All @@ -156,7 +174,6 @@ public void rpcCustomReplyHandler() throws Exception {
return RpcClient.DEFAULT_REPLY_HANDLER.apply(reply);
})
);
assertEquals(0, replyHandlerCalls.get());
RpcClient.Response response = client.doCall(null, "hello".getBytes());
assertEquals(1, replyHandlerCalls.get());
assertEquals("*** hello ***", new String(response.getBody()));
Expand Down