Skip to content

Commit 59ee2a5

Browse files
committed
Fix exception types for channels to ensure transparency & reporting
* ReceiveChannel.cancel always closes channel with CancellationException, so sending or receiving from a cancelled channel produces the corresponding CancellationException. * Cancelling produce builder has similar effect, but an more specific instance of JobCancellationException is created. * This ensure that produce/consumeEach pair is transparent with respect to cancellation and can be used to build "identity" transformation of the flow (the corresponding test is added). * ClosedSendChannelException is now a subclass of IllegalStateException, so that trying to send to a channel that was closed normally is reported as program error and is not eaten (test is added). Fixes #957 Fixes #1128
1 parent f8eac76 commit 59ee2a5

16 files changed

+159
-50
lines changed

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
669669
cancelInternal(cause)
670670

671671
final override fun cancel(cause: CancellationException?) {
672-
cancelInternal(cause)
672+
cancelInternal(cause ?: CancellationException("$classSimpleName was cancelled"))
673673
}
674674

675675
// It needs to be internal to support deprecated cancel(Throwable?) API

kotlinx-coroutines-core/common/src/channels/Channel.kt

+12-6
Original file line numberDiff line numberDiff line change
@@ -242,14 +242,16 @@ public interface ReceiveChannel<out E> {
242242
/**
243243
* Cancels reception of remaining elements from this channel with an optional [cause].
244244
* This function closes the channel and removes all buffered sent elements from it.
245+
*
245246
* A cause can be used to specify an error message or to provide other details on
246247
* a cancellation reason for debugging purposes.
248+
* If cause is not specified, then an instance of [CancellationException] with a
249+
* default message is create to [close][SendChannel.close] the channel.
247250
*
248251
* Immediately after invocation of this function [isClosedForReceive] and
249252
* [isClosedForSend][SendChannel.isClosedForSend]
250-
* on the side of [SendChannel] start returning `true`, so all attempts to send to this channel
251-
* afterwards will throw [ClosedSendChannelException], while attempts to receive will throw
252-
* [ClosedReceiveChannelException].
253+
* on the side of [SendChannel] start returning `true`. All attempts to send to this channel
254+
* or receive from this channel will throw [CancellationException].
253255
*/
254256
public fun cancel(cause: CancellationException? = null)
255257

@@ -382,14 +384,18 @@ public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
382384
* Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel
383385
* that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
384386
* exception on send attempts.
387+
*
388+
* This exception is a subclass of [IllegalStateException] because, conceptually, sender is responsible
389+
* for closing the channel and not be trying to send anything after the channel was close. Attempts to
390+
* send into the closed channel indicate logical error in the sender's code.
385391
*/
386-
public class ClosedSendChannelException(message: String?) : CancellationException(message)
392+
public class ClosedSendChannelException(message: String?) : IllegalStateException(message)
387393

388394
/**
389395
* Indicates attempt to [receive][ReceiveChannel.receive] on [isClosedForReceive][ReceiveChannel.isClosedForReceive]
390396
* channel that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
391397
* exception on receive attempts.
392398
*
393-
* This exception is subclass of [NoSuchElementException] to be consistent with plain collections.
399+
* This exception is a subclass of [NoSuchElementException] to be consistent with plain collections.
394400
*/
395-
public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)
401+
public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)

kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt

+9-2
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,15 @@ internal open class ChannelCoroutine<E>(
2828
}
2929

3030
override fun cancelInternal(cause: Throwable?): Boolean {
31-
_channel.cancel(cause?.toCancellationException()) // cancel the channel
32-
cancelCoroutine(cause) // cancel the job
31+
val exception = cause?.toCancellationException()
32+
?: JobCancellationException("Job was cancelled", null, this)
33+
_channel.cancel(exception) // cancel the channel
34+
cancelCoroutine(exception) // cancel the job
3335
return true // does not matter - result is used in DEPRECATED functions only
3436
}
37+
38+
@Suppress("UNCHECKED_CAST")
39+
suspend fun sendFair(element: E) {
40+
(_channel as AbstractSendChannel<E>).sendFair(element)
41+
}
3542
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.internal
6+
7+
import kotlinx.coroutines.*
8+
9+
/**
10+
* This exception is thrown when operator need no more elements from the flow.
11+
* This exception should never escape outside of operator's implementation.
12+
*/
13+
internal class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") {
14+
// TODO expect/actual
15+
// override fun fillInStackTrace(): Throwable = this
16+
}

kotlinx-coroutines-core/common/src/flow/operators/Limit.kt

+6-10
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@
88

99
package kotlinx.coroutines.flow
1010

11-
import kotlinx.coroutines.flow.unsafeFlow as flow
1211
import kotlinx.coroutines.*
12+
import kotlinx.coroutines.flow.internal.*
1313
import kotlin.jvm.*
14+
import kotlinx.coroutines.flow.unsafeFlow as flow
1415

1516
/**
1617
* Returns a flow that ignores first [count] elements.
@@ -57,10 +58,10 @@ public fun <T> Flow<T>.take(count: Int): Flow<T> {
5758
collect { value ->
5859
emit(value)
5960
if (++consumed == count) {
60-
throw TakeLimitException()
61+
throw AbortFlowException()
6162
}
6263
}
63-
} catch (e: TakeLimitException) {
64+
} catch (e: AbortFlowException) {
6465
// Nothing, bail out
6566
}
6667
}
@@ -74,14 +75,9 @@ public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T> = f
7475
try {
7576
collect { value ->
7677
if (predicate(value)) emit(value)
77-
else throw TakeLimitException()
78+
else throw AbortFlowException()
7879
}
79-
} catch (e: TakeLimitException) {
80+
} catch (e: AbortFlowException) {
8081
// Nothing, bail out
8182
}
8283
}
83-
84-
private class TakeLimitException : CancellationException("Flow limit is reached, cancelling") {
85-
// TODO expect/actual
86-
// override fun fillInStackTrace(): Throwable = this
87-
}

kotlinx-coroutines-core/common/src/flow/operators/Zip.kt

+11-14
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspen
7878

7979
private inline fun SelectBuilder<Unit>.onReceive(
8080
isClosed: Boolean,
81-
channel: Channel<Any>,
81+
channel: ReceiveChannel<Any>,
8282
crossinline onClosed: () -> Unit,
8383
noinline onReceive: suspend (value: Any) -> Unit
8484
) {
@@ -90,18 +90,11 @@ private inline fun SelectBuilder<Unit>.onReceive(
9090
}
9191

9292
// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
93-
private fun CoroutineScope.asFairChannel(flow: Flow<*>): Channel<Any> {
94-
val channel = RendezvousChannel<Any>() // Explicit type
95-
launch {
96-
try {
97-
flow.collect { value ->
98-
channel.sendFair(value ?: NullSurrogate)
99-
}
100-
} finally {
101-
channel.close()
102-
}
93+
private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
94+
val channel = channel as ChannelCoroutine<Any>
95+
flow.collect { value ->
96+
channel.sendFair(value ?: NullSurrogate)
10397
}
104-
return channel
10598
}
10699

107100

@@ -133,7 +126,9 @@ public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2)
133126
*
134127
* Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
135128
*/
136-
(second as SendChannel<*>).invokeOnClose { first.cancel() }
129+
(second as SendChannel<*>).invokeOnClose {
130+
first.cancel(AbortFlowException())
131+
}
137132

138133
val otherIterator = second.iterator()
139134
try {
@@ -144,8 +139,10 @@ public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2)
144139
val secondValue = NullSurrogate.unbox<T2>(otherIterator.next())
145140
emit(transform(NullSurrogate.unbox(value), NullSurrogate.unbox(secondValue)))
146141
}
142+
} catch (e: AbortFlowException) {
143+
// complete
147144
} finally {
148-
second.cancel()
145+
second.cancel(AbortFlowException())
149146
}
150147
}
151148
}

kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt

+8-6
Original file line numberDiff line numberDiff line change
@@ -170,23 +170,25 @@ class ArrayBroadcastChannelTest : TestBase() {
170170
// start consuming
171171
val sub = channel.openSubscription()
172172
var expected = 0
173-
sub.consumeEach {
174-
check(it == ++expected)
175-
if (it == 2) {
176-
sub.cancel()
173+
assertFailsWith<CancellationException> {
174+
sub.consumeEach {
175+
check(it == ++expected)
176+
if (it == 2) {
177+
sub.cancel()
178+
}
177179
}
178180
}
179181
check(expected == 2)
180182
}
181183

182184
@Test
183-
fun testReceiveFromClosedSub() = runTest({ it is ClosedReceiveChannelException }) {
185+
fun testReceiveFromCancelledSub() = runTest {
184186
val channel = BroadcastChannel<Int>(1)
185187
val sub = channel.openSubscription()
186188
assertFalse(sub.isClosedForReceive)
187189
sub.cancel()
188190
assertTrue(sub.isClosedForReceive)
189-
sub.receive()
191+
assertFailsWith<CancellationException> { sub.receive() }
190192
}
191193

192194
@Test

kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ class ArrayChannelTest : TestBase() {
134134
q.cancel()
135135
check(q.isClosedForSend)
136136
check(q.isClosedForReceive)
137-
check(q.receiveOrNull() == null)
137+
assertFailsWith<CancellationException> { q.receiveOrNull() }
138138
finish(12)
139139
}
140140

kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt

+22-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import kotlinx.coroutines.*
88
import kotlin.test.*
99

1010
class BasicOperationsTest : TestBase() {
11-
1211
@Test
1312
fun testSimpleSendReceive() = runTest {
1413
// Parametrized common test :(
@@ -20,6 +19,11 @@ class BasicOperationsTest : TestBase() {
2019
TestChannelKind.values().forEach { kind -> testOffer(kind) }
2120
}
2221

22+
@Test
23+
fun testSendAfterClose() = runTest {
24+
TestChannelKind.values().forEach { kind -> testSendAfterClose(kind) }
25+
}
26+
2327
@Test
2428
fun testReceiveOrNullAfterClose() = runTest {
2529
TestChannelKind.values().forEach { kind -> testReceiveOrNull(kind) }
@@ -128,6 +132,23 @@ class BasicOperationsTest : TestBase() {
128132
d.await()
129133
}
130134

135+
/**
136+
* [ClosedSendChannelException] should not be eaten.
137+
* See [https://github.com/Kotlin/kotlinx.coroutines/issues/957]
138+
*/
139+
private suspend fun testSendAfterClose(kind: TestChannelKind) {
140+
assertFailsWith<ClosedSendChannelException> {
141+
coroutineScope {
142+
val channel = kind.create()
143+
channel.close()
144+
145+
launch {
146+
channel.send(1)
147+
}.join()
148+
}
149+
}
150+
}
151+
131152
private suspend fun testSendReceive(kind: TestChannelKind, iterations: Int) = coroutineScope {
132153
val channel = kind.create()
133154
launch {

kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ class ConflatedChannelTest : TestBase() {
7373
q.cancel()
7474
check(q.isClosedForSend)
7575
check(q.isClosedForReceive)
76-
check(q.receiveOrNull() == null)
76+
assertFailsWith<CancellationException> { q.receiveOrNull() }
7777
finish(2)
7878
}
7979

kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class LinkedListChannelTest : TestBase() {
3131
q.cancel()
3232
check(q.isClosedForSend)
3333
check(q.isClosedForReceive)
34-
check(q.receiveOrNull() == null)
34+
assertFailsWith<CancellationException> { q.receiveOrNull() }
3535
}
3636

3737
@Test

kotlinx-coroutines-core/common/test/channels/ProduceTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class ProduceTest : TestBase() {
3838
expectUnreached()
3939
} catch (e: Throwable) {
4040
expect(7)
41-
check(e is ClosedSendChannelException)
41+
check(e is CancellationException)
4242
throw e
4343
}
4444
expectUnreached()
@@ -48,7 +48,7 @@ class ProduceTest : TestBase() {
4848
expect(4)
4949
c.cancel()
5050
expect(5)
51-
assertNull(c.receiveOrNull())
51+
assertFailsWith<CancellationException> { c.receiveOrNull() }
5252
expect(6)
5353
yield() // to produce
5454
finish(8)

kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ class RendezvousChannelTest : TestBase() {
272272
q.cancel()
273273
check(q.isClosedForSend)
274274
check(q.isClosedForReceive)
275-
check(q.receiveOrNull() == null)
275+
assertFailsWith<CancellationException> { q.receiveOrNull() }
276276
finish(12)
277277
}
278278

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
6+
7+
package kotlinx.coroutines.flow
8+
9+
import kotlinx.coroutines.*
10+
import kotlinx.coroutines.channels.*
11+
import kotlin.test.*
12+
13+
// See https://github.com/Kotlin/kotlinx.coroutines/issues/1128
14+
class IdFlowTest : TestBase() {
15+
@Test
16+
fun testCancelInCollect() = runTest(
17+
expected = { it is CancellationException }
18+
) {
19+
expect(1)
20+
flow {
21+
expect(2)
22+
emit(1)
23+
expect(3)
24+
delay(Long.MAX_VALUE)
25+
expectUnreached()
26+
}.idScoped().collect { value ->
27+
expect(4)
28+
assertEquals(1, value)
29+
kotlin.coroutines.coroutineContext.cancel()
30+
finish(5)
31+
}
32+
expectUnreached()
33+
}
34+
35+
@Test
36+
fun testCancelInFlow() = runTest(
37+
expected = { it is CancellationException }
38+
) {
39+
expect(1)
40+
flow {
41+
expect(2)
42+
emit(1)
43+
kotlin.coroutines.coroutineContext.cancel()
44+
expect(3)
45+
}.idScoped().collect { value ->
46+
finish(4)
47+
assertEquals(1, value)
48+
}
49+
expectUnreached()
50+
}
51+
}
52+
53+
/**
54+
* This flow should be "identity" function with respect to cancellation.
55+
*/
56+
private fun <T> Flow<T>.idScoped(): Flow<T> = flow {
57+
coroutineScope {
58+
val channel = produce<T> {
59+
collect { send(it) }
60+
}
61+
channel.consumeEach {
62+
emit(it)
63+
}
64+
}
65+
}

kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase()
4848

4949
delayChannel.cancel()
5050
delay(5100)
51-
delayChannel.checkEmpty()
52-
delayChannel.cancel()
51+
assertFailsWith<CancellationException> { delayChannel.poll() }
5352
}
5453
}
5554

0 commit comments

Comments
 (0)