Skip to content

Commit e569bd3

Browse files
authored
Fix exception types for channels to ensure transparency & reporting (#1158)
* Fix exception types for channels to ensure transparency & reporting * ReceiveChannel.cancel always closes channel with CancellationException, so sending or receiving from a cancelled channel produces the corresponding CancellationException. * Cancelling produce builder has similar effect, but an more specific instance of JobCancellationException is created. * This ensure that produce/consumeEach pair is transparent with respect to cancellation and can be used to build "identity" transformation of the flow (the corresponding test is added). * ClosedSendChannelException is now a subclass of IllegalStateException, so that trying to send to a channel that was closed normally is reported as program error and is not eaten (test is added). Fixes #957 Fixes #1128 * Exceptions for channels cleanup * Documentation improved * Better exception message * Simplified flowOn implementation * Avoid exception instantiation on happy path in zip
1 parent f8eac76 commit e569bd3

20 files changed

+166
-66
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,7 @@ public final class kotlinx/coroutines/channels/ClosedReceiveChannelException : j
692692
public fun <init> (Ljava/lang/String;)V
693693
}
694694

695-
public final class kotlinx/coroutines/channels/ClosedSendChannelException : java/util/concurrent/CancellationException {
695+
public final class kotlinx/coroutines/channels/ClosedSendChannelException : java/lang/IllegalStateException {
696696
public fun <init> (Ljava/lang/String;)V
697697
}
698698

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
669669
cancelInternal(cause)
670670

671671
final override fun cancel(cause: CancellationException?) {
672-
cancelInternal(cause)
672+
cancelInternal(cause ?: CancellationException("$classSimpleName was cancelled"))
673673
}
674674

675675
// It needs to be internal to support deprecated cancel(Throwable?) API

kotlinx-coroutines-core/common/src/channels/Channel.kt

+12-6
Original file line numberDiff line numberDiff line change
@@ -242,14 +242,16 @@ public interface ReceiveChannel<out E> {
242242
/**
243243
* Cancels reception of remaining elements from this channel with an optional [cause].
244244
* This function closes the channel and removes all buffered sent elements from it.
245+
*
245246
* A cause can be used to specify an error message or to provide other details on
246247
* a cancellation reason for debugging purposes.
248+
* If the cause is not specified, then an instance of [CancellationException] with a
249+
* default message is created to [close][SendChannel.close] the channel.
247250
*
248251
* Immediately after invocation of this function [isClosedForReceive] and
249252
* [isClosedForSend][SendChannel.isClosedForSend]
250-
* on the side of [SendChannel] start returning `true`, so all attempts to send to this channel
251-
* afterwards will throw [ClosedSendChannelException], while attempts to receive will throw
252-
* [ClosedReceiveChannelException].
253+
* on the side of [SendChannel] start returning `true`. All attempts to send to this channel
254+
* or receive from this channel will throw [CancellationException].
253255
*/
254256
public fun cancel(cause: CancellationException? = null)
255257

@@ -382,14 +384,18 @@ public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
382384
* Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel
383385
* that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
384386
* exception on send attempts.
387+
*
388+
* This exception is a subclass of [IllegalStateException] because, conceptually, sender is responsible
389+
* for closing the channel and not be trying to send anything after the channel was close. Attempts to
390+
* send into the closed channel indicate logical error in the sender's code.
385391
*/
386-
public class ClosedSendChannelException(message: String?) : CancellationException(message)
392+
public class ClosedSendChannelException(message: String?) : IllegalStateException(message)
387393

388394
/**
389395
* Indicates attempt to [receive][ReceiveChannel.receive] on [isClosedForReceive][ReceiveChannel.isClosedForReceive]
390396
* channel that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
391397
* exception on receive attempts.
392398
*
393-
* This exception is subclass of [NoSuchElementException] to be consistent with plain collections.
399+
* This exception is a subclass of [NoSuchElementException] to be consistent with plain collections.
394400
*/
395-
public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)
401+
public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)

kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt

+9-2
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,15 @@ internal open class ChannelCoroutine<E>(
2828
}
2929

3030
override fun cancelInternal(cause: Throwable?): Boolean {
31-
_channel.cancel(cause?.toCancellationException()) // cancel the channel
32-
cancelCoroutine(cause) // cancel the job
31+
val exception = cause?.toCancellationException()
32+
?: JobCancellationException("$classSimpleName was cancelled", null, this)
33+
_channel.cancel(exception) // cancel the channel
34+
cancelCoroutine(exception) // cancel the job
3335
return true // does not matter - result is used in DEPRECATED functions only
3436
}
37+
38+
@Suppress("UNCHECKED_CAST")
39+
suspend fun sendFair(element: E) {
40+
(_channel as AbstractSendChannel<E>).sendFair(element)
41+
}
3542
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.internal
6+
7+
import kotlinx.coroutines.*
8+
9+
/**
10+
* This exception is thrown when operator need no more elements from the flow.
11+
* This exception should never escape outside of operator's implementation.
12+
*/
13+
internal class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") {
14+
// TODO expect/actual
15+
// override fun fillInStackTrace(): Throwable = this
16+
}

kotlinx-coroutines-core/common/src/flow/operators/Context.kt

+2-11
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,8 @@ public fun <T> Flow<T>.flowOn(flowContext: CoroutineContext, bufferSize: Int = 1
5353
send(value)
5454
}
5555
}
56-
57-
// TODO semantics doesn't play well here and we pay for that with additional object
58-
(channel as Job).invokeOnCompletion { if (it is CancellationException && it.cause == null) cancel() }
59-
for (element in channel) {
60-
emit(element)
61-
}
62-
63-
val producer = channel as Job
64-
if (producer.isCancelled) {
65-
producer.join()
66-
throw producer.getCancellationException()
56+
channel.consumeEach { value ->
57+
emit(value)
6758
}
6859
}
6960
}

kotlinx-coroutines-core/common/src/flow/operators/Limit.kt

+6-10
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@
88

99
package kotlinx.coroutines.flow
1010

11-
import kotlinx.coroutines.flow.unsafeFlow as flow
1211
import kotlinx.coroutines.*
12+
import kotlinx.coroutines.flow.internal.*
1313
import kotlin.jvm.*
14+
import kotlinx.coroutines.flow.unsafeFlow as flow
1415

1516
/**
1617
* Returns a flow that ignores first [count] elements.
@@ -57,10 +58,10 @@ public fun <T> Flow<T>.take(count: Int): Flow<T> {
5758
collect { value ->
5859
emit(value)
5960
if (++consumed == count) {
60-
throw TakeLimitException()
61+
throw AbortFlowException()
6162
}
6263
}
63-
} catch (e: TakeLimitException) {
64+
} catch (e: AbortFlowException) {
6465
// Nothing, bail out
6566
}
6667
}
@@ -74,14 +75,9 @@ public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T> = f
7475
try {
7576
collect { value ->
7677
if (predicate(value)) emit(value)
77-
else throw TakeLimitException()
78+
else throw AbortFlowException()
7879
}
79-
} catch (e: TakeLimitException) {
80+
} catch (e: AbortFlowException) {
8081
// Nothing, bail out
8182
}
8283
}
83-
84-
private class TakeLimitException : CancellationException("Flow limit is reached, cancelling") {
85-
// TODO expect/actual
86-
// override fun fillInStackTrace(): Throwable = this
87-
}

kotlinx-coroutines-core/common/src/flow/operators/Zip.kt

+11-14
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspen
7878

7979
private inline fun SelectBuilder<Unit>.onReceive(
8080
isClosed: Boolean,
81-
channel: Channel<Any>,
81+
channel: ReceiveChannel<Any>,
8282
crossinline onClosed: () -> Unit,
8383
noinline onReceive: suspend (value: Any) -> Unit
8484
) {
@@ -90,18 +90,11 @@ private inline fun SelectBuilder<Unit>.onReceive(
9090
}
9191

9292
// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
93-
private fun CoroutineScope.asFairChannel(flow: Flow<*>): Channel<Any> {
94-
val channel = RendezvousChannel<Any>() // Explicit type
95-
launch {
96-
try {
97-
flow.collect { value ->
98-
channel.sendFair(value ?: NullSurrogate)
99-
}
100-
} finally {
101-
channel.close()
102-
}
93+
private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
94+
val channel = channel as ChannelCoroutine<Any>
95+
flow.collect { value ->
96+
channel.sendFair(value ?: NullSurrogate)
10397
}
104-
return channel
10598
}
10699

107100

@@ -133,7 +126,9 @@ public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2)
133126
*
134127
* Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
135128
*/
136-
(second as SendChannel<*>).invokeOnClose { first.cancel() }
129+
(second as SendChannel<*>).invokeOnClose {
130+
if (!first.isClosedForReceive) first.cancel(AbortFlowException())
131+
}
137132

138133
val otherIterator = second.iterator()
139134
try {
@@ -144,8 +139,10 @@ public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2)
144139
val secondValue = NullSurrogate.unbox<T2>(otherIterator.next())
145140
emit(transform(NullSurrogate.unbox(value), NullSurrogate.unbox(secondValue)))
146141
}
142+
} catch (e: AbortFlowException) {
143+
// complete
147144
} finally {
148-
second.cancel()
145+
if (!second.isClosedForReceive) second.cancel(AbortFlowException())
149146
}
150147
}
151148
}

kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt

+8-6
Original file line numberDiff line numberDiff line change
@@ -170,23 +170,25 @@ class ArrayBroadcastChannelTest : TestBase() {
170170
// start consuming
171171
val sub = channel.openSubscription()
172172
var expected = 0
173-
sub.consumeEach {
174-
check(it == ++expected)
175-
if (it == 2) {
176-
sub.cancel()
173+
assertFailsWith<CancellationException> {
174+
sub.consumeEach {
175+
check(it == ++expected)
176+
if (it == 2) {
177+
sub.cancel()
178+
}
177179
}
178180
}
179181
check(expected == 2)
180182
}
181183

182184
@Test
183-
fun testReceiveFromClosedSub() = runTest({ it is ClosedReceiveChannelException }) {
185+
fun testReceiveFromCancelledSub() = runTest {
184186
val channel = BroadcastChannel<Int>(1)
185187
val sub = channel.openSubscription()
186188
assertFalse(sub.isClosedForReceive)
187189
sub.cancel()
188190
assertTrue(sub.isClosedForReceive)
189-
sub.receive()
191+
assertFailsWith<CancellationException> { sub.receive() }
190192
}
191193

192194
@Test

kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ class ArrayChannelTest : TestBase() {
134134
q.cancel()
135135
check(q.isClosedForSend)
136136
check(q.isClosedForReceive)
137-
check(q.receiveOrNull() == null)
137+
assertFailsWith<CancellationException> { q.receiveOrNull() }
138138
finish(12)
139139
}
140140

kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt

+22-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import kotlinx.coroutines.*
88
import kotlin.test.*
99

1010
class BasicOperationsTest : TestBase() {
11-
1211
@Test
1312
fun testSimpleSendReceive() = runTest {
1413
// Parametrized common test :(
@@ -20,6 +19,11 @@ class BasicOperationsTest : TestBase() {
2019
TestChannelKind.values().forEach { kind -> testOffer(kind) }
2120
}
2221

22+
@Test
23+
fun testSendAfterClose() = runTest {
24+
TestChannelKind.values().forEach { kind -> testSendAfterClose(kind) }
25+
}
26+
2327
@Test
2428
fun testReceiveOrNullAfterClose() = runTest {
2529
TestChannelKind.values().forEach { kind -> testReceiveOrNull(kind) }
@@ -128,6 +132,23 @@ class BasicOperationsTest : TestBase() {
128132
d.await()
129133
}
130134

135+
/**
136+
* [ClosedSendChannelException] should not be eaten.
137+
* See [https://github.com/Kotlin/kotlinx.coroutines/issues/957]
138+
*/
139+
private suspend fun testSendAfterClose(kind: TestChannelKind) {
140+
assertFailsWith<ClosedSendChannelException> {
141+
coroutineScope {
142+
val channel = kind.create()
143+
channel.close()
144+
145+
launch {
146+
channel.send(1)
147+
}
148+
}
149+
}
150+
}
151+
131152
private suspend fun testSendReceive(kind: TestChannelKind, iterations: Int) = coroutineScope {
132153
val channel = kind.create()
133154
launch {

kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ class ConflatedChannelTest : TestBase() {
7373
q.cancel()
7474
check(q.isClosedForSend)
7575
check(q.isClosedForReceive)
76-
check(q.receiveOrNull() == null)
76+
assertFailsWith<CancellationException> { q.receiveOrNull() }
7777
finish(2)
7878
}
7979

kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class LinkedListChannelTest : TestBase() {
3131
q.cancel()
3232
check(q.isClosedForSend)
3333
check(q.isClosedForReceive)
34-
check(q.receiveOrNull() == null)
34+
assertFailsWith<CancellationException> { q.receiveOrNull() }
3535
}
3636

3737
@Test

kotlinx-coroutines-core/common/test/channels/ProduceTest.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class ProduceTest : TestBase() {
3838
expectUnreached()
3939
} catch (e: Throwable) {
4040
expect(7)
41-
check(e is ClosedSendChannelException)
41+
check(e is CancellationException)
4242
throw e
4343
}
4444
expectUnreached()
@@ -48,7 +48,7 @@ class ProduceTest : TestBase() {
4848
expect(4)
4949
c.cancel()
5050
expect(5)
51-
assertNull(c.receiveOrNull())
51+
assertFailsWith<CancellationException> { c.receiveOrNull() }
5252
expect(6)
5353
yield() // to produce
5454
finish(8)
@@ -107,7 +107,6 @@ class ProduceTest : TestBase() {
107107
produced.cancel()
108108
try {
109109
source.receive()
110-
// TODO shouldn't it be ClosedReceiveChannelException ?
111110
} catch (e: CancellationException) {
112111
finish(4)
113112
}

kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ class RendezvousChannelTest : TestBase() {
272272
q.cancel()
273273
check(q.isClosedForSend)
274274
check(q.isClosedForReceive)
275-
check(q.receiveOrNull() == null)
275+
assertFailsWith<CancellationException> { q.receiveOrNull() }
276276
finish(12)
277277
}
278278

0 commit comments

Comments
 (0)