Skip to content

Commit 59fe965

Browse files
committed
Properly handle fatal exceptions in Rx coroutines, get rid of deadlock in RxObservable
1 parent 0bc2fc2 commit 59fe965

File tree

10 files changed

+347
-25
lines changed

10 files changed

+347
-25
lines changed

reactive/kotlinx-coroutines-reactive/src/Publish.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,11 @@ private class PublisherCoroutine<in T>(
138138
}
139139
// now update nRequested
140140
while (true) { // lock-free loop on nRequested
141-
val cur = _nRequested.value
142-
if (cur < 0) break // closed from inside onNext => unlock
143-
if (cur == Long.MAX_VALUE) break // no back-pressure => unlock
144-
val upd = cur - 1
145-
if (_nRequested.compareAndSet(cur, upd)) {
141+
val current = _nRequested.value
142+
if (current < 0) break // closed from inside onNext => unlock
143+
if (current == Long.MAX_VALUE) break // no back-pressure => unlock
144+
val upd = current - 1
145+
if (_nRequested.compareAndSet(current, upd)) {
146146
if (upd == 0L) {
147147
// return to keep locked due to back-pressure
148148
return

reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt

+10-2
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,20 @@ private class RxCompletableCoroutine(
4141
private val subscriber: CompletableEmitter
4242
) : AbstractCoroutine<Unit>(parentContext, true) {
4343
override fun onCompleted(value: Unit) {
44-
if (!subscriber.isDisposed) subscriber.onComplete()
44+
try {
45+
if (!subscriber.isDisposed) subscriber.onComplete()
46+
} catch (e: Throwable) {
47+
handleCoroutineException(context, e)
48+
}
4549
}
4650

4751
override fun onCancelled(cause: Throwable, handled: Boolean) {
4852
if (!subscriber.isDisposed) {
49-
subscriber.onError(cause)
53+
try {
54+
subscriber.onError(cause)
55+
} catch (e: Throwable) {
56+
handleCoroutineException(context, e)
57+
}
5058
} else if (!handled) {
5159
handleCoroutineException(context, cause)
5260
}

reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt

+10-2
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,21 @@ private class RxMaybeCoroutine<T>(
4343
) : AbstractCoroutine<T>(parentContext, true) {
4444
override fun onCompleted(value: T) {
4545
if (!subscriber.isDisposed) {
46-
if (value == null) subscriber.onComplete() else subscriber.onSuccess(value)
46+
try {
47+
if (value == null) subscriber.onComplete() else subscriber.onSuccess(value)
48+
} catch(e: Throwable) {
49+
handleCoroutineException(context, e)
50+
}
4751
}
4852
}
4953

5054
override fun onCancelled(cause: Throwable, handled: Boolean) {
5155
if (!subscriber.isDisposed) {
52-
subscriber.onError(cause)
56+
try {
57+
subscriber.onError(cause)
58+
} catch (e: Throwable) {
59+
handleCoroutineException(context, e)
60+
}
5361
} else if (!handled) {
5462
handleCoroutineException(context, cause)
5563
}

reactive/kotlinx-coroutines-rx2/src/RxObservable.kt

+32-10
Original file line numberDiff line numberDiff line change
@@ -114,15 +114,19 @@ private class RxObservableCoroutine<T: Any>(
114114
// to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
115115
// this failure is essentially equivalent to a failure of a child coroutine.
116116
cancelCoroutine(e)
117-
doLockedSignalCompleted(e, false)
117+
mutex.unlock()
118118
throw e
119119
}
120120
/*
121-
There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
122-
happen after this check and before `unlock` (see signalCompleted that does not do anything
123-
if it fails to acquire the lock that we are still holding).
124-
We have to recheck `isCompleted` after `unlock` anyway.
121+
* There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
122+
* happen after this check and before `unlock` (see signalCompleted that does not do anything
123+
* if it fails to acquire the lock that we are still holding).
124+
* We have to recheck `isCompleted` after `unlock` anyway.
125125
*/
126+
unlockAndCheckCompleted()
127+
}
128+
129+
private fun unlockAndCheckCompleted() {
126130
mutex.unlock()
127131
// recheck isActive
128132
if (!isActive && mutex.tryLock())
@@ -131,16 +135,32 @@ private class RxObservableCoroutine<T: Any>(
131135

132136
// assert: mutex.isLocked()
133137
private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
134-
// todo: handled is ignored here, might need something like in PublisherCoroutine to process
135138
// cancellation failures
136139
try {
137140
if (_signal.value >= CLOSED) {
138141
_signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
139142
try {
140-
if (cause != null && cause !is CancellationException)
141-
subscriber.onError(cause)
142-
else
143+
if (cause != null && cause !is CancellationException) {
144+
/*
145+
* Reactive frameworks have two types of exceptions: regular and fatal.
146+
* Regular are passed to onError.
147+
* Fatal can be passed to onError, but implementation **is free to swallow it** (e.g. see #1297).
148+
* Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
149+
* the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
150+
* thrown by subscriber or upstream).
151+
* To make behaviour consistent and least surprising, we always handle fatal exceptions
152+
* by coroutines machinery, anyway, they should not be present in regular program flow,
153+
* thus our goal here is just to expose it as soon as possible.
154+
*/
155+
if (cause.isFatal()) {
156+
if (!handled) handleCoroutineException(context, cause)
157+
} else {
158+
subscriber.onError(cause)
159+
}
160+
}
161+
else {
143162
subscriber.onComplete()
163+
}
144164
} catch (e: Throwable) {
145165
// Unhandled exception (cannot handle in other way, since we are already complete)
146166
handleCoroutineException(context, e)
@@ -164,4 +184,6 @@ private class RxObservableCoroutine<T: Any>(
164184
override fun onCancelled(cause: Throwable, handled: Boolean) {
165185
signalCompleted(cause, handled)
166186
}
167-
}
187+
}
188+
189+
internal fun Throwable.isFatal() = this is VirtualMachineError || this is ThreadDeath || this is LinkageError

reactive/kotlinx-coroutines-rx2/src/RxSingle.kt

+10-2
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,20 @@ private class RxSingleCoroutine<T: Any>(
4141
private val subscriber: SingleEmitter<T>
4242
) : AbstractCoroutine<T>(parentContext, true) {
4343
override fun onCompleted(value: T) {
44-
if (!subscriber.isDisposed) subscriber.onSuccess(value)
44+
try {
45+
if (!subscriber.isDisposed) subscriber.onSuccess(value)
46+
} catch (e: Throwable) {
47+
handleCoroutineException(context, e)
48+
}
4549
}
4650

4751
override fun onCancelled(cause: Throwable, handled: Boolean) {
4852
if (!subscriber.isDisposed) {
49-
subscriber.onError(cause)
53+
try {
54+
subscriber.onError(cause)
55+
} catch (e: Throwable) {
56+
handleCoroutineException(context, e)
57+
}
5058
} else if (!handled) {
5159
handleCoroutineException(context, cause)
5260
}

reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt

+17
Original file line numberDiff line numberDiff line change
@@ -158,4 +158,21 @@ class CompletableTest : TestBase() {
158158
yield() // run coroutine
159159
finish(5)
160160
}
161+
162+
@Test
163+
fun testFatalExceptionInSubscribe() = runTest {
164+
GlobalScope.rxCompletable(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) {
165+
expect(1)
166+
42
167+
}.subscribe({ throw LinkageError() })
168+
finish(3)
169+
}
170+
171+
@Test
172+
fun testFatalExceptionInSingle() = runTest {
173+
GlobalScope.rxCompletable(Dispatchers.Unconfined) {
174+
throw LinkageError()
175+
}.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) })
176+
finish(2)
177+
}
161178
}

reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.rx2

reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt

+17
Original file line numberDiff line numberDiff line change
@@ -288,4 +288,21 @@ class MaybeTest : TestBase() {
288288
yield() // run coroutine
289289
finish(5)
290290
}
291+
292+
@Test
293+
fun testFatalExceptionInSubscribe() = runTest {
294+
GlobalScope.rxMaybe(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) {
295+
expect(1)
296+
42
297+
}.subscribe({ throw LinkageError() })
298+
finish(3)
299+
}
300+
301+
@Test
302+
fun testFatalExceptionInSingle() = runTest {
303+
GlobalScope.rxMaybe(Dispatchers.Unconfined) {
304+
throw LinkageError()
305+
}.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) })
306+
finish(2)
307+
}
291308
}

0 commit comments

Comments
 (0)