@@ -50,6 +50,38 @@ private[concurrent] object BatchingExecutorStatics {
50
50
* on the outer task completing.
51
51
* This executor may run tasks in any order, including LIFO order.
52
52
* There are no ordering guarantees.
53
+ *
54
+ * WARNING: Only use *EITHER* `submitAsyncBatched` OR `submitSyncBatched`!!
55
+ *
56
+ * When you implement this trait for async executors like thread pools,
57
+ * you're going to need to implement it something like the following:
58
+ *
59
+ * {{{
60
+ * final override def submitAsync(runnable: Runnable): Unit =
61
+ * super[SuperClass].execute(runnable) // To prevent reentrancy into `execute`
62
+ *
63
+ * final override def execute(runnable: Runnable): Unit =
64
+ * if (runnable.isInstanceOf[Batchable]) // Or other logic
65
+ * submitAsyncBatched(runnable)
66
+ * else
67
+ * submitAsync(runnable)
68
+ *
69
+ * final override def reportFailure(cause: Throwable): Unit = …
70
+ * }}}
71
+ *
72
+ * And if you want to implement if for a sync, trampolining, executor you're
73
+ * going to implement it something like this:
74
+ *
75
+ * {{{
76
+ * final override def submitAsync(runnable: Runnable): Unit = ()
77
+ *
78
+ * final override def execute(runnable: Runnable): Unit =
79
+ * submitSyncBatched(runnable) // You typically will want to batch everything
80
+ *
81
+ * final override def reportFailure(cause: Throwable): Unit =
82
+ * ExecutionContext.defaultReporter(cause) // Or choose something more fitting
83
+ * }}}
84
+ *
53
85
*/
54
86
private [concurrent] trait BatchingExecutor extends Executor {
55
87
private [this ] final val _tasksLocal = new ThreadLocal [AnyRef ]()
@@ -59,33 +91,7 @@ private[concurrent] trait BatchingExecutor extends Executor {
59
91
* In order to conserve allocations, the first element in the batch is stored "unboxed" in
60
92
* the `first` field. Subsequent Runnables are stored in the array called `other`.
61
93
*/
62
- private [this ] final class Batch (private [this ] final val resubmitOnBlock : Boolean ) extends Runnable with BlockContext with (BlockContext => Throwable ) {
63
- private [this ] final var parentBlockContext : BlockContext = BatchingExecutorStatics .MissingParentBlockContext
64
- private [this ] final var first : Runnable = _
65
- private [this ] final var size : Int = _
66
- private [this ] final var other : Array [Runnable ] = BatchingExecutorStatics .emptyBatchArray
67
-
68
- def this (r : Runnable , resubmitOnBlock : Boolean ) = {
69
- this (resubmitOnBlock)
70
- first = r
71
- size = 1
72
- }
73
-
74
- private def this (first : Runnable , other : Array [Runnable ], size : Int , resubmitOnBlock : Boolean ) = {
75
- this (resubmitOnBlock)
76
- this .first = first
77
- this .other = other
78
- this .size = size
79
- }
80
-
81
- private [this ] final def cloneAndClear (): Batch = {
82
- val newBatch = new Batch (first, other, size, resubmitOnBlock)
83
- this .first = null
84
- this .parentBlockContext = BatchingExecutorStatics .MissingParentBlockContext
85
- this .other = BatchingExecutorStatics .emptyBatchArray
86
- this .size = 0
87
- newBatch
88
- }
94
+ private [this ] sealed abstract class AbstractBatch protected (protected final var first : Runnable , protected final var other : Array [Runnable ], protected final var size : Int ) {
89
95
90
96
private [this ] final def ensureCapacity (curSize : Int ): Array [Runnable ] = {
91
97
val curOther = this .other
@@ -111,61 +117,63 @@ private[concurrent] trait BatchingExecutor extends Executor {
111
117
this .size = sz + 1
112
118
}
113
119
114
- private [ this ] final def runNext (): Boolean =
120
+ @ tailrec protected final def runAll (): Unit = // TODO: Impose max limit of number of items (fairness)
115
121
(this .size: @ switch) match {
116
- case 0 => false
122
+ case 0 =>
117
123
case 1 =>
118
124
val next = this .first
119
125
this .first = null
120
126
this .size = 0
121
127
next.run()
122
- this .size > 0 // Could have changed during next.run ()
128
+ runAll ()
123
129
case sz =>
124
130
val o = this .other
125
131
val next = o(sz - 2 )
126
132
o(sz - 2 ) = null
127
133
this .size = sz - 1 // Important to update prior to `r.run()`
128
134
next.run()
129
- this .size > 0 // Could have changed during next.run ()
135
+ runAll ()
130
136
}
131
137
132
- // This method runs in the delegate ExecutionContext's thread
138
+ protected final def runUntilFailureOrDone (): Throwable =
139
+ try {
140
+ runAll()
141
+ null
142
+ } catch {
143
+ case t : Throwable => t
144
+ }
145
+ }
146
+
147
+ private [this ] final class AsyncBatch private (_first : Runnable , _other : Array [Runnable ], _size : Int ) extends AbstractBatch (_first, _other, _size) with Runnable with BlockContext with (BlockContext => Throwable ) {
148
+ private [this ] final var parentBlockContext : BlockContext = BatchingExecutorStatics .MissingParentBlockContext
149
+
150
+ final def this (runnable : Runnable ) = this (runnable, BatchingExecutorStatics .emptyBatchArray, 1 )
151
+
133
152
override final def run (): Unit = {
134
- _tasksLocal.set(this )
153
+ _tasksLocal.set(this ) // This is later cleared in `apply` or `runWithoutResubmit`
135
154
136
- val failure = // Only install the block context if we can resubmit on blocking
137
- if (resubmitOnBlock) BlockContext .usingBlockContext(this )(this )
138
- else runWithoutResubmit(runUntilFailureOrDone())
155
+ val f = resubmit(BlockContext .usingBlockContext(this )(this ))
139
156
140
- _tasksLocal.set(BatchingExecutorStatics .marker)
141
- if (failure != null )
142
- throw handleRunFailure(failure)
157
+ if (f != null )
158
+ throw f
143
159
}
144
160
161
+ /* LOGIC FOR ASYNCHRONOUS BATCHES */
145
162
override final def apply (prevBlockContext : BlockContext ): Throwable = {
146
163
parentBlockContext = prevBlockContext
147
164
val failure = runUntilFailureOrDone()
148
165
parentBlockContext = BatchingExecutorStatics .MissingParentBlockContext
166
+ _tasksLocal.remove()
149
167
failure
150
168
}
151
169
152
- @ tailrec private [this ] final def runWithoutResubmit (failure : Throwable ): Throwable =
153
- if (failure != null && (failure.isInstanceOf [InterruptedException ] || NonFatal (failure))) {
154
- reportFailure(failure)
155
- runWithoutResubmit(runUntilFailureOrDone())
156
- } else failure
157
-
158
- private [this ] final def runUntilFailureOrDone (): Throwable =
159
- try {
160
- while (runNext()) {}
161
-
162
- null
163
- } catch {
164
- case t : Throwable => t
165
- }
166
-
167
- private [this ] final def handleRunFailure (cause : Throwable ): Throwable =
168
- if (resubmitOnBlock && size > 0 ) {
170
+ /* Attempts to resubmit this Batch to the underlying ExecutionContext,
171
+ * this only happens for Batches where `resubmitOnBlock` is `true`.
172
+ * Only attempt to resubmit when there are `Runnables` left to process.
173
+ * Note that `cause` can be `null`.
174
+ */
175
+ private [this ] final def resubmit (cause : Throwable ): Throwable =
176
+ if (this .size > 0 ) {
169
177
try { submitAsync(this ); cause } catch {
170
178
case inner : Throwable =>
171
179
if (NonFatal (inner)) {
@@ -174,50 +182,70 @@ private[concurrent] trait BatchingExecutor extends Executor {
174
182
e
175
183
} else inner
176
184
}
177
- } else cause
185
+ } else cause // TODO: consider if NonFatals should simply be `reportFailure`:ed rather than rethrown
178
186
179
- override def blockOn [T ](thunk : => T )(implicit permission : CanAwait ): T = {
187
+ private [this ] final def cloneAndClear (): AsyncBatch = {
188
+ val newBatch = new AsyncBatch (first, other, size)
189
+ this .first = null
190
+ this .parentBlockContext = BatchingExecutorStatics .MissingParentBlockContext
191
+ this .other = BatchingExecutorStatics .emptyBatchArray
192
+ this .size = 0
193
+ newBatch
194
+ }
195
+
196
+ override final def blockOn [T ](thunk : => T )(implicit permission : CanAwait ): T = {
180
197
val pbc = parentBlockContext // Store this for later since `cloneAndClear()` will reset it
181
198
182
- if (size > 0 ) // If we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
199
+ if (this . size > 0 ) // If we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
183
200
submitAsync(cloneAndClear()) // If this throws then we have bigger problems
184
201
185
202
pbc.blockOn(thunk) // Now delegate the blocking to the previous BC
186
203
}
187
204
}
188
205
189
- /** Schedules the `runnable` to be executed—will only be used if `isAsync` returns `true`.
190
- */
191
- protected def submitAsync (runnable : Runnable ): Unit
206
+ private [this ] final class SyncBatch (runnable : Runnable ) extends AbstractBatch (runnable, BatchingExecutorStatics .emptyBatchArray, 1 ) with Runnable {
207
+ @ tailrec private [this ] final def runWithoutResubmit (failure : Throwable ): Throwable =
208
+ if (failure != null && (failure.isInstanceOf [InterruptedException ] || NonFatal (failure))) {
209
+ reportFailure(failure)
210
+ runWithoutResubmit(runUntilFailureOrDone())
211
+ } else {
212
+ _tasksLocal.set(BatchingExecutorStatics .marker)
213
+ failure
214
+ }
192
215
193
- /** Returns whether this `Executor` runs on the calling thread or if it `submitAsync` will execute its `Runnable`:s asynchronously.
194
- */
195
- protected def isAsync : Boolean = true
216
+ override final def run (): Unit = {
217
+ _tasksLocal.set(this ) // This is later cleared in `runWithoutResubmit`
196
218
197
- /** Must return `false` when `runnable` is `null`
219
+ val f = runWithoutResubmit(runUntilFailureOrDone())
220
+
221
+ if (f != null )
222
+ throw f
223
+ }
224
+ }
225
+
226
+ /** SHOULD throw a NullPointerException when `runnable` is null
198
227
*/
199
- protected def batchable (runnable : Runnable ): Boolean = runnable. isInstanceOf [ Batchable ]
228
+ protected def submitAsync (runnable : Runnable ): Unit
200
229
201
230
/** Reports that an asynchronous computation failed.
231
+ * See `ExecutionContext.reportFailure(throwable: Throwable)`
202
232
*/
203
233
protected def reportFailure (throwable : Throwable ): Unit
204
234
205
- override final def execute (runnable : Runnable ): Unit = {
235
+ protected final def submitAsyncBatched (runnable : Runnable ): Unit = {
236
+ val b = _tasksLocal.get
237
+ if (b.isInstanceOf [AsyncBatch ]) b.asInstanceOf [AsyncBatch ].push(runnable)
238
+ else submitAsync(new AsyncBatch (runnable))
239
+ }
240
+
241
+ protected final def submitSyncBatched (runnable : Runnable ): Unit = {
206
242
Objects .requireNonNull(runnable, " runnable is null" )
207
- if (isAsync) {
208
- if (batchable(runnable)) {
209
- val b = _tasksLocal.get
210
- if (b.isInstanceOf [Batch ]) b.asInstanceOf [Batch ].push(runnable)
211
- else submitAsync(new Batch (runnable, resubmitOnBlock = true ))
212
- } else submitAsync(runnable)
213
- } else {
214
- val b = _tasksLocal.get
215
- if (b.isInstanceOf [Batch ]) b.asInstanceOf [Batch ].push(runnable)
216
- else if (b == null ) { // If there is null in _tasksLocal, set a marker and run, inflate the Batch only if needed
217
- _tasksLocal.set(BatchingExecutorStatics .marker) // Set a marker to indicate that we are submitting synchronously
218
- runnable.run() // If we observe a non-null task which isn't a batch here, then allocate a batch
219
- _tasksLocal.remove() // Since we are executing synchronously, we can clear this at the end of execution
220
- } else new Batch (runnable, resubmitOnBlock = false ).run()
221
- }
243
+ val b = _tasksLocal.get
244
+ if (b.isInstanceOf [SyncBatch ]) b.asInstanceOf [SyncBatch ].push(runnable)
245
+ else if (b == null ) { // If there is null in _tasksLocal, set a marker and run, inflate the Batch only if needed
246
+ _tasksLocal.set(BatchingExecutorStatics .marker) // Set a marker to indicate that we are submitting synchronously
247
+ runnable.run() // If we observe a non-null task which isn't a batch here, then allocate a batch
248
+ _tasksLocal.remove() // Since we are executing synchronously, we can clear this at the end of execution
249
+ } else new SyncBatch (runnable).run()
222
250
}
223
251
}
0 commit comments