From 0be54f7887ae1c80c99b4707431a68d0cc2fcae8 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 7 Apr 2021 17:41:52 +0300 Subject: [PATCH] Fix a flaky test in MonoTest --- .../test/MonoTest.kt | 60 ++++++++++--------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt index 97b195c517..a98c514f19 100644 --- a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt @@ -311,6 +311,7 @@ class MonoTest : TestBase() { * this is considered normal behavior and exceptions are not propagated. */ @Test fun testDownstreamCancellationDoesNotThrow() = runTest { + var i = 0 /** Attach a hook that handles exceptions from publishers that are known to be disposed of. We don't expect it * to be fired in this case, as the reason for the publisher in this test to accept an exception is simply * cancellation from the downstream. */ @@ -319,15 +320,20 @@ class MonoTest : TestBase() { t } /** A Mono that doesn't emit a value and instead waits indefinitely. */ - val mono = mono { expect(3); delay(Long.MAX_VALUE) } - .doOnSubscribe { expect(2) } + val mono = mono(Dispatchers.Unconfined) { expect(5 * i + 3); delay(Long.MAX_VALUE) } + .doOnSubscribe { expect(5 * i + 2) } .doOnNext { expectUnreached() } .doOnSuccess { expectUnreached() } .doOnError { expectUnreached() } - .doOnCancel { expect(4) } - expect(1) - mono.awaitCancelAndJoin() - finish(5) + .doOnCancel { expect(5 * i + 4) } + val n = 1000 + repeat(n) { + i = it + expect(5 * i + 1) + mono.awaitCancelAndJoin() + expect(5 * i + 5) + } + finish(5 * n + 1) Hooks.resetOnOperatorError("testDownstreamCancellationDoesNotThrow") } @@ -335,47 +341,45 @@ class MonoTest : TestBase() { * error is propagated to [Hooks.onOperatorError]. */ @Test fun testRethrowingDownstreamCancellation() = runTest { + var i = 0 /** Attach a hook that handles exceptions from publishers that are known to be disposed of. We expect it * to be fired in this case. */ Hooks.onOperatorError("testDownstreamCancellationDoesNotThrow") { t, a -> - expect(5) + expect(i * 6 + 5) t } /** A Mono that doesn't emit a value and instead waits indefinitely, and, when cancelled, throws. */ - val mono = mono { - expect(3); + val mono = mono(Dispatchers.Unconfined) { + expect(i * 6 + 3) try { delay(Long.MAX_VALUE) } catch (e: CancellationException) { throw TestException() } } - .doOnSubscribe { expect(2) } + .doOnSubscribe { expect(i * 6 + 2) } .doOnNext { expectUnreached() } .doOnSuccess { expectUnreached() } .doOnError { expectUnreached() } - .doOnCancel { expect(4) } - expect(1) - mono.awaitCancelAndJoin() - finish(6) /** if this line fails, see the comment for [awaitCancelAndJoin] */ + .doOnCancel { expect(i * 6 + 4) } + val n = 1000 + repeat(n) { + i = it + expect(i * 6 + 1) + mono.awaitCancelAndJoin() + expect(i * 6 + 6) + } + finish(n * 6 + 1) Hooks.resetOnOperatorError("testDownstreamCancellationDoesNotThrow") } - /** Run the given [Mono], cancel it, wait for the cancellation handler to finish, and *return only then*. + /** Run the given [Publisher], cancel it, wait for the cancellation handler to finish, and return only then. * - * There are no guarantees about the execution context in which the cancellation handler will run, but we have - * to wait for it to finish to check its behavior. The contraption below seems to ensure that everything works out. - * If it stops giving that guarantee, then [testRethrowingDownstreamCancellation] should fail more or less - * consistently because the hook won't have enough time to fire before a call to [finish]. - */ - private suspend fun Mono.awaitCancelAndJoin() = coroutineScope { - val job = async(start = CoroutineStart.UNDISPATCHED) { + * Will not work in the general case, but here, when the publisher uses [Dispatchers.Unconfined], this seems to + * ensure that the cancellation handler will have nowhere to execute but serially with the cancellation. */ + private suspend fun Publisher.awaitCancelAndJoin() = coroutineScope { + async(start = CoroutineStart.UNDISPATCHED) { awaitFirstOrNull() - } - newSingleThreadContext("monoCancellationCleanup").use { pool -> - launch(pool) { - job.cancelAndJoin() - } - }.join() + }.cancelAndJoin() } }