Skip to content

Commit c08b38e

Browse files
committed
Complete Mono on cancellation
1 parent 81e51a3 commit c08b38e

File tree

2 files changed

+22
-6
lines changed

2 files changed

+22
-6
lines changed

reactive/kotlinx-coroutines-reactor/src/Mono.kt

+8-6
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import kotlin.internal.*
1515
/**
1616
* Creates cold [mono][Mono] that will run a given [block] in a coroutine and emits its result.
1717
* Every time the returned mono is subscribed, it starts a new coroutine.
18-
* If [block] result is `null`, [MonoSink.success] is invoked without a value.
18+
* If [block] completes with `null` or its execution was cancelled, [MonoSink.success] is invoked without a value.
1919
* Unsubscribing cancels running coroutine.
2020
*
2121
* Coroutine context can be specified with [context] argument.
@@ -68,11 +68,13 @@ private class MonoCoroutine<in T>(
6868

6969
override fun onCancelled(cause: Throwable, handled: Boolean) {
7070
try {
71-
/*
72-
* sink.error handles exceptions on its own and, by default, handling of undeliverable exceptions is a no-op.
73-
* Guard potentially non-empty handlers against meaningless cancellation exceptions
74-
*/
75-
if (getCancellationException() !== cause) {
71+
if (getCancellationException() === cause) {
72+
/** Cancellation exceptions are meaningless to the user, so we present them as absences of a value. If
73+
* [sink] is already in a terminal state, this call will be ignored altogether, which is good. */
74+
sink.success()
75+
} else {
76+
/** If [sink] turns out to already be in a terminal state, this exception will be passed through the
77+
* [Hooks.onErrorDropped] hook, which is the way to signal undeliverable exceptions in Reactor. */
7678
sink.error(cause)
7779
}
7880
} catch (e: Throwable) {

reactive/kotlinx-coroutines-reactor/test/MonoTest.kt

+14
Original file line numberDiff line numberDiff line change
@@ -285,4 +285,18 @@ class MonoTest : TestBase() {
285285
.collect { }
286286
}
287287
}
288+
289+
@Test
290+
fun testTimeout() {
291+
val mono = mono {
292+
withTimeout(1) { delay(100) }
293+
}
294+
mono.doOnSubscribe { expect(1) }
295+
.doOnNext { expectUnreached() }
296+
.doOnSuccess { expect(2) }
297+
.doOnError { expectUnreached() }
298+
.doOnCancel { expectUnreached() }
299+
.block()
300+
finish(3)
301+
}
288302
}

0 commit comments

Comments
 (0)