Skip to content

Commit f4c17c8

Browse files
authored
GH-3076: Set batchFailed in ListenerConsumer even if no commonErrorHandler
Fixes: #3076 The `ListenerConsumer` sets `batchFailed` to `true` only if `commonErrorHandler` is provided * Move `this.batchFailed = true;` in the `doInvokeBatchListener()` before any error handling. **Auto-cherry-pick to `3.1x` & `3.0.x`**
1 parent 7b0cd0f commit f4c17c8

File tree

2 files changed

+70
-2
lines changed

2 files changed

+70
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@
163163
* @author Daniel Gentes
164164
* @author Soby Chacko
165165
* @author Wang Zhiyang
166+
* @author Raphael Rösch
166167
*/
167168
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
168169
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -2276,13 +2277,13 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
22762277
}
22772278
}
22782279
catch (RuntimeException e) {
2280+
this.batchFailed = true;
22792281
failureTimer(sample, null);
22802282
batchInterceptAfter(records, e);
22812283
if (this.commonErrorHandler == null) {
22822284
throw e;
22832285
}
22842286
try {
2285-
this.batchFailed = true;
22862287
invokeBatchErrorHandler(records, recordList, e);
22872288
commitOffsetsIfNeededAfterHandlingError(records);
22882289
}

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,16 @@
103103
* @author Artem Bilan
104104
* @author Wang Zhiyang
105105
* @author Soby Chacko
106+
* @author Raphael Rösch
106107
*
107108
* @since 1.3
108109
*
109110
*/
110111
@EmbeddedKafka(topics = { TransactionalContainerTests.topic1, TransactionalContainerTests.topic2,
111112
TransactionalContainerTests.topic3, TransactionalContainerTests.topic3DLT, TransactionalContainerTests.topic4,
112113
TransactionalContainerTests.topic5, TransactionalContainerTests.topic6, TransactionalContainerTests.topic7,
113-
TransactionalContainerTests.topic8, TransactionalContainerTests.topic8DLT, TransactionalContainerTests.topic9},
114+
TransactionalContainerTests.topic8, TransactionalContainerTests.topic8DLT, TransactionalContainerTests.topic9,
115+
TransactionalContainerTests.topic10},
114116
brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" })
115117
public class TransactionalContainerTests {
116118

@@ -136,6 +138,8 @@ public class TransactionalContainerTests {
136138

137139
public static final String topic9 = "txTopic9";
138140

141+
public static final String topic10 = "txTopic10";
142+
139143
private static EmbeddedKafkaBroker embeddedKafka;
140144

141145
@BeforeAll
@@ -1082,4 +1086,67 @@ void testNoAfterRollbackWhenFenced() throws Exception {
10821086
assertThatIllegalStateException().isThrownBy(container::start);
10831087
}
10841088

1089+
1090+
@Test
1091+
void testArbpWithoutRecovery() throws InterruptedException {
1092+
// init producer
1093+
Map<String, Object> producerProperties = KafkaTestUtils.producerProps(embeddedKafka);
1094+
DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<>(producerProperties);
1095+
pf.setTransactionIdPrefix("testArbpResetWithoutRecover.batchListener");
1096+
final KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);
1097+
// init consumer
1098+
String group = "groupInARBP3";
1099+
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(group, "false", embeddedKafka);
1100+
consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
1101+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProperties);
1102+
ContainerProperties containerProps = new ContainerProperties(topic10);
1103+
containerProps.setPollTimeout(10_000);
1104+
containerProps.setBatchRecoverAfterRollback(false); // we want to test the behavior if recovery is disabled
1105+
final var successLatch = new AtomicReference<>(new CountDownLatch(2));
1106+
containerProps.setMessageListener(new BatchMessageListener<Integer, String>() {
1107+
private int attempt = 0;
1108+
1109+
@Override
1110+
public void onMessage(List<ConsumerRecord<Integer, String>> data) {
1111+
if (3 > attempt++) { // the first three attempts should fail
1112+
throw new BatchListenerFailedException("fail for test", data.get(0));
1113+
}
1114+
data.forEach(d -> successLatch.get().countDown());
1115+
}
1116+
});
1117+
// init container
1118+
KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
1119+
containerProps.setKafkaAwareTransactionManager(tm);
1120+
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps);
1121+
container.setBeanName("testArbpWithoutRecover");
1122+
DefaultAfterRollbackProcessor<Object, Object> afterRollbackProcessor = spy(
1123+
new DefaultAfterRollbackProcessor<>(new FixedBackOff(0L, FixedBackOff.UNLIMITED_ATTEMPTS))
1124+
);
1125+
container.setAfterRollbackProcessor(afterRollbackProcessor);
1126+
container.start();
1127+
1128+
// process first batch
1129+
template.executeInTransaction(t -> {
1130+
template.send(new ProducerRecord<>(topic10, 0, 0, "bar1"));
1131+
template.send(new ProducerRecord<>(topic10, 0, 0, "bar2"));
1132+
return null;
1133+
});
1134+
assertThat(successLatch.get().await(30, TimeUnit.SECONDS)).isTrue(); // wait for first batch
1135+
1136+
// process second batch
1137+
successLatch.set(new CountDownLatch(2)); // reset latch
1138+
template.executeInTransaction(t -> {
1139+
template.send(new ProducerRecord<>(topic10, 0, 0, "bar4"));
1140+
template.send(new ProducerRecord<>(topic10, 0, 0, "bar5"));
1141+
return null;
1142+
});
1143+
assertThat(successLatch.get().await(30, TimeUnit.SECONDS)).isTrue(); // wait for second batch
1144+
1145+
// assert three processBatch calls due to the failed attempts + one call to clearThreadState
1146+
verify(afterRollbackProcessor, times(3)).processBatch(any(), any(), any(), any(), any(), anyBoolean(), any());
1147+
verify(afterRollbackProcessor).clearThreadState();
1148+
1149+
container.stop();
1150+
}
1151+
10851152
}

0 commit comments

Comments
 (0)