Skip to content

Commit 5a3f940

Browse files
committed
Fixed linearizability of Channel.close operation
Send operations must ALWAYS help close the channel when they observe that it was closed before throwing an exception. Fixes #1419
1 parent a70022d commit 5a3f940

File tree

3 files changed

+113
-31
lines changed

3 files changed

+113
-31
lines changed

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+29-28
Original file line numberDiff line numberDiff line change
@@ -160,14 +160,24 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
160160
val result = offerInternal(element)
161161
return when {
162162
result === OFFER_SUCCESS -> true
163-
// We should check for closed token on offer as well, otherwise offer won't be linearizable
164-
// in the face of concurrent close()
165-
result === OFFER_FAILED -> throw closedForSend?.sendException?.let { recoverStackTrace(it) } ?: return false
166-
result is Closed<*> -> throw recoverStackTrace(result.sendException)
163+
result === OFFER_FAILED -> {
164+
// We should check for closed token on offer as well, otherwise offer won't be linearizable
165+
// in the face of concurrent close()
166+
// See https://github.com/Kotlin/kotlinx.coroutines/issues/359
167+
throw recoverStackTrace(helpCloseAndGetSendException(closedForSend ?: return false))
168+
}
169+
result is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(result))
167170
else -> error("offerInternal returned $result")
168171
}
169172
}
170173

174+
private fun helpCloseAndGetSendException(closed: Closed<*>): Throwable {
175+
// To ensure linearizablity we must ALWAYS help close the channel when we observe that it was closed
176+
// See https://github.com/Kotlin/kotlinx.coroutines/issues/1419
177+
helpClose(closed)
178+
return closed.sendException
179+
}
180+
171181
private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutine sc@ { cont ->
172182
loop@ while (true) {
173183
if (full) {
@@ -179,8 +189,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
179189
return@sc
180190
}
181191
enqueueResult is Closed<*> -> {
182-
helpClose(enqueueResult)
183-
cont.resumeWithException(enqueueResult.sendException)
192+
cont.helpCloseAndResumeWithSendException(enqueueResult)
184193
return@sc
185194
}
186195
enqueueResult === ENQUEUE_FAILED -> {} // try to offer instead
@@ -197,15 +206,19 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
197206
}
198207
offerResult === OFFER_FAILED -> continue@loop
199208
offerResult is Closed<*> -> {
200-
helpClose(offerResult)
201-
cont.resumeWithException(offerResult.sendException)
209+
cont.helpCloseAndResumeWithSendException(offerResult)
202210
return@sc
203211
}
204212
else -> error("offerInternal returned $offerResult")
205213
}
206214
}
207215
}
208216

217+
private fun Continuation<*>.helpCloseAndResumeWithSendException(closed: Closed<*>) {
218+
helpClose(closed)
219+
resumeWithException(closed.sendException)
220+
}
221+
209222
/**
210223
* Result is:
211224
* * null -- successfully enqueued
@@ -230,23 +243,17 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
230243

231244
public override fun close(cause: Throwable?): Boolean {
232245
val closed = Closed<E>(cause)
233-
234246
/*
235247
* Try to commit close by adding a close token to the end of the queue.
236248
* Successful -> we're now responsible for closing receivers
237249
* Not successful -> help closing pending receivers to maintain invariant
238250
* "if (!close()) next send will throw"
239251
*/
240-
val closeAdded = queue.addLastIfPrev(closed, { it !is Closed<*> })
241-
if (!closeAdded) {
242-
val actualClosed = queue.prevNode as Closed<*>
243-
helpClose(actualClosed)
244-
return false
245-
}
246-
247-
helpClose(closed)
248-
invokeOnCloseHandler(cause)
249-
return true
252+
val closeAdded = queue.addLastIfPrev(closed) { it !is Closed<*> }
253+
val actuallyClosed = if (closeAdded) closed else queue.prevNode as Closed<*>
254+
helpClose(actuallyClosed)
255+
if (closeAdded) invokeOnCloseHandler(cause)
256+
return closeAdded // true if we have closed
250257
}
251258

252259
private fun invokeOnCloseHandler(cause: Throwable?) {
@@ -370,10 +377,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
370377
select.disposeOnSelect(node)
371378
return
372379
}
373-
enqueueResult is Closed<*> -> {
374-
helpClose(enqueueResult)
375-
throw recoverStackTrace(enqueueResult.sendException)
376-
}
380+
enqueueResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(enqueueResult))
377381
enqueueResult === ENQUEUE_FAILED -> {} // try to offer
378382
enqueueResult is Receive<*> -> {} // try to offer
379383
else -> error("enqueueSend returned $enqueueResult ")
@@ -388,10 +392,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
388392
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
389393
return
390394
}
391-
offerResult is Closed<*> -> {
392-
helpClose(offerResult)
393-
throw recoverStackTrace(offerResult.sendException)
394-
}
395+
offerResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(offerResult))
395396
else -> error("offerSelectInternal returned $offerResult")
396397
}
397398
}
@@ -432,7 +433,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
432433

433434
private class SendSelect<E, R>(
434435
override val pollResult: Any?,
435-
@JvmField val channel: SendChannel<E>,
436+
@JvmField val channel: AbstractSendChannel<E>,
436437
@JvmField val select: SelectInstance<R>,
437438
@JvmField val block: suspend (SendChannel<E>) -> R
438439
) : Send(), DisposableHandle {

kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.channels
@@ -29,8 +29,7 @@ internal open class LinkedListChannel<E> : AbstractChannel<E>() {
2929
when {
3030
result === OFFER_SUCCESS -> return OFFER_SUCCESS
3131
result === OFFER_FAILED -> { // try to buffer
32-
val sendResult = sendBuffered(element)
33-
when (sendResult) {
32+
when (val sendResult = sendBuffered(element)) {
3433
null -> return OFFER_SUCCESS
3534
is Closed<*> -> return sendResult
3635
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
@file:Suppress("unused")
5+
6+
package kotlinx.coroutines.linearizability
7+
8+
import com.devexperts.dxlab.lincheck.*
9+
import com.devexperts.dxlab.lincheck.annotations.*
10+
import com.devexperts.dxlab.lincheck.paramgen.*
11+
import com.devexperts.dxlab.lincheck.strategy.stress.*
12+
import kotlinx.coroutines.*
13+
import kotlinx.coroutines.channels.*
14+
import org.junit.*
15+
import java.io.*
16+
17+
/**
18+
* This is stress test that is fine-tuned to catch the problem
19+
* [#1419](https://github.com/Kotlin/kotlinx.coroutines/issues/1419)
20+
*/
21+
@Param(name = "value", gen = IntGen::class, conf = "2:2")
22+
@OpGroupConfig.OpGroupConfigs(
23+
OpGroupConfig(name = "send", nonParallel = true),
24+
OpGroupConfig(name = "receive", nonParallel = true),
25+
OpGroupConfig(name = "close", nonParallel = true)
26+
)
27+
class ChannelCloseLCStressTest : TestBase() {
28+
29+
private companion object {
30+
// Emulating ctor argument for lincheck
31+
var capacity = 0
32+
}
33+
34+
private val lt = LinTesting()
35+
private var channel: Channel<Int> = Channel(capacity)
36+
37+
@Operation(runOnce = true, group = "send")
38+
fun send1(@Param(name = "value") value: Int) = lt.run("send1") { channel.send(value) }
39+
40+
@Operation(runOnce = true, group = "send")
41+
fun send2(@Param(name = "value") value: Int) = lt.run("send2") { channel.send(value) }
42+
43+
@Operation(runOnce = true, group = "receive")
44+
fun receive1() = lt.run("receive1") { channel.receive() }
45+
46+
@Operation(runOnce = true, group = "receive")
47+
fun receive2() = lt.run("receive2") { channel.receive() }
48+
49+
@Operation(runOnce = true, group = "close")
50+
fun close1() = lt.run("close1") { channel.close(IOException("close1")) }
51+
52+
@Operation(runOnce = true, group = "close")
53+
fun close2() = lt.run("close2") { channel.close(IOException("close2")) }
54+
55+
@Test
56+
fun testRendezvousChannelLinearizability() {
57+
runTest(0)
58+
}
59+
60+
@Test
61+
fun testArrayChannelLinearizability() {
62+
for (i in listOf(1, 2, 16)) {
63+
runTest(i)
64+
}
65+
}
66+
67+
@Test
68+
fun testConflatedChannelLinearizability() = runTest(Channel.CONFLATED)
69+
70+
@Test
71+
fun testUnlimitedChannelLinearizability() = runTest(Channel.UNLIMITED)
72+
73+
private fun runTest(capacity: Int) {
74+
ChannelCloseLCStressTest.capacity = capacity
75+
val options = StressOptions()
76+
.iterations(1) // only one iteration -- test scenario is fixed
77+
.invocationsPerIteration(10_000 * stressTestMultiplierSqrt)
78+
.threads(3)
79+
.verifier(LinVerifier::class.java)
80+
LinChecker.check(ChannelCloseLCStressTest::class.java, options)
81+
}
82+
}

0 commit comments

Comments
 (0)