diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 2f483879b8..454e0f2ff0 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -164,12 +164,10 @@ private class RxObservableCoroutine( * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was * thrown by subscriber or upstream). - * To make behaviour consistent and least surprising, we always handle fatal exceptions - * by coroutines machinery, anyway, they should not be present in regular program flow, - * thus our goal here is just to expose it as soon as possible. + * To make behaviour consistent and least surprising, we always deliver fatal exceptions to the + * RX global exception handler. */ - subscriber.tryOnError(cause) - if (!handled && cause.isFatal()) { + if (!subscriber.tryOnError(cause) || (!handled && cause.isFatal())) { handleUndeliverableException(cause, context) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt index d6cdd3ca24..a09993218c 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt @@ -4,6 +4,8 @@ package kotlinx.coroutines.rx2 +import io.reactivex.* +import io.reactivex.disposables.* import io.reactivex.exceptions.* import kotlinx.coroutines.* import org.junit.* @@ -131,4 +133,46 @@ class ObservableExceptionHandlingTest : TestBase() { }, { expect(3) }) finish(5) } + + @Test + fun testUnhandledException() = runTest { + expect(1) + var disposable: Disposable? = null + val handler = { e: Throwable -> + assertTrue(e is UndeliverableException && e.cause is TestException) + expect(5) + } + val observable = rxObservable(currentDispatcher()) { + expect(4) + disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled + try { + delay(Long.MAX_VALUE) + } finally { + throw TestException() // would not be able to handle it since mono is disposed + } + } + withExceptionHandler(handler) { + observable.subscribe(object : Observer { + override fun onSubscribe(d: Disposable) { + expect(2) + disposable = d + } + + override fun onNext(t: Nothing) { + expectUnreached() + } + + override fun onError(t: Throwable) { + expectUnreached() + } + + override fun onComplete() { + expectUnreached() + } + }) + expect(3) + yield() // run coroutine + finish(6) + } + } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt index 4f7fa547d2..22d08ca752 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt @@ -4,13 +4,10 @@ package kotlinx.coroutines.rx2 -import io.reactivex.* -import io.reactivex.plugins.* import kotlinx.coroutines.* import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test -import java.util.concurrent.* import kotlin.test.* class ObservableTest : TestBase() { @@ -137,28 +134,4 @@ class ObservableTest : TestBase() { expect(4) } } - - @Test - fun testExceptionAfterCancellation() { - // Test that no exceptions were reported to the global EH (it will fail the test if so) - val handler = { e: Throwable -> - assertFalse(e is CancellationException) - } - withExceptionHandler(handler) { - RxJavaPlugins.setErrorHandler { - require(it !is CancellationException) - } - Observable - .interval(1, TimeUnit.MILLISECONDS) - .take(1000) - .switchMapSingle { - rxSingle { - timeBomb().await() - } - } - .blockingSubscribe({}, {}) - } - } - - private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS).doOnSuccess { throw TestException() } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt index 102d06ea60..3366cafa85 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt @@ -150,12 +150,10 @@ private class RxObservableCoroutine( * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was * thrown by subscriber or upstream). - * To make behaviour consistent and least surprising, we always handle fatal exceptions - * by coroutines machinery, anyway, they should not be present in regular program flow, - * thus our goal here is just to expose it as soon as possible. + * To make behaviour consistent and least surprising, we always deliver fatal exceptions to the + * RX global exception handler. */ - subscriber.tryOnError(cause) - if (!handled && cause.isFatal()) { + if (!subscriber.tryOnError(cause) || (!handled && cause.isFatal())) { handleUndeliverableException(cause, context) } } diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt index 1183b2ae21..35c536cdd2 100644 --- a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt @@ -4,6 +4,8 @@ package kotlinx.coroutines.rx3 +import io.reactivex.rxjava3.core.* +import io.reactivex.rxjava3.disposables.* import io.reactivex.rxjava3.exceptions.* import kotlinx.coroutines.* import org.junit.* @@ -131,4 +133,46 @@ class ObservableExceptionHandlingTest : TestBase() { }, { expect(3) }) finish(5) } + + @Test + fun testUnhandledException() = runTest { + expect(1) + var disposable: Disposable? = null + val handler = { e: Throwable -> + assertTrue(e is UndeliverableException && e.cause is TestException) + expect(5) + } + val observable = rxObservable(currentDispatcher()) { + expect(4) + disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled + try { + delay(Long.MAX_VALUE) + } finally { + throw TestException() // would not be able to handle it since mono is disposed + } + } + withExceptionHandler(handler) { + observable.subscribe(object : Observer { + override fun onSubscribe(d: Disposable) { + expect(2) + disposable = d + } + + override fun onNext(t: Nothing) { + expectUnreached() + } + + override fun onError(t: Throwable) { + expectUnreached() + } + + override fun onComplete() { + expectUnreached() + } + }) + expect(3) + yield() // run coroutine + finish(6) + } + } } diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt index c6a6be56e3..9a4a849211 100644 --- a/reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt @@ -4,13 +4,10 @@ package kotlinx.coroutines.rx3 -import io.reactivex.rxjava3.core.* -import io.reactivex.rxjava3.plugins.* import kotlinx.coroutines.* import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test -import java.util.concurrent.* import kotlin.test.* class ObservableTest : TestBase() { @@ -137,28 +134,4 @@ class ObservableTest : TestBase() { expect(4) } } - - @Test - fun testExceptionAfterCancellation() { - // Test that no exceptions were reported to the global EH (it will fail the test if so) - val handler = { e: Throwable -> - assertFalse(e is CancellationException) - } - withExceptionHandler(handler) { - RxJavaPlugins.setErrorHandler { - require(it !is CancellationException) - } - Observable - .interval(1, TimeUnit.MILLISECONDS) - .take(1000) - .switchMapSingle { - rxSingle { - timeBomb().await() - } - } - .blockingSubscribe({}, {}) - } - } - - private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS).doOnSuccess { throw TestException() } }