subscriptions = new LinkedHashMap<>();
+ private final ReadWriteLock lifecycleMonitor = new ReentrantReadWriteLock();
+ private final Lock lifecycleRead = Lock.of(lifecycleMonitor.readLock());
+ private final Lock lifecycleWrite = Lock.of(lifecycleMonitor.readLock());
+
+ private final ReadWriteLock subscriptionMonitor = new ReentrantReadWriteLock();
+ private final Lock subscriptionRead = Lock.of(subscriptionMonitor.readLock());
+ private final Lock subscriptionWrite = Lock.of(subscriptionMonitor.readLock());
+
private boolean running = false;
/**
@@ -109,43 +118,33 @@ public void stop(Runnable callback) {
@Override
public void start() {
- synchronized (lifecycleMonitor) {
+ lifecycleWrite.executeWithoutResult(() -> {
+ if (!this.running) {
+ subscriptions.values().stream() //
+ .filter(it -> !it.isActive()) //
+ .filter(TaskSubscription.class::isInstance) //
+ .map(TaskSubscription.class::cast) //
+ .map(TaskSubscription::getTask) //
+ .forEach(taskExecutor::execute);
- if (this.running) {
- return;
+ running = true;
}
-
- subscriptions.values().stream() //
- .filter(it -> !it.isActive()) //
- .filter(TaskSubscription.class::isInstance) //
- .map(TaskSubscription.class::cast) //
- .map(TaskSubscription::getTask) //
- .forEach(taskExecutor::execute);
-
- running = true;
- }
+ });
}
@Override
public void stop() {
-
- synchronized (lifecycleMonitor) {
-
+ lifecycleWrite.executeWithoutResult(() -> {
if (this.running) {
-
subscriptions.values().forEach(Cancelable::cancel);
-
running = false;
}
- }
+ });
}
@Override
public boolean isRunning() {
-
- synchronized (this.lifecycleMonitor) {
- return running;
- }
+ return lifecycleRead.execute(() -> running);
}
@Override
@@ -170,36 +169,30 @@ public Subscription register(SubscriptionRequest lookup(SubscriptionRequest, ?, ?> request) {
-
- synchronized (lifecycleMonitor) {
- return Optional.ofNullable(subscriptions.get(request));
- }
+ return subscriptionRead.execute(() -> Optional.ofNullable(subscriptions.get(request)));
}
public Subscription register(SubscriptionRequest request, Task task) {
- Subscription subscription = new TaskSubscription(task);
-
- synchronized (lifecycleMonitor) {
-
+ return subscriptionWrite.execute(() -> {
if (subscriptions.containsKey(request)) {
return subscriptions.get(request);
}
+ Subscription subscription = new TaskSubscription(task);
this.subscriptions.put(request, subscription);
- if (this.running) {
+ if (this.isRunning()) {
taskExecutor.execute(task);
}
- }
+ return subscription;
+ });
- return subscription;
}
@Override
public void remove(Subscription subscription) {
-
- synchronized (lifecycleMonitor) {
+ subscriptionWrite.executeWithoutResult(() -> {
if (subscriptions.containsValue(subscription)) {
@@ -209,7 +202,7 @@ public void remove(Subscription subscription) {
subscriptions.values().remove(subscription);
}
- }
+ });
}
/**
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/DefaultLock.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/DefaultLock.java
new file mode 100644
index 0000000000..4d34e857fd
--- /dev/null
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/DefaultLock.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.util;
+
+import org.springframework.data.mongodb.util.Lock.AcquiredLock;
+
+/**
+ * Default {@link Lock} implementation.
+ *
+ * @author Mark Paluch
+ * @since 3.2
+ */
+class DefaultLock implements Lock, AcquiredLock {
+
+ private final java.util.concurrent.locks.Lock delegate;
+
+ DefaultLock(java.util.concurrent.locks.Lock delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public AcquiredLock lock() {
+ delegate.lock();
+ return this;
+ }
+
+ @Override
+ public void close() {
+ delegate.unlock();
+ }
+}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/Lock.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/Lock.java
new file mode 100644
index 0000000000..608ec23728
--- /dev/null
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/Lock.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.util;
+
+import java.util.concurrent.locks.Condition;
+import java.util.function.Supplier;
+
+import org.springframework.util.Assert;
+
+/**
+ * {@code Lock} provides more extensive locking operations than can be obtained using {@code synchronized} methods and
+ * {@link java.util.concurrent.locks.Lock}. It allows more flexible structuring and may support multiple associated
+ * {@link Condition} objects.
+ *
+ * @author Mark Paluch
+ * @since 4.2
+ */
+public interface Lock {
+
+ /**
+ * Create a new {@link Lock} adapter for the given {@link java.util.concurrent.locks.Lock delegate}.
+ *
+ * @param delegate must not be {@literal null}.
+ * @return a new {@link Lock} adapter.
+ */
+ static Lock of(java.util.concurrent.locks.Lock delegate) {
+
+ Assert.notNull(delegate, "Lock delegate must not be null");
+
+ return new DefaultLock(delegate);
+ }
+
+ /**
+ * Acquires the lock.
+ *
+ * If the lock is not available then the current thread becomes disabled for thread scheduling purposes and lies
+ * dormant until the lock has been acquired.
+ *
+ * @see java.util.concurrent.locks.Lock#lock()
+ */
+ AcquiredLock lock();
+
+ /**
+ * Execute the action specified by the given callback object guarded by a lock and return its result.
+ *
+ * @param action the action to run.
+ * @return the result of the action.
+ * @param type of the result.
+ * @throws RuntimeException if thrown by the action
+ */
+ default T execute(Supplier action) {
+ try (AcquiredLock l = lock()) {
+ return action.get();
+ }
+ }
+
+ /**
+ * Execute the action specified by the given callback object guarded by a lock.
+ *
+ * @param action the action to run.
+ * @throws RuntimeException if thrown by the action
+ */
+ default void executeWithoutResult(Runnable action) {
+ try (AcquiredLock l = lock()) {
+ action.run();
+ }
+ }
+
+ /**
+ * An acquired lock can be used with try-with-resources for easier releasing.
+ */
+ interface AcquiredLock extends AutoCloseable {
+
+ /**
+ * Releases the lock.
+ *
+ * @see java.util.concurrent.locks.Lock#unlock()
+ */
+ @Override
+ void close();
+ }
+
+}