@@ -206,142 +206,6 @@ trait AdaptiveWorkStealingTasks extends Tasks {
206
206
207
207
}
208
208
209
-
210
- /** An implementation of tasks objects based on the Java thread pooling API. */
211
- @ deprecated(" use `ForkJoinTasks` instead" , " 2.11.0" )
212
- trait ThreadPoolTasks extends Tasks {
213
- import java .util .concurrent ._
214
-
215
- trait WrappedTask [R , + Tp ] extends Runnable with super .WrappedTask [R , Tp ] {
216
- // initially, this is null
217
- // once the task is started, this future is set and used for `sync`
218
- // utb: var future: Future[_] = null
219
- @ volatile var owned = false
220
- @ volatile var completed = false
221
-
222
- def start () = synchronized {
223
- // debuglog("Starting " + body)
224
- // utb: future = executor.submit(this)
225
- executor.synchronized {
226
- incrTasks()
227
- executor.submit(this )
228
- }
229
- }
230
- def sync () = synchronized {
231
- // debuglog("Syncing on " + body)
232
- // utb: future.get()
233
- executor.synchronized {
234
- val coresize = executor.getCorePoolSize
235
- if (coresize < totaltasks) {
236
- executor.setCorePoolSize(coresize + 1 )
237
- // assert(executor.getCorePoolSize == (coresize + 1))
238
- }
239
- }
240
- while (! completed) this .wait
241
- }
242
- def tryCancel () = synchronized {
243
- // utb: future.cancel(false)
244
- if (! owned) {
245
- // debuglog("Cancelling " + body)
246
- owned = true
247
- true
248
- } else false
249
- }
250
- def run () = {
251
- // utb: compute
252
- var isOkToRun = false
253
- synchronized {
254
- if (! owned) {
255
- owned = true
256
- isOkToRun = true
257
- }
258
- }
259
- if (isOkToRun) {
260
- // debuglog("Running body of " + body)
261
- compute()
262
- } else {
263
- // just skip
264
- // debuglog("skipping body of " + body)
265
- }
266
- }
267
- override def release () = synchronized {
268
- // println("releasing: " + this + ", body: " + this.body)
269
- completed = true
270
- executor.synchronized {
271
- decrTasks()
272
- }
273
- this .notifyAll
274
- }
275
- }
276
-
277
- protected def newWrappedTask [R , Tp ](b : Task [R , Tp ]): WrappedTask [R , Tp ]
278
-
279
- val environment : ThreadPoolExecutor
280
- def executor = environment.asInstanceOf [ThreadPoolExecutor ]
281
- def queue = executor.getQueue.asInstanceOf [LinkedBlockingQueue [Runnable ]]
282
- @ volatile var totaltasks = 0
283
-
284
- private def incrTasks () = synchronized {
285
- totaltasks += 1
286
- }
287
-
288
- private def decrTasks () = synchronized {
289
- totaltasks -= 1
290
- }
291
-
292
- def execute [R , Tp ](task : Task [R , Tp ]): () => R = {
293
- val t = newWrappedTask(task)
294
-
295
- // debuglog("-----------> Executing without wait: " + task)
296
- t.start()
297
-
298
- () => {
299
- t.sync()
300
- t.body.forwardThrowable()
301
- t.body.result
302
- }
303
- }
304
-
305
- def executeAndWaitResult [R , Tp ](task : Task [R , Tp ]): R = {
306
- val t = newWrappedTask(task)
307
-
308
- // debuglog("-----------> Executing with wait: " + task)
309
- t.start()
310
-
311
- t.sync()
312
- t.body.forwardThrowable()
313
- t.body.result
314
- }
315
-
316
- def parallelismLevel = ThreadPoolTasks .numCores
317
-
318
- }
319
-
320
- @ deprecated(" use `ForkJoinTasks` instead" , " 2.11.0" )
321
- object ThreadPoolTasks {
322
- import java .util .concurrent ._
323
-
324
- val numCores = Runtime .getRuntime.availableProcessors
325
-
326
- val tcount = new atomic.AtomicLong (0L )
327
-
328
- val defaultThreadPool = new ThreadPoolExecutor (
329
- numCores,
330
- Int .MaxValue ,
331
- 60L , TimeUnit .MILLISECONDS ,
332
- new LinkedBlockingQueue [Runnable ],
333
- new ThreadFactory {
334
- def newThread (r : Runnable ) = {
335
- val t = new Thread (r)
336
- t.setName(" pc-thread-" + tcount.incrementAndGet)
337
- t.setDaemon(true )
338
- t
339
- }
340
- },
341
- new ThreadPoolExecutor .CallerRunsPolicy
342
- )
343
- }
344
-
345
209
object FutureThreadPoolTasks {
346
210
import java .util .concurrent ._
347
211
@@ -445,17 +309,6 @@ trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkS
445
309
def newWrappedTask [R , Tp ](b : Task [R , Tp ]) = new WrappedTask [R , Tp ](b)
446
310
}
447
311
448
- @ deprecated(" use `AdaptiveWorkStealingForkJoinTasks` instead" , " 2.11.0" )
449
- trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveWorkStealingTasks {
450
-
451
- class WrappedTask [R , Tp ](val body : Task [R , Tp ])
452
- extends super [ThreadPoolTasks ].WrappedTask [R , Tp ] with super [AdaptiveWorkStealingTasks ].WrappedTask [R , Tp ] {
453
- def split = body.split.map(b => newWrappedTask(b))
454
- }
455
-
456
- def newWrappedTask [R , Tp ](b : Task [R , Tp ]) = new WrappedTask [R , Tp ](b)
457
- }
458
-
459
312
/** An implementation of the `Tasks` that uses Scala `Future`s to compute
460
313
* the work encapsulated in each task.
461
314
*/
0 commit comments