From b3d9fb6ea31fe4d928020a54d92c8c0fd0573f74 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 15 Sep 2023 11:30:14 +0200 Subject: [PATCH 1/5] Release version 4.2 M3 (2023.1.0). See #4479 --- pom.xml | 2 +- spring-data-mongodb-benchmarks/pom.xml | 2 +- spring-data-mongodb-distribution/pom.xml | 2 +- spring-data-mongodb/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index 366786fc6d..7ac4849e6a 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 4.2.0-SNAPSHOT + 4.2.0-M3 pom Spring Data MongoDB diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml index 2de4b6b635..c042302051 100644 --- a/spring-data-mongodb-benchmarks/pom.xml +++ b/spring-data-mongodb-benchmarks/pom.xml @@ -7,7 +7,7 @@ org.springframework.data spring-data-mongodb-parent - 4.2.0-SNAPSHOT + 4.2.0-M3 ../pom.xml diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml index 41b81f9aa6..42aecc619c 100644 --- a/spring-data-mongodb-distribution/pom.xml +++ b/spring-data-mongodb-distribution/pom.xml @@ -15,7 +15,7 @@ org.springframework.data spring-data-mongodb-parent - 4.2.0-SNAPSHOT + 4.2.0-M3 ../pom.xml diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index dc07f13ccc..ff54006b6f 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -13,7 +13,7 @@ org.springframework.data spring-data-mongodb-parent - 4.2.0-SNAPSHOT + 4.2.0-M3 ../pom.xml From d17722f2c5e83cfdbbb9f0dcb6433fb8c0f00cb1 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Wed, 27 Sep 2023 11:48:04 +0200 Subject: [PATCH 2/5] Prepare issue branch. --- pom.xml | 2 +- spring-data-mongodb-benchmarks/pom.xml | 2 +- spring-data-mongodb-distribution/pom.xml | 2 +- spring-data-mongodb/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index 7ac4849e6a..2cc19b9202 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 4.2.0-M3 + 4.2.0-GH-4429-SNAPSHOT pom Spring Data MongoDB diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml index c042302051..6d5d01072c 100644 --- a/spring-data-mongodb-benchmarks/pom.xml +++ b/spring-data-mongodb-benchmarks/pom.xml @@ -7,7 +7,7 @@ org.springframework.data spring-data-mongodb-parent - 4.2.0-M3 + 4.2.0-GH-4429-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml index 42aecc619c..51c090412a 100644 --- a/spring-data-mongodb-distribution/pom.xml +++ b/spring-data-mongodb-distribution/pom.xml @@ -15,7 +15,7 @@ org.springframework.data spring-data-mongodb-parent - 4.2.0-M3 + 4.2.0-GH-4429-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index ff54006b6f..b428934787 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -13,7 +13,7 @@ org.springframework.data spring-data-mongodb-parent - 4.2.0-M3 + 4.2.0-GH-4429-SNAPSHOT ../pom.xml From 0bf129e9ff8eddf03510fb0fa562e0e2adbc9e2f Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Tue, 27 Jun 2023 12:06:39 +0200 Subject: [PATCH 3/5] Refine locking. --- .../data/mongodb/core/MongoOperations.java | 13 +++- .../core/convert/LazyLoadingProxyFactory.java | 13 +++- .../core/messaging/CursorReadingTask.java | 51 +++++++++----- .../DefaultMessageListenerContainer.java | 66 ++++++++++++------- 4 files changed, 97 insertions(+), 46 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java index 51d93db0ec..db419a25e3 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java @@ -18,8 +18,12 @@ import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.function.UnaryOperator; import java.util.stream.Stream; import org.bson.Document; @@ -188,16 +192,19 @@ default SessionScoped withSession(Supplier sessionProvider) { return new SessionScoped() { - private final Object lock = new Object(); - private @Nullable ClientSession session = null; + private final Lock lock = new ReentrantLock(); + private @Nullable ClientSession session; @Override public T execute(SessionCallback action, Consumer onComplete) { - synchronized (lock) { + lock.lock(); + try { if (session == null) { session = sessionProvider.get(); } + } finally { + lock.unlock(); } try { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java index 0c14b27970..bec7ab3b76 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java @@ -22,6 +22,10 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.lang.reflect.Method; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; import org.aopalliance.intercept.MethodInterceptor; @@ -171,6 +175,8 @@ public static class LazyLoadingInterceptor } } + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final MongoPersistentProperty property; private final DbRefResolverCallback callback; private final Object source; @@ -339,8 +345,9 @@ private void readObject(ObjectInputStream in) throws IOException { } @Nullable - private synchronized Object resolve() { + private Object resolve() { + lock.readLock().lock(); if (resolved) { if (LOGGER.isTraceEnabled()) { @@ -349,6 +356,7 @@ private synchronized Object resolve() { } return result; } + lock.readLock().unlock(); try { if (LOGGER.isTraceEnabled()) { @@ -356,6 +364,7 @@ private synchronized Object resolve() { property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName())); } + lock.writeLock().lock(); return callback.resolve(property); } catch (RuntimeException ex) { @@ -368,6 +377,8 @@ private synchronized Object resolve() { throw new LazyLoadingException("Unable to lazily resolve DBRef", translatedException != null ? translatedException : ex); + } finally { + lock.writeLock().unlock(); } } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java index cc0c64eeaf..1d6d81be62 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java @@ -18,6 +18,8 @@ import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import org.springframework.dao.DataAccessResourceFailureException; @@ -39,7 +41,7 @@ */ abstract class CursorReadingTask implements Task { - private final Object lifecycleMonitor = new Object(); + private final Lock lock = new ReentrantLock(); private final MongoTemplate template; private final SubscriptionRequest request; @@ -86,18 +88,18 @@ public void run() { } } catch (InterruptedException e) { - synchronized (lifecycleMonitor) { - state = State.CANCELLED; - } + lock.lock(); + state = State.CANCELLED; + lock.unlock(); Thread.currentThread().interrupt(); break; } } } catch (RuntimeException e) { - synchronized (lifecycleMonitor) { - state = State.CANCELLED; - } + lock.lock(); + state = State.CANCELLED; + lock.unlock(); errorHandler.handleError(e); } @@ -114,18 +116,19 @@ public void run() { */ private void start() { - synchronized (lifecycleMonitor) { - if (!State.RUNNING.equals(state)) { - state = State.STARTING; - } + lock.lock(); + if (!State.RUNNING.equals(state)) { + state = State.STARTING; } + lock.unlock(); do { boolean valid = false; - synchronized (lifecycleMonitor) { + lock.lock(); + try { if (State.STARTING.equals(state)) { MongoCursor cursor = execute(() -> initCursor(template, request.getRequestOptions(), targetType)); @@ -137,6 +140,8 @@ private void start() { cursor.close(); } } + } finally { + lock.unlock(); } if (!valid) { @@ -145,9 +150,9 @@ private void start() { Thread.sleep(100); } catch (InterruptedException e) { - synchronized (lifecycleMonitor) { - state = State.CANCELLED; - } + lock.lock(); + state = State.CANCELLED; + lock.unlock(); Thread.currentThread().interrupt(); } } @@ -163,7 +168,9 @@ private void start() { @Override public void cancel() throws DataAccessResourceFailureException { - synchronized (lifecycleMonitor) { + lock.lock(); + + try { if (State.RUNNING.equals(state) || State.STARTING.equals(state)) { this.state = State.CANCELLED; @@ -171,6 +178,8 @@ public void cancel() throws DataAccessResourceFailureException { cursor.close(); } } + } finally { + lock.unlock(); } } @@ -182,8 +191,11 @@ public boolean isLongLived() { @Override public State getState() { - synchronized (lifecycleMonitor) { + lock.lock(); + try { return state; + } finally { + lock.unlock(); } } @@ -220,10 +232,13 @@ private void emitMessage(Message message) { @Nullable private T getNext() { - synchronized (lifecycleMonitor) { + lock.lock(); + try{ if (State.RUNNING.equals(state)) { return cursor.tryNext(); } + } finally { + lock.unlock(); } throw new IllegalStateException(String.format("Cursor %s is not longer open", cursor)); diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java index 7eb088c491..a38a02b186 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java @@ -20,6 +20,8 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.Executor; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,9 +51,11 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer private final TaskFactory taskFactory; private final Optional errorHandler; - private final Object lifecycleMonitor = new Object(); private final Map subscriptions = new LinkedHashMap<>(); + ReadWriteLock lifecycleMonitor = new ReentrantReadWriteLock(); + ReadWriteLock subscriptionMonitor = new ReentrantReadWriteLock(); + private boolean running = false; /** @@ -109,42 +113,47 @@ public void stop(Runnable callback) { @Override public void start() { - synchronized (lifecycleMonitor) { - - if (this.running) { - return; + lifecycleMonitor.writeLock().lock(); + try { + if (!this.running) { + subscriptions.values().stream() // + .filter(it -> !it.isActive()) // + .filter(TaskSubscription.class::isInstance) // + .map(TaskSubscription.class::cast) // + .map(TaskSubscription::getTask) // + .forEach(taskExecutor::execute); + + running = true; } - - subscriptions.values().stream() // - .filter(it -> !it.isActive()) // - .filter(TaskSubscription.class::isInstance) // - .map(TaskSubscription.class::cast) // - .map(TaskSubscription::getTask) // - .forEach(taskExecutor::execute); - - running = true; + } finally { + lifecycleMonitor.writeLock().unlock(); } } @Override public void stop() { - synchronized (lifecycleMonitor) { + lifecycleMonitor.writeLock().lock(); + try { if (this.running) { - subscriptions.values().forEach(Cancelable::cancel); - running = false; } + } finally { + lifecycleMonitor.writeLock().unlock(); } + } @Override public boolean isRunning() { - synchronized (this.lifecycleMonitor) { + lifecycleMonitor.writeLock().lock(); + try { return running; + } finally { + lifecycleMonitor.writeLock().unlock(); } } @@ -171,35 +180,42 @@ public Subscription register(SubscriptionRequest lookup(SubscriptionRequest request) { - synchronized (lifecycleMonitor) { + subscriptionMonitor.readLock(); + try { return Optional.ofNullable(subscriptions.get(request)); + } finally { + subscriptionMonitor.readLock().unlock(); } + } public Subscription register(SubscriptionRequest request, Task task) { Subscription subscription = new TaskSubscription(task); - synchronized (lifecycleMonitor) { - + this.subscriptionMonitor.writeLock().lock(); + try { if (subscriptions.containsKey(request)) { return subscriptions.get(request); } this.subscriptions.put(request, subscription); - if (this.running) { + if (this.isRunning()) { taskExecutor.execute(task); } + return subscription; + } finally { + this.subscriptionMonitor.writeLock().unlock(); } - return subscription; } @Override public void remove(Subscription subscription) { - synchronized (lifecycleMonitor) { + this.subscriptionMonitor.writeLock().lock(); + try { if (subscriptions.containsValue(subscription)) { @@ -209,6 +225,8 @@ public void remove(Subscription subscription) { subscriptions.values().remove(subscription); } + } finally { + this.subscriptionMonitor.writeLock().unlock(); } } From 5e7cbe327408ceacc011868ce743749bf262c9ec Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Tue, 25 Jul 2023 10:30:00 +0200 Subject: [PATCH 4/5] Update after review --- .../core/convert/LazyLoadingProxyFactory.java | 43 ++++---- .../core/messaging/CursorReadingTask.java | 97 +++++++++---------- .../DefaultMessageListenerContainer.java | 71 ++++++-------- 3 files changed, 105 insertions(+), 106 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java index bec7ab3b76..53ff2a0be3 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java @@ -24,7 +24,6 @@ import java.lang.reflect.Method; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; @@ -138,7 +137,8 @@ public Object createLazyLoadingProxy(MongoPersistentProperty property, DbRefReso } return prepareProxyFactory(propertyType, - () -> new LazyLoadingInterceptor(property, callback, source, exceptionTranslator)).getProxy(LazyLoadingProxy.class.getClassLoader()); + () -> new LazyLoadingInterceptor(property, callback, source, exceptionTranslator)) + .getProxy(LazyLoadingProxy.class.getClassLoader()); } /** @@ -348,25 +348,26 @@ private void readObject(ObjectInputStream in) throws IOException { private Object resolve() { lock.readLock().lock(); - if (resolved) { + try { + if (resolved) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace(String.format("Accessing already resolved lazy loading property %s.%s", - property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName())); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace(String.format("Accessing already resolved lazy loading property %s.%s", + property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName())); + } + return result; } - return result; + } finally { + lock.readLock().unlock(); } - lock.readLock().unlock(); - - try { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace(String.format("Resolving lazy loading property %s.%s", - property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName())); - } - lock.writeLock().lock(); - return callback.resolve(property); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace(String.format("Resolving lazy loading property %s.%s", + property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName())); + } + try { + return executeWhileLocked(lock.writeLock(), () -> callback.resolve(property)); } catch (RuntimeException ex) { DataAccessException translatedException = exceptionTranslator.translateExceptionIfPossible(ex); @@ -377,8 +378,16 @@ private Object resolve() { throw new LazyLoadingException("Unable to lazily resolve DBRef", translatedException != null ? translatedException : ex); + } + } + + private static T executeWhileLocked(Lock lock, Supplier stuff) { + + lock.lock(); + try { + return stuff.get(); } finally { - lock.writeLock().unlock(); + lock.unlock(); } } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java index 1d6d81be62..dcebebbf75 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java @@ -88,19 +88,14 @@ public void run() { } } catch (InterruptedException e) { - lock.lock(); - state = State.CANCELLED; - lock.unlock(); + doWhileLocked(lock, () -> state = State.CANCELLED); Thread.currentThread().interrupt(); break; } } } catch (RuntimeException e) { - lock.lock(); - state = State.CANCELLED; - lock.unlock(); - + doWhileLocked(lock, () -> state = State.CANCELLED); errorHandler.handleError(e); } } @@ -116,33 +111,32 @@ public void run() { */ private void start() { - lock.lock(); - if (!State.RUNNING.equals(state)) { - state = State.STARTING; - } - lock.unlock(); + doWhileLocked(lock, () -> { + if (!State.RUNNING.equals(state)) { + state = State.STARTING; + } + }); do { - boolean valid = false; + // boolean valid = false; - lock.lock(); + boolean valid = executeWhileLocked(lock, () -> { - try { - if (State.STARTING.equals(state)) { + if (!State.STARTING.equals(state)) { + return false; + } - MongoCursor cursor = execute(() -> initCursor(template, request.getRequestOptions(), targetType)); - valid = isValidCursor(cursor); - if (valid) { - this.cursor = cursor; - state = State.RUNNING; - } else if (cursor != null) { - cursor.close(); - } + MongoCursor cursor = execute(() -> initCursor(template, request.getRequestOptions(), targetType)); + boolean isValid = isValidCursor(cursor); + if (isValid) { + this.cursor = cursor; + state = State.RUNNING; + } else if (cursor != null) { + cursor.close(); } - } finally { - lock.unlock(); - } + return isValid; + }); if (!valid) { @@ -150,9 +144,7 @@ private void start() { Thread.sleep(100); } catch (InterruptedException e) { - lock.lock(); - state = State.CANCELLED; - lock.unlock(); + doWhileLocked(lock, () -> state = State.CANCELLED); Thread.currentThread().interrupt(); } } @@ -168,9 +160,7 @@ private void start() { @Override public void cancel() throws DataAccessResourceFailureException { - lock.lock(); - - try { + doWhileLocked(lock, () -> { if (State.RUNNING.equals(state) || State.STARTING.equals(state)) { this.state = State.CANCELLED; @@ -178,9 +168,7 @@ public void cancel() throws DataAccessResourceFailureException { cursor.close(); } } - } finally { - lock.unlock(); - } + }); } @Override @@ -190,13 +178,7 @@ public boolean isLongLived() { @Override public State getState() { - - lock.lock(); - try { - return state; - } finally { - lock.unlock(); - } + return executeWhileLocked(lock, () -> state); } @Override @@ -232,16 +214,12 @@ private void emitMessage(Message message) { @Nullable private T getNext() { - lock.lock(); - try{ + return executeWhileLocked(lock, () -> { if (State.RUNNING.equals(state)) { return cursor.tryNext(); } - } finally { - lock.unlock(); - } - - throw new IllegalStateException(String.format("Cursor %s is not longer open", cursor)); + throw new IllegalStateException(String.format("Cursor %s is not longer open", cursor)); + }); } private static boolean isValidCursor(@Nullable MongoCursor cursor) { @@ -278,4 +256,23 @@ private V execute(Supplier callback) { throw translated != null ? translated : e; } } + + private static void doWhileLocked(Lock lock, Runnable action) { + + executeWhileLocked(lock, () -> { + action.run(); + return null; + }); + } + + @Nullable + private static T executeWhileLocked(Lock lock, Supplier stuff) { + + lock.lock(); + try { + return stuff.get(); + } finally { + lock.unlock(); + } + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java index a38a02b186..0e8f72cfe9 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java @@ -20,8 +20,10 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.Executor; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,8 +39,7 @@ /** * Simple {@link Executor} based {@link MessageListenerContainer} implementation for running {@link Task tasks} like * listening to MongoDB Change Streams and tailable - * cursors. - *
+ * cursors.
* This message container creates long-running tasks that are executed on {@link Executor}. * * @author Christoph Strobl @@ -113,8 +114,7 @@ public void stop(Runnable callback) { @Override public void start() { - lifecycleMonitor.writeLock().lock(); - try { + doWhileLocked(lifecycleMonitor.writeLock(), () -> { if (!this.running) { subscriptions.values().stream() // .filter(it -> !it.isActive()) // @@ -125,36 +125,23 @@ public void start() { running = true; } - } finally { - lifecycleMonitor.writeLock().unlock(); - } + }); } @Override public void stop() { - lifecycleMonitor.writeLock().lock(); - - try { + doWhileLocked(lifecycleMonitor.writeLock(), () -> { if (this.running) { subscriptions.values().forEach(Cancelable::cancel); running = false; } - } finally { - lifecycleMonitor.writeLock().unlock(); - } - + }); } @Override public boolean isRunning() { - - lifecycleMonitor.writeLock().lock(); - try { - return running; - } finally { - lifecycleMonitor.writeLock().unlock(); - } + return executeWhileLocked(lifecycleMonitor.readLock(), () -> running); } @Override @@ -179,43 +166,32 @@ public Subscription register(SubscriptionRequest lookup(SubscriptionRequest request) { - - subscriptionMonitor.readLock(); - try { - return Optional.ofNullable(subscriptions.get(request)); - } finally { - subscriptionMonitor.readLock().unlock(); - } - + return executeWhileLocked(subscriptionMonitor.readLock(), () -> Optional.ofNullable(subscriptions.get(request))); } public Subscription register(SubscriptionRequest request, Task task) { - Subscription subscription = new TaskSubscription(task); - - this.subscriptionMonitor.writeLock().lock(); - try { + return executeWhileLocked(this.subscriptionMonitor.writeLock(), () -> + { if (subscriptions.containsKey(request)) { return subscriptions.get(request); } + Subscription subscription = new TaskSubscription(task); this.subscriptions.put(request, subscription); if (this.isRunning()) { taskExecutor.execute(task); } return subscription; - } finally { - this.subscriptionMonitor.writeLock().unlock(); - } + }); } @Override public void remove(Subscription subscription) { - this.subscriptionMonitor.writeLock().lock(); - try { + doWhileLocked(this.subscriptionMonitor.writeLock(), () -> { if (subscriptions.containsValue(subscription)) { @@ -225,8 +201,25 @@ public void remove(Subscription subscription) { subscriptions.values().remove(subscription); } + }); + } + + private static void doWhileLocked(Lock lock, Runnable action) { + + executeWhileLocked(lock, () -> { + action.run(); + return null; + }); + } + + @Nullable + private static T executeWhileLocked(Lock lock, Supplier stuff) { + + lock.lock(); + try { + return stuff.get(); } finally { - this.subscriptionMonitor.writeLock().unlock(); + lock.unlock(); } } From 5ccf2642739b513a474d81ba05267bbce8d4d5bd Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Wed, 27 Sep 2023 11:46:11 +0200 Subject: [PATCH 5/5] Polishing. Introduce Lock utility for easier lock handling. --- .../data/mongodb/core/MongoOperations.java | 73 +++++++------- .../core/convert/LazyLoadingProxyFactory.java | 23 ++--- .../core/messaging/CursorReadingTask.java | 40 ++------ .../DefaultMessageListenerContainer.java | 46 +++------ .../data/mongodb/util/DefaultLock.java | 44 +++++++++ .../data/mongodb/util/Lock.java | 96 +++++++++++++++++++ 6 files changed, 208 insertions(+), 114 deletions(-) create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/DefaultLock.java create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/Lock.java diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java index db419a25e3..409bf725a7 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java @@ -18,12 +18,9 @@ import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Supplier; -import java.util.function.UnaryOperator; import java.util.stream.Stream; import org.bson.Document; @@ -50,6 +47,7 @@ import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; import org.springframework.data.mongodb.core.query.UpdateDefinition; +import org.springframework.data.mongodb.util.Lock; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; @@ -192,20 +190,18 @@ default SessionScoped withSession(Supplier sessionProvider) { return new SessionScoped() { - private final Lock lock = new ReentrantLock(); + private final Lock lock = Lock.of(new ReentrantLock()); private @Nullable ClientSession session; @Override public T execute(SessionCallback action, Consumer onComplete) { - lock.lock(); - try { + lock.executeWithoutResult(() -> { + if (session == null) { session = sessionProvider.get(); } - } finally { - lock.unlock(); - } + }); try { return action.doInSession(MongoOperations.this.withSession(session)); @@ -950,8 +946,8 @@ default List findDistinct(Query query, String field, String collection, C * Triggers findAndModify * to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query}. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param update the {@link UpdateDefinition} to apply on matching documents. Must not be {@literal null}. * @param entityClass the parametrized type. Must not be {@literal null}. * @return the converted object that was updated before it was updated or {@literal null}, if not found. @@ -966,8 +962,8 @@ default List findDistinct(Query query, String field, String collection, C * Triggers findAndModify * to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query}. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param update the {@link UpdateDefinition} to apply on matching documents. Must not be {@literal null}. * @param entityClass the parametrized type. Must not be {@literal null}. * @param collectionName the collection to query. Must not be {@literal null}. @@ -984,8 +980,8 @@ default List findDistinct(Query query, String field, String collection, C * to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query} taking * {@link FindAndModifyOptions} into account. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. * @param update the {@link UpdateDefinition} to apply on matching documents. * @param options the {@link FindAndModifyOptions} holding additional information. * @param entityClass the parametrized type. @@ -1004,8 +1000,8 @@ default List findDistinct(Query query, String field, String collection, C * to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query} taking * {@link FindAndModifyOptions} into account. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param update the {@link UpdateDefinition} to apply on matching documents. Must not be {@literal null}. * @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}. * @param entityClass the parametrized type. Must not be {@literal null}. @@ -1030,8 +1026,8 @@ T findAndModify(Query query, UpdateDefinition update, FindAndModifyOptions o * Options are defaulted to {@link FindAndReplaceOptions#empty()}.
* NOTE: The replacement entity must not hold an {@literal id}. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param replacement the replacement document. Must not be {@literal null}. * @return the converted object that was updated or {@literal null}, if not found. * @throws org.springframework.data.mapping.MappingException if the collection name cannot be @@ -1051,8 +1047,8 @@ default T findAndReplace(Query query, T replacement) { * Options are defaulted to {@link FindAndReplaceOptions#empty()}.
* NOTE: The replacement entity must not hold an {@literal id}. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param replacement the replacement document. Must not be {@literal null}. * @param collectionName the collection to query. Must not be {@literal null}. * @return the converted object that was updated or {@literal null}, if not found. @@ -1070,8 +1066,8 @@ default T findAndReplace(Query query, T replacement, String collectionName) * taking {@link FindAndReplaceOptions} into account.
* NOTE: The replacement entity must not hold an {@literal id}. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param replacement the replacement document. Must not be {@literal null}. * @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}. * @return the converted object that was updated or {@literal null}, if not found. Depending on the value of @@ -1093,8 +1089,8 @@ default T findAndReplace(Query query, T replacement, FindAndReplaceOptions o * taking {@link FindAndReplaceOptions} into account.
* NOTE: The replacement entity must not hold an {@literal id}. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param replacement the replacement document. Must not be {@literal null}. * @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}. * @return the converted object that was updated or {@literal null}, if not found. Depending on the value of @@ -1116,8 +1112,8 @@ default T findAndReplace(Query query, T replacement, FindAndReplaceOptions o * taking {@link FindAndReplaceOptions} into account.
* NOTE: The replacement entity must not hold an {@literal id}. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param replacement the replacement document. Must not be {@literal null}. * @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}. * @param entityType the parametrized type. Must not be {@literal null}. @@ -1141,8 +1137,8 @@ default T findAndReplace(Query query, T replacement, FindAndReplaceOptions o * taking {@link FindAndReplaceOptions} into account.
* NOTE: The replacement entity must not hold an {@literal id}. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param replacement the replacement document. Must not be {@literal null}. * @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}. * @param entityType the type used for mapping the {@link Query} to domain type fields and deriving the collection @@ -1171,8 +1167,8 @@ default T findAndReplace(Query query, S replacement, FindAndReplaceOption * taking {@link FindAndReplaceOptions} into account.
* NOTE: The replacement entity must not hold an {@literal id}. * - * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional - * fields specification. Must not be {@literal null}. + * @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an + * optional fields specification. Must not be {@literal null}. * @param replacement the replacement document. Must not be {@literal null}. * @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}. * @param entityType the type used for mapping the {@link Query} to domain type fields. Must not be {@literal null}. @@ -1680,7 +1676,8 @@ default long exactCount(Query query, String collectionName) { * acknowledged} remove operation was successful or not. * * @param object must not be {@literal null}. - * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} or empty. + * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} + * or empty. * @return the {@link DeleteResult} which lets you access the results of the previous delete. */ DeleteResult remove(Object object, String collectionName); @@ -1704,7 +1701,8 @@ default long exactCount(Query query, String collectionName) { * * @param query the query document that specifies the criteria used to remove a document. * @param entityClass class of the pojo to be operated on. Can be {@literal null}. - * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} or empty. + * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} + * or empty. * @return the {@link DeleteResult} which lets you access the results of the previous delete. * @throws IllegalArgumentException when {@literal query}, {@literal entityClass} or {@literal collectionName} is * {@literal null}. @@ -1718,7 +1716,8 @@ default long exactCount(Query query, String collectionName) { * information. Use {@link #remove(Query, Class, String)} to get full type specific support. * * @param query the query document that specifies the criteria used to remove a document. - * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} or empty. + * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} + * or empty. * @return the {@link DeleteResult} which lets you access the results of the previous delete. * @throws IllegalArgumentException when {@literal query} or {@literal collectionName} is {@literal null}. */ @@ -1730,7 +1729,8 @@ default long exactCount(Query query, String collectionName) { * information. Use {@link #findAllAndRemove(Query, Class, String)} to get full type specific support. * * @param query the query document that specifies the criteria used to find and remove documents. - * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} or empty. + * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} + * or empty. * @return the {@link List} converted objects deleted by this operation. * @since 1.5 */ @@ -1755,7 +1755,8 @@ default long exactCount(Query query, String collectionName) { * * @param query the query document that specifies the criteria used to find and remove documents. * @param entityClass class of the pojo to be operated on. - * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} or empty. + * @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} + * or empty. * @return the {@link List} converted objects deleted by this operation. * @since 1.5 */ diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java index 53ff2a0be3..05f337a8e3 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java @@ -22,7 +22,6 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.lang.reflect.Method; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; @@ -42,6 +41,8 @@ import org.springframework.data.mongodb.ClientSessionException; import org.springframework.data.mongodb.LazyLoadingException; import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty; +import org.springframework.data.mongodb.util.Lock; +import org.springframework.data.mongodb.util.Lock.AcquiredLock; import org.springframework.lang.Nullable; import org.springframework.objenesis.SpringObjenesis; import org.springframework.util.ReflectionUtils; @@ -175,7 +176,9 @@ public static class LazyLoadingInterceptor } } - private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = Lock.of(rwLock.readLock()); + private final Lock writeLock = Lock.of(rwLock.writeLock()); private final MongoPersistentProperty property; private final DbRefResolverCallback callback; @@ -347,8 +350,7 @@ private void readObject(ObjectInputStream in) throws IOException { @Nullable private Object resolve() { - lock.readLock().lock(); - try { + try (AcquiredLock l = readLock.lock()) { if (resolved) { if (LOGGER.isTraceEnabled()) { @@ -357,8 +359,6 @@ private Object resolve() { } return result; } - } finally { - lock.readLock().unlock(); } if (LOGGER.isTraceEnabled()) { @@ -367,7 +367,7 @@ private Object resolve() { } try { - return executeWhileLocked(lock.writeLock(), () -> callback.resolve(property)); + return writeLock.execute(() -> callback.resolve(property)); } catch (RuntimeException ex) { DataAccessException translatedException = exceptionTranslator.translateExceptionIfPossible(ex); @@ -381,15 +381,6 @@ private Object resolve() { } } - private static T executeWhileLocked(Lock lock, Supplier stuff) { - - lock.lock(); - try { - return stuff.get(); - } finally { - lock.unlock(); - } - } } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java index dcebebbf75..f20fb22bdd 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java @@ -18,7 +18,6 @@ import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; @@ -26,6 +25,7 @@ import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.messaging.Message.MessageProperties; import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions; +import org.springframework.data.mongodb.util.Lock; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.ErrorHandler; @@ -41,7 +41,7 @@ */ abstract class CursorReadingTask implements Task { - private final Lock lock = new ReentrantLock(); + private final Lock lock = Lock.of(new ReentrantLock()); private final MongoTemplate template; private final SubscriptionRequest request; @@ -88,14 +88,14 @@ public void run() { } } catch (InterruptedException e) { - doWhileLocked(lock, () -> state = State.CANCELLED); + lock.executeWithoutResult(() -> state = State.CANCELLED); Thread.currentThread().interrupt(); break; } } } catch (RuntimeException e) { - doWhileLocked(lock, () -> state = State.CANCELLED); + lock.executeWithoutResult(() -> state = State.CANCELLED); errorHandler.handleError(e); } } @@ -111,7 +111,7 @@ public void run() { */ private void start() { - doWhileLocked(lock, () -> { + lock.executeWithoutResult(() -> { if (!State.RUNNING.equals(state)) { state = State.STARTING; } @@ -119,9 +119,7 @@ private void start() { do { - // boolean valid = false; - - boolean valid = executeWhileLocked(lock, () -> { + boolean valid = lock.execute(() -> { if (!State.STARTING.equals(state)) { return false; @@ -144,7 +142,7 @@ private void start() { Thread.sleep(100); } catch (InterruptedException e) { - doWhileLocked(lock, () -> state = State.CANCELLED); + lock.executeWithoutResult(() -> state = State.CANCELLED); Thread.currentThread().interrupt(); } } @@ -160,7 +158,7 @@ private void start() { @Override public void cancel() throws DataAccessResourceFailureException { - doWhileLocked(lock, () -> { + lock.executeWithoutResult(() -> { if (State.RUNNING.equals(state) || State.STARTING.equals(state)) { this.state = State.CANCELLED; @@ -178,7 +176,7 @@ public boolean isLongLived() { @Override public State getState() { - return executeWhileLocked(lock, () -> state); + return lock.execute(() -> state); } @Override @@ -214,7 +212,7 @@ private void emitMessage(Message message) { @Nullable private T getNext() { - return executeWhileLocked(lock, () -> { + return lock.execute(() -> { if (State.RUNNING.equals(state)) { return cursor.tryNext(); } @@ -257,22 +255,4 @@ private V execute(Supplier callback) { } } - private static void doWhileLocked(Lock lock, Runnable action) { - - executeWhileLocked(lock, () -> { - action.run(); - return null; - }); - } - - @Nullable - private static T executeWhileLocked(Lock lock, Supplier stuff) { - - lock.lock(); - try { - return stuff.get(); - } finally { - lock.unlock(); - } - } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java index 0e8f72cfe9..47aa59ce35 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java @@ -20,10 +20,8 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.Executor; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +29,7 @@ import org.springframework.dao.DataAccessResourceFailureException; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions; +import org.springframework.data.mongodb.util.Lock; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.ErrorHandler; @@ -54,8 +53,13 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer private final Map subscriptions = new LinkedHashMap<>(); - ReadWriteLock lifecycleMonitor = new ReentrantReadWriteLock(); - ReadWriteLock subscriptionMonitor = new ReentrantReadWriteLock(); + private final ReadWriteLock lifecycleMonitor = new ReentrantReadWriteLock(); + private final Lock lifecycleRead = Lock.of(lifecycleMonitor.readLock()); + private final Lock lifecycleWrite = Lock.of(lifecycleMonitor.readLock()); + + private final ReadWriteLock subscriptionMonitor = new ReentrantReadWriteLock(); + private final Lock subscriptionRead = Lock.of(subscriptionMonitor.readLock()); + private final Lock subscriptionWrite = Lock.of(subscriptionMonitor.readLock()); private boolean running = false; @@ -114,7 +118,7 @@ public void stop(Runnable callback) { @Override public void start() { - doWhileLocked(lifecycleMonitor.writeLock(), () -> { + lifecycleWrite.executeWithoutResult(() -> { if (!this.running) { subscriptions.values().stream() // .filter(it -> !it.isActive()) // @@ -130,8 +134,7 @@ public void start() { @Override public void stop() { - - doWhileLocked(lifecycleMonitor.writeLock(), () -> { + lifecycleWrite.executeWithoutResult(() -> { if (this.running) { subscriptions.values().forEach(Cancelable::cancel); running = false; @@ -141,7 +144,7 @@ public void stop() { @Override public boolean isRunning() { - return executeWhileLocked(lifecycleMonitor.readLock(), () -> running); + return lifecycleRead.execute(() -> running); } @Override @@ -166,13 +169,12 @@ public Subscription register(SubscriptionRequest lookup(SubscriptionRequest request) { - return executeWhileLocked(subscriptionMonitor.readLock(), () -> Optional.ofNullable(subscriptions.get(request))); + return subscriptionRead.execute(() -> Optional.ofNullable(subscriptions.get(request))); } public Subscription register(SubscriptionRequest request, Task task) { - return executeWhileLocked(this.subscriptionMonitor.writeLock(), () -> - { + return subscriptionWrite.execute(() -> { if (subscriptions.containsKey(request)) { return subscriptions.get(request); } @@ -190,8 +192,7 @@ public Subscription register(SubscriptionRequest request, Task task) { @Override public void remove(Subscription subscription) { - - doWhileLocked(this.subscriptionMonitor.writeLock(), () -> { + subscriptionWrite.executeWithoutResult(() -> { if (subscriptions.containsValue(subscription)) { @@ -203,25 +204,6 @@ public void remove(Subscription subscription) { } }); } - - private static void doWhileLocked(Lock lock, Runnable action) { - - executeWhileLocked(lock, () -> { - action.run(); - return null; - }); - } - - @Nullable - private static T executeWhileLocked(Lock lock, Supplier stuff) { - - lock.lock(); - try { - return stuff.get(); - } finally { - lock.unlock(); - } - } /** * @author Christoph Strobl diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/DefaultLock.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/DefaultLock.java new file mode 100644 index 0000000000..4d34e857fd --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/DefaultLock.java @@ -0,0 +1,44 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.util; + +import org.springframework.data.mongodb.util.Lock.AcquiredLock; + +/** + * Default {@link Lock} implementation. + * + * @author Mark Paluch + * @since 3.2 + */ +class DefaultLock implements Lock, AcquiredLock { + + private final java.util.concurrent.locks.Lock delegate; + + DefaultLock(java.util.concurrent.locks.Lock delegate) { + this.delegate = delegate; + } + + @Override + public AcquiredLock lock() { + delegate.lock(); + return this; + } + + @Override + public void close() { + delegate.unlock(); + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/Lock.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/Lock.java new file mode 100644 index 0000000000..608ec23728 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/Lock.java @@ -0,0 +1,96 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.util; + +import java.util.concurrent.locks.Condition; +import java.util.function.Supplier; + +import org.springframework.util.Assert; + +/** + * {@code Lock} provides more extensive locking operations than can be obtained using {@code synchronized} methods and + * {@link java.util.concurrent.locks.Lock}. It allows more flexible structuring and may support multiple associated + * {@link Condition} objects. + * + * @author Mark Paluch + * @since 4.2 + */ +public interface Lock { + + /** + * Create a new {@link Lock} adapter for the given {@link java.util.concurrent.locks.Lock delegate}. + * + * @param delegate must not be {@literal null}. + * @return a new {@link Lock} adapter. + */ + static Lock of(java.util.concurrent.locks.Lock delegate) { + + Assert.notNull(delegate, "Lock delegate must not be null"); + + return new DefaultLock(delegate); + } + + /** + * Acquires the lock. + *

+ * If the lock is not available then the current thread becomes disabled for thread scheduling purposes and lies + * dormant until the lock has been acquired. + * + * @see java.util.concurrent.locks.Lock#lock() + */ + AcquiredLock lock(); + + /** + * Execute the action specified by the given callback object guarded by a lock and return its result. + * + * @param action the action to run. + * @return the result of the action. + * @param type of the result. + * @throws RuntimeException if thrown by the action + */ + default T execute(Supplier action) { + try (AcquiredLock l = lock()) { + return action.get(); + } + } + + /** + * Execute the action specified by the given callback object guarded by a lock. + * + * @param action the action to run. + * @throws RuntimeException if thrown by the action + */ + default void executeWithoutResult(Runnable action) { + try (AcquiredLock l = lock()) { + action.run(); + } + } + + /** + * An acquired lock can be used with try-with-resources for easier releasing. + */ + interface AcquiredLock extends AutoCloseable { + + /** + * Releases the lock. + * + * @see java.util.concurrent.locks.Lock#unlock() + */ + @Override + void close(); + } + +}