Skip to content

Commit 7df61ee

Browse files
authored
Fixed memory leak on a race between adding/removing from lock-free list (#1845)
* The problem was introduced by #1565. When doing concurrent add+removeFirst the following can happen: - "add" completes, but has not correct prev pointer in next node yet - "removeFirst" removes freshly added element - "add" performs "finishAdd" that adjust prev pointer of the next node and thus removed element is pointed from the list again * A separate LockFreeLinkedListAddRemoveStressTest is added that reproduces this problem. * The old LockFreeLinkedListAtomicLFStressTest is refactored a bit.
1 parent c67aed0 commit 7df61ee

File tree

3 files changed

+90
-29
lines changed

3 files changed

+90
-29
lines changed

Diff for: kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt

+10-5
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ public actual open class LockFreeLinkedListNode {
390390
final override fun updatedNext(affected: Node, next: Node): Any = next.removed()
391391

392392
final override fun finishOnSuccess(affected: Node, next: Node) {
393-
// Complete removal operation here. It bails out if next node is also removed and it becomes
393+
// Complete removal operation here. It bails out if next node is also removed. It becomes
394394
// responsibility of the next's removes to call correctPrev which would help fix all the links.
395395
next.correctPrev(null)
396396
}
@@ -531,7 +531,12 @@ public actual open class LockFreeLinkedListNode {
531531
private fun finishAdd(next: Node) {
532532
next._prev.loop { nextPrev ->
533533
if (this.next !== next) return // this or next was removed or another node added, remover/adder fixes up links
534-
if (next._prev.compareAndSet(nextPrev, this)) return
534+
if (next._prev.compareAndSet(nextPrev, this)) {
535+
// This newly added node could have been removed, and the above CAS would have added it physically again.
536+
// Let us double-check for this situation and correct if needed
537+
if (isRemoved) next.correctPrev(null)
538+
return
539+
}
535540
}
536541
}
537542

@@ -546,15 +551,15 @@ public actual open class LockFreeLinkedListNode {
546551
* * When this node is removed. In this case there is no need to waste time on corrections, because
547552
* remover of this node will ultimately call [correctPrev] on the next node and that will fix all
548553
* the links from this node, too.
549-
* * When [op] descriptor is not `null` and and operation descriptor that is [OpDescriptor.isEarlierThan]
554+
* * When [op] descriptor is not `null` and operation descriptor that is [OpDescriptor.isEarlierThan]
550555
* that current [op] is found while traversing the list. This `null` result will be translated
551556
* by callers to [RETRY_ATOMIC].
552557
*/
553558
private tailrec fun correctPrev(op: OpDescriptor?): Node? {
554559
val oldPrev = _prev.value
555560
var prev: Node = oldPrev
556561
var last: Node? = null // will be set so that last.next === prev
557-
while (true) { // move the the left until first non-removed node
562+
while (true) { // move the left until first non-removed node
558563
val prevNext: Any = prev._next.value
559564
when {
560565
// fast path to find quickly find prev node when everything is properly linked
@@ -565,7 +570,7 @@ public actual open class LockFreeLinkedListNode {
565570
// Note: retry from scratch on failure to update prev
566571
return correctPrev(op)
567572
}
568-
return prev // return a correct prev
573+
return prev // return the correct prev
569574
}
570575
// slow path when we need to help remove operations
571576
this.isRemoved -> return null // nothing to do, this node was removed, bail out asap to save time
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.*
9+
import java.util.concurrent.*
10+
import kotlin.concurrent.*
11+
import kotlin.test.*
12+
13+
class LockFreeLinkedListAddRemoveStressTest : TestBase() {
14+
private class Node : LockFreeLinkedListNode()
15+
16+
private val nRepeat = 100_000 * stressTestMultiplier
17+
private val list = LockFreeLinkedListHead()
18+
private val barrier = CyclicBarrier(3)
19+
private val done = atomic(false)
20+
private val removed = atomic(0)
21+
22+
@Test
23+
fun testStressAddRemove() {
24+
val threads = ArrayList<Thread>()
25+
threads += testThread("adder") {
26+
val node = Node()
27+
list.addLast(node)
28+
if (node.remove()) removed.incrementAndGet()
29+
}
30+
threads += testThread("remover") {
31+
val node = list.removeFirstOrNull()
32+
if (node != null) removed.incrementAndGet()
33+
}
34+
try {
35+
for (i in 1..nRepeat) {
36+
barrier.await()
37+
barrier.await()
38+
assertEquals(i, removed.value)
39+
list.validate()
40+
}
41+
} finally {
42+
done.value = true
43+
barrier.await()
44+
threads.forEach { it.join() }
45+
}
46+
}
47+
48+
private fun testThread(name: String, op: () -> Unit) = thread(name = name) {
49+
while (true) {
50+
barrier.await()
51+
if (done.value) break
52+
op()
53+
barrier.await()
54+
}
55+
}
56+
}

Diff for: kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAtomicLFStressTest.kt

+24-24
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.internal
@@ -19,9 +19,9 @@ import kotlin.test.*
1919
class LockFreeLinkedListAtomicLFStressTest {
2020
private val env = LockFreedomTestEnvironment("LockFreeLinkedListAtomicLFStressTest")
2121

22-
data class IntNode(val i: Int) : LockFreeLinkedListNode()
22+
private data class Node(val i: Long) : LockFreeLinkedListNode()
2323

24-
private val TEST_DURATION_SEC = 5 * stressTestMultiplier
24+
private val nSeconds = 5 * stressTestMultiplier
2525

2626
private val nLists = 4
2727
private val nAdderThreads = 4
@@ -32,7 +32,8 @@ class LockFreeLinkedListAtomicLFStressTest {
3232
private val undone = AtomicLong()
3333
private val missed = AtomicLong()
3434
private val removed = AtomicLong()
35-
val error = AtomicReference<Throwable>()
35+
private val error = AtomicReference<Throwable>()
36+
private val index = AtomicLong()
3637

3738
@Test
3839
fun testStress() {
@@ -42,23 +43,23 @@ class LockFreeLinkedListAtomicLFStressTest {
4243
when (rnd.nextInt(4)) {
4344
0 -> {
4445
val list = lists[rnd.nextInt(nLists)]
45-
val node = IntNode(threadId)
46+
val node = Node(index.incrementAndGet())
4647
addLastOp(list, node)
4748
randomSpinWaitIntermission()
4849
tryRemoveOp(node)
4950
}
5051
1 -> {
5152
// just to test conditional add
5253
val list = lists[rnd.nextInt(nLists)]
53-
val node = IntNode(threadId)
54+
val node = Node(index.incrementAndGet())
5455
addLastIfTrueOp(list, node)
5556
randomSpinWaitIntermission()
5657
tryRemoveOp(node)
5758
}
5859
2 -> {
5960
// just to test failed conditional add and burn some time
6061
val list = lists[rnd.nextInt(nLists)]
61-
val node = IntNode(threadId)
62+
val node = Node(index.incrementAndGet())
6263
addLastIfFalseOp(list, node)
6364
}
6465
3 -> {
@@ -68,8 +69,8 @@ class LockFreeLinkedListAtomicLFStressTest {
6869
check(idx1 < idx2) // that is our global order
6970
val list1 = lists[idx1]
7071
val list2 = lists[idx2]
71-
val node1 = IntNode(threadId)
72-
val node2 = IntNode(-threadId - 1)
72+
val node1 = Node(index.incrementAndGet())
73+
val node2 = Node(index.incrementAndGet())
7374
addTwoOp(list1, node1, list2, node2)
7475
randomSpinWaitIntermission()
7576
tryRemoveOp(node1)
@@ -91,13 +92,13 @@ class LockFreeLinkedListAtomicLFStressTest {
9192
removeTwoOp(list1, list2)
9293
}
9394
}
94-
env.performTest(TEST_DURATION_SEC) {
95-
val _undone = undone.get()
96-
val _missed = missed.get()
97-
val _removed = removed.get()
98-
println(" Adders undone $_undone node additions")
99-
println(" Adders missed $_missed nodes")
100-
println("Remover removed $_removed nodes")
95+
env.performTest(nSeconds) {
96+
val undone = undone.get()
97+
val missed = missed.get()
98+
val removed = removed.get()
99+
println(" Adders undone $undone node additions")
100+
println(" Adders missed $missed nodes")
101+
println("Remover removed $removed nodes")
101102
}
102103
error.get()?.let { throw it }
103104
assertEquals(missed.get(), removed.get())
@@ -106,19 +107,19 @@ class LockFreeLinkedListAtomicLFStressTest {
106107
lists.forEach { it.validate() }
107108
}
108109

109-
private fun addLastOp(list: LockFreeLinkedListHead, node: IntNode) {
110+
private fun addLastOp(list: LockFreeLinkedListHead, node: Node) {
110111
list.addLast(node)
111112
}
112113

113-
private fun addLastIfTrueOp(list: LockFreeLinkedListHead, node: IntNode) {
114-
assertTrue(list.addLastIf(node, { true }))
114+
private fun addLastIfTrueOp(list: LockFreeLinkedListHead, node: Node) {
115+
assertTrue(list.addLastIf(node) { true })
115116
}
116117

117-
private fun addLastIfFalseOp(list: LockFreeLinkedListHead, node: IntNode) {
118-
assertFalse(list.addLastIf(node, { false }))
118+
private fun addLastIfFalseOp(list: LockFreeLinkedListHead, node: Node) {
119+
assertFalse(list.addLastIf(node) { false })
119120
}
120121

121-
private fun addTwoOp(list1: LockFreeLinkedListHead, node1: IntNode, list2: LockFreeLinkedListHead, node2: IntNode) {
122+
private fun addTwoOp(list1: LockFreeLinkedListHead, node1: Node, list2: LockFreeLinkedListHead, node2: Node) {
122123
val add1 = list1.describeAddLast(node1)
123124
val add2 = list2.describeAddLast(node2)
124125
val op = object : AtomicOp<Any?>() {
@@ -138,7 +139,7 @@ class LockFreeLinkedListAtomicLFStressTest {
138139
assertTrue(op.perform(null) == null)
139140
}
140141

141-
private fun tryRemoveOp(node: IntNode) {
142+
private fun tryRemoveOp(node: Node) {
142143
if (node.remove())
143144
undone.incrementAndGet()
144145
else
@@ -165,5 +166,4 @@ class LockFreeLinkedListAtomicLFStressTest {
165166
val success = op.perform(null) == null
166167
if (success) removed.addAndGet(2)
167168
}
168-
169169
}

0 commit comments

Comments
 (0)