12
12
13
13
package scala .concurrent
14
14
15
- import java .util .ArrayDeque
16
15
import java .util .concurrent .Executor
17
- import scala . annotation .{ switch , tailrec }
16
+ import java . util . Objects
18
17
import scala .util .control .NonFatal
18
+ import scala .annotation .{switch , tailrec }
19
19
20
20
/**
21
21
* Marker trait to indicate that a Runnable is Batchable by BatchingExecutors
@@ -26,6 +26,17 @@ trait Batchable {
26
26
27
27
private [concurrent] object BatchingExecutorStatics {
28
28
final val emptyBatchArray : Array [Runnable ] = new Array [Runnable ](0 )
29
+
30
+ // Max number of Runnables executed nested before starting to batch (to prevent stack exhaustion)
31
+ final val syncPreBatchDepth = 16
32
+
33
+ // Max number of Runnables processed in one go (to prevent starvation of other tasks on the pool)
34
+ final val runLimit = 1024
35
+
36
+ final object MissingParentBlockContext extends BlockContext {
37
+ override def blockOn [T ](thunk : => T )(implicit permission : CanAwait ): T =
38
+ try thunk finally throw new IllegalStateException (" BUG in BatchingExecutor.Batch: parentBlockContext is null" )
39
+ }
29
40
}
30
41
31
42
/**
@@ -38,154 +49,219 @@ private[concurrent] object BatchingExecutorStatics {
38
49
* thread which may improve CPU affinity. However,
39
50
* if tasks passed to the Executor are blocking
40
51
* or expensive, this optimization can prevent work-stealing
41
- * and make performance worse. Also, some ExecutionContext
42
- * may be fast enough natively that this optimization just
43
- * adds overhead.
44
- * The default ExecutionContext.global is already batching
45
- * or fast enough not to benefit from it; while
46
- * `fromExecutor` and `fromExecutorService` do NOT add
47
- * this optimization since they don't know whether the underlying
48
- * executor will benefit from it.
52
+ * and make performance worse.
49
53
* A batching executor can create deadlocks if code does
50
54
* not use `scala.concurrent.blocking` when it should,
51
55
* because tasks created within other tasks will block
52
56
* on the outer task completing.
53
57
* This executor may run tasks in any order, including LIFO order.
54
58
* There are no ordering guarantees.
55
59
*
56
- * WARNING: The underlying Executor's execute-method must not execute the submitted Runnable
57
- * in the calling thread synchronously. It must enqueue/handoff the Runnable.
60
+ * WARNING: Only use *EITHER* `submitAsyncBatched` OR `submitSyncBatched`!!
61
+ *
62
+ * When you implement this trait for async executors like thread pools,
63
+ * you're going to need to implement it something like the following:
64
+ *
65
+ * {{{
66
+ * final override def submitAsync(runnable: Runnable): Unit =
67
+ * super[SuperClass].execute(runnable) // To prevent reentrancy into `execute`
68
+ *
69
+ * final override def execute(runnable: Runnable): Unit =
70
+ * if (runnable.isInstanceOf[Batchable]) // Or other logic
71
+ * submitAsyncBatched(runnable)
72
+ * else
73
+ * submitAsync(runnable)
74
+ *
75
+ * final override def reportFailure(cause: Throwable): Unit = …
76
+ * }}}
77
+ *
78
+ * And if you want to implement if for a sync, trampolining, executor you're
79
+ * going to implement it something like this:
80
+ *
81
+ * {{{
82
+ * final override def submitAsync(runnable: Runnable): Unit = ()
83
+ *
84
+ * final override def execute(runnable: Runnable): Unit =
85
+ * submitSyncBatched(runnable) // You typically will want to batch everything
86
+ *
87
+ * final override def reportFailure(cause: Throwable): Unit =
88
+ * ExecutionContext.defaultReporter(cause) // Or choose something more fitting
89
+ * }}}
90
+ *
58
91
*/
59
- private [concurrent] trait BatchingExecutor extends Executor {
60
- private [this ] final val _tasksLocal = new ThreadLocal [Batch ]()
61
-
62
- private [this ] final class Batch extends Runnable with BlockContext with (BlockContext => Throwable ) {
63
- private [this ] final var parentBlockContext : BlockContext = _
64
- private [this ] final var first : Runnable = _
65
- private [this ] final var size : Int = _
66
- private [this ] final var other : Array [Runnable ] = BatchingExecutorStatics .emptyBatchArray
67
-
68
- def this (r : Runnable ) = {
69
- this ()
70
- first = r
71
- size = 1
72
- }
92
+ private [concurrent] trait BatchingExecutor extends Executor {
93
+ private [this ] final val _tasksLocal = new ThreadLocal [AnyRef ]()
73
94
74
- private def this ( first : Runnable , other : Array [ Runnable ], size : Int ) = {
75
- this ()
76
- this .first = first
77
- this .other = other
78
- this .size = size
79
- }
95
+ /*
96
+ * Batch implements a LIFO queue (stack) and is used as a trampolining Runnable.
97
+ * In order to conserve allocations, the first element in the batch is stored "unboxed" in
98
+ * the `first` field. Subsequent Runnables are stored in the array called ` other`.
99
+ */
100
+ private [ this ] sealed abstract class AbstractBatch protected ( protected final var first : Runnable , protected final var other : Array [ Runnable ], protected final var size : Int ) {
80
101
81
- private [this ] final def cloneAndClear (): Batch = {
82
- val newBatch = new Batch (first, other, size)
83
- this .first = null
84
- this .other = BatchingExecutorStatics .emptyBatchArray
85
- this .size = 0
86
- newBatch
87
- }
102
+ private [this ] final def ensureCapacity (curSize : Int ): Array [Runnable ] = {
103
+ val curOther = this .other
104
+ val curLen = curOther.length
105
+ if (curSize <= curLen) curOther
106
+ else {
107
+ val newLen = if (curLen == 0 ) 4 else curLen << 1
88
108
89
- private [this ] final def grow (): Unit = {
90
- val len = other.length
91
- other =
92
- if (len == 0 ) new Array [Runnable ](4 )
93
- else {
94
- val newOther = new Array [Runnable ](len << 1 )
95
- System .arraycopy(other, 0 , newOther, 0 , len)
96
- newOther
97
- }
109
+ if (newLen <= curLen) throw new StackOverflowError (" Space limit of asynchronous stack reached: " + curLen)
110
+ val newOther = new Array [Runnable ](newLen)
111
+ System .arraycopy(curOther, 0 , newOther, 0 , curLen)
112
+ this .other = newOther
113
+ newOther
114
+ }
98
115
}
99
116
100
117
final def push (r : Runnable ): Unit = {
101
- val sz = size
102
- if (sz > 0 ) {
103
- if (sz > other.length)
104
- grow()
105
- other(sz - 1 ) = r
106
- } else first = r
107
- size = sz + 1
118
+ val sz = this .size
119
+ if (sz == 0 )
120
+ this .first = r
121
+ else
122
+ ensureCapacity(sz)(sz - 1 ) = r
123
+ this .size = sz + 1
108
124
}
109
125
110
- final def pop (): Runnable =
111
- (size : @ switch) match {
112
- case 0 => null
113
- case 1 =>
114
- val ret = first
115
- first = null
116
- size = 0
117
- ret
118
- case n =>
119
- val ret = other(n - 2 )
120
- other(n - 2 ) = null
121
- size = n - 1
122
- ret
123
- }
126
+ @ tailrec protected final def runN (n : Int ): Unit =
127
+ if (n > 0 )
128
+ (this .size: @ switch) match {
129
+ case 0 =>
130
+ case 1 =>
131
+ val next = this .first
132
+ this .first = null
133
+ this .size = 0
134
+ next.run()
135
+ runN(n - 1 )
136
+ case sz =>
137
+ val o = this .other
138
+ val next = o(sz - 2 )
139
+ o(sz - 2 ) = null
140
+ this .size = sz - 1
141
+ next.run()
142
+ runN(n - 1 )
143
+ }
144
+ }
145
+
146
+ private [this ] final class AsyncBatch private (_first : Runnable , _other : Array [Runnable ], _size : Int ) extends AbstractBatch (_first, _other, _size) with Runnable with BlockContext with (BlockContext => Throwable ) {
147
+ private [this ] final var parentBlockContext : BlockContext = BatchingExecutorStatics .MissingParentBlockContext
148
+
149
+ final def this (runnable : Runnable ) = this (runnable, BatchingExecutorStatics .emptyBatchArray, 1 )
124
150
125
- // this method runs in the delegate ExecutionContext's thread
126
151
override final def run (): Unit = {
127
- // This invariant needs to hold: require(_tasksLocal.get eq null)
128
- _tasksLocal.set( this )
129
- val failure = BlockContext .usingBlockContext(this )(this )
130
- _tasksLocal.remove()
131
- if (failure ne null )
132
- throw handleRunFailure(failure)
152
+ _tasksLocal.set( this ) // This is later cleared in `apply` or `runWithoutResubmit`
153
+
154
+ val f = resubmit( BlockContext .usingBlockContext(this )(this ) )
155
+
156
+ if (f != null )
157
+ throw f
133
158
}
134
159
135
- override final def apply (prevBlockContext : BlockContext ): Throwable = {
160
+ /* LOGIC FOR ASYNCHRONOUS BATCHES */
161
+ override final def apply (prevBlockContext : BlockContext ): Throwable = try {
136
162
parentBlockContext = prevBlockContext
137
- var failure : Throwable = null
138
- try {
139
- var r = pop()
140
- while (r ne null ) {
141
- r.run()
142
- r = pop()
143
- }
144
- } catch {
145
- case t : Throwable => failure = t
146
- }
147
- parentBlockContext = null
148
- failure
163
+ runN(BatchingExecutorStatics .runLimit)
164
+ null
165
+ } catch {
166
+ case t : Throwable => t // We are handling exceptions on the outside of this method
167
+ } finally {
168
+ parentBlockContext = BatchingExecutorStatics .MissingParentBlockContext
169
+ _tasksLocal.remove()
149
170
}
150
171
151
- private [this ] final def handleRunFailure (cause : Throwable ): Throwable =
152
- if (size > 0 && (NonFatal (cause) || cause.isInstanceOf [InterruptedException ])) {
153
- try { unbatchedExecute(this ); cause } catch {
172
+ /* Attempts to resubmit this Batch to the underlying ExecutionContext,
173
+ * this only happens for Batches where `resubmitOnBlock` is `true`.
174
+ * Only attempt to resubmit when there are `Runnables` left to process.
175
+ * Note that `cause` can be `null`.
176
+ */
177
+ private [this ] final def resubmit (cause : Throwable ): Throwable =
178
+ if (this .size > 0 ) {
179
+ try { submitForExecution(this ); cause } catch {
154
180
case inner : Throwable =>
155
181
if (NonFatal (inner)) {
156
182
val e = new ExecutionException (" Non-fatal error occurred and resubmission failed, see suppressed exception." , cause)
157
183
e.addSuppressed(inner)
158
184
e
159
185
} else inner
160
186
}
161
- } else cause
187
+ } else cause // TODO: consider if NonFatals should simply be `reportFailure`:ed rather than rethrown
162
188
163
- override def blockOn [T ](thunk : => T )(implicit permission : CanAwait ): T = {
164
- val pbc = parentBlockContext
165
- if (size > 0 ) // if we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
166
- unbatchedExecute(cloneAndClear())
189
+ private [this ] final def cloneAndClear (): AsyncBatch = {
190
+ val newBatch = new AsyncBatch (this .first, this .other, this .size)
191
+ this .first = null
192
+ this .parentBlockContext = BatchingExecutorStatics .MissingParentBlockContext
193
+ this .other = BatchingExecutorStatics .emptyBatchArray
194
+ this .size = 0
195
+ newBatch
196
+ }
167
197
168
- if (pbc ne null ) pbc.blockOn(thunk) // now delegate the blocking to the previous BC
169
- else {
170
- try thunk finally throw new IllegalStateException (" BUG in BatchingExecutor.Batch: parentBlockContext is null" )
198
+ override final def blockOn [T ](thunk : => T )(implicit permission : CanAwait ): T = {
199
+ val pbc = parentBlockContext // Store this for later since `cloneAndClear()` will reset it
200
+
201
+ // If we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
202
+ if (this .size > 0 )
203
+ submitForExecution(cloneAndClear()) // If this throws then we have bigger problems
204
+
205
+ pbc.blockOn(thunk) // Now delegate the blocking to the previous BC
206
+ }
207
+ }
208
+
209
+ private [this ] final class SyncBatch (runnable : Runnable ) extends AbstractBatch (runnable, BatchingExecutorStatics .emptyBatchArray, 1 ) with Runnable {
210
+ @ tailrec override final def run (): Unit = {
211
+ try runN(BatchingExecutorStatics .runLimit) catch {
212
+ case ie : InterruptedException =>
213
+ reportFailure(ie) // TODO: Handle InterruptedException differently?
214
+ case f if NonFatal (f) =>
215
+ reportFailure(f)
171
216
}
217
+
218
+ if (this .size > 0 )
219
+ run()
172
220
}
173
221
}
174
222
175
- protected def unbatchedExecute (r : Runnable ): Unit
223
+ /** MUST throw a NullPointerException when `runnable` is null
224
+ * When implementing a sync BatchingExecutor, it is RECOMMENDED
225
+ * to implement this method as `runnable.run()`
226
+ */
227
+ protected def submitForExecution (runnable : Runnable ): Unit
228
+
229
+ /** Reports that an asynchronous computation failed.
230
+ * See `ExecutionContext.reportFailure(throwable: Throwable)`
231
+ */
232
+ protected def reportFailure (throwable : Throwable ): Unit
176
233
177
- private [this ] final def batchedExecute (runnable : Runnable ): Unit = {
234
+ /**
235
+ * WARNING: Never use both `submitAsyncBatched` and `submitSyncBatched` in the same
236
+ * implementation of `BatchingExecutor`
237
+ */
238
+ protected final def submitAsyncBatched (runnable : Runnable ): Unit = {
178
239
val b = _tasksLocal.get
179
- if (b ne null ) b.push(runnable)
180
- else unbatchedExecute (new Batch (runnable))
240
+ if (b. isInstanceOf [ AsyncBatch ] ) b. asInstanceOf [ AsyncBatch ] .push(runnable)
241
+ else submitForExecution (new AsyncBatch (runnable))
181
242
}
182
243
183
- override def execute (runnable : Runnable ): Unit =
184
- if (batchable(runnable)) batchedExecute(runnable)
185
- else unbatchedExecute(runnable)
186
-
187
- /** Override this to define which runnables will be batched.
188
- * By default it tests the Runnable for being an instance of [Batchable].
189
- **/
190
- protected def batchable (runnable : Runnable ): Boolean = runnable.isInstanceOf [Batchable ]
244
+ /**
245
+ * WARNING: Never use both `submitAsyncBatched` and `submitSyncBatched` in the same
246
+ * implementation of `BatchingExecutor`
247
+ */
248
+ protected final def submitSyncBatched (runnable : Runnable ): Unit = {
249
+ Objects .requireNonNull(runnable, " runnable is null" )
250
+ val tl = _tasksLocal
251
+ val b = tl.get
252
+ if (b.isInstanceOf [SyncBatch ]) b.asInstanceOf [SyncBatch ].push(runnable)
253
+ else {
254
+ val i = if (b ne null ) b.asInstanceOf [java.lang.Integer ].intValue else 0
255
+ if (i < BatchingExecutorStatics .syncPreBatchDepth) {
256
+ tl.set(java.lang.Integer .valueOf(i + 1 ))
257
+ try submitForExecution(runnable) // User code so needs to be try-finally guarded here
258
+ finally tl.set(b)
259
+ } else {
260
+ val batch = new SyncBatch (runnable)
261
+ tl.set(batch)
262
+ submitForExecution(batch)
263
+ tl.set(b) // Batch only throws fatals so no need for try-finally here
264
+ }
265
+ }
266
+ }
191
267
}
0 commit comments