Skip to content

Commit c5fe42a

Browse files
committed
Properly handle fatal exceptions in Rx coroutines, get rid of deadlock in RxObservable
1 parent 4fbe388 commit c5fe42a

File tree

10 files changed

+344
-22
lines changed

10 files changed

+344
-22
lines changed

reactive/kotlinx-coroutines-reactive/src/Publish.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,11 @@ private class PublisherCoroutine<in T>(
160160
}
161161
// now update nRequested
162162
while (true) { // lock-free loop on nRequested
163-
val cur = _nRequested.value
164-
if (cur < 0) break // closed from inside onNext => unlock
165-
if (cur == Long.MAX_VALUE) break // no back-pressure => unlock
166-
val upd = cur - 1
167-
if (_nRequested.compareAndSet(cur, upd)) {
163+
val current = _nRequested.value
164+
if (current < 0) break // closed from inside onNext => unlock
165+
if (current == Long.MAX_VALUE) break // no back-pressure => unlock
166+
val upd = current - 1
167+
if (_nRequested.compareAndSet(current, upd)) {
168168
if (upd == 0L) {
169169
// return to keep locked due to back-pressure
170170
return

reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt

+10-2
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,20 @@ private class RxCompletableCoroutine(
6161
private val subscriber: CompletableEmitter
6262
) : AbstractCoroutine<Unit>(parentContext, true) {
6363
override fun onCompleted(value: Unit) {
64-
if (!subscriber.isDisposed) subscriber.onComplete()
64+
try {
65+
if (!subscriber.isDisposed) subscriber.onComplete()
66+
} catch (e: Throwable) {
67+
handleCoroutineException(context, e)
68+
}
6569
}
6670

6771
override fun onCancelled(cause: Throwable, handled: Boolean) {
6872
if (!subscriber.isDisposed) {
69-
subscriber.onError(cause)
73+
try {
74+
subscriber.onError(cause)
75+
} catch (e: Throwable) {
76+
handleCoroutineException(context, e)
77+
}
7078
} else if (!handled) {
7179
handleCoroutineException(context, cause)
7280
}

reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt

+10-2
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,21 @@ private class RxMaybeCoroutine<T>(
6363
) : AbstractCoroutine<T>(parentContext, true) {
6464
override fun onCompleted(value: T) {
6565
if (!subscriber.isDisposed) {
66-
if (value == null) subscriber.onComplete() else subscriber.onSuccess(value)
66+
try {
67+
if (value == null) subscriber.onComplete() else subscriber.onSuccess(value)
68+
} catch(e: Throwable) {
69+
handleCoroutineException(context, e)
70+
}
6771
}
6872
}
6973

7074
override fun onCancelled(cause: Throwable, handled: Boolean) {
7175
if (!subscriber.isDisposed) {
72-
subscriber.onError(cause)
76+
try {
77+
subscriber.onError(cause)
78+
} catch (e: Throwable) {
79+
handleCoroutineException(context, e)
80+
}
7381
} else if (!handled) {
7482
handleCoroutineException(context, cause)
7583
}

reactive/kotlinx-coroutines-rx2/src/RxObservable.kt

+32-10
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,19 @@ private class RxObservableCoroutine<T: Any>(
131131
// to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
132132
// this failure is essentially equivalent to a failure of a child coroutine.
133133
cancelCoroutine(e)
134-
doLockedSignalCompleted(e, false)
134+
mutex.unlock()
135135
throw e
136136
}
137137
/*
138-
There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
139-
happen after this check and before `unlock` (see signalCompleted that does not do anything
140-
if it fails to acquire the lock that we are still holding).
141-
We have to recheck `isCompleted` after `unlock` anyway.
138+
* There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
139+
* happen after this check and before `unlock` (see signalCompleted that does not do anything
140+
* if it fails to acquire the lock that we are still holding).
141+
* We have to recheck `isCompleted` after `unlock` anyway.
142142
*/
143+
unlockAndCheckCompleted()
144+
}
145+
146+
private fun unlockAndCheckCompleted() {
143147
mutex.unlock()
144148
// recheck isActive
145149
if (!isActive && mutex.tryLock())
@@ -148,16 +152,32 @@ private class RxObservableCoroutine<T: Any>(
148152

149153
// assert: mutex.isLocked()
150154
private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
151-
// todo: handled is ignored here, might need something like in PublisherCoroutine to process
152155
// cancellation failures
153156
try {
154157
if (_signal.value >= CLOSED) {
155158
_signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
156159
try {
157-
if (cause != null && cause !is CancellationException)
158-
subscriber.onError(cause)
159-
else
160+
if (cause != null && cause !is CancellationException) {
161+
/*
162+
* Reactive frameworks have two types of exceptions: regular and fatal.
163+
* Regular are passed to onError.
164+
* Fatal can be passed to onError, but implementation **is free to swallow it** (e.g. see #1297).
165+
* Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
166+
* the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
167+
* thrown by subscriber or upstream).
168+
* To make behaviour consistent and least surprising, we always handle fatal exceptions
169+
* by coroutines machinery, anyway, they should not be present in regular program flow,
170+
* thus our goal here is just to expose it as soon as possible.
171+
*/
172+
if (cause.isFatal()) {
173+
if (!handled) handleCoroutineException(context, cause)
174+
} else {
175+
subscriber.onError(cause)
176+
}
177+
}
178+
else {
160179
subscriber.onComplete()
180+
}
161181
} catch (e: Throwable) {
162182
// Unhandled exception (cannot handle in other way, since we are already complete)
163183
handleCoroutineException(context, e)
@@ -181,4 +201,6 @@ private class RxObservableCoroutine<T: Any>(
181201
override fun onCancelled(cause: Throwable, handled: Boolean) {
182202
signalCompleted(cause, handled)
183203
}
184-
}
204+
}
205+
206+
internal fun Throwable.isFatal() = this is VirtualMachineError || this is ThreadDeath || this is LinkageError

reactive/kotlinx-coroutines-rx2/src/RxSingle.kt

+10-2
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,20 @@ private class RxSingleCoroutine<T: Any>(
6161
private val subscriber: SingleEmitter<T>
6262
) : AbstractCoroutine<T>(parentContext, true) {
6363
override fun onCompleted(value: T) {
64-
if (!subscriber.isDisposed) subscriber.onSuccess(value)
64+
try {
65+
if (!subscriber.isDisposed) subscriber.onSuccess(value)
66+
} catch (e: Throwable) {
67+
handleCoroutineException(context, e)
68+
}
6569
}
6670

6771
override fun onCancelled(cause: Throwable, handled: Boolean) {
6872
if (!subscriber.isDisposed) {
69-
subscriber.onError(cause)
73+
try {
74+
subscriber.onError(cause)
75+
} catch (e: Throwable) {
76+
handleCoroutineException(context, e)
77+
}
7078
} else if (!handled) {
7179
handleCoroutineException(context, cause)
7280
}

reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt

+17
Original file line numberDiff line numberDiff line change
@@ -147,4 +147,21 @@ class CompletableTest : TestBase() {
147147
yield() // run coroutine
148148
finish(6)
149149
}
150+
151+
@Test
152+
fun testFatalExceptionInSubscribe() = runTest {
153+
GlobalScope.rxCompletable(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) {
154+
expect(1)
155+
42
156+
}.subscribe({ throw LinkageError() })
157+
finish(3)
158+
}
159+
160+
@Test
161+
fun testFatalExceptionInSingle() = runTest {
162+
GlobalScope.rxCompletable(Dispatchers.Unconfined) {
163+
throw LinkageError()
164+
}.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) })
165+
finish(2)
166+
}
150167
}

reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.rx2

reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt

+17
Original file line numberDiff line numberDiff line change
@@ -277,4 +277,21 @@ class MaybeTest : TestBase() {
277277
yield() // run coroutine
278278
finish(6)
279279
}
280+
281+
@Test
282+
fun testFatalExceptionInSubscribe() = runTest {
283+
GlobalScope.rxMaybe(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) {
284+
expect(1)
285+
42
286+
}.subscribe({ throw LinkageError() })
287+
finish(3)
288+
}
289+
290+
@Test
291+
fun testFatalExceptionInSingle() = runTest {
292+
GlobalScope.rxMaybe(Dispatchers.Unconfined) {
293+
throw LinkageError()
294+
}.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) })
295+
finish(2)
296+
}
280297
}

0 commit comments

Comments
 (0)