Skip to content

Commit 93c4c47

Browse files
author
Guido Gustavo Pollitzer
committed
Make TimedAssistantProvider use a single thread exclusively used for waiting delays.
Before adapting to the changes in the "task-flow" dependency which reworked the `TimersExtension.Assistant` interface.
1 parent 9ea509f commit 93c4c47

File tree

1 file changed

+67
-82
lines changed

1 file changed

+67
-82
lines changed

src/main/scala/providers/assistant/TimedAssistantProvider.scala

Lines changed: 67 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@ import readren.taskflow.{Doer, TimersExtension}
1111

1212
import java.lang.invoke.VarHandle
1313
import java.util
14-
import java.util.Arrays
1514
import java.util.concurrent.*
16-
import java.util.concurrent.TimeUnit.NANOSECONDS
1715
import java.util.concurrent.atomic.AtomicInteger
1816
import java.util.concurrent.locks.ReentrantLock
1917
import scala.concurrent.duration.FiniteDuration
@@ -126,7 +124,7 @@ class TimedAssistantProvider(
126124
try {
127125
var task = firstTaskInQueue
128126
firstTaskInQueue = null
129-
if task == null then task = taskQueue.poll()
127+
if task == null then task = taskQueue.poll()
130128
while task != null && taskQueueSizeIsPositive do {
131129
aDecrementIsPending = true
132130
task.run()
@@ -382,66 +380,71 @@ class TimedAssistantProvider(
382380
private var heap: Array[DelayedTask | Null] = Array.fill(INITIAL_DELAYED_TASK_QUEUE_CAPACITY)(null)
383381
private var size: Int = 0
384382

385-
private val thread: Thread = threadFactory.newThread(this)
386-
387-
/**
388-
* Thread designated to wait for the task at the head of the
389-
* queue. This variant of the Leader-Follower pattern
390-
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
391-
* minimize unnecessary timed waiting. When a thread becomes
392-
* the leader, it waits only for the next delay to elapse, but
393-
* other threads await indefinitely. The leader thread must
394-
* signal some other thread before returning from take() or
395-
* poll(...), unless some other thread becomes leader in the
396-
* interim. Whenever the head of the queue is replaced with a
397-
* task with an earlier expiration time, the leader field is
398-
* invalidated by being reset to null, and some waiting
399-
* thread, but not necessarily the current leader, is
400-
* signalled. So waiting threads must be prepared to acquire
401-
* and lose leadership while waiting.
402-
*/
403-
var leader: Thread = null
404-
383+
private var keepRunning = true
405384
private val lock = new ReentrantLock()
406385
/**
407386
* Condition signalled when a newer task becomes available at the
408387
* head of the queue or a new thread may need to become leader.
409388
*/
410389
private val available = lock.newCondition()
411390

391+
private val timeWaitingThread: Thread = threadFactory.newThread(this)
392+
timeWaitingThread.start()
412393

413394
def schedule(delayedTask: DelayedTask): Unit = {
414395
schedulesQueue.offer(delayedTask)
415396
available.signal()
416397
}
417398

399+
def stop(): Unit = {
400+
keepRunning = false
401+
available.signal()
402+
}
403+
418404
override def run(): Unit = {
419-
while true do {
420-
available.await()
421-
val delayedTask = schedulesQueue.poll()
422-
enqueue(delayedTask)
405+
while state.getPlain == State.keepRunning.ordinal do {
406+
val delayedTask: DelayedTask | Null = schedulesQueue.poll()
407+
if delayedTask != null then enqueue(delayedTask)
408+
409+
var firstTask = peek
410+
var currentNanoTime = System.nanoTime()
411+
while firstTask != null && firstTask.time <= currentNanoTime do {
412+
finishPoll(firstTask)
413+
firstTask.owner.executeSequentially(firstTask.runnable)
414+
if firstTask.interval > 0 then {
415+
firstTask.time += firstTask.interval
416+
if firstTask.time <= currentNanoTime then {
417+
val numberOfSkippedExecutions = 1L + (currentNanoTime - firstTask.time) / firstTask.interval
418+
firstTask.time += firstTask.interval * numberOfSkippedExecutions
419+
firstTask.numberOfSkippedExecutions = numberOfSkippedExecutions.toInt
420+
} else firstTask.numberOfSkippedExecutions = 0
421+
enqueue(firstTask)
422+
}
423+
currentNanoTime = System.nanoTime()
424+
firstTask = peek
425+
}
426+
lock.lock()
427+
try if firstTask == null then available.wait()
428+
else {
429+
val delay = firstTask.time - currentNanoTime
430+
firstTask = null
431+
available.awaitNanos(delay)
432+
}
433+
finally lock.unlock()
423434
}
424435
}
425436

426-
437+
private inline def peek: DelayedTask | Null = heap(0)
427438

428439
private def enqueue(task: DelayedTask): Unit = {
429-
val lock = this.lock
430-
lock.lock()
431-
try {
432-
val holeIndex = size
433-
if (holeIndex >= heap.length) grow()
434-
size = holeIndex + 1
435-
if (holeIndex == 0) {
436-
heap(0) = task
437-
task.heapIndex = 0
438-
}
439-
else siftUp(holeIndex, task)
440-
if heap(0) eq task then {
441-
leader = null
442-
available.signal()
443-
}
444-
} finally lock.unlock()
440+
val holeIndex = size
441+
if (holeIndex >= heap.length) grow()
442+
size = holeIndex + 1
443+
if (holeIndex == 0) {
444+
heap(0) = task
445+
task.heapIndex = 0
446+
}
447+
else siftUp(holeIndex, task)
445448
}
446449

447450
/**
@@ -458,35 +461,27 @@ class TimedAssistantProvider(
458461
task
459462
}
460463

461-
private def poll(): DelayedTask = {
462-
val lock = this.lock
463-
lock.lock()
464-
try {
465-
val first = heap(0)
466-
if (first == null || first.delay > 0) null
467-
else finishPoll(first)
468-
} finally lock.unlock()
464+
private def poll(currentNanoTime: NanoTime = System.nanoTime()): DelayedTask = {
465+
val first = heap(0)
466+
if first == null || first.time > currentNanoTime then null
467+
else finishPoll(first)
469468
}
470469

471470
private def remove(task: DelayedTask): Boolean = {
472-
val lock = this.lock
473-
lock.lock()
474-
try {
475-
val taskIndex = indexOf(task)
476-
if (taskIndex < 0) return false
477-
heap(taskIndex).heapIndex = -1
478-
size -= 1
479-
val s = size
480-
val replacement = heap(s)
481-
heap(s) = null
482-
if (s != taskIndex) {
483-
siftDown(taskIndex, replacement)
484-
if (heap(taskIndex) eq replacement) siftUp(taskIndex, replacement)
485-
}
486-
true
487-
} finally lock.unlock()
471+
val taskIndex = indexOf(task)
472+
if (taskIndex < 0) return false
473+
heap(taskIndex).heapIndex = -1
474+
size -= 1
475+
val s = size
476+
val replacement = heap(s)
477+
heap(s) = null
478+
if (s != taskIndex) {
479+
siftDown(taskIndex, replacement)
480+
if (heap(taskIndex) eq replacement) siftUp(taskIndex, replacement)
481+
}
482+
true
488483
}
489-
484+
490485
private inline def indexOf(task: DelayedTask): Int = task.heapIndex
491486

492487
/**
@@ -552,20 +547,9 @@ class TimedAssistantProvider(
552547
}
553548
}
554549

555-
private class DelayedTask(val owner: DoerAssistant, timerKey: TimerKey, val time: NanoTime, interval: Duration, runnable: Runnable) extends Runnable {
550+
private class DelayedTask(val owner: DoerAssistant, val timerKey: TimerKey, var time: NanoTime, val interval: Duration, val runnable: Runnable) {
556551
var heapIndex: Int = -1
557-
558-
override def run(): Unit = {
559-
runnable.run()
560-
// TODO
561-
}
562-
563-
/**
564-
* @return the remaining delay; zero or negative values indicate that the delay has already elapsed
565-
*/
566-
def delay: Duration =
567-
time - System.nanoTime()
568-
552+
var numberOfSkippedExecutions: Int = 0
569553
}
570554

571555
/**
@@ -577,6 +561,7 @@ class TimedAssistantProvider(
577561
* @throws SecurityException @inheritDoc
578562
*/
579563
override def shutdown(): Unit = {
564+
delayedTasksQueue.stop()
580565
if state.compareAndSet(State.keepRunning.ordinal, State.shutdownWhenAllWorkersSleep.ordinal) && workers.forall(_.isAsleep) then stopAllWorkers()
581566
}
582567

0 commit comments

Comments
 (0)