From d0825cfb90735865c5cf53cdaac6f7e102bd9674 Mon Sep 17 00:00:00 2001 From: Yan Kardziyaka Date: Sat, 11 Nov 2023 03:08:57 +0300 Subject: [PATCH] Declarative way for setting MongoDB transaction options. Closes #1628 --- .../data/mongodb/MongoTransactionManager.java | 2 +- .../data/mongodb/MongoTransactionUtils.java | 98 ++++++++ .../ReactiveMongoTransactionManager.java | 2 +- .../MongoTransactionUtilsUnitTests.java | 227 ++++++++++++++++++ .../ReactiveTransactionIntegrationTests.java | 159 ++++++++++-- ...ReactiveTransactionOptionsTestService.java | 96 ++++++++ .../core/MongoTemplateTransactionTests.java | 144 ++++++++++- .../core/TransactionOptionsTestService.java | 101 ++++++++ 8 files changed, 807 insertions(+), 22 deletions(-) create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoTransactionUtils.java create mode 100644 spring-data-mongodb/src/test/java/org/springframework/data/mongodb/MongoTransactionUtilsUnitTests.java create mode 100644 spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionOptionsTestService.java create mode 100644 spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/TransactionOptionsTestService.java diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoTransactionManager.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoTransactionManager.java index 60cdb193c0..f573c7fb5d 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoTransactionManager.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoTransactionManager.java @@ -134,7 +134,7 @@ protected void doBegin(Object transaction, TransactionDefinition definition) thr } try { - mongoTransactionObject.startTransaction(options); + mongoTransactionObject.startTransaction(MongoTransactionUtils.extractOptions(definition, options)); } catch (MongoException ex) { throw new TransactionSystemException(String.format("Could not start Mongo transaction for session %s.", debugString(mongoTransactionObject.getSession())), ex); diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoTransactionUtils.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoTransactionUtils.java new file mode 100644 index 0000000000..13c4c259b9 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoTransactionUtils.java @@ -0,0 +1,98 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.lang.Nullable; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.interceptor.TransactionAttribute; + +import com.mongodb.ReadConcern; +import com.mongodb.ReadConcernLevel; +import com.mongodb.ReadPreference; +import com.mongodb.TransactionOptions; +import com.mongodb.WriteConcern; + +/** + * Helper class for translating @Transactional labels into Mongo-specific {@link TransactionOptions}. + * + * @author Yan Kardziyaka + */ +public final class MongoTransactionUtils { + private static final Log LOGGER = LogFactory.getLog(MongoTransactionUtils.class); + + private static final String MAX_COMMIT_TIME = "mongo:maxCommitTime"; + + private static final String READ_CONCERN_OPTION = "mongo:readConcern"; + + private static final String READ_PREFERENCE_OPTION = "mongo:readPreference"; + + private static final String WRITE_CONCERN_OPTION = "mongo:writeConcern"; + + private MongoTransactionUtils() {} + + @Nullable + public static TransactionOptions extractOptions(TransactionDefinition transactionDefinition, + @Nullable TransactionOptions fallbackOptions) { + if (transactionDefinition instanceof TransactionAttribute transactionAttribute) { + TransactionOptions.Builder builder = null; + for (String label : transactionAttribute.getLabels()) { + String[] tokens = label.split("=", 2); + builder = tokens.length == 2 ? enhanceWithProperty(builder, tokens[0], tokens[1]) : builder; + } + if (builder == null) { + return fallbackOptions; + } + TransactionOptions options = builder.build(); + return fallbackOptions == null ? options : TransactionOptions.merge(options, fallbackOptions); + } else { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("%s cannot be casted to %s. Transaction labels won't be evaluated as options".formatted( + TransactionDefinition.class.getName(), TransactionAttribute.class.getName())); + } + return fallbackOptions; + } + } + + @Nullable + private static TransactionOptions.Builder enhanceWithProperty(@Nullable TransactionOptions.Builder builder, + String key, String value) { + return switch (key) { + case MAX_COMMIT_TIME -> nullSafe(builder).maxCommitTime(Duration.parse(value).toMillis(), TimeUnit.MILLISECONDS); + case READ_CONCERN_OPTION -> nullSafe(builder).readConcern(new ReadConcern(ReadConcernLevel.fromString(value))); + case READ_PREFERENCE_OPTION -> nullSafe(builder).readPreference(ReadPreference.valueOf(value)); + case WRITE_CONCERN_OPTION -> nullSafe(builder).writeConcern(getWriteConcern(value)); + default -> builder; + }; + } + + private static TransactionOptions.Builder nullSafe(@Nullable TransactionOptions.Builder builder) { + return builder == null ? TransactionOptions.builder() : builder; + } + + private static WriteConcern getWriteConcern(String writeConcernAsString) { + WriteConcern writeConcern = WriteConcern.valueOf(writeConcernAsString); + if (writeConcern == null) { + throw new IllegalArgumentException("'%s' is not a valid WriteConcern".formatted(writeConcernAsString)); + } + return writeConcern; + } + +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoTransactionManager.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoTransactionManager.java index aafd06605a..76d646ad9b 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoTransactionManager.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoTransactionManager.java @@ -146,7 +146,7 @@ protected Mono doBegin(TransactionSynchronizationManager synchronizationMa }).doOnNext(resourceHolder -> { - mongoTransactionObject.startTransaction(options); + mongoTransactionObject.startTransaction(MongoTransactionUtils.extractOptions(definition, options)); if (logger.isDebugEnabled()) { logger.debug(String.format("Started transaction for session %s.", debugString(resourceHolder.getSession()))); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/MongoTransactionUtilsUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/MongoTransactionUtilsUnitTests.java new file mode 100644 index 0000000000..1e2916d004 --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/MongoTransactionUtilsUnitTests.java @@ -0,0 +1,227 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb; + +import static java.util.UUID.*; +import static org.assertj.core.api.Assertions.*; + +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; +import org.springframework.transaction.interceptor.DefaultTransactionAttribute; +import org.springframework.transaction.interceptor.TransactionAttribute; +import org.springframework.transaction.support.DefaultTransactionDefinition; + +import com.mongodb.ReadConcern; +import com.mongodb.ReadPreference; +import com.mongodb.TransactionOptions; +import com.mongodb.WriteConcern; + +/** + * @author Yan Kardziyaka + */ +class MongoTransactionUtilsUnitTests { + + @Test // GH-1628 + public void shouldThrowIllegalArgumentExceptionIfLabelsContainInvalidMaxCommitTime() { + TransactionOptions fallbackOptions = getTransactionOptions(); + DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); + attribute.setLabels(Set.of("mongo:maxCommitTime=-PT5S")); + + assertThatThrownBy(() -> MongoTransactionUtils.extractOptions(attribute, fallbackOptions)) // + .isInstanceOf(IllegalArgumentException.class); + } + + @Test // GH-1628 + public void shouldThrowIllegalArgumentExceptionIfLabelsContainInvalidReadConcern() { + TransactionOptions fallbackOptions = getTransactionOptions(); + DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); + attribute.setLabels(Set.of("mongo:readConcern=invalidValue")); + + assertThatThrownBy(() -> MongoTransactionUtils.extractOptions(attribute, fallbackOptions)) // + .isInstanceOf(IllegalArgumentException.class); + } + + @Test // GH-1628 + public void shouldThrowIllegalArgumentExceptionIfLabelsContainInvalidReadPreference() { + TransactionOptions fallbackOptions = getTransactionOptions(); + DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); + attribute.setLabels(Set.of("mongo:readPreference=invalidValue")); + + assertThatThrownBy(() -> MongoTransactionUtils.extractOptions(attribute, fallbackOptions)) // + .isInstanceOf(IllegalArgumentException.class); + } + + @Test // GH-1628 + public void shouldThrowIllegalArgumentExceptionIfLabelsContainInvalidWriteConcern() { + TransactionOptions fallbackOptions = getTransactionOptions(); + DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); + attribute.setLabels(Set.of("mongo:writeConcern=invalidValue")); + + assertThatThrownBy(() -> MongoTransactionUtils.extractOptions(attribute, fallbackOptions)) // + .isInstanceOf(IllegalArgumentException.class); + } + + @Test // GH-1628 + public void shouldReturnFallbackOptionsIfNotTransactionAttribute() { + TransactionOptions fallbackOptions = getTransactionOptions(); + DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); + + TransactionOptions result = MongoTransactionUtils.extractOptions(definition, fallbackOptions); + + assertThat(result).isSameAs(fallbackOptions); + } + + @Test // GH-1628 + public void shouldReturnFallbackOptionsIfNoLabelsProvided() { + TransactionOptions fallbackOptions = getTransactionOptions(); + TransactionAttribute attribute = new DefaultTransactionAttribute(); + + TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions); + + assertThat(result).isSameAs(fallbackOptions); + } + + @Test // GH-1628 + public void shouldReturnFallbackOptionsIfLabelsDoesNotContainValidOptions() { + TransactionOptions fallbackOptions = getTransactionOptions(); + DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); + Set labels = Set.of("mongo:readConcern", "writeConcern", "readPreference=SECONDARY", + "mongo:maxCommitTime PT5M", randomUUID().toString()); + attribute.setLabels(labels); + + TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions); + + assertThat(result).isSameAs(fallbackOptions); + } + + @Test // GH-1628 + public void shouldReturnMergedOptionsIfLabelsContainMaxCommitTime() { + TransactionOptions fallbackOptions = getTransactionOptions(); + DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); + attribute.setLabels(Set.of("mongo:maxCommitTime=PT5S")); + + TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions); + + assertThat(result).isNotSameAs(fallbackOptions) // + .returns(5L, from(options -> options.getMaxCommitTime(TimeUnit.SECONDS))) // + .returns(ReadConcern.AVAILABLE, from(TransactionOptions::getReadConcern)) // + .returns(ReadPreference.secondaryPreferred(), from(TransactionOptions::getReadPreference)) // + .returns(WriteConcern.UNACKNOWLEDGED, from(TransactionOptions::getWriteConcern)); + } + + @Test // GH-1628 + public void shouldReturnMergedOptionsIfLabelsContainReadConcern() { + TransactionOptions fallbackOptions = getTransactionOptions(); + DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); + attribute.setLabels(Set.of("mongo:readConcern=majority")); + + TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions); + + assertThat(result).isNotSameAs(fallbackOptions) // + .returns(1L, from(options -> options.getMaxCommitTime(TimeUnit.MINUTES))) // + .returns(ReadConcern.MAJORITY, from(TransactionOptions::getReadConcern)) // + .returns(ReadPreference.secondaryPreferred(), from(TransactionOptions::getReadPreference)) // + .returns(WriteConcern.UNACKNOWLEDGED, from(TransactionOptions::getWriteConcern)); + } + + @Test // GH-1628 + public void shouldReturnMergedOptionsIfLabelsContainReadPreference() { + TransactionOptions fallbackOptions = getTransactionOptions(); + DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); + attribute.setLabels(Set.of("mongo:readPreference=primaryPreferred")); + + TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions); + + assertThat(result).isNotSameAs(fallbackOptions) // + .returns(1L, from(options -> options.getMaxCommitTime(TimeUnit.MINUTES))) // + .returns(ReadConcern.AVAILABLE, from(TransactionOptions::getReadConcern)) // + .returns(ReadPreference.primaryPreferred(), from(TransactionOptions::getReadPreference)) // + .returns(WriteConcern.UNACKNOWLEDGED, from(TransactionOptions::getWriteConcern)); + } + + @Test // GH-1628 + public void shouldReturnMergedOptionsIfLabelsContainWriteConcern() { + TransactionOptions fallbackOptions = getTransactionOptions(); + DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); + attribute.setLabels(Set.of("mongo:writeConcern=w3")); + + TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions); + + assertThat(result).isNotSameAs(fallbackOptions) // + .returns(1L, from(options -> options.getMaxCommitTime(TimeUnit.MINUTES))) // + .returns(ReadConcern.AVAILABLE, from(TransactionOptions::getReadConcern)) // + .returns(ReadPreference.secondaryPreferred(), from(TransactionOptions::getReadPreference)) // + .returns(WriteConcern.W3, from(TransactionOptions::getWriteConcern)); + } + + @Test // GH-1628 + public void shouldReturnNewOptionsIfLabelsContainAllOptions() { + TransactionOptions fallbackOptions = getTransactionOptions(); + DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); + Set labels = Set.of("mongo:maxCommitTime=PT5S", "mongo:readConcern=majority", + "mongo:readPreference=primaryPreferred", "mongo:writeConcern=w3"); + attribute.setLabels(labels); + + TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions); + + assertThat(result).isNotSameAs(fallbackOptions) // + .returns(5L, from(options -> options.getMaxCommitTime(TimeUnit.SECONDS))) // + .returns(ReadConcern.MAJORITY, from(TransactionOptions::getReadConcern)) // + .returns(ReadPreference.primaryPreferred(), from(TransactionOptions::getReadPreference)) // + .returns(WriteConcern.W3, from(TransactionOptions::getWriteConcern)); + } + + @Test // GH-1628 + public void shouldReturnMergedOptionsIfLabelsContainOptionsMixedWithOrdinaryStrings() { + TransactionOptions fallbackOptions = getTransactionOptions(); + DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); + Set labels = Set.of("mongo:maxCommitTime=PT5S", "mongo:nonExistentOption=value", "label", + "mongo:writeConcern=w3"); + attribute.setLabels(labels); + + TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions); + + assertThat(result).isNotSameAs(fallbackOptions) // + .returns(5L, from(options -> options.getMaxCommitTime(TimeUnit.SECONDS))) // + .returns(ReadConcern.AVAILABLE, from(TransactionOptions::getReadConcern)) // + .returns(ReadPreference.secondaryPreferred(), from(TransactionOptions::getReadPreference)) // + .returns(WriteConcern.W3, from(TransactionOptions::getWriteConcern)); + } + + @Test // GH-1628 + public void shouldReturnNewOptionsIFallbackIsNull() { + DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); + Set labels = Set.of("mongo:maxCommitTime=PT5S", "mongo:writeConcern=w3"); + attribute.setLabels(labels); + + TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, null); + + assertThat(result).returns(5L, from(options -> options.getMaxCommitTime(TimeUnit.SECONDS))) // + .returns(null, from(TransactionOptions::getReadConcern)) // + .returns(null, from(TransactionOptions::getReadPreference)) // + .returns(WriteConcern.W3, from(TransactionOptions::getWriteConcern)); + } + + private TransactionOptions getTransactionOptions() { + return TransactionOptions.builder() // + .maxCommitTime(1L, TimeUnit.MINUTES) // + .readConcern(ReadConcern.AVAILABLE) // + .readPreference(ReadPreference.secondaryPreferred()) // + .writeConcern(WriteConcern.UNACKNOWLEDGED).build(); + } +} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java index 81467671ea..6d2b4fb333 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java @@ -15,6 +15,8 @@ */ package org.springframework.data.mongodb; +import static java.util.UUID.*; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -35,6 +37,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.support.GenericApplicationContext; +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration; import org.springframework.data.mongodb.core.ReactiveMongoOperations; import org.springframework.data.mongodb.core.mapping.Document; @@ -44,6 +47,8 @@ import org.springframework.data.mongodb.test.util.EnableIfReplicaSetAvailable; import org.springframework.data.mongodb.test.util.MongoClientExtension; import org.springframework.data.mongodb.test.util.MongoTestUtils; +import org.springframework.transaction.TransactionSystemException; +import org.springframework.transaction.annotation.EnableTransactionManagement; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.reactive.TransactionalOperator; import org.springframework.transaction.support.DefaultTransactionDefinition; @@ -55,6 +60,7 @@ * * @author Mark Paluch * @author Christoph Strobl + * @author Yan Kardziyaka */ @ExtendWith(MongoClientExtension.class) @EnableIfMongoServerVersion(isGreaterThanEqual = "4.0") @@ -69,6 +75,7 @@ public class ReactiveTransactionIntegrationTests { PersonService personService; ReactiveMongoOperations operations; + ReactiveTransactionOptionsTestService transactionOptionsTestService; @BeforeAll public static void init() { @@ -85,6 +92,7 @@ public void setUp() { personService = context.getBean(PersonService.class); operations = context.getBean(ReactiveMongoOperations.class); + transactionOptionsTestService = context.getBean(ReactiveTransactionOptionsTestService.class); try (MongoClient client = MongoTestUtils.reactiveClient()) { @@ -220,7 +228,123 @@ public void errorAfterTxShouldNotAffectPreviousStep() { .verifyComplete(); } + @Test // GH-1628 + public void shouldThrowTransactionSystemExceptionOnTransactionWithInvalidMaxCommitTime() { + + Person person = new Person(ObjectId.get(), randomUUID().toString(), randomUUID().toString()); + transactionOptionsTestService.saveWithInvalidMaxCommitTime(person) // + .as(StepVerifier::create) // + .verifyError(TransactionSystemException.class); + + operations.count(new Query(), Person.class) // + .as(StepVerifier::create) // + .expectNext(0L) // + .verifyComplete(); + } + + @Test // GH-1628 + public void shouldCommitOnTransactionWithinMaxCommitTime() { + + Person person = new Person(ObjectId.get(), randomUUID().toString(), randomUUID().toString()); + transactionOptionsTestService.saveWithinMaxCommitTime(person) // + .as(StepVerifier::create) // + .expectNext(person) // + .verifyComplete(); + + operations.count(new Query(), Person.class) // + .as(StepVerifier::create) // + .expectNext(1L) // + .verifyComplete(); + } + + @Test // GH-1628 + public void shouldThrowInvalidDataAccessApiUsageExceptionOnTransactionWithAvailableReadConcern() { + transactionOptionsTestService.availableReadConcernFind(randomUUID().toString()) // + .as(StepVerifier::create) // + .verifyError(InvalidDataAccessApiUsageException.class); + } + + @Test // GH-1628 + public void shouldThrowTransactionSystemExceptionOnTransactionWithInvalidReadConcern() { + transactionOptionsTestService.invalidReadConcernFind(randomUUID().toString()) // + .as(StepVerifier::create) // + .verifyError(TransactionSystemException.class); + } + + @Test // GH-1628 + public void shouldNotThrowOnTransactionWithMajorityReadConcern() { + transactionOptionsTestService.majorityReadConcernFind(randomUUID().toString()) // + .as(StepVerifier::create) // + .expectNextCount(0L) // + .verifyComplete(); + } + + @Test // GH-1628 + public void shouldThrowUncategorizedMongoDbExceptionOnTransactionWithPrimaryPreferredReadPreference() { + transactionOptionsTestService.findFromPrimaryPreferredReplica(randomUUID().toString()) // + .as(StepVerifier::create) // + .verifyError(UncategorizedMongoDbException.class); + } + + @Test // GH-1628 + public void shouldThrowTransactionSystemExceptionOnTransactionWithInvalidReadPreference() { + transactionOptionsTestService.findFromInvalidReplica(randomUUID().toString()) // + .as(StepVerifier::create) // + .verifyError(TransactionSystemException.class); + } + + @Test // GH-1628 + public void shouldNotThrowOnTransactionWithPrimaryReadPreference() { + transactionOptionsTestService.findFromPrimaryReplica(randomUUID().toString()) // + .as(StepVerifier::create) // + .expectNextCount(0L) // + .verifyComplete(); + } + + @Test // GH-1628 + public void shouldThrowTransactionSystemExceptionOnTransactionWithUnacknowledgedWriteConcern() { + + Person person = new Person(ObjectId.get(), randomUUID().toString(), randomUUID().toString()); + transactionOptionsTestService.unacknowledgedWriteConcernSave(person) // + .as(StepVerifier::create) // + .verifyError(TransactionSystemException.class); + + operations.count(new Query(), Person.class) // + .as(StepVerifier::create).expectNext(0L) // + .verifyComplete(); + } + + @Test // GH-1628 + public void shouldThrowTransactionSystemExceptionOnTransactionWithInvalidWriteConcern() { + + Person person = new Person(ObjectId.get(), randomUUID().toString(), randomUUID().toString()); + transactionOptionsTestService.invalidWriteConcernSave(person) // + .as(StepVerifier::create) // + .verifyError(TransactionSystemException.class); + + operations.count(new Query(), Person.class) // + .as(StepVerifier::create) // + .expectNext(0L) // + .verifyComplete(); + } + + @Test // GH-1628 + public void shouldCommitOnTransactionWithAcknowledgedWriteConcern() { + + Person person = new Person(ObjectId.get(), randomUUID().toString(), randomUUID().toString()); + transactionOptionsTestService.acknowledgedWriteConcernSave(person) // + .as(StepVerifier::create) // + .expectNext(person) // + .verifyComplete(); + + operations.count(new Query(), Person.class) // + .as(StepVerifier::create) // + .expectNext(1L) // + .verifyComplete(); + } + @Configuration + @EnableTransactionManagement static class TestMongoConfig extends AbstractReactiveMongoConfiguration { @Override @@ -234,10 +358,16 @@ protected String getDatabaseName() { } @Bean - public ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) { + public ReactiveMongoTransactionManager txManager(ReactiveMongoDatabaseFactory factory) { return new ReactiveMongoTransactionManager(factory); } + @Bean + public ReactiveTransactionOptionsTestService transactionOptionsTestService( + ReactiveMongoOperations operations) { + return new ReactiveTransactionOptionsTestService<>(operations, Person.class); + } + @Override protected Set> getInitialEntitySet() { return Collections.singleton(Person.class); @@ -291,10 +421,10 @@ public Flux saveWithLogs(Person person) { new DefaultTransactionDefinition()); return Flux.merge(operations.save(new EventLog(new ObjectId(), "beforeConvert")), // - operations.save(new EventLog(new ObjectId(), "afterConvert")), // - operations.save(new EventLog(new ObjectId(), "beforeInsert")), // - operations.save(person), // - operations.save(new EventLog(new ObjectId(), "afterInsert"))) // + operations.save(new EventLog(new ObjectId(), "afterConvert")), // + operations.save(new EventLog(new ObjectId(), "beforeInsert")), // + operations.save(person), // + operations.save(new EventLog(new ObjectId(), "afterInsert"))) // .thenMany(operations.query(EventLog.class).all()) // .as(transactionalOperator::transactional); } @@ -305,15 +435,15 @@ public Flux saveWithErrorLogs(Person person) { new DefaultTransactionDefinition()); return Flux.merge(operations.save(new EventLog(new ObjectId(), "beforeConvert")), // - operations.save(new EventLog(new ObjectId(), "afterConvert")), // - operations.save(new EventLog(new ObjectId(), "beforeInsert")), // - operations.save(person), // - operations.save(new EventLog(new ObjectId(), "afterInsert"))) // + operations.save(new EventLog(new ObjectId(), "afterConvert")), // + operations.save(new EventLog(new ObjectId(), "beforeInsert")), // + operations.save(person), // + operations.save(new EventLog(new ObjectId(), "afterInsert"))) // . flatMap(it -> Mono.error(new RuntimeException("poof"))) // .as(transactionalOperator::transactional); } - @Transactional + @Transactional(transactionManager = "txManager") public Flux declarativeSavePerson(Person person) { TransactionalOperator transactionalOperator = TransactionalOperator.create(manager, @@ -324,7 +454,7 @@ public Flux declarativeSavePerson(Person person) { }); } - @Transactional + @Transactional(transactionManager = "txManager") public Flux declarativeSavePersonErrors(Person person) { TransactionalOperator transactionalOperator = TransactionalOperator.create(manager, @@ -384,8 +514,8 @@ public boolean equals(Object o) { return false; } Person person = (Person) o; - return Objects.equals(id, person.id) && Objects.equals(firstname, person.firstname) - && Objects.equals(lastname, person.lastname); + return Objects.equals(id, person.id) && Objects.equals(firstname, person.firstname) && Objects.equals(lastname, + person.lastname); } @Override @@ -394,8 +524,7 @@ public int hashCode() { } public String toString() { - return "ReactiveTransactionIntegrationTests.Person(id=" + this.getId() + ", firstname=" + this.getFirstname() - + ", lastname=" + this.getLastname() + ")"; + return "ReactiveTransactionIntegrationTests.Person(id=" + this.getId() + ", firstname=" + this.getFirstname() + ", lastname=" + this.getLastname() + ")"; } } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionOptionsTestService.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionOptionsTestService.java new file mode 100644 index 0000000000..15f1056873 --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionOptionsTestService.java @@ -0,0 +1,96 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb; + +import reactor.core.publisher.Mono; + +import java.util.function.Function; + +import org.springframework.data.mongodb.core.ReactiveMongoOperations; +import org.springframework.transaction.annotation.Transactional; + +/** + * Helper class for integration tests of {@link Transactional#label()} MongoDb options in reactive context. + * + * @param root document type + * @author Yan Kardziyaka + * @see org.springframework.data.mongodb.core.TransactionOptionsTestService + */ +public class ReactiveTransactionOptionsTestService { + private final Function> findByIdFunction; + + private final Function> saveFunction; + + public ReactiveTransactionOptionsTestService(ReactiveMongoOperations operations, Class entityClass) { + this.findByIdFunction = id -> operations.findById(id, entityClass); + this.saveFunction = operations::save; + } + + @Transactional(transactionManager = "txManager", label = { "mongo:maxCommitTime=-PT6H3M" }) + public Mono saveWithInvalidMaxCommitTime(T entity) { + return saveFunction.apply(entity); + } + + @Transactional(transactionManager = "txManager", label = { "mongo:maxCommitTime=PT1M" }) + public Mono saveWithinMaxCommitTime(T entity) { + return saveFunction.apply(entity); + } + + @Transactional(transactionManager = "txManager", label = { "mongo:readConcern=available" }) + public Mono availableReadConcernFind(Object id) { + return findByIdFunction.apply(id); + } + + @Transactional(transactionManager = "txManager", label = { "mongo:readConcern=invalid" }) + public Mono invalidReadConcernFind(Object id) { + return findByIdFunction.apply(id); + } + + @Transactional(transactionManager = "txManager", label = { "mongo:readConcern=majority" }) + public Mono majorityReadConcernFind(Object id) { + return findByIdFunction.apply(id); + } + + @Transactional(transactionManager = "txManager", label = { "mongo:readPreference=primaryPreferred" }) + public Mono findFromPrimaryPreferredReplica(Object id) { + return findByIdFunction.apply(id); + } + + @Transactional(transactionManager = "txManager", label = { "mongo:readPreference=invalid" }) + public Mono findFromInvalidReplica(Object id) { + return findByIdFunction.apply(id); + } + + @Transactional(transactionManager = "txManager", label = { "mongo:readPreference=primary" }) + public Mono findFromPrimaryReplica(Object id) { + return findByIdFunction.apply(id); + } + + @Transactional(transactionManager = "txManager", label = { "mongo:writeConcern=unacknowledged" }) + public Mono unacknowledgedWriteConcernSave(T entity) { + return saveFunction.apply(entity); + } + + @Transactional(transactionManager = "txManager", label = { "mongo:writeConcern=invalid" }) + public Mono invalidWriteConcernSave(T entity) { + return saveFunction.apply(entity); + } + + @Transactional(transactionManager = "txManager", label = { "mongo:writeConcern=acknowledged" }) + public Mono acknowledgedWriteConcernSave(T entity) { + return saveFunction.apply(entity); + } +} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTransactionTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTransactionTests.java index 440dcabd5a..36c2c2eb5b 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTransactionTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTransactionTests.java @@ -15,6 +15,7 @@ */ package org.springframework.data.mongodb.core; +import static java.util.UUID.*; import static org.assertj.core.api.Assertions.*; import static org.springframework.data.mongodb.core.query.Criteria.*; import static org.springframework.data.mongodb.core.query.Query.*; @@ -33,10 +34,12 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.annotation.Id; import org.springframework.data.domain.Persistable; import org.springframework.data.mongodb.MongoDatabaseFactory; import org.springframework.data.mongodb.MongoTransactionManager; +import org.springframework.data.mongodb.UncategorizedMongoDbException; import org.springframework.data.mongodb.config.AbstractMongoClientConfiguration; import org.springframework.data.mongodb.test.util.AfterTransactionAssertion; import org.springframework.data.mongodb.test.util.EnableIfMongoServerVersion; @@ -48,6 +51,9 @@ import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.test.context.transaction.AfterTransaction; import org.springframework.test.context.transaction.BeforeTransaction; +import org.springframework.transaction.TransactionSystemException; +import org.springframework.transaction.annotation.EnableTransactionManagement; +import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import com.mongodb.ReadPreference; @@ -57,6 +63,7 @@ /** * @author Christoph Strobl + * @author Yan Kardziyaka * @currentRead Shadow's Edge - Brent Weeks */ @ExtendWith({ MongoClientExtension.class, SpringExtension.class }) @@ -72,6 +79,7 @@ public class MongoTemplateTransactionTests { static @ReplSetClient MongoClient mongoClient; @Configuration + @EnableTransactionManagement static class Config extends AbstractMongoClientConfiguration { @Bean @@ -98,10 +106,19 @@ MongoTransactionManager txManager(MongoDatabaseFactory dbFactory) { protected Set> getInitialEntitySet() throws ClassNotFoundException { return Collections.emptySet(); } + + @Bean + public TransactionOptionsTestService transactionOptionsTestService(MongoOperations operations) { + return new TransactionOptionsTestService<>(operations, Assassin.class); + } } - @Autowired MongoTemplate template; - @Autowired MongoClient client; + @Autowired + MongoTemplate template; + @Autowired + MongoClient client; + @Autowired + TransactionOptionsTestService transactionOptionsTestService; List>> assertionList; @@ -127,8 +144,8 @@ public void verifyDbState() { boolean isPresent = collection.countDocuments(Filters.eq("_id", it.getId())) != 0; - assertThat(isPresent).isEqualTo(it.shouldBePresent()) - .withFailMessage(String.format("After transaction entity %s should %s.", it.getPersistable(), + assertThat(isPresent).isEqualTo(it.shouldBePresent()).withFailMessage( + String.format("After transaction entity %s should %s.", it.getPersistable(), it.shouldBePresent() ? "be present" : "NOT be present")); }); } @@ -166,6 +183,122 @@ public void shouldBeAbleToViewChangesDuringTransaction() throws InterruptedExcep assertAfterTransaction(durzo).isNotPresent(); } + @Rollback(false) + @Test // GH-1628 + @Transactional(transactionManager = "txManager", propagation = Propagation.NEVER) + public void shouldThrowIllegalArgumentExceptionOnTransactionWithInvalidMaxCommitTime() { + + Assassin assassin = new Assassin(randomUUID().toString(), randomUUID().toString()); + + assertThatThrownBy(() -> transactionOptionsTestService.saveWithInvalidMaxCommitTime(assassin)) // + .isInstanceOf(IllegalArgumentException.class); + + assertAfterTransaction(assassin).isNotPresent(); + } + + @Rollback(false) + @Test // GH-1628 + @Transactional(transactionManager = "txManager", propagation = Propagation.NEVER) + public void shouldCommitOnTransactionWithinMaxCommitTime() { + + Assassin assassin = new Assassin(randomUUID().toString(), randomUUID().toString()); + + transactionOptionsTestService.saveWithinMaxCommitTime(assassin); + + assertAfterTransaction(assassin).isPresent(); + } + + @Rollback(false) + @Test // GH-1628 + @Transactional(transactionManager = "txManager", propagation = Propagation.NEVER) + public void shouldThrowInvalidDataAccessApiUsageExceptionOnTransactionWithAvailableReadConcern() { + + assertThatThrownBy(() -> transactionOptionsTestService.availableReadConcernFind(randomUUID().toString())) // + .isInstanceOf(InvalidDataAccessApiUsageException.class); + } + + @Rollback(false) + @Test // GH-1628 + @Transactional(transactionManager = "txManager", propagation = Propagation.NEVER) + public void shouldThrowIllegalArgumentExceptionOnTransactionWithInvalidReadConcern() { + + assertThatThrownBy(() -> transactionOptionsTestService.invalidReadConcernFind(randomUUID().toString())) // + .isInstanceOf(IllegalArgumentException.class); + } + + @Rollback(false) + @Test // GH-1628 + @Transactional(transactionManager = "txManager", propagation = Propagation.NEVER) + public void shouldNotThrowOnTransactionWithMajorityReadConcern() { + assertThatNoException() // + .isThrownBy(() -> transactionOptionsTestService.majorityReadConcernFind(randomUUID().toString())); + } + + @Rollback(false) + @Test // GH-1628 + @Transactional(transactionManager = "txManager", propagation = Propagation.NEVER) + public void shouldThrowUncategorizedMongoDbExceptionOnTransactionWithPrimaryPreferredReadPreference() { + + assertThatThrownBy(() -> transactionOptionsTestService.findFromPrimaryPreferredReplica(randomUUID().toString())) // + .isInstanceOf(UncategorizedMongoDbException.class); + } + + @Rollback(false) + @Test // GH-1628 + @Transactional(transactionManager = "txManager", propagation = Propagation.NEVER) + public void shouldThrowIllegalArgumentExceptionOnTransactionWithInvalidReadPreference() { + + assertThatThrownBy(() -> transactionOptionsTestService.findFromInvalidReplica(randomUUID().toString())) // + .isInstanceOf(IllegalArgumentException.class); + } + + @Rollback(false) + @Test // GH-1628 + @Transactional(transactionManager = "txManager", propagation = Propagation.NEVER) + public void shouldNotThrowOnTransactionWithPrimaryReadPreference() { + + assertThatNoException() // + .isThrownBy(() -> transactionOptionsTestService.findFromPrimaryReplica(randomUUID().toString())); + } + + @Rollback(false) + @Test // GH-1628 + @Transactional(transactionManager = "txManager", propagation = Propagation.NEVER) + public void shouldThrowTransactionSystemExceptionOnTransactionWithUnacknowledgedWriteConcern() { + + Assassin assassin = new Assassin(randomUUID().toString(), randomUUID().toString()); + + assertThatThrownBy(() -> transactionOptionsTestService.unacknowledgedWriteConcernSave(assassin)) // + .isInstanceOf(TransactionSystemException.class); + + assertAfterTransaction(assassin).isNotPresent(); + } + + @Rollback(false) + @Test // GH-1628 + @Transactional(transactionManager = "txManager", propagation = Propagation.NEVER) + public void shouldThrowIllegalArgumentExceptionOnTransactionWithInvalidWriteConcern() { + + Assassin assassin = new Assassin(randomUUID().toString(), randomUUID().toString()); + + assertThatThrownBy(() -> transactionOptionsTestService.invalidWriteConcernSave(assassin)) // + .isInstanceOf(IllegalArgumentException.class); + + assertAfterTransaction(assassin).isNotPresent(); + } + + @Rollback(false) + @Test // GH-1628 + @Transactional(transactionManager = "txManager", propagation = Propagation.NEVER) + public void shouldCommitOnTransactionWithAcknowledgedWriteConcern() { + + Assassin assassin = new Assassin(randomUUID().toString(), randomUUID().toString()); + + transactionOptionsTestService.acknowledgedWriteConcernSave(assassin); + + assertAfterTransaction(assassin).isPresent(); + } + // --- Just some helpers and tests entities private AfterTransactionAssertion assertAfterTransaction(Assassin assassin) { @@ -178,7 +311,8 @@ private AfterTransactionAssertion assertAfterTransaction(Assassin assassin) { @org.springframework.data.mongodb.core.mapping.Document(COLLECTION_NAME) static class Assassin implements Persistable { - @Id String id; + @Id + String id; String name; public Assassin(String id, String name) { diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/TransactionOptionsTestService.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/TransactionOptionsTestService.java new file mode 100644 index 0000000000..f075b91343 --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/TransactionOptionsTestService.java @@ -0,0 +1,101 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core; + +import java.util.function.Function; +import java.util.function.UnaryOperator; + +import org.springframework.lang.Nullable; +import org.springframework.transaction.annotation.Transactional; + +/** + * Helper class for integration tests of {@link Transactional#label()} MongoDb options in non-reactive context. + * + * @param root document type + * @author Yan Kardziyaka + * @see org.springframework.data.mongodb.ReactiveTransactionOptionsTestService + */ +public class TransactionOptionsTestService { + + private final Function findByIdFunction; + private final UnaryOperator saveFunction; + + public TransactionOptionsTestService(MongoOperations operations, Class entityClass) { + this.findByIdFunction = id -> operations.findById(id, entityClass); + this.saveFunction = operations::save; + } + + @Transactional(transactionManager = "txManager", label = { "mongo:maxCommitTime=-PT6H3M" }) + public T saveWithInvalidMaxCommitTime(T entity) { + return saveFunction.apply(entity); + } + + @Transactional(transactionManager = "txManager", label = { "mongo:maxCommitTime=PT1M" }) + public T saveWithinMaxCommitTime(T entity) { + return saveFunction.apply(entity); + } + + @Nullable + @Transactional(transactionManager = "txManager", label = { "mongo:readConcern=available" }) + public T availableReadConcernFind(Object id) { + return findByIdFunction.apply(id); + } + + @Nullable + @Transactional(transactionManager = "txManager", label = { "mongo:readConcern=invalid" }) + public T invalidReadConcernFind(Object id) { + return findByIdFunction.apply(id); + } + + @Nullable + @Transactional(transactionManager = "txManager", label = { "mongo:readConcern=majority" }) + public T majorityReadConcernFind(Object id) { + return findByIdFunction.apply(id); + } + + @Nullable + @Transactional(transactionManager = "txManager", label = { "mongo:readPreference=primaryPreferred" }) + public T findFromPrimaryPreferredReplica(Object id) { + return findByIdFunction.apply(id); + } + + @Nullable + @Transactional(transactionManager = "txManager", label = { "mongo:readPreference=invalid" }) + public T findFromInvalidReplica(Object id) { + return findByIdFunction.apply(id); + } + + @Nullable + @Transactional(transactionManager = "txManager", label = { "mongo:readPreference=primary" }) + public T findFromPrimaryReplica(Object id) { + return findByIdFunction.apply(id); + } + + @Transactional(transactionManager = "txManager", label = { "mongo:writeConcern=unacknowledged" }) + public T unacknowledgedWriteConcernSave(T entity) { + return saveFunction.apply(entity); + } + + @Transactional(transactionManager = "txManager", label = { "mongo:writeConcern=invalid" }) + public T invalidWriteConcernSave(T entity) { + return saveFunction.apply(entity); + } + + @Transactional(transactionManager = "txManager", label = { "mongo:writeConcern=acknowledged" }) + public T acknowledgedWriteConcernSave(T entity) { + return saveFunction.apply(entity); + } +}