Skip to content

Commit f07ce32

Browse files
committed
spring-projectsGH-3942: Fix Race in Kafka OB Gateway
Resolves spring-projects#3942 When determining the default reply-to topic/partition, we need to wait for assignment. Already covered by `KafkaDslTests` (a recent build failure exposed this problem). **No back-port - 5.5.x uses 2.7.x by default, which does not support this.** 5.5.x users can call `waitForAssignment` on the `ReplyingKafkaTemplate` that is supplied to the gateways before sending messages.
1 parent 73ba448 commit f07ce32

File tree

3 files changed

+35
-0
lines changed

3 files changed

+35
-0
lines changed

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaOutboundGatewaySpec.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,18 @@ public S replyMessageConverter(RecordMessageConverter messageConverter) {
6060
return _this();
6161
}
6262

63+
/**
64+
* Set the time to wait for partition assignment, when used as a gateway, to determine
65+
* the default reply-to topic/partition.
66+
* @param duration the duration.
67+
* @return the spec.
68+
* @since 6.0
69+
*/
70+
public S assigmentDuration(Duration duration) {
71+
this.target.setAssignmentDuration(duration);
72+
return _this();
73+
}
74+
6375
/**
6476
* A {@link org.springframework.kafka.core.KafkaTemplate}-based {@link KafkaProducerMessageHandlerSpec} extension.
6577
*

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.lang.reflect.Type;
2020
import java.nio.charset.StandardCharsets;
21+
import java.time.Duration;
2122
import java.util.Collection;
2223
import java.util.HashMap;
2324
import java.util.Map;
@@ -103,6 +104,8 @@ public class KafkaProducerMessageHandler<K, V> extends AbstractReplyProducingMes
103104
*/
104105
private static final int DEFAULT_TIMEOUT_BUFFER = 5000;
105106

107+
private static final Duration DEFAULT_ASSIGNMENT_TIMEOUT = Duration.ofSeconds(20);
108+
106109
private final Map<String, Set<Integer>> replyTopicsAndPartitions = new HashMap<>();
107110

108111
private final KafkaTemplate<K, V> kafkaTemplate;
@@ -162,6 +165,8 @@ public class KafkaProducerMessageHandler<K, V> extends AbstractReplyProducingMes
162165

163166
private boolean useTemplateConverter;
164167

168+
private Duration assignmentDuration = DEFAULT_ASSIGNMENT_TIMEOUT;
169+
165170
private volatile byte[] singleReplyTopic;
166171

167172
public KafkaProducerMessageHandler(final KafkaTemplate<K, V> kafkaTemplate) {
@@ -415,6 +420,17 @@ public void setUseTemplateConverter(boolean useTemplateConverter) {
415420
this.useTemplateConverter = useTemplateConverter;
416421
}
417422

423+
/**
424+
* Set the time to wait for partition assignment, when used as a gateway, to determine
425+
* the default reply-to topic/partition.
426+
* @param assignmentDuration the assignmentDuration to set.
427+
* @since 6.0
428+
*/
429+
public void setAssignmentDuration(Duration assignmentDuration) {
430+
Assert.notNull(assignmentDuration, "'assignmentDuration' cannot be null");
431+
this.assignmentDuration = assignmentDuration;
432+
}
433+
418434
@Override
419435
public String getComponentType() {
420436
return this.isGateway ? "kafka:outbound-gateway" : "kafka:outbound-channel-adapter";
@@ -647,6 +663,12 @@ private byte[] getSingleReplyTopic() {
647663

648664
private void determineValidReplyTopicsAndPartitions() {
649665
ReplyingKafkaTemplate<?, ?, ?> rkt = (ReplyingKafkaTemplate<?, ?, ?>) this.kafkaTemplate;
666+
try {
667+
rkt.waitForAssignment(this.assignmentDuration);
668+
}
669+
catch (InterruptedException e) {
670+
Thread.currentThread().interrupt();
671+
}
650672
Collection<TopicPartition> replyTopics = rkt.getAssignedReplyTopicPartitions();
651673
Map<String, Set<Integer>> topicsAndPartitions = new HashMap<>();
652674
if (replyTopics != null) {

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,7 @@ public IntegrationFlow sourceFlow() {
400400
public IntegrationFlow outboundGateFlow() {
401401
return IntegrationFlow.from(Gate.class)
402402
.handle(Kafka.outboundGateway(producerFactory(), replyContainer())
403+
.assigmentDuration(Duration.ofSeconds(30))
403404
.flushExpression("true")
404405
.sync(true)
405406
.configureKafkaTemplate(t -> t.defaultReplyTimeout(Duration.ofSeconds(30))))

0 commit comments

Comments
 (0)