Skip to content

Commit 5bb8a4e

Browse files
qwwdfsadelizarov
authored andcommitted
Properly close subscription on exception in Publisher.consumeEach
1 parent cd2a8d7 commit 5bb8a4e

File tree

2 files changed

+19
-3
lines changed

2 files changed

+19
-3
lines changed

reactive/kotlinx-coroutines-reactive/src/Channel.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,7 @@ public fun <T> Publisher<T>.openSubscription(request: Int = 0): ReceiveChannel<T
3131
* Subscribes to this [Publisher] and performs the specified action for each received element.
3232
*/
3333
public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) {
34-
val channel = openSubscription()
35-
for (x in channel) action(x)
36-
channel.cancel()
34+
openSubscription().consumeEach(action)
3735
}
3836

3937
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")

reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt

+18
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,24 @@ class IntegrationTest(
7474
assertThat(cnt, IsEqual(1))
7575
}
7676

77+
@Test
78+
fun testFailingConsumer() = runTest {
79+
val pub = publish {
80+
repeat(3) {
81+
expect(it + 1) // expect(1), expect(2) *should* be invoked
82+
send(it)
83+
}
84+
}
85+
86+
try {
87+
pub.consumeEach {
88+
throw TestException()
89+
}
90+
} catch (e: TestException) {
91+
finish(3)
92+
}
93+
}
94+
7795
@Test
7896
fun testNumbers() = runBlocking<Unit> {
7997
val n = 100 * stressTestMultiplier

0 commit comments

Comments
 (0)