Skip to content

Commit 82aaaff

Browse files
committed
Fixed unlimited channel select onSend on closed channel
It was hanging not being able to properly see that the channel was already closed at that send attempt should fail.
1 parent d89c41e commit 82aaaff

File tree

3 files changed

+64
-11
lines changed

3 files changed

+64
-11
lines changed

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+12-11
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,10 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
112112
queue: LockFreeLinkedListHead,
113113
element: E
114114
) : AddLastDesc<SendBuffered<E>>(queue, SendBuffered(element)) {
115-
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
116-
if (affected is ReceiveOrClosed<*>) return OFFER_FAILED
117-
return null
115+
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = when (affected) {
116+
is Closed<*> -> affected
117+
is ReceiveOrClosed<*> -> OFFER_FAILED
118+
else -> null
118119
}
119120
}
120121

@@ -332,10 +333,10 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
332333
) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
333334
@JvmField var resumeToken: Any? = null
334335

335-
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
336-
if (affected !is ReceiveOrClosed<*>) return OFFER_FAILED
337-
if (affected is Closed<*>) return affected
338-
return null
336+
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = when (affected) {
337+
is Closed<*> -> affected
338+
!is ReceiveOrClosed<*> -> OFFER_FAILED
339+
else -> null
339340
}
340341

341342
override fun validatePrepared(node: ReceiveOrClosed<E>): Boolean {
@@ -633,10 +634,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
633634
@JvmField var resumeToken: Any? = null
634635
@JvmField var pollResult: E? = null
635636

636-
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
637-
if (affected is Closed<*>) return affected
638-
if (affected !is Send) return POLL_FAILED
639-
return null
637+
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = when (affected) {
638+
is Closed<*> -> affected
639+
!is Send -> POLL_FAILED
640+
else -> null
640641
}
641642

642643
@Suppress("UNCHECKED_CAST")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.selects
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
9+
import kotlin.test.*
10+
11+
class SelectLinkedListChannelTest : TestBase() {
12+
@Test
13+
fun testSelectSendWhenClosed() = runTest {
14+
expect(1)
15+
val c = Channel<Int>(Channel.UNLIMITED)
16+
c.send(1) // enqueue buffered element
17+
c.close() // then close
18+
assertFailsWith<ClosedSendChannelException> {
19+
// select sender should fail
20+
expect(2)
21+
select {
22+
c.onSend(2) {
23+
expectUnreached()
24+
}
25+
}
26+
}
27+
finish(3)
28+
}
29+
}

kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt

+23
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,29 @@ class SelectRendezvousChannelTest : TestBase() {
403403
finish(10)
404404
}
405405

406+
@Test
407+
fun testSelectSendWhenClosed() = runTest {
408+
expect(1)
409+
val c = Channel<Int>(Channel.RENDEZVOUS)
410+
val sender = launch(start = CoroutineStart.UNDISPATCHED) {
411+
expect(2)
412+
c.send(1) // enqueue sender
413+
expectUnreached()
414+
}
415+
c.close() // then close
416+
assertFailsWith<ClosedSendChannelException> {
417+
// select sender should fail
418+
expect(3)
419+
select {
420+
c.onSend(2) {
421+
expectUnreached()
422+
}
423+
}
424+
}
425+
sender.cancel()
426+
finish(4)
427+
}
428+
406429
// only for debugging
407430
internal fun <R> SelectBuilder<R>.default(block: suspend () -> R) {
408431
this as SelectBuilderImpl // type assertion

0 commit comments

Comments
 (0)