Skip to content

Consistently handle fatal exceptions in reactive integrations #1305

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlin.test.*

class SingleTest : TestBase() {
class SingleTest : TestBase() {

@Test
fun testSingle() = runTest {
Expand Down
58 changes: 37 additions & 21 deletions reactive/kotlinx-coroutines-reactive/src/Publish.kt
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,16 @@ private class PublisherCoroutine<in T>(
}
// now update nRequested
while (true) { // lock-free loop on nRequested
val cur = _nRequested.value
if (cur < 0) break // closed from inside onNext => unlock
if (cur == Long.MAX_VALUE) break // no back-pressure => unlock
val upd = cur - 1
if (_nRequested.compareAndSet(cur, upd)) {
if (upd == 0L) {
val current = _nRequested.value
if (current < 0) break // closed from inside onNext => unlock
if (current == Long.MAX_VALUE) break // no back-pressure => unlock
val updated = current - 1
if (_nRequested.compareAndSet(current, updated)) {
if (updated == 0L) {
// return to keep locked due to back-pressure
return
}
break // unlock if upd > 0
break // unlock if updated > 0
}
}
unlockAndCheckCompleted()
Expand Down Expand Up @@ -198,17 +198,31 @@ private class PublisherCoroutine<in T>(
if (cancelled) {
// If the parent had failed to handle our exception, then we must not lose this exception
if (cause != null && !handled) handleCoroutineException(context, cause)
} else {
try {
if (cause != null && cause !is CancellationException) {
subscriber.onError(cause)
}
else {
subscriber.onComplete()
return
}

try {
if (cause != null && cause !is CancellationException) {
/*
* Reactive frameworks have two types of exceptions: regular and fatal.
* Regular are passed to onError.
* Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297).
* Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
* the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
* thrown by subscriber or upstream).
* To make behaviour consistent and least surprising, we always handle fatal exceptions
* by coroutines machinery, anyway, they should not be present in regular program flow,
* thus our goal here is just to expose it as soon as possible.
*/
subscriber.onError(cause)
if (!handled && cause.isFatal()) {
handleCoroutineException(context, cause)
}
} catch (e: Throwable) {
handleCoroutineException(context, e)
} else {
subscriber.onComplete()
}
} catch (e: Throwable) {
handleCoroutineException(context, e)
}
}
} finally {
Expand Down Expand Up @@ -242,12 +256,12 @@ private class PublisherCoroutine<in T>(
// assert: isCompleted
private fun signalCompleted(cause: Throwable?, handled: Boolean) {
while (true) { // lock-free loop for nRequested
val cur = _nRequested.value
if (cur == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion
check(cur >= 0) // no other thread could have marked it as CLOSED, because onCompleted[Exceptionally] is invoked once
if (!_nRequested.compareAndSet(cur, CLOSED)) continue // retry on failed CAS
val current = _nRequested.value
if (current == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion
check(current >= 0) // no other thread could have marked it as CLOSED, because onCompleted[Exceptionally] is invoked once
if (!_nRequested.compareAndSet(current, CLOSED)) continue // retry on failed CAS
// Ok -- marked as CLOSED, now can unlock the mutex if it was locked due to backpressure
if (cur == 0L) {
if (current == 0L) {
doLockedSignalCompleted(cause, handled)
} else {
// otherwise mutex was either not locked or locked in concurrent onNext... try lock it to signal completion
Expand All @@ -272,4 +286,6 @@ private class PublisherCoroutine<in T>(
cancelled = true
super.cancel(null)
}

private fun Throwable.isFatal() = this is VirtualMachineError || this is ThreadDeath || this is LinkageError
}
12 changes: 10 additions & 2 deletions reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,20 @@ private class RxCompletableCoroutine(
private val subscriber: CompletableEmitter
) : AbstractCoroutine<Unit>(parentContext, true) {
override fun onCompleted(value: Unit) {
if (!subscriber.isDisposed) subscriber.onComplete()
try {
if (!subscriber.isDisposed) subscriber.onComplete()
} catch (e: Throwable) {
handleCoroutineException(context, e)
}
}

override fun onCancelled(cause: Throwable, handled: Boolean) {
if (!subscriber.isDisposed) {
subscriber.onError(cause)
try {
subscriber.onError(cause)
} catch (e: Throwable) {
handleCoroutineException(context, e)
}
} else if (!handled) {
handleCoroutineException(context, cause)
}
Expand Down
12 changes: 10 additions & 2 deletions reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,21 @@ private class RxMaybeCoroutine<T>(
) : AbstractCoroutine<T>(parentContext, true) {
override fun onCompleted(value: T) {
if (!subscriber.isDisposed) {
if (value == null) subscriber.onComplete() else subscriber.onSuccess(value)
try {
if (value == null) subscriber.onComplete() else subscriber.onSuccess(value)
} catch(e: Throwable) {
handleCoroutineException(context, e)
}
}
}

override fun onCancelled(cause: Throwable, handled: Boolean) {
if (!subscriber.isDisposed) {
subscriber.onError(cause)
try {
subscriber.onError(cause)
} catch (e: Throwable) {
handleCoroutineException(context, e)
}
} else if (!handled) {
handleCoroutineException(context, cause)
}
Expand Down
43 changes: 35 additions & 8 deletions reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package kotlinx.coroutines.rx2

import io.reactivex.*
import io.reactivex.exceptions.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
Expand Down Expand Up @@ -131,15 +132,19 @@ private class RxObservableCoroutine<T: Any>(
// to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
// this failure is essentially equivalent to a failure of a child coroutine.
cancelCoroutine(e)
doLockedSignalCompleted(e, false)
mutex.unlock()
throw e
}
/*
There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
happen after this check and before `unlock` (see signalCompleted that does not do anything
if it fails to acquire the lock that we are still holding).
We have to recheck `isCompleted` after `unlock` anyway.
* There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
* happen after this check and before `unlock` (see signalCompleted that does not do anything
* if it fails to acquire the lock that we are still holding).
* We have to recheck `isCompleted` after `unlock` anyway.
*/
unlockAndCheckCompleted()
}

private fun unlockAndCheckCompleted() {
mutex.unlock()
// recheck isActive
if (!isActive && mutex.tryLock())
Expand All @@ -148,16 +153,31 @@ private class RxObservableCoroutine<T: Any>(

// assert: mutex.isLocked()
private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
// todo: handled is ignored here, might need something like in PublisherCoroutine to process
// cancellation failures
try {
if (_signal.value >= CLOSED) {
_signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
try {
if (cause != null && cause !is CancellationException)
if (cause != null && cause !is CancellationException) {
/*
* Reactive frameworks have two types of exceptions: regular and fatal.
* Regular are passed to onError.
* Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297).
* Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
* the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
* thrown by subscriber or upstream).
* To make behaviour consistent and least surprising, we always handle fatal exceptions
* by coroutines machinery, anyway, they should not be present in regular program flow,
* thus our goal here is just to expose it as soon as possible.
*/
subscriber.onError(cause)
else
if (!handled && cause.isFatal()) {
handleCoroutineException(context, cause)
}
}
else {
subscriber.onComplete()
}
} catch (e: Throwable) {
// Unhandled exception (cannot handle in other way, since we are already complete)
handleCoroutineException(context, e)
Expand All @@ -181,4 +201,11 @@ private class RxObservableCoroutine<T: Any>(
override fun onCancelled(cause: Throwable, handled: Boolean) {
signalCompleted(cause, handled)
}
}

internal fun Throwable.isFatal() = try {
Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode
false
} catch (e: Throwable) {
true
}
12 changes: 10 additions & 2 deletions reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,20 @@ private class RxSingleCoroutine<T: Any>(
private val subscriber: SingleEmitter<T>
) : AbstractCoroutine<T>(parentContext, true) {
override fun onCompleted(value: T) {
if (!subscriber.isDisposed) subscriber.onSuccess(value)
try {
if (!subscriber.isDisposed) subscriber.onSuccess(value)
} catch (e: Throwable) {
handleCoroutineException(context, e)
}
}

override fun onCancelled(cause: Throwable, handled: Boolean) {
if (!subscriber.isDisposed) {
subscriber.onError(cause)
try {
subscriber.onError(cause)
} catch (e: Throwable) {
handleCoroutineException(context, e)
}
} else if (!handled) {
handleCoroutineException(context, cause)
}
Expand Down
17 changes: 17 additions & 0 deletions reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,21 @@ class CompletableTest : TestBase() {
yield() // run coroutine
finish(6)
}

@Test
fun testFatalExceptionInSubscribe() = runTest {
GlobalScope.rxCompletable(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) {
expect(1)
42
}.subscribe({ throw LinkageError() })
finish(3)
}

@Test
fun testFatalExceptionInSingle() = runTest {
GlobalScope.rxCompletable(Dispatchers.Unconfined) {
throw LinkageError()
}.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) })
finish(2)
}
}
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.rx2
Expand Down Expand Up @@ -143,7 +143,7 @@ class ConvertTest : TestBase() {
val single = rxSingle(Dispatchers.Unconfined) {
var result = ""
try {
observable.consumeEach { result += it }
observable.collect { result += it }
} catch(e: Throwable) {
check(e is TestException)
result += e.message
Expand Down
Loading