diff --git a/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt b/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt index 2fbda69769..79df560aac 100644 --- a/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt +++ b/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt @@ -111,21 +111,19 @@ internal class CoroutineScheduler( * * Note, [newIndex] can be zero for the worker that is being terminated (removed from [workers]). */ - private fun parkedWorkersStackTopUpdate(oldIndex: Int, newIndex: Int) { + private fun parkedWorkersStackTopUpdate(worker: Worker, oldIndex: Int, newIndex: Int) { parkedWorkersStack.loop { top -> val index = (top and PARKED_INDEX_MASK).toInt() val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK val updIndex = if (index == oldIndex) { if (newIndex == 0) { - parkedWorkersStackNextIndex(workers[oldIndex]!!) - } - else { + parkedWorkersStackNextIndex(worker) + } else { newIndex } } else { index // no change to index, but update version } - if (updIndex < 0) return@loop // retry if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return } @@ -254,7 +252,7 @@ internal class CoroutineScheduler( private const val MAX_SPINS = 1000 private const val MAX_YIELDS = MAX_SPINS + 500 - @JvmStatic // Note, that is fits into Int (it is is equal to 10^9) + @JvmStatic // Note, that is fits into Int (it is equal to 10^9) private val MAX_PARK_TIME_NS = TimeUnit.SECONDS.toNanos(1).toInt() @JvmStatic @@ -824,10 +822,7 @@ internal class CoroutineScheduler( * See tryUnpark for state reasoning. * If this CAS fails, then we were successfully unparked by other worker and cannot terminate. */ - if (!terminationState.compareAndSet( - ALLOWED, - TERMINATED - )) return + if (!terminationState.compareAndSet(ALLOWED, TERMINATED)) return /* * At this point this thread is no longer considered as usable for scheduling. * We need multi-step choreography to reindex workers. @@ -843,22 +838,25 @@ internal class CoroutineScheduler( * 2) Update top of stack if it was pointing to oldIndex and make sure no * pending push/pop operation that might have already retrieved oldIndex could complete. */ - parkedWorkersStackTopUpdate(oldIndex, 0) + parkedWorkersStackTopUpdate(this, oldIndex, 0) /* - * 3) Move last worker into an index in array that was previously occupied by this worker. + * 3) Move last worker into an index in array that was previously occupied by this worker, + * if last worker was a different one (sic!). */ val lastIndex = decrementCreatedWorkers() - val lastWorker = workers[lastIndex]!! - workers[oldIndex] = lastWorker - lastWorker.indexInArray = oldIndex - /* - * Now lastWorker is available at both indices in the array, but it can - * still be at the stack top on via its lastIndex - * - * 4) Update top of stack lastIndex -> oldIndex and make sure no - * pending push/pop operation that might have already retrieved lastIndex could complete. - */ - parkedWorkersStackTopUpdate(lastIndex, oldIndex) + if (lastIndex != oldIndex) { + val lastWorker = workers[lastIndex]!! + workers[oldIndex] = lastWorker + lastWorker.indexInArray = oldIndex + /* + * Now lastWorker is available at both indices in the array, but it can + * still be at the stack top on via its lastIndex + * + * 4) Update top of stack lastIndex -> oldIndex and make sure no + * pending push/pop operation that might have already retrieved lastIndex could complete. + */ + parkedWorkersStackTopUpdate(lastWorker, lastIndex, oldIndex) + } /* * 5) It is safe to clear reference from workers array now. */ diff --git a/core/kotlinx-coroutines-core/test/scheduling/BlockingIOTerminationStressTest.kt b/core/kotlinx-coroutines-core/test/scheduling/BlockingIOTerminationStressTest.kt new file mode 100644 index 0000000000..48e2653f1f --- /dev/null +++ b/core/kotlinx-coroutines-core/test/scheduling/BlockingIOTerminationStressTest.kt @@ -0,0 +1,38 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.experimental + +import kotlinx.coroutines.experimental.scheduling.* +import org.junit.* +import java.util.* +import java.util.concurrent.* + +class BlockingIOTerminationStressTest : TestBase() { + private val baseDispatcher = ExperimentalCoroutineDispatcher( + 2, 20, + TimeUnit.MILLISECONDS.toNanos(10) + ) + private val ioDispatcher = baseDispatcher.blocking() + private val TEST_SECONDS = 3L * stressTestMultiplier + + @After + fun tearDown() { + baseDispatcher.close() + } + + @Test + fun testTermination() { + val rnd = Random() + val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(TEST_SECONDS) + while (System.currentTimeMillis() < deadline) { + Thread.sleep(rnd.nextInt(30).toLong()) + repeat(rnd.nextInt(5) + 1) { + launch(ioDispatcher) { + Thread.sleep(rnd.nextInt(5).toLong()) + } + } + } + } +}