Skip to content

Commit 1278a9c

Browse files
authored
GH-2604: Fix Transaction Synch Regression (#2605)
Resolves #2604 Prior to #2005 synchronized transactions were committed in `afterCompletion`. This was incorrect because a failure to commit was not propagated to the caller. However, if a higher order synchronization throws an exception in `afterCommit`, the producer is closed (returned to the cache) while still in a transaction - the transaction was not committed. Restore the commit in `afterCompletion` and add a check in `KafkaResourceHolder` to only commit once. Add tests. **cherry-pick to 2.9.x**
1 parent 9d49128 commit 1278a9c

File tree

3 files changed

+127
-4
lines changed

3 files changed

+127
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaResourceHolder.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,6 +38,8 @@ public class KafkaResourceHolder<K, V> extends ResourceHolderSupport {
3838

3939
private final Duration closeTimeout;
4040

41+
private boolean committed;
42+
4143
/**
4244
* Construct an instance for the producer.
4345
* @param producer the producer.
@@ -55,7 +57,10 @@ public Producer<K, V> getProducer() {
5557
}
5658

5759
public void commit() {
58-
this.producer.commitTransaction();
60+
if (!this.committed) {
61+
this.producer.commitTransaction();
62+
this.committed = true;
63+
}
5964
}
6065

6166
public void close() {

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -153,7 +153,10 @@ protected void processResourceAfterCommit(KafkaResourceHolder<K, V> resourceHold
153153
@Override
154154
public void afterCompletion(int status) {
155155
try {
156-
if (status != TransactionSynchronization.STATUS_COMMITTED) {
156+
if (status == TransactionSynchronization.STATUS_COMMITTED) {
157+
this.resourceHolder.commit();
158+
}
159+
else {
157160
this.resourceHolder.rollback();
158161
}
159162
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package transaction;
18+
19+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.BDDMockito.given;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.verify;
24+
25+
import org.apache.kafka.clients.producer.Producer;
26+
import org.junit.jupiter.api.Test;
27+
28+
import org.springframework.core.Ordered;
29+
import org.springframework.kafka.core.KafkaResourceHolder;
30+
import org.springframework.kafka.core.ProducerFactory;
31+
import org.springframework.kafka.core.ProducerFactoryUtils;
32+
import org.springframework.transaction.TransactionDefinition;
33+
import org.springframework.transaction.TransactionException;
34+
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
35+
import org.springframework.transaction.support.DefaultTransactionStatus;
36+
import org.springframework.transaction.support.TransactionSynchronization;
37+
import org.springframework.transaction.support.TransactionSynchronizationManager;
38+
import org.springframework.transaction.support.TransactionTemplate;
39+
40+
/**
41+
* @author Gary Russell
42+
* @since 2.9.7
43+
*
44+
*/
45+
public class TransactionSynchronizationTests {
46+
47+
@SuppressWarnings({ "rawtypes", "unchecked" })
48+
@Test
49+
void commitAfterAnotherSyncFails() {
50+
Producer producer = mock(Producer.class);
51+
ProducerFactory pf = mock(ProducerFactory.class);
52+
given(pf.createProducer(any())).willReturn(producer);
53+
assertThatExceptionOfType(RuntimeException.class)
54+
.isThrownBy(() ->
55+
new TransactionTemplate(new TM()).executeWithoutResult(status -> {
56+
KafkaResourceHolder holder = ProducerFactoryUtils.getTransactionalResourceHolder(pf);
57+
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
58+
59+
@Override
60+
public void afterCommit() {
61+
if (true) {
62+
throw new RuntimeException("Test");
63+
}
64+
}
65+
66+
@Override
67+
public int getOrder() {
68+
return Ordered.HIGHEST_PRECEDENCE;
69+
}
70+
71+
});
72+
}))
73+
.withMessage("Test");
74+
verify(producer).beginTransaction();
75+
verify(producer).commitTransaction();
76+
verify(producer).close(any());
77+
}
78+
79+
@SuppressWarnings({ "rawtypes", "unchecked" })
80+
@Test
81+
void onlyOnceCommit() {
82+
Producer producer = mock(Producer.class);
83+
ProducerFactory pf = mock(ProducerFactory.class);
84+
given(pf.createProducer(any())).willReturn(producer);
85+
new TransactionTemplate(new TM()).executeWithoutResult(status -> {
86+
KafkaResourceHolder holder = ProducerFactoryUtils.getTransactionalResourceHolder(pf);
87+
});
88+
verify(producer).beginTransaction();
89+
verify(producer).commitTransaction();
90+
verify(producer).close(any());
91+
}
92+
93+
static class TM extends AbstractPlatformTransactionManager {
94+
95+
@Override
96+
protected Object doGetTransaction() throws TransactionException {
97+
return new Object();
98+
}
99+
100+
@Override
101+
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
102+
}
103+
104+
@Override
105+
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
106+
}
107+
108+
@Override
109+
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
110+
}
111+
112+
}
113+
114+
}
115+

0 commit comments

Comments
 (0)