Skip to content

Commit 1b70f42

Browse files
committed
Specify the behavior of Channel.consumeEach on scope cancellation
1 parent 776adad commit 1b70f42

File tree

2 files changed

+32
-1
lines changed

2 files changed

+32
-1
lines changed

kotlinx-coroutines-core/common/src/channels/Channels.common.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -
107107
* Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel] the channel afterward.
108108
*
109109
* This function stops processing elements when the channel is closed,
110-
* the coroutine in which the collection is performed gets cancelled,
110+
* the coroutine in which the collection is performed gets cancelled and there are no readily available elements in the
111+
* channel's buffer,
111112
* or an early return from [action] happens.
112113
* Throwing an exception from [action] will attempt to close the channel using the thrown exception.
113114
*

kotlinx-coroutines-core/common/test/channels/ConsumeTest.kt

+30
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,36 @@ class ConsumeTest: TestBase() {
9191
assertTrue(channel.isClosedForReceive)
9292
}
9393

94+
/** Checks that [ReceiveChannel.consumeEach] reacts to cancellation, but processes the elements that are
95+
* readily available in the buffer. */
96+
@Test
97+
fun testConsumeEachExitsOnCancellation() = runTest {
98+
val undeliveredElements = mutableListOf<Int>()
99+
val channel = Channel<Int>(2, onUndeliveredElement = {
100+
undeliveredElements.add(it)
101+
})
102+
launch {
103+
// These two elements will be sent and put into the buffer:
104+
channel.send(0)
105+
channel.send(1)
106+
// This element will not fit into the buffer, so `send` suspends:
107+
channel.send(2)
108+
// At this point, the consumer's `launch` is cancelled.
109+
yield() // Allow the cancellation handler of the consumer to run.
110+
// Try to send a new element, which will fail at this point:
111+
channel.send(3)
112+
fail("unreached")
113+
}
114+
launch {
115+
channel.consumeEach {
116+
cancel()
117+
assertTrue(it in 0..2)
118+
}
119+
}.join()
120+
assertTrue(channel.isClosedForReceive)
121+
assertEquals(listOf(3), undeliveredElements)
122+
}
123+
94124
/** Check that [BroadcastChannel.consume] does not suffer from KT-58685 */
95125
@Suppress("DEPRECATION", "DEPRECATION_ERROR")
96126
@Test

0 commit comments

Comments
 (0)