@@ -23,9 +23,11 @@ internal class LimitedDispatcher(
23
23
24
24
private val queue = LockFreeTaskQueue <Runnable >(singleConsumer = false )
25
25
26
- @InternalCoroutinesApi
27
- override fun dispatchYield (context : CoroutineContext , block : Runnable ) {
28
- dispatcher.dispatchYield(context, block)
26
+ @ExperimentalCoroutinesApi
27
+ override fun limitedParallelism (parallelism : Int ): CoroutineDispatcher {
28
+ parallelism.checkParallelism()
29
+ if (parallelism >= this .parallelism) return this
30
+ return super .limitedParallelism(parallelism)
29
31
}
30
32
31
33
override fun run () {
@@ -59,25 +61,47 @@ internal class LimitedDispatcher(
59
61
}
60
62
61
63
override fun dispatch (context : CoroutineContext , block : Runnable ) {
62
- // Add task to queue so running workers will be able to see that
63
- queue.addLast(block)
64
- if (runningWorkers >= parallelism) {
65
- return
64
+ dispatchInternal(block) {
65
+ if (dispatcher.isDispatchNeeded(EmptyCoroutineContext )) {
66
+ dispatcher.dispatch(EmptyCoroutineContext , this )
67
+ } else {
68
+ run ()
69
+ }
70
+ }
71
+ }
72
+
73
+ @InternalCoroutinesApi
74
+ override fun dispatchYield (context : CoroutineContext , block : Runnable ) {
75
+ dispatchInternal(block) {
76
+ dispatcher.dispatchYield(context, this )
66
77
}
78
+ }
67
79
80
+ private inline fun dispatchInternal (block : Runnable , dispatch : () -> Unit ) {
81
+ // Add task to queue so running workers will be able to see that
82
+ if (tryAdd(block)) return
68
83
/*
69
- * Protect against race when the worker is finished right after our check.
84
+ * Protect against the race when the number of workers is enough,
85
+ * but one (because of synchronized serialization) attempts to complete,
86
+ * and we just observed the number of running workers smaller than the actual
87
+ * number (hit right between `--runningWorkers` and `++runningWorkers` in `run()`)
70
88
*/
89
+ if (enoughWorkers()) return
90
+ dispatch()
91
+ }
92
+
93
+ private fun enoughWorkers (): Boolean {
71
94
@Suppress(" CAST_NEVER_SUCCEEDS" )
72
95
synchronized(this as SynchronizedObject ) {
73
- if (runningWorkers >= parallelism) return
96
+ if (runningWorkers >= parallelism) return true
74
97
++ runningWorkers
98
+ return false
75
99
}
76
- if (dispatcher.isDispatchNeeded( EmptyCoroutineContext )) {
77
- dispatcher.dispatch( EmptyCoroutineContext , this )
78
- } else {
79
- run ( )
80
- }
100
+ }
101
+
102
+ private fun tryAdd ( block : Runnable ): Boolean {
103
+ queue.addLast(block )
104
+ return runningWorkers >= parallelism
81
105
}
82
106
}
83
107
0 commit comments