Skip to content

Commit c37a616

Browse files
committed
GH-2588: support batch recoverable DefaultAfterRollbackProcessor
deprecated transactionManager and add KafkaAwareTransactionManager at ContainerProperties, after remove transactionManager, change `KafkaMessageListenerContainer#transactionManager` type to PlatformTransactionManager * modify toString in `ContainerProperties`
1 parent 628b5da commit c37a616

11 files changed

+138
-26
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.springframework.core.task.AsyncTaskExecutor;
3636
import org.springframework.kafka.support.TopicPartitionOffset;
3737
import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention;
38+
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
3839
import org.springframework.lang.Nullable;
3940
import org.springframework.scheduling.TaskScheduler;
4041
import org.springframework.transaction.PlatformTransactionManager;
@@ -256,8 +257,11 @@ public enum EOSMode {
256257

257258
private double idleBeforeDataMultiplier = DEFAULT_IDLE_BEFORE_DATA_MULTIPLIER;
258259

260+
@Deprecated(since = "3.1")
259261
private PlatformTransactionManager transactionManager;
260262

263+
private KafkaAwareTransactionManager<?, ?> kafkaAwareTransactionManager;
264+
261265
private boolean batchRecoverAfterRollback = false;
262266

263267
private int monitorInterval = DEFAULT_MONITOR_INTERVAL;
@@ -524,6 +528,7 @@ public Long getIdlePartitionEventInterval() {
524528
return this.idlePartitionEventInterval;
525529
}
526530

531+
@Deprecated(since = "3.1")
527532
@Nullable
528533
public PlatformTransactionManager getTransactionManager() {
529534
return this.transactionManager;
@@ -541,10 +546,26 @@ public PlatformTransactionManager getTransactionManager() {
541546
* @since 1.3
542547
* @see #setAckMode(AckMode)
543548
*/
549+
@Deprecated(since = "3.1")
544550
public void setTransactionManager(@Nullable PlatformTransactionManager transactionManager) {
545551
this.transactionManager = transactionManager;
546552
}
547553

554+
@Nullable
555+
public KafkaAwareTransactionManager<?, ?> getKafkaAwareTransactionManager() {
556+
return this.kafkaAwareTransactionManager;
557+
}
558+
559+
/**
560+
* Set the transaction manager to start a transaction; replace {@link #setTransactionManager}.
561+
* @param kafkaAwareTransactionManager the transaction manager.
562+
* @since 3.1
563+
*/
564+
public void setKafkaAwareTransactionManager(@Nullable KafkaAwareTransactionManager<?, ?> kafkaAwareTransactionManager) {
565+
this.kafkaAwareTransactionManager = kafkaAwareTransactionManager;
566+
}
567+
568+
548569
public boolean isBatchRecoverAfterRollback() {
549570
return this.batchRecoverAfterRollback;
550571
}
@@ -1060,6 +1081,9 @@ public String toString() {
10601081
+ (this.transactionManager != null
10611082
? "\n transactionManager=" + this.transactionManager
10621083
: "")
1084+
+ (this.kafkaAwareTransactionManager != null
1085+
? "\n kafkaAwareTransactionManager=" + this.kafkaAwareTransactionManager
1086+
: "")
10631087
+ "\n monitorInterval=" + this.monitorInterval
10641088
+ (this.scheduler != null ? "\n scheduler=" + this.scheduler : "")
10651089
+ "\n noPollThreshold=" + this.noPollThreshold

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,11 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
683683

684684
private final CommonErrorHandler commonErrorHandler;
685685

686-
private final PlatformTransactionManager transactionManager = this.containerProperties.getTransactionManager();
686+
@SuppressWarnings("deprecation")
687+
private final PlatformTransactionManager transactionManager =
688+
this.containerProperties.getKafkaAwareTransactionManager() != null ?
689+
this.containerProperties.getKafkaAwareTransactionManager() :
690+
this.containerProperties.getTransactionManager();
687691

688692
@SuppressWarnings(RAWTYPES)
689693
private final KafkaAwareTransactionManager kafkaTxManager =
@@ -2993,7 +2997,6 @@ private void sendOffsetsToTransaction() {
29932997
doSendOffsets(this.producer, commits);
29942998
}
29952999

2996-
@SuppressWarnings("deprecation")
29973000
private void doSendOffsets(Producer<?, ?> prod, Map<TopicPartition, OffsetAndMetadata> commits) {
29983001
prod.sendOffsetsToTransaction(commits, this.consumer.groupMetadata());
29993002
if (this.fixTxOffsets) {

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
327327
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo("2");
328328
}
329329

330-
@SuppressWarnings("unchecked")
330+
@SuppressWarnings({"unchecked", "deprecation"})
331331
@Test
332332
public void testNestedTxProducerIsCached() throws Exception {
333333
Map<String, Object> producerProps = KafkaTestUtils.producerProps(this.embeddedKafka);
@@ -376,6 +376,55 @@ public void testNestedTxProducerIsCached() throws Exception {
376376
}
377377
}
378378

379+
@SuppressWarnings("unchecked")
380+
@Test
381+
public void testNestedTxProducerIsCachedByKWTM() throws Exception {
382+
Map<String, Object> producerProps = KafkaTestUtils.producerProps(this.embeddedKafka);
383+
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(producerProps);
384+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
385+
DefaultKafkaProducerFactory<Integer, String> pfTx = new DefaultKafkaProducerFactory<>(producerProps);
386+
pfTx.setTransactionIdPrefix("fooTx.");
387+
KafkaOperations<Integer, String> templateTx = new KafkaTemplate<>(pfTx);
388+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka);
389+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
390+
AtomicReference<Consumer<Integer, String>> wrapped = new AtomicReference<>();
391+
cf.addPostProcessor(consumer -> {
392+
ProxyFactory prox = new ProxyFactory();
393+
prox.setTarget(consumer);
394+
@SuppressWarnings("unchecked")
395+
Consumer<Integer, String> proxy = (Consumer<Integer, String>) prox.getProxy();
396+
wrapped.set(proxy);
397+
return proxy;
398+
});
399+
ContainerProperties containerProps = new ContainerProperties("txCache1");
400+
CountDownLatch latch = new CountDownLatch(1);
401+
containerProps.setMessageListener((MessageListener<Integer, String>) r -> {
402+
templateTx.executeInTransaction(t -> t.send("txCacheSendFromListener", "bar"));
403+
templateTx.executeInTransaction(t -> t.send("txCacheSendFromListener", "baz"));
404+
latch.countDown();
405+
});
406+
KafkaTransactionManager<Integer, String> tm = new KafkaTransactionManager<>(pfTx);
407+
containerProps.setKafkaAwareTransactionManager(tm);
408+
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
409+
containerProps);
410+
container.start();
411+
try {
412+
CompletableFuture<SendResult<Integer, String>> future = template.send("txCache1", "foo");
413+
future.get(10, TimeUnit.SECONDS);
414+
pf.getCache();
415+
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", Map.class)).hasSize(0);
416+
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
417+
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(1);
418+
assertThat(pfTx.getCache()).hasSize(1);
419+
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer")).isSameAs(wrapped.get());
420+
}
421+
finally {
422+
container.stop();
423+
pf.destroy();
424+
pfTx.destroy();
425+
}
426+
}
427+
379428
@SuppressWarnings({ "rawtypes", "unchecked" })
380429
@Test
381430
void listener() {

spring-kafka/src/test/java/org/springframework/kafka/listener/CommitOnAssignmentTests.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2021 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -105,7 +105,7 @@ void testLastOnly() throws InterruptedException {
105105
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
106106
}
107107

108-
@SuppressWarnings({ "rawtypes", "unchecked" })
108+
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
109109
@Test
110110
void testLatestOnlyTx() throws InterruptedException {
111111
ContainerProperties props = this.registry.getListenerContainer("foo").getContainerProperties();
@@ -125,7 +125,7 @@ void testLatestOnlyTx() throws InterruptedException {
125125
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
126126
}
127127

128-
@SuppressWarnings({ "rawtypes", "unchecked" })
128+
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
129129
@Test
130130
void testLatestOnlyNoTx() throws InterruptedException {
131131
ContainerProperties props = this.registry.getListenerContainer("foo").getContainerProperties();
@@ -135,13 +135,48 @@ void testLatestOnlyNoTx() throws InterruptedException {
135135
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
136136
Producer producer = mock(Producer.class);
137137
given(pf.createProducer(any())).willReturn(producer);
138-
CountDownLatch latch = new CountDownLatch(1);
139138
props.setTransactionManager(tm);
140139
this.registry.start();
141140
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
142141
verify(producer, never()).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class));
143142
}
144143

144+
@SuppressWarnings({ "rawtypes", "unchecked" })
145+
@Test
146+
void testLatestOnlyTxByKATM() throws InterruptedException {
147+
ContainerProperties props = this.registry.getListenerContainer("foo").getContainerProperties();
148+
props.setAssignmentCommitOption(AssignmentCommitOption.LATEST_ONLY);
149+
ProducerFactory pf = mock(ProducerFactory.class, withSettings().verboseLogging());
150+
given(pf.transactionCapable()).willReturn(true);
151+
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
152+
Producer producer = mock(Producer.class);
153+
given(pf.createProducer(any())).willReturn(producer);
154+
CountDownLatch latch = new CountDownLatch(1);
155+
willAnswer(inv -> {
156+
latch.countDown();
157+
return null;
158+
}).given(producer).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class));
159+
props.setKafkaAwareTransactionManager(tm);
160+
this.registry.start();
161+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
162+
}
163+
164+
@SuppressWarnings({ "rawtypes", "unchecked" })
165+
@Test
166+
void testLatestOnlyNoTxByKATM() throws InterruptedException {
167+
ContainerProperties props = this.registry.getListenerContainer("foo").getContainerProperties();
168+
props.setAssignmentCommitOption(AssignmentCommitOption.LATEST_ONLY_NO_TX);
169+
ProducerFactory pf = mock(ProducerFactory.class, withSettings().verboseLogging());
170+
given(pf.transactionCapable()).willReturn(true);
171+
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
172+
Producer producer = mock(Producer.class);
173+
given(pf.createProducer(any())).willReturn(producer);
174+
props.setKafkaAwareTransactionManager(tm);
175+
this.registry.start();
176+
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
177+
verify(producer, never()).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class));
178+
}
179+
145180
@Configuration
146181
@EnableKafka
147182
public static class Config {

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ void testBatchInterceptBeforeTx1() throws InterruptedException {
507507
testIntercept(true, null, true);
508508
}
509509

510-
@SuppressWarnings({ "rawtypes", "unchecked" })
510+
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
511511
void testIntercept(boolean beforeTx, AssignmentCommitOption option, boolean batch) throws InterruptedException {
512512
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
513513
final Consumer consumer = mock(Consumer.class);
@@ -662,7 +662,7 @@ public void failure(ConsumerRecords records, Exception exception, Consumer consu
662662
}
663663

664664
@Test
665-
@SuppressWarnings({ "rawtypes", "unchecked" })
665+
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
666666
void testInterceptInTxNonKafkaTM() throws InterruptedException {
667667
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
668668
final Consumer consumer = mock(Consumer.class);

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTXTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -240,7 +240,7 @@ public Consumer consumer() {
240240
return consumer;
241241
}
242242

243-
@SuppressWarnings({ "rawtypes", "unchecked" })
243+
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
244244
@Bean
245245
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
246246
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTXTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -241,7 +241,7 @@ public Consumer consumer() {
241241
return consumer;
242242
}
243243

244-
@SuppressWarnings({ "rawtypes", "unchecked" })
244+
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
245245
@Bean
246246
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
247247
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();

spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -132,6 +132,7 @@ void withFilter() throws Exception {
132132
this.registry.stop();
133133
}
134134

135+
@SuppressWarnings("deprecation")
135136
@Test
136137
void defaults() {
137138
Map<String, Object> props = KafkaTestUtils.consumerProps("sbpp", "false", this.broker);

spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxRollbackTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -214,7 +214,7 @@ public Consumer consumer() {
214214
return consumer;
215215
}
216216

217-
@SuppressWarnings({ "rawtypes", "unchecked" })
217+
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
218218
@Bean
219219
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
220220
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();

spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -200,7 +200,7 @@ public Consumer consumer() {
200200
return consumer;
201201
}
202202

203-
@SuppressWarnings({ "rawtypes", "unchecked" })
203+
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
204204
@Bean
205205
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
206206
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ private void testConsumeAndProduceTransactionGuts(boolean handleError, AckMode a
299299
assertThat(stopEvent.get().getSource()).isSameAs(container);
300300
}
301301

302-
@SuppressWarnings({ "rawtypes", "unchecked" })
302+
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
303303
@Test
304304
public void testConsumeAndProduceTransactionRollback() throws Exception {
305305
Consumer consumer = mock(Consumer.class);
@@ -370,7 +370,7 @@ public void testConsumeAndProduceTransactionRollback() throws Exception {
370370
assertThat(delivery.get()).isNotNull();
371371
}
372372

373-
@SuppressWarnings({ "rawtypes", "unchecked" })
373+
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
374374
@Test
375375
public void testConsumeAndProduceTransactionRollbackBatch() throws Exception {
376376
Consumer consumer = mock(Consumer.class);
@@ -619,7 +619,7 @@ public void testFixLagOtherTM() throws InterruptedException {
619619
testFixLagGuts(topic7, 2);
620620
}
621621

622-
@SuppressWarnings("unchecked")
622+
@SuppressWarnings({ "unchecked", "deprecation" })
623623
private void testFixLagGuts(String topic, int whichTm) throws InterruptedException {
624624
logger.info("Start testFixLag");
625625
Map<String, Object> props = KafkaTestUtils.consumerProps("txTest2", "false", embeddedKafka);
@@ -676,7 +676,7 @@ private void testFixLagGuts(String topic, int whichTm) throws InterruptedExcepti
676676
pf.destroy();
677677
}
678678

679-
@SuppressWarnings({ "unchecked"})
679+
@SuppressWarnings({ "unchecked", "deprecation" })
680680
@Test
681681
public void testMaxFailures() throws Exception {
682682
logger.info("Start testMaxFailures");
@@ -786,7 +786,7 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
786786
logger.info("Stop testMaxAttempts");
787787
}
788788

789-
@SuppressWarnings({ "unchecked"})
789+
@SuppressWarnings({ "unchecked", "deprecation" })
790790
@Test
791791
public void testBatchListenerMaxFailuresOnRecover() throws Exception {
792792

@@ -908,7 +908,7 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
908908
logger.info("Stop testBatchListenerMaxFailures");
909909
}
910910

911-
@SuppressWarnings("unchecked")
911+
@SuppressWarnings({ "unchecked", "deprecation" })
912912
@Test
913913
public void testRollbackProcessorCrash() throws Exception {
914914
logger.info("Start testRollbackNoRetries");
@@ -971,7 +971,7 @@ public void testRollbackProcessorCrash() throws Exception {
971971
logger.info("Stop testRollbackNoRetries");
972972
}
973973

974-
@SuppressWarnings("unchecked")
974+
@SuppressWarnings({ "unchecked", "deprecation" })
975975
@Test
976976
public void testBatchListenerRecoverAfterRollbackProcessorCrash() throws Exception {
977977
logger.info("Start testBatchListenerRollbackNoRetries");
@@ -999,7 +999,7 @@ public void testBatchListenerRecoverAfterRollbackProcessorCrash() throws Excepti
999999
}
10001000
});
10011001

1002-
@SuppressWarnings({ "rawtypes" })
1002+
@SuppressWarnings({ "rawtypes"})
10031003
KafkaTransactionManager tm = new KafkaTransactionManager(pf);
10041004
containerProps.setTransactionManager(tm);
10051005
KafkaMessageListenerContainer<Integer, String> container =

0 commit comments

Comments
 (0)