From 58ad0cdfca81ab2d5c5c410f889293c51beede75 Mon Sep 17 00:00:00 2001 From: Ibraheem Zaman Date: Wed, 29 Jul 2020 18:46:49 +0400 Subject: [PATCH 1/2] Make exception after cancellation test more robust Ensure that the RX error handler actually gets invoked --- .../test/ObservableTest.kt | 21 +++++++++++-------- .../test/ObservableTest.kt | 21 +++++++++++-------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt index 4f7fa547d2..cf42dea570 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt @@ -5,12 +5,13 @@ package kotlinx.coroutines.rx2 import io.reactivex.* -import io.reactivex.plugins.* +import io.reactivex.exceptions.* import kotlinx.coroutines.* import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* +import java.util.concurrent.atomic.* import kotlin.test.* class ObservableTest : TestBase() { @@ -140,24 +141,26 @@ class ObservableTest : TestBase() { @Test fun testExceptionAfterCancellation() { - // Test that no exceptions were reported to the global EH (it will fail the test if so) - val handler = { e: Throwable -> - assertFalse(e is CancellationException) + // Test that no exceptions were reported to the coroutine EH (it will fail the test if so) + val coroutineExceptionHandler = CoroutineExceptionHandler { _, _ -> expectUnreached() } + val rxExceptionHandlerInvocations = AtomicInteger() + val rxExceptionHandler: (Throwable) -> Unit = { error -> + assertTrue(error is UndeliverableException) + assertTrue(error.cause is TestException) + rxExceptionHandlerInvocations.getAndIncrement() } - withExceptionHandler(handler) { - RxJavaPlugins.setErrorHandler { - require(it !is CancellationException) - } + withExceptionHandler(rxExceptionHandler) { Observable .interval(1, TimeUnit.MILLISECONDS) .take(1000) .switchMapSingle { - rxSingle { + rxSingle(coroutineExceptionHandler) { timeBomb().await() } } .blockingSubscribe({}, {}) } + assertTrue(rxExceptionHandlerInvocations.get() > 0) } private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS).doOnSuccess { throw TestException() } diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt index c6a6be56e3..3cff07a65d 100644 --- a/reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt @@ -5,12 +5,13 @@ package kotlinx.coroutines.rx3 import io.reactivex.rxjava3.core.* -import io.reactivex.rxjava3.plugins.* +import io.reactivex.rxjava3.exceptions.* import kotlinx.coroutines.* import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* +import java.util.concurrent.atomic.* import kotlin.test.* class ObservableTest : TestBase() { @@ -140,24 +141,26 @@ class ObservableTest : TestBase() { @Test fun testExceptionAfterCancellation() { - // Test that no exceptions were reported to the global EH (it will fail the test if so) - val handler = { e: Throwable -> - assertFalse(e is CancellationException) + // Test that no exceptions were reported to the coroutine EH (it will fail the test if so) + val coroutineExceptionHandler = CoroutineExceptionHandler { _, _ -> expectUnreached() } + val rxExceptionHandlerInvocations = AtomicInteger() + val rxExceptionHandler: (Throwable) -> Unit = { error -> + assertTrue(error is UndeliverableException) + assertTrue(error.cause is TestException) + rxExceptionHandlerInvocations.getAndIncrement() } - withExceptionHandler(handler) { - RxJavaPlugins.setErrorHandler { - require(it !is CancellationException) - } + withExceptionHandler(rxExceptionHandler) { Observable .interval(1, TimeUnit.MILLISECONDS) .take(1000) .switchMapSingle { - rxSingle { + rxSingle(coroutineExceptionHandler) { timeBomb().await() } } .blockingSubscribe({}, {}) } + assertTrue(rxExceptionHandlerInvocations.get() > 0) } private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS).doOnSuccess { throw TestException() } From 65e13b8590dd39b462eea5954c221e5dd0ed8e8a Mon Sep 17 00:00:00 2001 From: Ibraheem Zaman Date: Fri, 31 Jul 2020 04:43:19 +0400 Subject: [PATCH 2/2] Handle undeliverable errors in rxObservable If an error can't be delivered to the observer due to the observer already having disposed the subscription, then it should be reported to the RX global exception handler. This was already being done in the implementations for rxSingle, rxMaybe, and rxCompletable, but not in rxObservable. Copied the test over from the other implementations as well. Also removed the previous test in ObservableTest, which actually was testing rxSingle instead of rxObservable, and was too complicated and dependent on race conditions (both rxSingle and rxObservable now have a simple and deterministic test for undeliverable error handling). It was also not well implemented, and I originally tried to fix it to be more robust in commit 58ad0cdfca81ab2d5c5c410f889293c51beede75. Fixes #2173. --- .../src/RxObservable.kt | 8 ++-- .../test/ObservableExceptionHandlingTest.kt | 44 +++++++++++++++++++ .../test/ObservableTest.kt | 30 ------------- .../src/RxObservable.kt | 8 ++-- .../test/ObservableExceptionHandlingTest.kt | 44 +++++++++++++++++++ .../test/ObservableTest.kt | 30 ------------- 6 files changed, 94 insertions(+), 70 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 2f483879b8..454e0f2ff0 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -164,12 +164,10 @@ private class RxObservableCoroutine( * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was * thrown by subscriber or upstream). - * To make behaviour consistent and least surprising, we always handle fatal exceptions - * by coroutines machinery, anyway, they should not be present in regular program flow, - * thus our goal here is just to expose it as soon as possible. + * To make behaviour consistent and least surprising, we always deliver fatal exceptions to the + * RX global exception handler. */ - subscriber.tryOnError(cause) - if (!handled && cause.isFatal()) { + if (!subscriber.tryOnError(cause) || (!handled && cause.isFatal())) { handleUndeliverableException(cause, context) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt index d6cdd3ca24..a09993218c 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt @@ -4,6 +4,8 @@ package kotlinx.coroutines.rx2 +import io.reactivex.* +import io.reactivex.disposables.* import io.reactivex.exceptions.* import kotlinx.coroutines.* import org.junit.* @@ -131,4 +133,46 @@ class ObservableExceptionHandlingTest : TestBase() { }, { expect(3) }) finish(5) } + + @Test + fun testUnhandledException() = runTest { + expect(1) + var disposable: Disposable? = null + val handler = { e: Throwable -> + assertTrue(e is UndeliverableException && e.cause is TestException) + expect(5) + } + val observable = rxObservable(currentDispatcher()) { + expect(4) + disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled + try { + delay(Long.MAX_VALUE) + } finally { + throw TestException() // would not be able to handle it since mono is disposed + } + } + withExceptionHandler(handler) { + observable.subscribe(object : Observer { + override fun onSubscribe(d: Disposable) { + expect(2) + disposable = d + } + + override fun onNext(t: Nothing) { + expectUnreached() + } + + override fun onError(t: Throwable) { + expectUnreached() + } + + override fun onComplete() { + expectUnreached() + } + }) + expect(3) + yield() // run coroutine + finish(6) + } + } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt index cf42dea570..22d08ca752 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt @@ -4,14 +4,10 @@ package kotlinx.coroutines.rx2 -import io.reactivex.* -import io.reactivex.exceptions.* import kotlinx.coroutines.* import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test -import java.util.concurrent.* -import java.util.concurrent.atomic.* import kotlin.test.* class ObservableTest : TestBase() { @@ -138,30 +134,4 @@ class ObservableTest : TestBase() { expect(4) } } - - @Test - fun testExceptionAfterCancellation() { - // Test that no exceptions were reported to the coroutine EH (it will fail the test if so) - val coroutineExceptionHandler = CoroutineExceptionHandler { _, _ -> expectUnreached() } - val rxExceptionHandlerInvocations = AtomicInteger() - val rxExceptionHandler: (Throwable) -> Unit = { error -> - assertTrue(error is UndeliverableException) - assertTrue(error.cause is TestException) - rxExceptionHandlerInvocations.getAndIncrement() - } - withExceptionHandler(rxExceptionHandler) { - Observable - .interval(1, TimeUnit.MILLISECONDS) - .take(1000) - .switchMapSingle { - rxSingle(coroutineExceptionHandler) { - timeBomb().await() - } - } - .blockingSubscribe({}, {}) - } - assertTrue(rxExceptionHandlerInvocations.get() > 0) - } - - private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS).doOnSuccess { throw TestException() } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt index 102d06ea60..3366cafa85 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt @@ -150,12 +150,10 @@ private class RxObservableCoroutine( * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was * thrown by subscriber or upstream). - * To make behaviour consistent and least surprising, we always handle fatal exceptions - * by coroutines machinery, anyway, they should not be present in regular program flow, - * thus our goal here is just to expose it as soon as possible. + * To make behaviour consistent and least surprising, we always deliver fatal exceptions to the + * RX global exception handler. */ - subscriber.tryOnError(cause) - if (!handled && cause.isFatal()) { + if (!subscriber.tryOnError(cause) || (!handled && cause.isFatal())) { handleUndeliverableException(cause, context) } } diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt index 1183b2ae21..35c536cdd2 100644 --- a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt @@ -4,6 +4,8 @@ package kotlinx.coroutines.rx3 +import io.reactivex.rxjava3.core.* +import io.reactivex.rxjava3.disposables.* import io.reactivex.rxjava3.exceptions.* import kotlinx.coroutines.* import org.junit.* @@ -131,4 +133,46 @@ class ObservableExceptionHandlingTest : TestBase() { }, { expect(3) }) finish(5) } + + @Test + fun testUnhandledException() = runTest { + expect(1) + var disposable: Disposable? = null + val handler = { e: Throwable -> + assertTrue(e is UndeliverableException && e.cause is TestException) + expect(5) + } + val observable = rxObservable(currentDispatcher()) { + expect(4) + disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled + try { + delay(Long.MAX_VALUE) + } finally { + throw TestException() // would not be able to handle it since mono is disposed + } + } + withExceptionHandler(handler) { + observable.subscribe(object : Observer { + override fun onSubscribe(d: Disposable) { + expect(2) + disposable = d + } + + override fun onNext(t: Nothing) { + expectUnreached() + } + + override fun onError(t: Throwable) { + expectUnreached() + } + + override fun onComplete() { + expectUnreached() + } + }) + expect(3) + yield() // run coroutine + finish(6) + } + } } diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt index 3cff07a65d..9a4a849211 100644 --- a/reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt @@ -4,14 +4,10 @@ package kotlinx.coroutines.rx3 -import io.reactivex.rxjava3.core.* -import io.reactivex.rxjava3.exceptions.* import kotlinx.coroutines.* import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test -import java.util.concurrent.* -import java.util.concurrent.atomic.* import kotlin.test.* class ObservableTest : TestBase() { @@ -138,30 +134,4 @@ class ObservableTest : TestBase() { expect(4) } } - - @Test - fun testExceptionAfterCancellation() { - // Test that no exceptions were reported to the coroutine EH (it will fail the test if so) - val coroutineExceptionHandler = CoroutineExceptionHandler { _, _ -> expectUnreached() } - val rxExceptionHandlerInvocations = AtomicInteger() - val rxExceptionHandler: (Throwable) -> Unit = { error -> - assertTrue(error is UndeliverableException) - assertTrue(error.cause is TestException) - rxExceptionHandlerInvocations.getAndIncrement() - } - withExceptionHandler(rxExceptionHandler) { - Observable - .interval(1, TimeUnit.MILLISECONDS) - .take(1000) - .switchMapSingle { - rxSingle(coroutineExceptionHandler) { - timeBomb().await() - } - } - .blockingSubscribe({}, {}) - } - assertTrue(rxExceptionHandlerInvocations.get() > 0) - } - - private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS).doOnSuccess { throw TestException() } }