Skip to content

Commit 40915c9

Browse files
committed
Allow template.save() to find if it is in a transaction.
Closes #1757.
1 parent 240512d commit 40915c9

File tree

3 files changed

+27
-7
lines changed

3 files changed

+27
-7
lines changed

src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public <T> Mono<T> save(T entity, String... scopeAndCollection) {
8080

8181
String scope = scopeAndCollection.length > 0 ? scopeAndCollection[0] : null;
8282
String collection = scopeAndCollection.length > 1 ? scopeAndCollection[1] : null;
83-
return Mono.defer(() -> {
83+
return Mono.deferContextual( ctx1 -> {
8484
final CouchbasePersistentEntity<?> mapperEntity = getConverter().getMappingContext()
8585
.getPersistentEntity(entity.getClass());
8686
final CouchbasePersistentProperty versionProperty = mapperEntity.getVersionProperty();

src/test/java/org/springframework/data/couchbase/transactions/sdk/SDKTransactionsSaveIntegrationTests.java

+13-6
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818

1919
import static org.junit.jupiter.api.Assertions.assertTrue;
2020
import static org.junit.jupiter.api.Assertions.fail;
21-
import static org.springframework.data.couchbase.transactions.util.TransactionTestUtil.assertNotInTransaction;
21+
import static org.springframework.data.couchbase.transactions.util.TransactionTestUtil.assertInTransaction;
22+
import static org.springframework.data.couchbase.transactions.util.TransactionTestUtil.assertInReactiveTransaction;
23+
import static org.springframework.data.couchbase.transactions.util.TransactionTestUtil.assertNotInReactiveTransaction;
2224

2325
import org.springframework.data.couchbase.domain.PersonWithoutVersion;
24-
import reactor.core.publisher.Mono;
25-
import reactor.core.scheduler.Schedulers;
2626

2727
import org.junit.jupiter.api.AfterEach;
2828
import org.junit.jupiter.api.BeforeEach;
@@ -32,7 +32,6 @@
3232
import org.springframework.data.couchbase.CouchbaseClientFactory;
3333
import org.springframework.data.couchbase.core.CouchbaseTemplate;
3434
import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
35-
import org.springframework.data.couchbase.domain.Person;
3635
import org.springframework.data.couchbase.transactions.TransactionsConfig;
3736
import org.springframework.data.couchbase.util.Capabilities;
3837
import org.springframework.data.couchbase.util.ClusterType;
@@ -55,45 +54,53 @@ public class SDKTransactionsSaveIntegrationTests extends JavaIntegrationTests {
5554
@BeforeEach
5655
public void beforeEachTest() {
5756
assertNotInTransaction();
57+
assertNotInReactiveTransaction();
5858
}
5959

6060
@AfterEach
6161
public void afterEachTest() {
6262
assertNotInTransaction();
63+
assertNotInReactiveTransaction();
6364
}
6465

66+
6567
@DisplayName("ReactiveCouchbaseTemplate.save() called inside a reactive SDK transaction should work")
6668
@Test
6769
public void reactiveSaveInReactiveTransaction() {
6870
couchbaseClientFactory.getCluster().reactive().transactions().run(ctx -> {
6971
PersonWithoutVersion p = new PersonWithoutVersion("Walter", "White");
70-
return reactiveOps.save(p);
72+
return reactiveOps.save(p).then(assertInReactiveTransaction());
7173
}).block();
7274
}
7375

7476
@DisplayName("ReactiveCouchbaseTemplate.save().block() called inside a non-reactive SDK transaction should work")
7577
@Test
7678
public void reactiveSaveInBlockingTransaction() {
7779
couchbaseClientFactory.getCluster().transactions().run(ctx -> {
80+
assertInTransaction();
7881
PersonWithoutVersion p = new PersonWithoutVersion("Walter", "White");
7982
reactiveOps.save(p).block();
8083
});
8184
}
8285

86+
// This should not work because ops.save(p) calls block() so everything in that call
87+
// does not have the reactive context (which has the transaction context)
88+
// what happens is the ops.save(p) is not in a transaction. (it will call upsert instead of insert)
8389
@DisplayName("ReactiveCouchbaseTemplate.save() called inside a reactive SDK transaction should work")
8490
@Test
8591
public void blockingSaveInReactiveTransaction() {
8692
couchbaseClientFactory.getCluster().reactive().transactions().run(ctx -> {
8793
PersonWithoutVersion p = new PersonWithoutVersion("Walter", "White");
8894
ops.save(p);
89-
return Mono.empty();
95+
return assertInReactiveTransaction();
9096
}).block();
9197
}
9298

9399
@DisplayName("ReactiveCouchbaseTemplate.save().block() called inside a non-reactive SDK transaction should work")
94100
@Test
95101
public void blockingSaveInBlockingTransaction() {
96102
couchbaseClientFactory.getCluster().transactions().run(ctx -> {
103+
assertInTransaction();
97104
PersonWithoutVersion p = new PersonWithoutVersion("Walter", "White");
98105
ops.save(p);
99106
});

src/test/java/org/springframework/data/couchbase/transactions/util/TransactionTestUtil.java

+13
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.junit.jupiter.api.Assertions.assertTrue;
2020

2121
import org.springframework.data.couchbase.core.TransactionalSupport;
22+
import reactor.core.publisher.Mono;
2223

2324
/**
2425
* Utility methods for transaction tests.
@@ -35,4 +36,16 @@ public static void assertInTransaction() {
3536
public static void assertNotInTransaction() {
3637
assertFalse(TransactionalSupport.checkForTransactionInThreadLocalStorage().block().isPresent());
3738
}
39+
40+
public static <T> Mono<T> assertInReactiveTransaction(T... obj) {
41+
return Mono.deferContextual((ctx1) ->
42+
TransactionalSupport.checkForTransactionInThreadLocalStorage()
43+
.flatMap(ctx2 -> ctx2.isPresent() ? (obj.length>0 ? Mono.just(obj[0]) : Mono.empty()) : Mono.error(new RuntimeException("in transaction"))));
44+
}
45+
46+
public static <T> Mono<T> assertNotInReactiveTransaction(T... obj) {
47+
return Mono.deferContextual((ctx1) ->
48+
TransactionalSupport.checkForTransactionInThreadLocalStorage()
49+
.flatMap(ctx2 -> !ctx2.isPresent() ? (obj.length>0 ? Mono.just(obj[0]) : Mono.empty()) : Mono.error(new RuntimeException("in transaction"))));
50+
}
3851
}

0 commit comments

Comments
 (0)