Skip to content

Commit 8cdb4d6

Browse files
authored
Fix potential data race in EventLoop (#3289)
Fixes #3251
1 parent 20d47b7 commit 8cdb4d6

File tree

4 files changed

+74
-4
lines changed

4 files changed

+74
-4
lines changed

kotlinx-coroutines-core/common/src/EventLoop.common.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,13 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
236236
if (timeNanos < MAX_DELAY_NS) {
237237
val now = nanoTime()
238238
DelayedResumeTask(now + timeNanos, continuation).also { task ->
239-
continuation.disposeOnCancellation(task)
239+
/*
240+
* Order is important here: first we schedule the heap and only then
241+
* publish it to continuation. Otherwise, `DelayedResumeTask` would
242+
* have to know how to be disposed of even when it wasn't scheduled yet.
243+
*/
240244
schedule(now, task)
245+
continuation.disposeOnCancellation(task)
241246
}
242247
}
243248
}
@@ -410,6 +415,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
410415
*/
411416
@JvmField var nanoTime: Long
412417
) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
418+
@Volatile
413419
private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK
414420

415421
override var heap: ThreadSafeHeap<*>?

kotlinx-coroutines-core/common/src/internal/ThreadSafeHeap.kt

-2
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ public open class ThreadSafeHeap<T> : SynchronizedObject() where T: ThreadSafeHe
4747
}
4848
}
4949

50-
// @Synchronized // NOTE! NOTE! NOTE! inline fun cannot be @Synchronized
5150
public inline fun removeFirstIf(predicate: (T) -> Boolean): T? = synchronized(this) {
5251
val first = firstImpl() ?: return null
5352
if (predicate(first)) {
@@ -59,7 +58,6 @@ public open class ThreadSafeHeap<T> : SynchronizedObject() where T: ThreadSafeHe
5958

6059
public fun addLast(node: T): Unit = synchronized(this) { addImpl(node) }
6160

62-
// @Synchronized // NOTE! NOTE! NOTE! inline fun cannot be @Synchronized
6361
// Condition also receives current first node in the heap
6462
public inline fun addLastIf(node: T, cond: (T?) -> Boolean): Boolean = synchronized(this) {
6563
if (cond(firstImpl())) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlinx.coroutines.*
8+
import java.util.concurrent.*
9+
import java.util.concurrent.CancellationException
10+
import kotlin.test.*
11+
12+
class ThreadSafeHeapStressTest : TestBase() {
13+
private class DisposableNode : EventLoopImplBase.DelayedTask(1L) {
14+
override fun run() {
15+
}
16+
}
17+
18+
@Test
19+
fun testConcurrentRemoveDispose() = runTest {
20+
val heap = EventLoopImplBase.DelayedTaskQueue(1)
21+
repeat(10_000 * stressTestMultiplierSqrt) {
22+
withContext(Dispatchers.Default) {
23+
val node = DisposableNode()
24+
val barrier = CyclicBarrier(2)
25+
launch {
26+
heap.addLast(node)
27+
barrier.await()
28+
heap.remove(node)
29+
}
30+
launch {
31+
barrier.await()
32+
Thread.yield()
33+
node.dispose()
34+
}
35+
}
36+
}
37+
}
38+
39+
@Test()
40+
fun testConcurrentAddDispose() = runTest {
41+
repeat(10_000 * stressTestMultiplierSqrt) {
42+
val jobToCancel = Job()
43+
val barrier = CyclicBarrier(2)
44+
val jobToJoin = launch(Dispatchers.Default) {
45+
barrier.await()
46+
jobToCancel.cancelAndJoin()
47+
}
48+
49+
try {
50+
runBlocking { // Use event loop impl
51+
withContext(jobToCancel) {
52+
// This one is to work around heap allocation optimization
53+
launch(start = CoroutineStart.UNDISPATCHED) {
54+
delay(100_000)
55+
}
56+
barrier.await()
57+
delay(100_000)
58+
}
59+
}
60+
} catch (e: CancellationException) {
61+
// Expected exception
62+
}
63+
jobToJoin.join()
64+
}
65+
}
66+
}

kotlinx-coroutines-core/jvm/test/internal/ThreadSafeHeapTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,4 @@ class ThreadSafeHeapTest : TestBase() {
9393
assertEquals(set.size, h.size)
9494
}
9595
}
96-
}
96+
}

0 commit comments

Comments
 (0)