Skip to content

Commit 65e13b8

Browse files
committed
Handle undeliverable errors in rxObservable
If an error can't be delivered to the observer due to the observer already having disposed the subscription, then it should be reported to the RX global exception handler. This was already being done in the implementations for rxSingle, rxMaybe, and rxCompletable, but not in rxObservable. Copied the test over from the other implementations as well. Also removed the previous test in ObservableTest, which actually was testing rxSingle instead of rxObservable, and was too complicated and dependent on race conditions (both rxSingle and rxObservable now have a simple and deterministic test for undeliverable error handling). It was also not well implemented, and I originally tried to fix it to be more robust in commit 58ad0cd. Fixes Kotlin#2173.
1 parent 58ad0cd commit 65e13b8

File tree

6 files changed

+94
-70
lines changed

6 files changed

+94
-70
lines changed

reactive/kotlinx-coroutines-rx2/src/RxObservable.kt

+3-5
Original file line numberDiff line numberDiff line change
@@ -164,12 +164,10 @@ private class RxObservableCoroutine<T: Any>(
164164
* Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
165165
* the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
166166
* thrown by subscriber or upstream).
167-
* To make behaviour consistent and least surprising, we always handle fatal exceptions
168-
* by coroutines machinery, anyway, they should not be present in regular program flow,
169-
* thus our goal here is just to expose it as soon as possible.
167+
* To make behaviour consistent and least surprising, we always deliver fatal exceptions to the
168+
* RX global exception handler.
170169
*/
171-
subscriber.tryOnError(cause)
172-
if (!handled && cause.isFatal()) {
170+
if (!subscriber.tryOnError(cause) || (!handled && cause.isFatal())) {
173171
handleUndeliverableException(cause, context)
174172
}
175173
}

reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt

+44
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
package kotlinx.coroutines.rx2
66

7+
import io.reactivex.*
8+
import io.reactivex.disposables.*
79
import io.reactivex.exceptions.*
810
import kotlinx.coroutines.*
911
import org.junit.*
@@ -131,4 +133,46 @@ class ObservableExceptionHandlingTest : TestBase() {
131133
}, { expect(3) })
132134
finish(5)
133135
}
136+
137+
@Test
138+
fun testUnhandledException() = runTest {
139+
expect(1)
140+
var disposable: Disposable? = null
141+
val handler = { e: Throwable ->
142+
assertTrue(e is UndeliverableException && e.cause is TestException)
143+
expect(5)
144+
}
145+
val observable = rxObservable<Nothing>(currentDispatcher()) {
146+
expect(4)
147+
disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
148+
try {
149+
delay(Long.MAX_VALUE)
150+
} finally {
151+
throw TestException() // would not be able to handle it since mono is disposed
152+
}
153+
}
154+
withExceptionHandler(handler) {
155+
observable.subscribe(object : Observer<Nothing> {
156+
override fun onSubscribe(d: Disposable) {
157+
expect(2)
158+
disposable = d
159+
}
160+
161+
override fun onNext(t: Nothing) {
162+
expectUnreached()
163+
}
164+
165+
override fun onError(t: Throwable) {
166+
expectUnreached()
167+
}
168+
169+
override fun onComplete() {
170+
expectUnreached()
171+
}
172+
})
173+
expect(3)
174+
yield() // run coroutine
175+
finish(6)
176+
}
177+
}
134178
}

reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt

-30
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,10 @@
44

55
package kotlinx.coroutines.rx2
66

7-
import io.reactivex.*
8-
import io.reactivex.exceptions.*
97
import kotlinx.coroutines.*
108
import kotlinx.coroutines.CancellationException
119
import org.junit.*
1210
import org.junit.Test
13-
import java.util.concurrent.*
14-
import java.util.concurrent.atomic.*
1511
import kotlin.test.*
1612

1713
class ObservableTest : TestBase() {
@@ -138,30 +134,4 @@ class ObservableTest : TestBase() {
138134
expect(4)
139135
}
140136
}
141-
142-
@Test
143-
fun testExceptionAfterCancellation() {
144-
// Test that no exceptions were reported to the coroutine EH (it will fail the test if so)
145-
val coroutineExceptionHandler = CoroutineExceptionHandler { _, _ -> expectUnreached() }
146-
val rxExceptionHandlerInvocations = AtomicInteger()
147-
val rxExceptionHandler: (Throwable) -> Unit = { error ->
148-
assertTrue(error is UndeliverableException)
149-
assertTrue(error.cause is TestException)
150-
rxExceptionHandlerInvocations.getAndIncrement()
151-
}
152-
withExceptionHandler(rxExceptionHandler) {
153-
Observable
154-
.interval(1, TimeUnit.MILLISECONDS)
155-
.take(1000)
156-
.switchMapSingle {
157-
rxSingle(coroutineExceptionHandler) {
158-
timeBomb().await()
159-
}
160-
}
161-
.blockingSubscribe({}, {})
162-
}
163-
assertTrue(rxExceptionHandlerInvocations.get() > 0)
164-
}
165-
166-
private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS).doOnSuccess { throw TestException() }
167137
}

reactive/kotlinx-coroutines-rx3/src/RxObservable.kt

+3-5
Original file line numberDiff line numberDiff line change
@@ -150,12 +150,10 @@ private class RxObservableCoroutine<T: Any>(
150150
* Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
151151
* the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
152152
* thrown by subscriber or upstream).
153-
* To make behaviour consistent and least surprising, we always handle fatal exceptions
154-
* by coroutines machinery, anyway, they should not be present in regular program flow,
155-
* thus our goal here is just to expose it as soon as possible.
153+
* To make behaviour consistent and least surprising, we always deliver fatal exceptions to the
154+
* RX global exception handler.
156155
*/
157-
subscriber.tryOnError(cause)
158-
if (!handled && cause.isFatal()) {
156+
if (!subscriber.tryOnError(cause) || (!handled && cause.isFatal())) {
159157
handleUndeliverableException(cause, context)
160158
}
161159
}

reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt

+44
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
package kotlinx.coroutines.rx3
66

7+
import io.reactivex.rxjava3.core.*
8+
import io.reactivex.rxjava3.disposables.*
79
import io.reactivex.rxjava3.exceptions.*
810
import kotlinx.coroutines.*
911
import org.junit.*
@@ -131,4 +133,46 @@ class ObservableExceptionHandlingTest : TestBase() {
131133
}, { expect(3) })
132134
finish(5)
133135
}
136+
137+
@Test
138+
fun testUnhandledException() = runTest {
139+
expect(1)
140+
var disposable: Disposable? = null
141+
val handler = { e: Throwable ->
142+
assertTrue(e is UndeliverableException && e.cause is TestException)
143+
expect(5)
144+
}
145+
val observable = rxObservable<Nothing>(currentDispatcher()) {
146+
expect(4)
147+
disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
148+
try {
149+
delay(Long.MAX_VALUE)
150+
} finally {
151+
throw TestException() // would not be able to handle it since mono is disposed
152+
}
153+
}
154+
withExceptionHandler(handler) {
155+
observable.subscribe(object : Observer<Nothing> {
156+
override fun onSubscribe(d: Disposable) {
157+
expect(2)
158+
disposable = d
159+
}
160+
161+
override fun onNext(t: Nothing) {
162+
expectUnreached()
163+
}
164+
165+
override fun onError(t: Throwable) {
166+
expectUnreached()
167+
}
168+
169+
override fun onComplete() {
170+
expectUnreached()
171+
}
172+
})
173+
expect(3)
174+
yield() // run coroutine
175+
finish(6)
176+
}
177+
}
134178
}

reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt

-30
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,10 @@
44

55
package kotlinx.coroutines.rx3
66

7-
import io.reactivex.rxjava3.core.*
8-
import io.reactivex.rxjava3.exceptions.*
97
import kotlinx.coroutines.*
108
import kotlinx.coroutines.CancellationException
119
import org.junit.*
1210
import org.junit.Test
13-
import java.util.concurrent.*
14-
import java.util.concurrent.atomic.*
1511
import kotlin.test.*
1612

1713
class ObservableTest : TestBase() {
@@ -138,30 +134,4 @@ class ObservableTest : TestBase() {
138134
expect(4)
139135
}
140136
}
141-
142-
@Test
143-
fun testExceptionAfterCancellation() {
144-
// Test that no exceptions were reported to the coroutine EH (it will fail the test if so)
145-
val coroutineExceptionHandler = CoroutineExceptionHandler { _, _ -> expectUnreached() }
146-
val rxExceptionHandlerInvocations = AtomicInteger()
147-
val rxExceptionHandler: (Throwable) -> Unit = { error ->
148-
assertTrue(error is UndeliverableException)
149-
assertTrue(error.cause is TestException)
150-
rxExceptionHandlerInvocations.getAndIncrement()
151-
}
152-
withExceptionHandler(rxExceptionHandler) {
153-
Observable
154-
.interval(1, TimeUnit.MILLISECONDS)
155-
.take(1000)
156-
.switchMapSingle {
157-
rxSingle(coroutineExceptionHandler) {
158-
timeBomb().await()
159-
}
160-
}
161-
.blockingSubscribe({}, {})
162-
}
163-
assertTrue(rxExceptionHandlerInvocations.get() > 0)
164-
}
165-
166-
private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS).doOnSuccess { throw TestException() }
167137
}

0 commit comments

Comments
 (0)