Skip to content

Commit ef2761d

Browse files
ykardziyakachristophstrobl
authored andcommitted
Declarative way for setting MongoDB transaction options.
Closes #1628
1 parent 8e01261 commit ef2761d

8 files changed

+807
-22
lines changed

Diff for: spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoTransactionManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ protected void doBegin(Object transaction, TransactionDefinition definition) thr
134134
}
135135

136136
try {
137-
mongoTransactionObject.startTransaction(options);
137+
mongoTransactionObject.startTransaction(MongoTransactionUtils.extractOptions(definition, options));
138138
} catch (MongoException ex) {
139139
throw new TransactionSystemException(String.format("Could not start Mongo transaction for session %s.",
140140
debugString(mongoTransactionObject.getSession())), ex);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb;
17+
18+
import java.time.Duration;
19+
import java.util.concurrent.TimeUnit;
20+
21+
import org.apache.commons.logging.Log;
22+
import org.apache.commons.logging.LogFactory;
23+
import org.springframework.lang.Nullable;
24+
import org.springframework.transaction.TransactionDefinition;
25+
import org.springframework.transaction.interceptor.TransactionAttribute;
26+
27+
import com.mongodb.ReadConcern;
28+
import com.mongodb.ReadConcernLevel;
29+
import com.mongodb.ReadPreference;
30+
import com.mongodb.TransactionOptions;
31+
import com.mongodb.WriteConcern;
32+
33+
/**
34+
* Helper class for translating @Transactional labels into Mongo-specific {@link TransactionOptions}.
35+
*
36+
* @author Yan Kardziyaka
37+
*/
38+
public final class MongoTransactionUtils {
39+
private static final Log LOGGER = LogFactory.getLog(MongoTransactionUtils.class);
40+
41+
private static final String MAX_COMMIT_TIME = "mongo:maxCommitTime";
42+
43+
private static final String READ_CONCERN_OPTION = "mongo:readConcern";
44+
45+
private static final String READ_PREFERENCE_OPTION = "mongo:readPreference";
46+
47+
private static final String WRITE_CONCERN_OPTION = "mongo:writeConcern";
48+
49+
private MongoTransactionUtils() {}
50+
51+
@Nullable
52+
public static TransactionOptions extractOptions(TransactionDefinition transactionDefinition,
53+
@Nullable TransactionOptions fallbackOptions) {
54+
if (transactionDefinition instanceof TransactionAttribute transactionAttribute) {
55+
TransactionOptions.Builder builder = null;
56+
for (String label : transactionAttribute.getLabels()) {
57+
String[] tokens = label.split("=", 2);
58+
builder = tokens.length == 2 ? enhanceWithProperty(builder, tokens[0], tokens[1]) : builder;
59+
}
60+
if (builder == null) {
61+
return fallbackOptions;
62+
}
63+
TransactionOptions options = builder.build();
64+
return fallbackOptions == null ? options : TransactionOptions.merge(options, fallbackOptions);
65+
} else {
66+
if (LOGGER.isDebugEnabled()) {
67+
LOGGER.debug("%s cannot be casted to %s. Transaction labels won't be evaluated as options".formatted(
68+
TransactionDefinition.class.getName(), TransactionAttribute.class.getName()));
69+
}
70+
return fallbackOptions;
71+
}
72+
}
73+
74+
@Nullable
75+
private static TransactionOptions.Builder enhanceWithProperty(@Nullable TransactionOptions.Builder builder,
76+
String key, String value) {
77+
return switch (key) {
78+
case MAX_COMMIT_TIME -> nullSafe(builder).maxCommitTime(Duration.parse(value).toMillis(), TimeUnit.MILLISECONDS);
79+
case READ_CONCERN_OPTION -> nullSafe(builder).readConcern(new ReadConcern(ReadConcernLevel.fromString(value)));
80+
case READ_PREFERENCE_OPTION -> nullSafe(builder).readPreference(ReadPreference.valueOf(value));
81+
case WRITE_CONCERN_OPTION -> nullSafe(builder).writeConcern(getWriteConcern(value));
82+
default -> builder;
83+
};
84+
}
85+
86+
private static TransactionOptions.Builder nullSafe(@Nullable TransactionOptions.Builder builder) {
87+
return builder == null ? TransactionOptions.builder() : builder;
88+
}
89+
90+
private static WriteConcern getWriteConcern(String writeConcernAsString) {
91+
WriteConcern writeConcern = WriteConcern.valueOf(writeConcernAsString);
92+
if (writeConcern == null) {
93+
throw new IllegalArgumentException("'%s' is not a valid WriteConcern".formatted(writeConcernAsString));
94+
}
95+
return writeConcern;
96+
}
97+
98+
}

Diff for: spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoTransactionManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationMa
146146

147147
}).doOnNext(resourceHolder -> {
148148

149-
mongoTransactionObject.startTransaction(options);
149+
mongoTransactionObject.startTransaction(MongoTransactionUtils.extractOptions(definition, options));
150150

151151
if (logger.isDebugEnabled()) {
152152
logger.debug(String.format("Started transaction for session %s.", debugString(resourceHolder.getSession())));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb;
17+
18+
import static java.util.UUID.*;
19+
import static org.assertj.core.api.Assertions.*;
20+
21+
import java.util.Set;
22+
import java.util.concurrent.TimeUnit;
23+
24+
import org.junit.jupiter.api.Test;
25+
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
26+
import org.springframework.transaction.interceptor.TransactionAttribute;
27+
import org.springframework.transaction.support.DefaultTransactionDefinition;
28+
29+
import com.mongodb.ReadConcern;
30+
import com.mongodb.ReadPreference;
31+
import com.mongodb.TransactionOptions;
32+
import com.mongodb.WriteConcern;
33+
34+
/**
35+
* @author Yan Kardziyaka
36+
*/
37+
class MongoTransactionUtilsUnitTests {
38+
39+
@Test // GH-1628
40+
public void shouldThrowIllegalArgumentExceptionIfLabelsContainInvalidMaxCommitTime() {
41+
TransactionOptions fallbackOptions = getTransactionOptions();
42+
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
43+
attribute.setLabels(Set.of("mongo:maxCommitTime=-PT5S"));
44+
45+
assertThatThrownBy(() -> MongoTransactionUtils.extractOptions(attribute, fallbackOptions)) //
46+
.isInstanceOf(IllegalArgumentException.class);
47+
}
48+
49+
@Test // GH-1628
50+
public void shouldThrowIllegalArgumentExceptionIfLabelsContainInvalidReadConcern() {
51+
TransactionOptions fallbackOptions = getTransactionOptions();
52+
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
53+
attribute.setLabels(Set.of("mongo:readConcern=invalidValue"));
54+
55+
assertThatThrownBy(() -> MongoTransactionUtils.extractOptions(attribute, fallbackOptions)) //
56+
.isInstanceOf(IllegalArgumentException.class);
57+
}
58+
59+
@Test // GH-1628
60+
public void shouldThrowIllegalArgumentExceptionIfLabelsContainInvalidReadPreference() {
61+
TransactionOptions fallbackOptions = getTransactionOptions();
62+
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
63+
attribute.setLabels(Set.of("mongo:readPreference=invalidValue"));
64+
65+
assertThatThrownBy(() -> MongoTransactionUtils.extractOptions(attribute, fallbackOptions)) //
66+
.isInstanceOf(IllegalArgumentException.class);
67+
}
68+
69+
@Test // GH-1628
70+
public void shouldThrowIllegalArgumentExceptionIfLabelsContainInvalidWriteConcern() {
71+
TransactionOptions fallbackOptions = getTransactionOptions();
72+
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
73+
attribute.setLabels(Set.of("mongo:writeConcern=invalidValue"));
74+
75+
assertThatThrownBy(() -> MongoTransactionUtils.extractOptions(attribute, fallbackOptions)) //
76+
.isInstanceOf(IllegalArgumentException.class);
77+
}
78+
79+
@Test // GH-1628
80+
public void shouldReturnFallbackOptionsIfNotTransactionAttribute() {
81+
TransactionOptions fallbackOptions = getTransactionOptions();
82+
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
83+
84+
TransactionOptions result = MongoTransactionUtils.extractOptions(definition, fallbackOptions);
85+
86+
assertThat(result).isSameAs(fallbackOptions);
87+
}
88+
89+
@Test // GH-1628
90+
public void shouldReturnFallbackOptionsIfNoLabelsProvided() {
91+
TransactionOptions fallbackOptions = getTransactionOptions();
92+
TransactionAttribute attribute = new DefaultTransactionAttribute();
93+
94+
TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);
95+
96+
assertThat(result).isSameAs(fallbackOptions);
97+
}
98+
99+
@Test // GH-1628
100+
public void shouldReturnFallbackOptionsIfLabelsDoesNotContainValidOptions() {
101+
TransactionOptions fallbackOptions = getTransactionOptions();
102+
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
103+
Set<String> labels = Set.of("mongo:readConcern", "writeConcern", "readPreference=SECONDARY",
104+
"mongo:maxCommitTime PT5M", randomUUID().toString());
105+
attribute.setLabels(labels);
106+
107+
TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);
108+
109+
assertThat(result).isSameAs(fallbackOptions);
110+
}
111+
112+
@Test // GH-1628
113+
public void shouldReturnMergedOptionsIfLabelsContainMaxCommitTime() {
114+
TransactionOptions fallbackOptions = getTransactionOptions();
115+
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
116+
attribute.setLabels(Set.of("mongo:maxCommitTime=PT5S"));
117+
118+
TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);
119+
120+
assertThat(result).isNotSameAs(fallbackOptions) //
121+
.returns(5L, from(options -> options.getMaxCommitTime(TimeUnit.SECONDS))) //
122+
.returns(ReadConcern.AVAILABLE, from(TransactionOptions::getReadConcern)) //
123+
.returns(ReadPreference.secondaryPreferred(), from(TransactionOptions::getReadPreference)) //
124+
.returns(WriteConcern.UNACKNOWLEDGED, from(TransactionOptions::getWriteConcern));
125+
}
126+
127+
@Test // GH-1628
128+
public void shouldReturnMergedOptionsIfLabelsContainReadConcern() {
129+
TransactionOptions fallbackOptions = getTransactionOptions();
130+
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
131+
attribute.setLabels(Set.of("mongo:readConcern=majority"));
132+
133+
TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);
134+
135+
assertThat(result).isNotSameAs(fallbackOptions) //
136+
.returns(1L, from(options -> options.getMaxCommitTime(TimeUnit.MINUTES))) //
137+
.returns(ReadConcern.MAJORITY, from(TransactionOptions::getReadConcern)) //
138+
.returns(ReadPreference.secondaryPreferred(), from(TransactionOptions::getReadPreference)) //
139+
.returns(WriteConcern.UNACKNOWLEDGED, from(TransactionOptions::getWriteConcern));
140+
}
141+
142+
@Test // GH-1628
143+
public void shouldReturnMergedOptionsIfLabelsContainReadPreference() {
144+
TransactionOptions fallbackOptions = getTransactionOptions();
145+
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
146+
attribute.setLabels(Set.of("mongo:readPreference=primaryPreferred"));
147+
148+
TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);
149+
150+
assertThat(result).isNotSameAs(fallbackOptions) //
151+
.returns(1L, from(options -> options.getMaxCommitTime(TimeUnit.MINUTES))) //
152+
.returns(ReadConcern.AVAILABLE, from(TransactionOptions::getReadConcern)) //
153+
.returns(ReadPreference.primaryPreferred(), from(TransactionOptions::getReadPreference)) //
154+
.returns(WriteConcern.UNACKNOWLEDGED, from(TransactionOptions::getWriteConcern));
155+
}
156+
157+
@Test // GH-1628
158+
public void shouldReturnMergedOptionsIfLabelsContainWriteConcern() {
159+
TransactionOptions fallbackOptions = getTransactionOptions();
160+
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
161+
attribute.setLabels(Set.of("mongo:writeConcern=w3"));
162+
163+
TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);
164+
165+
assertThat(result).isNotSameAs(fallbackOptions) //
166+
.returns(1L, from(options -> options.getMaxCommitTime(TimeUnit.MINUTES))) //
167+
.returns(ReadConcern.AVAILABLE, from(TransactionOptions::getReadConcern)) //
168+
.returns(ReadPreference.secondaryPreferred(), from(TransactionOptions::getReadPreference)) //
169+
.returns(WriteConcern.W3, from(TransactionOptions::getWriteConcern));
170+
}
171+
172+
@Test // GH-1628
173+
public void shouldReturnNewOptionsIfLabelsContainAllOptions() {
174+
TransactionOptions fallbackOptions = getTransactionOptions();
175+
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
176+
Set<String> labels = Set.of("mongo:maxCommitTime=PT5S", "mongo:readConcern=majority",
177+
"mongo:readPreference=primaryPreferred", "mongo:writeConcern=w3");
178+
attribute.setLabels(labels);
179+
180+
TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);
181+
182+
assertThat(result).isNotSameAs(fallbackOptions) //
183+
.returns(5L, from(options -> options.getMaxCommitTime(TimeUnit.SECONDS))) //
184+
.returns(ReadConcern.MAJORITY, from(TransactionOptions::getReadConcern)) //
185+
.returns(ReadPreference.primaryPreferred(), from(TransactionOptions::getReadPreference)) //
186+
.returns(WriteConcern.W3, from(TransactionOptions::getWriteConcern));
187+
}
188+
189+
@Test // GH-1628
190+
public void shouldReturnMergedOptionsIfLabelsContainOptionsMixedWithOrdinaryStrings() {
191+
TransactionOptions fallbackOptions = getTransactionOptions();
192+
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
193+
Set<String> labels = Set.of("mongo:maxCommitTime=PT5S", "mongo:nonExistentOption=value", "label",
194+
"mongo:writeConcern=w3");
195+
attribute.setLabels(labels);
196+
197+
TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);
198+
199+
assertThat(result).isNotSameAs(fallbackOptions) //
200+
.returns(5L, from(options -> options.getMaxCommitTime(TimeUnit.SECONDS))) //
201+
.returns(ReadConcern.AVAILABLE, from(TransactionOptions::getReadConcern)) //
202+
.returns(ReadPreference.secondaryPreferred(), from(TransactionOptions::getReadPreference)) //
203+
.returns(WriteConcern.W3, from(TransactionOptions::getWriteConcern));
204+
}
205+
206+
@Test // GH-1628
207+
public void shouldReturnNewOptionsIFallbackIsNull() {
208+
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
209+
Set<String> labels = Set.of("mongo:maxCommitTime=PT5S", "mongo:writeConcern=w3");
210+
attribute.setLabels(labels);
211+
212+
TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, null);
213+
214+
assertThat(result).returns(5L, from(options -> options.getMaxCommitTime(TimeUnit.SECONDS))) //
215+
.returns(null, from(TransactionOptions::getReadConcern)) //
216+
.returns(null, from(TransactionOptions::getReadPreference)) //
217+
.returns(WriteConcern.W3, from(TransactionOptions::getWriteConcern));
218+
}
219+
220+
private TransactionOptions getTransactionOptions() {
221+
return TransactionOptions.builder() //
222+
.maxCommitTime(1L, TimeUnit.MINUTES) //
223+
.readConcern(ReadConcern.AVAILABLE) //
224+
.readPreference(ReadPreference.secondaryPreferred()) //
225+
.writeConcern(WriteConcern.UNACKNOWLEDGED).build();
226+
}
227+
}

0 commit comments

Comments
 (0)