Skip to content

Commit 4abe1f7

Browse files
committed
Check that worker threads belongs to the same scheduler in dispatchers.shutdown
Fixes #990
1 parent 605f603 commit 4abe1f7

File tree

2 files changed

+53
-4
lines changed

2 files changed

+53
-4
lines changed

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

+7-4
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ internal class CoroutineScheduler(
299299
// atomically set termination flag which is checked when workers are added or removed
300300
if (!_isTerminated.compareAndSet(0, 1)) return
301301
// make sure we are not waiting for the current thread
302-
val currentWorker = Thread.currentThread() as? Worker
302+
val currentWorker = currentWorker()
303303
// Capture # of created workers that cannot change anymore (mind the synchronized block!)
304304
val created = synchronized(workers) { createdWorkers }
305305
// Shutdown all workers with the only exception of the current thread
@@ -481,9 +481,7 @@ internal class CoroutineScheduler(
481481
* Returns [ADDED], or [NOT_ADDED], or [ADDED_REQUIRES_HELP].
482482
*/
483483
private fun submitToLocalQueue(task: Task, fair: Boolean): Int {
484-
val worker = Thread.currentThread() as? Worker
485-
?: return NOT_ADDED
486-
if (worker.scheduler !== this) return NOT_ADDED // different scheduler's worker (!!!)
484+
val worker = currentWorker() ?: return NOT_ADDED
487485

488486
/*
489487
* This worker could have been already terminated from this thread by close/shutdown and it should not
@@ -533,6 +531,11 @@ internal class CoroutineScheduler(
533531
return ADDED_REQUIRES_HELP
534532
}
535533

534+
private fun currentWorker(): Worker? {
535+
val worker = Thread.currentThread() as? Worker ?: return null
536+
return if (worker.scheduler == this) worker else null
537+
}
538+
536539
/**
537540
* Returns a string identifying the state of this scheduler for nicer debugging.
538541
* Note that this method is not atomic and represents rough state of pool.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.scheduling
6+
7+
import kotlinx.coroutines.*
8+
import org.junit.Test
9+
import kotlin.test.*
10+
11+
class SharingWorkerClassTest : SchedulerTestBase() {
12+
private val threadLocal = ThreadLocal<Int?>()
13+
14+
@Test
15+
fun testSharedThread() = runTest {
16+
val dispatcher = ExperimentalCoroutineDispatcher(1, schedulerName = "first")
17+
val dispatcher2 = ExperimentalCoroutineDispatcher(1, schedulerName = "second")
18+
19+
try {
20+
withContext(dispatcher) {
21+
assertNull(threadLocal.get())
22+
threadLocal.set(239)
23+
withContext(dispatcher2) {
24+
assertNull(threadLocal.get())
25+
threadLocal.set(42)
26+
}
27+
28+
assertEquals(239, threadLocal.get())
29+
}
30+
} finally {
31+
dispatcher.close()
32+
dispatcher2.close()
33+
}
34+
}
35+
36+
@Test(timeout = 5000L)
37+
fun testProgress() = runTest {
38+
// See #990
39+
val cores = Runtime.getRuntime().availableProcessors()
40+
repeat(cores + 1) {
41+
CoroutineScope(Dispatchers.Default).launch {
42+
ExperimentalCoroutineDispatcher(1).close()
43+
}.join()
44+
}
45+
}
46+
}

0 commit comments

Comments
 (0)