Skip to content

Commit a930b0c

Browse files
committed
Consistently handle undeliverable exceptions in RxJava and Reactor integrations
Use tryOnError in RxJava to make exception delivery check-and-act race free. Deliver undeliverable exceptions via RxJavaPlugins instead of handleCoroutineException. This is a deliberate choice for a multiple reasons: * When using Rx (whether with coroutines or not), undeliverable exceptions are inevitable and users should hook into RxJavaPlugins anyway. We don't want to force them using Rx-specific CoroutineExceptionHandler all over the place * Undeliverable exceptions provide additional helpful stacktrace and proper way to distinguish them from other unhandled exceptions * Be consistent with reactor where we don't have try*, thus cannot provide a completely consistent experience with CEH (at least, without wrapping all the subscribers)\ Do the similar in Reactor integration, but without try*, Reactor does not have notion of undeliverable exceoptions and handles them via Operators.* on its own. Also, get rid of ASCII tables that are not properly render in IDEA Fixes Kotlin#252 Fixes Kotlin#1614
1 parent 6c98c19 commit a930b0c

22 files changed

+414
-221
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,11 @@ public final class kotlinx/coroutines/reactive/PublishKt {
4242
public static final fun publish (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
4343
public static synthetic fun publish$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;
4444
public static synthetic fun publish$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;
45-
public static final fun publishInternal (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
45+
public static final fun publishInternal (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
4646
}
4747

4848
public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coroutines/AbstractCoroutine, kotlinx/coroutines/channels/ProducerScope, kotlinx/coroutines/selects/SelectClause2, org/reactivestreams/Subscription {
49-
public fun <init> (Lkotlin/coroutines/CoroutineContext;Lorg/reactivestreams/Subscriber;)V
49+
public fun <init> (Lkotlin/coroutines/CoroutineContext;Lorg/reactivestreams/Subscriber;Lkotlin/jvm/functions/Function2;)V
5050
public fun cancel ()V
5151
public fun close (Ljava/lang/Throwable;)Z
5252
public fun getChannel ()Lkotlinx/coroutines/channels/SendChannel;

reactive/kotlinx-coroutines-reactive/src/Publish.kt

+14-15
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,15 @@ import kotlin.internal.LowPriorityInOverloadResolution
1717

1818
/**
1919
* Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
20-
* Every time the returned publisher is subscribed, it starts a new coroutine.
21-
* Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
20+
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
21+
* Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
22+
* when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
23+
* if coroutine throws an exception or closes channel with a cause.
24+
* Unsubscribing cancels running coroutine.
2225
*
2326
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
2427
* `onNext` is not invoked concurrently.
2528
*
26-
* | **Coroutine action** | **Signal to subscriber**
27-
* | -------------------------------------------- | ------------------------
28-
* | `send` | `onNext`
29-
* | Normal completion or `close` without cause | `onComplete`
30-
* | Failure with exception or `close` with cause | `onError`
31-
*
3229
* Coroutine context can be specified with [context] argument.
3330
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
3431
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
@@ -43,7 +40,7 @@ public fun <T> publish(
4340
): Publisher<T> {
4441
require(context[Job] === null) { "Publisher context cannot contain job in it." +
4542
"Its lifecycle should be managed via subscription. Had $context" }
46-
return publishInternal(GlobalScope, context, block)
43+
return publishInternal(GlobalScope, context, DEFAULT_HANDLER, block)
4744
}
4845

4946
@Deprecated(
@@ -55,37 +52,39 @@ public fun <T> publish(
5552
public fun <T> CoroutineScope.publish(
5653
context: CoroutineContext = EmptyCoroutineContext,
5754
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
58-
): Publisher<T> = publishInternal(this, context, block)
55+
): Publisher<T> = publishInternal(this, context, DEFAULT_HANDLER ,block)
5956

6057
/** @suppress For internal use from other reactive integration modules only */
6158
@InternalCoroutinesApi
6259
public fun <T> publishInternal(
6360
scope: CoroutineScope, // support for legacy publish in scope
6461
context: CoroutineContext,
62+
exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit,
6563
block: suspend ProducerScope<T>.() -> Unit
6664
): Publisher<T> = Publisher { subscriber ->
6765
// specification requires NPE on null subscriber
6866
if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
6967
val newContext = scope.newCoroutineContext(context)
70-
val coroutine = PublisherCoroutine(newContext, subscriber)
68+
val coroutine = PublisherCoroutine(newContext, subscriber, exceptionOnCancelHandler)
7169
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
7270
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
7371
}
7472

7573
private const val CLOSED = -1L // closed, but have not signalled onCompleted/onError yet
7674
private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError
75+
private val DEFAULT_HANDLER: (Throwable, CoroutineContext) -> Unit = { t, ctx -> if (t !is CancellationException) handleCoroutineException(ctx, t) }
7776

7877
@Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE")
7978
@InternalCoroutinesApi
8079
public class PublisherCoroutine<in T>(
8180
parentContext: CoroutineContext,
82-
private val subscriber: Subscriber<T>
81+
private val subscriber: Subscriber<T>,
82+
private val exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit
8383
) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
8484
override val channel: SendChannel<T> get() = this
8585

8686
// Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
8787
private val mutex = Mutex(locked = true)
88-
8988
private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED)
9089

9190
@Volatile
@@ -198,7 +197,7 @@ public class PublisherCoroutine<in T>(
198197
// Specification requires that after cancellation requested we don't call onXXX
199198
if (cancelled) {
200199
// If the parent had failed to handle our exception, then we must not lose this exception
201-
if (cause != null && !handled) handleCoroutineException(context, cause)
200+
if (cause != null && !handled) exceptionOnCancelHandler(cause, context)
202201
return
203202
}
204203

@@ -217,7 +216,7 @@ public class PublisherCoroutine<in T>(
217216
*/
218217
subscriber.onError(cause)
219218
if (!handled && cause.isFatal()) {
220-
handleCoroutineException(context, cause)
219+
exceptionOnCancelHandler(cause, context)
221220
}
222221
} else {
223222
subscriber.onComplete()

reactive/kotlinx-coroutines-reactor/src/Flux.kt

+20-15
Original file line numberDiff line numberDiff line change
@@ -10,29 +10,24 @@ package kotlinx.coroutines.reactor
1010
import kotlinx.coroutines.*
1111
import kotlinx.coroutines.channels.*
1212
import kotlinx.coroutines.reactive.*
13-
import org.reactivestreams.Publisher
14-
import reactor.core.CoreSubscriber
13+
import org.reactivestreams.*
14+
import reactor.core.*
1515
import reactor.core.publisher.*
16+
import reactor.util.context.*
1617
import kotlin.coroutines.*
17-
import kotlin.internal.LowPriorityInOverloadResolution
18+
import kotlin.internal.*
1819

1920
/**
2021
* Creates cold reactive [Flux] that runs a given [block] in a coroutine.
2122
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
22-
* Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
23-
*
24-
* Coroutine context can be specified with [context] argument.
25-
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
23+
* Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
24+
* when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
25+
* if coroutine throws an exception or closes channel with a cause.
26+
* Unsubscribing cancels running coroutine.
2627
*
2728
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
2829
* `onNext` is not invoked concurrently.
2930
*
30-
* | **Coroutine action** | **Signal to subscriber**
31-
* | -------------------------------------------- | ------------------------
32-
* | `send` | `onNext`
33-
* | Normal completion or `close` without cause | `onComplete`
34-
* | Failure with exception or `close` with cause | `onError`
35-
*
3631
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
3732
*
3833
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
@@ -71,7 +66,17 @@ private fun <T> reactorPublish(
7166
val currentContext = subscriber.currentContext()
7267
val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext()
7368
val newContext = scope.newCoroutineContext(context + reactorContext)
74-
val coroutine = PublisherCoroutine(newContext, subscriber)
69+
val coroutine = PublisherCoroutine(newContext, subscriber, REACTOR_HANDLER)
7570
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
7671
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
77-
}
72+
}
73+
74+
private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { e, ctx ->
75+
if (e !is CancellationException) {
76+
try {
77+
Operators.onOperatorError(e, ctx[ReactorContext]?.context ?: Context.empty())
78+
} catch (e: Throwable) {
79+
handleCoroutineException(ctx, e)
80+
}
81+
}
82+
}

reactive/kotlinx-coroutines-reactor/src/Mono.kt

+16-15
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,10 @@ import kotlin.coroutines.*
1313
import kotlin.internal.*
1414

1515
/**
16-
* Creates cold [mono][Mono] that will run a given [block] in a coroutine.
16+
* Creates cold [mono][Mono] that will run a given [block] in a coroutine and emits its result.
1717
* Every time the returned mono is subscribed, it starts a new coroutine.
18-
* Coroutine returns a single, possibly null value. Unsubscribing cancels running coroutine.
19-
*
20-
* | **Coroutine action** | **Signal to sink**
21-
* | ------------------------------------- | ------------------------
22-
* | Returns a non-null value | `success(value)`
23-
* | Returns a null | `success`
24-
* | Failure with exception or unsubscribe | `error`
18+
* If [block] result is `null`, [MonoSink.success] is invoked without a value.
19+
* Unsubscribing cancels running coroutine.
2520
*
2621
* Coroutine context can be specified with [context] argument.
2722
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
@@ -64,18 +59,24 @@ private class MonoCoroutine<in T>(
6459
parentContext: CoroutineContext,
6560
private val sink: MonoSink<T>
6661
) : AbstractCoroutine<T>(parentContext, true), Disposable {
67-
var disposed = false
62+
@Volatile
63+
private var disposed = false
6864

6965
override fun onCompleted(value: T) {
70-
if (!disposed) {
71-
if (value == null) sink.success() else sink.success(value)
72-
}
66+
if (value == null) sink.success() else sink.success(value)
7367
}
7468

7569
override fun onCancelled(cause: Throwable, handled: Boolean) {
76-
if (!disposed) {
77-
sink.error(cause)
78-
} else if (!handled) {
70+
try {
71+
/*
72+
* sink.error handles exceptions on its own and, by default, handling of undeliverable exceptions is a no-op.
73+
* Guard potentially non-empty handlers against meaningless cancellation exceptions
74+
*/
75+
if (getCancellationException() !== cause) {
76+
sink.error(cause)
77+
}
78+
} catch (e: Throwable) {
79+
// In case of improper error implementation or fatal exceptions
7980
handleCoroutineException(context, cause)
8081
}
8182
}

reactive/kotlinx-coroutines-reactor/test/FluxTest.kt

+12
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines.reactor
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
89
import kotlinx.coroutines.reactive.*
910
import org.hamcrest.core.*
1011
import org.junit.*
@@ -130,4 +131,15 @@ class FluxTest : TestBase() {
130131
fun testIllegalArgumentException() {
131132
assertFailsWith<IllegalArgumentException> { flux<Int>(Job()) { } }
132133
}
134+
135+
@Test
136+
fun testLeakedException() = runBlocking {
137+
// Test exception is not reported to global handler
138+
val flow = flux<Unit> { throw TestException() }.asFlow()
139+
repeat(2000) {
140+
combine(flow, flow) { _, _ -> Unit }
141+
.catch {}
142+
.collect { }
143+
}
144+
}
133145
}

reactive/kotlinx-coroutines-reactor/test/MonoTest.kt

+40-3
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,17 @@
55
package kotlinx.coroutines.reactor
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
89
import kotlinx.coroutines.reactive.*
910
import org.hamcrest.core.*
1011
import org.junit.*
1112
import org.junit.Assert.*
1213
import org.reactivestreams.*
1314
import reactor.core.publisher.*
15+
import reactor.util.context.*
16+
import java.time.*
1417
import java.time.Duration.*
18+
import java.util.function.*
1519

1620
class MonoTest : TestBase() {
1721
@Before
@@ -217,19 +221,21 @@ class MonoTest : TestBase() {
217221
fun testUnhandledException() = runTest {
218222
expect(1)
219223
var subscription: Subscription? = null
220-
val mono = mono(currentDispatcher() + CoroutineExceptionHandler { _, t ->
224+
val handler = BiFunction<Throwable, Any?, Throwable> { t, _ ->
221225
assertTrue(t is TestException)
222226
expect(5)
227+
t
228+
}
223229

224-
}) {
230+
val mono = mono(currentDispatcher()) {
225231
expect(4)
226232
subscription!!.cancel() // cancel our own subscription, so that delay will get cancelled
227233
try {
228234
delay(Long.MAX_VALUE)
229235
} finally {
230236
throw TestException() // would not be able to handle it since mono is disposed
231237
}
232-
}
238+
}.subscriberContext { Context.of("reactor.onOperatorError.local", handler) }
233239
mono.subscribe(object : Subscriber<Unit> {
234240
override fun onSubscribe(s: Subscription) {
235241
expect(2)
@@ -248,4 +254,35 @@ class MonoTest : TestBase() {
248254
fun testIllegalArgumentException() {
249255
assertFailsWith<IllegalArgumentException> { mono(Job()) { } }
250256
}
257+
258+
@Test
259+
fun testExceptionAfterCancellation() = runTest {
260+
// Test exception is not reported to global handler
261+
Flux
262+
.interval(ofMillis(1))
263+
.switchMap {
264+
mono(coroutineContext) {
265+
timeBomb().awaitFirst()
266+
}
267+
}
268+
.onErrorReturn({
269+
expect(1)
270+
true
271+
}, 42)
272+
.blockLast()
273+
finish(2)
274+
}
275+
276+
private fun timeBomb() = Mono.delay(Duration.ofMillis(1)).doOnSuccess { throw Exception("something went wrong") }
277+
278+
@Test
279+
fun testLeakedException() = runBlocking {
280+
// Test exception is not reported to global handler
281+
val flow = mono<Unit> { throw TestException() }.toFlux().asFlow()
282+
repeat(10000) {
283+
combine(flow, flow) { _, _ -> Unit }
284+
.catch {}
285+
.collect { }
286+
}
287+
}
251288
}

reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt

+11
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,21 @@
55
package kotlinx.coroutines.rx2
66

77
import io.reactivex.functions.*
8+
import io.reactivex.plugins.*
89
import kotlinx.coroutines.*
10+
import kotlin.coroutines.*
911

1012
internal class RxCancellable(private val job: Job) : Cancellable {
1113
override fun cancel() {
1214
job.cancel()
1315
}
16+
}
17+
18+
internal fun handleUndeliverableException(cause: Throwable, context: CoroutineContext) {
19+
if (cause is CancellationException) return // Async CE should be completely ignored
20+
try {
21+
RxJavaPlugins.onError(cause)
22+
} catch (e: Throwable) {
23+
handleCoroutineException(context, cause)
24+
}
1425
}

reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt

+8-16
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,9 @@ import kotlin.coroutines.*
1212
import kotlin.internal.*
1313

1414
/**
15-
* Creates cold [Completable] that runs a given [block] in a coroutine.
15+
* Creates cold [Completable] that runs a given [block] in a coroutine and emits its result.
1616
* Every time the returned completable is subscribed, it starts a new coroutine.
1717
* Unsubscribing cancels running coroutine.
18-
*
19-
* | **Coroutine action** | **Signal to subscriber**
20-
* | ------------------------------------- | ------------------------
21-
* | Completes successfully | `onCompleted`
22-
* | Failure with exception or unsubscribe | `onError`
23-
*
2418
* Coroutine context can be specified with [context] argument.
2519
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
2620
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
@@ -62,21 +56,19 @@ private class RxCompletableCoroutine(
6256
) : AbstractCoroutine<Unit>(parentContext, true) {
6357
override fun onCompleted(value: Unit) {
6458
try {
65-
if (!subscriber.isDisposed) subscriber.onComplete()
59+
subscriber.onComplete()
6660
} catch (e: Throwable) {
67-
handleCoroutineException(context, e)
61+
handleUndeliverableException(e, context)
6862
}
6963
}
7064

7165
override fun onCancelled(cause: Throwable, handled: Boolean) {
72-
if (!subscriber.isDisposed) {
73-
try {
74-
subscriber.onError(cause)
75-
} catch (e: Throwable) {
76-
handleCoroutineException(context, e)
66+
try {
67+
if (!subscriber.tryOnError(cause)) {
68+
handleUndeliverableException(cause, context)
7769
}
78-
} else if (!handled) {
79-
handleCoroutineException(context, cause)
70+
} catch (e: Throwable) {
71+
handleUndeliverableException(e, context)
8072
}
8173
}
8274
}

0 commit comments

Comments
 (0)