Skip to content

Commit cb682f5

Browse files
dkhalanskyjbpablobaxter
authored andcommitted
Complete mono { } on cancellation (Kotlin#2606)
Fixes Kotlin#2262
1 parent 6757b76 commit cb682f5

File tree

2 files changed

+107
-13
lines changed

2 files changed

+107
-13
lines changed

reactive/kotlinx-coroutines-reactor/src/Mono.kt

+14-13
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,23 @@
77
package kotlinx.coroutines.reactor
88

99
import kotlinx.coroutines.*
10+
import kotlinx.coroutines.reactive.*
11+
import org.reactivestreams.*
1012
import reactor.core.*
1113
import reactor.core.publisher.*
1214
import kotlin.coroutines.*
1315
import kotlin.internal.*
1416

1517
/**
16-
* Creates cold [mono][Mono] that will run a given [block] in a coroutine and emits its result.
18+
* Creates a cold [mono][Mono] that runs a given [block] in a coroutine and emits its result.
1719
* Every time the returned mono is subscribed, it starts a new coroutine.
18-
* If [block] result is `null`, [MonoSink.success] is invoked without a value.
19-
* Unsubscribing cancels running coroutine.
20+
* If the result of [block] is `null`, [MonoSink.success] is invoked without a value.
21+
* Unsubscribing cancels the running coroutine.
2022
*
2123
* Coroutine context can be specified with [context] argument.
2224
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
2325
*
24-
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
26+
* @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
2527
*/
2628
public fun <T> mono(
2729
context: CoroutineContext = EmptyCoroutineContext,
@@ -67,17 +69,16 @@ private class MonoCoroutine<in T>(
6769
}
6870

6971
override fun onCancelled(cause: Throwable, handled: Boolean) {
70-
try {
71-
/*
72-
* sink.error handles exceptions on its own and, by default, handling of undeliverable exceptions is a no-op.
73-
* Guard potentially non-empty handlers against meaningless cancellation exceptions
74-
*/
75-
if (getCancellationException() !== cause) {
72+
/** Cancellation exceptions that were caused by [dispose], that is, came from downstream, are not errors. */
73+
if (getCancellationException() !== cause || !disposed) {
74+
try {
75+
/** If [sink] turns out to already be in a terminal state, this exception will be passed through the
76+
* [Hooks.onOperatorError] hook, which is the way to signal undeliverable exceptions in Reactor. */
7677
sink.error(cause)
78+
} catch (e: Throwable) {
79+
// In case of improper error implementation or fatal exceptions
80+
handleCoroutineException(context, cause)
7781
}
78-
} catch (e: Throwable) {
79-
// In case of improper error implementation or fatal exceptions
80-
handleCoroutineException(context, cause)
8182
}
8283
}
8384

reactive/kotlinx-coroutines-reactor/test/MonoTest.kt

+93
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines.reactor
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.CancellationException
89
import kotlinx.coroutines.flow.*
910
import kotlinx.coroutines.reactive.*
1011
import org.junit.*
@@ -21,6 +22,7 @@ class MonoTest : TestBase() {
2122
@Before
2223
fun setup() {
2324
ignoreLostThreads("timer-", "parallel-")
25+
Hooks.onErrorDropped { expectUnreached() }
2426
}
2527

2628
@Test
@@ -285,4 +287,95 @@ class MonoTest : TestBase() {
285287
.collect { }
286288
}
287289
}
290+
291+
/** Test that cancelling a [mono] due to a timeout does throw an exception. */
292+
@Test
293+
fun testTimeout() {
294+
val mono = mono {
295+
withTimeout(1) { delay(100) }
296+
}
297+
try {
298+
mono.doOnSubscribe { expect(1) }
299+
.doOnNext { expectUnreached() }
300+
.doOnSuccess { expectUnreached() }
301+
.doOnError { expect(2) }
302+
.doOnCancel { expectUnreached() }
303+
.block()
304+
} catch (e: CancellationException) {
305+
expect(3)
306+
}
307+
finish(4)
308+
}
309+
310+
/** Test that when the reason for cancellation of a [mono] is that the downstream doesn't want its results anymore,
311+
* this is considered normal behavior and exceptions are not propagated. */
312+
@Test
313+
fun testDownstreamCancellationDoesNotThrow() = runTest {
314+
/** Attach a hook that handles exceptions from publishers that are known to be disposed of. We don't expect it
315+
* to be fired in this case, as the reason for the publisher in this test to accept an exception is simply
316+
* cancellation from the downstream. */
317+
Hooks.onOperatorError("testDownstreamCancellationDoesNotThrow") { t, a ->
318+
expectUnreached()
319+
t
320+
}
321+
/** A Mono that doesn't emit a value and instead waits indefinitely. */
322+
val mono = mono { expect(3); delay(Long.MAX_VALUE) }
323+
.doOnSubscribe { expect(2) }
324+
.doOnNext { expectUnreached() }
325+
.doOnSuccess { expectUnreached() }
326+
.doOnError { expectUnreached() }
327+
.doOnCancel { expect(4) }
328+
expect(1)
329+
mono.awaitCancelAndJoin()
330+
finish(5)
331+
Hooks.resetOnOperatorError("testDownstreamCancellationDoesNotThrow")
332+
}
333+
334+
/** Test that, when [Mono] is cancelled by the downstream and throws during handling the cancellation, the resulting
335+
* error is propagated to [Hooks.onOperatorError]. */
336+
@Test
337+
fun testRethrowingDownstreamCancellation() = runTest {
338+
/** Attach a hook that handles exceptions from publishers that are known to be disposed of. We expect it
339+
* to be fired in this case. */
340+
Hooks.onOperatorError("testDownstreamCancellationDoesNotThrow") { t, a ->
341+
expect(5)
342+
t
343+
}
344+
/** A Mono that doesn't emit a value and instead waits indefinitely, and, when cancelled, throws. */
345+
val mono = mono {
346+
expect(3);
347+
try {
348+
delay(Long.MAX_VALUE)
349+
} catch (e: CancellationException) {
350+
throw TestException()
351+
}
352+
}
353+
.doOnSubscribe { expect(2) }
354+
.doOnNext { expectUnreached() }
355+
.doOnSuccess { expectUnreached() }
356+
.doOnError { expectUnreached() }
357+
.doOnCancel { expect(4) }
358+
expect(1)
359+
mono.awaitCancelAndJoin()
360+
finish(6) /** if this line fails, see the comment for [awaitCancelAndJoin] */
361+
Hooks.resetOnOperatorError("testDownstreamCancellationDoesNotThrow")
362+
}
363+
364+
/** Run the given [Mono], cancel it, wait for the cancellation handler to finish, and *return only then*.
365+
*
366+
* There are no guarantees about the execution context in which the cancellation handler will run, but we have
367+
* to wait for it to finish to check its behavior. The contraption below seems to ensure that everything works out.
368+
* If it stops giving that guarantee, then [testRethrowingDownstreamCancellation] should fail more or less
369+
* consistently because the hook won't have enough time to fire before a call to [finish].
370+
*/
371+
private suspend fun <T> Mono<T>.awaitCancelAndJoin() = coroutineScope {
372+
val job = async(start = CoroutineStart.UNDISPATCHED) {
373+
awaitFirstOrNull()
374+
}
375+
newSingleThreadContext("monoCancellationCleanup").use { pool ->
376+
launch(pool) {
377+
job.cancelAndJoin()
378+
}
379+
}.join()
380+
}
288381
}

0 commit comments

Comments
 (0)