Skip to content

Commit 9f4fd70

Browse files
authored
Fix a race in some tests for JavaRX integration (#1801)
An extremely rare race could happen in any of the tests in `LeakedExceptionTest` in the following case: * `withExceptionHandler` runs the block passed to it; * In one of the last iterations of `repeat`, `select` in `combine` happens on both flows at the same time, that is, the block that was passed to `rx[Something]` runs in two threads simultaneously; * One of these two threads (thread A) runs anomalously slow; * The other thread successfully throws an exception; * This exception is propagated to `catch`, so `collect` is finished; * `repeat` is exited, the block passed to `withExceptionHandler` is done executing; * `withExceptionHandler` sets back the usual exception handler, which fails when an exception in JavaRX happens (see https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling); * Thread A wakes up and throws an exception. This time, it is passed not to `handler`, which is made specifically to deal with this, but to the default handler. As a fix, now a special coroutine context passed to `rx[Something]` ensures that the spawned executions are run in a thread pool that blocks until all the tasks are done.
1 parent de491d2 commit 9f4fd70

File tree

1 file changed

+67
-18
lines changed

1 file changed

+67
-18
lines changed

reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt

+67-18
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ package kotlinx.coroutines.rx2
66

77
import io.reactivex.*
88
import io.reactivex.exceptions.*
9-
import io.reactivex.plugins.*
109
import kotlinx.coroutines.*
1110
import kotlinx.coroutines.flow.*
1211
import kotlinx.coroutines.reactive.*
1312
import org.junit.Test
14-
import java.io.*
13+
import java.util.concurrent.Executors
14+
import java.util.concurrent.TimeUnit
1515
import kotlin.test.*
1616

1717
// Check that exception is not leaked to the global exception handler
@@ -22,37 +22,86 @@ class LeakedExceptionTest : TestBase() {
2222

2323
@Test
2424
fun testSingle() = withExceptionHandler(handler) {
25-
val flow = rxSingle<Unit> { throw TestException() }.toFlowable().asFlow()
26-
runBlocking {
27-
repeat(10000) {
28-
combine(flow, flow) { _, _ -> Unit }
29-
.catch {}
30-
.collect { }
25+
withFixedThreadPool(4) { dispatcher ->
26+
val flow = rxSingle<Unit>(dispatcher) { throw TestException() }.toFlowable().asFlow()
27+
runBlocking {
28+
repeat(10000) {
29+
combine(flow, flow) { _, _ -> Unit }
30+
.catch {}
31+
.collect {}
32+
}
3133
}
3234
}
3335
}
3436

3537
@Test
3638
fun testObservable() = withExceptionHandler(handler) {
37-
val flow = rxObservable<Unit> { throw TestException() }.toFlowable(BackpressureStrategy.BUFFER).asFlow()
38-
runBlocking {
39-
repeat(10000) {
40-
combine(flow, flow) { _, _ -> Unit }
41-
.catch {}
42-
.collect { }
39+
withFixedThreadPool(4) { dispatcher ->
40+
val flow = rxObservable<Unit>(dispatcher) { throw TestException() }
41+
.toFlowable(BackpressureStrategy.BUFFER)
42+
.asFlow()
43+
runBlocking {
44+
repeat(10000) {
45+
combine(flow, flow) { _, _ -> Unit }
46+
.catch {}
47+
.collect {}
48+
}
4349
}
4450
}
4551
}
4652

4753
@Test
4854
fun testFlowable() = withExceptionHandler(handler) {
49-
val flow = rxFlowable<Unit> { throw TestException() }.asFlow()
50-
runBlocking {
51-
repeat(10000) {
55+
withFixedThreadPool(4) { dispatcher ->
56+
val flow = rxFlowable<Unit>(dispatcher) { throw TestException() }.asFlow()
57+
runBlocking {
58+
repeat(10000) {
59+
combine(flow, flow) { _, _ -> Unit }
60+
.catch {}
61+
.collect {}
62+
}
63+
}
64+
}
65+
}
66+
67+
/**
68+
* This test doesn't test much and was added to display a problem with straighforward use of
69+
* [withExceptionHandler].
70+
*
71+
* If one was to remove `dispatcher` and launch `rxFlowable` with an empty coroutine context,
72+
* this test would fail fairly often, while other tests were also vulnerable, but the problem is
73+
* much more difficult to reproduce. Thus, this test is a justification for adding `dispatcher`
74+
* to other tests.
75+
*
76+
* See the commit that introduced this test for a better explanation.
77+
*/
78+
@Test
79+
fun testResettingExceptionHandler() = withExceptionHandler(handler) {
80+
withFixedThreadPool(4) { dispatcher ->
81+
val flow = rxFlowable<Unit>(dispatcher) {
82+
if ((0..1).random() == 0) {
83+
Thread.sleep(100)
84+
}
85+
throw TestException()
86+
}.asFlow()
87+
runBlocking {
5288
combine(flow, flow) { _, _ -> Unit }
5389
.catch {}
54-
.collect { }
90+
.collect {}
5591
}
5692
}
5793
}
94+
95+
/**
96+
* Run in a thread pool, then wait for all the tasks to finish.
97+
*/
98+
private fun withFixedThreadPool(numberOfThreads: Int, block: (CoroutineDispatcher) -> Unit) {
99+
val pool = Executors.newFixedThreadPool(numberOfThreads)
100+
val dispatcher = pool.asCoroutineDispatcher()
101+
block(dispatcher)
102+
pool.shutdown()
103+
while (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
104+
/* deliberately empty */
105+
}
106+
}
58107
}

0 commit comments

Comments
 (0)