Skip to content

Commit 5f2bbc8

Browse files
committed
Properly cancel ChannelCoroutine when the channel was closed or cancelled
Fixes #2506
1 parent d37b834 commit 5f2bbc8

File tree

5 files changed

+51
-7
lines changed

5 files changed

+51
-7
lines changed

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+8-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.channels
@@ -642,7 +642,13 @@ internal abstract class AbstractChannel<E>(
642642
cancelInternal(cause)
643643

644644
final override fun cancel(cause: CancellationException?) {
645-
if (isClosedForReceive) return // Do not create an exception if channel is already cancelled
645+
/*
646+
* Do not create an exception if channel is already cancelled.
647+
* Channel is closed for receive when either it is cancelled (then we are free to bail out)
648+
* or was closed and elements were received.
649+
* Then `onCancelIdempotent` does nothing for all implementations.
650+
*/
651+
if (isClosedForReceive) return
646652
cancelInternal(cause ?: CancellationException("$classSimpleName was cancelled"))
647653
}
648654

kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.channels
@@ -26,7 +26,7 @@ internal open class ChannelCoroutine<E>(
2626
}
2727

2828
final override fun cancel(cause: CancellationException?) {
29-
if (isClosedForReceive) return // Do not create an exception if channel is already cancelled
29+
if (isCancelled) return // Do not create an exception if the coroutine (-> the channel) is already cancelled
3030
cancelInternal(cause ?: defaultCancellationException())
3131
}
3232

kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
15
package kotlinx.coroutines.channels
26

37
import kotlinx.atomicfu.*
@@ -115,4 +119,4 @@ class ChannelUndeliveredElementTest : TestBase() {
115119
check(!_cancelled.getAndSet(true)) { "Already cancelled" }
116120
}
117121
}
118-
}
122+
}

kotlinx-coroutines-core/common/test/channels/ProduceTest.kt

+22-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.channels
@@ -95,6 +95,27 @@ class ProduceTest : TestBase() {
9595
cancelOnCompletion(coroutineContext)
9696
}
9797

98+
@Test
99+
fun testCancelWhenTheChannelIsClosed() = runTest {
100+
val channel = produce<Int> {
101+
send(1)
102+
close()
103+
expect(2)
104+
launch {
105+
expect(3)
106+
hang { expect(5) }
107+
}
108+
}
109+
110+
expect(1)
111+
channel.receive()
112+
yield()
113+
expect(4)
114+
channel.cancel()
115+
(channel as Job).join()
116+
finish(6)
117+
}
118+
98119
@Test
99120
fun testAwaitConsumerCancellation() = runTest {
100121
val parent = Job()

kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt

+14-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.flow
@@ -194,4 +194,17 @@ class ChannelFlowTest : TestBase() {
194194
assertEquals(listOf(1), flow.toList())
195195
finish(3)
196196
}
197+
198+
@Test
199+
fun testCancelledOnCompletion() = runTest {
200+
val myFlow = callbackFlow<Any> {
201+
expect(2)
202+
close()
203+
hang { expect(3) }
204+
}
205+
206+
expect(1)
207+
myFlow.collect()
208+
finish(4)
209+
}
197210
}

0 commit comments

Comments
 (0)