Skip to content

Commit 182b8f9

Browse files
committed
Report fatal errors to both onError (to comply the spec fully) and to coroutine context (to eagerly fail as fatal exceptions are not recoverable anyway), cleanup tests after scopeless reactive
1 parent c5fe42a commit 182b8f9

File tree

6 files changed

+78
-250
lines changed

6 files changed

+78
-250
lines changed

kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ package kotlinx.coroutines.flow
77
import kotlinx.coroutines.*
88
import kotlin.test.*
99

10-
class SingleTest : TestBase() {
10+
class SingleTest : TestBase() {
1111

1212
@Test
1313
fun testSingle() = runTest {

reactive/kotlinx-coroutines-reactive/src/Publish.kt

+8-9
Original file line numberDiff line numberDiff line change
@@ -163,13 +163,13 @@ private class PublisherCoroutine<in T>(
163163
val current = _nRequested.value
164164
if (current < 0) break // closed from inside onNext => unlock
165165
if (current == Long.MAX_VALUE) break // no back-pressure => unlock
166-
val upd = current - 1
167-
if (_nRequested.compareAndSet(current, upd)) {
168-
if (upd == 0L) {
166+
val updated = current - 1
167+
if (_nRequested.compareAndSet(current, updated)) {
168+
if (updated == 0L) {
169169
// return to keep locked due to back-pressure
170170
return
171171
}
172-
break // unlock if upd > 0
172+
break // unlock if updated > 0
173173
}
174174
}
175175
unlockAndCheckCompleted()
@@ -206,18 +206,17 @@ private class PublisherCoroutine<in T>(
206206
/*
207207
* Reactive frameworks have two types of exceptions: regular and fatal.
208208
* Regular are passed to onError.
209-
* Fatal can be passed to onError, but implementation **is free to swallow it** (e.g. see #1297).
209+
* Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297).
210210
* Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
211211
* the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
212212
* thrown by subscriber or upstream).
213213
* To make behaviour consistent and least surprising, we always handle fatal exceptions
214214
* by coroutines machinery, anyway, they should not be present in regular program flow,
215215
* thus our goal here is just to expose it as soon as possible.
216216
*/
217-
if (cause.isFatal()) {
218-
if (!handled) handleCoroutineException(context, cause)
219-
} else {
220-
subscriber.onError(cause)
217+
subscriber.onError(cause)
218+
if (!handled && cause.isFatal()) {
219+
handleCoroutineException(context, cause)
221220
}
222221
} else {
223222
subscriber.onComplete()

reactive/kotlinx-coroutines-rx2/src/RxObservable.kt

+20-15
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package kotlinx.coroutines.rx2
88

99
import io.reactivex.*
10+
import io.reactivex.exceptions.*
1011
import kotlinx.atomicfu.*
1112
import kotlinx.coroutines.*
1213
import kotlinx.coroutines.channels.*
@@ -159,20 +160,19 @@ private class RxObservableCoroutine<T: Any>(
159160
try {
160161
if (cause != null && cause !is CancellationException) {
161162
/*
162-
* Reactive frameworks have two types of exceptions: regular and fatal.
163-
* Regular are passed to onError.
164-
* Fatal can be passed to onError, but implementation **is free to swallow it** (e.g. see #1297).
165-
* Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
166-
* the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
167-
* thrown by subscriber or upstream).
168-
* To make behaviour consistent and least surprising, we always handle fatal exceptions
169-
* by coroutines machinery, anyway, they should not be present in regular program flow,
170-
* thus our goal here is just to expose it as soon as possible.
171-
*/
172-
if (cause.isFatal()) {
173-
if (!handled) handleCoroutineException(context, cause)
174-
} else {
175-
subscriber.onError(cause)
163+
* Reactive frameworks have two types of exceptions: regular and fatal.
164+
* Regular are passed to onError.
165+
* Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297).
166+
* Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
167+
* the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
168+
* thrown by subscriber or upstream).
169+
* To make behaviour consistent and least surprising, we always handle fatal exceptions
170+
* by coroutines machinery, anyway, they should not be present in regular program flow,
171+
* thus our goal here is just to expose it as soon as possible.
172+
*/
173+
subscriber.onError(cause)
174+
if (!handled && cause.isFatal()) {
175+
handleCoroutineException(context, cause)
176176
}
177177
}
178178
else {
@@ -203,4 +203,9 @@ private class RxObservableCoroutine<T: Any>(
203203
}
204204
}
205205

206-
internal fun Throwable.isFatal() = this is VirtualMachineError || this is ThreadDeath || this is LinkageError
206+
internal fun Throwable.isFatal() = try {
207+
Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode
208+
false
209+
} catch (e: Throwable) {
210+
true
211+
}

reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ class ConvertTest : TestBase() {
143143
val single = rxSingle(Dispatchers.Unconfined) {
144144
var result = ""
145145
try {
146-
observable.consumeEach { result += it }
146+
observable.collect { result += it }
147147
} catch(e: Throwable) {
148148
check(e is TestException)
149149
result += e.message

reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt

+25-113
Original file line numberDiff line numberDiff line change
@@ -16,96 +16,42 @@ class FlowableExceptionHandlingTest : TestBase() {
1616
ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
1717
}
1818

19-
@Test
20-
fun testException() = runTest(expected = { it is TestException }) {
21-
rxFlowable<Int>(Dispatchers.Unconfined) {
22-
expect(1)
23-
throw TestException()
24-
}.subscribe({
25-
expectUnreached()
26-
}, {
27-
expect(2) // Reported to onError
28-
})
29-
finish(3)
19+
private inline fun <reified T : Throwable> ceh(expect: Int) = CoroutineExceptionHandler { _, t ->
20+
assertTrue(t is T)
21+
expect(expect)
3022
}
3123

32-
@Test
33-
fun testFatalException() = runTest(expected = { it is LinkageError }) {
34-
rxFlowable<Int>(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) {
35-
expect(1)
36-
throw LinkageError()
37-
}.subscribe({
38-
expectUnreached()
39-
}, {
40-
expectUnreached() // Fatal exception is not reported in onError
41-
})
42-
finish(2)
43-
}
24+
private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() }
4425

4526
@Test
46-
fun testExceptionWithoutParent() = runTest {
47-
GlobalScope.rxFlowable<Int>(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) {
27+
fun testException() = runTest {
28+
rxFlowable<Int>(Dispatchers.Unconfined + cehUnreached()) {
4829
expect(1)
4930
throw TestException()
5031
}.subscribe({
5132
expectUnreached()
5233
}, {
53-
assertTrue(it is TestException)
5434
expect(2) // Reported to onError
5535
})
5636
finish(3)
5737
}
5838

5939
@Test
60-
fun testFatalExceptionWithoutParent() = runTest {
61-
GlobalScope.rxFlowable<Int>(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
62-
assertTrue(e is LinkageError); expect(
63-
2
64-
)
65-
}) {
40+
fun testFatalException() = runTest {
41+
rxFlowable<Int>(Dispatchers.Unconfined + ceh<LinkageError>(3)) {
6642
expect(1)
6743
throw LinkageError()
6844
}.subscribe({
6945
expectUnreached()
7046
}, {
71-
expectUnreached() // Fatal exception is not reported in onError
47+
expect(2) // Fatal exception is reported to both onError and CEH
7248
})
73-
finish(3)
74-
}
75-
76-
@Test
77-
fun testExceptionAsynchronous() = runTest(expected = { it is TestException }) {
78-
rxFlowable<Int>(Dispatchers.Unconfined) {
79-
expect(1)
80-
throw TestException()
81-
}.publish()
82-
.refCount()
83-
.subscribe({
84-
expectUnreached()
85-
}, {
86-
expect(2) // Reported to onError
87-
})
88-
finish(3)
89-
}
90-
91-
@Test
92-
fun testFatalExceptionAsynchronous() = runTest(expected = { it is LinkageError }) {
93-
rxFlowable<Int>(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) {
94-
expect(1)
95-
throw LinkageError()
96-
}.publish()
97-
.refCount()
98-
.subscribe({
99-
expectUnreached()
100-
}, {
101-
expectUnreached() // Fatal exception is not reported in onError
102-
})
103-
finish(2)
49+
finish(4)
10450
}
10551

10652
@Test
107-
fun testExceptionAsynchronousWithoutParent() = runTest {
108-
GlobalScope.rxFlowable<Int>(Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> expectUnreached() }) {
53+
fun testExceptionAsynchronous() = runTest {
54+
rxFlowable<Int>(Dispatchers.Unconfined + cehUnreached()) {
10955
expect(1)
11056
throw TestException()
11157
}.publish()
@@ -119,37 +65,35 @@ class FlowableExceptionHandlingTest : TestBase() {
11965
}
12066

12167
@Test
122-
fun testFatalExceptionAsynchronousWithoutParent() = runTest {
123-
GlobalScope.rxFlowable<Int>(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
124-
assertTrue(e is LinkageError); expect(2)
125-
}) {
68+
fun testFatalExceptionAsynchronous() = runTest {
69+
rxFlowable<Int>(Dispatchers.Unconfined + ceh<LinkageError>(3)) {
12670
expect(1)
12771
throw LinkageError()
12872
}.publish()
12973
.refCount()
13074
.subscribe({
13175
expectUnreached()
13276
}, {
133-
expectUnreached() // Fatal exception is not reported in onError
77+
expect(2)
13478
})
135-
finish(3)
79+
finish(4)
13680
}
13781

13882
@Test
139-
fun testFatalExceptionFromSubscribe() = runTest(expected = { it is LinkageError }) {
140-
rxFlowable(Dispatchers.Unconfined) {
83+
fun testFatalExceptionFromSubscribe() = runTest {
84+
rxFlowable(Dispatchers.Unconfined + ceh<LinkageError>(4)) {
14185
expect(1)
14286
send(Unit)
14387
}.subscribe({
14488
expect(2)
14589
throw LinkageError()
146-
}, { expectUnreached() }) // Unreached because fatal errors are rethrown
147-
finish(3)
90+
}, { expect(3) }) // Fatal exception is reported to both onError and CEH
91+
finish(5)
14892
}
14993

15094
@Test
15195
fun testExceptionFromSubscribe() = runTest {
152-
rxFlowable(Dispatchers.Unconfined) {
96+
rxFlowable(Dispatchers.Unconfined + cehUnreached()) {
15397
expect(1)
15498
send(Unit)
15599
}.subscribe({
@@ -161,7 +105,7 @@ class FlowableExceptionHandlingTest : TestBase() {
161105

162106
@Test
163107
fun testAsynchronousExceptionFromSubscribe() = runTest {
164-
rxFlowable(Dispatchers.Unconfined) {
108+
rxFlowable(Dispatchers.Unconfined + cehUnreached()) {
165109
expect(1)
166110
send(Unit)
167111
}.publish()
@@ -174,8 +118,8 @@ class FlowableExceptionHandlingTest : TestBase() {
174118
}
175119

176120
@Test
177-
fun testAsynchronousFatalExceptionFromSubscribe() = runTest(expected = { it is LinkageError }) {
178-
rxFlowable(Dispatchers.Unconfined) {
121+
fun testAsynchronousFatalExceptionFromSubscribe() = runTest {
122+
rxFlowable(Dispatchers.Unconfined + ceh<LinkageError>(3)) {
179123
expect(1)
180124
send(Unit)
181125
}.publish()
@@ -184,38 +128,6 @@ class FlowableExceptionHandlingTest : TestBase() {
184128
expect(2)
185129
throw LinkageError()
186130
}, { expectUnreached() })
187-
finish(3)
131+
finish(4)
188132
}
189-
190-
@Test
191-
fun testAsynchronousExceptionFromSubscribeWithoutParent() =
192-
runTest {
193-
GlobalScope.rxFlowable(Dispatchers.Unconfined) {
194-
expect(1)
195-
send(Unit)
196-
}.publish()
197-
.refCount()
198-
.subscribe({
199-
expect(2)
200-
throw RuntimeException()
201-
}, { expect(3) })
202-
finish(4)
203-
}
204-
205-
@Test
206-
fun testAsynchronousFatalExceptionFromSubscribeWithoutParent() =
207-
runTest {
208-
GlobalScope.rxFlowable(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
209-
assertTrue(e is LinkageError); expect(3)
210-
}) {
211-
expect(1)
212-
send(Unit)
213-
}.publish()
214-
.refCount()
215-
.subscribe({
216-
expect(2)
217-
throw LinkageError()
218-
}, { expectUnreached() })
219-
finish(4)
220-
}
221133
}

0 commit comments

Comments
 (0)