diff --git a/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt b/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt index 1430dbf381..7252ca2132 100644 --- a/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt @@ -6,12 +6,12 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import io.reactivex.exceptions.* -import io.reactivex.plugins.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* import org.junit.Test -import java.io.* +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit import kotlin.test.* // Check that exception is not leaked to the global exception handler @@ -22,37 +22,86 @@ class LeakedExceptionTest : TestBase() { @Test fun testSingle() = withExceptionHandler(handler) { - val flow = rxSingle { throw TestException() }.toFlowable().asFlow() - runBlocking { - repeat(10000) { - combine(flow, flow) { _, _ -> Unit } - .catch {} - .collect { } + withFixedThreadPool(4) { dispatcher -> + val flow = rxSingle(dispatcher) { throw TestException() }.toFlowable().asFlow() + runBlocking { + repeat(10000) { + combine(flow, flow) { _, _ -> Unit } + .catch {} + .collect {} + } } } } @Test fun testObservable() = withExceptionHandler(handler) { - val flow = rxObservable { throw TestException() }.toFlowable(BackpressureStrategy.BUFFER).asFlow() - runBlocking { - repeat(10000) { - combine(flow, flow) { _, _ -> Unit } - .catch {} - .collect { } + withFixedThreadPool(4) { dispatcher -> + val flow = rxObservable(dispatcher) { throw TestException() } + .toFlowable(BackpressureStrategy.BUFFER) + .asFlow() + runBlocking { + repeat(10000) { + combine(flow, flow) { _, _ -> Unit } + .catch {} + .collect {} + } } } } @Test fun testFlowable() = withExceptionHandler(handler) { - val flow = rxFlowable { throw TestException() }.asFlow() - runBlocking { - repeat(10000) { + withFixedThreadPool(4) { dispatcher -> + val flow = rxFlowable(dispatcher) { throw TestException() }.asFlow() + runBlocking { + repeat(10000) { + combine(flow, flow) { _, _ -> Unit } + .catch {} + .collect {} + } + } + } + } + + /** + * This test doesn't test much and was added to display a problem with straighforward use of + * [withExceptionHandler]. + * + * If one was to remove `dispatcher` and launch `rxFlowable` with an empty coroutine context, + * this test would fail fairly often, while other tests were also vulnerable, but the problem is + * much more difficult to reproduce. Thus, this test is a justification for adding `dispatcher` + * to other tests. + * + * See the commit that introduced this test for a better explanation. + */ + @Test + fun testResettingExceptionHandler() = withExceptionHandler(handler) { + withFixedThreadPool(4) { dispatcher -> + val flow = rxFlowable(dispatcher) { + if ((0..1).random() == 0) { + Thread.sleep(100) + } + throw TestException() + }.asFlow() + runBlocking { combine(flow, flow) { _, _ -> Unit } .catch {} - .collect { } + .collect {} } } } + + /** + * Run in a thread pool, then wait for all the tasks to finish. + */ + private fun withFixedThreadPool(numberOfThreads: Int, block: (CoroutineDispatcher) -> Unit) { + val pool = Executors.newFixedThreadPool(numberOfThreads) + val dispatcher = pool.asCoroutineDispatcher() + block(dispatcher) + pool.shutdown() + while (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + /* deliberately empty */ + } + } }