@@ -16,6 +16,11 @@ internal const val MASK = BUFFER_CAPACITY - 1 // 128 by default
16
16
internal const val TASK_STOLEN = - 1L
17
17
internal const val NOTHING_TO_STEAL = - 2L
18
18
19
+ internal typealias StealingMode = Int
20
+ internal const val STEAL_ANY : StealingMode = - 1
21
+ internal const val STEAL_CPU_ONLY : StealingMode = 0
22
+ internal const val STEAL_BLOCKING_ONLY : StealingMode = 1
23
+
19
24
/* *
20
25
* Tightly coupled with [CoroutineScheduler] queue of pending tasks, but extracted to separate file for simplicity.
21
26
* At any moment queue is used only by [CoroutineScheduler.Worker] threads, has only one producer (worker owning this queue)
@@ -107,25 +112,37 @@ internal class WorkQueue {
107
112
*
108
113
* Returns [NOTHING_TO_STEAL] if queue has nothing to steal, [TASK_STOLEN] if at least task was stolen
109
114
* or positive value of how many nanoseconds should pass until the head of this queue will be available to steal.
115
+ *
116
+ * [StealingMode] controls what tasks to steal:
117
+ * * [STEAL_ANY] is default mode for scheduler, task from the head (in FIFO order) is stolen
118
+ * * [STEAL_BLOCKING_ONLY] is mode for stealing *an arbitrary* blocking task which is used by scheduler when helping in Dispatchers.IO mode
119
+ * * [STEAL_CPU_ONLY] is a kludge for `runSingleTaskFromCurrentSystemDispatcher`
110
120
*/
111
- fun trySteal (stolenTaskRef : ObjectRef <Task ?>): Long {
112
- val task = pollBuffer()
121
+ fun trySteal (stealingMode : StealingMode , stolenTaskRef : ObjectRef <Task ?>): Long {
122
+ val task = when (stealingMode) {
123
+ STEAL_ANY -> pollBuffer()
124
+ else -> stealWithExclusiveMode(stealingMode)
125
+ }
126
+
113
127
if (task != null ) {
114
128
stolenTaskRef.element = task
115
129
return TASK_STOLEN
116
130
}
117
- return tryStealLastScheduled(stolenTaskRef, blockingOnly = false )
131
+ return tryStealLastScheduled(stealingMode, stolenTaskRef )
118
132
}
119
133
120
- fun tryStealBlocking (stolenTaskRef : ObjectRef <Task ?>): Long {
134
+ // Steal only tasks of a particular kind, potentially invoking full queue scan
135
+ private fun stealWithExclusiveMode (stealingMode : StealingMode ): Task ? {
121
136
var start = consumerIndex.value
122
137
val end = producerIndex.value
123
-
124
- while (start != end && blockingTasksInBuffer.value > 0 ) {
125
- stolenTaskRef.element = tryExtractBlockingTask(start++ ) ? : continue
126
- return TASK_STOLEN
138
+ val onlyBlocking = stealingMode == STEAL_BLOCKING_ONLY
139
+ // CPU or (BLOCKING & hasBlocking)
140
+ val shouldProceed = ! onlyBlocking || blockingTasksInBuffer.value > 0
141
+ while (start != end && shouldProceed) {
142
+ return tryExtractFromTheMiddle(start++ , onlyBlocking) ? : continue
127
143
}
128
- return tryStealLastScheduled(stolenTaskRef, blockingOnly = true )
144
+
145
+ return null
129
146
}
130
147
131
148
// Polls for blocking task, invoked only by the owner
@@ -138,23 +155,41 @@ internal class WorkQueue {
138
155
} // Failed -> someone else stole it
139
156
}
140
157
158
+ return pollWithMode(onlyBlocking = true /* only blocking */ )
159
+ }
160
+
161
+ fun pollCpu (): Task ? {
162
+ while (true ) { // Poll the slot
163
+ val lastScheduled = lastScheduledTask.value ? : break
164
+ if (lastScheduled.isBlocking) break
165
+ if (lastScheduledTask.compareAndSet(lastScheduled, null )) {
166
+ return lastScheduled
167
+ } // Failed -> someone else stole it
168
+ }
169
+
170
+ return pollWithMode(onlyBlocking = false /* only cpu */ )
171
+ }
172
+
173
+ private fun pollWithMode (/* Only blocking OR only CPU */ onlyBlocking : Boolean ): Task ? {
141
174
val start = consumerIndex.value
142
175
var end = producerIndex.value
143
-
144
- while (start != end && blockingTasksInBuffer.value > 0 ) {
145
- val task = tryExtractBlockingTask(-- end)
176
+ // CPU or (BLOCKING & hasBlocking)
177
+ val shouldProceed = ! onlyBlocking || blockingTasksInBuffer.value > 0
178
+ while (start != end && shouldProceed) {
179
+ val task = tryExtractFromTheMiddle(-- end, onlyBlocking)
146
180
if (task != null ) {
147
181
return task
148
182
}
149
183
}
150
184
return null
151
185
}
152
186
153
- private fun tryExtractBlockingTask (index : Int ): Task ? {
187
+ private fun tryExtractFromTheMiddle (index : Int , onlyBlocking : Boolean ): Task ? {
188
+ if (onlyBlocking && blockingTasksInBuffer.value == 0 ) return null
154
189
val arrayIndex = index and MASK
155
190
val value = buffer[arrayIndex]
156
- if (value != null && value.isBlocking && buffer.compareAndSet(arrayIndex, value, null )) {
157
- blockingTasksInBuffer.decrementAndGet()
191
+ if (value != null && value.isBlocking == onlyBlocking && buffer.compareAndSet(arrayIndex, value, null )) {
192
+ if (onlyBlocking) blockingTasksInBuffer.decrementAndGet()
158
193
return value
159
194
}
160
195
return null
@@ -170,10 +205,16 @@ internal class WorkQueue {
170
205
/* *
171
206
* Contract on return value is the same as for [trySteal]
172
207
*/
173
- private fun tryStealLastScheduled (stolenTaskRef : ObjectRef <Task ?>, blockingOnly : Boolean ): Long {
208
+ private fun tryStealLastScheduled (stealingMode : StealingMode , stolenTaskRef : ObjectRef <Task ?>): Long {
174
209
while (true ) {
175
210
val lastScheduled = lastScheduledTask.value ? : return NOTHING_TO_STEAL
176
- if (blockingOnly && ! lastScheduled.isBlocking) return NOTHING_TO_STEAL
211
+ if (lastScheduled.isBlocking) {
212
+ if (stealingMode == STEAL_CPU_ONLY ) {
213
+ return NOTHING_TO_STEAL
214
+ }
215
+ } else if (stealingMode == STEAL_BLOCKING_ONLY ) {
216
+ return NOTHING_TO_STEAL
217
+ }
177
218
178
219
// TODO time wraparound ?
179
220
val time = schedulerTimeSource.nanoTime()
0 commit comments