Skip to content

Commit 253265b

Browse files
author
Guido Gustavo Pollitzer
committed
Assistant provider:
- Replace inner-class projections with facade interfaces as a work-around to the compiler bug I discovered: see scala/scala3#22508 AssistantBasedDoerProvider: - Make `DoerAssistantProvider` use a type parameter instead of an abstract type member to allow/simplify the propagation of the type of the provided assistant. - Add the assistant type as a type parameter. SchedulingAssistant: - correct the enabled schedules not being remembered bug.
1 parent b7b8342 commit 253265b

14 files changed

+264
-488
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
ThisBuild / version := "0.1.0-SNAPSHOT"
22

3-
ThisBuild / scalaVersion := "3.6.2"
3+
ThisBuild / scalaVersion := "3.6.3"
44

55
lazy val root = (project in file("."))
66
.settings(

dump/thread_dump.txt

19.3 KB
Binary file not shown.

src/main/scala/providers/assistant/BalancedDoerAssistantProvider.scala

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package providers.assistant
33

44
import core.MatrixDoer
55
import providers.ShutdownAble
6-
import providers.assistant.BalancedDoerAssistantProvider.currentAssistant
6+
import providers.assistant.BalancedDoerAssistantProvider.{AssistantImpl, currentAssistant}
77
import providers.doer.AssistantBasedDoerProvider.DoerAssistantProvider
88

99
import readren.taskflow.Doer
@@ -13,9 +13,30 @@ import java.util.concurrent.atomic.AtomicInteger
1313

1414
object BalancedDoerAssistantProvider {
1515
private val currentAssistant: ThreadLocal[Doer.Assistant] = new ThreadLocal()
16+
17+
class AssistantImpl(
18+
val index: Int,
19+
failureReporter: Throwable => Unit = _.printStackTrace(),
20+
threadFactory: ThreadFactory = Executors.defaultThreadFactory(),
21+
queueFactory: () => BlockingQueue[Runnable] = () => new LinkedBlockingQueue[Runnable]()
22+
) extends Doer.Assistant { thisAssistant =>
23+
val doSiThEx: ThreadPoolExecutor = {
24+
val tf: ThreadFactory = (r: Runnable) => threadFactory.newThread { () =>
25+
currentAssistant.set(thisAssistant)
26+
r.run()
27+
}
28+
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queueFactory(), tf)
29+
}
30+
31+
override def executeSequentially(runnable: Runnable): Unit = doSiThEx.execute(runnable)
32+
33+
override def current: Doer.Assistant = currentAssistant.get
34+
35+
override def reportFailure(cause: Throwable): Unit = failureReporter(cause)
36+
}
1637
}
1738

18-
/** A [[Doer.Assistant]] provider with a rudimentary thread-load balancing mechanism.
39+
/** A [[Doer.Assistant]] provider with a non-dynamic thread-load balancing mechanism.
1940
* How it works:
2041
* - Manages a fixed number of assistants equal to the thread pool size, each of which owns a thread-worker.
2142
* - A call to the [[provide]] method returns the assistant with the shortest task queue at the moment of the call.
@@ -27,30 +48,13 @@ class BalancedDoerAssistantProvider(
2748
failureReporter: Throwable => Unit = _.printStackTrace(),
2849
threadFactory: ThreadFactory = Executors.defaultThreadFactory(),
2950
queueFactory: () => BlockingQueue[Runnable] = () => new LinkedBlockingQueue[Runnable]()
30-
) extends DoerAssistantProvider, ShutdownAble {
31-
override type ProvidedAssistant = AssistantImpl
51+
) extends DoerAssistantProvider[BalancedDoerAssistantProvider.AssistantImpl], ShutdownAble {
3252

33-
private val assistants = Array.tabulate[AssistantImpl](threadPoolSize)(index => new ProvidedAssistant(index))
53+
private val assistants = Array.tabulate[AssistantImpl](threadPoolSize)(index => new AssistantImpl(index, failureReporter, threadFactory, queueFactory))
3454

3555
private val switcher = new AtomicInteger(0)
3656

37-
class AssistantImpl(val index: Int) extends Doer.Assistant { thisAssistant =>
38-
val doSiThEx: ThreadPoolExecutor = {
39-
val tf: ThreadFactory = (r: Runnable) => threadFactory.newThread { () =>
40-
currentAssistant.set(thisAssistant)
41-
r.run()
42-
}
43-
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queueFactory(), tf)
44-
}
45-
46-
override def executeSequentially(runnable: Runnable): Unit = doSiThEx.execute(runnable)
47-
48-
override def current: Doer.Assistant = currentAssistant.get
49-
50-
override def reportFailure(cause: Throwable): Unit = failureReporter(cause)
51-
}
52-
53-
override def provide(serial: MatrixDoer.Id): ProvidedAssistant = {
57+
override def provide(serial: MatrixDoer.Id): AssistantImpl = {
5458
val assistantsWithShortestWorkQueue = findExecutorsWithShortestWorkQueue()
5559
val pickedAssistant =
5660
if assistantsWithShortestWorkQueue.tail == Nil then assistantsWithShortestWorkQueue.head

src/main/scala/providers/assistant/SchedulingAssistantProvider.scala

Lines changed: 126 additions & 394 deletions
Large diffs are not rendered by default.

src/main/scala/providers/assistant/SharedQueueDoerAssistantProvider.scala

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ object SharedQueueDoerAssistantProvider {
2727

2828
def currentWorkerIndex: Int = workerIndexThreadLocal.get
2929
// val UNSAFE: Unsafe = Unsafe.getUnsafe
30+
31+
trait DoerAssistant extends Doer.Assistant {
32+
def id: MatrixDoer.Id
33+
/** Exposes the number of [[Runnable]]s that are in the task-queue waiting to be executed sequentially. */
34+
def numOfPendingTasks: Int
35+
}
3036
}
3137

3238
/** A dynamically balanced [[Doer.Assistant]] provider.
@@ -46,17 +52,15 @@ class SharedQueueDoerAssistantProvider(
4652
threadPoolSize: Int = Runtime.getRuntime.availableProcessors(),
4753
failureReporter: Throwable => Unit = _.printStackTrace(),
4854
threadFactory: ThreadFactory = Executors.defaultThreadFactory()
49-
) extends DoerAssistantProvider, ShutdownAble { thisSharedQueueDoerProvider =>
50-
51-
override type ProvidedAssistant = DoerAssistant
55+
) extends DoerAssistantProvider[DoerAssistant], ShutdownAble { thisSharedQueueDoerAssistantProvider =>
5256

5357
private val state: AtomicInteger = new AtomicInteger(State.keepRunning.ordinal)
5458

55-
/** Queue of [[DoerAssistant]] with pending tasks that are waiting to be assigned to a [[Worker]] in order to process them.
59+
/** Queue of [[DoerAssistantImpl]] with pending tasks that are waiting to be assigned to a [[Worker]] in order to process them.
5660
*
57-
* Invariant: this queue contains no duplicate elements due to the [[DoerAssistant.queueForSequentialExecution]] logic.
61+
* Invariant: this queue contains no duplicate elements due to the [[DoerAssistantImpl.queueForSequentialExecution]] logic.
5862
* TODO try using my own implementation of concurrent queue that avoids dynamic memory allocation. */
59-
private val queuedDoersAssistants = new ConcurrentLinkedQueue[DoerAssistant]()
63+
private val queuedDoersAssistants = new ConcurrentLinkedQueue[DoerAssistantImpl]()
6064

6165
private val workers: Array[Worker] = Array.tabulate(threadPoolSize)(Worker.apply)
6266

@@ -73,12 +77,12 @@ class SharedQueueDoerAssistantProvider(
7377
workers.foreach(_.start())
7478
}
7579

76-
class DoerAssistant(val id: MatrixDoer.Id) extends Doer.Assistant { thisDoerAssistant =>
80+
protected class DoerAssistantImpl(override val id: MatrixDoer.Id) extends DoerAssistant { thisDoerAssistant =>
7781
private val taskQueue: TaskQueue = new ConcurrentLinkedQueue[Runnable]
7882
private val taskQueueSize: AtomicInteger = new AtomicInteger(0)
7983
@volatile private var firstTaskInQueue: Runnable = null
8084

81-
inline def hasPendingTasks: Boolean = taskQueueSize.get > 0
85+
override def numOfPendingTasks: Int = taskQueueSize.get
8286

8387
override def executeSequentially(task: Runnable): Unit = {
8488
if taskQueueSize.getAndIncrement() == 0 then {
@@ -100,12 +104,12 @@ class SharedQueueDoerAssistantProvider(
100104
/** Executes all the pending tasks that are visible from the calling [[Worker.thread]].
101105
*
102106
* Note: The [[taskQueueSize]] is decremented not immediately after polling a task from the [[taskQueue]] but only after the task is executed.
103-
* This ensures that another thread invoking `queuedForSequentialExecution` does not call [[wakeUpASleepingWorkerIfAny]] passing this [[DoerAssistant]] or enqueue this [[DoerAssistant]] into the [[queuedDoersAssistants]] queue,
104-
* which would violate the constraint that prevents two workers from being assigned to the same [[DoerAssistant]] instance simultaneously.
107+
* This ensures that another thread invoking `queuedForSequentialExecution` does not call [[wakeUpASleepingWorkerIfAny]] passing this [[DoerAssistantImpl]] or enqueue this [[DoerAssistantImpl]] into the [[queuedDoersAssistants]] queue,
108+
* which would violate the constraint that prevents two workers from being assigned to the same [[DoerAssistantImpl]] instance simultaneously.
105109
*
106-
* If at least one pending task remains unconsumed — typically because it is not yet visible from the [[Worker.thread]] — this [[DoerAssistant]] is enqueued into the [[queuedDoersAssistants]] queue to be assigned to a worker at a later time.
110+
* If at least one pending task remains unconsumed — typically because it is not yet visible from the [[Worker.thread]] — this [[DoerAssistantImpl]] is enqueued into the [[queuedDoersAssistants]] queue to be assigned to a worker at a later time.
107111
*/
108-
private[SharedQueueDoerAssistantProvider] final def executePendingTasks(): Int = {
112+
final def executePendingTasks(): Int = {
109113
doerAssistantThreadLocal.set(thisDoerAssistant)
110114
if debugEnabled then assert(taskQueueSize.get > 0)
111115
var processedTasksCounter: Int = 0
@@ -121,7 +125,7 @@ class SharedQueueDoerAssistantProvider(
121125
task.run()
122126
processedTasksCounter += 1
123127
aDecrementIsPending = false
124-
// the `taskQueueSize` must be decremented after (not before) running the task to avoid that other thread executing `queuedForSequentialExecution` to enqueue this DoerAssistant into `queuedDoersAssistants` allowing the worst problem to occur: two workers assigned to the same DoerAssistant.
128+
// the `taskQueueSize` must be decremented after (not before) running the task to avoid that other thread executing `queuedForSequentialExecution` to enqueue this SchedulingAssistantImpl into `queuedDoersAssistants` allowing the worst problem to occur: two workers assigned to the same SchedulingAssistantImpl.
125129
taskQueueSizeIsPositive = taskQueueSize.decrementAndGet() > 0
126130
if taskQueueSizeIsPositive then task = taskQueue.poll()
127131
}
@@ -140,12 +144,14 @@ class SharedQueueDoerAssistantProvider(
140144
def diagnose(sb: StringBuilder): StringBuilder = {
141145
sb.append(f"(id=$id, taskQueueSize=${taskQueueSize.get}%3d)")
142146
}
147+
148+
override def toString: String = s"${utils.CompileTime.getTypeName[DoerAssistantImpl]}(id=${id})"
143149
}
144150

145151
/** @return `true` if a worker was awakened.
146-
* The provided [[DoerAssistant]] will be assigned to the awakened worker.
147-
* Asumes the provided [[DoerAssistant]] is and will not be enqueued in [[queuedDoersAssistants]], which ensures it will not be assigned to any other worker simultaneously. */
148-
private def wakeUpASleepingWorkerIfAny(stimulator: DoerAssistant): Boolean = {
152+
* The provided [[DoerAssistantImpl]] will be assigned to the awakened worker.
153+
* Asumes the provided [[DoerAssistantImpl]] is and will not be enqueued in [[queuedDoersAssistants]], which ensures it will not be assigned to any other worker simultaneously. */
154+
private def wakeUpASleepingWorkerIfAny(stimulator: DoerAssistantImpl): Boolean = {
149155
if debugEnabled then assert(!queuedDoersAssistants.contains(stimulator))
150156
if sleepingWorkersCount.get > 0 then {
151157
var workerIndex = lastAwakenedWorkerIndex - 1
@@ -183,12 +189,12 @@ class SharedQueueDoerAssistantProvider(
183189
private var refusedTriesToSleepsCounter: Int = 0
184190

185191
/**
186-
* A [[DoerAssistant]] instance that jumps the queue established by the [[circularIterator]] that determines the order in which the [[DoerAssistant]] instances are assigned to this worker.
192+
* A [[DoerAssistantImpl]] instance that jumps the queue established by the [[circularIterator]] that determines the order in which the [[DoerAssistantImpl]] instances are assigned to this worker.
187193
* Should not be modified by any thread other than the [[thread]] of this worker unless this worker is sleeping.
188-
* Is set by [[wakeUpIfSleeping]] while this worker is sleeping, and by [[run]] after calling [[DoerAssistant.executePendingTasks()]] if the task-queue was not completely emptied;
194+
* Is set by [[wakeUpIfSleeping]] while this worker is sleeping, and by [[run]] after calling [[DoerAssistantImpl.executePendingTasks()]] if the task-queue was not completely emptied;
189195
* is read by this worker after it is awakened;
190196
* and is cleared by this worker after it is awakened. */
191-
private var queueJumper: DoerAssistant | Null = null
197+
private var queueJumper: DoerAssistantImpl | Null = null
192198

193199
/**
194200
* Remember the greatest value that [[refusedTriesToSleepsCounter]] reached before it has been reset because a pending task becomes visible.
@@ -215,7 +221,7 @@ class SharedQueueDoerAssistantProvider(
215221
override def run(): Unit = {
216222
workerIndexThreadLocal.set(index)
217223
while keepRunning do {
218-
val assignedDoerAssistant: DoerAssistant | Null =
224+
val assignedDoerAssistant: DoerAssistantImpl | Null =
219225
if queueJumper != null then queueJumper
220226
else queuedDoersAssistants.poll()
221227
queueJumper = null
@@ -227,13 +233,13 @@ class SharedQueueDoerAssistantProvider(
227233
processedTasksCounter += assignedDoerAssistant.executePendingTasks()
228234
completedMainLoopsCounter += 1
229235
}
230-
catch { // TODO analyze if clarity would suffer too much if [[DoerAssistant.executePendingTasks]] accepted a partial function with this catch removing the necessity of this try-catch.
236+
catch { // TODO analyze if clarity would suffer too much if [[SchedulingAssistantImpl.executePendingTasks]] accepted a partial function with this catch removing the necessity of this try-catch.
231237
case e: Throwable =>
232238
if thread.getUncaughtExceptionHandler == null && Thread.getDefaultUncaughtExceptionHandler == null then failureReporter(e)
233239
// Let the current thread to terminate abruptly, create a new one, and start it with the same Runnable (this worker).
234240
thisWorker.synchronized {
235241
thread = threadFactory.newThread(this)
236-
// Memorize the assigned DoerAssistant such that the new thread be assigned to the same [[DoerAssistant]]. It will continue with the task after the one that threw the exception.
242+
// Memorize the assigned SchedulingAssistantImpl such that the new thread be assigned to the same [[SchedulingAssistantImpl]]. It will continue with the task after the one that threw the exception.
237243
queueJumper = assignedDoerAssistant
238244
}
239245
thread.start()
@@ -284,11 +290,11 @@ class SharedQueueDoerAssistantProvider(
284290
}
285291

286292
/** Wakes up this [[Worker]] if it is currently sleeping.
287-
* @param stimulator the [[DoerAssistant]] to be assigned to this worker upon awakening,
293+
* @param stimulator the [[DoerAssistantImpl]] to be assigned to this worker upon awakening,
288294
* provided it has not already been assigned to another [[Worker]].
289295
* @return `true` if this worker was sleeping and has been awakened, otherwise `false`.
290296
*/
291-
def wakeUpIfSleeping(stimulator: DoerAssistant): Boolean = {
297+
def wakeUpIfSleeping(stimulator: DoerAssistantImpl): Boolean = {
292298
if potentiallySleeping then {
293299
thisWorker.synchronized {
294300
if isSleeping then {
@@ -336,8 +342,8 @@ class SharedQueueDoerAssistantProvider(
336342
}
337343
}
338344

339-
override def provide(serial: MatrixDoer.Id): ProvidedAssistant = {
340-
new DoerAssistant(serial)
345+
override def provide(serial: MatrixDoer.Id): DoerAssistant = {
346+
new DoerAssistantImpl(serial)
341347
}
342348

343349
/**
@@ -358,9 +364,10 @@ class SharedQueueDoerAssistantProvider(
358364
}
359365

360366
override def diagnose(sb: StringBuilder): StringBuilder = {
361-
sb.append(thisSharedQueueDoerProvider.getClass.getSimpleName)
367+
sb.append(utils.CompileTime.getTypeName[SharedQueueDoerAssistantProvider])
362368
sb.append('\n')
363369
sb.append(s"\tstate=${State.fromOrdinal(state.get)}\n")
370+
sb.append(s"\trunningWorkersLatch=${runningWorkersLatch.getCount}\n")
364371
sb.append("\tqueuedDoersAssistants: ")
365372
val doersAssistantsIterator = queuedDoersAssistants.iterator()
366373
while doersAssistantsIterator.hasNext do {

src/main/scala/providers/assistant/SimpleDoerAssistantProvider.scala

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package providers.assistant
33

44
import core.{Matrix, MatrixDoer}
55
import providers.ShutdownAble
6-
import providers.assistant.SimpleDoerAssistantProvider.currentAssistant
6+
import providers.assistant.SimpleDoerAssistantProvider.{AssistantImpl, currentAssistant}
77
import providers.doer.AssistantBasedDoerProvider.DoerAssistantProvider
88

99
import readren.taskflow.Doer
@@ -13,26 +13,13 @@ import java.util.concurrent.atomic.AtomicInteger
1313

1414
object SimpleDoerAssistantProvider {
1515
private val currentAssistant: ThreadLocal[Doer.Assistant] = new ThreadLocal
16-
}
17-
18-
/** A [[Doer.Assistant]] provider */
19-
class SimpleDoerAssistantProvider(
20-
threadPoolSize: Int = Runtime.getRuntime.availableProcessors(),
21-
failureReporter: Throwable => Unit = _.printStackTrace(),
22-
threadFactory: ThreadFactory = Executors.defaultThreadFactory(),
23-
queueFactory: () => BlockingQueue[Runnable] = () => new LinkedBlockingQueue[Runnable]()
24-
) extends DoerAssistantProvider, ShutdownAble { thisProvider =>
25-
override type ProvidedAssistant = Doer.Assistant
26-
27-
private val switcher = new AtomicInteger(0)
28-
29-
private val assistants: IArray[AssistantImpl] = IArray.tabulate(threadPoolSize) { index => new AssistantImpl(index) }
30-
31-
32-
override def provide(serial: MatrixDoer.Id): AssistantImpl =
33-
assistants(switcher.getAndIncrement() % assistants.length)
3416

35-
class AssistantImpl(val index: Int) extends Doer.Assistant { thisAssistant =>
17+
class AssistantImpl(
18+
val index: Int,
19+
failureReporter: Throwable => Unit = _.printStackTrace(),
20+
threadFactory: ThreadFactory = Executors.defaultThreadFactory(),
21+
queueFactory: () => BlockingQueue[Runnable] = () => new LinkedBlockingQueue[Runnable]()
22+
) extends Doer.Assistant { thisAssistant =>
3623

3724
val doSiThEx: ThreadPoolExecutor = {
3825
val tf: ThreadFactory = (r: Runnable) => threadFactory.newThread { () =>
@@ -49,6 +36,23 @@ class SimpleDoerAssistantProvider(
4936

5037
override def reportFailure(cause: Throwable): Unit = failureReporter(cause)
5138
}
39+
}
40+
41+
/** A [[Doer.Assistant]] provider */
42+
class SimpleDoerAssistantProvider(
43+
threadPoolSize: Int = Runtime.getRuntime.availableProcessors(),
44+
failureReporter: Throwable => Unit = _.printStackTrace(),
45+
threadFactory: ThreadFactory = Executors.defaultThreadFactory(),
46+
queueFactory: () => BlockingQueue[Runnable] = () => new LinkedBlockingQueue[Runnable]()
47+
) extends DoerAssistantProvider[SimpleDoerAssistantProvider.AssistantImpl], ShutdownAble { thisProvider =>
48+
49+
private val switcher = new AtomicInteger(0)
50+
51+
private val assistants: IArray[AssistantImpl] = IArray.tabulate(threadPoolSize) { index => new AssistantImpl(index, failureReporter, threadFactory, queueFactory) }
52+
53+
54+
override def provide(serial: MatrixDoer.Id): AssistantImpl =
55+
assistants(switcher.getAndIncrement() % assistants.length)
5256

5357
override def shutdown(): Unit = {
5458
for assistant <- assistants do {

src/main/scala/providers/doer/AssistantBasedDoerProvider.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,9 @@ object AssistantBasedDoerProvider {
1515
* Defines how assistant providers must expose their methods to be adapted by [[AssistantBasedDoerProvider]].
1616
*
1717
* Implementations of this trait provide assistants that serve as parameters to construct [[MatrixDoer]] instances.
18+
* @tparam A The type of assistant provided. Must extend [[Doer.Assistant]].
1819
*/
19-
trait DoerAssistantProvider {
20-
21-
/** The type of assistant provided. Must extend [[Doer.Assistant]]. */
22-
type ProvidedAssistant <: Doer.Assistant
20+
trait DoerAssistantProvider[+A <: Doer.Assistant] {
2321

2422
/**
2523
* Supplies a [[Doer.Assistant]] to be used in constructing a [[MatrixDoer]].
@@ -30,7 +28,7 @@ object AssistantBasedDoerProvider {
3028
* such as debugging or tracking, but this is optional and not required for the assistant's functionality.
3129
* The method may return the same [[ProvidedAssistant]] instance for different calls.
3230
*/
33-
def provide(serial: MatrixDoer.Id): ProvidedAssistant
31+
def provide(serial: MatrixDoer.Id): A
3432
}
3533
}
3634

@@ -41,10 +39,10 @@ object AssistantBasedDoerProvider {
4139
* allowing the use of assistants provided by the former to create [[MatrixDoer]] instances. It also delegates
4240
* lifecycle management operations (e.g., shutdown) to the underlying assistant provider.
4341
*/
44-
abstract class AssistantBasedDoerProvider[MD <: MatrixDoer] extends DoerProvider[MD], ShutdownAble {
42+
abstract class AssistantBasedDoerProvider[+MD <: MatrixDoer, +A <: Doer.Assistant] extends DoerProvider[MD], ShutdownAble {
4543

4644
/** The underlying assistant provider that this adapter wraps. */
47-
protected val assistantProvider: DoerAssistantProvider & ShutdownAble
45+
protected val assistantProvider: DoerAssistantProvider[A] & ShutdownAble
4846

4947
/**
5048
* Creates a [[MatrixDoer]] using an assistant provided by the underlying [[DoerAssistantProvider]].

0 commit comments

Comments
 (0)