Skip to content

Commit 8b4b601

Browse files
authored
GH-2399: ReplyingKT Human Readable Correlation
Resolves #2399 By default, the correlationId header contains a binary representation of a UUID; add an option to use a String representation instead. * Fix test name.
1 parent 10b75a3 commit 8b4b601

File tree

3 files changed

+134
-23
lines changed

3 files changed

+134
-23
lines changed

spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.requestreply;
1818

19+
import java.nio.charset.StandardCharsets;
1920
import java.time.Duration;
2021
import java.util.ArrayList;
2122
import java.util.Collection;
@@ -70,7 +71,7 @@ public class AggregatingReplyingKafkaTemplate<K, V, R>
7071

7172
private static final int DEFAULT_COMMIT_TIMEOUT = 30;
7273

73-
private final Map<CorrelationKey, Set<RecordHolder<K, R>>> pending = new HashMap<>();
74+
private final Map<Object, Set<RecordHolder<K, R>>> pending = new HashMap<>();
7475

7576
private final Map<TopicPartition, Long> offsets = new HashMap<>();
7677

@@ -132,7 +133,9 @@ public void onMessage(List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>>
132133
+ " in the '" + correlationHeaderName + "' header");
133134
}
134135
else {
135-
CorrelationKey correlationId = new CorrelationKey(correlation.value());
136+
Object correlationId = isBinaryCorrelation()
137+
? new CorrelationKey(correlation.value())
138+
: new String(correlation.value(), StandardCharsets.UTF_8);
136139
synchronized (this) {
137140
if (isPending(correlationId)) {
138141
List<ConsumerRecord<K, R>> list = addToCollection(record, correlationId).stream()
@@ -142,8 +145,10 @@ public void onMessage(List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>>
142145
ConsumerRecord<K, Collection<ConsumerRecord<K, R>>> done =
143146
new ConsumerRecord<>(AGGREGATED_RESULTS_TOPIC, 0, 0L, null, list);
144147
done.headers()
145-
.add(new RecordHeader(correlationHeaderName, correlationId
146-
.getCorrelationId()));
148+
.add(new RecordHeader(correlationHeaderName,
149+
isBinaryCorrelation()
150+
? ((CorrelationKey) correlationId).getCorrelationId()
151+
: ((String) correlationId).getBytes(StandardCharsets.UTF_8)));
147152
this.pending.remove(correlationId);
148153
checkOffsetsAndCommitIfNecessary(list, consumer);
149154
completed.add(done);
@@ -161,7 +166,7 @@ public void onMessage(List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>>
161166
}
162167

163168
@Override
164-
protected synchronized boolean handleTimeout(CorrelationKey correlationId,
169+
protected synchronized boolean handleTimeout(Object correlationId,
165170
RequestReplyFuture<K, V, Collection<ConsumerRecord<K, R>>> future) {
166171

167172
Set<RecordHolder<K, R>> removed = this.pending.remove(correlationId);
@@ -191,7 +196,7 @@ private void checkOffsetsAndCommitIfNecessary(List<ConsumerRecord<K, R>> list, C
191196
}
192197

193198
@SuppressWarnings({ "rawtypes", "unchecked" })
194-
private Set<RecordHolder<K, R>> addToCollection(ConsumerRecord record, CorrelationKey correlationId) {
199+
private Set<RecordHolder<K, R>> addToCollection(ConsumerRecord record, Object correlationId) {
195200
Set<RecordHolder<K, R>> set = this.pending.computeIfAbsent(correlationId, id -> new LinkedHashSet<>());
196201
set.add(new RecordHolder<>(record));
197202
return set;

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implemen
8484

8585
private final GenericMessageListenerContainer<K, R> replyContainer;
8686

87-
private final ConcurrentMap<CorrelationKey, RequestReplyFuture<K, V, R>> futures = new ConcurrentHashMap<>();
87+
private final ConcurrentMap<Object, RequestReplyFuture<K, V, R>> futures = new ConcurrentHashMap<>();
8888

8989
private final byte[] replyTopic;
9090

@@ -102,7 +102,10 @@ public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implemen
102102

103103
private boolean sharedReplyTopic;
104104

105-
private Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy = ReplyingKafkaTemplate::defaultCorrelationIdStrategy;
105+
private Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy =
106+
ReplyingKafkaTemplate::defaultCorrelationIdStrategy;
107+
108+
private boolean binaryCorrelation = true;
106109

107110
private String correlationHeaderName = KafkaHeaders.CORRELATION_ID;
108111

@@ -292,6 +295,20 @@ public void setReplyErrorChecker(Function<ConsumerRecord<?, ?>, Exception> reply
292295
this.replyErrorChecker = replyErrorChecker;
293296
}
294297

298+
/**
299+
* Set to false to use the String representation of the correlation as the
300+
* correlationId rather than the binary representation. Default true.
301+
* @param binaryCorrelation false for String.
302+
* @since 3.0
303+
*/
304+
public void setBinaryCorrelation(boolean binaryCorrelation) {
305+
this.binaryCorrelation = binaryCorrelation;
306+
}
307+
308+
protected boolean isBinaryCorrelation() {
309+
return this.binaryCorrelation;
310+
}
311+
295312
@Override
296313
public void afterPropertiesSet() {
297314
if (!this.schedulerSet && !this.schedulerInitialized) {
@@ -407,22 +424,26 @@ public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, @
407424
headers.add(new RecordHeader(this.replyPartitionHeaderName, this.replyPartition));
408425
}
409426
}
410-
headers.add(new RecordHeader(this.correlationHeaderName, correlationId.getCorrelationId()));
427+
Object correlation = this.binaryCorrelation ? correlationId : correlationId.toString();
428+
byte[] correlationValue = this.binaryCorrelation
429+
? correlationId.getCorrelationId()
430+
: ((String) correlation).getBytes(StandardCharsets.UTF_8);
431+
headers.add(new RecordHeader(this.correlationHeaderName, correlationValue));
411432
this.logger.debug(() -> "Sending: " + KafkaUtils.format(record) + WITH_CORRELATION_ID + correlationId);
412433
RequestReplyFuture<K, V, R> future = new RequestReplyFuture<>();
413-
this.futures.put(correlationId, future);
434+
this.futures.put(correlation, future);
414435
try {
415436
future.setSendFuture(send(record));
416437
}
417438
catch (Exception e) {
418-
this.futures.remove(correlationId);
439+
this.futures.remove(correlation);
419440
throw new KafkaException("Send failed", e);
420441
}
421-
scheduleTimeout(record, correlationId, timeout);
442+
scheduleTimeout(record, correlation, timeout);
422443
return future;
423444
}
424445

425-
private void scheduleTimeout(ProducerRecord<K, V> record, CorrelationKey correlationId, Duration replyTimeout) {
446+
private void scheduleTimeout(ProducerRecord<K, V> record, Object correlationId, Duration replyTimeout) {
426447
this.scheduler.schedule(() -> {
427448
RequestReplyFuture<K, V, R> removed = this.futures.remove(correlationId);
428449
if (removed != null) {
@@ -443,7 +464,7 @@ private void scheduleTimeout(ProducerRecord<K, V> record, CorrelationKey correla
443464
* @return true to indicate the future has been completed.
444465
* @since 2.3
445466
*/
446-
protected boolean handleTimeout(@SuppressWarnings("unused") CorrelationKey correlationId,
467+
protected boolean handleTimeout(@SuppressWarnings("unused") Object correlationId,
447468
@SuppressWarnings("unused") RequestReplyFuture<K, V, R> future) {
448469

449470
return false;
@@ -455,7 +476,7 @@ protected boolean handleTimeout(@SuppressWarnings("unused") CorrelationKey corre
455476
* @return true if pending.
456477
* @since 2.3
457478
*/
458-
protected boolean isPending(CorrelationKey correlationId) {
479+
protected boolean isPending(Object correlationId) {
459480
return this.futures.containsKey(correlationId);
460481
}
461482

@@ -481,9 +502,11 @@ private static <K, V> CorrelationKey defaultCorrelationIdStrategy(
481502
public void onMessage(List<ConsumerRecord<K, R>> data) {
482503
data.forEach(record -> {
483504
Header correlationHeader = record.headers().lastHeader(this.correlationHeaderName);
484-
CorrelationKey correlationId = null;
505+
Object correlationId = null;
485506
if (correlationHeader != null) {
486-
correlationId = new CorrelationKey(correlationHeader.value());
507+
correlationId = this.binaryCorrelation
508+
? new CorrelationKey(correlationHeader.value())
509+
: new String(correlationHeader.value(), StandardCharsets.UTF_8);
487510
}
488511
if (correlationId == null) {
489512
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record)
@@ -492,7 +515,7 @@ public void onMessage(List<ConsumerRecord<K, R>> data) {
492515
}
493516
else {
494517
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
495-
CorrelationKey correlationKey = correlationId;
518+
Object correlationKey = correlationId;
496519
if (future == null) {
497520
logLateArrival(record, correlationId);
498521
}
@@ -562,7 +585,7 @@ public static DeserializationException checkDeserialization(ConsumerRecord<?, ?>
562585
return null;
563586
}
564587

565-
protected void logLateArrival(ConsumerRecord<K, R> record, CorrelationKey correlationId) {
588+
protected void logLateArrival(ConsumerRecord<K, R> record, Object correlationId) {
566589
if (this.sharedReplyTopic) {
567590
this.logger.debug(() -> missingCorrelationLogMessage(record, correlationId));
568591
}
@@ -571,7 +594,7 @@ protected void logLateArrival(ConsumerRecord<K, R> record, CorrelationKey correl
571594
}
572595
}
573596

574-
private String missingCorrelationLogMessage(ConsumerRecord<K, R> record, CorrelationKey correlationId) {
597+
private String missingCorrelationLogMessage(ConsumerRecord<K, R> record, Object correlationId) {
575598
return "No pending reply: " + KafkaUtils.format(record) + WITH_CORRELATION_ID
576599
+ correlationId + ", perhaps timed out, or using a shared reply topic";
577600
}

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 86 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,26 @@ public void testGoodDefaultReplyHeaders() throws Exception {
398398
}
399399
}
400400

401+
@Test
402+
public void testGoodDefaultReplyHeadersStringCorrelation() throws Exception {
403+
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(
404+
new TopicPartitionOffset(A_REPLY, 3));
405+
try {
406+
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
407+
template.setBinaryCorrelation(false);
408+
ProducerRecord<Integer, String> record = new ProducerRecord<>(A_REQUEST, "bar");
409+
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
410+
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
411+
ConsumerRecord<Integer, String> consumerRecord = future.get(30, TimeUnit.SECONDS);
412+
assertThat(consumerRecord.value()).isEqualTo("BAR");
413+
assertThat(consumerRecord.partition()).isEqualTo(3);
414+
}
415+
finally {
416+
template.stop();
417+
template.destroy();
418+
}
419+
}
420+
401421
@Test
402422
public void testGoodSamePartition() throws Exception {
403423
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(A_REPLY);
@@ -472,6 +492,35 @@ public void testAggregateNormal() throws Exception {
472492
}
473493
}
474494

495+
@Test
496+
public void testAggregateNormalStringCorrelation() throws Exception {
497+
AggregatingReplyingKafkaTemplate<Integer, String, String> template = aggregatingTemplate(
498+
new TopicPartitionOffset(D_REPLY, 0), 2, new AtomicInteger());
499+
try {
500+
template.setCorrelationHeaderName("customCorrelation");
501+
template.setBinaryCorrelation(false);
502+
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
503+
ProducerRecord<Integer, String> record = new ProducerRecord<>(D_REQUEST, null, null, null, "foo");
504+
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
505+
template.sendAndReceive(record);
506+
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
507+
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
508+
future.get(30, TimeUnit.SECONDS);
509+
assertThat(consumerRecord.value().size()).isEqualTo(2);
510+
Iterator<ConsumerRecord<Integer, String>> iterator = consumerRecord.value().iterator();
511+
String value1 = iterator.next().value();
512+
assertThat(value1).isIn("fOO", "FOO");
513+
String value2 = iterator.next().value();
514+
assertThat(value2).isIn("fOO", "FOO");
515+
assertThat(value2).isNotSameAs(value1);
516+
assertThat(consumerRecord.topic()).isEqualTo(AggregatingReplyingKafkaTemplate.AGGREGATED_RESULTS_TOPIC);
517+
}
518+
finally {
519+
template.stop();
520+
template.destroy();
521+
}
522+
}
523+
475524
@SuppressWarnings("unchecked")
476525
@Test
477526
@Disabled("time sensitive")
@@ -480,6 +529,7 @@ public void testAggregateTimeout() throws Exception {
480529
new TopicPartitionOffset(E_REPLY, 0), 3, new AtomicInteger());
481530
try {
482531
template.setDefaultReplyTimeout(Duration.ofSeconds(5));
532+
template.setCorrelationHeaderName("customCorrelation");
483533
ProducerRecord<Integer, String> record = new ProducerRecord<>(E_REQUEST, null, null, null, "foo");
484534
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
485535
template.sendAndReceive(record);
@@ -508,14 +558,47 @@ public void testAggregateTimeout() throws Exception {
508558
}
509559

510560
@Test
511-
@Disabled("time sensitive")
512561
public void testAggregateTimeoutPartial() throws Exception {
513562
AtomicInteger releaseCount = new AtomicInteger();
514563
AggregatingReplyingKafkaTemplate<Integer, String, String> template = aggregatingTemplate(
515564
new TopicPartitionOffset(F_REPLY, 0), 3, releaseCount);
516565
template.setReturnPartialOnTimeout(true);
517566
try {
518567
template.setDefaultReplyTimeout(Duration.ofSeconds(5));
568+
template.setCorrelationHeaderName("customCorrelation");
569+
ProducerRecord<Integer, String> record = new ProducerRecord<>(F_REQUEST, null, null, null, "foo");
570+
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
571+
template.sendAndReceive(record);
572+
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
573+
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
574+
future.get(30, TimeUnit.SECONDS);
575+
assertThat(consumerRecord.value().size()).isEqualTo(2);
576+
Iterator<ConsumerRecord<Integer, String>> iterator = consumerRecord.value().iterator();
577+
String value1 = iterator.next().value();
578+
assertThat(value1).isIn("fOO", "FOO");
579+
String value2 = iterator.next().value();
580+
assertThat(value2).isIn("fOO", "FOO");
581+
assertThat(value2).isNotSameAs(value1);
582+
assertThat(consumerRecord.topic())
583+
.isEqualTo(AggregatingReplyingKafkaTemplate.PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC);
584+
assertThat(releaseCount.get()).isEqualTo(3);
585+
}
586+
finally {
587+
template.stop();
588+
template.destroy();
589+
}
590+
}
591+
592+
@Test
593+
public void testAggregateTimeoutPartialStringCorrelation() throws Exception {
594+
AtomicInteger releaseCount = new AtomicInteger();
595+
AggregatingReplyingKafkaTemplate<Integer, String, String> template = aggregatingTemplate(
596+
new TopicPartitionOffset(F_REPLY, 0), 3, releaseCount);
597+
template.setReturnPartialOnTimeout(true);
598+
template.setBinaryCorrelation(false);
599+
try {
600+
template.setDefaultReplyTimeout(Duration.ofSeconds(5));
601+
template.setCorrelationHeaderName("customCorrelation");
519602
ProducerRecord<Integer, String> record = new ProducerRecord<>(F_REQUEST, null, null, null, "foo");
520603
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
521604
template.sendAndReceive(record);
@@ -531,7 +614,7 @@ public void testAggregateTimeoutPartial() throws Exception {
531614
assertThat(value2).isNotSameAs(value1);
532615
assertThat(consumerRecord.topic())
533616
.isEqualTo(AggregatingReplyingKafkaTemplate.PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC);
534-
assertThat(releaseCount.get()).isEqualTo(2);
617+
assertThat(releaseCount.get()).isEqualTo(3);
535618
}
536619
finally {
537620
template.stop();
@@ -654,7 +737,7 @@ public AggregatingReplyingKafkaTemplate<Integer, String, String> aggregatingTemp
654737
new AggregatingReplyingKafkaTemplate<>(this.config.pf(), container,
655738
(list, timeout) -> {
656739
releaseCount.incrementAndGet();
657-
return list.size() == releaseSize;
740+
return list.size() == releaseSize || timeout;
658741
});
659742
template.setSharedReplyTopic(true);
660743
template.start();

0 commit comments

Comments
 (0)