Skip to content

Commit abd2209

Browse files
committed
Move victim argument in WorkQieie into receiver position to simplify the overall code structure
1 parent 20daaa7 commit abd2209

File tree

4 files changed

+23
-26
lines changed

4 files changed

+23
-26
lines changed

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -265,15 +265,15 @@ internal class CoroutineScheduler(
265265

266266
/**
267267
* Long describing state of workers in this pool.
268-
* Currently includes created, CPU-acquired and blocking workers each occupying [BLOCKING_SHIFT] bits.
268+
* Currently, includes created, CPU-acquired and blocking workers each occupying [BLOCKING_SHIFT] bits.
269269
*/
270270
private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)
271271
private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
272272
private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)
273273

274274
private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
275275
private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
276-
public inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()
276+
inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()
277277

278278
// Guarded by synchronization
279279
private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet())
@@ -927,9 +927,9 @@ internal class CoroutineScheduler(
927927
if (worker !== null && worker !== this) {
928928
assert { localQueue.size == 0 }
929929
val stealResult = if (blockingOnly) {
930-
localQueue.tryStealBlockingFrom(victim = worker.localQueue, stolenTask)
930+
worker.localQueue.tryStealBlocking(stolenTask)
931931
} else {
932-
localQueue.tryStealFrom(victim = worker.localQueue, stolenTask)
932+
worker.localQueue.trySteal(stolenTask)
933933
}
934934
if (stealResult == TASK_STOLEN) {
935935
val result = stolenTask.element

kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt

+14-17
Original file line numberDiff line numberDiff line change
@@ -101,40 +101,37 @@ internal class WorkQueue {
101101
}
102102

103103
/**
104-
* Tries stealing from [victim] queue into the [stolenTaskRef] argument.
104+
* Tries stealing from this queue into the [stolenTaskRef] argument.
105105
*
106106
* Returns [NOTHING_TO_STEAL] if queue has nothing to steal, [TASK_STOLEN] if at least task was stolen
107107
* or positive value of how many nanoseconds should pass until the head of this queue will be available to steal.
108108
*/
109-
fun tryStealFrom(victim: WorkQueue, stolenTaskRef: ObjectRef<Task?>): Long {
110-
assert { bufferSize == 0 }
111-
val task = victim.pollBuffer()
109+
fun trySteal(stolenTaskRef: ObjectRef<Task?>): Long {
110+
val task = pollBuffer()
112111
if (task != null) {
113112
stolenTaskRef.element = task
114113
return TASK_STOLEN
115114
}
116-
return tryStealLastScheduled(victim, stolenTaskRef, blockingOnly = false)
115+
return tryStealLastScheduled(stolenTaskRef, blockingOnly = false)
117116
}
118117

119-
fun tryStealBlockingFrom(victim: WorkQueue, stolenTaskRef: ObjectRef<Task?>): Long {
120-
assert { bufferSize == 0 }
121-
var start = victim.consumerIndex.value
122-
val end = victim.producerIndex.value
123-
val buffer = victim.buffer
118+
fun tryStealBlocking(stolenTaskRef: ObjectRef<Task?>): Long {
119+
var start = consumerIndex.value
120+
val end = producerIndex.value
124121

125122
while (start != end) {
126123
val index = start and MASK
127-
if (victim.blockingTasksInBuffer.value == 0) break
124+
if (blockingTasksInBuffer.value == 0) break
128125
val value = buffer[index]
129126
if (value != null && value.isBlocking && buffer.compareAndSet(index, value, null)) {
130-
victim.blockingTasksInBuffer.decrementAndGet()
127+
blockingTasksInBuffer.decrementAndGet()
131128
stolenTaskRef.element = value
132129
return TASK_STOLEN
133130
} else {
134131
++start
135132
}
136133
}
137-
return tryStealLastScheduled(victim, stolenTaskRef, blockingOnly = true)
134+
return tryStealLastScheduled(stolenTaskRef, blockingOnly = true)
138135
}
139136

140137
fun offloadAllWorkTo(globalQueue: GlobalQueue) {
@@ -145,11 +142,11 @@ internal class WorkQueue {
145142
}
146143

147144
/**
148-
* Contract on return value is the same as for [tryStealFrom]
145+
* Contract on return value is the same as for [trySteal]
149146
*/
150-
private fun tryStealLastScheduled(victim: WorkQueue, stolenTaskRef: ObjectRef<Task?>, blockingOnly: Boolean): Long {
147+
private fun tryStealLastScheduled(stolenTaskRef: ObjectRef<Task?>, blockingOnly: Boolean): Long {
151148
while (true) {
152-
val lastScheduled = victim.lastScheduledTask.value ?: return NOTHING_TO_STEAL
149+
val lastScheduled = lastScheduledTask.value ?: return NOTHING_TO_STEAL
153150
if (blockingOnly && !lastScheduled.isBlocking) return NOTHING_TO_STEAL
154151

155152
// TODO time wraparound ?
@@ -163,7 +160,7 @@ internal class WorkQueue {
163160
* If CAS has failed, either someone else had stolen this task or the owner executed this task
164161
* and dispatched another one. In the latter case we should retry to avoid missing task.
165162
*/
166-
if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) {
163+
if (lastScheduledTask.compareAndSet(lastScheduled, null)) {
167164
stolenTaskRef.element = lastScheduled
168165
return TASK_STOLEN
169166
}

kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,12 @@ class WorkQueueStressTest : TestBase() {
5858
startLatch.await()
5959
while (!producerFinished || producerQueue.size != 0) {
6060
stolenTasks[i].addAll(myQueue.drain(ref).map { task(it) })
61-
myQueue.tryStealFrom(victim = producerQueue, ref)
61+
producerQueue.trySteal(ref)
6262
}
6363

6464
// Drain last element which is not counted in buffer
6565
stolenTasks[i].addAll(myQueue.drain(ref).map { task(it) })
66-
myQueue.tryStealFrom(producerQueue, ref)
66+
producerQueue.trySteal(ref)
6767
stolenTasks[i].addAll(myQueue.drain(ref).map { task(it) })
6868
}
6969
}
@@ -94,7 +94,7 @@ class WorkQueueStressTest : TestBase() {
9494
val ref = Ref.ObjectRef<Task?>()
9595
startLatch.await()
9696
while (stolen.size != offerIterations) {
97-
if (myQueue.tryStealFrom(producerQueue, ref) != NOTHING_TO_STEAL) {
97+
if (producerQueue.trySteal(ref) != NOTHING_TO_STEAL) {
9898
stolen.addAll(myQueue.drain(ref).map { task(it) })
9999
}
100100
}

kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,10 @@ class WorkQueueTest : TestBase() {
6363

6464
val stealer = WorkQueue()
6565
val ref = ObjectRef<Task?>()
66-
assertEquals(TASK_STOLEN, stealer.tryStealFrom(victim, ref))
66+
assertEquals(TASK_STOLEN, victim.trySteal(ref))
6767
assertEquals(arrayListOf(1L), stealer.drain(ref))
6868

69-
assertEquals(TASK_STOLEN, stealer.tryStealFrom(victim, ref))
69+
assertEquals(TASK_STOLEN, victim.trySteal(ref))
7070
assertEquals(arrayListOf(2L), stealer.drain(ref))
7171
}
7272
}

0 commit comments

Comments
 (0)