Skip to content

Commit 0c83b45

Browse files
committed
Fix lost sender during close() call, make close() linearizable
Fixes #359
1 parent 548e778 commit 0c83b45

File tree

3 files changed

+77
-33
lines changed

3 files changed

+77
-33
lines changed

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt

+43-15
Original file line numberDiff line numberDiff line change
@@ -243,23 +243,51 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
243243

244244
public override fun close(cause: Throwable?): Boolean {
245245
val closed = Closed<E>(cause)
246+
247+
/*
248+
* Try to commit close by adding a close token to the end of the queue.
249+
* Successful -> we're now responsible for closing receivers
250+
* Not successful -> help closing pending receivers to maintain invariant
251+
* "if (!close()) next send will throw"
252+
*/
253+
val closeAdded = queue.addLastIfPrev(closed, { it !is Closed<*> })
254+
if (!closeAdded) {
255+
helpClose(queue.prevNode as Closed<*>)
256+
return false
257+
}
258+
259+
helpClose(closed)
260+
onClosed(closed)
261+
afterClose(cause)
262+
return true
263+
}
264+
265+
private fun helpClose(closed: Closed<*>) {
266+
/*
267+
* It's important to traverse list from right to left to avoid races with sender.
268+
* Consider channel state
269+
* head sentinel -> [receiver 1] -> [receiver 2] -> head sentinel
270+
* T1 invokes receive()
271+
* T2 invokes close()
272+
* T3 invokes close() + send(value)
273+
*
274+
* If both will traverse list from left to right, following non-linearizable history is possible:
275+
* [close -> false], [send -> transferred 'value' to receiver]
276+
*/
246277
while (true) {
247-
val receive = takeFirstReceiveOrPeekClosed()
248-
if (receive == null) {
249-
// queue empty or has only senders -- try add last "Closed" item to the queue
250-
if (queue.addLastIfPrev(closed, { prev ->
251-
if (prev is Closed<*>) return false // already closed
252-
prev !is ReceiveOrClosed<*> // only add close if no waiting receive
253-
})) {
254-
onClosed(closed)
255-
afterClose(cause)
256-
return true
257-
}
258-
continue // retry on failure
278+
val previous = closed.prevNode
279+
// Channel is empty or has no receivers
280+
if (previous is LockFreeLinkedListHead || previous !is Receive<*>) {
281+
break
282+
}
283+
284+
if (!previous.remove()) {
285+
continue
259286
}
260-
if (receive is Closed<*>) return false // already marked as closed -- nothing to do
261-
receive as Receive<E> // type assertion
262-
receive.resumeReceiveClosed(closed)
287+
288+
@Suppress("UNCHECKED_CAST")
289+
previous as Receive<E> // type assertion
290+
previous.resumeReceiveClosed(closed)
263291
}
264292
}
265293

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt

+1-4
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616

1717
package kotlinx.coroutines.experimental.internal
1818

19-
import kotlinx.atomicfu.atomic
20-
import kotlinx.atomicfu.loop
19+
import kotlinx.atomicfu.*
2120

2221
private typealias Node = LockFreeLinkedListNode
2322

@@ -96,8 +95,6 @@ public actual open class LockFreeLinkedListNode {
9695
override fun prepare(affected: Node): Any? = if (condition()) null else CONDITION_FALSE
9796
}
9897

99-
public val isFresh: Boolean get() = _next.value === this
100-
10198
public actual val isRemoved: Boolean get() = next is Removed
10299

103100
// LINEARIZABLE. Returns Node | Removed

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelLinearizabilityTest.kt

+33-14
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,24 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19-
import com.devexperts.dxlab.lincheck.LinChecker
20-
import com.devexperts.dxlab.lincheck.annotations.Operation
21-
import com.devexperts.dxlab.lincheck.annotations.Param
22-
import com.devexperts.dxlab.lincheck.annotations.Reset
23-
import com.devexperts.dxlab.lincheck.paramgen.IntGen
19+
import com.devexperts.dxlab.lincheck.*
20+
import com.devexperts.dxlab.lincheck.annotations.*
21+
import com.devexperts.dxlab.lincheck.paramgen.*
2422
import com.devexperts.dxlab.lincheck.stress.*
2523
import kotlinx.coroutines.experimental.*
26-
import org.junit.Test
24+
import org.junit.*
25+
import java.io.*
2726

2827
@Param(name = "value", gen = IntGen::class, conf = "1:3")
2928
class ChannelLinearizabilityTest : TestBase() {
29+
3030
private val lt = LinTesting()
31+
private var capacity = 0
3132
private lateinit var channel: Channel<Int>
3233

3334
@Reset
3435
fun reset() {
35-
channel = Channel<Int>()
36+
channel = Channel(capacity)
3637
}
3738

3839
@Operation(runOnce = true)
@@ -53,14 +54,32 @@ class ChannelLinearizabilityTest : TestBase() {
5354
@Operation(runOnce = true)
5455
fun receive3() = lt.run("receive3") { channel.receive() }
5556

56-
// @Operation(runOnce = true)
57-
// fun close1() = lt.run("close1") { channel.close(IOException("close1")) }
58-
//
59-
// @Operation(runOnce = true)
60-
// fun close2() = lt.run("close2") { channel.close(IOException("close2")) }
57+
@Operation(runOnce = true)
58+
fun close1() = lt.run("close1") { channel.close(IOException("close1")) }
59+
60+
@Operation(runOnce = true)
61+
fun close2() = lt.run("close2") { channel.close(IOException("close2")) }
62+
63+
@Test
64+
fun testRendezvousChannelLinearizability() {
65+
runTest(0)
66+
}
6167

6268
@Test
63-
fun testLinearizability() {
69+
fun testArrayChannelLinearizability() {
70+
for (i in listOf(1, 2, 16)) {
71+
runTest(i)
72+
}
73+
}
74+
75+
@Test
76+
fun testConflatedChannelLinearizability() = runTest(Channel.CONFLATED)
77+
78+
@Test
79+
fun testUnlimitedChannelLinearizability() = runTest(Channel.UNLIMITED)
80+
81+
private fun runTest(capacity: Int) {
82+
this.capacity = capacity
6483
val options = StressOptions()
6584
.iterations(100)
6685
.invocationsPerIteration(1000 * stressTestMultiplier)
@@ -70,4 +89,4 @@ class ChannelLinearizabilityTest : TestBase() {
7089
.verifier(LinVerifier::class.java)
7190
LinChecker.check(ChannelLinearizabilityTest::class.java, options)
7291
}
73-
}
92+
}

0 commit comments

Comments
 (0)