Skip to content

Commit ea651aa

Browse files
qwwdfsadelizarov
authored andcommitted
Channels concurrency improvement:
Make close, cancel, isClosedForSend, isClosedForReceive and offer linearizable with other operations Fix bug when send operation can be stuck in channel forever New concurrency tests Fixes #359
1 parent ae66c95 commit ea651aa

File tree

10 files changed

+288
-82
lines changed

10 files changed

+288
-82
lines changed

common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt

+51-18
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,13 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
7171
* Returns non-null closed token if it is last in the queue.
7272
* @suppress **This is unstable API and it is subject to change.**
7373
*/
74-
protected val closedForSend: Closed<*>? get() = queue.prevNode as? Closed<*>
74+
protected val closedForSend: Closed<*>? get() = (queue.prevNode as? Closed<*>)?.also { helpClose(it) }
7575

7676
/**
7777
* Returns non-null closed token if it is first in the queue.
7878
* @suppress **This is unstable API and it is subject to change.**
7979
*/
80-
protected val closedForReceive: Closed<*>? get() = queue.nextNode as? Closed<*>
80+
protected val closedForReceive: Closed<*>? get() = (queue.nextNode as? Closed<*>)?.also { helpClose(it) }
8181

8282
/**
8383
* Retrieves first sending waiter from the queue or returns closed token.
@@ -169,7 +169,9 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
169169
val result = offerInternal(element)
170170
return when {
171171
result === OFFER_SUCCESS -> true
172-
result === OFFER_FAILED -> false
172+
// We should check for closed token on offer as well, otherwise offer won't be linearizable
173+
// in the face of concurrent close()
174+
result === OFFER_FAILED -> throw closedForSend?.sendException ?: return false
173175
result is Closed<*> -> throw result.sendException
174176
else -> error("offerInternal returned $result")
175177
}
@@ -231,23 +233,54 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
231233

232234
public override fun close(cause: Throwable?): Boolean {
233235
val closed = Closed<E>(cause)
236+
237+
/*
238+
* Try to commit close by adding a close token to the end of the queue.
239+
* Successful -> we're now responsible for closing receivers
240+
* Not successful -> help closing pending receivers to maintain invariant
241+
* "if (!close()) next send will throw"
242+
*/
243+
val closeAdded = queue.addLastIfPrev(closed, { it !is Closed<*> })
244+
if (!closeAdded) {
245+
helpClose(queue.prevNode as Closed<*>)
246+
return false
247+
}
248+
249+
helpClose(closed)
250+
onClosed(closed)
251+
afterClose(cause)
252+
return true
253+
}
254+
255+
private fun helpClose(closed: Closed<*>) {
256+
/*
257+
* It's important to traverse list from right to left to avoid races with sender.
258+
* Consider channel state
259+
* head sentinel -> [receiver 1] -> [receiver 2] -> head sentinel
260+
* T1 invokes receive()
261+
* T2 invokes close()
262+
* T3 invokes close() + send(value)
263+
*
264+
* If both will traverse list from left to right, following non-linearizable history is possible:
265+
* [close -> false], [send -> transferred 'value' to receiver]
266+
*/
234267
while (true) {
235-
val receive = takeFirstReceiveOrPeekClosed()
236-
if (receive == null) {
237-
// queue empty or has only senders -- try add last "Closed" item to the queue
238-
if (queue.addLastIfPrev(closed, { prev ->
239-
if (prev is Closed<*>) return false // already closed
240-
prev !is ReceiveOrClosed<*> // only add close if no waiting receive
241-
})) {
242-
onClosed(closed)
243-
afterClose(cause)
244-
return true
245-
}
246-
continue // retry on failure
268+
val previous = closed.prevNode
269+
// Channel is empty or has no receivers
270+
if (previous is LockFreeLinkedListHead || previous !is Receive<*>) {
271+
break
272+
}
273+
274+
if (!previous.remove()) {
275+
// failed to remove the node (due to race) -- retry finding non-removed prevNode
276+
// NOTE: remove() DOES NOT help pending remove operation (that marked next pointer)
277+
previous.helpRemove() // make sure remove is complete before continuing
278+
continue
247279
}
248-
if (receive is Closed<*>) return false // already marked as closed -- nothing to do
249-
receive as Receive<E> // type assertion
250-
receive.resumeReceiveClosed(closed)
280+
281+
@Suppress("UNCHECKED_CAST")
282+
previous as Receive<E> // type assertion
283+
previous.resumeReceiveClosed(closed)
251284
}
252285
}
253286

common/kotlinx-coroutines-core-common/src/channels/ConflatedBroadcastChannel.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
103103

104104
@Suppress("UNCHECKED_CAST")
105105
public override fun openSubscription(): ReceiveChannel<E> {
106-
val subscriber = Subscriber<E>(this)
106+
val subscriber = Subscriber(this)
107107
_state.loop { state ->
108108
when (state) {
109109
is Closed -> {
@@ -238,6 +238,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
238238
block.startCoroutineUndispatched(receiver = this, completion = select.completion)
239239
}
240240

241+
@Suppress("DEPRECATION")
241242
private class Subscriber<E>(
242243
private val broadcastChannel: ConflatedBroadcastChannel<E>
243244
) : ConflatedChannel<E>(), ReceiveChannel<E>, SubscriptionReceiveChannel<E> {

common/kotlinx-coroutines-core-common/src/internal/LockFreeLinkedList.common.kt

+7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ public expect open class LockFreeLinkedListNode() {
2424
): Boolean
2525

2626
public open fun remove(): Boolean
27+
28+
/**
29+
* Helps fully finish [remove] operation, must be invoked after [remove] if needed.
30+
* Ensures that traversing the list via prev pointers sees this node as removed.
31+
* No-op on JS
32+
*/
33+
public fun helpRemove()
2734
public fun removeFirstOrNull(): LockFreeLinkedListNode?
2835
public inline fun <reified T> removeFirstIfIsInstanceOfOrPeekIf(predicate: (T) -> Boolean): T?
2936
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.experimental.channels
6+
7+
import kotlinx.coroutines.experimental.*
8+
import kotlin.coroutines.experimental.*
9+
import kotlin.test.*
10+
11+
class BasicOperationsTest : TestBase() {
12+
13+
@Test
14+
fun testSimpleSendReceive() = runTest {
15+
// Parametrized common test :(
16+
TestChannelKind.values().forEach { kind -> testSendReceive(kind, 100) }
17+
}
18+
19+
@Test
20+
fun testOfferAfterClose() = runTest {
21+
TestChannelKind.values().forEach { kind -> testOffer(kind) }
22+
}
23+
24+
@Test
25+
fun testReceiveOrNullAfterClose() = runTest {
26+
TestChannelKind.values().forEach { kind -> testReceiveOrNull(kind) }
27+
}
28+
29+
@Test
30+
fun testReceiveOrNullAfterCloseWithException() = runTest {
31+
TestChannelKind.values().forEach { kind -> testReceiveOrNullException(kind) }
32+
}
33+
34+
private suspend fun testReceiveOrNull(kind: TestChannelKind) {
35+
val channel = kind.create()
36+
val d = async(coroutineContext) {
37+
channel.receive()
38+
}
39+
40+
yield()
41+
channel.close()
42+
assertTrue(channel.isClosedForReceive)
43+
44+
assertNull(channel.receiveOrNull())
45+
assertNull(channel.poll())
46+
47+
d.join()
48+
assertTrue(d.getCancellationException().cause is ClosedReceiveChannelException)
49+
}
50+
51+
private suspend fun testReceiveOrNullException(kind: TestChannelKind) {
52+
val channel = kind.create()
53+
val d = async(coroutineContext) {
54+
channel.receive()
55+
}
56+
57+
yield()
58+
channel.close(IndexOutOfBoundsException())
59+
assertTrue(channel.isClosedForReceive)
60+
61+
assertFailsWith<IndexOutOfBoundsException> { channel.poll() }
62+
try {
63+
channel.receiveOrNull()
64+
fail()
65+
} catch (e: IndexOutOfBoundsException) {
66+
// Expected
67+
}
68+
69+
d.join()
70+
assertTrue(d.getCancellationException().cause is IndexOutOfBoundsException)
71+
}
72+
73+
74+
private suspend fun testOffer(kind: TestChannelKind) {
75+
val channel = kind.create()
76+
val d = async(coroutineContext) { channel.send(42) }
77+
yield()
78+
channel.close()
79+
80+
assertTrue(channel.isClosedForSend)
81+
try {
82+
channel.offer(2)
83+
fail()
84+
} catch (e: ClosedSendChannelException) {
85+
if (!kind.isConflated) {
86+
assertEquals(42, channel.receive())
87+
}
88+
}
89+
90+
d.await()
91+
}
92+
93+
private suspend fun testSendReceive(kind: TestChannelKind, iterations: Int) {
94+
val channel = kind.create()
95+
96+
launch(coroutineContext) {
97+
repeat(iterations) { channel.send(it) }
98+
channel.close()
99+
}
100+
var expected = 0
101+
for (x in channel) {
102+
if (!kind.isConflated) {
103+
assertEquals(expected++, x)
104+
} else {
105+
assertTrue(x >= expected)
106+
expected = x + 1
107+
}
108+
}
109+
if (!kind.isConflated) {
110+
assertEquals(iterations, expected)
111+
}
112+
}
113+
}

common/kotlinx-coroutines-core-common/test/channels/SimpleSendReceiveTest.kt

-39
This file was deleted.

core/kotlinx-coroutines-core/src/internal/LockFreeLinkedList.kt

+10-4
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44

55
package kotlinx.coroutines.experimental.internal
66

7-
import kotlinx.atomicfu.atomic
8-
import kotlinx.atomicfu.loop
7+
import kotlinx.atomicfu.*
98

109
private typealias Node = LockFreeLinkedListNode
1110

@@ -84,8 +83,6 @@ public actual open class LockFreeLinkedListNode {
8483
override fun prepare(affected: Node): Any? = if (condition()) null else CONDITION_FALSE
8584
}
8685

87-
public val isFresh: Boolean get() = _next.value === this
88-
8986
public actual val isRemoved: Boolean get() = next is Removed
9087

9188
// LINEARIZABLE. Returns Node | Removed
@@ -231,6 +228,10 @@ public actual open class LockFreeLinkedListNode {
231228
/**
232229
* Removes this node from the list. Returns `true` when removed successfully, or `false` if the node was already
233230
* removed or if it was not added to any list in the first place.
231+
*
232+
* **Note**: Invocation of this operation does not guarantee that remove was actually complete if result was `false`.
233+
* In particular, invoking [nextNode].[prevNode] might still return this node even though it is "already removed".
234+
* Invoke [helpRemove] to make sure that remove was completed.
234235
*/
235236
public actual open fun remove(): Boolean {
236237
while (true) { // lock-free loop on next
@@ -246,6 +247,11 @@ public actual open class LockFreeLinkedListNode {
246247
}
247248
}
248249

250+
public actual fun helpRemove() {
251+
val removed = this.next as? Removed ?: error("Must be invoked on a removed node")
252+
finishRemove(removed.ref)
253+
}
254+
249255
public open fun describeRemove() : AtomicDesc? {
250256
if (isRemoved) return null // fast path if was already removed
251257
return object : AbstractAtomicDesc() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.experimental.channels
6+
7+
import com.devexperts.dxlab.lincheck.*
8+
import com.devexperts.dxlab.lincheck.annotations.*
9+
import com.devexperts.dxlab.lincheck.paramgen.*
10+
import com.devexperts.dxlab.lincheck.stress.*
11+
import kotlinx.coroutines.experimental.*
12+
import org.junit.*
13+
import java.io.*
14+
15+
@Param(name = "value", gen = IntGen::class, conf = "1:3")
16+
class ChannelIsClosedLinearizabilityTest : TestBase() {
17+
18+
private val lt = LinTesting()
19+
private lateinit var channel: Channel<Int>
20+
21+
@Reset
22+
fun reset() {
23+
channel = Channel()
24+
}
25+
26+
@Operation(runOnce = true)
27+
fun send1(@Param(name = "value") value: Int) = lt.run("send1") { channel.send(value) }
28+
29+
@Operation(runOnce = true)
30+
fun send2(@Param(name = "value") value: Int) = lt.run("send2") { channel.send(value) }
31+
32+
@Operation(runOnce = true)
33+
fun receive1() = lt.run("receive1") { channel.receive() }
34+
35+
@Operation(runOnce = true)
36+
fun receive2() = lt.run("receive2") { channel.receive() }
37+
38+
@Operation(runOnce = true)
39+
fun close1() = lt.run("close1") { channel.close(IOException("close1")) }
40+
41+
@Operation(runOnce = true)
42+
fun isClosedForReceive() = lt.run("isClosedForReceive") { channel.isClosedForReceive }
43+
44+
@Operation(runOnce = true)
45+
fun isClosedForSend() = lt.run("isClosedForSend") { channel.isClosedForSend }
46+
47+
@Test
48+
fun testLinearizability() {
49+
val options = StressOptions()
50+
.iterations(100)
51+
.invocationsPerIteration(1000 * stressTestMultiplier)
52+
.addThread(1, 3)
53+
.addThread(1, 3)
54+
.addThread(1, 3)
55+
.verifier(LinVerifier::class.java)
56+
LinChecker.check(ChannelIsClosedLinearizabilityTest::class.java, options)
57+
}
58+
}

0 commit comments

Comments
 (0)