Skip to content

Commit 348ccf3

Browse files
author
Guido Gustavo Pollitzer
committed
Adapt to the changes made to the "task-flow" dependency.
1 parent cd15494 commit 348ccf3

9 files changed

+142
-94
lines changed

.gitignore

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,12 @@ build/
4646
### Mac OS ###
4747
.DS_Store
4848

49+
### VSC & metals ###
50+
.bloop
51+
.metals
52+
.vscode
53+
metals.sbt
54+
4955
### Scala ###
50-
.bsp/
56+
.bsp
57+

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ lazy val root = (project in file("."))
99
)
1010

1111
ThisBuild / libraryDependencies ++= Seq(
12-
"readren" %% "taskflow-core" % "0.2.4-SNAPSHOT",
12+
"readren" %% "taskflow-core" % "0.2.5-SNAPSHOT",
1313
"org.typelevel" %% "scalacheck-effect" % "1.0.4" % Test,
1414
"org.typelevel" %% "scalacheck-effect-munit" % "1.0.4" % Test
1515
)

src/main/scala/providers/assistant/TimedAssistantProvider.scala renamed to src/main/scala/providers/assistant/SchedulingAssistantProvider.scala

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ package providers.assistant
33

44
import core.{Matrix, MatrixDoer}
55
import providers.ShutdownAble
6-
import providers.assistant.TimedAssistantProvider.*
6+
import providers.assistant.SchedulingAssistantProvider.*
77
import providers.doer.AssistantBasedDoerProvider.DoerAssistantProvider
88

9-
import readren.taskflow.TimersExtension.{FixedRateLike, NanoDuration}
10-
import readren.taskflow.{Doer, TimersExtension}
9+
import readren.taskflow.SchedulingExtension.NanoDuration
10+
import readren.taskflow.{Doer, SchedulingExtension}
1111

1212
import java.lang.invoke.VarHandle
1313
import java.util
@@ -17,7 +17,7 @@ import java.util.concurrent.locks.ReentrantLock
1717
import scala.annotation.tailrec
1818
import scala.concurrent.duration.FiniteDuration
1919

20-
object TimedAssistantProvider {
20+
object SchedulingAssistantProvider {
2121
type TaskQueue = ConcurrentLinkedQueue[Runnable]
2222
/** A nano time based on the [[System.nanoTime]] method. */
2323
type NanoTime = Long
@@ -51,7 +51,7 @@ object TimedAssistantProvider {
5151
* @param applyMemoryFence Determines whether memory fences are applied to ensure that store operations made by a task happen before load operations performed by successive tasks enqueued to the same [[Doer.Assistant]].
5252
* The application of memory fences is optional because no test case has been devised to demonstrate their necessity. Apparently, the ordering constraints are already satisfied by the surrounding code.
5353
*/
54-
class TimedAssistantProvider(
54+
class SchedulingAssistantProvider(
5555
applyMemoryFence: Boolean = true,
5656
threadPoolSize: Int = Runtime.getRuntime.availableProcessors(),
5757
failureReporter: Throwable => Unit = _.printStackTrace(),
@@ -83,7 +83,7 @@ class TimedAssistantProvider(
8383
workers.foreach(_.start())
8484
}
8585

86-
class DoerAssistant(val id: MatrixDoer.Id) extends Doer.Assistant, TimersExtension.Assistant { thisDoerAssistant =>
86+
class DoerAssistant(val id: MatrixDoer.Id) extends Doer.Assistant, SchedulingExtension.Assistant { thisDoerAssistant =>
8787
private val taskQueue: TaskQueue = new ConcurrentLinkedQueue[Runnable]
8888
private val taskQueueSize: AtomicInteger = new AtomicInteger(0)
8989
@volatile private var firstTaskInQueue: Runnable = null
@@ -115,7 +115,7 @@ class TimedAssistantProvider(
115115
*
116116
* If at least one pending task remains unconsumed — typically because it is not yet visible from the [[Worker.thread]] — this [[DoerAssistant]] is enqueued into the [[queuedDoersAssistants]] queue to be assigned to a worker at a later time.
117117
*/
118-
private[TimedAssistantProvider] final def executePendingTasks(): Int = {
118+
private[SchedulingAssistantProvider] final def executePendingTasks(): Int = {
119119
doerAssistantThreadLocal.set(thisDoerAssistant)
120120
if debugEnabled then assert(taskQueueSize.get > 0)
121121
var processedTasksCounter: Int = 0
@@ -156,13 +156,13 @@ class TimedAssistantProvider(
156156
override def newDelaySchedule(delay: NanoDuration): Schedule =
157157
new ScheduleImpl(thisDoerAssistant, delay, 0L, false)
158158

159-
override def newFixedRateSchedule(initialDelay: NanoDuration, interval: NanoDuration): Schedule & FixedRateLike =
159+
override def newFixedRateSchedule(initialDelay: NanoDuration, interval: NanoDuration): Schedule =
160160
new ScheduleImpl(thisDoerAssistant, initialDelay, interval, true)
161161

162162
override def newFixedDelaySchedule(initialDelay: NanoDuration, delay: NanoDuration): Schedule =
163163
new ScheduleImpl(thisDoerAssistant, initialDelay, delay, false)
164164

165-
override def executeSequentiallyScheduled(schedule: Schedule, originalRunnable: Runnable): Unit = {
165+
override def scheduleSequentially(schedule: Schedule, originalRunnable: Runnable): Unit = {
166166
val currentTime = System.nanoTime()
167167

168168
object fixedDelayWrapper extends Runnable {
@@ -178,18 +178,18 @@ class TimedAssistantProvider(
178178
override def run(): Unit = {
179179
@tailrec
180180
def loop(currentTime: NanoTime): Unit = {
181+
if schedule.heapIndex < 0 then return
181182
schedule.startingTime = currentTime
182183
schedule.numOfSkippedExecutions = (currentTime - schedule.scheduledTime) / schedule.interval
183184
originalRunnable.run()
184185
// TODO analyze if the following lines must be in a `finally` block whose `try`'s body is `originalRunnable.run()`
185186
val nextTime = schedule.scheduledTime + schedule.interval * (1L + schedule.numOfSkippedExecutions)
187+
schedule.scheduledTime = nextTime
186188
val newCurrentTime = System.nanoTime()
187189
if nextTime <= newCurrentTime then loop(newCurrentTime)
188-
else {
189-
schedule.scheduledTime = nextTime
190-
scheduler.schedule(schedule)
191-
}
190+
else scheduler.schedule(schedule)
192191
}
192+
193193
loop(System.nanoTime())
194194
}
195195
}
@@ -493,7 +493,7 @@ class TimedAssistantProvider(
493493

494494
private inline def peek: ScheduleImpl | Null = heap(0)
495495

496-
/** Adds the provided element to this min-heap based priority queue. */
496+
/** Adds the provided element to this min-heap based priority queue. */
497497
private def enqueue(element: ScheduleImpl): Unit = {
498498
val holeIndex = size
499499
if holeIndex >= heap.length then grow()
@@ -600,11 +600,24 @@ class TimedAssistantProvider(
600600
}
601601
}
602602

603-
class ScheduleImpl(val owner: DoerAssistant, val initialDelay: NanoTime, val interval: Duration, val isFixedRate: Boolean) extends FixedRateLike {
603+
class ScheduleImpl(val owner: DoerAssistant, val initialDelay: NanoTime, val interval: Duration, val isFixedRate: Boolean) {
604+
/** The [[Runnable]] that this [[TimersExtension.Assistant.Schedule]] schedules. */
605+
private[SchedulingAssistantProvider] var runnable: Runnable | Null = null
606+
/** Exposes the time the [[Runnable]] is expected to be run.
607+
* Updated after the [[Runnable]] execution is completed. */
604608
var scheduledTime: NanoTime = 0L
605-
var runnable: Runnable | Null = null
606609
@volatile var heapIndex: Int = -1
610+
/** Exposes the number of executions of the [[Runnable]] that were skipped before the current one due to processing power saturation or negative `initialDelay`.
611+
* It is calculated based on the scheduled interval, and the difference between the actual [[startingTime]] and the scheduled time:
612+
* {{{ (actualTime - scheduledTime) / interval }}}
613+
* Updated before the [[Runnable]] is run.
614+
* The value of this variable is used after the [[runnable]]'s execution completes to calculate the [[scheduledTime]]; therefore, the [[runnable]] may modify it to affect the resulting [[scheduledTime]] and therefore when it's next execution will be.
615+
* Intended to be accessed only within the thread that is currently running the [[Runnable]] that is scheduled by this instance. */
607616
var numOfSkippedExecutions: Long = 0
617+
/** Exposes the [[System.nanoTime]] when the current execution started.
618+
* The [[numOfSkippedExecutions]] is calculated based on this time.
619+
* Updated before the [[Runnable]] is run.
620+
* Intended to be accessed only within the thread that is currently running the [[Runnable]] that is scheduled by this instance. */
608621
var startingTime: NanoTime = 0L
609622
}
610623

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package readren.matrix
2+
package providers.doer
3+
4+
import core.{AbstractMatrix, MatrixDoer}
5+
import providers.ShutdownAble
6+
import providers.assistant.SchedulingAssistantProvider
7+
import providers.doer.SchedulingDoerProvider.ProvidedDoer
8+
9+
import readren.taskflow.SchedulingExtension
10+
11+
import java.util.concurrent.{Executors, ThreadFactory}
12+
13+
object SchedulingDoerProvider {
14+
class ProvidedDoer(id: MatrixDoer.Id, anAssistant: SchedulingAssistantProvider#ProvidedAssistant, matrix: AbstractMatrix) extends MatrixDoer(id, anAssistant, matrix), SchedulingExtension {
15+
override type SchedulingAssistant = SchedulingAssistantProvider#ProvidedAssistant
16+
override val schedulingAssistant: SchedulingAssistant = anAssistant
17+
}
18+
}
19+
20+
class SchedulingDoerProvider(
21+
applyMemoryFence: Boolean = true,
22+
threadPoolSize: Int = Runtime.getRuntime.availableProcessors(),
23+
failureReporter: Throwable => Unit = _.printStackTrace(),
24+
threadFactory: ThreadFactory = Executors.defaultThreadFactory()
25+
) extends AssistantBasedDoerProvider[ProvidedDoer] {
26+
override protected val assistantProvider: SchedulingAssistantProvider = new SchedulingAssistantProvider(applyMemoryFence, threadPoolSize, failureReporter, threadFactory)
27+
28+
override def provide(matrix: AbstractMatrix): ProvidedDoer = {
29+
val doerId = matrix.genDoerId()
30+
new ProvidedDoer(doerId, assistantProvider.provide(doerId), matrix)
31+
}
32+
}

src/main/scala/providers/doer/TimedDoerProvider.scala

Lines changed: 0 additions & 23 deletions
This file was deleted.

src/main/scala/pruebas/AideImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,5 @@ class AideImpl[D <: MatrixDoer, DP <: Matrix.DoerProvider[D]](
1818

1919
override def buildDoerProviderManager(owner: Matrix[AideImpl[D, DP]]): DPsManager = new ShutdownAbleDpd
2020

21-
override def buildLogger(owner: Matrix[AideImpl[D, DP]]): Logger = new SimpleLogger(Logger.Level.debug)
21+
override def buildLogger(owner: Matrix[AideImpl[D, DP]]): Logger = new SimpleLogger(Logger.Level.info)
2222
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package readren.matrix
2+
package pruebas
3+
4+
import core.Matrix.DoerProviderDescriptor
5+
import core.{Continue, Matrix, Started, Stop}
6+
import providers.doer.SchedulingDoerProvider
7+
import rf.RegularRf
8+
9+
import java.util.concurrent.TimeUnit
10+
import scala.concurrent.duration.FiniteDuration
11+
12+
object PruebaScheduling {
13+
14+
15+
@main def runPruebaTimed(): Unit = {
16+
17+
case object Tick
18+
19+
val schedulingDpd = new DoerProviderDescriptor[SchedulingDoerProvider.ProvidedDoer, SchedulingDoerProvider]("schedulingDpd") {
20+
override def build(owner: Matrix.DoerProvidersManager): SchedulingDoerProvider = new SchedulingDoerProvider(false)
21+
}
22+
val aide = new AideImpl(schedulingDpd)
23+
24+
val matrix = new Matrix("scheduled", aide)
25+
println(s"Matrix created")
26+
27+
val timedDoer = matrix.provideDoer(schedulingDpd)
28+
29+
if false then {
30+
@volatile var inside = false
31+
32+
var count = 0
33+
val schedule = timedDoer.newFixedRateSchedule(1_000_000_000, 1_000)
34+
timedDoer.scheduleSequentially(schedule) { () =>
35+
assert(!inside)
36+
inside = true
37+
count += 1
38+
println(s"count=$count, numOfSkippedExecutions=${schedule.numOfSkippedExecutions}, diff=${(schedule.startingTime - schedule.scheduledTime) / 100}, thread=${Thread.currentThread().getId}")
39+
inside = false
40+
}
41+
42+
} else {
43+
matrix.spawns[Started.type | Tick.type](RegularRf, timedDoer) { reactant =>
44+
val selfEndpoint = reactant.endpointProvider.local[Tick.type]
45+
val interval = FiniteDuration(1, TimeUnit.SECONDS)
46+
var counter: Int = 0
47+
{
48+
case m@(Started | Tick) =>
49+
counter += 1
50+
if counter > 10000 then {
51+
timedDoer.cancelAll()
52+
Stop
53+
} else {
54+
println(s"The $m number $counter was received, thread=${Thread.currentThread().getId}")
55+
val schedule: timedDoer.Schedule = timedDoer.newFixedRateSchedule((counter % 1000) * 1_000_000, 1_000_000_000)
56+
timedDoer.scheduleSequentially(schedule) { () =>
57+
println(s"counter=$counter, diff=${schedule.startingTime - schedule.scheduledTime}, thread=${Thread.currentThread().getId}")
58+
selfEndpoint.tell(Tick)
59+
}
60+
// timedDoer.Duty.delay(interval) { () =>
61+
// selfEndpoint.tell(Tick)
62+
// selfEndpoint.tell(Tick)
63+
// }
64+
// .triggerAndForget()
65+
Continue
66+
}
67+
}
68+
}.triggerAndForget()
69+
}
70+
}
71+
}

src/main/scala/pruebas/PruebaTimed.scala

Lines changed: 0 additions & 40 deletions
This file was deleted.

src/main/scala/timed/MatrixTimedDoer.scala

Lines changed: 0 additions & 12 deletions
This file was deleted.

0 commit comments

Comments
 (0)