Skip to content

Commit 4ba9802

Browse files
committed
Fix a race in some tests for JavaRX integration
An extremely rare race could happen in any of the tests in `LeakedExceptionTest` in the following case: * `withExceptionHandler` runs the block passed to it; * In one of the last iterations of `repeat`, `select` in `combine` happens on both flows at the same time, that is, the block that was passed to `rx[Something]` runs in two threads simultaneously; * One of these two threads (thread A) runs anomalously slow; * The other thread successfully throws an exception; * This exception is propagated to `catch`, so `collect` is finished; * `repeat` is exited, the block passed to `withExceptionHandler` is done executing; * `withExceptionHandler` sets back the usual exception handler, which fails when an exception in JavaRX happens (see https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling); * Thread A wakes up and throws an exception. This time, it is passed not to `handler`, which is made specifically to deal with this, but to the default handler. The fix is simple: now a special coroutine context passed to `rx[Something]` ensures that the spawned executions are finished before the block passed to `withExceptionHandler` is exited.
1 parent 0126dba commit 4ba9802

File tree

1 file changed

+35
-5
lines changed

1 file changed

+35
-5
lines changed

reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt

+35-5
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,10 @@ package kotlinx.coroutines.rx2
66

77
import io.reactivex.*
88
import io.reactivex.exceptions.*
9-
import io.reactivex.plugins.*
109
import kotlinx.coroutines.*
1110
import kotlinx.coroutines.flow.*
1211
import kotlinx.coroutines.reactive.*
1312
import org.junit.Test
14-
import java.io.*
1513
import kotlin.test.*
1614

1715
// Check that exception is not leaked to the global exception handler
@@ -22,8 +20,9 @@ class LeakedExceptionTest : TestBase() {
2220

2321
@Test
2422
fun testSingle() = withExceptionHandler(handler) {
25-
val flow = rxSingle<Unit> { throw TestException() }.toFlowable().asFlow()
2623
runBlocking {
24+
val dispatcher = wrapperDispatcher()
25+
val flow = rxSingle<Unit>(dispatcher) { throw TestException() }.toFlowable().asFlow()
2726
repeat(10000) {
2827
combine(flow, flow) { _, _ -> Unit }
2928
.catch {}
@@ -34,8 +33,11 @@ class LeakedExceptionTest : TestBase() {
3433

3534
@Test
3635
fun testObservable() = withExceptionHandler(handler) {
37-
val flow = rxObservable<Unit> { throw TestException() }.toFlowable(BackpressureStrategy.BUFFER).asFlow()
3836
runBlocking {
37+
val dispatcher = wrapperDispatcher()
38+
val flow = rxObservable<Unit>(dispatcher) { throw TestException() }
39+
.toFlowable(BackpressureStrategy.BUFFER)
40+
.asFlow()
3941
repeat(10000) {
4042
combine(flow, flow) { _, _ -> Unit }
4143
.catch {}
@@ -46,13 +48,41 @@ class LeakedExceptionTest : TestBase() {
4648

4749
@Test
4850
fun testFlowable() = withExceptionHandler(handler) {
49-
val flow = rxFlowable<Unit> { throw TestException() }.asFlow()
5051
runBlocking {
52+
val dispatcher = wrapperDispatcher()
53+
val flow = rxFlowable<Unit>(dispatcher) { throw TestException() }.asFlow()
5154
repeat(10000) {
5255
combine(flow, flow) { _, _ -> Unit }
5356
.catch {}
5457
.collect { }
5558
}
5659
}
5760
}
61+
62+
/**
63+
* This test doesn't test much and was added to display a problem with straighforward use of
64+
* [withExceptionHandler].
65+
*
66+
* If one was to remove `dispatcher` and launch `rxFlowable` with an empty coroutine context,
67+
* this test would fail fairly often, while other tests were also vulnerable, but the problem is
68+
* much more difficult to reproduce. Thus, this test is a justification for adding `dispatcher`
69+
* to other tests.
70+
*
71+
* See the commit that introduced this test for a better explanation.
72+
*/
73+
@Test
74+
fun testResettingExceptionHandler() = withExceptionHandler(handler) {
75+
runBlocking {
76+
val dispatcher = wrapperDispatcher()
77+
val flow = rxFlowable<Unit>(dispatcher) {
78+
if ((0..1).random() == 0) {
79+
Thread.sleep(100)
80+
}
81+
throw TestException()
82+
}.asFlow()
83+
combine(flow, flow) { _, _ -> Unit }
84+
.catch {}
85+
.collect { }
86+
}
87+
}
5888
}

0 commit comments

Comments
 (0)