Skip to content

Commit 1f87111

Browse files
committed
Track active subscriptions as Runnable in the processor
1 parent 78a47c9 commit 1f87111

File tree

2 files changed

+99
-43
lines changed

2 files changed

+99
-43
lines changed

spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Set;
2929
import java.util.TimeZone;
3030
import java.util.concurrent.ConcurrentHashMap;
31+
import java.util.concurrent.CopyOnWriteArrayList;
3132
import java.util.concurrent.ScheduledExecutorService;
3233
import java.util.concurrent.TimeUnit;
3334

@@ -144,6 +145,8 @@ public class ScheduledAnnotationBeanPostProcessor
144145

145146
private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);
146147

148+
private final Map<Object, List<Runnable>> reactiveSubscriptions = new IdentityHashMap<>(16);
149+
147150

148151
/**
149152
* Create a default {@code ScheduledAnnotationBeanPostProcessor}.
@@ -573,8 +576,8 @@ protected void processScheduledSync(Scheduled scheduled, Method method, Object b
573576
protected void processScheduledAsync(Scheduled scheduled, Method method, Object bean) {
574577
Runnable task;
575578
try {
576-
boolean isFixedDelaySpecialCase = scheduled.fixedDelay() > 0 || StringUtils.hasText(scheduled.fixedDelayString());
577-
task = ScheduledAnnotationReactiveSupport.createSubscriptionRunnable(method, bean, isFixedDelaySpecialCase);
579+
task = ScheduledAnnotationReactiveSupport.createSubscriptionRunnable(method, bean, scheduled,
580+
this.reactiveSubscriptions.computeIfAbsent(bean, k -> new CopyOnWriteArrayList<>()));
578581
}
579582
catch (IllegalArgumentException ex) {
580583
throw new IllegalStateException("Could not create recurring task for @Scheduled method '" + method.getName() + "': " + ex.getMessage());
@@ -620,7 +623,8 @@ private static boolean isP(char ch) {
620623
/**
621624
* Return all currently scheduled tasks, from {@link Scheduled} methods
622625
* as well as from programmatic {@link SchedulingConfigurer} interaction.
623-
* <p>Note this doesn't include scheduled reactive methods.
626+
* <p>Note this includes upcoming scheduled subscriptions for reactive methods,
627+
* but doesn't cover any currently active subscription for such methods.
624628
* @since 5.0.2
625629
*/
626630
@Override
@@ -639,20 +643,27 @@ public Set<ScheduledTask> getScheduledTasks() {
639643
@Override
640644
public void postProcessBeforeDestruction(Object bean, String beanName) {
641645
Set<ScheduledTask> tasks;
646+
List<Runnable> liveSubscriptions;
642647
synchronized (this.scheduledTasks) {
643648
tasks = this.scheduledTasks.remove(bean);
649+
liveSubscriptions = this.reactiveSubscriptions.remove(bean);
644650
}
645651
if (tasks != null) {
646652
for (ScheduledTask task : tasks) {
647653
task.cancel();
648654
}
649655
}
656+
if (liveSubscriptions != null) {
657+
for (Runnable subscription : liveSubscriptions) {
658+
subscription.run(); // equivalent to cancelling the subscription
659+
}
660+
}
650661
}
651662

652663
@Override
653664
public boolean requiresDestruction(Object bean) {
654665
synchronized (this.scheduledTasks) {
655-
return this.scheduledTasks.containsKey(bean);
666+
return this.scheduledTasks.containsKey(bean) || this.reactiveSubscriptions.containsKey(bean);
656667
}
657668
}
658669

@@ -666,6 +677,12 @@ public void destroy() {
666677
}
667678
}
668679
this.scheduledTasks.clear();
680+
Collection<List<Runnable>> allLiveSubscriptions = this.reactiveSubscriptions.values();
681+
for (List<Runnable> liveSubscriptions : allLiveSubscriptions) {
682+
for (Runnable liveSubscription : liveSubscriptions) {
683+
liveSubscription.run(); //equivalent to cancelling the subscription
684+
}
685+
}
669686
}
670687
this.registrar.destroy();
671688
}

spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupport.java

Lines changed: 78 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.lang.reflect.InvocationTargetException;
2020
import java.lang.reflect.Method;
21+
import java.util.List;
2122
import java.util.concurrent.CountDownLatch;
2223

2324
import org.apache.commons.logging.Log;
@@ -32,9 +33,11 @@
3233
import org.springframework.core.KotlinDetector;
3334
import org.springframework.core.ReactiveAdapter;
3435
import org.springframework.core.ReactiveAdapterRegistry;
36+
import org.springframework.lang.Nullable;
3537
import org.springframework.util.Assert;
3638
import org.springframework.util.ClassUtils;
3739
import org.springframework.util.ReflectionUtils;
40+
import org.springframework.util.StringUtils;
3841

3942
/**
4043
* Helper class for @{@link ScheduledAnnotationBeanPostProcessor} to support reactive cases
@@ -148,33 +151,16 @@ static Publisher<?> getPublisherFor(Method method, Object bean) {
148151
* (i.e. the task blocks until completion of the Publisher, then the delay is applied
149152
* until next iteration).
150153
*/
151-
static Runnable createSubscriptionRunnable(Method method, Object targetBean, boolean isFixedDelaySpecialCase) {
154+
static Runnable createSubscriptionRunnable(Method method, Object targetBean, Scheduled scheduled,
155+
List<Runnable> subscriptionTrackerRegistry) {
156+
boolean shouldBlock = scheduled.fixedDelay() > 0 || StringUtils.hasText(scheduled.fixedDelayString());
152157
final Publisher<?> publisher = getPublisherFor(method, targetBean);
153-
if (isFixedDelaySpecialCase) {
158+
if (shouldBlock) {
154159
return () -> {
155160
final CountDownLatch latch = new CountDownLatch(1);
156-
publisher.subscribe(new Subscriber<Object>() {
157-
@Override
158-
public void onSubscribe(Subscription s) {
159-
s.request(Integer.MAX_VALUE);
160-
}
161-
162-
@Override
163-
public void onNext(Object o) {
164-
// NO-OP
165-
}
166-
167-
@Override
168-
public void onError(Throwable ex) {
169-
LOGGER.warn("Unexpected error occurred in scheduled reactive task", ex);
170-
latch.countDown();
171-
}
172-
173-
@Override
174-
public void onComplete() {
175-
latch.countDown();
176-
}
177-
});
161+
TrackingSubscriber subscriber = new TrackingSubscriber(subscriptionTrackerRegistry, latch);
162+
subscriptionTrackerRegistry.add(subscriber);
163+
publisher.subscribe(subscriber);
178164
try {
179165
latch.await();
180166
}
@@ -183,27 +169,80 @@ public void onComplete() {
183169
}
184170
};
185171
}
186-
return () -> publisher.subscribe(new Subscriber<Object>() {
187-
@Override
188-
public void onSubscribe(Subscription s) {
189-
s.request(Integer.MAX_VALUE);
190-
}
172+
return () -> {
173+
final TrackingSubscriber subscriber = new TrackingSubscriber(subscriptionTrackerRegistry);
174+
subscriptionTrackerRegistry.add(subscriber);
175+
publisher.subscribe(subscriber);
176+
};
177+
}
178+
179+
/**
180+
* A {@code Subscriber} which keeps track of its {@code Subscription} and exposes the
181+
* capacity to cancel the subscription as a {@code Runnable}. Can optionally support
182+
* blocking if a {@code CountDownLatch} is passed at construction.
183+
*/
184+
private static final class TrackingSubscriber implements Subscriber<Object>, Runnable {
185+
186+
private final List<Runnable> subscriptionTrackerRegistry;
187+
188+
@Nullable
189+
private final CountDownLatch blockingLatch;
190+
191+
/*
192+
Implementation note: since this is created last minute when subscribing,
193+
there shouldn't be a way to cancel the tracker externally from the
194+
ScheduledAnnotationBeanProcessor before the #setSubscription(Subscription)
195+
method is called.
196+
*/
197+
@Nullable
198+
private Subscription s;
199+
200+
TrackingSubscriber(List<Runnable> subscriptionTrackerRegistry) {
201+
this(subscriptionTrackerRegistry, null);
202+
}
191203

192-
@Override
193-
public void onNext(Object o) {
194-
// NO-OP
204+
TrackingSubscriber(List<Runnable> subscriptionTrackerRegistry, @Nullable CountDownLatch latch) {
205+
this.subscriptionTrackerRegistry = subscriptionTrackerRegistry;
206+
this.blockingLatch = latch;
207+
}
208+
209+
@Override
210+
public void run() {
211+
if (this.s != null) {
212+
this.s.cancel();
213+
}
214+
if (this.blockingLatch != null) {
215+
this.blockingLatch.countDown();
195216
}
217+
}
218+
219+
@Override
220+
public void onSubscribe(Subscription s) {
221+
this.s = s;
222+
s.request(Integer.MAX_VALUE);
223+
}
224+
225+
@Override
226+
public void onNext(Object o) {
227+
// NO-OP
228+
}
196229

197-
@Override
198-
public void onError(Throwable ex) {
199-
LOGGER.warn("Unexpected error occurred in scheduled reactive task", ex);
230+
@Override
231+
public void onError(Throwable ex) {
232+
this.subscriptionTrackerRegistry.remove(this);
233+
LOGGER.warn("Unexpected error occurred in scheduled reactive task", ex);
234+
if (this.blockingLatch != null) {
235+
this.blockingLatch.countDown();
200236
}
237+
}
201238

202-
@Override
203-
public void onComplete() {
204-
// NO-OP
239+
@Override
240+
public void onComplete() {
241+
this.subscriptionTrackerRegistry.remove(this);
242+
if (this.blockingLatch != null) {
243+
this.blockingLatch.countDown();
205244
}
206-
});
245+
}
207246
}
208247

209248
}

0 commit comments

Comments
 (0)