diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt index 5ce6d47b69..f7205957d1 100644 --- a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt @@ -7,7 +7,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlin.test.* -class SingleTest : TestBase() { +class SingleTest : TestBase() { @Test fun testSingle() = runTest { diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index 843c94c8d6..4c41167eac 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -160,16 +160,16 @@ private class PublisherCoroutine( } // now update nRequested while (true) { // lock-free loop on nRequested - val cur = _nRequested.value - if (cur < 0) break // closed from inside onNext => unlock - if (cur == Long.MAX_VALUE) break // no back-pressure => unlock - val upd = cur - 1 - if (_nRequested.compareAndSet(cur, upd)) { - if (upd == 0L) { + val current = _nRequested.value + if (current < 0) break // closed from inside onNext => unlock + if (current == Long.MAX_VALUE) break // no back-pressure => unlock + val updated = current - 1 + if (_nRequested.compareAndSet(current, updated)) { + if (updated == 0L) { // return to keep locked due to back-pressure return } - break // unlock if upd > 0 + break // unlock if updated > 0 } } unlockAndCheckCompleted() @@ -198,17 +198,31 @@ private class PublisherCoroutine( if (cancelled) { // If the parent had failed to handle our exception, then we must not lose this exception if (cause != null && !handled) handleCoroutineException(context, cause) - } else { - try { - if (cause != null && cause !is CancellationException) { - subscriber.onError(cause) - } - else { - subscriber.onComplete() + return + } + + try { + if (cause != null && cause !is CancellationException) { + /* + * Reactive frameworks have two types of exceptions: regular and fatal. + * Regular are passed to onError. + * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297). + * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether + * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was + * thrown by subscriber or upstream). + * To make behaviour consistent and least surprising, we always handle fatal exceptions + * by coroutines machinery, anyway, they should not be present in regular program flow, + * thus our goal here is just to expose it as soon as possible. + */ + subscriber.onError(cause) + if (!handled && cause.isFatal()) { + handleCoroutineException(context, cause) } - } catch (e: Throwable) { - handleCoroutineException(context, e) + } else { + subscriber.onComplete() } + } catch (e: Throwable) { + handleCoroutineException(context, e) } } } finally { @@ -242,12 +256,12 @@ private class PublisherCoroutine( // assert: isCompleted private fun signalCompleted(cause: Throwable?, handled: Boolean) { while (true) { // lock-free loop for nRequested - val cur = _nRequested.value - if (cur == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion - check(cur >= 0) // no other thread could have marked it as CLOSED, because onCompleted[Exceptionally] is invoked once - if (!_nRequested.compareAndSet(cur, CLOSED)) continue // retry on failed CAS + val current = _nRequested.value + if (current == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion + check(current >= 0) // no other thread could have marked it as CLOSED, because onCompleted[Exceptionally] is invoked once + if (!_nRequested.compareAndSet(current, CLOSED)) continue // retry on failed CAS // Ok -- marked as CLOSED, now can unlock the mutex if it was locked due to backpressure - if (cur == 0L) { + if (current == 0L) { doLockedSignalCompleted(cause, handled) } else { // otherwise mutex was either not locked or locked in concurrent onNext... try lock it to signal completion @@ -272,4 +286,6 @@ private class PublisherCoroutine( cancelled = true super.cancel(null) } + + private fun Throwable.isFatal() = this is VirtualMachineError || this is ThreadDeath || this is LinkageError } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt index 61046f280b..0da06776a5 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt @@ -61,12 +61,20 @@ private class RxCompletableCoroutine( private val subscriber: CompletableEmitter ) : AbstractCoroutine(parentContext, true) { override fun onCompleted(value: Unit) { - if (!subscriber.isDisposed) subscriber.onComplete() + try { + if (!subscriber.isDisposed) subscriber.onComplete() + } catch (e: Throwable) { + handleCoroutineException(context, e) + } } override fun onCancelled(cause: Throwable, handled: Boolean) { if (!subscriber.isDisposed) { - subscriber.onError(cause) + try { + subscriber.onError(cause) + } catch (e: Throwable) { + handleCoroutineException(context, e) + } } else if (!handled) { handleCoroutineException(context, cause) } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt index e93ae6b2d4..fbc366c6df 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt @@ -63,13 +63,21 @@ private class RxMaybeCoroutine( ) : AbstractCoroutine(parentContext, true) { override fun onCompleted(value: T) { if (!subscriber.isDisposed) { - if (value == null) subscriber.onComplete() else subscriber.onSuccess(value) + try { + if (value == null) subscriber.onComplete() else subscriber.onSuccess(value) + } catch(e: Throwable) { + handleCoroutineException(context, e) + } } } override fun onCancelled(cause: Throwable, handled: Boolean) { if (!subscriber.isDisposed) { - subscriber.onError(cause) + try { + subscriber.onError(cause) + } catch (e: Throwable) { + handleCoroutineException(context, e) + } } else if (!handled) { handleCoroutineException(context, cause) } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 35176f1484..3d0ccd824d 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.rx2 import io.reactivex.* +import io.reactivex.exceptions.* import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* @@ -131,15 +132,19 @@ private class RxObservableCoroutine( // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery, // this failure is essentially equivalent to a failure of a child coroutine. cancelCoroutine(e) - doLockedSignalCompleted(e, false) + mutex.unlock() throw e } /* - There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might - happen after this check and before `unlock` (see signalCompleted that does not do anything - if it fails to acquire the lock that we are still holding). - We have to recheck `isCompleted` after `unlock` anyway. + * There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might + * happen after this check and before `unlock` (see signalCompleted that does not do anything + * if it fails to acquire the lock that we are still holding). + * We have to recheck `isCompleted` after `unlock` anyway. */ + unlockAndCheckCompleted() + } + + private fun unlockAndCheckCompleted() { mutex.unlock() // recheck isActive if (!isActive && mutex.tryLock()) @@ -148,16 +153,31 @@ private class RxObservableCoroutine( // assert: mutex.isLocked() private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) { - // todo: handled is ignored here, might need something like in PublisherCoroutine to process // cancellation failures try { if (_signal.value >= CLOSED) { _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) try { - if (cause != null && cause !is CancellationException) + if (cause != null && cause !is CancellationException) { + /* + * Reactive frameworks have two types of exceptions: regular and fatal. + * Regular are passed to onError. + * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297). + * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether + * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was + * thrown by subscriber or upstream). + * To make behaviour consistent and least surprising, we always handle fatal exceptions + * by coroutines machinery, anyway, they should not be present in regular program flow, + * thus our goal here is just to expose it as soon as possible. + */ subscriber.onError(cause) - else + if (!handled && cause.isFatal()) { + handleCoroutineException(context, cause) + } + } + else { subscriber.onComplete() + } } catch (e: Throwable) { // Unhandled exception (cannot handle in other way, since we are already complete) handleCoroutineException(context, e) @@ -181,4 +201,11 @@ private class RxObservableCoroutine( override fun onCancelled(cause: Throwable, handled: Boolean) { signalCompleted(cause, handled) } +} + +internal fun Throwable.isFatal() = try { + Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode + false +} catch (e: Throwable) { + true } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt index e382bbe39d..b6cebf097c 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt @@ -61,12 +61,20 @@ private class RxSingleCoroutine( private val subscriber: SingleEmitter ) : AbstractCoroutine(parentContext, true) { override fun onCompleted(value: T) { - if (!subscriber.isDisposed) subscriber.onSuccess(value) + try { + if (!subscriber.isDisposed) subscriber.onSuccess(value) + } catch (e: Throwable) { + handleCoroutineException(context, e) + } } override fun onCancelled(cause: Throwable, handled: Boolean) { if (!subscriber.isDisposed) { - subscriber.onError(cause) + try { + subscriber.onError(cause) + } catch (e: Throwable) { + handleCoroutineException(context, e) + } } else if (!handled) { handleCoroutineException(context, cause) } diff --git a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt index fd159641bb..a7caea4793 100644 --- a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt @@ -147,4 +147,21 @@ class CompletableTest : TestBase() { yield() // run coroutine finish(6) } + + @Test + fun testFatalExceptionInSubscribe() = runTest { + GlobalScope.rxCompletable(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) { + expect(1) + 42 + }.subscribe({ throw LinkageError() }) + finish(3) + } + + @Test + fun testFatalExceptionInSingle() = runTest { + GlobalScope.rxCompletable(Dispatchers.Unconfined) { + throw LinkageError() + }.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) }) + finish(2) + } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt b/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt index ba14b89ca8..758b632604 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.rx2 @@ -143,7 +143,7 @@ class ConvertTest : TestBase() { val single = rxSingle(Dispatchers.Unconfined) { var result = "" try { - observable.consumeEach { result += it } + observable.collect { result += it } } catch(e: Throwable) { check(e is TestException) result += e.message diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt new file mode 100644 index 0000000000..4f3e7241c6 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt @@ -0,0 +1,133 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import kotlinx.coroutines.* +import org.junit.* +import org.junit.Test +import kotlin.test.* + +class FlowableExceptionHandlingTest : TestBase() { + + @Before + fun setup() { + ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") + } + + private inline fun ceh(expect: Int) = CoroutineExceptionHandler { _, t -> + assertTrue(t is T) + expect(expect) + } + + private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() } + + @Test + fun testException() = runTest { + rxFlowable(Dispatchers.Unconfined + cehUnreached()) { + expect(1) + throw TestException() + }.subscribe({ + expectUnreached() + }, { + expect(2) // Reported to onError + }) + finish(3) + } + + @Test + fun testFatalException() = runTest { + rxFlowable(Dispatchers.Unconfined + ceh(3)) { + expect(1) + throw LinkageError() + }.subscribe({ + expectUnreached() + }, { + expect(2) // Fatal exception is reported to both onError and CEH + }) + finish(4) + } + + @Test + fun testExceptionAsynchronous() = runTest { + rxFlowable(Dispatchers.Unconfined + cehUnreached()) { + expect(1) + throw TestException() + }.publish() + .refCount() + .subscribe({ + expectUnreached() + }, { + expect(2) // Reported to onError + }) + finish(3) + } + + @Test + fun testFatalExceptionAsynchronous() = runTest { + rxFlowable(Dispatchers.Unconfined + ceh(3)) { + expect(1) + throw LinkageError() + }.publish() + .refCount() + .subscribe({ + expectUnreached() + }, { + expect(2) + }) + finish(4) + } + + @Test + fun testFatalExceptionFromSubscribe() = runTest { + rxFlowable(Dispatchers.Unconfined + ceh(4)) { + expect(1) + send(Unit) + }.subscribe({ + expect(2) + throw LinkageError() + }, { expect(3) }) // Fatal exception is reported to both onError and CEH + finish(5) + } + + @Test + fun testExceptionFromSubscribe() = runTest { + rxFlowable(Dispatchers.Unconfined + cehUnreached()) { + expect(1) + send(Unit) + }.subscribe({ + expect(2) + throw TestException() + }, { expect(3) }) // not reported to onError because came from the subscribe itself + finish(4) + } + + @Test + fun testAsynchronousExceptionFromSubscribe() = runTest { + rxFlowable(Dispatchers.Unconfined + cehUnreached()) { + expect(1) + send(Unit) + }.publish() + .refCount() + .subscribe({ + expect(2) + throw RuntimeException() + }, { expect(3) }) + finish(4) + } + + @Test + fun testAsynchronousFatalExceptionFromSubscribe() = runTest { + rxFlowable(Dispatchers.Unconfined + ceh(3)) { + expect(1) + send(Unit) + }.publish() + .refCount() + .subscribe({ + expect(2) + throw LinkageError() + }, { expectUnreached() }) + finish(4) + } +} diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt index 5a9bac2fdb..dcd66638e5 100644 --- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt @@ -31,7 +31,7 @@ class MaybeTest : TestBase() { expect(2) maybe.subscribe { value -> expect(5) - Assert.assertThat(value, IsEqual("OK")) + assertThat(value, IsEqual("OK")) } expect(3) yield() // to started coroutine @@ -277,4 +277,21 @@ class MaybeTest : TestBase() { yield() // run coroutine finish(6) } + + @Test + fun testFatalExceptionInSubscribe() = runTest { + GlobalScope.rxMaybe(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) { + expect(1) + 42 + }.subscribe({ throw LinkageError() }) + finish(3) + } + + @Test + fun testFatalExceptionInSingle() = runTest { + GlobalScope.rxMaybe(Dispatchers.Unconfined) { + throw LinkageError() + }.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) }) + finish(2) + } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt new file mode 100644 index 0000000000..6d247cfab7 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt @@ -0,0 +1,133 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import kotlinx.coroutines.* +import org.junit.* +import org.junit.Test +import kotlin.test.* + +class ObservableExceptionHandlingTest : TestBase() { + + @Before + fun setup() { + ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") + } + + private inline fun ceh(expect: Int) = CoroutineExceptionHandler { _, t -> + assertTrue(t is T) + expect(expect) + } + + private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() } + + @Test + fun testException() = runTest { + rxObservable(Dispatchers.Unconfined + cehUnreached()) { + expect(1) + throw TestException() + }.subscribe({ + expectUnreached() + }, { + expect(2) // Reported to onError + }) + finish(3) + } + + @Test + fun testFatalException() = runTest { + rxObservable(Dispatchers.Unconfined + ceh(3)) { + expect(1) + throw LinkageError() + }.subscribe({ + expectUnreached() + }, { + expect(2) + }) + finish(4) + } + + @Test + fun testExceptionAsynchronous() = runTest { + rxObservable(Dispatchers.Unconfined) { + expect(1) + throw TestException() + }.publish() + .refCount() + .subscribe({ + expectUnreached() + }, { + expect(2) // Reported to onError + }) + finish(3) + } + + @Test + fun testFatalExceptionAsynchronous() = runTest { + rxObservable(Dispatchers.Unconfined + ceh(3)) { + expect(1) + throw LinkageError() + }.publish() + .refCount() + .subscribe({ + expectUnreached() + }, { + expect(2) // Fatal exception is not reported in onError + }) + finish(4) + } + + @Test + fun testFatalExceptionFromSubscribe() = runTest { + rxObservable(Dispatchers.Unconfined + ceh(4)) { + expect(1) + send(Unit) + }.subscribe({ + expect(2) + throw LinkageError() + }, { expect(3) }) // Unreached because fatal errors are rethrown + finish(5) + } + + @Test + fun testExceptionFromSubscribe() = runTest { + rxObservable(Dispatchers.Unconfined) { + expect(1) + send(Unit) + }.subscribe({ + expect(2) + throw TestException() + }, { expect(3) }) // not reported to onError because came from the subscribe itself + finish(4) + } + + @Test + fun testAsynchronousExceptionFromSubscribe() = runTest { + rxObservable(Dispatchers.Unconfined) { + expect(1) + send(Unit) + }.publish() + .refCount() + .subscribe({ + expect(2) + throw RuntimeException() + }, { expect(3) }) + finish(4) + } + + @Test + fun testAsynchronousFatalExceptionFromSubscribe() = runTest { + rxObservable(Dispatchers.Unconfined + ceh(4)) { + expect(1) + send(Unit) + }.publish() + .refCount() + .subscribe({ + expect(2) + throw LinkageError() + }, { expect(3) }) + finish(5) + } +} diff --git a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt index 8b786164b4..fce772347b 100644 --- a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt @@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import io.reactivex.disposables.* +import io.reactivex.functions.* import kotlinx.coroutines.* import org.hamcrest.core.* import org.junit.* @@ -198,6 +199,26 @@ class SingleTest : TestBase() { } } + @Test + fun testFatalExceptionInSubscribe() = runTest { + GlobalScope.rxSingle(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> assertTrue(e is LinkageError); expect(2) }) { + expect(1) + 42 + }.subscribe(Consumer { + throw LinkageError() + }) + finish(3) + } + + @Test + fun testFatalExceptionInSingle() = runTest { + GlobalScope.rxSingle(Dispatchers.Unconfined) { + throw LinkageError() + }.subscribe({ _, e -> assertTrue(e is LinkageError); expect(1) }) + + finish(2) + } + @Test fun testUnhandledException() = runTest { expect(1)