Skip to content

Commit c4ee551

Browse files
authored
GH-8638: Kafka: Send All Fails to Failure Channel
Resolves #8638 Previously, immediate failures (e.g. timeout getting metadata) were only thrown as exceptions, and not sent to the failure channel, if present. **cherry-pick to all supported branches**
1 parent 95f1eb8 commit c4ee551

File tree

3 files changed

+64
-15
lines changed

3 files changed

+64
-15
lines changed

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2022 the original author or authors.
2+
* Copyright 2013-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -438,6 +438,7 @@ public String getComponentType() {
438438
return this.isGateway ? "kafka:outbound-gateway" : "kafka:outbound-channel-adapter";
439439
}
440440

441+
@Nullable
441442
protected MessageChannel getSendFailureChannel() {
442443
if (this.sendFailureChannel != null) {
443444
return this.sendFailureChannel;
@@ -515,19 +516,27 @@ protected Object handleRequestMessage(final Message<?> message) {
515516
}
516517
CompletableFuture<SendResult<K, V>> sendFuture;
517518
RequestReplyFuture<K, V, Object> gatewayFuture = null;
518-
if (this.isGateway && (!preBuilt || producerRecord.headers().lastHeader(KafkaHeaders.REPLY_TOPIC) == null)) {
519-
producerRecord.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, getReplyTopic(message)));
520-
gatewayFuture = ((ReplyingKafkaTemplate<K, V, Object>) this.kafkaTemplate).sendAndReceive(producerRecord);
521-
sendFuture = gatewayFuture.getSendFuture();
522-
}
523-
else {
524-
if (this.transactional && !this.kafkaTemplate.inTransaction() && !this.allowNonTransactional) {
525-
sendFuture = this.kafkaTemplate.executeInTransaction(template -> template.send(producerRecord));
519+
try {
520+
if (this.isGateway
521+
&& (!preBuilt || producerRecord.headers().lastHeader(KafkaHeaders.REPLY_TOPIC) == null)) {
522+
producerRecord.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, getReplyTopic(message)));
523+
gatewayFuture = ((ReplyingKafkaTemplate<K, V, Object>) this.kafkaTemplate)
524+
.sendAndReceive(producerRecord);
525+
sendFuture = gatewayFuture.getSendFuture();
526526
}
527527
else {
528-
sendFuture = this.kafkaTemplate.send(producerRecord);
528+
if (this.transactional && !this.kafkaTemplate.inTransaction() && !this.allowNonTransactional) {
529+
sendFuture = this.kafkaTemplate.executeInTransaction(template -> template.send(producerRecord));
530+
}
531+
else {
532+
sendFuture = this.kafkaTemplate.send(producerRecord);
533+
}
529534
}
530535
}
536+
catch (RuntimeException rtex) {
537+
sendFailure(message, producerRecord, getSendFailureChannel(), rtex);
538+
throw rtex;
539+
}
531540
sendFutureIfRequested(sendFuture, futureToken);
532541
if (flush) {
533542
this.kafkaTemplate.flush();
@@ -699,10 +708,8 @@ public void processSendResult(final Message<?> message, final ProducerRecord<K,
699708
.build());
700709
}
701710
}
702-
else if (failureChannel != null) {
703-
KafkaProducerMessageHandler.this.messagingTemplate.send(failureChannel,
704-
KafkaProducerMessageHandler.this.errorMessageStrategy.buildErrorMessage(
705-
new KafkaSendFailureException(message, producerRecord, exception), null));
711+
else {
712+
sendFailure(message, producerRecord, failureChannel, exception);
706713
}
707714
});
708715
}
@@ -730,6 +737,16 @@ else if (failureChannel != null) {
730737
}
731738
}
732739

740+
private void sendFailure(final Message<?> message, final ProducerRecord<K, V> producerRecord,
741+
@Nullable MessageChannel failureChannel, Throwable exception) {
742+
743+
if (failureChannel != null) {
744+
KafkaProducerMessageHandler.this.messagingTemplate.send(failureChannel,
745+
KafkaProducerMessageHandler.this.errorMessageStrategy.buildErrorMessage(
746+
new KafkaSendFailureException(message, producerRecord, exception), null));
747+
}
748+
}
749+
733750
private Future<?> processReplyFuture(@Nullable RequestReplyFuture<?, ?, Object> future) {
734751
if (future == null) {
735752
return null;

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -60,6 +60,7 @@
6060
import org.springframework.integration.kafka.support.KafkaIntegrationHeaders;
6161
import org.springframework.integration.kafka.support.KafkaSendFailureException;
6262
import org.springframework.integration.support.MessageBuilder;
63+
import org.springframework.kafka.KafkaException;
6364
import org.springframework.kafka.core.ConsumerFactory;
6465
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
6566
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -93,6 +94,7 @@
9394

9495
import static org.assertj.core.api.Assertions.assertThat;
9596
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
97+
import static org.assertj.core.api.InstanceOfAssertFactories.throwable;
9698
import static org.mockito.ArgumentMatchers.any;
9799
import static org.mockito.ArgumentMatchers.isNull;
98100
import static org.mockito.BDDMockito.given;
@@ -344,6 +346,35 @@ protected CompletableFuture<SendResult<Integer, String>> doSend(
344346
producerFactory.destroy();
345347
}
346348

349+
@SuppressWarnings({ "rawtypes", "unchecked" })
350+
@Test
351+
void immediateFailure() {
352+
Producer producer = mock(Producer.class);
353+
CompletableFuture cf = new CompletableFuture();
354+
RuntimeException rte = new RuntimeException("test.immediate");
355+
cf.completeExceptionally(rte);
356+
given(producer.send(any(), any())).willReturn(cf);
357+
ProducerFactory pf = mock(ProducerFactory.class);
358+
given(pf.createProducer()).willReturn(producer);
359+
KafkaTemplate template = new KafkaTemplate(pf);
360+
template.setDefaultTopic("foo");
361+
KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler<>(template);
362+
QueueChannel fails = new QueueChannel();
363+
handler.setSendFailureChannel(fails);
364+
assertThatExceptionOfType(MessageHandlingException.class).isThrownBy(
365+
() -> handler.handleMessage(new GenericMessage<>("")))
366+
.withCauseExactlyInstanceOf(KafkaException.class)
367+
.withStackTraceContaining("test.immediate");
368+
Message<?> fail = fails.receive(0);
369+
assertThat(fail).isNotNull();
370+
assertThat(fail.getPayload())
371+
.asInstanceOf(throwable(KafkaSendFailureException.class))
372+
.cause()
373+
.isInstanceOf(KafkaException.class)
374+
.cause()
375+
.isEqualTo(rte);
376+
}
377+
347378
@Test
348379
void testOutboundWithCustomHeaderMapper() {
349380
DefaultKafkaProducerFactory<Integer, String> producerFactory = new DefaultKafkaProducerFactory<>(

src/checkstyle/checkstyle.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
value="org.assertj.core.api.Assertions.*,
8181
org.xmlunit.assertj3.XmlAssert.*,
8282
org.assertj.core.api.Assumptions.*,
83+
org.assertj.core.api.InstanceOfAssertFactories.*,
8384
org.awaitility.Awaitility.*,
8485
org.mockito.Mockito.*,
8586
org.mockito.BDDMockito.*,

0 commit comments

Comments
 (0)