Skip to content

Commit 1b6b030

Browse files
committed
Throw non-downstream-induced cancellation exceptions from mono { }
1 parent c08b38e commit 1b6b030

File tree

2 files changed

+67
-10
lines changed

2 files changed

+67
-10
lines changed

reactive/kotlinx-coroutines-reactor/src/Mono.kt

+19-5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package kotlinx.coroutines.reactor
88

99
import kotlinx.coroutines.*
10+
import org.reactivestreams.*
1011
import reactor.core.*
1112
import reactor.core.publisher.*
1213
import kotlin.coroutines.*
@@ -32,6 +33,22 @@ public fun <T> mono(
3233
return monoInternal(GlobalScope, context, block)
3334
}
3435

36+
@Suppress("UNCHECKED_CAST")
37+
internal suspend fun <T> Mono<T>.await(): T? = (this as Mono<T?>).awaitOrDefault(null)
38+
39+
internal suspend fun <T> Mono<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont ->
40+
subscribe(object: CoreSubscriber<T> {
41+
override fun onSubscribe(s: Subscription) {
42+
cont.invokeOnCancellation { s.cancel() }
43+
s.request(1)
44+
}
45+
override fun onNext(t: T) { cont.resume(t) }
46+
override fun onError(t: Throwable) { cont.resumeWithException(t) }
47+
override fun onComplete() { cont.resume(default) }
48+
49+
})
50+
}
51+
3552
@Deprecated(
3653
message = "CoroutineScope.mono is deprecated in favour of top-level mono",
3754
level = DeprecationLevel.ERROR,
@@ -68,11 +85,8 @@ private class MonoCoroutine<in T>(
6885

6986
override fun onCancelled(cause: Throwable, handled: Boolean) {
7087
try {
71-
if (getCancellationException() === cause) {
72-
/** Cancellation exceptions are meaningless to the user, so we present them as absences of a value. If
73-
* [sink] is already in a terminal state, this call will be ignored altogether, which is good. */
74-
sink.success()
75-
} else {
88+
/** Cancellation exceptions that were caused by [dispose], that is, came from downstream, are not errors. */
89+
if (getCancellationException() !== cause || !disposed) {
7690
/** If [sink] turns out to already be in a terminal state, this exception will be passed through the
7791
* [Hooks.onErrorDropped] hook, which is the way to signal undeliverable exceptions in Reactor. */
7892
sink.error(cause)

reactive/kotlinx-coroutines-reactor/test/MonoTest.kt

+48-5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines.reactor
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.CancellationException
89
import kotlinx.coroutines.flow.*
910
import kotlinx.coroutines.reactive.*
1011
import org.junit.*
@@ -14,13 +15,15 @@ import reactor.core.publisher.*
1415
import reactor.util.context.*
1516
import java.time.*
1617
import java.time.Duration.*
18+
import java.util.concurrent.*
1719
import java.util.function.*
1820
import kotlin.test.*
1921

2022
class MonoTest : TestBase() {
2123
@Before
2224
fun setup() {
2325
ignoreLostThreads("timer-", "parallel-")
26+
Hooks.onErrorDropped { expectUnreached() }
2427
}
2528

2629
@Test
@@ -286,17 +289,57 @@ class MonoTest : TestBase() {
286289
}
287290
}
288291

292+
/** Test that cancelling a [mono] due to a timeout does throw an exception. */
289293
@Test
290294
fun testTimeout() {
291295
val mono = mono {
292296
withTimeout(1) { delay(100) }
293297
}
294-
mono.doOnSubscribe { expect(1) }
298+
try {
299+
mono.doOnSubscribe { expect(1) }
300+
.doOnNext { expectUnreached() }
301+
.doOnSuccess { expectUnreached() }
302+
.doOnError { expect(2) }
303+
.doOnCancel { expectUnreached() }
304+
.block()
305+
} catch (e: CancellationException) {
306+
expect(3)
307+
}
308+
finish(4)
309+
}
310+
311+
/** Test that when the reason for cancellation of a [mono] is that the downstream doesn't want its results anymore,
312+
* this is considered normal behavior and exceptions are not propagated. */
313+
@Test
314+
fun testDownstreamCancellationDoesNotThrow() = runTest {
315+
/** Attach a hook that handles exceptions from publishers that are known to be disposed of. We don't expect it
316+
* to be fired in this case, as the reason for the publisher in this test to accept an exception is simply
317+
* cancellation from the downstream. */
318+
Hooks.onOperatorError("testDownstreamCancellationDoesNotThrow") { t, a ->
319+
expectUnreached()
320+
t
321+
}
322+
/** A Mono that doesn't emit a value and instead waits indefinitely. */
323+
val mono = mono { expect(3); delay(Long.MAX_VALUE) }
324+
.doOnSubscribe { expect(2) }
295325
.doOnNext { expectUnreached() }
296-
.doOnSuccess { expect(2) }
326+
.doOnSuccess { expectUnreached() }
297327
.doOnError { expectUnreached() }
298-
.doOnCancel { expectUnreached() }
299-
.block()
300-
finish(3)
328+
.doOnCancel { expect(4) }
329+
/** There are no guarantees about the execution context in which the cancellation handler will run, but we have
330+
* to somehow make sure that [Hooks.resetOnOperatorError] occurs after that, as otherwise, the test will pass
331+
* successfully even for an incorrect implementation. This contraption seems to ensure that the cancellation
332+
* handler does complete before [finish] is called. */
333+
newSingleThreadContext("testDownstreamCancellationDoesNotThrow").use { pool ->
334+
val job = launch(pool, start = CoroutineStart.UNDISPATCHED) {
335+
expect(1)
336+
mono.await()
337+
}
338+
launch(pool) {
339+
job.cancelAndJoin()
340+
}
341+
}.join()
342+
finish(5)
343+
Hooks.resetOnOperatorError("testDownstreamCancellationDoesNotThrow")
301344
}
302345
}

0 commit comments

Comments
 (0)