Skip to content

Commit 50d541c

Browse files
committed
spring-projectsGH-3076: Set batchFailed state in ListenerConsumer even if no commonErrorHandler is configured
Fixes: spring-projects#3076
1 parent 265e55f commit 50d541c

File tree

2 files changed

+68
-2
lines changed

2 files changed

+68
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2276,13 +2276,13 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
22762276
}
22772277
}
22782278
catch (RuntimeException e) {
2279+
this.batchFailed = true;
22792280
failureTimer(sample, null);
22802281
batchInterceptAfter(records, e);
22812282
if (this.commonErrorHandler == null) {
22822283
throw e;
22832284
}
22842285
try {
2285-
this.batchFailed = true;
22862286
invokeBatchErrorHandler(records, recordList, e);
22872287
commitOffsetsIfNeededAfterHandlingError(records);
22882288
}

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@
110110
@EmbeddedKafka(topics = { TransactionalContainerTests.topic1, TransactionalContainerTests.topic2,
111111
TransactionalContainerTests.topic3, TransactionalContainerTests.topic3DLT, TransactionalContainerTests.topic4,
112112
TransactionalContainerTests.topic5, TransactionalContainerTests.topic6, TransactionalContainerTests.topic7,
113-
TransactionalContainerTests.topic8, TransactionalContainerTests.topic8DLT, TransactionalContainerTests.topic9},
113+
TransactionalContainerTests.topic8, TransactionalContainerTests.topic8DLT, TransactionalContainerTests.topic9,
114+
TransactionalContainerTests.topic10},
114115
brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" })
115116
public class TransactionalContainerTests {
116117

@@ -136,6 +137,8 @@ public class TransactionalContainerTests {
136137

137138
public static final String topic9 = "txTopic9";
138139

140+
public static final String topic10 = "txTopic10";
141+
139142
private static EmbeddedKafkaBroker embeddedKafka;
140143

141144
@BeforeAll
@@ -1082,4 +1085,67 @@ void testNoAfterRollbackWhenFenced() throws Exception {
10821085
assertThatIllegalStateException().isThrownBy(container::start);
10831086
}
10841087

1088+
1089+
@Test
1090+
void testArbpWithoutRecovery() throws InterruptedException {
1091+
// init producer
1092+
Map<String, Object> producerProperties = KafkaTestUtils.producerProps(embeddedKafka);
1093+
DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<>(producerProperties);
1094+
pf.setTransactionIdPrefix("testArbpResetWithoutRecover.batchListener");
1095+
final KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);
1096+
// init consumer
1097+
String group = "groupInARBP3";
1098+
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(group, "false", embeddedKafka);
1099+
consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
1100+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProperties);
1101+
ContainerProperties containerProps = new ContainerProperties(topic10);
1102+
containerProps.setPollTimeout(10_000);
1103+
containerProps.setBatchRecoverAfterRollback(false); // we want to test the behavior if recovery is disabled
1104+
final var successLatch = new AtomicReference<>(new CountDownLatch(2));
1105+
containerProps.setMessageListener(new BatchMessageListener<Integer, String>() {
1106+
private int attempt = 0;
1107+
1108+
@Override
1109+
public void onMessage(List<ConsumerRecord<Integer, String>> data) {
1110+
if (3 > attempt++) { // the first three attempts should fail
1111+
throw new BatchListenerFailedException("fail for test", data.get(0));
1112+
}
1113+
data.forEach(d -> successLatch.get().countDown());
1114+
}
1115+
});
1116+
// init container
1117+
KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
1118+
containerProps.setKafkaAwareTransactionManager(tm);
1119+
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps);
1120+
container.setBeanName("testArbpWithoutRecover");
1121+
DefaultAfterRollbackProcessor<Object, Object> afterRollbackProcessor = spy(
1122+
new DefaultAfterRollbackProcessor<>(new FixedBackOff(0L, FixedBackOff.UNLIMITED_ATTEMPTS))
1123+
);
1124+
container.setAfterRollbackProcessor(afterRollbackProcessor);
1125+
container.start();
1126+
1127+
// process first batch
1128+
template.executeInTransaction(t -> {
1129+
template.send(new ProducerRecord<>(topic10, 0, 0, "bar1"));
1130+
template.send(new ProducerRecord<>(topic10, 0, 0, "bar2"));
1131+
return null;
1132+
});
1133+
assertThat(successLatch.get().await(30, TimeUnit.SECONDS)).isTrue(); // wait for first batch
1134+
1135+
// process second batch
1136+
successLatch.set(new CountDownLatch(2)); // reset latch
1137+
template.executeInTransaction(t -> {
1138+
template.send(new ProducerRecord<>(topic10, 0, 0, "bar4"));
1139+
template.send(new ProducerRecord<>(topic10, 0, 0, "bar5"));
1140+
return null;
1141+
});
1142+
assertThat(successLatch.get().await(30, TimeUnit.SECONDS)).isTrue(); // wait for second batch
1143+
1144+
// assert three processBatch calls due to the failed attempts + one call to clearThreadState
1145+
verify(afterRollbackProcessor, times(3)).processBatch(any(), any(), any(), any(), any(), anyBoolean(), any());
1146+
verify(afterRollbackProcessor).clearThreadState();
1147+
1148+
container.stop();
1149+
}
1150+
10851151
}

0 commit comments

Comments
 (0)