-
Notifications
You must be signed in to change notification settings - Fork 582
637: Configurable correlatorId generation for RPCClient #638
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package com.rabbitmq.client; | ||
|
||
import java.util.function.Supplier; | ||
|
||
public class IncrementingCorrelationIdGenerator implements Supplier<String> { | ||
|
||
private final String _prefix; | ||
private int _correlationId; | ||
|
||
public IncrementingCorrelationIdGenerator(String _prefix) { | ||
this._prefix = _prefix; | ||
} | ||
|
||
@Override | ||
public String get() { | ||
return _prefix + _correlationId++; | ||
} | ||
|
||
public int getCorrelationId() { | ||
return _correlationId; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ | |
import java.util.Map.Entry; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.function.Function; | ||
import java.util.function.Supplier; | ||
|
||
import com.rabbitmq.client.impl.MethodArgumentReader; | ||
import com.rabbitmq.client.impl.MethodArgumentWriter; | ||
|
@@ -84,7 +85,7 @@ public class RpcClient { | |
/** Map from request correlation ID to continuation BlockingCell */ | ||
private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<String, BlockingCell<Object>>(); | ||
/** Contains the most recently-used request correlation ID */ | ||
private int _correlationId; | ||
private final Supplier<String> _correlationIdGenerator; | ||
|
||
/** Consumer attached to our reply queue */ | ||
private DefaultConsumer _consumer; | ||
|
@@ -109,7 +110,7 @@ public RpcClient(RpcClientParams params) throws | |
_timeout = params.getTimeout(); | ||
_useMandatory = params.shouldUseMandatory(); | ||
_replyHandler = params.getReplyHandler(); | ||
_correlationId = 0; | ||
_correlationIdGenerator = params.getCorrelationIdGenerator(); | ||
|
||
_consumer = setupConsumer(); | ||
if (_useMandatory) { | ||
|
@@ -208,8 +209,7 @@ public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout) | |
BlockingCell<Object> k = new BlockingCell<Object>(); | ||
String replyId; | ||
synchronized (_continuationMap) { | ||
_correlationId++; | ||
replyId = "" + _correlationId; | ||
replyId = _correlationIdGenerator.get(); | ||
props = ((props==null) ? new AMQP.BasicProperties.Builder() : props.builder()) | ||
.correlationId(replyId).replyTo(_replyTo).build(); | ||
_continuationMap.put(replyId, k); | ||
|
@@ -392,9 +392,14 @@ public Map<String, BlockingCell<Object>> getContinuationMap() { | |
/** | ||
* Retrieve the correlation id. | ||
* @return the most recently used correlation id | ||
* @deprecated Only works for {@link IncrementingCorrelationIdGenerator} | ||
*/ | ||
public int getCorrelationId() { | ||
return _correlationId; | ||
if (_correlationIdGenerator instanceof IncrementingCorrelationIdGenerator) { | ||
return ((IncrementingCorrelationIdGenerator) _correlationIdGenerator).getCorrelationId(); | ||
} else { | ||
throw new UnsupportedOperationException(); | ||
} | ||
Comment on lines
+398
to
+402
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not super fan of this code TBH. Maybe keeping a reference to the last correlation ID (a string) and trying to convert it to an integer when There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is implementation is only there for BW compatiblity, So i don't think it's a problem that it only works for the 'default' id generator. I marked the method as deprecated anyway. |
||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a breaking change. The last correlation ID must be kept in some way, likely by keeping the
_correlationId
property and update it in the synchronized block.