Skip to content

Commit cd4341c

Browse files
Wzy19930507sobychacko
authored andcommitted
GH-2775: Deprecate CP#transactionManager property
Fixes: #2775 * Use the more specific `KafkaAwareTransactionManager` type instead of the `PlatfromTransactionManager` to properly manage Kafka transactions. * Deprecate transactionManager property in `ContainerProperties` and add 'kafkaAwareTransationManager` as a new property. * Modify transaction unit tests to `setKafkaAwareTransactionManager`.
1 parent a96e08b commit cd4341c

15 files changed

+81
-52
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/container-props.adoc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ Also see `idleBeforeDataMultiplier`.
123123
|None
124124
|Used to override any arbitrary consumer properties configured on the consumer factory.
125125

126+
|[[kafkaAwareTransactionManager]]<<kafkaAwareTransactionManager,`kafkaAwareTransactionManager`>>
127+
|`null`
128+
|See xref:kafka/transactions.adoc[Transactions].
129+
126130
|[[listenerTaskExecutor]]<<listenerTaskExecutor,`listenerTaskExecutor`>>
127131
|`SimpleAsyncTaskExecutor`
128132
|A task executor to run the consumer threads.
@@ -232,7 +236,7 @@ Mutually exclusive; at least one must be provided; enforced by `ContainerPropert
232236

233237
|[[transactionManager]]<<transactionManager,`transactionManager`>>
234238
|`null`
235-
|See xref:kafka/transactions.adoc[Transactions].
239+
|Deprecated since 3.2, see <<kafkaAwareTransactionManager>>, xref:kafka/transactions.adoc#transaction-synchronization[Other transaction managers].
236240
|===
237241

238242
[[alc-props]]

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ Rules for the redirection are set either via the `RetryableTopic.exceptionBasedD
3636
Custom DLTs are created automatically as well as other retry and dead-letter topics.
3737
See xref:retrytopic/features.adoc#exc-based-custom-dlt-routing[Routing of messages to custom DLTs based on thrown exceptions] for more information.
3838

39+
[[x32-cp-ptm]]
40+
=== Deprecating ContainerProperties transactionManager property
41+
42+
Deprecating the `transactionManager` property in `ContainerProperties` in favor of `KafkaAwareTransactionManager`, a narrower type compared to the general `PlatformTransactionManager`. See xref:kafka/container-props.adoc#kafkaAwareTransactionManager[ContainerProperties] and xref:kafka/transactions.adoc#transaction-synchronization[Transaction Synchronization].
43+
3944
[[x32-after-rollback-processing]]
4045
=== After Rollback Processing
4146

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@
3535
import org.springframework.core.task.AsyncTaskExecutor;
3636
import org.springframework.kafka.support.TopicPartitionOffset;
3737
import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention;
38+
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
3839
import org.springframework.lang.Nullable;
3940
import org.springframework.scheduling.TaskScheduler;
4041
import org.springframework.transaction.PlatformTransactionManager;
@@ -257,8 +258,11 @@ public enum EOSMode {
257258

258259
private double idleBeforeDataMultiplier = DEFAULT_IDLE_BEFORE_DATA_MULTIPLIER;
259260

261+
@Deprecated(since = "3.2")
260262
private PlatformTransactionManager transactionManager;
261263

264+
private KafkaAwareTransactionManager<?, ?> kafkaAwareTransactionManager;
265+
262266
private boolean batchRecoverAfterRollback = false;
263267

264268
private int monitorInterval = DEFAULT_MONITOR_INTERVAL;
@@ -371,7 +375,7 @@ public void setMessageListener(Object messageListener) {
371375
* calling thread and sometimes not.</li>
372376
* </ul>
373377
* @param ackMode the {@link AckMode}; default BATCH.
374-
* @see #setTransactionManager(PlatformTransactionManager)
378+
* @see #setKafkaAwareTransactionManager(KafkaAwareTransactionManager)
375379
*/
376380
public void setAckMode(AckMode ackMode) {
377381
Assert.notNull(ackMode, "'ackMode' cannot be null");
@@ -525,6 +529,7 @@ public Long getIdlePartitionEventInterval() {
525529
return this.idlePartitionEventInterval;
526530
}
527531

532+
@Deprecated(since = "3.2", forRemoval = true)
528533
@Nullable
529534
public PlatformTransactionManager getTransactionManager() {
530535
return this.transactionManager;
@@ -542,10 +547,25 @@ public PlatformTransactionManager getTransactionManager() {
542547
* @since 1.3
543548
* @see #setAckMode(AckMode)
544549
*/
550+
@Deprecated(since = "3.2", forRemoval = true)
545551
public void setTransactionManager(@Nullable PlatformTransactionManager transactionManager) {
546552
this.transactionManager = transactionManager;
547553
}
548554

555+
@Nullable
556+
public KafkaAwareTransactionManager<?, ?> getKafkaAwareTransactionManager() {
557+
return this.kafkaAwareTransactionManager;
558+
}
559+
560+
/**
561+
* Set the transaction manager to start a transaction; replace {@link #setTransactionManager}.
562+
* @param kafkaAwareTransactionManager the transaction manager.
563+
* @since 3.2
564+
*/
565+
public void setKafkaAwareTransactionManager(@Nullable KafkaAwareTransactionManager<?, ?> kafkaAwareTransactionManager) {
566+
this.kafkaAwareTransactionManager = kafkaAwareTransactionManager;
567+
}
568+
549569
/**
550570
* Recover batch records after rollback if true.
551571
* @return true to recover.
@@ -857,16 +877,16 @@ public TransactionDefinition getTransactionDefinition() {
857877
/**
858878
* Set a transaction definition with properties (e.g. timeout) that will be copied to
859879
* the container's transaction template. Note that this is only generally useful when
860-
* used with a {@link #setTransactionManager(PlatformTransactionManager)
861-
* PlatformTransactionManager} that supports a custom definition; this does NOT
880+
* used with a {@link #setKafkaAwareTransactionManager(KafkaAwareTransactionManager)
881+
* KafkaAwareTransactionManager} that supports a custom definition; this does NOT
862882
* include the {@link org.springframework.kafka.transaction.KafkaTransactionManager}
863883
* which has no concept of transaction timeout. It can be useful to start, for example
864884
* a database transaction, in the container, rather than using {@code @Transactional}
865885
* on the listener, because then a record interceptor, or filter in a listener adapter
866886
* can participate in the transaction.
867887
* @param transactionDefinition the definition.
868888
* @since 2.5.4
869-
* @see #setTransactionManager(PlatformTransactionManager)
889+
* @see #setKafkaAwareTransactionManager(KafkaAwareTransactionManager)
870890
*/
871891
public void setTransactionDefinition(@Nullable TransactionDefinition transactionDefinition) {
872892
this.transactionDefinition = transactionDefinition;
@@ -1072,6 +1092,9 @@ public String toString() {
10721092
+ (this.transactionManager != null
10731093
? "\n transactionManager=" + this.transactionManager
10741094
: "")
1095+
+ (this.kafkaAwareTransactionManager != null
1096+
? "\n kafkaAwareTransactionManager=" + this.kafkaAwareTransactionManager
1097+
: "")
10751098
+ "\n monitorInterval=" + this.monitorInterval
10761099
+ (this.scheduler != null ? "\n scheduler=" + this.scheduler : "")
10771100
+ "\n noPollThreshold=" + this.noPollThreshold

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -696,12 +696,16 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
696696

697697
private final CommonErrorHandler commonErrorHandler;
698698

699-
private final PlatformTransactionManager transactionManager = this.containerProperties.getTransactionManager();
699+
@Deprecated(since = "3.2", forRemoval = true)
700+
@SuppressWarnings("removal")
701+
private final PlatformTransactionManager transactionManager =
702+
this.containerProperties.getKafkaAwareTransactionManager() != null ?
703+
this.containerProperties.getKafkaAwareTransactionManager() :
704+
this.containerProperties.getTransactionManager();
700705

701-
@SuppressWarnings(RAWTYPES)
702-
private final KafkaAwareTransactionManager kafkaTxManager =
703-
this.transactionManager instanceof KafkaAwareTransactionManager
704-
? ((KafkaAwareTransactionManager) this.transactionManager) : null;
706+
private final KafkaAwareTransactionManager<?, ?> kafkaTxManager =
707+
this.transactionManager instanceof KafkaAwareTransactionManager<?, ?> kafkaAwareTransactionManager ?
708+
kafkaAwareTransactionManager : null;
705709

706710
private final TransactionTemplate transactionTemplate;
707711

@@ -3034,7 +3038,6 @@ private void sendOffsetsToTransaction() {
30343038
doSendOffsets(this.producer, commits);
30353039
}
30363040

3037-
@SuppressWarnings("deprecation")
30383041
private void doSendOffsets(Producer<?, ?> prod, Map<TopicPartition, OffsetAndMetadata> commits) {
30393042
prod.sendOffsetsToTransaction(commits, this.consumer.groupMetadata());
30403043
if (this.fixTxOffsets) {

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ public void testNestedTxProducerIsCached() throws Exception {
355355
latch.countDown();
356356
});
357357
KafkaTransactionManager<Integer, String> tm = new KafkaTransactionManager<>(pfTx);
358-
containerProps.setTransactionManager(tm);
358+
containerProps.setKafkaAwareTransactionManager(tm);
359359
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
360360
containerProps);
361361
container.start();
@@ -406,7 +406,7 @@ public void testNestedTxProducerIsFixed() throws Exception {
406406
latch.countDown();
407407
});
408408
KafkaTransactionManager<Integer, String> tm = new KafkaTransactionManager<>(pfTx);
409-
containerProps.setTransactionManager(tm);
409+
containerProps.setKafkaAwareTransactionManager(tm);
410410
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
411411
containerProps);
412412
container.start();

spring-kafka/src/test/java/org/springframework/kafka/listener/CommitOnAssignmentTests.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2021 the original author or authors.
2+
* Copyright 2020-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -120,7 +120,7 @@ void testLatestOnlyTx() throws InterruptedException {
120120
latch.countDown();
121121
return null;
122122
}).given(producer).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class));
123-
props.setTransactionManager(tm);
123+
props.setKafkaAwareTransactionManager(tm);
124124
this.registry.start();
125125
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
126126
}
@@ -135,8 +135,7 @@ void testLatestOnlyNoTx() throws InterruptedException {
135135
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
136136
Producer producer = mock(Producer.class);
137137
given(pf.createProducer(any())).willReturn(producer);
138-
CountDownLatch latch = new CountDownLatch(1);
139-
props.setTransactionManager(tm);
138+
props.setKafkaAwareTransactionManager(tm);
140139
this.registry.start();
141140
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
142141
verify(producer, never()).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class));

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ void testIntercept(boolean beforeTx, @Nullable AssignmentCommitOption option, bo
561561
given(tm.getProducerFactory()).willReturn(pf);
562562
Producer producer = mock(Producer.class);
563563
given(pf.createProducer()).willReturn(producer);
564-
containerProperties.setTransactionManager(tm);
564+
containerProperties.setKafkaAwareTransactionManager(tm);
565565
List<String> order = new ArrayList<>();
566566
CountDownLatch latch = new CountDownLatch(option == null ? 2 : 3);
567567
willAnswer(inv -> {
@@ -688,7 +688,6 @@ void testInterceptInTxNonKafkaTM() throws InterruptedException {
688688
.willReturn(consumer);
689689
ContainerProperties containerProperties = new ContainerProperties("foo");
690690
containerProperties.setGroupId("grp");
691-
AtomicReference<List<ConsumerRecord<String, String>>> received = new AtomicReference<>();
692691
containerProperties.setMessageListener((MessageListener) rec -> {
693692
});
694693
containerProperties.setMissingTopicsFatal(false);

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3951,7 +3951,7 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
39513951

39523952
@ParameterizedTest(name = "{index} testInvokeBatchInterceptorAllSkipped early intercept {0}")
39533953
@ValueSource(booleans = { true, false })
3954-
@SuppressWarnings({"unchecked"})
3954+
@SuppressWarnings({ "unchecked", "deprecation" })
39553955
public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception {
39563956
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
39573957
Consumer<Integer, String> consumer = mock(Consumer.class);

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackBatchTxTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -279,7 +279,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
279279
factory.setConsumerFactory(consumerFactory());
280280
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
281281
factory.getContainerProperties().setMissingTopicsFatal(false);
282-
factory.getContainerProperties().setTransactionManager(tm());
282+
factory.getContainerProperties().setKafkaAwareTransactionManager(tm());
283283
factory.setBatchListener(true);
284284
return factory;
285285
}

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTXTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -247,7 +247,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
247247
factory.setConsumerFactory(consumerFactory());
248248
factory.setCommonErrorHandler(new DefaultErrorHandler());
249249
factory.getContainerProperties().setAckMode(AckMode.BATCH);
250-
factory.getContainerProperties().setTransactionManager(tm());
250+
factory.getContainerProperties().setKafkaAwareTransactionManager(tm());
251251
return factory;
252252
}
253253

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTXTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -248,7 +248,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
248248
factory.setConsumerFactory(consumerFactory());
249249
factory.setCommonErrorHandler(new DefaultErrorHandler());
250250
factory.getContainerProperties().setAckMode(AckMode.RECORD);
251-
factory.getContainerProperties().setTransactionManager(tm());
251+
factory.getContainerProperties().setKafkaAwareTransactionManager(tm());
252252
return factory;
253253
}
254254

spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -62,10 +62,10 @@
6262
import org.springframework.kafka.test.EmbeddedKafkaBroker;
6363
import org.springframework.kafka.test.context.EmbeddedKafka;
6464
import org.springframework.kafka.test.utils.KafkaTestUtils;
65+
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
6566
import org.springframework.test.annotation.DirtiesContext;
6667
import org.springframework.test.annotation.DirtiesContext.ClassMode;
6768
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
68-
import org.springframework.transaction.PlatformTransactionManager;
6969

7070
/**
7171
* @author Gary Russell
@@ -156,7 +156,7 @@ void defaults() {
156156

157157
containerProps = new ContainerProperties("sbpp");
158158
containerProps.setMessageListener(mock(MessageListener.class));
159-
containerProps.setTransactionManager(mock(PlatformTransactionManager.class));
159+
containerProps.setKafkaAwareTransactionManager(mock(KafkaAwareTransactionManager.class));
160160
container = new KafkaMessageListenerContainer<>(cf, containerProps);
161161
container.start();
162162
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.subBatchPerPartition"))

spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxRollbackTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -220,7 +220,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
220220
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
221221
factory.setConsumerFactory(consumerFactory());
222222
factory.getContainerProperties().setAckMode(AckMode.BATCH);
223-
factory.getContainerProperties().setTransactionManager(tm());
223+
factory.getContainerProperties().setKafkaAwareTransactionManager(tm());
224224
factory.setBatchListener(true);
225225
factory.getContainerProperties().setSubBatchPerPartition(true);
226226
return factory;

spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -206,7 +206,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
206206
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
207207
factory.setConsumerFactory(consumerFactory());
208208
factory.getContainerProperties().setAckMode(AckMode.BATCH);
209-
factory.getContainerProperties().setTransactionManager(tm());
209+
factory.getContainerProperties().setKafkaAwareTransactionManager(tm());
210210
factory.getContainerProperties().setSubBatchPerPartition(true);
211211
factory.setBatchListener(true);
212212
return factory;

0 commit comments

Comments
 (0)