Skip to content

Commit cd15494

Browse files
author
Guido Gustavo Pollitzer
committed
Finish the implementation of TimedAssistantProvider and add TimedDoerProvider and MatrixTimedDoer to support scheduling.
Also rename to third person tense the methods that return a `Duty`/`Task` and require the returned `Duty`/`Task` be triggered in order for the method's intended action to be performed. Add the `PruebaTimed` to test the scheduling work. Curiously it had a single bug (a `wait` that had to be an `await`). I can't believe it! Surely there are more bugs I will find later, as always.
1 parent 93c4c47 commit cd15494

21 files changed

+301
-150
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ lazy val root = (project in file("."))
99
)
1010

1111
ThisBuild / libraryDependencies ++= Seq(
12-
"readren" %% "taskflow-core" % "0.2.3-SNAPSHOT",
12+
"readren" %% "taskflow-core" % "0.2.4-SNAPSHOT",
1313
"org.typelevel" %% "scalacheck-effect" % "1.0.4" % Test,
1414
"org.typelevel" %% "scalacheck-effect-munit" % "1.0.4" % Test
1515
)

checked/src/main/scala/pruebas/PruebaChecked.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ object PruebaChecked {
3030

3131
val matrix = new Matrix("testChecked", matrixAide)
3232

33-
matrix.spawn[Cmd](RegularRf) { parent =>
33+
matrix.spawns[Cmd](RegularRf) { parent =>
3434
val cb: CheckedBehavior[Cmd, MyException] =
3535
CheckedBehavior.factory[Cmd, MyException, NeverException] {
3636
case cmd: DoWork =>

src/main/scala/core/AbstractMatrix.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ abstract class AbstractMatrix(val name: String) { thisMatrix =>
3838
def provideDefaultDoer: MatrixDoer
3939

4040
/** thread-safe */
41-
def spawn[U](
41+
def spawns[U](
4242
childFactory: ReactantFactory,
4343
childDoer: MatrixDoer = provideDefaultDoer
4444
)(
@@ -47,7 +47,7 @@ abstract class AbstractMatrix(val name: String) { thisMatrix =>
4747
using isSignalTest: IsSignalTest[U]
4848
): doer.Duty[ReactantRelay[U]] = {
4949
doer.Duty.mineFlat { () =>
50-
spawner.createReactant[U](childFactory, childDoer, isSignalTest, initialBehaviorBuilder)
50+
spawner.createsReactant[U](childFactory, childDoer, isSignalTest, initialBehaviorBuilder)
5151
}
5252
}
5353

src/main/scala/core/Reactant.scala

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,13 @@ abstract class Reactant[U](
8989
def initialize(): doer.Duty[this.type] = { // send Started signal after all the vals and vars have been initialized
9090
doer.checkWithin()
9191
assert(currentBehavior == null)
92-
selfStart(false, initialBehaviorBuilder).map(_ => thisReactant) // TODO considerar hacer que selfStart devuelva Duty[this.type] para evitar este 'map` del final. Esto requiere que selfStop, selfRestar, stayIdleUntilNextMessageArrive, y otros que ahora devuelven Duty[Unit] también hagan lo mismo.
92+
selfStarts(false, initialBehaviorBuilder).map(_ => thisReactant) // TODO considerar hacer que selfStarts devuelva Duty[this.type] para evitar este 'map` del final. Esto requiere que selfStop, selfRestar, stayIdleUntilNextMessageArrive, y otros que ahora devuelven Duty[Unit] también hagan lo mismo.
9393
}
9494

9595
/** Starts or restarts this [[Reactant]].
9696
* Should be called only once and within the [[doer]].
9797
* */
98-
private def selfStart(comesFromRestart: Boolean, behaviorBuilder: ReactantRelay[U] => Behavior[U]): doer.Duty[Unit] = {
98+
private def selfStarts(comesFromRestart: Boolean, behaviorBuilder: ReactantRelay[U] => Behavior[U]): doer.Duty[Unit] = {
9999
doer.checkWithin()
100100
currentBehavior = behaviorBuilder(thisReactant)
101101
val handleResult = handleSignal(if comesFromRestart then isSignalTest.restarted else isSignalTest.started)
@@ -106,12 +106,12 @@ abstract class Reactant[U](
106106
case ToStop =>
107107
selfStop()
108108
case tr: ToRestart[U @unchecked] =>
109-
selfRestart(tr.stopChildren, tr.restartBehaviorBuilder)
109+
selfRestarts(tr.stopChildren, tr.restartBehaviorBuilder)
110110
}
111111
}
112112

113113
/** Should be called withing the [[doer]]. */
114-
override def spawn[V](
114+
override def spawns[V](
115115
childReactantFactory: ReactantFactory,
116116
childDoer: MatrixDoer
117117
)(
@@ -126,26 +126,26 @@ abstract class Reactant[U](
126126
childrenRelays = spawner.childrenView
127127
spawner
128128
}(alreadyBuiltSpawner => alreadyBuiltSpawner)
129-
.createReactant[V](childReactantFactory, childDoer, isSignalTest, initialChildBehaviorBuilder)
129+
.createsReactant[V](childReactantFactory, childDoer, isSignalTest, initialChildBehaviorBuilder)
130130
}
131131

132132
/** The children of this [[Reactant]] by serial number.
133133
*
134-
* Should be called withing the [[doer]]. */
134+
* Calls must be within the [[doer]]. */
135135
override def children: MapView[Long, ReactantRelay[?]] = {
136136
doer.checkWithin()
137137
childrenRelays
138138
}
139139

140-
/** should be called within the [[doer]]. */
141-
private final def selfRestart(stopChildren: Boolean, restartBehaviorBuilder: ReactantRelay[U] => Behavior[U]): doer.Duty[Unit] = {
140+
/** Calls must be within the [[doer]]. */
141+
private final def selfRestarts(stopChildren: Boolean, restartBehaviorBuilder: ReactantRelay[U] => Behavior[U]): doer.Duty[Unit] = {
142142
doer.checkWithin()
143143

144144
def restartMe(): doer.Duty[Unit] = {
145145
// send RestartReceived signal
146146
val hr = handleSignal(isSignalTest.restartReceived)
147147
mapHrToDecision(hr) match {
148-
case ToContinue => selfStart(true, restartBehaviorBuilder)
148+
case ToContinue => selfStarts(true, restartBehaviorBuilder)
149149
case ToStop =>
150150
// if the `handleSignal` responds `Stop` to the `RestartReceived` signal, then the restart is canceled and the reactant is stopped instead, which provokes the signal handler be called again with a `StopReceived` signal.
151151
selfStop()
@@ -154,17 +154,17 @@ abstract class Reactant[U](
154154
val stopsChildrenIfInstructed =
155155
if tr.stopChildren && !stopChildren then {
156156
oSpawner.fold(doer.dutyReadyUnit) { spawner =>
157-
spawner.stopChildren()
157+
spawner.stopsChildren()
158158
}
159159
}
160160
else doer.dutyReadyUnit
161-
stopsChildrenIfInstructed.flatMap(_ => selfStart(true, tr.restartBehaviorBuilder))
161+
stopsChildrenIfInstructed.flatMap(_ => selfStarts(true, tr.restartBehaviorBuilder))
162162
}
163163
}
164164

165165
if stopChildren then {
166166
oSpawner.fold(restartMe()) { spawner =>
167-
spawner.stopChildren().flatMap(_ => restartMe())
167+
spawner.stopsChildren().flatMap(_ => restartMe())
168168
}
169169
} else restartMe()
170170
}
@@ -184,7 +184,7 @@ abstract class Reactant[U](
184184
mapHrToDecision(currentBehavior.handle(stoppedSignal)) match {
185185
case ToContinue => ()
186186
case ToStop => selfStop()
187-
case tr: ToRestart[U @unchecked] => selfRestart(tr.stopChildren, tr.restartBehaviorBuilder).triggerAndForget(true)
187+
case tr: ToRestart[U @unchecked] => selfRestarts(tr.stopChildren, tr.restartBehaviorBuilder).triggerAndForget(true)
188188
}
189189
}
190190
}
@@ -261,7 +261,7 @@ abstract class Reactant[U](
261261
isReadyToProcessMsg = false
262262
activeWatchSubscriptions.forEach { (k, v) => v.foreach(_.unsubscribe()) }
263263
oSpawner.fold(stopMe()) { spawner =>
264-
spawner.stopChildren().trigger(true)(_ => stopMe())
264+
spawner.stopsChildren().trigger(true)(_ => stopMe())
265265
}
266266
}
267267
stopCovenant.subscriptableDuty
@@ -358,12 +358,12 @@ abstract class Reactant[U](
358358
finalDecision match {
359359
case ToContinue => beReadyToProcess()
360360
case ToStop => selfStop()
361-
case tr: ToRestart[U @unchecked] => selfRestart(tr.stopChildren, tr.restartBehaviorBuilder).triggerAndForget(true)
361+
case tr: ToRestart[U @unchecked] => selfRestarts(tr.stopChildren, tr.restartBehaviorBuilder).triggerAndForget(true)
362362
}
363363
}
364364

365365

366-
override def diagnose: doer.Duty[ReactantDiagnostic] =
366+
override def diagnoses: doer.Duty[ReactantDiagnostic] =
367367
doer.Duty.mine { () =>
368368
val childrenDiagnostic = children.map(_._2.staleDiagnose).toArray
369369
ReactantDiagnostic(thisReactant.isReadyToProcessMsg, thisReactant.isMarkedToStop, thisReactant.stopWasStarted, inbox.size, inbox.iterator, childrenDiagnostic)

src/main/scala/core/ReactantFactory.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ import readren.taskflow.Doer
55

66
trait ReactantFactory {
77

8-
/** Creates a new [[Reactant]].
8+
/** Creates a [[Duty]] that creates a new [[Reactant]].
99
* The implementation should be thread-safe, doing its job withing the received [[MatrixDoer]]. */
10-
def createReactant[U, MD <: MatrixDoer](
10+
def createsReactant[U, MD <: MatrixDoer](
1111
id: Reactant.SerialNumber,
1212
progenitor: Spawner[?],
1313
reactantDoer: MD,

src/main/scala/core/ReactantRelay.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ abstract class ReactantRelay[-U] {
2222
* This method is thread-safe. */
2323
def isMarkedToBeStopped: Boolean
2424

25-
/** Should be called withing the [[doer]]. */
26-
def spawn[V](
25+
/** Calls must be within the [[doer]]. */
26+
def spawns[V](
2727
childFactory: ReactantFactory,
2828
childDoer: MatrixDoer = doer.matrix.provideDefaultDoer
2929
)(
@@ -32,7 +32,7 @@ abstract class ReactantRelay[-U] {
3232
using isSignalTest: IsSignalTest[V]
3333
): doer.Duty[ReactantRelay[V]]
3434

35-
/** Should be called within the [[doer]]. */
35+
/** Calls must be within the [[doer]]. */
3636
def children: MapView[Long, ReactantRelay[?]]
3737

3838
/**
@@ -49,7 +49,7 @@ abstract class ReactantRelay[-U] {
4949
*
5050
* This duty is the same as the returned by the [[stop]] method.
5151
*
52-
* This method is thread-safe but some methods of the returned [[SubscriptableDuty]] require being called withing the [[doer]]. */
52+
* This method is thread-safe but some methods of the returned [[SubscriptableDuty]] require being called within the [[doer]]. */
5353
def stopDuty: doer.SubscriptableDuty[Unit]
5454

5555
/** Registers this [[Reactant]] to be notified with the specified signal when the given `watchedReactant` is fully stopped.
@@ -71,7 +71,7 @@ abstract class ReactantRelay[-U] {
7171
def watch[SS <: U](watchedReactant: ReactantRelay[?], stoppedSignal: SS, univocally: Boolean = true, subscriptionCompleted: Maybe[doer.Covenant[Unit]] = Maybe.empty): Maybe[WatchSubscription]
7272

7373
/** Provides diagnostic information about the current instance. */
74-
def diagnose: doer.Duty[ReactantDiagnostic]
74+
def diagnoses: doer.Duty[ReactantDiagnostic]
7575

7676
/** Provides diagnostic information about the current instance that may be stale due to cache visibility issues across processor cores. */
7777
def staleDiagnose: ReactantDiagnostic

src/main/scala/core/Spawner.scala

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,27 @@ object Spawner {
1111
}
1212

1313
/** A progenitor of [[Reactant]]s.
14-
* @param doer the [[MatrixDoer]] within which the mutable members of this class are mutated; and the [[Doer]] that contains the [[Duty]] returned by [[createReactant]].
14+
* @param doer the [[MatrixDoer]] within which the mutable members of this class are mutated; and the [[Doer]] that contains the [[Duty]] returned by [[createsReactant]].
1515
* If this [[Spawner]] has an owner then the [[MatrixDoer]] referred by this parameter should be the same as the assigned to the `owner`.
1616
* @param initialReactantSerial child reactants' serial will start at this value. Allows to distribute the load of [[MatrixDoer]] more evenly among siblings.
1717
* @tparam MD the singleton type of the [[MatrixDoer]] assigned to the `owner`.
1818
* */
1919
class Spawner[+MD <: MatrixDoer](val owner: Maybe[Reactant[?]], val doer: MD, initialReactantSerial: Reactant.SerialNumber) { thisSpawner =>
2020
assert(owner.fold(true)(_.doer eq doer))
2121

22-
/** Should be accessed only within the [[doer]] */
22+
/** Access must be within the [[doer]]. */
2323
private var reactantSerialSequencer: Reactant.SerialNumber = initialReactantSerial
2424

25-
/** Should be accessed within the [[doer]] only. */
25+
/** Access must be within the [[doer]]. */
2626
private val children: mutable.LongMap[Reactant[?]] = mutable.LongMap.empty
2727

2828
/** A view of the children that aren't fully stopped.
29-
* Should be accessed withing the [[doer]]. */
29+
* Access must be within the [[doer]]. */
3030
val childrenView: MapView[Long, Reactant[?]] = children.view
3131

32-
/** Should be called withing the [[doer]] only. */
33-
def createReactant[U](
32+
/**Creates a [[Duty]] that creates a new [[Reactant]].
33+
* Calls must be within the [[doer]]. */
34+
def createsReactant[U](
3435
childFactory: ReactantFactory,
3536
childDoer: MatrixDoer,
3637
isSignalTest: IsSignalTest[U],
@@ -39,22 +40,22 @@ class Spawner[+MD <: MatrixDoer](val owner: Maybe[Reactant[?]], val doer: MD, in
3940
doer.checkWithin()
4041
reactantSerialSequencer += 1
4142
val reactantSerial = reactantSerialSequencer
42-
childFactory.createReactant(reactantSerial, thisSpawner, childDoer, isSignalTest, initialBehaviorBuilder)
43+
childFactory.createsReactant(reactantSerial, thisSpawner, childDoer, isSignalTest, initialBehaviorBuilder)
4344
.onBehalfOf(doer)
4445
.map { reactant =>
4546
children.addOne(reactantSerial, reactant)
4647
reactant
4748
}
4849
}
4950

50-
/** should be called withing the doer */
51-
def stopChildren(): doer.Duty[Array[Unit]] = {
51+
/** Calls must be within the [[doer]]. */
52+
def stopsChildren(): doer.Duty[Array[Unit]] = {
5253
doer.checkWithin()
5354
val stopDuties = childrenView.values.map(child => doer.Duty.foreign(child.doer)(child.stop()))
5455
doer.Duty.sequenceToArray(stopDuties)
5556
}
5657

57-
/** should be called withing the doer */
58+
/** Calls must be within the [[doer]]. */
5859
def removeChild(childSerial: Long): Unit = {
5960
doer.checkWithin()
6061
children -= childSerial

src/main/scala/providers/CloseableDoerProvidersManager.scala renamed to src/main/scala/providers/ShutdownAbleDpd.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import readren.taskflow.Doer
1010
import java.util.concurrent.atomic.AtomicBoolean
1111
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
1212

13-
class CloseableDoerProvidersManager extends DoerProvidersManager, ShutdownAble {
13+
/** An implementation of the [[DoerProvidersManager]] trait that is [[ShutdownAble]].
14+
* Call to methods inherited from [[ShutdownAble]] are propagated to all the [[DoerProvider]] instances managed by this instance, provided the [[DoerProvider]] are [[ShutdownAble]]. */
15+
class ShutdownAbleDpd extends DoerProvidersManager, ShutdownAble {
1416

1517
private val registeredProviders: ConcurrentHashMap[DoerProviderDescriptor[?, ?], DoerProvider[?]] = new ConcurrentHashMap()
1618
private val wasShutdown: AtomicBoolean = new AtomicBoolean(false)
@@ -22,7 +24,7 @@ class CloseableDoerProvidersManager extends DoerProvidersManager, ShutdownAble {
2224
if wasShutdown.get() then null
2325
else descriptor.build(this)
2426
}).asInstanceOf[DP]
25-
if provider == null then throw new IllegalStateException(s"A ${getTypeName[CloseableDoerProvidersManager]} instance was asked to build a new instances of ${getTypeName[DoerProvider[D]]} after it was shutdown.")
27+
if provider == null then throw new IllegalStateException(s"A ${getTypeName[ShutdownAbleDpd]} instance was asked to build a new instances of ${getTypeName[DoerProvider[D]]} after it was shutdown.")
2628
else provider
2729
}
2830

0 commit comments

Comments
 (0)