Skip to content

Commit 8bb5210

Browse files
authored
Consistently handle exceptions in reactive streams (#2646)
* Fixed `PublisherCoroutine`, `rxObservable`, and `Flow.toPublisher` ignoring cancellations. * Fatal exceptions are not treated in a special manner by us anymore. Instead, we follow the requirement in the reactive streams specification that, in case some method of `Subscriber` throws, that subscriber MUST be considered canceled, and the exception MUST be reported in someplace other than `onError`. * Fixed `trySend` sometimes throwing in `PublisherCoroutine` and `rxObservable`. * When an exception happens inside a cancellation handler, we now consistently throw the original exception passed to the handler, with the new exception added as suppressed. * Fixed `PublisherCoroutine` and `rxObservable` claiming that the channel is not closed for send for some time after `close()` has finished. * Fixed publishers sometimes signalling `onComplete()` after cancellation even though their streams are not finite. Fixes #2173
1 parent 95ad444 commit 8bb5210

29 files changed

+740
-357
lines changed

reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt

+18-3
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class FlowAsPublisherTest : TestBase() {
1515
@Test
1616
fun testErrorOnCancellationIsReported() {
1717
expect(1)
18-
flow<Int> {
18+
flow {
1919
try {
2020
emit(2)
2121
} finally {
@@ -50,13 +50,13 @@ class FlowAsPublisherTest : TestBase() {
5050
@Test
5151
fun testCancellationIsNotReported() {
5252
expect(1)
53-
flow<Int> {
53+
flow {
5454
emit(2)
5555
}.asPublisher().subscribe(object : JFlow.Subscriber<Int> {
5656
private lateinit var subscription: JFlow.Subscription
5757

5858
override fun onComplete() {
59-
expect(3)
59+
expectUnreached()
6060
}
6161

6262
override fun onSubscribe(s: JFlow.Subscription?) {
@@ -73,6 +73,21 @@ class FlowAsPublisherTest : TestBase() {
7373
expectUnreached()
7474
}
7575
})
76+
finish(3)
77+
}
78+
79+
@Test
80+
fun testFlowWithTimeout() = runTest {
81+
val publisher = flow<Int> {
82+
expect(2)
83+
withTimeout(1) { delay(Long.MAX_VALUE) }
84+
}.asPublisher()
85+
try {
86+
expect(1)
87+
publisher.awaitFirstOrNull()
88+
} catch (e: CancellationException) {
89+
expect(3)
90+
}
7691
finish(4)
7792
}
7893
}

reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt

+19-1
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
package kotlinx.coroutines.jdk9
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.exceptions.*
89
import org.junit.Test
910
import kotlinx.coroutines.flow.flowOn
1011
import org.junit.runner.*
1112
import org.junit.runners.*
13+
import kotlin.contracts.*
1214
import java.util.concurrent.Flow as JFlow
1315
import kotlin.coroutines.*
1416
import kotlin.test.*
@@ -129,4 +131,20 @@ class IntegrationTest(
129131
assertEquals(n, last)
130132
}
131133

132-
}
134+
}
135+
136+
@OptIn(ExperimentalContracts::class)
137+
internal suspend inline fun <reified E: Throwable> assertCallsExceptionHandlerWith(
138+
crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E {
139+
contract {
140+
callsInPlace(operation, InvocationKind.EXACTLY_ONCE)
141+
}
142+
val handler = CapturingHandler()
143+
return withContext(handler) {
144+
operation(handler)
145+
handler.getException().let {
146+
assertTrue(it is E, it.toString())
147+
it
148+
}
149+
}
150+
}

reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt

+130-28
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines.jdk9
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
89
import org.junit.Test
910
import java.util.concurrent.Flow as JFlow
1011
import kotlin.test.*
@@ -121,44 +122,110 @@ class PublishTest : TestBase() {
121122
finish(7)
122123
}
123124

125+
/** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */
124126
@Test
125-
fun testOnNextError() = runTest {
127+
fun testChannelClosing() = runTest {
126128
expect(1)
127-
val publisher = flowPublish(currentDispatcher()) {
129+
val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
130+
expect(3)
131+
close()
132+
assert(isClosedForSend)
128133
expect(4)
129-
try {
130-
send("OK")
131-
} catch(e: Throwable) {
132-
expect(6)
133-
assert(e is TestException)
134-
}
135134
}
136-
expect(2)
135+
try {
136+
expect(2)
137+
publisher.awaitFirstOrNull()
138+
} catch (e: CancellationException) {
139+
expect(5)
140+
}
141+
finish(6)
142+
}
143+
144+
@Test
145+
fun testOnNextError() = runTest {
137146
val latch = CompletableDeferred<Unit>()
138-
publisher.subscribe(object : JFlow.Subscriber<String> {
139-
override fun onComplete() {
140-
expectUnreached()
147+
expect(1)
148+
assertCallsExceptionHandlerWith<TestException> { exceptionHandler ->
149+
val publisher = flowPublish(currentDispatcher() + exceptionHandler) {
150+
expect(4)
151+
try {
152+
send("OK")
153+
} catch (e: Throwable) {
154+
expect(6)
155+
assert(e is TestException)
156+
assert(isClosedForSend)
157+
latch.complete(Unit)
158+
}
141159
}
160+
expect(2)
161+
publisher.subscribe(object : JFlow.Subscriber<String> {
162+
override fun onComplete() {
163+
expectUnreached()
164+
}
142165

143-
override fun onSubscribe(s: JFlow.Subscription) {
144-
expect(3)
145-
s.request(1)
146-
}
166+
override fun onSubscribe(s: JFlow.Subscription) {
167+
expect(3)
168+
s.request(1)
169+
}
147170

148-
override fun onNext(t: String) {
149-
expect(5)
150-
assertEquals("OK", t)
151-
throw TestException()
152-
}
171+
override fun onNext(t: String) {
172+
expect(5)
173+
assertEquals("OK", t)
174+
throw TestException()
175+
}
153176

154-
override fun onError(t: Throwable) {
155-
expect(7)
156-
assert(t is TestException)
157-
latch.complete(Unit)
177+
override fun onError(t: Throwable) {
178+
expectUnreached()
179+
}
180+
})
181+
latch.await()
182+
}
183+
finish(7)
184+
}
185+
186+
/** Tests the behavior when a call to `onNext` fails after the channel is already closed. */
187+
@Test
188+
fun testOnNextErrorAfterCancellation() = runTest {
189+
assertCallsExceptionHandlerWith<TestException> { handler ->
190+
var producerScope: ProducerScope<Int>? = null
191+
CompletableDeferred<Unit>()
192+
expect(1)
193+
var job: Job? = null
194+
val publisher = flowPublish<Int>(handler + Dispatchers.Unconfined) {
195+
producerScope = this
196+
expect(4)
197+
job = launch {
198+
delay(Long.MAX_VALUE)
199+
}
158200
}
159-
})
160-
latch.await()
161-
finish(8)
201+
expect(2)
202+
publisher.subscribe(object: JFlow.Subscriber<Int> {
203+
override fun onSubscribe(s: JFlow.Subscription) {
204+
expect(3)
205+
s.request(Long.MAX_VALUE)
206+
}
207+
override fun onNext(t: Int) {
208+
expect(6)
209+
assertEquals(1, t)
210+
job!!.cancel()
211+
throw TestException()
212+
}
213+
override fun onError(t: Throwable?) {
214+
/* Correct changes to the implementation could lead to us entering or not entering this method, but
215+
it only matters that if we do, it is the "correct" exception that was validly used to cancel the
216+
coroutine that gets passed here and not `TestException`. */
217+
assertTrue(t is CancellationException)
218+
}
219+
override fun onComplete() { expectUnreached() }
220+
})
221+
expect(5)
222+
val result: ChannelResult<Unit> = producerScope!!.trySend(1)
223+
val e = result.exceptionOrNull()!!
224+
assertTrue(e is CancellationException, "The actual error: $e")
225+
assertTrue(producerScope!!.isClosedForSend)
226+
assertTrue(result.isFailure)
227+
}
228+
finish(7)
162229
}
163230

164231
@Test
@@ -182,4 +249,39 @@ class PublishTest : TestBase() {
182249
fun testIllegalArgumentException() {
183250
assertFailsWith<IllegalArgumentException> { flowPublish<Int>(Job()) { } }
184251
}
252+
253+
/** Tests that `trySend` doesn't throw in `flowPublish`. */
254+
@Test
255+
fun testTrySendNotThrowing() = runTest {
256+
var producerScope: ProducerScope<Int>? = null
257+
expect(1)
258+
val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
259+
producerScope = this
260+
expect(3)
261+
delay(Long.MAX_VALUE)
262+
}
263+
val job = launch(start = CoroutineStart.UNDISPATCHED) {
264+
expect(2)
265+
publisher.awaitFirstOrNull()
266+
expectUnreached()
267+
}
268+
job.cancel()
269+
expect(4)
270+
val result = producerScope!!.trySend(1)
271+
assertTrue(result.isFailure)
272+
finish(5)
273+
}
274+
275+
/** Tests that all methods on `flowPublish` fail without closing the channel when attempting to emit `null`. */
276+
@Test
277+
fun testEmittingNull() = runTest {
278+
val publisher = flowPublish {
279+
assertFailsWith<NullPointerException> { send(null) }
280+
assertFailsWith<NullPointerException> { trySend(null) }
281+
@Suppress("DEPRECATION")
282+
assertFailsWith<NullPointerException> { offer(null) }
283+
send("OK")
284+
}
285+
assertEquals("OK", publisher.awaitFirstOrNull())
286+
}
185287
}

0 commit comments

Comments
 (0)