diff --git a/pom.xml b/pom.xml index 366786fc6d..2cc19b9202 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 4.2.0-SNAPSHOT + 4.2.0-GH-4429-SNAPSHOT pom Spring Data MongoDB diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml index 2de4b6b635..6d5d01072c 100644 --- a/spring-data-mongodb-benchmarks/pom.xml +++ b/spring-data-mongodb-benchmarks/pom.xml @@ -7,7 +7,7 @@ org.springframework.data spring-data-mongodb-parent - 4.2.0-SNAPSHOT + 4.2.0-GH-4429-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml index 41b81f9aa6..51c090412a 100644 --- a/spring-data-mongodb-distribution/pom.xml +++ b/spring-data-mongodb-distribution/pom.xml @@ -15,7 +15,7 @@ org.springframework.data spring-data-mongodb-parent - 4.2.0-SNAPSHOT + 4.2.0-GH-4429-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index dc07f13ccc..b428934787 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -13,7 +13,7 @@ org.springframework.data spring-data-mongodb-parent - 4.2.0-SNAPSHOT + 4.2.0-GH-4429-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java index 51d93db0ec..409bf725a7 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java @@ -18,6 +18,7 @@ import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Stream; @@ -46,6 +47,7 @@ import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; import org.springframework.data.mongodb.core.query.UpdateDefinition; +import org.springframework.data.mongodb.util.Lock; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; @@ -188,17 +190,18 @@ default SessionScoped withSession(Supplier sessionProvider) { return new SessionScoped() { - private final Object lock = new Object(); - private @Nullable ClientSession session = null; + private final Lock lock = Lock.of(new ReentrantLock()); + private @Nullable ClientSession session; @Override public T execute(SessionCallback action, Consumer onComplete) { - synchronized (lock) { + lock.executeWithoutResult(() -> { + if (session == null) { session = sessionProvider.get(); } - } + }); try { return action.doInSession(MongoOperations.this.withSession(session)); @@ -943,8 +946,8 @@ default List findDistinct(Query query, String field, String collection, C * Triggers findAndModify * to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query}. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param update the {@link UpdateDefinition} to apply on matching documents. Must not be {@literal null}. * @param entityClass the parametrized type. Must not be {@literal null}. * @return the converted object that was updated before it was updated or {@literal null}, if not found. @@ -959,8 +962,8 @@ default List findDistinct(Query query, String field, String collection, C * Triggers findAndModify * to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query}. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param update the {@link UpdateDefinition} to apply on matching documents. Must not be {@literal null}. * @param entityClass the parametrized type. Must not be {@literal null}. * @param collectionName the collection to query. Must not be {@literal null}. @@ -977,8 +980,8 @@ default List findDistinct(Query query, String field, String collection, C * to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query} taking * {@link FindAndModifyOptions} into account. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. * @param update the {@link UpdateDefinition} to apply on matching documents. * @param options the {@link FindAndModifyOptions} holding additional information. * @param entityClass the parametrized type. @@ -997,8 +1000,8 @@ default List findDistinct(Query query, String field, String collection, C * to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query} taking * {@link FindAndModifyOptions} into account. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param update the {@link UpdateDefinition} to apply on matching documents. Must not be {@literal null}. * @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}. * @param entityClass the parametrized type. Must not be {@literal null}. @@ -1023,8 +1026,8 @@ T findAndModify(Query query, UpdateDefinition update, FindAndModifyOptions o * Options are defaulted to {@link FindAndReplaceOptions#empty()}.
* NOTE: The replacement entity must not hold an {@literal id}. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param replacement the replacement document. Must not be {@literal null}. * @return the converted object that was updated or {@literal null}, if not found. * @throws org.springframework.data.mapping.MappingException if the collection name cannot be @@ -1044,8 +1047,8 @@ default T findAndReplace(Query query, T replacement) { * Options are defaulted to {@link FindAndReplaceOptions#empty()}.
* NOTE: The replacement entity must not hold an {@literal id}. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param replacement the replacement document. Must not be {@literal null}. * @param collectionName the collection to query. Must not be {@literal null}. * @return the converted object that was updated or {@literal null}, if not found. @@ -1063,8 +1066,8 @@ default T findAndReplace(Query query, T replacement, String collectionName) * taking {@link FindAndReplaceOptions} into account.
* NOTE: The replacement entity must not hold an {@literal id}. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param replacement the replacement document. Must not be {@literal null}. * @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}. * @return the converted object that was updated or {@literal null}, if not found. Depending on the value of @@ -1086,8 +1089,8 @@ default T findAndReplace(Query query, T replacement, FindAndReplaceOptions o * taking {@link FindAndReplaceOptions} into account.
* NOTE: The replacement entity must not hold an {@literal id}. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param replacement the replacement document. Must not be {@literal null}. * @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}. * @return the converted object that was updated or {@literal null}, if not found. Depending on the value of @@ -1109,8 +1112,8 @@ default T findAndReplace(Query query, T replacement, FindAndReplaceOptions o * taking {@link FindAndReplaceOptions} into account.
* NOTE: The replacement entity must not hold an {@literal id}. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param replacement the replacement document. Must not be {@literal null}. * @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}. * @param entityType the parametrized type. Must not be {@literal null}. @@ -1134,8 +1137,8 @@ default T findAndReplace(Query query, T replacement, FindAndReplaceOptions o * taking {@link FindAndReplaceOptions} into account.
* NOTE: The replacement entity must not hold an {@literal id}. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param replacement the replacement document. Must not be {@literal null}. * @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}. * @param entityType the type used for mapping the {@link Query} to domain type fields and deriving the collection @@ -1164,8 +1167,8 @@ default T findAndReplace(Query query, S replacement, FindAndReplaceOption * taking {@link FindAndReplaceOptions} into account.
* NOTE: The replacement entity must not hold an {@literal id}. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param replacement the replacement document. Must not be {@literal null}. * @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}. * @param entityType the type used for mapping the {@link Query} to domain type fields. Must not be {@literal null}. @@ -1673,7 +1676,8 @@ default long exactCount(Query query, String collectionName) { * acknowledged} remove operation was successful or not. * * @param object must not be {@literal null}. - * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} or empty. + * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} + * or empty. * @return the {@link DeleteResult} which lets you access the results of the previous delete. */ DeleteResult remove(Object object, String collectionName); @@ -1697,7 +1701,8 @@ default long exactCount(Query query, String collectionName) { * * @param query the query document that specifies the criteria used to remove a document. * @param entityClass class of the pojo to be operated on. Can be {@literal null}. - * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} or empty. + * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} + * or empty. * @return the {@link DeleteResult} which lets you access the results of the previous delete. * @throws IllegalArgumentException when {@literal query}, {@literal entityClass} or {@literal collectionName} is * {@literal null}. @@ -1711,7 +1716,8 @@ default long exactCount(Query query, String collectionName) { * information. Use {@link #remove(Query, Class, String)} to get full type specific support. * * @param query the query document that specifies the criteria used to remove a document. - * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} or empty. + * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} + * or empty. * @return the {@link DeleteResult} which lets you access the results of the previous delete. * @throws IllegalArgumentException when {@literal query} or {@literal collectionName} is {@literal null}. */ @@ -1723,7 +1729,8 @@ default long exactCount(Query query, String collectionName) { * information. Use {@link #findAllAndRemove(Query, Class, String)} to get full type specific support. * * @param query the query document that specifies the criteria used to find and remove documents. - * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} or empty. + * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} + * or empty. * @return the {@link List} converted objects deleted by this operation. * @since 1.5 */ @@ -1748,7 +1755,8 @@ default long exactCount(Query query, String collectionName) { * * @param query the query document that specifies the criteria used to find and remove documents. * @param entityClass class of the pojo to be operated on. - * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} or empty. + * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} + * or empty. * @return the {@link List} converted objects deleted by this operation. * @since 1.5 */ diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java index 0c14b27970..05f337a8e3 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java @@ -22,6 +22,8 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.lang.reflect.Method; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; import org.aopalliance.intercept.MethodInterceptor; @@ -39,6 +41,8 @@ import org.springframework.data.mongodb.ClientSessionException; import org.springframework.data.mongodb.LazyLoadingException; import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty; +import org.springframework.data.mongodb.util.Lock; +import org.springframework.data.mongodb.util.Lock.AcquiredLock; import org.springframework.lang.Nullable; import org.springframework.objenesis.SpringObjenesis; import org.springframework.util.ReflectionUtils; @@ -134,7 +138,8 @@ public Object createLazyLoadingProxy(MongoPersistentProperty property, DbRefReso } return prepareProxyFactory(propertyType, - () -> new LazyLoadingInterceptor(property, callback, source, exceptionTranslator)).getProxy(LazyLoadingProxy.class.getClassLoader()); + () -> new LazyLoadingInterceptor(property, callback, source, exceptionTranslator)) + .getProxy(LazyLoadingProxy.class.getClassLoader()); } /** @@ -171,6 +176,10 @@ public static class LazyLoadingInterceptor } } + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = Lock.of(rwLock.readLock()); + private final Lock writeLock = Lock.of(rwLock.writeLock()); + private final MongoPersistentProperty property; private final DbRefResolverCallback callback; private final Object source; @@ -339,25 +348,26 @@ private void readObject(ObjectInputStream in) throws IOException { } @Nullable - private synchronized Object resolve() { + private Object resolve() { - if (resolved) { + try (AcquiredLock l = readLock.lock()) { + if (resolved) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace(String.format("Accessing already resolved lazy loading property %s.%s", - property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName())); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace(String.format("Accessing already resolved lazy loading property %s.%s", + property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName())); + } + return result; } - return result; } - try { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace(String.format("Resolving lazy loading property %s.%s", - property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName())); - } - - return callback.resolve(property); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace(String.format("Resolving lazy loading property %s.%s", + property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName())); + } + try { + return writeLock.execute(() -> callback.resolve(property)); } catch (RuntimeException ex) { DataAccessException translatedException = exceptionTranslator.translateExceptionIfPossible(ex); @@ -370,6 +380,7 @@ private synchronized Object resolve() { translatedException != null ? translatedException : ex); } } + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java index cc0c64eeaf..f20fb22bdd 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java @@ -18,12 +18,14 @@ import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import org.springframework.dao.DataAccessResourceFailureException; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.messaging.Message.MessageProperties; import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions; +import org.springframework.data.mongodb.util.Lock; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.ErrorHandler; @@ -39,7 +41,7 @@ */ abstract class CursorReadingTask implements Task { - private final Object lifecycleMonitor = new Object(); + private final Lock lock = Lock.of(new ReentrantLock()); private final MongoTemplate template; private final SubscriptionRequest request; @@ -86,19 +88,14 @@ public void run() { } } catch (InterruptedException e) { - synchronized (lifecycleMonitor) { - state = State.CANCELLED; - } + lock.executeWithoutResult(() -> state = State.CANCELLED); Thread.currentThread().interrupt(); break; } } } catch (RuntimeException e) { - synchronized (lifecycleMonitor) { - state = State.CANCELLED; - } - + lock.executeWithoutResult(() -> state = State.CANCELLED); errorHandler.handleError(e); } } @@ -114,30 +111,30 @@ public void run() { */ private void start() { - synchronized (lifecycleMonitor) { + lock.executeWithoutResult(() -> { if (!State.RUNNING.equals(state)) { state = State.STARTING; } - } + }); do { - boolean valid = false; + boolean valid = lock.execute(() -> { - synchronized (lifecycleMonitor) { - - if (State.STARTING.equals(state)) { + if (!State.STARTING.equals(state)) { + return false; + } - MongoCursor cursor = execute(() -> initCursor(template, request.getRequestOptions(), targetType)); - valid = isValidCursor(cursor); - if (valid) { - this.cursor = cursor; - state = State.RUNNING; - } else if (cursor != null) { - cursor.close(); - } + MongoCursor cursor = execute(() -> initCursor(template, request.getRequestOptions(), targetType)); + boolean isValid = isValidCursor(cursor); + if (isValid) { + this.cursor = cursor; + state = State.RUNNING; + } else if (cursor != null) { + cursor.close(); } - } + return isValid; + }); if (!valid) { @@ -145,9 +142,7 @@ private void start() { Thread.sleep(100); } catch (InterruptedException e) { - synchronized (lifecycleMonitor) { - state = State.CANCELLED; - } + lock.executeWithoutResult(() -> state = State.CANCELLED); Thread.currentThread().interrupt(); } } @@ -163,7 +158,7 @@ private void start() { @Override public void cancel() throws DataAccessResourceFailureException { - synchronized (lifecycleMonitor) { + lock.executeWithoutResult(() -> { if (State.RUNNING.equals(state) || State.STARTING.equals(state)) { this.state = State.CANCELLED; @@ -171,7 +166,7 @@ public void cancel() throws DataAccessResourceFailureException { cursor.close(); } } - } + }); } @Override @@ -181,10 +176,7 @@ public boolean isLongLived() { @Override public State getState() { - - synchronized (lifecycleMonitor) { - return state; - } + return lock.execute(() -> state); } @Override @@ -220,13 +212,12 @@ private void emitMessage(Message message) { @Nullable private T getNext() { - synchronized (lifecycleMonitor) { + return lock.execute(() -> { if (State.RUNNING.equals(state)) { return cursor.tryNext(); } - } - - throw new IllegalStateException(String.format("Cursor %s is not longer open", cursor)); + throw new IllegalStateException(String.format("Cursor %s is not longer open", cursor)); + }); } private static boolean isValidCursor(@Nullable MongoCursor cursor) { @@ -263,4 +254,5 @@ private V execute(Supplier callback) { throw translated != null ? translated : e; } } + } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java index 7eb088c491..47aa59ce35 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java @@ -20,6 +20,8 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.Executor; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -27,6 +29,7 @@ import org.springframework.dao.DataAccessResourceFailureException; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions; +import org.springframework.data.mongodb.util.Lock; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.ErrorHandler; @@ -35,8 +38,7 @@ /** * Simple {@link Executor} based {@link MessageListenerContainer} implementation for running {@link Task tasks} like * listening to MongoDB Change Streams and tailable - * cursors. - *
+ * cursors.
* This message container creates long-running tasks that are executed on {@link Executor}. * * @author Christoph Strobl @@ -49,9 +51,16 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer private final TaskFactory taskFactory; private final Optional errorHandler; - private final Object lifecycleMonitor = new Object(); private final Map subscriptions = new LinkedHashMap<>(); + private final ReadWriteLock lifecycleMonitor = new ReentrantReadWriteLock(); + private final Lock lifecycleRead = Lock.of(lifecycleMonitor.readLock()); + private final Lock lifecycleWrite = Lock.of(lifecycleMonitor.readLock()); + + private final ReadWriteLock subscriptionMonitor = new ReentrantReadWriteLock(); + private final Lock subscriptionRead = Lock.of(subscriptionMonitor.readLock()); + private final Lock subscriptionWrite = Lock.of(subscriptionMonitor.readLock()); + private boolean running = false; /** @@ -109,43 +118,33 @@ public void stop(Runnable callback) { @Override public void start() { - synchronized (lifecycleMonitor) { + lifecycleWrite.executeWithoutResult(() -> { + if (!this.running) { + subscriptions.values().stream() // + .filter(it -> !it.isActive()) // + .filter(TaskSubscription.class::isInstance) // + .map(TaskSubscription.class::cast) // + .map(TaskSubscription::getTask) // + .forEach(taskExecutor::execute); - if (this.running) { - return; + running = true; } - - subscriptions.values().stream() // - .filter(it -> !it.isActive()) // - .filter(TaskSubscription.class::isInstance) // - .map(TaskSubscription.class::cast) // - .map(TaskSubscription::getTask) // - .forEach(taskExecutor::execute); - - running = true; - } + }); } @Override public void stop() { - - synchronized (lifecycleMonitor) { - + lifecycleWrite.executeWithoutResult(() -> { if (this.running) { - subscriptions.values().forEach(Cancelable::cancel); - running = false; } - } + }); } @Override public boolean isRunning() { - - synchronized (this.lifecycleMonitor) { - return running; - } + return lifecycleRead.execute(() -> running); } @Override @@ -170,36 +169,30 @@ public Subscription register(SubscriptionRequest lookup(SubscriptionRequest request) { - - synchronized (lifecycleMonitor) { - return Optional.ofNullable(subscriptions.get(request)); - } + return subscriptionRead.execute(() -> Optional.ofNullable(subscriptions.get(request))); } public Subscription register(SubscriptionRequest request, Task task) { - Subscription subscription = new TaskSubscription(task); - - synchronized (lifecycleMonitor) { - + return subscriptionWrite.execute(() -> { if (subscriptions.containsKey(request)) { return subscriptions.get(request); } + Subscription subscription = new TaskSubscription(task); this.subscriptions.put(request, subscription); - if (this.running) { + if (this.isRunning()) { taskExecutor.execute(task); } - } + return subscription; + }); - return subscription; } @Override public void remove(Subscription subscription) { - - synchronized (lifecycleMonitor) { + subscriptionWrite.executeWithoutResult(() -> { if (subscriptions.containsValue(subscription)) { @@ -209,7 +202,7 @@ public void remove(Subscription subscription) { subscriptions.values().remove(subscription); } - } + }); } /** diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/DefaultLock.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/DefaultLock.java new file mode 100644 index 0000000000..4d34e857fd --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/DefaultLock.java @@ -0,0 +1,44 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.util; + +import org.springframework.data.mongodb.util.Lock.AcquiredLock; + +/** + * Default {@link Lock} implementation. + * + * @author Mark Paluch + * @since 3.2 + */ +class DefaultLock implements Lock, AcquiredLock { + + private final java.util.concurrent.locks.Lock delegate; + + DefaultLock(java.util.concurrent.locks.Lock delegate) { + this.delegate = delegate; + } + + @Override + public AcquiredLock lock() { + delegate.lock(); + return this; + } + + @Override + public void close() { + delegate.unlock(); + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/Lock.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/Lock.java new file mode 100644 index 0000000000..608ec23728 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/Lock.java @@ -0,0 +1,96 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.util; + +import java.util.concurrent.locks.Condition; +import java.util.function.Supplier; + +import org.springframework.util.Assert; + +/** + * {@code Lock} provides more extensive locking operations than can be obtained using {@code synchronized} methods and + * {@link java.util.concurrent.locks.Lock}. It allows more flexible structuring and may support multiple associated + * {@link Condition} objects. + * + * @author Mark Paluch + * @since 4.2 + */ +public interface Lock { + + /** + * Create a new {@link Lock} adapter for the given {@link java.util.concurrent.locks.Lock delegate}. + * + * @param delegate must not be {@literal null}. + * @return a new {@link Lock} adapter. + */ + static Lock of(java.util.concurrent.locks.Lock delegate) { + + Assert.notNull(delegate, "Lock delegate must not be null"); + + return new DefaultLock(delegate); + } + + /** + * Acquires the lock. + *

+ * If the lock is not available then the current thread becomes disabled for thread scheduling purposes and lies + * dormant until the lock has been acquired. + * + * @see java.util.concurrent.locks.Lock#lock() + */ + AcquiredLock lock(); + + /** + * Execute the action specified by the given callback object guarded by a lock and return its result. + * + * @param action the action to run. + * @return the result of the action. + * @param type of the result. + * @throws RuntimeException if thrown by the action + */ + default T execute(Supplier action) { + try (AcquiredLock l = lock()) { + return action.get(); + } + } + + /** + * Execute the action specified by the given callback object guarded by a lock. + * + * @param action the action to run. + * @throws RuntimeException if thrown by the action + */ + default void executeWithoutResult(Runnable action) { + try (AcquiredLock l = lock()) { + action.run(); + } + } + + /** + * An acquired lock can be used with try-with-resources for easier releasing. + */ + interface AcquiredLock extends AutoCloseable { + + /** + * Releases the lock. + * + * @see java.util.concurrent.locks.Lock#unlock() + */ + @Override + void close(); + } + +}