From 8330a36b81ac54eb43dae0104fee290150a4b1aa Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 20 Jan 2023 15:20:22 +0100 Subject: [PATCH 1/7] Don't allocate threads on every dispatch in Native's thread pools --- .../test/MultithreadedDispatcherStressTest.kt | 34 +++++++++ .../native/src/MultithreadedDispatchers.kt | 70 ++++++++++++++----- .../native/test/WorkerTest.kt | 35 ++++++++++ 3 files changed, 123 insertions(+), 16 deletions(-) create mode 100644 kotlinx-coroutines-core/concurrent/test/MultithreadedDispatcherStressTest.kt diff --git a/kotlinx-coroutines-core/concurrent/test/MultithreadedDispatcherStressTest.kt b/kotlinx-coroutines-core/concurrent/test/MultithreadedDispatcherStressTest.kt new file mode 100644 index 0000000000..262c1a07af --- /dev/null +++ b/kotlinx-coroutines-core/concurrent/test/MultithreadedDispatcherStressTest.kt @@ -0,0 +1,34 @@ +/* + * Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ +package kotlinx.coroutines + +import kotlinx.atomicfu.* +import kotlin.coroutines.* +import kotlin.test.* + +class MultithreadedDispatcherStressTest { + + private val n = (if (isNative) 1_000 else 10_000) * stressTestMultiplier + val shared = atomic(0) + + /** + * Tests that [newFixedThreadPoolContext] will not drop tasks when closed. + */ + @Test + fun testClosingNotDroppingTasks() { + repeat(7) { + shared.value = 0 + val nThreads = it + 1 + val dispatcher = newFixedThreadPoolContext(nThreads, "testMultiThreadedContext") + repeat(n) { + dispatcher.dispatch(EmptyCoroutineContext, Runnable { + shared.incrementAndGet() + }) + } + dispatcher.close() + val m = shared.value + assertEquals(n, m, "$nThreads threads") + } + } +} diff --git a/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt b/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt index bf91e7003b..e8b239bcf9 100644 --- a/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt +++ b/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines +import kotlinx.atomicfu.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.internal.* import kotlin.coroutines.* @@ -73,38 +74,75 @@ private class MultiWorkerDispatcher( workersCount: Int ) : CloseableCoroutineDispatcher() { private val tasksQueue = Channel(Channel.UNLIMITED) + private val availableWorkers = Channel>(Channel.UNLIMITED) private val workerPool = OnDemandAllocatingPool(workersCount) { Worker.start(name = "$name-$it").apply { executeAfter { workerRunLoop() } } } + /** + * (number of tasks - number of workers) * 2 + (1 if closed) + */ + private val tasksAndWorkersCounter = atomic(0L) + + private inline fun Long.isClosed() = this and 1L == 1L + private inline fun Long.hasTasks() = this >= 2 + private inline fun Long.hasWorkers() = this < 0 + private fun workerRunLoop() = runBlocking { - // NB: we leverage tail-call optimization in this loop, do not replace it with - // .receive() without proper evaluation - for (task in tasksQueue) { - /** - * Any unhandled exception here will pass through worker's boundary and will be properly reported. - */ - task.run() + val privateChannel = Channel(1) + while (true) { + val state = tasksAndWorkersCounter.getAndUpdate { + if (it.isClosed() && !it.hasTasks()) return@runBlocking + it - 2 + } + if (state.hasTasks()) { + // we promised to process a task, and there are some + tasksQueue.receive().run() + } else { + availableWorkers.send(privateChannel) + privateChannel.receiveCatching().getOrNull()?.run() + } } } - override fun dispatch(context: CoroutineContext, block: Runnable) { - fun throwClosed(block: Runnable) { - throw IllegalStateException("Dispatcher $name was closed, attempted to schedule: $block") + private fun obtainWorker(): Channel { + // spin loop until a worker that promised to be here actually arrives. + while (true) { + val result = availableWorkers.tryReceive() + return result.getOrNull() ?: continue } + } - if (!workerPool.allocate()) throwClosed(block) // Do not even try to send to avoid race - - tasksQueue.trySend(block).onClosed { - throwClosed(block) + override fun dispatch(context: CoroutineContext, block: Runnable) { + val state = tasksAndWorkersCounter.getAndUpdate { + if (it.isClosed()) + throw IllegalStateException("Dispatcher $name was closed, attempted to schedule: $block") + it + 2 + } + if (state.hasWorkers()) { + // there are workers that have nothing to do, let's grab one of them + obtainWorker().trySend(block) + } else { + workerPool.allocate() + // no workers are available, we must queue the task + tasksQueue.trySend(block) } } override fun close() { - val workers = workerPool.close() - tasksQueue.close() + tasksAndWorkersCounter.getAndUpdate { if (it.isClosed()) it else it or 1L } + val workers = workerPool.close() // no new workers will be created + while (true) { + // check if there are workers that await tasks in their personal channels, we need to wake them up + val state = tasksAndWorkersCounter.getAndUpdate { + if (it.hasWorkers()) it + 2 else it + } + if (!state.hasWorkers()) + break + obtainWorker().close() + } /* * Here we cannot avoid waiting on `.result`, otherwise it will lead * to a native memory leak, including a pthread handle. diff --git a/kotlinx-coroutines-core/native/test/WorkerTest.kt b/kotlinx-coroutines-core/native/test/WorkerTest.kt index 7ae31b2656..ce3304ba9a 100644 --- a/kotlinx-coroutines-core/native/test/WorkerTest.kt +++ b/kotlinx-coroutines-core/native/test/WorkerTest.kt @@ -62,4 +62,39 @@ class WorkerTest : TestBase() { finished.receive() } } + + /** + * Test that [newFixedThreadPoolContext] does not allocate more dispatchers than it needs to. + * Incidentally also tests that it will allocate enough workers for its needs. Otherwise, the test will hang. + */ + @Test + fun testNotAllocatingExtraDispatchers() { + suspend fun spin(set: MutableSet) { + repeat(100) { + set.add(Worker.current) + delay(1) + } + } + val dispatcher = newFixedThreadPoolContext(64, "test") + try { + runBlocking { + val encounteredWorkers = mutableSetOf() + var canStart = false + val coroutine1 = launch(dispatcher) { + while (!canStart) { + // intentionally empty + } + spin(encounteredWorkers) + } + val coroutine2 = launch(dispatcher) { + canStart = true + spin(encounteredWorkers) + } + listOf(coroutine1, coroutine2).joinAll() + assertEquals(2, encounteredWorkers.size) + } + } finally { + dispatcher.close() + } + } } From e69842d5dc99423355943bc88a8bccdd63be1f30 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 20 Jan 2023 16:16:04 +0100 Subject: [PATCH 2/7] Document a suddenly revealed bug --- .../common/src/CloseableCoroutineDispatcher.kt | 4 ++-- .../test/MultithreadedDispatcherStressTest.kt | 7 +++---- 2 files changed, 5 insertions(+), 6 deletions(-) rename kotlinx-coroutines-core/{concurrent => native}/test/MultithreadedDispatcherStressTest.kt (84%) diff --git a/kotlinx-coroutines-core/common/src/CloseableCoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/CloseableCoroutineDispatcher.kt index 9c6703291a..b656094c53 100644 --- a/kotlinx-coroutines-core/common/src/CloseableCoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/CloseableCoroutineDispatcher.kt @@ -19,8 +19,8 @@ public expect abstract class CloseableCoroutineDispatcher() : CoroutineDispatche /** * Initiate the closing sequence of the coroutine dispatcher. - * After a successful call to [close], no new tasks will - * be accepted to be [dispatched][dispatch], but the previously dispatched tasks will be run. + * After a successful call to [close], no new tasks will be accepted to be [dispatched][dispatch]. + * The previously dispatched tasks may be lost as well, so the caller must ensure all the work is finished. * * Invocations of `close` are idempotent and thread-safe. */ diff --git a/kotlinx-coroutines-core/concurrent/test/MultithreadedDispatcherStressTest.kt b/kotlinx-coroutines-core/native/test/MultithreadedDispatcherStressTest.kt similarity index 84% rename from kotlinx-coroutines-core/concurrent/test/MultithreadedDispatcherStressTest.kt rename to kotlinx-coroutines-core/native/test/MultithreadedDispatcherStressTest.kt index 262c1a07af..6e2772a0db 100644 --- a/kotlinx-coroutines-core/concurrent/test/MultithreadedDispatcherStressTest.kt +++ b/kotlinx-coroutines-core/native/test/MultithreadedDispatcherStressTest.kt @@ -1,6 +1,7 @@ /* * Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ + package kotlinx.coroutines import kotlinx.atomicfu.* @@ -8,8 +9,6 @@ import kotlin.coroutines.* import kotlin.test.* class MultithreadedDispatcherStressTest { - - private val n = (if (isNative) 1_000 else 10_000) * stressTestMultiplier val shared = atomic(0) /** @@ -21,14 +20,14 @@ class MultithreadedDispatcherStressTest { shared.value = 0 val nThreads = it + 1 val dispatcher = newFixedThreadPoolContext(nThreads, "testMultiThreadedContext") - repeat(n) { + repeat(1_000) { dispatcher.dispatch(EmptyCoroutineContext, Runnable { shared.incrementAndGet() }) } dispatcher.close() val m = shared.value - assertEquals(n, m, "$nThreads threads") + assertEquals(1_000, m, "$nThreads threads") } } } From 14affab51f0700cb1e51c330a397b912d5a7effc Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 20 Jan 2023 16:28:16 +0100 Subject: [PATCH 3/7] JVM: wait for the existing tasks to finish in close() --- .../common/src/CloseableCoroutineDispatcher.kt | 2 +- .../test/MultithreadedDispatcherStressTest.kt | 0 .../jvm/test/knit/ClosedAfterGuideTestExecutor.kt | 5 ++++- 3 files changed, 5 insertions(+), 2 deletions(-) rename kotlinx-coroutines-core/{native => concurrent}/test/MultithreadedDispatcherStressTest.kt (100%) diff --git a/kotlinx-coroutines-core/common/src/CloseableCoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/CloseableCoroutineDispatcher.kt index b656094c53..49f8ead3eb 100644 --- a/kotlinx-coroutines-core/common/src/CloseableCoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/CloseableCoroutineDispatcher.kt @@ -20,7 +20,7 @@ public expect abstract class CloseableCoroutineDispatcher() : CoroutineDispatche /** * Initiate the closing sequence of the coroutine dispatcher. * After a successful call to [close], no new tasks will be accepted to be [dispatched][dispatch]. - * The previously dispatched tasks may be lost as well, so the caller must ensure all the work is finished. + * The previously-submitted tasks will still be run before the call to [close] is finished. * * Invocations of `close` are idempotent and thread-safe. */ diff --git a/kotlinx-coroutines-core/native/test/MultithreadedDispatcherStressTest.kt b/kotlinx-coroutines-core/concurrent/test/MultithreadedDispatcherStressTest.kt similarity index 100% rename from kotlinx-coroutines-core/native/test/MultithreadedDispatcherStressTest.kt rename to kotlinx-coroutines-core/concurrent/test/MultithreadedDispatcherStressTest.kt diff --git a/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt b/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt index 30fbfee264..945824cc03 100644 --- a/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt +++ b/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt @@ -44,7 +44,10 @@ private class ClosedAfterGuideTestDispatcher( } override fun close() { - (executor as ExecutorService).shutdown() + (executor as ExecutorService).apply { + shutdown() + awaitTermination(1, TimeUnit.MINUTES) + } } override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]" From e4956271e92a1eba7877450df62f89370ae17d23 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Mon, 23 Jan 2023 13:38:39 +0100 Subject: [PATCH 4/7] Revert the change on the JVM --- .../src/CloseableCoroutineDispatcher.kt | 2 +- .../test/MultithreadedDispatcherStressTest.kt | 6 ++++-- .../test/knit/ClosedAfterGuideTestExecutor.kt | 5 +---- .../native/src/MultithreadedDispatchers.kt | 19 ++++++++++++------- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/CloseableCoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/CloseableCoroutineDispatcher.kt index 49f8ead3eb..541b3082e2 100644 --- a/kotlinx-coroutines-core/common/src/CloseableCoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/CloseableCoroutineDispatcher.kt @@ -20,7 +20,7 @@ public expect abstract class CloseableCoroutineDispatcher() : CoroutineDispatche /** * Initiate the closing sequence of the coroutine dispatcher. * After a successful call to [close], no new tasks will be accepted to be [dispatched][dispatch]. - * The previously-submitted tasks will still be run before the call to [close] is finished. + * The previously-submitted tasks will still be run, but [close] is not guaranteed to wait for them to finish. * * Invocations of `close` are idempotent and thread-safe. */ diff --git a/kotlinx-coroutines-core/concurrent/test/MultithreadedDispatcherStressTest.kt b/kotlinx-coroutines-core/concurrent/test/MultithreadedDispatcherStressTest.kt index 6e2772a0db..4e4583f20a 100644 --- a/kotlinx-coroutines-core/concurrent/test/MultithreadedDispatcherStressTest.kt +++ b/kotlinx-coroutines-core/concurrent/test/MultithreadedDispatcherStressTest.kt @@ -26,8 +26,10 @@ class MultithreadedDispatcherStressTest { }) } dispatcher.close() - val m = shared.value - assertEquals(1_000, m, "$nThreads threads") + while (shared.value < 1_000) { + // spin. + // the test will hang here if the dispatcher drops tasks. + } } } } diff --git a/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt b/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt index 945824cc03..30fbfee264 100644 --- a/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt +++ b/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt @@ -44,10 +44,7 @@ private class ClosedAfterGuideTestDispatcher( } override fun close() { - (executor as ExecutorService).apply { - shutdown() - awaitTermination(1, TimeUnit.MINUTES) - } + (executor as ExecutorService).shutdown() } override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]" diff --git a/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt b/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt index e8b239bcf9..34d2a5ea59 100644 --- a/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt +++ b/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt @@ -74,7 +74,7 @@ private class MultiWorkerDispatcher( workersCount: Int ) : CloseableCoroutineDispatcher() { private val tasksQueue = Channel(Channel.UNLIMITED) - private val availableWorkers = Channel>(Channel.UNLIMITED) + private val availableWorkers = Channel>(Channel.UNLIMITED) private val workerPool = OnDemandAllocatingPool(workersCount) { Worker.start(name = "$name-$it").apply { executeAfter { workerRunLoop() } @@ -91,7 +91,6 @@ private class MultiWorkerDispatcher( private inline fun Long.hasWorkers() = this < 0 private fun workerRunLoop() = runBlocking { - val privateChannel = Channel(1) while (true) { val state = tasksAndWorkersCounter.getAndUpdate { if (it.isClosed() && !it.hasTasks()) return@runBlocking @@ -101,13 +100,19 @@ private class MultiWorkerDispatcher( // we promised to process a task, and there are some tasksQueue.receive().run() } else { - availableWorkers.send(privateChannel) - privateChannel.receiveCatching().getOrNull()?.run() + try { + suspendCancellableCoroutine { + availableWorkers.trySend(it) + }.run() + } catch (e: CancellationException) { + /** we are cancelled from [close] and thus will never get back to this branch of code, + but there may still be pending work, so we can't just exit here. */ + } } } } - private fun obtainWorker(): Channel { + private fun obtainWorker(): CancellableContinuation { // spin loop until a worker that promised to be here actually arrives. while (true) { val result = availableWorkers.tryReceive() @@ -123,7 +128,7 @@ private class MultiWorkerDispatcher( } if (state.hasWorkers()) { // there are workers that have nothing to do, let's grab one of them - obtainWorker().trySend(block) + obtainWorker().resume(block) } else { workerPool.allocate() // no workers are available, we must queue the task @@ -141,7 +146,7 @@ private class MultiWorkerDispatcher( } if (!state.hasWorkers()) break - obtainWorker().close() + obtainWorker().cancel() } /* * Here we cannot avoid waiting on `.result`, otherwise it will lead From aa307362c57f9867ebe7430c73226573c36ef15f Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Mon, 23 Jan 2023 14:22:47 +0100 Subject: [PATCH 5/7] Fix a test --- .../test/MultithreadedDispatchersTest.kt | 50 +++++++++++++++++++ .../native/test/WorkerTest.kt | 35 ------------- 2 files changed, 50 insertions(+), 35 deletions(-) create mode 100644 kotlinx-coroutines-core/native/test/MultithreadedDispatchersTest.kt diff --git a/kotlinx-coroutines-core/native/test/MultithreadedDispatchersTest.kt b/kotlinx-coroutines-core/native/test/MultithreadedDispatchersTest.kt new file mode 100644 index 0000000000..2d6267a4d2 --- /dev/null +++ b/kotlinx-coroutines-core/native/test/MultithreadedDispatchersTest.kt @@ -0,0 +1,50 @@ +/* + * Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlin.native.concurrent.* +import kotlin.test.* + +class MultithreadedDispatchersTest { + /** + * Test that [newFixedThreadPoolContext] does not allocate more dispatchers than it needs to. + * Incidentally also tests that it will allocate enough workers for its needs. Otherwise, the test will hang. + */ + @Test + fun testNotAllocatingExtraDispatchers() { + suspend fun spin(set: MutableSet) { + repeat(100) { + set.add(Worker.current) + delay(1) + } + } + val dispatcher = newFixedThreadPoolContext(64, "test") + try { + runBlocking { + val encounteredWorkers = mutableSetOf() + var canStart1 = false + var canStart2 = false + val coroutine1 = launch(dispatcher) { + while (!canStart1) { + // intentionally empty + } + canStart2 = true + spin(encounteredWorkers) + } + val coroutine2 = launch(dispatcher) { + canStart1 = true + while (!canStart2) { + // intentionally empty + } + spin(encounteredWorkers) + } + listOf(coroutine1, coroutine2).joinAll() + assertEquals(2, encounteredWorkers.size) + } + } finally { + dispatcher.close() + } + } +} diff --git a/kotlinx-coroutines-core/native/test/WorkerTest.kt b/kotlinx-coroutines-core/native/test/WorkerTest.kt index ce3304ba9a..7ae31b2656 100644 --- a/kotlinx-coroutines-core/native/test/WorkerTest.kt +++ b/kotlinx-coroutines-core/native/test/WorkerTest.kt @@ -62,39 +62,4 @@ class WorkerTest : TestBase() { finished.receive() } } - - /** - * Test that [newFixedThreadPoolContext] does not allocate more dispatchers than it needs to. - * Incidentally also tests that it will allocate enough workers for its needs. Otherwise, the test will hang. - */ - @Test - fun testNotAllocatingExtraDispatchers() { - suspend fun spin(set: MutableSet) { - repeat(100) { - set.add(Worker.current) - delay(1) - } - } - val dispatcher = newFixedThreadPoolContext(64, "test") - try { - runBlocking { - val encounteredWorkers = mutableSetOf() - var canStart = false - val coroutine1 = launch(dispatcher) { - while (!canStart) { - // intentionally empty - } - spin(encounteredWorkers) - } - val coroutine2 = launch(dispatcher) { - canStart = true - spin(encounteredWorkers) - } - listOf(coroutine1, coroutine2).joinAll() - assertEquals(2, encounteredWorkers.size) - } - } finally { - dispatcher.close() - } - } } From 1f83a183b62236f2584023b6694006dea7a97cbc Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Thu, 9 Feb 2023 15:35:57 +0100 Subject: [PATCH 6/7] Address the review --- .../native/src/MultithreadedDispatchers.kt | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt b/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt index 34d2a5ea59..0012ff65db 100644 --- a/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt +++ b/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt @@ -102,7 +102,8 @@ private class MultiWorkerDispatcher( } else { try { suspendCancellableCoroutine { - availableWorkers.trySend(it) + val result = availableWorkers.trySend(it) + checkChannelResult(result) }.run() } catch (e: CancellationException) { /** we are cancelled from [close] and thus will never get back to this branch of code, @@ -112,13 +113,9 @@ private class MultiWorkerDispatcher( } } - private fun obtainWorker(): CancellableContinuation { - // spin loop until a worker that promised to be here actually arrives. - while (true) { - val result = availableWorkers.tryReceive() - return result.getOrNull() ?: continue - } - } + // a worker that promised to be here and should actually arrive, so we wait for it in a blocking manner. + private fun obtainWorker(): CancellableContinuation = + availableWorkers.tryReceive().getOrNull() ?: runBlocking { availableWorkers.receive() } override fun dispatch(context: CoroutineContext, block: Runnable) { val state = tasksAndWorkersCounter.getAndUpdate { @@ -132,7 +129,8 @@ private class MultiWorkerDispatcher( } else { workerPool.allocate() // no workers are available, we must queue the task - tasksQueue.trySend(block) + val result = tasksQueue.trySend(block) + checkChannelResult(result) } } @@ -155,4 +153,12 @@ private class MultiWorkerDispatcher( val requests = workers.map { it.requestTermination() } requests.map { it.result } } + + private fun checkChannelResult(result: ChannelResult<*>) { + if (!result.isSuccess) + throw IllegalStateException( + "Internal invariants of $this were violated, please file a bug to kotlinx.coroutines", + result.exceptionOrNull() + ) + } } From e4a7463e1446247c93caf3c3cb52ae9f1c0eabd4 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 10 Feb 2023 12:44:06 +0100 Subject: [PATCH 7/7] Fix embarassing data races --- .../test/MultithreadedDispatchersTest.kt | 38 +++++++++++++------ 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/kotlinx-coroutines-core/native/test/MultithreadedDispatchersTest.kt b/kotlinx-coroutines-core/native/test/MultithreadedDispatchersTest.kt index 2d6267a4d2..ce433cc3e3 100644 --- a/kotlinx-coroutines-core/native/test/MultithreadedDispatchersTest.kt +++ b/kotlinx-coroutines-core/native/test/MultithreadedDispatchersTest.kt @@ -4,9 +4,31 @@ package kotlinx.coroutines +import kotlinx.atomicfu.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.internal.* import kotlin.native.concurrent.* import kotlin.test.* +private class BlockingBarrier(val n: Int) { + val counter = atomic(0) + val wakeUp = Channel(n - 1) + fun await() { + val count = counter.addAndGet(1) + if (count == n) { + repeat(n - 1) { + runBlocking { + wakeUp.send(Unit) + } + } + } else if (count < n) { + runBlocking { + wakeUp.receive() + } + } + } +} + class MultithreadedDispatchersTest { /** * Test that [newFixedThreadPoolContext] does not allocate more dispatchers than it needs to. @@ -14,9 +36,11 @@ class MultithreadedDispatchersTest { */ @Test fun testNotAllocatingExtraDispatchers() { + val barrier = BlockingBarrier(2) + val lock = SynchronizedObject() suspend fun spin(set: MutableSet) { repeat(100) { - set.add(Worker.current) + synchronized(lock) { set.add(Worker.current) } delay(1) } } @@ -24,20 +48,12 @@ class MultithreadedDispatchersTest { try { runBlocking { val encounteredWorkers = mutableSetOf() - var canStart1 = false - var canStart2 = false val coroutine1 = launch(dispatcher) { - while (!canStart1) { - // intentionally empty - } - canStart2 = true + barrier.await() spin(encounteredWorkers) } val coroutine2 = launch(dispatcher) { - canStart1 = true - while (!canStart2) { - // intentionally empty - } + barrier.await() spin(encounteredWorkers) } listOf(coroutine1, coroutine2).joinAll()