From 4fbe388ea71e34efddb065e206ad6b22f5fa5d1d Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 28 Jun 2019 15:33:15 +0300 Subject: [PATCH 1/3] Consistently handle fatal exceptions in PublisherCoroutine Fixes #1297 --- .../src/Publish.kt | 43 ++-- .../test/FlowableExceptionHandlingTest.kt | 221 ++++++++++++++++++ .../kotlinx-coroutines-rx2/test/MaybeTest.kt | 2 +- 3 files changed, 252 insertions(+), 14 deletions(-) create mode 100644 reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index 843c94c8d6..d4ebf7360a 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -198,17 +198,32 @@ private class PublisherCoroutine( if (cancelled) { // If the parent had failed to handle our exception, then we must not lose this exception if (cause != null && !handled) handleCoroutineException(context, cause) - } else { - try { - if (cause != null && cause !is CancellationException) { + return + } + + try { + if (cause != null && cause !is CancellationException) { + /* + * Reactive frameworks have two types of exceptions: regular and fatal. + * Regular are passed to onError. + * Fatal can be passed to onError, but implementation **is free to swallow it** (e.g. see #1297). + * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether + * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was + * thrown by subscriber or upstream). + * To make behaviour consistent and least surprising, we always handle fatal exceptions + * by coroutines machinery, anyway, they should not be present in regular program flow, + * thus our goal here is just to expose it as soon as possible. + */ + if (cause.isFatal()) { + if (!handled) handleCoroutineException(context, cause) + } else { subscriber.onError(cause) } - else { - subscriber.onComplete() - } - } catch (e: Throwable) { - handleCoroutineException(context, e) + } else { + subscriber.onComplete() } + } catch (e: Throwable) { + handleCoroutineException(context, e) } } } finally { @@ -242,12 +257,12 @@ private class PublisherCoroutine( // assert: isCompleted private fun signalCompleted(cause: Throwable?, handled: Boolean) { while (true) { // lock-free loop for nRequested - val cur = _nRequested.value - if (cur == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion - check(cur >= 0) // no other thread could have marked it as CLOSED, because onCompleted[Exceptionally] is invoked once - if (!_nRequested.compareAndSet(cur, CLOSED)) continue // retry on failed CAS + val current = _nRequested.value + if (current == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion + check(current >= 0) // no other thread could have marked it as CLOSED, because onCompleted[Exceptionally] is invoked once + if (!_nRequested.compareAndSet(current, CLOSED)) continue // retry on failed CAS // Ok -- marked as CLOSED, now can unlock the mutex if it was locked due to backpressure - if (cur == 0L) { + if (current == 0L) { doLockedSignalCompleted(cause, handled) } else { // otherwise mutex was either not locked or locked in concurrent onNext... try lock it to signal completion @@ -272,4 +287,6 @@ private class PublisherCoroutine( cancelled = true super.cancel(null) } + + private fun Throwable.isFatal() = this is VirtualMachineError || this is ThreadDeath || this is LinkageError } diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt new file mode 100644 index 0000000000..4195fc3bab --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt @@ -0,0 +1,221 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import kotlinx.coroutines.* +import org.junit.* +import org.junit.Test +import kotlin.test.* + +class FlowableExceptionHandlingTest : TestBase() { + + @Before + fun setup() { + ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") + } + + @Test + fun testException() = runTest(expected = { it is TestException }) { + rxFlowable(Dispatchers.Unconfined) { + expect(1) + throw TestException() + }.subscribe({ + expectUnreached() + }, { + expect(2) // Reported to onError + }) + finish(3) + } + + @Test + fun testFatalException() = runTest(expected = { it is LinkageError }) { + rxFlowable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { + expect(1) + throw LinkageError() + }.subscribe({ + expectUnreached() + }, { + expectUnreached() // Fatal exception is not reported in onError + }) + finish(2) + } + + @Test + fun testExceptionWithoutParent() = runTest { + GlobalScope.rxFlowable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { + expect(1) + throw TestException() + }.subscribe({ + expectUnreached() + }, { + assertTrue(it is TestException) + expect(2) // Reported to onError + }) + finish(3) + } + + @Test + fun testFatalExceptionWithoutParent() = runTest { + GlobalScope.rxFlowable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> + assertTrue(e is LinkageError); expect( + 2 + ) + }) { + expect(1) + throw LinkageError() + }.subscribe({ + expectUnreached() + }, { + expectUnreached() // Fatal exception is not reported in onError + }) + finish(3) + } + + @Test + fun testExceptionAsynchronous() = runTest(expected = { it is TestException }) { + rxFlowable(Dispatchers.Unconfined) { + expect(1) + throw TestException() + }.publish() + .refCount() + .subscribe({ + expectUnreached() + }, { + expect(2) // Reported to onError + }) + finish(3) + } + + @Test + fun testFatalExceptionAsynchronous() = runTest(expected = { it is LinkageError }) { + rxFlowable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { + expect(1) + throw LinkageError() + }.publish() + .refCount() + .subscribe({ + expectUnreached() + }, { + expectUnreached() // Fatal exception is not reported in onError + }) + finish(2) + } + + @Test + fun testExceptionAsynchronousWithoutParent() = runTest { + GlobalScope.rxFlowable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { + expect(1) + throw TestException() + }.publish() + .refCount() + .subscribe({ + expectUnreached() + }, { + expect(2) // Reported to onError + }) + finish(3) + } + + @Test + fun testFatalExceptionAsynchronousWithoutParent() = runTest { + GlobalScope.rxFlowable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> + assertTrue(e is LinkageError); expect(2) + }) { + expect(1) + throw LinkageError() + }.publish() + .refCount() + .subscribe({ + expectUnreached() + }, { + expectUnreached() // Fatal exception is not reported in onError + }) + finish(3) + } + + @Test + fun testFatalExceptionFromSubscribe() = runTest(expected = { it is LinkageError }) { + rxFlowable(Dispatchers.Unconfined) { + expect(1) + send(Unit) + }.subscribe({ + expect(2) + throw LinkageError() + }, { expectUnreached() }) // Unreached because fatal errors are rethrown + finish(3) + } + + @Test + fun testExceptionFromSubscribe() = runTest { + rxFlowable(Dispatchers.Unconfined) { + expect(1) + send(Unit) + }.subscribe({ + expect(2) + throw TestException() + }, { expect(3) }) // not reported to onError because came from the subscribe itself + finish(4) + } + + @Test + fun testAsynchronousExceptionFromSubscribe() = runTest { + rxFlowable(Dispatchers.Unconfined) { + expect(1) + send(Unit) + }.publish() + .refCount() + .subscribe({ + expect(2) + throw RuntimeException() + }, { expect(3) }) + finish(4) + } + + @Test + fun testAsynchronousFatalExceptionFromSubscribe() = runTest(expected = { it is LinkageError }) { + rxFlowable(Dispatchers.Unconfined) { + expect(1) + send(Unit) + }.publish() + .refCount() + .subscribe({ + expect(2) + throw LinkageError() + }, { expectUnreached() }) + finish(3) + } + + @Test + fun testAsynchronousExceptionFromSubscribeWithoutParent() = + runTest { + GlobalScope.rxFlowable(Dispatchers.Unconfined) { + expect(1) + send(Unit) + }.publish() + .refCount() + .subscribe({ + expect(2) + throw RuntimeException() + }, { expect(3) }) + finish(4) + } + + @Test + fun testAsynchronousFatalExceptionFromSubscribeWithoutParent() = + runTest { + GlobalScope.rxFlowable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> + assertTrue(e is LinkageError); expect(3) + }) { + expect(1) + send(Unit) + }.publish() + .refCount() + .subscribe({ + expect(2) + throw LinkageError() + }, { expectUnreached() }) + finish(4) + } +} diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt index 5a9bac2fdb..4ddeaa9c63 100644 --- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt @@ -31,7 +31,7 @@ class MaybeTest : TestBase() { expect(2) maybe.subscribe { value -> expect(5) - Assert.assertThat(value, IsEqual("OK")) + assertThat(value, IsEqual("OK")) } expect(3) yield() // to started coroutine From c5fe42a5d84c0fbe38cd6f9c34f09ee4ae968757 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 28 Jun 2019 16:47:30 +0300 Subject: [PATCH 2/3] Properly handle fatal exceptions in Rx coroutines, get rid of deadlock in RxObservable --- .../src/Publish.kt | 10 +- .../src/RxCompletable.kt | 12 +- .../kotlinx-coroutines-rx2/src/RxMaybe.kt | 12 +- .../src/RxObservable.kt | 42 +++- .../kotlinx-coroutines-rx2/src/RxSingle.kt | 12 +- .../test/CompletableTest.kt | 17 ++ .../test/ConvertTest.kt | 2 +- .../kotlinx-coroutines-rx2/test/MaybeTest.kt | 17 ++ .../test/ObservableExceptionHandlingTest.kt | 221 ++++++++++++++++++ .../kotlinx-coroutines-rx2/test/SingleTest.kt | 21 ++ 10 files changed, 344 insertions(+), 22 deletions(-) create mode 100644 reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index d4ebf7360a..f0dac3d409 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -160,11 +160,11 @@ private class PublisherCoroutine( } // now update nRequested while (true) { // lock-free loop on nRequested - val cur = _nRequested.value - if (cur < 0) break // closed from inside onNext => unlock - if (cur == Long.MAX_VALUE) break // no back-pressure => unlock - val upd = cur - 1 - if (_nRequested.compareAndSet(cur, upd)) { + val current = _nRequested.value + if (current < 0) break // closed from inside onNext => unlock + if (current == Long.MAX_VALUE) break // no back-pressure => unlock + val upd = current - 1 + if (_nRequested.compareAndSet(current, upd)) { if (upd == 0L) { // return to keep locked due to back-pressure return diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt index 61046f280b..0da06776a5 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt @@ -61,12 +61,20 @@ private class RxCompletableCoroutine( private val subscriber: CompletableEmitter ) : AbstractCoroutine(parentContext, true) { override fun onCompleted(value: Unit) { - if (!subscriber.isDisposed) subscriber.onComplete() + try { + if (!subscriber.isDisposed) subscriber.onComplete() + } catch (e: Throwable) { + handleCoroutineException(context, e) + } } override fun onCancelled(cause: Throwable, handled: Boolean) { if (!subscriber.isDisposed) { - subscriber.onError(cause) + try { + subscriber.onError(cause) + } catch (e: Throwable) { + handleCoroutineException(context, e) + } } else if (!handled) { handleCoroutineException(context, cause) } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt index e93ae6b2d4..fbc366c6df 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt @@ -63,13 +63,21 @@ private class RxMaybeCoroutine( ) : AbstractCoroutine(parentContext, true) { override fun onCompleted(value: T) { if (!subscriber.isDisposed) { - if (value == null) subscriber.onComplete() else subscriber.onSuccess(value) + try { + if (value == null) subscriber.onComplete() else subscriber.onSuccess(value) + } catch(e: Throwable) { + handleCoroutineException(context, e) + } } } override fun onCancelled(cause: Throwable, handled: Boolean) { if (!subscriber.isDisposed) { - subscriber.onError(cause) + try { + subscriber.onError(cause) + } catch (e: Throwable) { + handleCoroutineException(context, e) + } } else if (!handled) { handleCoroutineException(context, cause) } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 35176f1484..858b120b57 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -131,15 +131,19 @@ private class RxObservableCoroutine( // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery, // this failure is essentially equivalent to a failure of a child coroutine. cancelCoroutine(e) - doLockedSignalCompleted(e, false) + mutex.unlock() throw e } /* - There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might - happen after this check and before `unlock` (see signalCompleted that does not do anything - if it fails to acquire the lock that we are still holding). - We have to recheck `isCompleted` after `unlock` anyway. + * There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might + * happen after this check and before `unlock` (see signalCompleted that does not do anything + * if it fails to acquire the lock that we are still holding). + * We have to recheck `isCompleted` after `unlock` anyway. */ + unlockAndCheckCompleted() + } + + private fun unlockAndCheckCompleted() { mutex.unlock() // recheck isActive if (!isActive && mutex.tryLock()) @@ -148,16 +152,32 @@ private class RxObservableCoroutine( // assert: mutex.isLocked() private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) { - // todo: handled is ignored here, might need something like in PublisherCoroutine to process // cancellation failures try { if (_signal.value >= CLOSED) { _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) try { - if (cause != null && cause !is CancellationException) - subscriber.onError(cause) - else + if (cause != null && cause !is CancellationException) { + /* + * Reactive frameworks have two types of exceptions: regular and fatal. + * Regular are passed to onError. + * Fatal can be passed to onError, but implementation **is free to swallow it** (e.g. see #1297). + * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether + * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was + * thrown by subscriber or upstream). + * To make behaviour consistent and least surprising, we always handle fatal exceptions + * by coroutines machinery, anyway, they should not be present in regular program flow, + * thus our goal here is just to expose it as soon as possible. + */ + if (cause.isFatal()) { + if (!handled) handleCoroutineException(context, cause) + } else { + subscriber.onError(cause) + } + } + else { subscriber.onComplete() + } } catch (e: Throwable) { // Unhandled exception (cannot handle in other way, since we are already complete) handleCoroutineException(context, e) @@ -181,4 +201,6 @@ private class RxObservableCoroutine( override fun onCancelled(cause: Throwable, handled: Boolean) { signalCompleted(cause, handled) } -} \ No newline at end of file +} + +internal fun Throwable.isFatal() = this is VirtualMachineError || this is ThreadDeath || this is LinkageError diff --git a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt index e382bbe39d..b6cebf097c 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt @@ -61,12 +61,20 @@ private class RxSingleCoroutine( private val subscriber: SingleEmitter ) : AbstractCoroutine(parentContext, true) { override fun onCompleted(value: T) { - if (!subscriber.isDisposed) subscriber.onSuccess(value) + try { + if (!subscriber.isDisposed) subscriber.onSuccess(value) + } catch (e: Throwable) { + handleCoroutineException(context, e) + } } override fun onCancelled(cause: Throwable, handled: Boolean) { if (!subscriber.isDisposed) { - subscriber.onError(cause) + try { + subscriber.onError(cause) + } catch (e: Throwable) { + handleCoroutineException(context, e) + } } else if (!handled) { handleCoroutineException(context, cause) } diff --git a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt index fd159641bb..a7caea4793 100644 --- a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt @@ -147,4 +147,21 @@ class CompletableTest : TestBase() { yield() // run coroutine finish(6) } + + @Test + fun testFatalExceptionInSubscribe() = runTest { + GlobalScope.rxCompletable(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) { + expect(1) + 42 + }.subscribe({ throw LinkageError() }) + finish(3) + } + + @Test + fun testFatalExceptionInSingle() = runTest { + GlobalScope.rxCompletable(Dispatchers.Unconfined) { + throw LinkageError() + }.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) }) + finish(2) + } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt b/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt index ba14b89ca8..42f53bd596 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.rx2 diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt index 4ddeaa9c63..dcd66638e5 100644 --- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt @@ -277,4 +277,21 @@ class MaybeTest : TestBase() { yield() // run coroutine finish(6) } + + @Test + fun testFatalExceptionInSubscribe() = runTest { + GlobalScope.rxMaybe(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) { + expect(1) + 42 + }.subscribe({ throw LinkageError() }) + finish(3) + } + + @Test + fun testFatalExceptionInSingle() = runTest { + GlobalScope.rxMaybe(Dispatchers.Unconfined) { + throw LinkageError() + }.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) }) + finish(2) + } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt new file mode 100644 index 0000000000..9d1ae91ff5 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt @@ -0,0 +1,221 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import kotlinx.coroutines.* +import org.junit.* +import org.junit.Test +import kotlin.test.* + +class ObservableExceptionHandlingTest : TestBase() { + + @Before + fun setup() { + ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") + } + + @Test + fun testException() = runTest(expected = { it is TestException }) { + rxObservable(Dispatchers.Unconfined) { + expect(1) + throw TestException() + }.subscribe({ + expectUnreached() + }, { + expect(2) // Reported to onError + }) + finish(3) + } + + @Test + fun testFatalException() = runTest(expected = { it is LinkageError }) { + rxObservable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { + expect(1) + throw LinkageError() + }.subscribe({ + expectUnreached() + }, { + expectUnreached() // Fatal exception is not reported in onError + }) + finish(2) + } + + @Test + fun testExceptionWithoutParent() = runTest { + GlobalScope.rxObservable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { + expect(1) + throw TestException() + }.subscribe({ + expectUnreached() + }, { + assertTrue(it is TestException) + expect(2) // Reported to onError + }) + finish(3) + } + + @Test + fun testFatalExceptionWithoutParent() = runTest { + GlobalScope.rxObservable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> + assertTrue(e is LinkageError); expect( + 2 + ) + }) { + expect(1) + throw LinkageError() + }.subscribe({ + expectUnreached() + }, { + expectUnreached() // Fatal exception is not reported in onError + }) + finish(3) + } + + @Test + fun testExceptionAsynchronous() = runTest(expected = { it is TestException }) { + rxObservable(Dispatchers.Unconfined) { + expect(1) + throw TestException() + }.publish() + .refCount() + .subscribe({ + expectUnreached() + }, { + expect(2) // Reported to onError + }) + finish(3) + } + + @Test + fun testFatalExceptionAsynchronous() = runTest(expected = { it is LinkageError }) { + rxObservable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { + expect(1) + throw LinkageError() + }.publish() + .refCount() + .subscribe({ + expectUnreached() + }, { + expectUnreached() // Fatal exception is not reported in onError + }) + finish(2) + } + + @Test + fun testExceptionAsynchronousWithoutParent() = runTest { + GlobalScope.rxObservable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { + expect(1) + throw TestException() + }.publish() + .refCount() + .subscribe({ + expectUnreached() + }, { + expect(2) // Reported to onError + }) + finish(3) + } + + @Test + fun testFatalExceptionAsynchronousWithoutParent() = runTest { + GlobalScope.rxObservable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> + assertTrue(e is LinkageError); expect(2) + }) { + expect(1) + throw LinkageError() + }.publish() + .refCount() + .subscribe({ + expectUnreached() + }, { + expectUnreached() // Fatal exception is not reported in onError + }) + finish(3) + } + + @Test + fun testFatalExceptionFromSubscribe() = runTest(expected = { it is LinkageError }) { + rxObservable(Dispatchers.Unconfined) { + expect(1) + send(Unit) + }.subscribe({ + expect(2) + throw LinkageError() + }, { expectUnreached() }) // Unreached because fatal errors are rethrown + finish(3) + } + + @Test + fun testExceptionFromSubscribe() = runTest { + rxObservable(Dispatchers.Unconfined) { + expect(1) + send(Unit) + }.subscribe({ + expect(2) + throw TestException() + }, { expect(3) }) // not reported to onError because came from the subscribe itself + finish(4) + } + + @Test + fun testAsynchronousExceptionFromSubscribe() = runTest { + rxObservable(Dispatchers.Unconfined) { + expect(1) + send(Unit) + }.publish() + .refCount() + .subscribe({ + expect(2) + throw RuntimeException() + }, { expect(3) }) + finish(4) + } + + @Test + fun testAsynchronousFatalExceptionFromSubscribe() = runTest(expected = { it is LinkageError }) { + rxObservable(Dispatchers.Unconfined) { + expect(1) + send(Unit) + }.publish() + .refCount() + .subscribe({ + expect(2) + throw LinkageError() + }, { expectUnreached() }) + finish(3) + } + + @Test + fun testAsynchronousExceptionFromSubscribeWithoutParent() = + runTest { + GlobalScope.rxObservable(Dispatchers.Unconfined) { + expect(1) + send(Unit) + }.publish() + .refCount() + .subscribe({ + expect(2) + throw RuntimeException() + }, { expect(3) }) + finish(4) + } + + @Test + fun testAsynchronousFatalExceptionFromSubscribeWithoutParent() = + runTest { + GlobalScope.rxObservable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> + assertTrue(e is LinkageError); expect(3) + }) { + expect(1) + send(Unit) + }.publish() + .refCount() + .subscribe({ + expect(2) + throw LinkageError() + }, { expectUnreached() }) + finish(4) + } +} diff --git a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt index 8b786164b4..fce772347b 100644 --- a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt @@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import io.reactivex.disposables.* +import io.reactivex.functions.* import kotlinx.coroutines.* import org.hamcrest.core.* import org.junit.* @@ -198,6 +199,26 @@ class SingleTest : TestBase() { } } + @Test + fun testFatalExceptionInSubscribe() = runTest { + GlobalScope.rxSingle(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> assertTrue(e is LinkageError); expect(2) }) { + expect(1) + 42 + }.subscribe(Consumer { + throw LinkageError() + }) + finish(3) + } + + @Test + fun testFatalExceptionInSingle() = runTest { + GlobalScope.rxSingle(Dispatchers.Unconfined) { + throw LinkageError() + }.subscribe({ _, e -> assertTrue(e is LinkageError); expect(1) }) + + finish(2) + } + @Test fun testUnhandledException() = runTest { expect(1) From 182b8f97bee48a71af61f6d5bd89339cd533873e Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 17 Jul 2019 17:02:14 +0300 Subject: [PATCH 3/3] Report fatal errors to both onError (to comply the spec fully) and to coroutine context (to eagerly fail as fatal exceptions are not recoverable anyway), cleanup tests after scopeless reactive --- .../common/test/flow/terminal/SingleTest.kt | 2 +- .../src/Publish.kt | 17 +-- .../src/RxObservable.kt | 35 +++-- .../test/ConvertTest.kt | 2 +- .../test/FlowableExceptionHandlingTest.kt | 138 ++++-------------- .../test/ObservableExceptionHandlingTest.kt | 134 +++-------------- 6 files changed, 78 insertions(+), 250 deletions(-) diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt index 5ce6d47b69..f7205957d1 100644 --- a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt @@ -7,7 +7,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlin.test.* -class SingleTest : TestBase() { +class SingleTest : TestBase() { @Test fun testSingle() = runTest { diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index f0dac3d409..4c41167eac 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -163,13 +163,13 @@ private class PublisherCoroutine( val current = _nRequested.value if (current < 0) break // closed from inside onNext => unlock if (current == Long.MAX_VALUE) break // no back-pressure => unlock - val upd = current - 1 - if (_nRequested.compareAndSet(current, upd)) { - if (upd == 0L) { + val updated = current - 1 + if (_nRequested.compareAndSet(current, updated)) { + if (updated == 0L) { // return to keep locked due to back-pressure return } - break // unlock if upd > 0 + break // unlock if updated > 0 } } unlockAndCheckCompleted() @@ -206,7 +206,7 @@ private class PublisherCoroutine( /* * Reactive frameworks have two types of exceptions: regular and fatal. * Regular are passed to onError. - * Fatal can be passed to onError, but implementation **is free to swallow it** (e.g. see #1297). + * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297). * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was * thrown by subscriber or upstream). @@ -214,10 +214,9 @@ private class PublisherCoroutine( * by coroutines machinery, anyway, they should not be present in regular program flow, * thus our goal here is just to expose it as soon as possible. */ - if (cause.isFatal()) { - if (!handled) handleCoroutineException(context, cause) - } else { - subscriber.onError(cause) + subscriber.onError(cause) + if (!handled && cause.isFatal()) { + handleCoroutineException(context, cause) } } else { subscriber.onComplete() diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 858b120b57..3d0ccd824d 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.rx2 import io.reactivex.* +import io.reactivex.exceptions.* import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* @@ -159,20 +160,19 @@ private class RxObservableCoroutine( try { if (cause != null && cause !is CancellationException) { /* - * Reactive frameworks have two types of exceptions: regular and fatal. - * Regular are passed to onError. - * Fatal can be passed to onError, but implementation **is free to swallow it** (e.g. see #1297). - * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether - * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was - * thrown by subscriber or upstream). - * To make behaviour consistent and least surprising, we always handle fatal exceptions - * by coroutines machinery, anyway, they should not be present in regular program flow, - * thus our goal here is just to expose it as soon as possible. - */ - if (cause.isFatal()) { - if (!handled) handleCoroutineException(context, cause) - } else { - subscriber.onError(cause) + * Reactive frameworks have two types of exceptions: regular and fatal. + * Regular are passed to onError. + * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297). + * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether + * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was + * thrown by subscriber or upstream). + * To make behaviour consistent and least surprising, we always handle fatal exceptions + * by coroutines machinery, anyway, they should not be present in regular program flow, + * thus our goal here is just to expose it as soon as possible. + */ + subscriber.onError(cause) + if (!handled && cause.isFatal()) { + handleCoroutineException(context, cause) } } else { @@ -203,4 +203,9 @@ private class RxObservableCoroutine( } } -internal fun Throwable.isFatal() = this is VirtualMachineError || this is ThreadDeath || this is LinkageError +internal fun Throwable.isFatal() = try { + Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode + false +} catch (e: Throwable) { + true +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt b/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt index 42f53bd596..758b632604 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt @@ -143,7 +143,7 @@ class ConvertTest : TestBase() { val single = rxSingle(Dispatchers.Unconfined) { var result = "" try { - observable.consumeEach { result += it } + observable.collect { result += it } } catch(e: Throwable) { check(e is TestException) result += e.message diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt index 4195fc3bab..4f3e7241c6 100644 --- a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt @@ -16,96 +16,42 @@ class FlowableExceptionHandlingTest : TestBase() { ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") } - @Test - fun testException() = runTest(expected = { it is TestException }) { - rxFlowable(Dispatchers.Unconfined) { - expect(1) - throw TestException() - }.subscribe({ - expectUnreached() - }, { - expect(2) // Reported to onError - }) - finish(3) + private inline fun ceh(expect: Int) = CoroutineExceptionHandler { _, t -> + assertTrue(t is T) + expect(expect) } - @Test - fun testFatalException() = runTest(expected = { it is LinkageError }) { - rxFlowable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { - expect(1) - throw LinkageError() - }.subscribe({ - expectUnreached() - }, { - expectUnreached() // Fatal exception is not reported in onError - }) - finish(2) - } + private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() } @Test - fun testExceptionWithoutParent() = runTest { - GlobalScope.rxFlowable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { + fun testException() = runTest { + rxFlowable(Dispatchers.Unconfined + cehUnreached()) { expect(1) throw TestException() }.subscribe({ expectUnreached() }, { - assertTrue(it is TestException) expect(2) // Reported to onError }) finish(3) } @Test - fun testFatalExceptionWithoutParent() = runTest { - GlobalScope.rxFlowable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> - assertTrue(e is LinkageError); expect( - 2 - ) - }) { + fun testFatalException() = runTest { + rxFlowable(Dispatchers.Unconfined + ceh(3)) { expect(1) throw LinkageError() }.subscribe({ expectUnreached() }, { - expectUnreached() // Fatal exception is not reported in onError + expect(2) // Fatal exception is reported to both onError and CEH }) - finish(3) - } - - @Test - fun testExceptionAsynchronous() = runTest(expected = { it is TestException }) { - rxFlowable(Dispatchers.Unconfined) { - expect(1) - throw TestException() - }.publish() - .refCount() - .subscribe({ - expectUnreached() - }, { - expect(2) // Reported to onError - }) - finish(3) - } - - @Test - fun testFatalExceptionAsynchronous() = runTest(expected = { it is LinkageError }) { - rxFlowable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { - expect(1) - throw LinkageError() - }.publish() - .refCount() - .subscribe({ - expectUnreached() - }, { - expectUnreached() // Fatal exception is not reported in onError - }) - finish(2) + finish(4) } @Test - fun testExceptionAsynchronousWithoutParent() = runTest { - GlobalScope.rxFlowable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { + fun testExceptionAsynchronous() = runTest { + rxFlowable(Dispatchers.Unconfined + cehUnreached()) { expect(1) throw TestException() }.publish() @@ -119,10 +65,8 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testFatalExceptionAsynchronousWithoutParent() = runTest { - GlobalScope.rxFlowable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> - assertTrue(e is LinkageError); expect(2) - }) { + fun testFatalExceptionAsynchronous() = runTest { + rxFlowable(Dispatchers.Unconfined + ceh(3)) { expect(1) throw LinkageError() }.publish() @@ -130,26 +74,26 @@ class FlowableExceptionHandlingTest : TestBase() { .subscribe({ expectUnreached() }, { - expectUnreached() // Fatal exception is not reported in onError + expect(2) }) - finish(3) + finish(4) } @Test - fun testFatalExceptionFromSubscribe() = runTest(expected = { it is LinkageError }) { - rxFlowable(Dispatchers.Unconfined) { + fun testFatalExceptionFromSubscribe() = runTest { + rxFlowable(Dispatchers.Unconfined + ceh(4)) { expect(1) send(Unit) }.subscribe({ expect(2) throw LinkageError() - }, { expectUnreached() }) // Unreached because fatal errors are rethrown - finish(3) + }, { expect(3) }) // Fatal exception is reported to both onError and CEH + finish(5) } @Test fun testExceptionFromSubscribe() = runTest { - rxFlowable(Dispatchers.Unconfined) { + rxFlowable(Dispatchers.Unconfined + cehUnreached()) { expect(1) send(Unit) }.subscribe({ @@ -161,7 +105,7 @@ class FlowableExceptionHandlingTest : TestBase() { @Test fun testAsynchronousExceptionFromSubscribe() = runTest { - rxFlowable(Dispatchers.Unconfined) { + rxFlowable(Dispatchers.Unconfined + cehUnreached()) { expect(1) send(Unit) }.publish() @@ -174,8 +118,8 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testAsynchronousFatalExceptionFromSubscribe() = runTest(expected = { it is LinkageError }) { - rxFlowable(Dispatchers.Unconfined) { + fun testAsynchronousFatalExceptionFromSubscribe() = runTest { + rxFlowable(Dispatchers.Unconfined + ceh(3)) { expect(1) send(Unit) }.publish() @@ -184,38 +128,6 @@ class FlowableExceptionHandlingTest : TestBase() { expect(2) throw LinkageError() }, { expectUnreached() }) - finish(3) + finish(4) } - - @Test - fun testAsynchronousExceptionFromSubscribeWithoutParent() = - runTest { - GlobalScope.rxFlowable(Dispatchers.Unconfined) { - expect(1) - send(Unit) - }.publish() - .refCount() - .subscribe({ - expect(2) - throw RuntimeException() - }, { expect(3) }) - finish(4) - } - - @Test - fun testAsynchronousFatalExceptionFromSubscribeWithoutParent() = - runTest { - GlobalScope.rxFlowable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> - assertTrue(e is LinkageError); expect(3) - }) { - expect(1) - send(Unit) - }.publish() - .refCount() - .subscribe({ - expect(2) - throw LinkageError() - }, { expectUnreached() }) - finish(4) - } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt index 9d1ae91ff5..6d247cfab7 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt @@ -16,65 +16,41 @@ class ObservableExceptionHandlingTest : TestBase() { ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") } - @Test - fun testException() = runTest(expected = { it is TestException }) { - rxObservable(Dispatchers.Unconfined) { - expect(1) - throw TestException() - }.subscribe({ - expectUnreached() - }, { - expect(2) // Reported to onError - }) - finish(3) + private inline fun ceh(expect: Int) = CoroutineExceptionHandler { _, t -> + assertTrue(t is T) + expect(expect) } - @Test - fun testFatalException() = runTest(expected = { it is LinkageError }) { - rxObservable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { - expect(1) - throw LinkageError() - }.subscribe({ - expectUnreached() - }, { - expectUnreached() // Fatal exception is not reported in onError - }) - finish(2) - } + private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() } @Test - fun testExceptionWithoutParent() = runTest { - GlobalScope.rxObservable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { + fun testException() = runTest { + rxObservable(Dispatchers.Unconfined + cehUnreached()) { expect(1) throw TestException() }.subscribe({ expectUnreached() }, { - assertTrue(it is TestException) expect(2) // Reported to onError }) finish(3) } @Test - fun testFatalExceptionWithoutParent() = runTest { - GlobalScope.rxObservable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> - assertTrue(e is LinkageError); expect( - 2 - ) - }) { + fun testFatalException() = runTest { + rxObservable(Dispatchers.Unconfined + ceh(3)) { expect(1) throw LinkageError() }.subscribe({ expectUnreached() }, { - expectUnreached() // Fatal exception is not reported in onError + expect(2) }) - finish(3) + finish(4) } @Test - fun testExceptionAsynchronous() = runTest(expected = { it is TestException }) { + fun testExceptionAsynchronous() = runTest { rxObservable(Dispatchers.Unconfined) { expect(1) throw TestException() @@ -89,40 +65,8 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testFatalExceptionAsynchronous() = runTest(expected = { it is LinkageError }) { - rxObservable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { - expect(1) - throw LinkageError() - }.publish() - .refCount() - .subscribe({ - expectUnreached() - }, { - expectUnreached() // Fatal exception is not reported in onError - }) - finish(2) - } - - @Test - fun testExceptionAsynchronousWithoutParent() = runTest { - GlobalScope.rxObservable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { - expect(1) - throw TestException() - }.publish() - .refCount() - .subscribe({ - expectUnreached() - }, { - expect(2) // Reported to onError - }) - finish(3) - } - - @Test - fun testFatalExceptionAsynchronousWithoutParent() = runTest { - GlobalScope.rxObservable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> - assertTrue(e is LinkageError); expect(2) - }) { + fun testFatalExceptionAsynchronous() = runTest { + rxObservable(Dispatchers.Unconfined + ceh(3)) { expect(1) throw LinkageError() }.publish() @@ -130,21 +74,21 @@ class ObservableExceptionHandlingTest : TestBase() { .subscribe({ expectUnreached() }, { - expectUnreached() // Fatal exception is not reported in onError + expect(2) // Fatal exception is not reported in onError }) - finish(3) + finish(4) } @Test - fun testFatalExceptionFromSubscribe() = runTest(expected = { it is LinkageError }) { - rxObservable(Dispatchers.Unconfined) { + fun testFatalExceptionFromSubscribe() = runTest { + rxObservable(Dispatchers.Unconfined + ceh(4)) { expect(1) send(Unit) }.subscribe({ expect(2) throw LinkageError() - }, { expectUnreached() }) // Unreached because fatal errors are rethrown - finish(3) + }, { expect(3) }) // Unreached because fatal errors are rethrown + finish(5) } @Test @@ -174,8 +118,8 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testAsynchronousFatalExceptionFromSubscribe() = runTest(expected = { it is LinkageError }) { - rxObservable(Dispatchers.Unconfined) { + fun testAsynchronousFatalExceptionFromSubscribe() = runTest { + rxObservable(Dispatchers.Unconfined + ceh(4)) { expect(1) send(Unit) }.publish() @@ -183,39 +127,7 @@ class ObservableExceptionHandlingTest : TestBase() { .subscribe({ expect(2) throw LinkageError() - }, { expectUnreached() }) - finish(3) + }, { expect(3) }) + finish(5) } - - @Test - fun testAsynchronousExceptionFromSubscribeWithoutParent() = - runTest { - GlobalScope.rxObservable(Dispatchers.Unconfined) { - expect(1) - send(Unit) - }.publish() - .refCount() - .subscribe({ - expect(2) - throw RuntimeException() - }, { expect(3) }) - finish(4) - } - - @Test - fun testAsynchronousFatalExceptionFromSubscribeWithoutParent() = - runTest { - GlobalScope.rxObservable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> - assertTrue(e is LinkageError); expect(3) - }) { - expect(1) - send(Unit) - }.publish() - .refCount() - .subscribe({ - expect(2) - throw LinkageError() - }, { expectUnreached() }) - finish(4) - } }