From 5a32d764457391a455326dfd3353c5d0078fcdcf Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Thu, 13 Feb 2020 11:17:38 +0300 Subject: [PATCH 1/2] Fix a race in some tests for JavaRX integration An extremely rare race could happen in any of the tests in `LeakedExceptionTest` in the following case: * `withExceptionHandler` runs the block passed to it; * In one of the last iterations of `repeat`, `select` in `combine` happens on both flows at the same time, that is, the block that was passed to `rx[Something]` runs in two threads simultaneously; * One of these two threads (thread A) runs anomalously slow; * The other thread successfully throws an exception; * This exception is propagated to `catch`, so `collect` is finished; * `repeat` is exited, the block passed to `withExceptionHandler` is done executing; * `withExceptionHandler` sets back the usual exception handler, which fails when an exception in JavaRX happens (see https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling); * Thread A wakes up and throws an exception. This time, it is passed not to `handler`, which is made specifically to deal with this, but to the default handler. As a fix, now a special coroutine context passed to `rx[Something]` ensures that the spawned executions are run in a thread pool that blocks until all the tasks are done. --- .../test/LeakedExceptionTest.kt | 85 ++++++++++++++----- 1 file changed, 66 insertions(+), 19 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt b/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt index 1430dbf381..243f80b825 100644 --- a/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt @@ -6,12 +6,12 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import io.reactivex.exceptions.* -import io.reactivex.plugins.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* import org.junit.Test -import java.io.* +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit import kotlin.test.* // Check that exception is not leaked to the global exception handler @@ -22,37 +22,84 @@ class LeakedExceptionTest : TestBase() { @Test fun testSingle() = withExceptionHandler(handler) { - val flow = rxSingle { throw TestException() }.toFlowable().asFlow() - runBlocking { - repeat(10000) { - combine(flow, flow) { _, _ -> Unit } - .catch {} - .collect { } + withFixedThreadPool(4) { dispatcher -> + val flow = rxSingle(dispatcher) { throw TestException() }.toFlowable().asFlow() + runBlocking { + repeat(10000) { + combine(flow, flow) { _, _ -> Unit } + .catch {} + .collect { } + } } } } @Test fun testObservable() = withExceptionHandler(handler) { - val flow = rxObservable { throw TestException() }.toFlowable(BackpressureStrategy.BUFFER).asFlow() - runBlocking { - repeat(10000) { - combine(flow, flow) { _, _ -> Unit } - .catch {} - .collect { } + withFixedThreadPool(4) { dispatcher -> + val flow = rxObservable(dispatcher) { throw TestException() } + .toFlowable(BackpressureStrategy.BUFFER) + .asFlow() + runBlocking { + repeat(10000) { + combine(flow, flow) { _, _ -> Unit } + .catch {} + .collect { } + } } } } @Test fun testFlowable() = withExceptionHandler(handler) { - val flow = rxFlowable { throw TestException() }.asFlow() - runBlocking { - repeat(10000) { + withFixedThreadPool(4) { dispatcher -> + val flow = rxFlowable(dispatcher) { throw TestException() }.asFlow() + runBlocking { + repeat(10000) { + combine(flow, flow) { _, _ -> Unit } + .catch {} + .collect { } + } + } + } + } + + /** + * This test doesn't test much and was added to display a problem with straighforward use of + * [withExceptionHandler]. + * + * If one was to remove `dispatcher` and launch `rxFlowable` with an empty coroutine context, + * this test would fail fairly often, while other tests were also vulnerable, but the problem is + * much more difficult to reproduce. Thus, this test is a justification for adding `dispatcher` + * to other tests. + * + * See the commit that introduced this test for a better explanation. + */ + @Test + fun testResettingExceptionHandler() = withExceptionHandler(handler) { + withFixedThreadPool(4) { dispatcher -> + val flow = rxFlowable(dispatcher) { + if ((0..1).random() == 0) { + Thread.sleep(100) + } + throw TestException() + }.asFlow() + runBlocking { combine(flow, flow) { _, _ -> Unit } - .catch {} - .collect { } + .catch {} + .collect { } } } } + + /** + * Run in a thread pool, then wait for all the tasks to finish. + */ + private fun withFixedThreadPool(numberOfThreads: Int, block: (CoroutineDispatcher) -> Unit) { + val pool = Executors.newFixedThreadPool(numberOfThreads) + val dispatcher = pool.asCoroutineDispatcher() + block(dispatcher) + pool.shutdown() + while (!pool.awaitTermination(10, TimeUnit.SECONDS)); + } } From c5f29181baa93312bafa48840580299df78ef21b Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Thu, 13 Feb 2020 15:47:44 +0300 Subject: [PATCH 2/2] Fix --- .../test/LeakedExceptionTest.kt | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt b/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt index 243f80b825..7252ca2132 100644 --- a/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt @@ -27,8 +27,8 @@ class LeakedExceptionTest : TestBase() { runBlocking { repeat(10000) { combine(flow, flow) { _, _ -> Unit } - .catch {} - .collect { } + .catch {} + .collect {} } } } @@ -38,13 +38,13 @@ class LeakedExceptionTest : TestBase() { fun testObservable() = withExceptionHandler(handler) { withFixedThreadPool(4) { dispatcher -> val flow = rxObservable(dispatcher) { throw TestException() } - .toFlowable(BackpressureStrategy.BUFFER) - .asFlow() + .toFlowable(BackpressureStrategy.BUFFER) + .asFlow() runBlocking { repeat(10000) { combine(flow, flow) { _, _ -> Unit } - .catch {} - .collect { } + .catch {} + .collect {} } } } @@ -57,8 +57,8 @@ class LeakedExceptionTest : TestBase() { runBlocking { repeat(10000) { combine(flow, flow) { _, _ -> Unit } - .catch {} - .collect { } + .catch {} + .collect {} } } } @@ -86,8 +86,8 @@ class LeakedExceptionTest : TestBase() { }.asFlow() runBlocking { combine(flow, flow) { _, _ -> Unit } - .catch {} - .collect { } + .catch {} + .collect {} } } } @@ -100,6 +100,8 @@ class LeakedExceptionTest : TestBase() { val dispatcher = pool.asCoroutineDispatcher() block(dispatcher) pool.shutdown() - while (!pool.awaitTermination(10, TimeUnit.SECONDS)); + while (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + /* deliberately empty */ + } } }