Skip to content

Consistently handle undeliverable exceptions in RxJava and Reactor in… #1638

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ public final class kotlinx/coroutines/reactive/PublishKt {
public static final fun publish (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
public static synthetic fun publish$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;
public static synthetic fun publish$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;
public static final fun publishInternal (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
public static final fun publishInternal (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
}

public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coroutines/AbstractCoroutine, kotlinx/coroutines/channels/ProducerScope, kotlinx/coroutines/selects/SelectClause2, org/reactivestreams/Subscription {
public fun <init> (Lkotlin/coroutines/CoroutineContext;Lorg/reactivestreams/Subscriber;)V
public fun <init> (Lkotlin/coroutines/CoroutineContext;Lorg/reactivestreams/Subscriber;Lkotlin/jvm/functions/Function2;)V
public fun cancel ()V
public fun close (Ljava/lang/Throwable;)Z
public fun getChannel ()Lkotlinx/coroutines/channels/SendChannel;
Expand Down
29 changes: 14 additions & 15 deletions reactive/kotlinx-coroutines-reactive/src/Publish.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,15 @@ import kotlin.internal.LowPriorityInOverloadResolution

/**
* Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
* Every time the returned publisher is subscribed, it starts a new coroutine.
* Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
* Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
* when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
* if coroutine throws an exception or closes channel with a cause.
* Unsubscribing cancels running coroutine.
*
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
* `onNext` is not invoked concurrently.
*
* | **Coroutine action** | **Signal to subscriber**
* | -------------------------------------------- | ------------------------
* | `send` | `onNext`
* | Normal completion or `close` without cause | `onComplete`
* | Failure with exception or `close` with cause | `onError`
*
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
Expand All @@ -43,7 +40,7 @@ public fun <T> publish(
): Publisher<T> {
require(context[Job] === null) { "Publisher context cannot contain job in it." +
"Its lifecycle should be managed via subscription. Had $context" }
return publishInternal(GlobalScope, context, block)
return publishInternal(GlobalScope, context, DEFAULT_HANDLER, block)
}

@Deprecated(
Expand All @@ -55,37 +52,39 @@ public fun <T> publish(
public fun <T> CoroutineScope.publish(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> = publishInternal(this, context, block)
): Publisher<T> = publishInternal(this, context, DEFAULT_HANDLER ,block)

/** @suppress For internal use from other reactive integration modules only */
@InternalCoroutinesApi
public fun <T> publishInternal(
scope: CoroutineScope, // support for legacy publish in scope
context: CoroutineContext,
exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit,
block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> = Publisher { subscriber ->
// specification requires NPE on null subscriber
if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
val newContext = scope.newCoroutineContext(context)
val coroutine = PublisherCoroutine(newContext, subscriber)
val coroutine = PublisherCoroutine(newContext, subscriber, exceptionOnCancelHandler)
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}

private const val CLOSED = -1L // closed, but have not signalled onCompleted/onError yet
private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError
private val DEFAULT_HANDLER: (Throwable, CoroutineContext) -> Unit = { t, ctx -> if (t !is CancellationException) handleCoroutineException(ctx, t) }

@Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE")
@InternalCoroutinesApi
public class PublisherCoroutine<in T>(
parentContext: CoroutineContext,
private val subscriber: Subscriber<T>
private val subscriber: Subscriber<T>,
private val exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit
) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
override val channel: SendChannel<T> get() = this

// Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
private val mutex = Mutex(locked = true)

private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED)

@Volatile
Expand Down Expand Up @@ -198,7 +197,7 @@ public class PublisherCoroutine<in T>(
// Specification requires that after cancellation requested we don't call onXXX
if (cancelled) {
// If the parent had failed to handle our exception, then we must not lose this exception
if (cause != null && !handled) handleCoroutineException(context, cause)
if (cause != null && !handled) exceptionOnCancelHandler(cause, context)
return
}

Expand All @@ -217,7 +216,7 @@ public class PublisherCoroutine<in T>(
*/
subscriber.onError(cause)
if (!handled && cause.isFatal()) {
handleCoroutineException(context, cause)
exceptionOnCancelHandler(cause, context)
}
} else {
subscriber.onComplete()
Expand Down
35 changes: 20 additions & 15 deletions reactive/kotlinx-coroutines-reactor/src/Flux.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,24 @@ package kotlinx.coroutines.reactor
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.Publisher
import reactor.core.CoreSubscriber
import org.reactivestreams.*
import reactor.core.*
import reactor.core.publisher.*
import reactor.util.context.*
import kotlin.coroutines.*
import kotlin.internal.LowPriorityInOverloadResolution
import kotlin.internal.*

/**
* Creates cold reactive [Flux] that runs a given [block] in a coroutine.
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
* Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
*
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
* when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
* if coroutine throws an exception or closes channel with a cause.
* Unsubscribing cancels running coroutine.
*
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
* `onNext` is not invoked concurrently.
*
* | **Coroutine action** | **Signal to subscriber**
* | -------------------------------------------- | ------------------------
* | `send` | `onNext`
* | Normal completion or `close` without cause | `onComplete`
* | Failure with exception or `close` with cause | `onError`
*
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
*
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
Expand Down Expand Up @@ -71,7 +66,17 @@ private fun <T> reactorPublish(
val currentContext = subscriber.currentContext()
val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext()
val newContext = scope.newCoroutineContext(context + reactorContext)
val coroutine = PublisherCoroutine(newContext, subscriber)
val coroutine = PublisherCoroutine(newContext, subscriber, REACTOR_HANDLER)
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
}

private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { e, ctx ->
if (e !is CancellationException) {
try {
Operators.onOperatorError(e, ctx[ReactorContext]?.context ?: Context.empty())
} catch (e: Throwable) {
handleCoroutineException(ctx, e)
}
}
}
31 changes: 16 additions & 15 deletions reactive/kotlinx-coroutines-reactor/src/Mono.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,10 @@ import kotlin.coroutines.*
import kotlin.internal.*

/**
* Creates cold [mono][Mono] that will run a given [block] in a coroutine.
* Creates cold [mono][Mono] that will run a given [block] in a coroutine and emits its result.
* Every time the returned mono is subscribed, it starts a new coroutine.
* Coroutine returns a single, possibly null value. Unsubscribing cancels running coroutine.
*
* | **Coroutine action** | **Signal to sink**
* | ------------------------------------- | ------------------------
* | Returns a non-null value | `success(value)`
* | Returns a null | `success`
* | Failure with exception or unsubscribe | `error`
* If [block] result is `null`, [MonoSink.success] is invoked without a value.
* Unsubscribing cancels running coroutine.
*
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
Expand Down Expand Up @@ -64,18 +59,24 @@ private class MonoCoroutine<in T>(
parentContext: CoroutineContext,
private val sink: MonoSink<T>
) : AbstractCoroutine<T>(parentContext, true), Disposable {
var disposed = false
@Volatile
private var disposed = false

override fun onCompleted(value: T) {
if (!disposed) {
if (value == null) sink.success() else sink.success(value)
}
if (value == null) sink.success() else sink.success(value)
}

override fun onCancelled(cause: Throwable, handled: Boolean) {
if (!disposed) {
sink.error(cause)
} else if (!handled) {
try {
/*
* sink.error handles exceptions on its own and, by default, handling of undeliverable exceptions is a no-op.
* Guard potentially non-empty handlers against meaningless cancellation exceptions
*/
if (getCancellationException() !== cause) {
sink.error(cause)
}
} catch (e: Throwable) {
// In case of improper error implementation or fatal exceptions
handleCoroutineException(context, cause)
}
}
Expand Down
12 changes: 12 additions & 0 deletions reactive/kotlinx-coroutines-reactor/test/FluxTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.hamcrest.core.*
import org.junit.*
Expand Down Expand Up @@ -130,4 +131,15 @@ class FluxTest : TestBase() {
fun testIllegalArgumentException() {
assertFailsWith<IllegalArgumentException> { flux<Int>(Job()) { } }
}

@Test
fun testLeakedException() = runBlocking {
// Test exception is not reported to global handler
val flow = flux<Unit> { throw TestException() }.asFlow()
repeat(2000) {
combine(flow, flow) { _, _ -> Unit }
.catch {}
.collect { }
}
}
}
43 changes: 40 additions & 3 deletions reactive/kotlinx-coroutines-reactor/test/MonoTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.hamcrest.core.*
import org.junit.*
import org.junit.Assert.*
import org.reactivestreams.*
import reactor.core.publisher.*
import reactor.util.context.*
import java.time.*
import java.time.Duration.*
import java.util.function.*

class MonoTest : TestBase() {
@Before
Expand Down Expand Up @@ -217,19 +221,21 @@ class MonoTest : TestBase() {
fun testUnhandledException() = runTest {
expect(1)
var subscription: Subscription? = null
val mono = mono(currentDispatcher() + CoroutineExceptionHandler { _, t ->
val handler = BiFunction<Throwable, Any?, Throwable> { t, _ ->
assertTrue(t is TestException)
expect(5)
t
}

}) {
val mono = mono(currentDispatcher()) {
expect(4)
subscription!!.cancel() // cancel our own subscription, so that delay will get cancelled
try {
delay(Long.MAX_VALUE)
} finally {
throw TestException() // would not be able to handle it since mono is disposed
}
}
}.subscriberContext { Context.of("reactor.onOperatorError.local", handler) }
mono.subscribe(object : Subscriber<Unit> {
override fun onSubscribe(s: Subscription) {
expect(2)
Expand All @@ -248,4 +254,35 @@ class MonoTest : TestBase() {
fun testIllegalArgumentException() {
assertFailsWith<IllegalArgumentException> { mono(Job()) { } }
}

@Test
fun testExceptionAfterCancellation() = runTest {
// Test exception is not reported to global handler
Flux
.interval(ofMillis(1))
.switchMap {
mono(coroutineContext) {
timeBomb().awaitFirst()
}
}
.onErrorReturn({
expect(1)
true
}, 42)
.blockLast()
finish(2)
}

private fun timeBomb() = Mono.delay(Duration.ofMillis(1)).doOnSuccess { throw Exception("something went wrong") }

@Test
fun testLeakedException() = runBlocking {
// Test exception is not reported to global handler
val flow = mono<Unit> { throw TestException() }.toFlux().asFlow()
repeat(10000) {
combine(flow, flow) { _, _ -> Unit }
.catch {}
.collect { }
}
}
}
11 changes: 11 additions & 0 deletions reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,21 @@
package kotlinx.coroutines.rx2

import io.reactivex.functions.*
import io.reactivex.plugins.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

internal class RxCancellable(private val job: Job) : Cancellable {
override fun cancel() {
job.cancel()
}
}

internal fun handleUndeliverableException(cause: Throwable, context: CoroutineContext) {
if (cause is CancellationException) return // Async CE should be completely ignored
try {
RxJavaPlugins.onError(cause)
} catch (e: Throwable) {
handleCoroutineException(context, cause)
}
}
24 changes: 8 additions & 16 deletions reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,9 @@ import kotlin.coroutines.*
import kotlin.internal.*

/**
* Creates cold [Completable] that runs a given [block] in a coroutine.
* Creates cold [Completable] that runs a given [block] in a coroutine and emits its result.
* Every time the returned completable is subscribed, it starts a new coroutine.
* Unsubscribing cancels running coroutine.
*
* | **Coroutine action** | **Signal to subscriber**
* | ------------------------------------- | ------------------------
* | Completes successfully | `onCompleted`
* | Failure with exception or unsubscribe | `onError`
*
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
Expand Down Expand Up @@ -62,21 +56,19 @@ private class RxCompletableCoroutine(
) : AbstractCoroutine<Unit>(parentContext, true) {
override fun onCompleted(value: Unit) {
try {
if (!subscriber.isDisposed) subscriber.onComplete()
subscriber.onComplete()
} catch (e: Throwable) {
handleCoroutineException(context, e)
handleUndeliverableException(e, context)
}
}

override fun onCancelled(cause: Throwable, handled: Boolean) {
if (!subscriber.isDisposed) {
try {
subscriber.onError(cause)
} catch (e: Throwable) {
handleCoroutineException(context, e)
try {
if (!subscriber.tryOnError(cause)) {
handleUndeliverableException(cause, context)
}
} else if (!handled) {
handleCoroutineException(context, cause)
} catch (e: Throwable) {
handleUndeliverableException(e, context)
}
}
}
Loading