Skip to content

Fix a race in some tests for JavaRX integration #1801

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 13, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 66 additions & 19 deletions reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ package kotlinx.coroutines.rx2

import io.reactivex.*
import io.reactivex.exceptions.*
import io.reactivex.plugins.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
import java.io.*
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.test.*

// Check that exception is not leaked to the global exception handler
Expand All @@ -22,37 +22,84 @@ class LeakedExceptionTest : TestBase() {

@Test
fun testSingle() = withExceptionHandler(handler) {
val flow = rxSingle<Unit> { throw TestException() }.toFlowable().asFlow()
runBlocking {
repeat(10000) {
combine(flow, flow) { _, _ -> Unit }
.catch {}
.collect { }
withFixedThreadPool(4) { dispatcher ->
val flow = rxSingle<Unit>(dispatcher) { throw TestException() }.toFlowable().asFlow()
runBlocking {
repeat(10000) {
combine(flow, flow) { _, _ -> Unit }
.catch {}
.collect { }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit: Continuation lines should be formatted with 4 spaces of indentation (not 8).
Also, it as we are touching this code anyway, it would be create to fix { } -> {} (remove space)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}
}
}
}

@Test
fun testObservable() = withExceptionHandler(handler) {
val flow = rxObservable<Unit> { throw TestException() }.toFlowable(BackpressureStrategy.BUFFER).asFlow()
runBlocking {
repeat(10000) {
combine(flow, flow) { _, _ -> Unit }
.catch {}
.collect { }
withFixedThreadPool(4) { dispatcher ->
val flow = rxObservable<Unit>(dispatcher) { throw TestException() }
.toFlowable(BackpressureStrategy.BUFFER)
.asFlow()
runBlocking {
repeat(10000) {
combine(flow, flow) { _, _ -> Unit }
.catch {}
.collect { }
}
}
}
}

@Test
fun testFlowable() = withExceptionHandler(handler) {
val flow = rxFlowable<Unit> { throw TestException() }.asFlow()
runBlocking {
repeat(10000) {
withFixedThreadPool(4) { dispatcher ->
val flow = rxFlowable<Unit>(dispatcher) { throw TestException() }.asFlow()
runBlocking {
repeat(10000) {
combine(flow, flow) { _, _ -> Unit }
.catch {}
.collect { }
}
}
}
}

/**
* This test doesn't test much and was added to display a problem with straighforward use of
* [withExceptionHandler].
*
* If one was to remove `dispatcher` and launch `rxFlowable` with an empty coroutine context,
* this test would fail fairly often, while other tests were also vulnerable, but the problem is
* much more difficult to reproduce. Thus, this test is a justification for adding `dispatcher`
* to other tests.
*
* See the commit that introduced this test for a better explanation.
*/
@Test
fun testResettingExceptionHandler() = withExceptionHandler(handler) {
withFixedThreadPool(4) { dispatcher ->
val flow = rxFlowable<Unit>(dispatcher) {
if ((0..1).random() == 0) {
Thread.sleep(100)
}
throw TestException()
}.asFlow()
runBlocking {
combine(flow, flow) { _, _ -> Unit }
.catch {}
.collect { }
.catch {}
.collect { }
}
}
}

/**
* Run in a thread pool, then wait for all the tasks to finish.
*/
private fun withFixedThreadPool(numberOfThreads: Int, block: (CoroutineDispatcher) -> Unit) {
val pool = Executors.newFixedThreadPool(numberOfThreads)
val dispatcher = pool.asCoroutineDispatcher()
block(dispatcher)
pool.shutdown()
while (!pool.awaitTermination(10, TimeUnit.SECONDS));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when you write while with empty body write { /* deliberately empty */ } (with comment!) as its body.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}
}