Skip to content

Commit d4b3090

Browse files
committed
Integrate correlation ID supplier
References #637
1 parent 80052a7 commit d4b3090

File tree

2 files changed

+37
-19
lines changed

2 files changed

+37
-19
lines changed

src/main/java/com/rabbitmq/client/RpcClient.java

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,9 @@ public Response handle(Object reply) {
8989
/**
9090
* Generates correlation ID for each request.
9191
*
92-
* @since 5.9.0
92+
* @since 4.12.0
9393
*/
94-
private final Supplier<String> _correlationIdGenerator;
94+
private final CorrelationIdSupplier _correlationIdSupplier;
9595

9696
private String lastCorrelationId = "0";
9797

@@ -118,7 +118,7 @@ public RpcClient(RpcClientParams params) throws
118118
_timeout = params.getTimeout();
119119
_useMandatory = params.shouldUseMandatory();
120120
_replyHandler = params.getReplyHandler();
121-
_correlationIdGenerator = params.getCorrelationIdSupplier();
121+
_correlationIdSupplier = params.getCorrelationIdSupplier();
122122

123123
_consumer = setupConsumer();
124124
if (_useMandatory) {
@@ -304,7 +304,7 @@ public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
304304
BlockingCell<Object> k = new BlockingCell<Object>();
305305
String replyId;
306306
synchronized (_continuationMap) {
307-
replyId = _correlationIdGenerator.get();
307+
replyId = _correlationIdSupplier.get();
308308
lastCorrelationId = replyId;
309309
props = ((props==null) ? new AMQP.BasicProperties.Builder() : props.builder())
310310
.correlationId(replyId).replyTo(_replyTo).build();
@@ -488,16 +488,16 @@ public Map<String, BlockingCell<Object>> getContinuationMap() {
488488
/**
489489
* Retrieve the last correlation id used.
490490
* <p>
491-
* Note as of 5.9.0, correlation IDs may not always be integers
491+
* Note as of 4.12.0, correlation IDs may not always be integers
492492
* (by default, they are).
493493
* This method will try to parse the last correlation ID string
494494
* as an integer, so this may result in {@link NumberFormatException}
495495
* if the correlation ID supplier provided by
496-
* {@link RpcClientParams#correlationIdSupplier(Supplier)}
496+
* {@link RpcClientParams#correlationIdSupplier(CorrelationIdSupplier)}
497497
* does not generate appropriate IDs.
498498
*
499499
* @return the most recently used correlation id
500-
* @see RpcClientParams#correlationIdSupplier(Supplier)
500+
* @see RpcClientParams#correlationIdSupplier(CorrelationIdSupplier)
501501
*/
502502
public int getCorrelationId() {
503503
return Integer.valueOf(this.lastCorrelationId);
@@ -569,14 +569,25 @@ public interface RpcClientReplyHandler {
569569

570570
}
571571

572+
/**
573+
* Contract to generate correlation IDs.
574+
*
575+
* @since 4.12.0
576+
*/
577+
public interface CorrelationIdSupplier {
578+
579+
String get();
580+
581+
}
582+
572583
/**
573584
* Creates generation IDs as a sequence of integers.
574585
*
575586
* @return
576-
* @see RpcClientParams#correlationIdSupplier(Supplier)
577-
* @since 5.9.0
587+
* @see RpcClientParams#correlationIdSupplier(CorrelationIdSupplier)
588+
* @since 4.12.0
578589
*/
579-
public static Supplier<String> incrementingCorrelationIdSupplier() {
590+
public static CorrelationIdSupplier incrementingCorrelationIdSupplier() {
580591
return incrementingCorrelationIdSupplier("");
581592
}
582593

@@ -585,17 +596,17 @@ public static Supplier<String> incrementingCorrelationIdSupplier() {
585596
*
586597
* @param prefix
587598
* @return
588-
* @see RpcClientParams#correlationIdSupplier(Supplier)
589-
* @since 5.9.0
599+
* @see RpcClientParams#correlationIdSupplier(CorrelationIdSupplier)
600+
* @since 4.12.0
590601
*/
591-
public static Supplier<String> incrementingCorrelationIdSupplier(String prefix) {
602+
public static CorrelationIdSupplier incrementingCorrelationIdSupplier(String prefix) {
592603
return new IncrementingCorrelationIdSupplier(prefix);
593604
}
594605

595606
/**
596-
* @since 5.9.0
607+
* @since 4.12.0
597608
*/
598-
private static class IncrementingCorrelationIdSupplier implements Supplier<String> {
609+
private static class IncrementingCorrelationIdSupplier implements CorrelationIdSupplier {
599610

600611
private final String prefix;
601612
private int correlationId;

src/main/java/com/rabbitmq/client/RpcClientParams.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public class RpcClientParams {
5555
/**
5656
* Logic to generate correlation IDs.
5757
*/
58-
private Supplier<String> correlationIdSupplier = RpcClient.incrementingCorrelationIdSupplier();
58+
private RpcClient.CorrelationIdSupplier correlationIdSupplier = RpcClient.incrementingCorrelationIdSupplier();
5959

6060
/**
6161
* Set the channel to use for communication.
@@ -149,7 +149,7 @@ public RpcClientParams timeout(int timeout) {
149149
*
150150
* @param useMandatory
151151
* @return
152-
* @see #replyHandler(Function)
152+
* @see #replyHandler(RpcClient.RpcClientReplyHandler)
153153
*/
154154
public RpcClientParams useMandatory(boolean useMandatory) {
155155
this.useMandatory = useMandatory;
@@ -173,12 +173,19 @@ public boolean shouldUseMandatory() {
173173
return useMandatory;
174174
}
175175

176-
public RpcClientParams correlationIdSupplier(Supplier<String> correlationIdGenerator) {
176+
/**
177+
* Logic to generate correlation IDs.
178+
*
179+
* @param correlationIdGenerator
180+
* @return
181+
* @since 4.12.0
182+
*/
183+
public RpcClientParams correlationIdSupplier(RpcClient.CorrelationIdSupplier correlationIdGenerator) {
177184
this.correlationIdSupplier = correlationIdGenerator;
178185
return this;
179186
}
180187

181-
public Supplier<String> getCorrelationIdSupplier() {
188+
public RpcClient.CorrelationIdSupplier getCorrelationIdSupplier() {
182189
return correlationIdSupplier;
183190
}
184191

0 commit comments

Comments
 (0)