Skip to content

Fix a flaky test in MonoTest #2635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 8, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 32 additions & 28 deletions reactive/kotlinx-coroutines-reactor/test/MonoTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ class MonoTest : TestBase() {
* this is considered normal behavior and exceptions are not propagated. */
@Test
fun testDownstreamCancellationDoesNotThrow() = runTest {
var i = 0
/** Attach a hook that handles exceptions from publishers that are known to be disposed of. We don't expect it
* to be fired in this case, as the reason for the publisher in this test to accept an exception is simply
* cancellation from the downstream. */
Expand All @@ -319,63 +320,66 @@ class MonoTest : TestBase() {
t
}
/** A Mono that doesn't emit a value and instead waits indefinitely. */
val mono = mono { expect(3); delay(Long.MAX_VALUE) }
.doOnSubscribe { expect(2) }
val mono = mono(Dispatchers.Unconfined) { expect(5 * i + 3); delay(Long.MAX_VALUE) }
.doOnSubscribe { expect(5 * i + 2) }
.doOnNext { expectUnreached() }
.doOnSuccess { expectUnreached() }
.doOnError { expectUnreached() }
.doOnCancel { expect(4) }
expect(1)
mono.awaitCancelAndJoin()
finish(5)
.doOnCancel { expect(5 * i + 4) }
val n = 1000
repeat(n) {
i = it
expect(5 * i + 1)
mono.awaitCancelAndJoin()
expect(5 * i + 5)
}
finish(5 * n + 1)
Hooks.resetOnOperatorError("testDownstreamCancellationDoesNotThrow")
}

/** Test that, when [Mono] is cancelled by the downstream and throws during handling the cancellation, the resulting
* error is propagated to [Hooks.onOperatorError]. */
@Test
fun testRethrowingDownstreamCancellation() = runTest {
var i = 0
/** Attach a hook that handles exceptions from publishers that are known to be disposed of. We expect it
* to be fired in this case. */
Hooks.onOperatorError("testDownstreamCancellationDoesNotThrow") { t, a ->
expect(5)
expect(i * 6 + 5)
t
}
/** A Mono that doesn't emit a value and instead waits indefinitely, and, when cancelled, throws. */
val mono = mono {
expect(3);
val mono = mono(Dispatchers.Unconfined) {
expect(i * 6 + 3)
try {
delay(Long.MAX_VALUE)
} catch (e: CancellationException) {
throw TestException()
}
}
.doOnSubscribe { expect(2) }
.doOnSubscribe { expect(i * 6 + 2) }
.doOnNext { expectUnreached() }
.doOnSuccess { expectUnreached() }
.doOnError { expectUnreached() }
.doOnCancel { expect(4) }
expect(1)
mono.awaitCancelAndJoin()
finish(6) /** if this line fails, see the comment for [awaitCancelAndJoin] */
.doOnCancel { expect(i * 6 + 4) }
val n = 1000
repeat(n) {
i = it
expect(i * 6 + 1)
mono.awaitCancelAndJoin()
expect(i * 6 + 6)
}
finish(n * 6 + 1)
Hooks.resetOnOperatorError("testDownstreamCancellationDoesNotThrow")
}

/** Run the given [Mono], cancel it, wait for the cancellation handler to finish, and *return only then*.
/** Run the given [Publisher], cancel it, wait for the cancellation handler to finish, and return only then.
*
* There are no guarantees about the execution context in which the cancellation handler will run, but we have
* to wait for it to finish to check its behavior. The contraption below seems to ensure that everything works out.
* If it stops giving that guarantee, then [testRethrowingDownstreamCancellation] should fail more or less
* consistently because the hook won't have enough time to fire before a call to [finish].
*/
private suspend fun <T> Mono<T>.awaitCancelAndJoin() = coroutineScope {
val job = async(start = CoroutineStart.UNDISPATCHED) {
* Will not work in the general case, but here, when the publisher uses [Dispatchers.Unconfined], this seems to
* ensure that the cancellation handler will have nowhere to execute but serially with the cancellation. */
private suspend fun <T> Publisher<T>.awaitCancelAndJoin() = coroutineScope {
async(start = CoroutineStart.UNDISPATCHED) {
awaitFirstOrNull()
}
newSingleThreadContext("monoCancellationCleanup").use { pool ->
launch(pool) {
job.cancelAndJoin()
}
}.join()
}.cancelAndJoin()
}
}