Skip to content

Commit c27e297

Browse files
committed
Ensure reactive transaction rollback on commit error
This change fixes a situation where error handling was skipped during `processCommit()` in case the `doCommit()` failed. The error handling was set up via an `onErrorResume` operator that was nested inside a `then(...)`, applied to an inner `Mono.empty()`. As a consequence, it would never receive an error signal (effectively decoupling the onErrorResume from the main chain). This change simply moves the error handling back one level up. It also simplifies the `doCommit` code a bit by getting rid of the steps that artificially introduce a `Mono<Object>` return type, which is not really needed. A pre-existing test was missing the fact that the rollback didn't occur, which is now fixed. Another dedicated test is introduced building upon the `ReactiveTestTransactionManager` class. Closes spring-projectsgh-28968 Closes spring-projectsgh-30096
1 parent 2e5d047 commit c27e297

File tree

4 files changed

+37
-6
lines changed

4 files changed

+37
-6
lines changed

spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ void testCommitFails() {
270270
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
271271
verify(connectionMock).createStatement("foo");
272272
verify(connectionMock).commitTransaction();
273+
verify(connectionMock).rollbackTransaction();
273274
verify(connectionMock).close();
274275
verifyNoMoreInteractions(connectionMock);
275276
}

spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ private Mono<Void> processCommit(TransactionSynchronizationManager synchronizati
433433

434434
AtomicBoolean beforeCompletionInvoked = new AtomicBoolean();
435435

436-
Mono<Object> commit = prepareForCommit(synchronizationManager, status)
436+
Mono<Void> commit = prepareForCommit(synchronizationManager, status)
437437
.then(triggerBeforeCommit(synchronizationManager, status))
438438
.then(triggerBeforeCompletion(synchronizationManager, status))
439439
.then(Mono.defer(() -> {
@@ -445,11 +445,12 @@ private Mono<Void> processCommit(TransactionSynchronizationManager synchronizati
445445
return doCommit(synchronizationManager, status);
446446
}
447447
return Mono.empty();
448-
})).then(Mono.empty().onErrorResume(ex -> {
449-
Mono<Object> propagateException = Mono.error(ex);
448+
})) //
449+
.onErrorResume(ex -> {
450+
Mono<Void> propagateException = Mono.error(ex);
450451
// Store result in a local variable in order to appease the
451452
// Eclipse compiler with regard to inferred generics.
452-
Mono<Object> result = propagateException;
453+
Mono<Void> result = propagateException;
453454
if (ErrorPredicates.UNEXPECTED_ROLLBACK.test(ex)) {
454455
result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)
455456
.then(propagateException);
@@ -471,7 +472,8 @@ else if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) {
471472
}
472473

473474
return result;
474-
})).then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex ->
475+
})
476+
.then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex ->
475477
triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex)))
476478
.then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED))));
477479

spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager
3636

3737
private final boolean canCreateTransaction;
3838

39+
private final boolean forceFailOnCommit;
40+
3941
protected boolean begin = false;
4042

4143
protected boolean commit = false;
@@ -48,8 +50,13 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager
4850

4951

5052
ReactiveTestTransactionManager(boolean existingTransaction, boolean canCreateTransaction) {
53+
this(existingTransaction, canCreateTransaction, false);
54+
}
55+
56+
ReactiveTestTransactionManager(boolean existingTransaction, boolean canCreateTransaction, boolean forceFailOnCommit) {
5157
this.existingTransaction = existingTransaction;
5258
this.canCreateTransaction = canCreateTransaction;
59+
this.forceFailOnCommit = forceFailOnCommit;
5360
}
5461

5562

@@ -79,7 +86,12 @@ protected Mono<Void> doCommit(TransactionSynchronizationManager synchronizationM
7986
if (!TRANSACTION.equals(status.getTransaction())) {
8087
return Mono.error(new IllegalArgumentException("Not the same transaction object"));
8188
}
82-
return Mono.fromRunnable(() -> this.commit = true);
89+
return Mono.fromRunnable(() -> {
90+
this.commit = true;
91+
if (this.forceFailOnCommit) {
92+
throw new IllegalArgumentException("Forced failure on commit");
93+
}
94+
});
8395
}
8496

8597
@Override

spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,22 @@ public void transactionTemplateWithException() {
203203
assertHasCleanedUp(tm);
204204
}
205205

206+
//gh-28968
207+
@Test
208+
void errorInCommitDoesInitiateRollbackAfterCommit() {
209+
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true, true);
210+
TransactionalOperator rxtx = TransactionalOperator.create(tm);
211+
212+
StepVerifier.create(rxtx.transactional(Mono.just("bar")))
213+
.verifyErrorMessage("Forced failure on commit");
214+
215+
assertHasBegan(tm);
216+
assertHasCommitted(tm);
217+
assertHasRolledBack(tm);
218+
assertHasNotSetRollbackOnly(tm);
219+
assertHasCleanedUp(tm);
220+
}
221+
206222
private void assertHasBegan(ReactiveTestTransactionManager actual) {
207223
assertThat(actual.begin).as("Expected <ReactiveTransactionManager.begin()> but was <begin()> was not invoked").isTrue();
208224
}

0 commit comments

Comments
 (0)