Skip to content

Commit a413f4f

Browse files
committed
Pivot to use the existing Runnable-based infrastructure
1 parent ffdbc27 commit a413f4f

File tree

7 files changed

+187
-392
lines changed

7 files changed

+187
-392
lines changed

framework-docs/modules/ROOT/pages/integration/scheduling.adoc

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -452,17 +452,13 @@ registry denotes that by having the `getDescriptor().isDeferred()` method return
452452
}
453453
----
454454

455-
All these types of methods must be declared without any arguments. Additionally, the `cron`
456-
configuration elements are not supported for these cases.
457-
458-
Note that the `TaskExecutor`/`TaskScheduler` infrastructure is not used and the scheduling
459-
is instead performed by the Reactor library, which must be present at runtime.
460-
In the case of Kotlin suspending functions the `kotlinx.coroutines.reactor` bridge must
461-
also be present.
455+
All these types of methods must be declared without any arguments. In the case of Kotlin
456+
suspending functions the `kotlinx.coroutines.reactor` bridge must also be present to allow
457+
the framework to invoke a suspending function as a `Publisher`.
462458

463459
The Spring Framework will obtain a `Publisher` out of the annotated method once and will
464-
create a timed `Flux` which regularly subscribes to said `Publisher`. These inner regular
465-
subscriptions happen according to the `fixedDelay`/`fixedRate` configuration.
460+
schedule a `Runnable` in which it subscribes to said `Publisher`. These inner regular
461+
subscriptions happen according to the `cron`/fixedDelay`/`fixedRate` configuration.
466462

467463
If the `Publisher` emits `onNext` signal(s), these are ignored and discarded (the same way
468464
return values from synchronous `@Scheduled` methods are ignored).
@@ -502,8 +498,11 @@ seconds:
502498

503499
[NOTE]
504500
====
505-
The Spring Framework subscribes once to the underlying scheduling `Flux`, which can be
506-
cancelled by destroying the annotated bean or closing the application context.
501+
When destroying the annotated bean or closing the application context Spring Framework cancels
502+
scheduled tasks, which includes the next scheduled subscription to the `Publisher`.
503+
However, there is no tracking of individual subscriptions: once a subscription has started
504+
it can't be cancelled. For that reason, it is a pre-requisite that the `Publisher` is finite
505+
in addition to supporting multiple subsequent subscriptions.
507506
====
508507

509508

spring-context/spring-context.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ dependencies {
3838
testImplementation("org.apache.commons:commons-pool2")
3939
testImplementation("org.awaitility:awaitility")
4040
testImplementation("jakarta.inject:jakarta.inject-tck")
41+
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
4142
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
4243
testImplementation("io.reactivex.rxjava3:rxjava")
4344
testRuntimeOnly("jakarta.xml.bind:jakarta.xml.bind-api")

spring-context/src/main/java/org/springframework/scheduling/annotation/EnableScheduling.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -188,11 +188,6 @@
188188
* application context and any separate {@code DispatcherServlet} application contexts,
189189
* if you need to apply its behavior at multiple levels.
190190
*
191-
* <p><b>Note: {@code @EnableScheduling} and {@code @Scheduled} support of reactive methods
192-
* and Kotlin suspending functions uses Reactor infrastructure</b> instead of the
193-
* {@code ScheduledTaskRegistrar} infrastructure, so previously discussed task-related
194-
* aspects like {@code SchedulingConfigurer} don't apply to these two cases.
195-
*
196191
* @author Chris Beams
197192
* @author Juergen Hoeller
198193
* @since 3.1

spring-context/src/main/java/org/springframework/scheduling/annotation/Scheduled.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,18 @@
3737
* when called through the scheduler.
3838
*
3939
* <p>Methods that return a reactive {@code Publisher} or a type which can be adapted
40-
* to {@code Publisher} by the default {@code ReactiveAdapterRegistry} are supported
41-
* provided the Reactor library is present at runtime. Reactor is used to implement
42-
* the scheduling by repeatedly subscribing to the returned Publisher, which is only
43-
* produced once.
44-
* The cron configuration is not supported for this type of method. Values emitted by
45-
* the publisher are ignored and discarded. Errors are logged at WARN level, which
46-
* doesn't prevent further iterations.
40+
* to {@code Publisher} by the default {@code ReactiveAdapterRegistry} are supported.
41+
* The {@code Publisher} MUST be finite and MUST support multiple subsequent subscriptions
42+
* (i.e. be cold).
43+
* The returned Publisher is only produced once, and the scheduling infrastructure
44+
* then periodically {@code subscribe()} to it according to configuration.
45+
* Values emitted by the publisher are ignored. Errors are logged at WARN level, which
46+
* doesn't prevent further iterations. If a {@code fixed delay} is configured, the
47+
* subscription is blocked upon in order to respect the fixed delay semantics.
4748
*
48-
* <p>Kotlin suspending functions are also supported, provided the coroutine-Reactor
49+
* <p>Kotlin suspending functions are also supported, provided the coroutine-reactor
4950
* bridge ({@code kotlinx.coroutine.reactor}) is present at runtime. This bridge is
50-
* used to adapt the suspending function into a Reactor {@code Mono} which is treated
51+
* used to adapt the suspending function into a {@code Publisher} which is treated
5152
* the same way as in the reactive method case (see above).
5253
*
5354
* <p>Processing of {@code @Scheduled} annotations is performed by

spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java

Lines changed: 44 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,6 @@ public class ScheduledAnnotationBeanPostProcessor
143143
private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
144144

145145
private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);
146-
private final Map<Object, Set<ScheduledAnnotationReactiveSupport.ReactiveTask>> scheduledReactiveTasks = new IdentityHashMap<>(16);
147146

148147

149148
/**
@@ -318,15 +317,6 @@ private void finishRegistration() {
318317
}
319318

320319
this.registrar.afterPropertiesSet();
321-
// Start the reactive tasks (we synchronize on the common scheduledTasks on purpose)
322-
synchronized (this.scheduledTasks) {
323-
for (Set<ScheduledAnnotationReactiveSupport.ReactiveTask> reactiveTasks : this.scheduledReactiveTasks.values()) {
324-
for (ScheduledAnnotationReactiveSupport.ReactiveTask reactiveTask : reactiveTasks) {
325-
reactiveTask.subscribe();
326-
}
327-
}
328-
}
329-
330320
}
331321

332322
private <T> T resolveSchedulerBean(BeanFactory beanFactory, Class<T> schedulerType, boolean byName) {
@@ -397,37 +387,32 @@ public Object postProcessAfterInitialization(Object bean, String beanName) {
397387

398388
/**
399389
* Process the given {@code @Scheduled} method declaration on the given bean,
400-
* attempting to distinguish {@link #processScheduledReactive(Scheduled, Method, Object) reactive}
401-
* method from {@link #processScheduledSync(Scheduled, Method, Object) synchronous} methods.
390+
* attempting to distinguish {@link #processScheduledAsync(Scheduled, Method, Object) reactive}
391+
* methods from {@link #processScheduledSync(Scheduled, Method, Object) synchronous} methods.
402392
* @param scheduled the {@code @Scheduled} annotation
403393
* @param method the method that the annotation has been declared on
404394
* @param bean the target bean instance
405395
* @see #processScheduledSync(Scheduled, Method, Object)
406-
* @see #processScheduledReactive(Scheduled, Method, Object)
396+
* @see #processScheduledAsync(Scheduled, Method, Object)
407397
*/
408398
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
409399
// Is method a Kotlin suspending function? Throws if true but reactor bridge isn't on the classpath.
410-
// Is method returning a Publisher instance? Throws if true but Reactor isn't on the classpath.
400+
// Is method returning a reactive type? Throws if true, but it isn't a deferred Publisher type.
411401
if (ScheduledAnnotationReactiveSupport.isReactive(method)) {
412-
processScheduledReactive(scheduled, method, bean);
402+
processScheduledAsync(scheduled, method, bean);
413403
return;
414404
}
415405
processScheduledSync(scheduled, method, bean);
416406
}
417407

418408
/**
419-
* Process the given {@code @Scheduled} method declaration on the given bean,
420-
* as a synchronous method. The method MUST take no arguments. Its return value
421-
* is ignored (if any) and the scheduled invocations of the method take place
422-
* using the underlying {@link TaskScheduler} infrastructure.
423-
* @param scheduled the {@code @Scheduled} annotation
424-
* @param method the method that the annotation has been declared on
425-
* @param bean the target bean instance
426-
* @see #createRunnable(Object, Method)
409+
* Parse the {@code Scheduled} annotation and schedule the provided {@code Runnable}
410+
* accordingly. The Runnable can represent either a synchronous method invocation
411+
* (see {@link #processScheduledSync(Scheduled, Method, Object)}) or an asynchronous
412+
* one (see {@link #processScheduledAsync(Scheduled, Method, Object)}).
427413
*/
428-
protected void processScheduledSync(Scheduled scheduled, Method method, Object bean) {
414+
protected void processScheduledTask(Scheduled scheduled, Runnable runnable, Method method, Object bean) {
429415
try {
430-
Runnable runnable = createRunnable(bean, method);
431416
boolean processedSchedule = false;
432417
String errorMessage =
433418
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
@@ -550,118 +535,51 @@ protected void processScheduledSync(Scheduled scheduled, Method method, Object b
550535
}
551536
}
552537

538+
/**
539+
* Process the given {@code @Scheduled} method declaration on the given bean,
540+
* as a synchronous method. The method MUST take no arguments. Its return value
541+
* is ignored (if any) and the scheduled invocations of the method take place
542+
* using the underlying {@link TaskScheduler} infrastructure.
543+
* @param scheduled the {@code @Scheduled} annotation
544+
* @param method the method that the annotation has been declared on
545+
* @param bean the target bean instance
546+
* @see #createRunnable(Object, Method)
547+
*/
548+
protected void processScheduledSync(Scheduled scheduled, Method method, Object bean) {
549+
Runnable task;
550+
try {
551+
task = createRunnable(bean, method);
552+
}
553+
catch (IllegalArgumentException ex) {
554+
throw new IllegalStateException("Could not create recurring task for @Scheduled method '" + method.getName() + "': " + ex.getMessage());
555+
}
556+
processScheduledTask(scheduled, task, method, bean);
557+
}
558+
553559
/**
554560
* Process the given {@code @Scheduled} bean method declaration which returns
555561
* a {@code Publisher}, or the given Kotlin suspending function converted to a
556-
* Publisher. The publisher is then repeatedly subscribed to, according to the
557-
* fixedDelay/fixedRate configuration. Cron configuration isn't supported,nor
558-
* is non-Publisher return types (even if a {@code ReactiveAdapter} is registered).
562+
* Publisher. A {@code Runnable} which subscribes to that publisher is then repeatedly
563+
* scheduled according to the annotation configuration.
564+
* <p>Note that for fixed delay configuration, the subscription is turned into a blocking
565+
* call instead. Types for which a {@code ReactiveAdapter} is registered but which cannot
566+
* be deferred (i.e. not a {@code Publisher}) are not supported.
559567
* @param scheduled the {@code @Scheduled} annotation
560568
* @param method the method that the annotation has been declared on, which
561-
* MUST either return a Publisher or be a Kotlin suspending function
569+
* MUST either return a Publisher-adaptable type or be a Kotlin suspending function
562570
* @param bean the target bean instance
563571
* @see ScheduledAnnotationReactiveSupport
564572
*/
565-
protected void processScheduledReactive(Scheduled scheduled, Method method, Object bean) {
573+
protected void processScheduledAsync(Scheduled scheduled, Method method, Object bean) {
574+
Runnable task;
566575
try {
567-
boolean processedSchedule = false;
568-
String errorMessage =
569-
"Exactly one of the 'fixedDelay(String)' or 'fixedRate(String)' attributes is required";
570-
571-
Set<ScheduledAnnotationReactiveSupport.ReactiveTask> reactiveTasks = new LinkedHashSet<>(4);
572-
573-
// Determine initial delay
574-
Duration initialDelay = toDuration(scheduled.initialDelay(), scheduled.timeUnit());
575-
String initialDelayString = scheduled.initialDelayString();
576-
if (StringUtils.hasText(initialDelayString)) {
577-
Assert.isTrue(initialDelay.isNegative(), "Specify 'initialDelay' or 'initialDelayString', not both");
578-
if (this.embeddedValueResolver != null) {
579-
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
580-
}
581-
if (StringUtils.hasLength(initialDelayString)) {
582-
try {
583-
initialDelay = toDuration(initialDelayString, scheduled.timeUnit());
584-
}
585-
catch (RuntimeException ex) {
586-
throw new IllegalArgumentException(
587-
"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
588-
}
589-
}
590-
}
591-
592-
// Reject cron expression
593-
Assert.state(!StringUtils.hasText(scheduled.cron()), "'cron' not supported for reactive @Scheduled");
594-
595-
// At this point we don't need to differentiate between initial delay set or not anymore
596-
if (initialDelay.isNegative()) {
597-
initialDelay = Duration.ZERO;
598-
}
599-
600-
// Check fixed delay
601-
Duration fixedDelay = toDuration(scheduled.fixedDelay(), scheduled.timeUnit());
602-
if (!fixedDelay.isNegative()) {
603-
processedSchedule = true;
604-
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedDelay, false));
605-
}
606-
607-
String fixedDelayString = scheduled.fixedDelayString();
608-
if (StringUtils.hasText(fixedDelayString)) {
609-
if (this.embeddedValueResolver != null) {
610-
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
611-
}
612-
if (StringUtils.hasLength(fixedDelayString)) {
613-
Assert.isTrue(!processedSchedule, errorMessage);
614-
processedSchedule = true;
615-
try {
616-
fixedDelay = toDuration(fixedDelayString, scheduled.timeUnit());
617-
}
618-
catch (RuntimeException ex) {
619-
throw new IllegalArgumentException(
620-
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
621-
}
622-
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedDelay, false));
623-
}
624-
}
625-
626-
// Check fixed rate
627-
Duration fixedRate = toDuration(scheduled.fixedRate(), scheduled.timeUnit());
628-
if (!fixedRate.isNegative()) {
629-
Assert.isTrue(!processedSchedule, errorMessage);
630-
processedSchedule = true;
631-
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedRate, true));
632-
}
633-
String fixedRateString = scheduled.fixedRateString();
634-
if (StringUtils.hasText(fixedRateString)) {
635-
if (this.embeddedValueResolver != null) {
636-
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
637-
}
638-
if (StringUtils.hasLength(fixedRateString)) {
639-
Assert.isTrue(!processedSchedule, errorMessage);
640-
processedSchedule = true;
641-
try {
642-
fixedRate = toDuration(fixedRateString, scheduled.timeUnit());
643-
}
644-
catch (RuntimeException ex) {
645-
throw new IllegalArgumentException(
646-
"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
647-
}
648-
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedRate, true));
649-
}
650-
}
651-
652-
// Check whether we had any attribute set
653-
Assert.isTrue(processedSchedule, errorMessage);
654-
655-
// Finally register the scheduled tasks (we synchronize on scheduledTasks on purpose)
656-
synchronized (this.scheduledTasks) {
657-
Set<ScheduledAnnotationReactiveSupport.ReactiveTask> subscriptionTasks = this.scheduledReactiveTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
658-
subscriptionTasks.addAll(reactiveTasks);
659-
}
576+
boolean isFixedDelaySpecialCase = scheduled.fixedDelay() > 0 || StringUtils.hasText(scheduled.fixedDelayString());
577+
task = ScheduledAnnotationReactiveSupport.createSubscriptionRunnable(method, bean, isFixedDelaySpecialCase);
660578
}
661579
catch (IllegalArgumentException ex) {
662-
throw new IllegalStateException(
663-
"Encountered invalid reactive @Scheduled method '" + method.getName() + "': " + ex.getMessage(), ex);
580+
throw new IllegalStateException("Could not create recurring task for @Scheduled method '" + method.getName() + "': " + ex.getMessage());
664581
}
582+
processScheduledTask(scheduled, task, method, bean);
665583
}
666584

667585
/**
@@ -721,27 +639,20 @@ public Set<ScheduledTask> getScheduledTasks() {
721639
@Override
722640
public void postProcessBeforeDestruction(Object bean, String beanName) {
723641
Set<ScheduledTask> tasks;
724-
Set<ScheduledAnnotationReactiveSupport.ReactiveTask> reactiveTasks;
725642
synchronized (this.scheduledTasks) {
726643
tasks = this.scheduledTasks.remove(bean);
727-
reactiveTasks = this.scheduledReactiveTasks.remove(bean);
728644
}
729645
if (tasks != null) {
730646
for (ScheduledTask task : tasks) {
731647
task.cancel();
732648
}
733649
}
734-
if (reactiveTasks != null) {
735-
for (ScheduledAnnotationReactiveSupport.ReactiveTask task : reactiveTasks) {
736-
task.cancel();
737-
}
738-
}
739650
}
740651

741652
@Override
742653
public boolean requiresDestruction(Object bean) {
743654
synchronized (this.scheduledTasks) {
744-
return this.scheduledTasks.containsKey(bean) || this.scheduledReactiveTasks.containsKey(bean);
655+
return this.scheduledTasks.containsKey(bean);
745656
}
746657
}
747658

@@ -755,13 +666,6 @@ public void destroy() {
755666
}
756667
}
757668
this.scheduledTasks.clear();
758-
Collection<Set<ScheduledAnnotationReactiveSupport.ReactiveTask>> allReactiveTasks = this.scheduledReactiveTasks.values();
759-
for (Set<ScheduledAnnotationReactiveSupport.ReactiveTask> tasks : allReactiveTasks) {
760-
for (ScheduledAnnotationReactiveSupport.ReactiveTask task : tasks) {
761-
task.cancel();
762-
}
763-
}
764-
this.scheduledReactiveTasks.clear();
765669
}
766670
this.registrar.destroy();
767671
}

0 commit comments

Comments
 (0)