Skip to content

Commit 0bc2fc2

Browse files
committed
Consistently handle fatal exceptions in PublisherCoroutine
Fixes #1297
1 parent 7acaae6 commit 0bc2fc2

File tree

3 files changed

+252
-14
lines changed

3 files changed

+252
-14
lines changed

reactive/kotlinx-coroutines-reactive/src/Publish.kt

+30-13
Original file line numberDiff line numberDiff line change
@@ -176,17 +176,32 @@ private class PublisherCoroutine<in T>(
176176
if (cancelled) {
177177
// If the parent had failed to handle our exception, then we must not lose this exception
178178
if (cause != null && !handled) handleCoroutineException(context, cause)
179-
} else {
180-
try {
181-
if (cause != null && cause !is CancellationException) {
179+
return
180+
}
181+
182+
try {
183+
if (cause != null && cause !is CancellationException) {
184+
/*
185+
* Reactive frameworks have two types of exceptions: regular and fatal.
186+
* Regular are passed to onError.
187+
* Fatal can be passed to onError, but implementation **is free to swallow it** (e.g. see #1297).
188+
* Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
189+
* the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
190+
* thrown by subscriber or upstream).
191+
* To make behaviour consistent and least surprising, we always handle fatal exceptions
192+
* by coroutines machinery, anyway, they should not be present in regular program flow,
193+
* thus our goal here is just to expose it as soon as possible.
194+
*/
195+
if (cause.isFatal()) {
196+
if (!handled) handleCoroutineException(context, cause)
197+
} else {
182198
subscriber.onError(cause)
183199
}
184-
else {
185-
subscriber.onComplete()
186-
}
187-
} catch (e: Throwable) {
188-
handleCoroutineException(context, e)
200+
} else {
201+
subscriber.onComplete()
189202
}
203+
} catch (e: Throwable) {
204+
handleCoroutineException(context, e)
190205
}
191206
}
192207
} finally {
@@ -220,12 +235,12 @@ private class PublisherCoroutine<in T>(
220235
// assert: isCompleted
221236
private fun signalCompleted(cause: Throwable?, handled: Boolean) {
222237
while (true) { // lock-free loop for nRequested
223-
val cur = _nRequested.value
224-
if (cur == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion
225-
check(cur >= 0) // no other thread could have marked it as CLOSED, because onCompleted[Exceptionally] is invoked once
226-
if (!_nRequested.compareAndSet(cur, CLOSED)) continue // retry on failed CAS
238+
val current = _nRequested.value
239+
if (current == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion
240+
check(current >= 0) // no other thread could have marked it as CLOSED, because onCompleted[Exceptionally] is invoked once
241+
if (!_nRequested.compareAndSet(current, CLOSED)) continue // retry on failed CAS
227242
// Ok -- marked as CLOSED, now can unlock the mutex if it was locked due to backpressure
228-
if (cur == 0L) {
243+
if (current == 0L) {
229244
doLockedSignalCompleted(cause, handled)
230245
} else {
231246
// otherwise mutex was either not locked or locked in concurrent onNext... try lock it to signal completion
@@ -250,4 +265,6 @@ private class PublisherCoroutine<in T>(
250265
cancelled = true
251266
super.cancel(null)
252267
}
268+
269+
private fun Throwable.isFatal() = this is VirtualMachineError || this is ThreadDeath || this is LinkageError
253270
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.rx2
6+
7+
import kotlinx.coroutines.*
8+
import org.junit.*
9+
import org.junit.Test
10+
import kotlin.test.*
11+
12+
class FlowableExceptionHandlingTest : TestBase() {
13+
14+
@Before
15+
fun setup() {
16+
ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
17+
}
18+
19+
@Test
20+
fun testException() = runTest(expected = { it is TestException }) {
21+
rxFlowable<Int>(Dispatchers.Unconfined) {
22+
expect(1)
23+
throw TestException()
24+
}.subscribe({
25+
expectUnreached()
26+
}, {
27+
expect(2) // Reported to onError
28+
})
29+
finish(3)
30+
}
31+
32+
@Test
33+
fun testFatalException() = runTest(expected = { it is LinkageError }) {
34+
rxFlowable<Int>(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) {
35+
expect(1)
36+
throw LinkageError()
37+
}.subscribe({
38+
expectUnreached()
39+
}, {
40+
expectUnreached() // Fatal exception is not reported in onError
41+
})
42+
finish(2)
43+
}
44+
45+
@Test
46+
fun testExceptionWithoutParent() = runTest {
47+
GlobalScope.rxFlowable<Int>(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) {
48+
expect(1)
49+
throw TestException()
50+
}.subscribe({
51+
expectUnreached()
52+
}, {
53+
assertTrue(it is TestException)
54+
expect(2) // Reported to onError
55+
})
56+
finish(3)
57+
}
58+
59+
@Test
60+
fun testFatalExceptionWithoutParent() = runTest {
61+
GlobalScope.rxFlowable<Int>(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
62+
assertTrue(e is LinkageError); expect(
63+
2
64+
)
65+
}) {
66+
expect(1)
67+
throw LinkageError()
68+
}.subscribe({
69+
expectUnreached()
70+
}, {
71+
expectUnreached() // Fatal exception is not reported in onError
72+
})
73+
finish(3)
74+
}
75+
76+
@Test
77+
fun testExceptionAsynchronous() = runTest(expected = { it is TestException }) {
78+
rxFlowable<Int>(Dispatchers.Unconfined) {
79+
expect(1)
80+
throw TestException()
81+
}.publish()
82+
.refCount()
83+
.subscribe({
84+
expectUnreached()
85+
}, {
86+
expect(2) // Reported to onError
87+
})
88+
finish(3)
89+
}
90+
91+
@Test
92+
fun testFatalExceptionAsynchronous() = runTest(expected = { it is LinkageError }) {
93+
rxFlowable<Int>(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) {
94+
expect(1)
95+
throw LinkageError()
96+
}.publish()
97+
.refCount()
98+
.subscribe({
99+
expectUnreached()
100+
}, {
101+
expectUnreached() // Fatal exception is not reported in onError
102+
})
103+
finish(2)
104+
}
105+
106+
@Test
107+
fun testExceptionAsynchronousWithoutParent() = runTest {
108+
GlobalScope.rxFlowable<Int>(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) {
109+
expect(1)
110+
throw TestException()
111+
}.publish()
112+
.refCount()
113+
.subscribe({
114+
expectUnreached()
115+
}, {
116+
expect(2) // Reported to onError
117+
})
118+
finish(3)
119+
}
120+
121+
@Test
122+
fun testFatalExceptionAsynchronousWithoutParent() = runTest {
123+
GlobalScope.rxFlowable<Int>(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
124+
assertTrue(e is LinkageError); expect(2)
125+
}) {
126+
expect(1)
127+
throw LinkageError()
128+
}.publish()
129+
.refCount()
130+
.subscribe({
131+
expectUnreached()
132+
}, {
133+
expectUnreached() // Fatal exception is not reported in onError
134+
})
135+
finish(3)
136+
}
137+
138+
@Test
139+
fun testFatalExceptionFromSubscribe() = runTest(expected = { it is LinkageError }) {
140+
rxFlowable(Dispatchers.Unconfined) {
141+
expect(1)
142+
send(Unit)
143+
}.subscribe({
144+
expect(2)
145+
throw LinkageError()
146+
}, { expectUnreached() }) // Unreached because fatal errors are rethrown
147+
finish(3)
148+
}
149+
150+
@Test
151+
fun testExceptionFromSubscribe() = runTest {
152+
rxFlowable(Dispatchers.Unconfined) {
153+
expect(1)
154+
send(Unit)
155+
}.subscribe({
156+
expect(2)
157+
throw TestException()
158+
}, { expect(3) }) // not reported to onError because came from the subscribe itself
159+
finish(4)
160+
}
161+
162+
@Test
163+
fun testAsynchronousExceptionFromSubscribe() = runTest {
164+
rxFlowable(Dispatchers.Unconfined) {
165+
expect(1)
166+
send(Unit)
167+
}.publish()
168+
.refCount()
169+
.subscribe({
170+
expect(2)
171+
throw RuntimeException()
172+
}, { expect(3) })
173+
finish(4)
174+
}
175+
176+
@Test
177+
fun testAsynchronousFatalExceptionFromSubscribe() = runTest(expected = { it is LinkageError }) {
178+
rxFlowable(Dispatchers.Unconfined) {
179+
expect(1)
180+
send(Unit)
181+
}.publish()
182+
.refCount()
183+
.subscribe({
184+
expect(2)
185+
throw LinkageError()
186+
}, { expectUnreached() })
187+
finish(3)
188+
}
189+
190+
@Test
191+
fun testAsynchronousExceptionFromSubscribeWithoutParent() =
192+
runTest {
193+
GlobalScope.rxFlowable(Dispatchers.Unconfined) {
194+
expect(1)
195+
send(Unit)
196+
}.publish()
197+
.refCount()
198+
.subscribe({
199+
expect(2)
200+
throw RuntimeException()
201+
}, { expect(3) })
202+
finish(4)
203+
}
204+
205+
@Test
206+
fun testAsynchronousFatalExceptionFromSubscribeWithoutParent() =
207+
runTest {
208+
GlobalScope.rxFlowable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
209+
assertTrue(e is LinkageError); expect(3)
210+
}) {
211+
expect(1)
212+
send(Unit)
213+
}.publish()
214+
.refCount()
215+
.subscribe({
216+
expect(2)
217+
throw LinkageError()
218+
}, { expectUnreached() })
219+
finish(4)
220+
}
221+
}

reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class MaybeTest : TestBase() {
3131
expect(2)
3232
maybe.subscribe { value ->
3333
expect(5)
34-
Assert.assertThat(value, IsEqual("OK"))
34+
assertThat(value, IsEqual("OK"))
3535
}
3636
expect(3)
3737
yield() // to started coroutine

0 commit comments

Comments
 (0)