Skip to content

Commit 750d468

Browse files
elizarovqwwdfsad
authored andcommitted
Fixed hangs during thread termination in IO dispatcher
Fixes #524 Fixes #525
1 parent 73d1e46 commit 750d468

File tree

2 files changed

+59
-23
lines changed

2 files changed

+59
-23
lines changed

core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt

+21-23
Original file line numberDiff line numberDiff line change
@@ -111,21 +111,19 @@ internal class CoroutineScheduler(
111111
*
112112
* Note, [newIndex] can be zero for the worker that is being terminated (removed from [workers]).
113113
*/
114-
private fun parkedWorkersStackTopUpdate(oldIndex: Int, newIndex: Int) {
114+
private fun parkedWorkersStackTopUpdate(worker: Worker, oldIndex: Int, newIndex: Int) {
115115
parkedWorkersStack.loop { top ->
116116
val index = (top and PARKED_INDEX_MASK).toInt()
117117
val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
118118
val updIndex = if (index == oldIndex) {
119119
if (newIndex == 0) {
120-
parkedWorkersStackNextIndex(workers[oldIndex]!!)
121-
}
122-
else {
120+
parkedWorkersStackNextIndex(worker)
121+
} else {
123122
newIndex
124123
}
125124
} else {
126125
index // no change to index, but update version
127126
}
128-
129127
if (updIndex < 0) return@loop // retry
130128
if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return
131129
}
@@ -254,7 +252,7 @@ internal class CoroutineScheduler(
254252
private const val MAX_SPINS = 1000
255253
private const val MAX_YIELDS = MAX_SPINS + 500
256254

257-
@JvmStatic // Note, that is fits into Int (it is is equal to 10^9)
255+
@JvmStatic // Note, that is fits into Int (it is equal to 10^9)
258256
private val MAX_PARK_TIME_NS = TimeUnit.SECONDS.toNanos(1).toInt()
259257

260258
@JvmStatic
@@ -824,10 +822,7 @@ internal class CoroutineScheduler(
824822
* See tryUnpark for state reasoning.
825823
* If this CAS fails, then we were successfully unparked by other worker and cannot terminate.
826824
*/
827-
if (!terminationState.compareAndSet(
828-
ALLOWED,
829-
TERMINATED
830-
)) return
825+
if (!terminationState.compareAndSet(ALLOWED, TERMINATED)) return
831826
/*
832827
* At this point this thread is no longer considered as usable for scheduling.
833828
* We need multi-step choreography to reindex workers.
@@ -843,22 +838,25 @@ internal class CoroutineScheduler(
843838
* 2) Update top of stack if it was pointing to oldIndex and make sure no
844839
* pending push/pop operation that might have already retrieved oldIndex could complete.
845840
*/
846-
parkedWorkersStackTopUpdate(oldIndex, 0)
841+
parkedWorkersStackTopUpdate(this, oldIndex, 0)
847842
/*
848-
* 3) Move last worker into an index in array that was previously occupied by this worker.
843+
* 3) Move last worker into an index in array that was previously occupied by this worker,
844+
* if last worker was a different one (sic!).
849845
*/
850846
val lastIndex = decrementCreatedWorkers()
851-
val lastWorker = workers[lastIndex]!!
852-
workers[oldIndex] = lastWorker
853-
lastWorker.indexInArray = oldIndex
854-
/*
855-
* Now lastWorker is available at both indices in the array, but it can
856-
* still be at the stack top on via its lastIndex
857-
*
858-
* 4) Update top of stack lastIndex -> oldIndex and make sure no
859-
* pending push/pop operation that might have already retrieved lastIndex could complete.
860-
*/
861-
parkedWorkersStackTopUpdate(lastIndex, oldIndex)
847+
if (lastIndex != oldIndex) {
848+
val lastWorker = workers[lastIndex]!!
849+
workers[oldIndex] = lastWorker
850+
lastWorker.indexInArray = oldIndex
851+
/*
852+
* Now lastWorker is available at both indices in the array, but it can
853+
* still be at the stack top on via its lastIndex
854+
*
855+
* 4) Update top of stack lastIndex -> oldIndex and make sure no
856+
* pending push/pop operation that might have already retrieved lastIndex could complete.
857+
*/
858+
parkedWorkersStackTopUpdate(lastWorker, lastIndex, oldIndex)
859+
}
862860
/*
863861
* 5) It is safe to clear reference from workers array now.
864862
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.experimental
6+
7+
import kotlinx.coroutines.experimental.scheduling.*
8+
import org.junit.*
9+
import java.util.*
10+
import java.util.concurrent.*
11+
12+
class BlockingIOTerminationStressTest : TestBase() {
13+
private val baseDispatcher = ExperimentalCoroutineDispatcher(
14+
2, 20,
15+
TimeUnit.MILLISECONDS.toNanos(10)
16+
)
17+
private val ioDispatcher = baseDispatcher.blocking()
18+
private val TEST_SECONDS = 3L * stressTestMultiplier
19+
20+
@After
21+
fun tearDown() {
22+
baseDispatcher.close()
23+
}
24+
25+
@Test
26+
fun testTermination() {
27+
val rnd = Random()
28+
val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(TEST_SECONDS)
29+
while (System.currentTimeMillis() < deadline) {
30+
Thread.sleep(rnd.nextInt(30).toLong())
31+
repeat(rnd.nextInt(5) + 1) {
32+
launch(ioDispatcher) {
33+
Thread.sleep(rnd.nextInt(5).toLong())
34+
}
35+
}
36+
}
37+
}
38+
}

0 commit comments

Comments
 (0)