Skip to content

Commit b7b8342

Browse files
author
Guido Gustavo Pollitzer
committed
SchedulingAssistantProvider:
- Move `ScheduleImpl` inside `DoerAssistant`. - Add the `DoerAssistant.ScheduleImpl.isActive` field to avoid the more used `heapIndex` and `isEnabled` fields have to be @volatile. - Factor out the common code of the `scheduler` command methods into the new `scheduler.signal` method. - Add the `scheduler.enabledSchedulesByAssistant` to be able to iterator over the enabled schedules when `cancelAll` is called or the `scheduler` is stopped.
1 parent 348ccf3 commit b7b8342

File tree

3 files changed

+147
-95
lines changed

3 files changed

+147
-95
lines changed

src/main/scala/providers/assistant/SchedulingAssistantProvider.scala

Lines changed: 124 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package readren.matrix
22
package providers.assistant
33

4-
import core.{Matrix, MatrixDoer}
4+
import core.MatrixDoer
55
import providers.ShutdownAble
66
import providers.assistant.SchedulingAssistantProvider.*
77
import providers.doer.AssistantBasedDoerProvider.DoerAssistantProvider
@@ -15,7 +15,7 @@ import java.util.concurrent.*
1515
import java.util.concurrent.atomic.AtomicInteger
1616
import java.util.concurrent.locks.ReentrantLock
1717
import scala.annotation.tailrec
18-
import scala.concurrent.duration.FiniteDuration
18+
import scala.collection.mutable
1919

2020
object SchedulingAssistantProvider {
2121
type TaskQueue = ConcurrentLinkedQueue[Runnable]
@@ -154,62 +154,100 @@ class SchedulingAssistantProvider(
154154
override type Schedule = ScheduleImpl
155155

156156
override def newDelaySchedule(delay: NanoDuration): Schedule =
157-
new ScheduleImpl(thisDoerAssistant, delay, 0L, false)
157+
new ScheduleImpl(delay, 0L, false)
158158

159159
override def newFixedRateSchedule(initialDelay: NanoDuration, interval: NanoDuration): Schedule =
160-
new ScheduleImpl(thisDoerAssistant, initialDelay, interval, true)
160+
new ScheduleImpl(initialDelay, interval, true)
161161

162162
override def newFixedDelaySchedule(initialDelay: NanoDuration, delay: NanoDuration): Schedule =
163-
new ScheduleImpl(thisDoerAssistant, initialDelay, delay, false)
163+
new ScheduleImpl(initialDelay, delay, false)
164164

165165
override def scheduleSequentially(schedule: Schedule, originalRunnable: Runnable): Unit = {
166166
val currentTime = System.nanoTime()
167+
assert(!schedule.isActive)
168+
schedule.isActive = true
167169

168170
object fixedDelayWrapper extends Runnable {
169171
override def run(): Unit = {
170-
originalRunnable.run()
171-
// TODO analyze if the following lines must be in a `finally` block whose `try`'s body is `originalRunnable.run()`
172-
schedule.scheduledTime = System.nanoTime() + schedule.interval
173-
scheduler.schedule(schedule)
172+
if schedule.isActive then {
173+
originalRunnable.run()
174+
// TODO analyze if the following lines must be in a `finally` block whose `try`'s body is `originalRunnable.run()`
175+
scheduler.schedule(schedule, System.nanoTime() + schedule.interval)
176+
}
174177
}
175178
}
176179

177180
object fixedRateWrapper extends Runnable {
178181
override def run(): Unit = {
179182
@tailrec
180183
def loop(currentTime: NanoTime): Unit = {
181-
if schedule.heapIndex < 0 then return
182-
schedule.startingTime = currentTime
183-
schedule.numOfSkippedExecutions = (currentTime - schedule.scheduledTime) / schedule.interval
184-
originalRunnable.run()
185-
// TODO analyze if the following lines must be in a `finally` block whose `try`'s body is `originalRunnable.run()`
186-
val nextTime = schedule.scheduledTime + schedule.interval * (1L + schedule.numOfSkippedExecutions)
187-
schedule.scheduledTime = nextTime
188-
val newCurrentTime = System.nanoTime()
189-
if nextTime <= newCurrentTime then loop(newCurrentTime)
190-
else scheduler.schedule(schedule)
184+
if schedule.isActive then {
185+
schedule.startingTime = currentTime
186+
schedule.numOfSkippedExecutions = (currentTime - schedule.scheduledTime) / schedule.interval
187+
originalRunnable.run()
188+
// TODO analyze if the following lines must be in a `finally` block whose `try`'s body is `originalRunnable.run()`
189+
val nextTime = schedule.scheduledTime + schedule.interval * (1L + schedule.numOfSkippedExecutions)
190+
val newCurrentTime = System.nanoTime()
191+
if nextTime <= newCurrentTime then {
192+
schedule.scheduledTime = nextTime
193+
loop(newCurrentTime)
194+
} else scheduler.schedule(schedule, nextTime)
195+
}
191196
}
192197

193198
loop(System.nanoTime())
194199
}
195200
}
196201

197-
val scheduledRunnable: Runnable =
202+
schedule.runnable =
198203
if schedule.interval > 0 then {
199204
if schedule.isFixedRate then fixedRateWrapper
200205
else fixedDelayWrapper
201206
} else originalRunnable
202-
schedule.scheduledTime = currentTime + schedule.initialDelay
203-
schedule.runnable = scheduledRunnable
204-
if schedule.initialDelay <= 0 then executeSequentially(scheduledRunnable)
205-
else scheduler.schedule(schedule)
207+
scheduler.schedule(schedule, currentTime + schedule.initialDelay)
206208
}
207209

208210
override def cancel(schedule: Schedule): Unit = scheduler.cancel(schedule)
209211

210212
override def cancelAll(): Unit = scheduler.cancelAllBelongingTo(thisDoerAssistant)
211213

212-
override def isActive(schedule: Schedule): Boolean = schedule.heapIndex >= 0
214+
/** An instance becomes active when is passed to the [[scheduleSequentially]] method.
215+
* An instances becomes inactive when it is passed to the [[cancel]] method or when [[cancelAll]] is called. */
216+
override def isActive(schedule: Schedule): Boolean = schedule.isActive
217+
218+
219+
/** IMPORTANT: Represents a unique entity where equality and hash code must be based on identity. */
220+
class ScheduleImpl(val initialDelay: NanoTime, val interval: Duration, val isFixedRate: Boolean) {
221+
/** The [[Runnable]] that this [[TimersExtension.Assistant.Schedule]] schedules. */
222+
private[SchedulingAssistantProvider] var runnable: Runnable | Null = null
223+
/** Exposes the time the [[Runnable]] is expected to be run.
224+
* Updated after the [[Runnable]] execution is completed. */
225+
var scheduledTime: NanoTime = 0L
226+
/** The index of this instance in the array-based min-heap. */
227+
private[SchedulingAssistantProvider] var heapIndex: Int = -1
228+
/** Exposes the number of executions of the [[Runnable]] that were skipped before the current one due to processing power saturation or negative `initialDelay`.
229+
* It is calculated based on the scheduled interval, and the difference between the actual [[startingTime]] and the scheduled time:
230+
* {{{ (actualTime - scheduledTime) / interval }}}
231+
* Updated before the [[Runnable]] is run.
232+
* The value of this variable is used after the [[runnable]]'s execution completes to calculate the [[scheduledTime]]; therefore, the [[runnable]] may modify it to affect the resulting [[scheduledTime]] and therefore when it's next execution will be.
233+
* Intended to be accessed only within the thread that is currently running the [[Runnable]] that is scheduled by this instance. */
234+
var numOfSkippedExecutions: Long = 0
235+
/** Exposes the [[System.nanoTime]] when the current execution started.
236+
* The [[numOfSkippedExecutions]] is calculated based on this time.
237+
* Updated before the [[Runnable]] is run.
238+
* Intended to be accessed only within the thread that is currently running the [[Runnable]] that is scheduled by this instance. */
239+
var startingTime: NanoTime = 0L
240+
/** An instance becomes enabled when the [[scheduledTime]] is reached, and it's [[runnable]] is enqueued in [[thisDoerAssistant.taskQueue]].
241+
* An instance becomes disabled after the [[runnable]] execution finishes and the */
242+
private[SchedulingAssistantProvider] var isEnabled = false
243+
/** An instance becomes active when is passed to the [[thisDoerAssistant.scheduleSequentially]] method.
244+
* An instances becomes inactive when it is passed to the [[thisDoerAssistant.cancel]] method or when [[thisDoerAssistant.cancelAll]] is called.
245+
*
246+
* Implementation note: This var may be replaced with {{{ def isActive = !isEnabled && heapIndex < 0}}} but that would require both [[isEnabled]] and [[heapIndex]] to be @volatile. */
247+
@volatile private[SchedulingAssistantProvider] var isActive = false
248+
249+
inline def owner: thisDoerAssistant.type = thisDoerAssistant
250+
}
213251
}
214252

215253
/** @return `true` if a worker was awakened.
@@ -411,9 +449,10 @@ class SchedulingAssistantProvider(
411449
}
412450

413451
private object scheduler extends Runnable {
414-
private var commandsQueue = new ConcurrentLinkedQueue[Runnable]()
415-
private var heap: Array[ScheduleImpl | Null] = Array.fill(INITIAL_DELAYED_TASK_QUEUE_CAPACITY)(null)
452+
private val commandsQueue = new ConcurrentLinkedQueue[Runnable]()
453+
private var heap: Array[DoerAssistant#ScheduleImpl | Null] = Array.fill(INITIAL_DELAYED_TASK_QUEUE_CAPACITY)(null)
416454
private var size: Int = 0
455+
private var enabledSchedulesByAssistant: util.HashMap[DoerAssistant, mutable.HashSet[DoerAssistant#ScheduleImpl]] = new util.HashMap()
417456

418457
private var isRunning = true
419458
private val lock = new ReentrantLock()
@@ -425,36 +464,52 @@ class SchedulingAssistantProvider(
425464
private val timeWaitingThread: Thread = threadFactory.newThread(this)
426465
timeWaitingThread.start()
427466

428-
def schedule(schedule: ScheduleImpl): Unit = {
429-
commandsQueue.offer(() => enqueue(schedule))
430-
lock.lock()
431-
try commandPending.signal()
432-
finally lock.unlock()
467+
def schedule(schedule: DoerAssistant#ScheduleImpl, scheduleTime: NanoTime): Unit = {
468+
signal { () =>
469+
if schedule.isEnabled then {
470+
schedule.isEnabled = false
471+
enabledSchedulesByAssistant.computeIfPresent(schedule.owner, (_, enabledSchedules) => enabledSchedules.subtractOne(schedule))
472+
}
473+
schedule.scheduledTime = scheduleTime
474+
enqueue(schedule)
475+
}
433476
}
434477

435-
def cancel(schedule: ScheduleImpl): Unit = {
436-
commandsQueue.offer(() => remove(schedule))
437-
lock.lock()
438-
try commandPending.signal()
439-
finally lock.unlock()
478+
def cancel(schedule: DoerAssistant#ScheduleImpl): Unit = {
479+
signal { () =>
480+
schedule.isActive = false
481+
if schedule.isEnabled then {
482+
schedule.isEnabled = false
483+
enabledSchedulesByAssistant.computeIfPresent(schedule.owner, (_, enabledSchedules) => enabledSchedules.subtractOne(schedule))
484+
} else remove(schedule)
485+
}
440486
}
441487

442488
def cancelAllBelongingTo(assistant: DoerAssistant): Unit = {
443-
commandsQueue.offer { () =>
489+
signal { () =>
444490
var index = size
445491
while index > 0 do {
446492
index -= 1
447493
val schedule = heap(index)
448-
if schedule.owner eq assistant then remove(schedule)
494+
if schedule.owner eq assistant then {
495+
schedule.isActive = false
496+
remove(schedule)
497+
}
498+
}
499+
500+
enabledSchedulesByAssistant.remove(assistant).foreach { schedule =>
501+
schedule.isActive = false
502+
schedule.isEnabled = false
449503
}
450504
}
451-
lock.lock()
452-
try commandPending.signal()
453-
finally lock.unlock()
454505
}
455506

456507
def stop(): Unit = {
457-
isRunning = false
508+
signal(() => isRunning = false)
509+
}
510+
511+
private def signal(command: Runnable): Unit = {
512+
commandsQueue.offer(command)
458513
lock.lock()
459514
try commandPending.signal()
460515
finally lock.unlock()
@@ -468,33 +523,43 @@ class SchedulingAssistantProvider(
468523
command = commandsQueue.poll()
469524
}
470525

471-
var firstTask = peek
526+
var earlierSchedule = peek
472527
var currentNanoTime = System.nanoTime()
473-
while firstTask != null && firstTask.scheduledTime <= currentNanoTime do {
474-
finishPoll(firstTask)
475-
firstTask.owner.executeSequentially(firstTask.runnable)
528+
while earlierSchedule != null && earlierSchedule.scheduledTime <= currentNanoTime do {
529+
finishPoll(earlierSchedule)
530+
earlierSchedule.isEnabled = true
531+
enabledSchedulesByAssistant.merge(earlierSchedule.owner, mutable.HashSet(earlierSchedule), (_, enabledSchedules) => enabledSchedules.addOne(earlierSchedule))
532+
earlierSchedule.owner.executeSequentially(earlierSchedule.runnable)
476533
currentNanoTime = System.nanoTime()
477-
firstTask = peek
534+
earlierSchedule = peek
478535
}
479536
lock.lock()
480537
try {
481-
if firstTask == null then commandPending.await()
538+
if earlierSchedule == null then commandPending.await()
482539
else {
483-
val delay = firstTask.scheduledTime - currentNanoTime
484-
firstTask = null // do not keep unnecessary references while waiting to avoid unnecessary memory retention
540+
val delay = earlierSchedule.scheduledTime - currentNanoTime
541+
earlierSchedule = null // do not keep unnecessary references while waiting to avoid unnecessary memory retention
485542
commandPending.awaitNanos(delay)
486543
}
487544
}
488545
finally lock.unlock()
489546
}
547+
commandsQueue.clear() // do not keep unnecessary references while waiting to avoid unnecessary memory retention
548+
for i <- 0 until size do heap(i).isActive = false
490549
heap = null // do not keep unnecessary references while waiting to avoid unnecessary memory retention
491-
commandsQueue = null // do not keep unnecessary references while waiting to avoid unnecessary memory retention
550+
enabledSchedulesByAssistant.forEach { (_, enabledSchedules) =>
551+
enabledSchedules.foreach { schedule =>
552+
schedule.isActive = false
553+
schedule.isEnabled = false
554+
}
555+
}
556+
enabledSchedulesByAssistant = null // do not keep unnecessary references while waiting to avoid unnecessary memory retention
492557
}
493558

494-
private inline def peek: ScheduleImpl | Null = heap(0)
559+
private inline def peek: DoerAssistant#ScheduleImpl | Null = heap(0)
495560

496561
/** Adds the provided element to this min-heap based priority queue. */
497-
private def enqueue(element: ScheduleImpl): Unit = {
562+
private def enqueue(element: DoerAssistant#ScheduleImpl): Unit = {
498563
val holeIndex = size
499564
if holeIndex >= heap.length then grow()
500565
size = holeIndex + 1
@@ -510,7 +575,7 @@ class SchedulingAssistantProvider(
510575
* Assumes the provided element is the same as the returned by [[peek]].
511576
* @param peekedElement the [[ScheduleImpl]] to remove and return.
512577
*/
513-
private def finishPoll(peekedElement: ScheduleImpl): ScheduleImpl = {
578+
private def finishPoll(peekedElement: DoerAssistant#ScheduleImpl): DoerAssistant#ScheduleImpl = {
514579
size -= 1;
515580
val s = size
516581
val replacement = heap(s)
@@ -522,7 +587,7 @@ class SchedulingAssistantProvider(
522587

523588
/** Removes the provided element from this queue.
524589
* @return true if the element was removed; false if it is not contained by this queue. */
525-
private def remove(element: ScheduleImpl): Boolean = {
590+
private def remove(element: DoerAssistant#ScheduleImpl): Boolean = {
526591
val elemIndex = indexOf(element)
527592
if elemIndex < 0 then return false
528593
element.heapIndex = -1
@@ -537,14 +602,14 @@ class SchedulingAssistantProvider(
537602
true
538603
}
539604

540-
private inline def indexOf(task: ScheduleImpl): Int = task.heapIndex
605+
private inline def indexOf(task: DoerAssistant#ScheduleImpl): Int = task.heapIndex
541606

542607
/**
543608
* Replaces the element at position `holeIndex` of the heap-based array with the `providedElement` and rearranges it and its parents as necessary to ensure that all parents are less than or equal to their children.
544609
* Note that for the entire heap to satisfy the min-heap property, the `providedElement` must be less than or equal to the children of `holeIndex`.
545610
* Sifts element added at bottom up to its heap-ordered spot.
546611
*/
547-
private def siftUp(holeIndex: Int, providedElement: ScheduleImpl): Unit = {
612+
private def siftUp(holeIndex: Int, providedElement: DoerAssistant#ScheduleImpl): Unit = {
548613
var gapIndex = holeIndex
549614
var zero = 0
550615
while (gapIndex > zero) {
@@ -565,7 +630,7 @@ class SchedulingAssistantProvider(
565630
* Replaces the element that is currently at position `holeIndex` of the heap-based array with the `providedElement` and rearranges the elements in the subtree rooted at `holeIndex` such that the subtree conform to the min-heap property.
566631
* Sifts element added at top down to its heap-ordered spot.
567632
*/
568-
private def siftDown(holeIndex: Int, providedElement: ScheduleImpl): Unit = {
633+
private def siftDown(holeIndex: Int, providedElement: DoerAssistant#ScheduleImpl): Unit = {
569634
var gapIndex = holeIndex
570635
var half = size >>> 1
571636
while gapIndex < half do {
@@ -600,27 +665,6 @@ class SchedulingAssistantProvider(
600665
}
601666
}
602667

603-
class ScheduleImpl(val owner: DoerAssistant, val initialDelay: NanoTime, val interval: Duration, val isFixedRate: Boolean) {
604-
/** The [[Runnable]] that this [[TimersExtension.Assistant.Schedule]] schedules. */
605-
private[SchedulingAssistantProvider] var runnable: Runnable | Null = null
606-
/** Exposes the time the [[Runnable]] is expected to be run.
607-
* Updated after the [[Runnable]] execution is completed. */
608-
var scheduledTime: NanoTime = 0L
609-
@volatile var heapIndex: Int = -1
610-
/** Exposes the number of executions of the [[Runnable]] that were skipped before the current one due to processing power saturation or negative `initialDelay`.
611-
* It is calculated based on the scheduled interval, and the difference between the actual [[startingTime]] and the scheduled time:
612-
* {{{ (actualTime - scheduledTime) / interval }}}
613-
* Updated before the [[Runnable]] is run.
614-
* The value of this variable is used after the [[runnable]]'s execution completes to calculate the [[scheduledTime]]; therefore, the [[runnable]] may modify it to affect the resulting [[scheduledTime]] and therefore when it's next execution will be.
615-
* Intended to be accessed only within the thread that is currently running the [[Runnable]] that is scheduled by this instance. */
616-
var numOfSkippedExecutions: Long = 0
617-
/** Exposes the [[System.nanoTime]] when the current execution started.
618-
* The [[numOfSkippedExecutions]] is calculated based on this time.
619-
* Updated before the [[Runnable]] is run.
620-
* Intended to be accessed only within the thread that is currently running the [[Runnable]] that is scheduled by this instance. */
621-
var startingTime: NanoTime = 0L
622-
}
623-
624668
/**
625669
* Makes this [[Matrix.DoerAssistantProvider]] to shut down when all the workers are sleeping.
626670
* Invocation has no additional effect if already shut down.

src/main/scala/providers/assistant/SharedQueueDoerAssistantProvider.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class SharedQueueDoerAssistantProvider(
6161
private val workers: Array[Worker] = Array.tabulate(threadPoolSize)(Worker.apply)
6262

6363
private val runningWorkersLatch: CountDownLatch = new CountDownLatch(workers.length)
64-
/** Usually equal to the number of workers that whose [[Worker.isSleeping]] flat is set, but may be temporarily greater. Never smaller.
64+
/** Usually equal to the number of workers whose [[Worker.isSleeping]] flag is set, but may be temporarily greater. Never smaller.
6565
* Invariant: {{{ workers.count(_.isSleeping) <= sleepingWorkersCount.get <= workers.length }}} */
6666
private val sleepingWorkersCount = AtomicInteger(0)
6767

0 commit comments

Comments
 (0)