Skip to content

Commit f9f1631

Browse files
garyrussellartembilan
authored andcommitted
GH-8638: Kafka: Send All Fails to Failure Channel
Resolves #8638 Previously, immediate failures (e.g. timeout getting metadata) were only thrown as exceptions, and not sent to the failure channel, if present. **cherry-pick to all supported branches** # Conflicts: # spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java
1 parent d641e72 commit f9f1631

File tree

3 files changed

+62
-15
lines changed

3 files changed

+62
-15
lines changed

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2021 the original author or authors.
2+
* Copyright 2013-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -423,6 +423,7 @@ public String getComponentType() {
423423
return this.isGateway ? "kafka:outbound-gateway" : "kafka:outbound-channel-adapter";
424424
}
425425

426+
@Nullable
426427
protected MessageChannel getSendFailureChannel() {
427428
if (this.sendFailureChannel != null) {
428429
return this.sendFailureChannel;
@@ -500,19 +501,27 @@ protected Object handleRequestMessage(final Message<?> message) {
500501
}
501502
ListenableFuture<SendResult<K, V>> sendFuture;
502503
RequestReplyFuture<K, V, Object> gatewayFuture = null;
503-
if (this.isGateway && (!preBuilt || producerRecord.headers().lastHeader(KafkaHeaders.REPLY_TOPIC) == null)) {
504-
producerRecord.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, getReplyTopic(message)));
505-
gatewayFuture = ((ReplyingKafkaTemplate<K, V, Object>) this.kafkaTemplate).sendAndReceive(producerRecord);
506-
sendFuture = gatewayFuture.getSendFuture();
507-
}
508-
else {
509-
if (this.transactional && !this.kafkaTemplate.inTransaction() && !this.allowNonTransactional) {
510-
sendFuture = this.kafkaTemplate.executeInTransaction(template -> template.send(producerRecord));
504+
try {
505+
if (this.isGateway
506+
&& (!preBuilt || producerRecord.headers().lastHeader(KafkaHeaders.REPLY_TOPIC) == null)) {
507+
producerRecord.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, getReplyTopic(message)));
508+
gatewayFuture = ((ReplyingKafkaTemplate<K, V, Object>) this.kafkaTemplate)
509+
.sendAndReceive(producerRecord);
510+
sendFuture = gatewayFuture.getSendFuture();
511511
}
512512
else {
513-
sendFuture = this.kafkaTemplate.send(producerRecord);
513+
if (this.transactional && !this.kafkaTemplate.inTransaction() && !this.allowNonTransactional) {
514+
sendFuture = this.kafkaTemplate.executeInTransaction(template -> template.send(producerRecord));
515+
}
516+
else {
517+
sendFuture = this.kafkaTemplate.send(producerRecord);
518+
}
514519
}
515520
}
521+
catch (RuntimeException rtex) {
522+
sendFailure(message, producerRecord, getSendFailureChannel(), rtex);
523+
throw rtex;
524+
}
516525
sendFutureIfRequested(sendFuture, futureToken);
517526
if (flush) {
518527
this.kafkaTemplate.flush();
@@ -680,11 +689,7 @@ public void onSuccess(SendResult<K, V> result) {
680689

681690
@Override
682691
public void onFailure(Throwable ex) {
683-
if (failureChannel != null) {
684-
KafkaProducerMessageHandler.this.messagingTemplate.send(failureChannel,
685-
KafkaProducerMessageHandler.this.errorMessageStrategy.buildErrorMessage(
686-
new KafkaSendFailureException(message, producerRecord, ex), null));
687-
}
692+
sendFailure(message, producerRecord, failureChannel, ex);
688693
}
689694

690695
});
@@ -713,6 +718,16 @@ public void onFailure(Throwable ex) {
713718
}
714719
}
715720

721+
private void sendFailure(final Message<?> message, final ProducerRecord<K, V> producerRecord,
722+
@Nullable MessageChannel failureChannel, Throwable exception) {
723+
724+
if (failureChannel != null) {
725+
KafkaProducerMessageHandler.this.messagingTemplate.send(failureChannel,
726+
KafkaProducerMessageHandler.this.errorMessageStrategy.buildErrorMessage(
727+
new KafkaSendFailureException(message, producerRecord, exception), null));
728+
}
729+
}
730+
716731
private Future<?> processReplyFuture(@Nullable RequestReplyFuture<?, ?, Object> future) {
717732
if (future == null) {
718733
return null;

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.springframework.integration.kafka.support.KafkaIntegrationHeaders;
6060
import org.springframework.integration.kafka.support.KafkaSendFailureException;
6161
import org.springframework.integration.support.MessageBuilder;
62+
import org.springframework.kafka.KafkaException;
6263
import org.springframework.kafka.core.ConsumerFactory;
6364
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
6465
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -94,6 +95,7 @@
9495

9596
import static org.assertj.core.api.Assertions.assertThat;
9697
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
98+
import static org.assertj.core.api.InstanceOfAssertFactories.throwable;
9799
import static org.mockito.ArgumentMatchers.any;
98100
import static org.mockito.ArgumentMatchers.isNull;
99101
import static org.mockito.BDDMockito.given;
@@ -343,6 +345,35 @@ protected ListenableFuture<SendResult<Integer, String>> doSend(
343345
producerFactory.destroy();
344346
}
345347

348+
@SuppressWarnings({ "rawtypes", "unchecked" })
349+
@Test
350+
void immediateFailure() {
351+
Producer producer = mock(Producer.class);
352+
CompletableFuture cf = new CompletableFuture();
353+
RuntimeException rte = new RuntimeException("test.immediate");
354+
cf.completeExceptionally(rte);
355+
given(producer.send(any(), any())).willReturn(cf);
356+
ProducerFactory pf = mock(ProducerFactory.class);
357+
given(pf.createProducer()).willReturn(producer);
358+
KafkaTemplate template = new KafkaTemplate(pf);
359+
template.setDefaultTopic("foo");
360+
KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler<>(template);
361+
QueueChannel fails = new QueueChannel();
362+
handler.setSendFailureChannel(fails);
363+
assertThatExceptionOfType(MessageHandlingException.class).isThrownBy(
364+
() -> handler.handleMessage(new GenericMessage<>("")))
365+
.withCauseExactlyInstanceOf(KafkaException.class)
366+
.withStackTraceContaining("test.immediate");
367+
Message<?> fail = fails.receive(0);
368+
assertThat(fail).isNotNull();
369+
assertThat(fail.getPayload())
370+
.asInstanceOf(throwable(KafkaSendFailureException.class))
371+
.cause()
372+
.isInstanceOf(KafkaException.class)
373+
.cause()
374+
.isEqualTo(rte);
375+
}
376+
346377
@Test
347378
void testOutboundWithCustomHeaderMapper() {
348379
DefaultKafkaProducerFactory<Integer, String> producerFactory = new DefaultKafkaProducerFactory<>(

src/checkstyle/checkstyle.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
value="org.assertj.core.api.Assertions.*,
8181
org.xmlunit.assertj3.XmlAssert.*,
8282
org.assertj.core.api.Assumptions.*,
83+
org.assertj.core.api.InstanceOfAssertFactories.*,
8384
org.awaitility.Awaitility.*,
8485
org.mockito.Mockito.*,
8586
org.mockito.BDDMockito.*,

0 commit comments

Comments
 (0)