From 556c92f39be037aac9a0b484f43567ff66a6e8e9 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 5 Dec 2019 19:33:44 +0300 Subject: [PATCH] Consistently handle undeliverable exceptions in RxJava and Reactor integrations Use tryOnError in RxJava to make exception delivery check-and-act race free. Deliver undeliverable exceptions via RxJavaPlugins instead of handleCoroutineException. This is a deliberate choice for a multiple reasons: * When using Rx (whether with coroutines or not), undeliverable exceptions are inevitable and users should hook into RxJavaPlugins anyway. We don't want to force them using Rx-specific CoroutineExceptionHandler all over the place * Undeliverable exceptions provide additional helpful stacktrace and proper way to distinguish them from other unhandled exceptions * Be consistent with reactor where we don't have try*, thus cannot provide a completely consistent experience with CEH (at least, without wrapping all the subscribers)\ Do the similar in Reactor integration, but without try*, Reactor does not have notion of undeliverable exceoptions and handles them via Operators.* on its own. Also, get rid of ASCII tables that are not properly render in IDEA Fixes #252 Fixes #1614 --- .../kotlinx-coroutines-reactive.txt | 4 +- .../src/Publish.kt | 29 +++++---- .../kotlinx-coroutines-reactor/src/Flux.kt | 35 ++++++----- .../kotlinx-coroutines-reactor/src/Mono.kt | 31 ++++----- .../test/FluxTest.kt | 12 ++++ .../test/MonoTest.kt | 43 ++++++++++++- .../src/RxCancellable.kt | 11 ++++ .../src/RxCompletable.kt | 24 +++---- .../kotlinx-coroutines-rx2/src/RxConvert.kt | 10 ++- .../kotlinx-coroutines-rx2/src/RxFlowable.kt | 18 +++--- .../kotlinx-coroutines-rx2/src/RxMaybe.kt | 34 ++++------ .../src/RxObservable.kt | 22 +++---- .../kotlinx-coroutines-rx2/src/RxScheduler.kt | 3 +- .../kotlinx-coroutines-rx2/src/RxSingle.kt | 26 +++----- reactive/kotlinx-coroutines-rx2/test/Check.kt | 11 ++++ .../test/CompletableTest.kt | 53 ++++++++++------ .../test/FlowableExceptionHandlingTest.kt | 29 ++++----- .../test/LeakedExceptionTest.kt | 58 +++++++++++++++++ .../kotlinx-coroutines-rx2/test/MaybeTest.kt | 63 ++++++++++++------- .../test/ObservableExceptionHandlingTest.kt | 29 ++++----- .../test/ObservableTest.kt | 33 ++++++++++ .../kotlinx-coroutines-rx2/test/SingleTest.kt | 57 ++++++++++------- 22 files changed, 414 insertions(+), 221 deletions(-) create mode 100644 reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt index 60baa74342..bed065d582 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt @@ -42,11 +42,11 @@ public final class kotlinx/coroutines/reactive/PublishKt { public static final fun publish (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher; public static synthetic fun publish$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lorg/reactivestreams/Publisher; public static synthetic fun publish$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lorg/reactivestreams/Publisher; - public static final fun publishInternal (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher; + public static final fun publishInternal (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher; } public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coroutines/AbstractCoroutine, kotlinx/coroutines/channels/ProducerScope, kotlinx/coroutines/selects/SelectClause2, org/reactivestreams/Subscription { - public fun (Lkotlin/coroutines/CoroutineContext;Lorg/reactivestreams/Subscriber;)V + public fun (Lkotlin/coroutines/CoroutineContext;Lorg/reactivestreams/Subscriber;Lkotlin/jvm/functions/Function2;)V public fun cancel ()V public fun close (Ljava/lang/Throwable;)Z public fun getChannel ()Lkotlinx/coroutines/channels/SendChannel; diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index 0e3a7b8c11..704b7142b3 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -17,18 +17,15 @@ import kotlin.internal.LowPriorityInOverloadResolution /** * Creates cold reactive [Publisher] that runs a given [block] in a coroutine. - * Every time the returned publisher is subscribed, it starts a new coroutine. - * Coroutine emits items with `send`. Unsubscribing cancels running coroutine. + * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context]. + * Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete]) + * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError]) + * if coroutine throws an exception or closes channel with a cause. + * Unsubscribing cancels running coroutine. * * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that * `onNext` is not invoked concurrently. * - * | **Coroutine action** | **Signal to subscriber** - * | -------------------------------------------- | ------------------------ - * | `send` | `onNext` - * | Normal completion or `close` without cause | `onComplete` - * | Failure with exception or `close` with cause | `onError` - * * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. @@ -43,7 +40,7 @@ public fun publish( ): Publisher { require(context[Job] === null) { "Publisher context cannot contain job in it." + "Its lifecycle should be managed via subscription. Had $context" } - return publishInternal(GlobalScope, context, block) + return publishInternal(GlobalScope, context, DEFAULT_HANDLER, block) } @Deprecated( @@ -55,37 +52,39 @@ public fun publish( public fun CoroutineScope.publish( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope.() -> Unit -): Publisher = publishInternal(this, context, block) +): Publisher = publishInternal(this, context, DEFAULT_HANDLER ,block) /** @suppress For internal use from other reactive integration modules only */ @InternalCoroutinesApi public fun publishInternal( scope: CoroutineScope, // support for legacy publish in scope context: CoroutineContext, + exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit, block: suspend ProducerScope.() -> Unit ): Publisher = Publisher { subscriber -> // specification requires NPE on null subscriber if (subscriber == null) throw NullPointerException("Subscriber cannot be null") val newContext = scope.newCoroutineContext(context) - val coroutine = PublisherCoroutine(newContext, subscriber) + val coroutine = PublisherCoroutine(newContext, subscriber, exceptionOnCancelHandler) subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions coroutine.start(CoroutineStart.DEFAULT, coroutine, block) } private const val CLOSED = -1L // closed, but have not signalled onCompleted/onError yet private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError +private val DEFAULT_HANDLER: (Throwable, CoroutineContext) -> Unit = { t, ctx -> if (t !is CancellationException) handleCoroutineException(ctx, t) } @Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE") @InternalCoroutinesApi public class PublisherCoroutine( parentContext: CoroutineContext, - private val subscriber: Subscriber + private val subscriber: Subscriber, + private val exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit ) : AbstractCoroutine(parentContext, true), ProducerScope, Subscription, SelectClause2> { override val channel: SendChannel get() = this // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked private val mutex = Mutex(locked = true) - private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED) @Volatile @@ -198,7 +197,7 @@ public class PublisherCoroutine( // Specification requires that after cancellation requested we don't call onXXX if (cancelled) { // If the parent had failed to handle our exception, then we must not lose this exception - if (cause != null && !handled) handleCoroutineException(context, cause) + if (cause != null && !handled) exceptionOnCancelHandler(cause, context) return } @@ -217,7 +216,7 @@ public class PublisherCoroutine( */ subscriber.onError(cause) if (!handled && cause.isFatal()) { - handleCoroutineException(context, cause) + exceptionOnCancelHandler(cause, context) } } else { subscriber.onComplete() diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt index 389428df11..b6cc1615b0 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt @@ -10,29 +10,24 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.reactive.* -import org.reactivestreams.Publisher -import reactor.core.CoreSubscriber +import org.reactivestreams.* +import reactor.core.* import reactor.core.publisher.* +import reactor.util.context.* import kotlin.coroutines.* -import kotlin.internal.LowPriorityInOverloadResolution +import kotlin.internal.* /** * Creates cold reactive [Flux] that runs a given [block] in a coroutine. * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context]. - * Coroutine emits items with `send`. Unsubscribing cancels running coroutine. - * - * Coroutine context can be specified with [context] argument. - * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. + * Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete]) + * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError]) + * if coroutine throws an exception or closes channel with a cause. + * Unsubscribing cancels running coroutine. * * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that * `onNext` is not invoked concurrently. * - * | **Coroutine action** | **Signal to subscriber** - * | -------------------------------------------- | ------------------------ - * | `send` | `onNext` - * | Normal completion or `close` without cause | `onComplete` - * | Failure with exception or `close` with cause | `onError` - * * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. * * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect @@ -71,7 +66,17 @@ private fun reactorPublish( val currentContext = subscriber.currentContext() val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext() val newContext = scope.newCoroutineContext(context + reactorContext) - val coroutine = PublisherCoroutine(newContext, subscriber) + val coroutine = PublisherCoroutine(newContext, subscriber, REACTOR_HANDLER) subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions coroutine.start(CoroutineStart.DEFAULT, coroutine, block) -} \ No newline at end of file +} + +private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { e, ctx -> + if (e !is CancellationException) { + try { + Operators.onOperatorError(e, ctx[ReactorContext]?.context ?: Context.empty()) + } catch (e: Throwable) { + handleCoroutineException(ctx, e) + } + } +} diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index 76f0418ea6..415932dd7d 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -13,15 +13,10 @@ import kotlin.coroutines.* import kotlin.internal.* /** - * Creates cold [mono][Mono] that will run a given [block] in a coroutine. + * Creates cold [mono][Mono] that will run a given [block] in a coroutine and emits its result. * Every time the returned mono is subscribed, it starts a new coroutine. - * Coroutine returns a single, possibly null value. Unsubscribing cancels running coroutine. - * - * | **Coroutine action** | **Signal to sink** - * | ------------------------------------- | ------------------------ - * | Returns a non-null value | `success(value)` - * | Returns a null | `success` - * | Failure with exception or unsubscribe | `error` + * If [block] result is `null`, [MonoSink.success] is invoked without a value. + * Unsubscribing cancels running coroutine. * * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. @@ -64,18 +59,24 @@ private class MonoCoroutine( parentContext: CoroutineContext, private val sink: MonoSink ) : AbstractCoroutine(parentContext, true), Disposable { - var disposed = false + @Volatile + private var disposed = false override fun onCompleted(value: T) { - if (!disposed) { - if (value == null) sink.success() else sink.success(value) - } + if (value == null) sink.success() else sink.success(value) } override fun onCancelled(cause: Throwable, handled: Boolean) { - if (!disposed) { - sink.error(cause) - } else if (!handled) { + try { + /* + * sink.error handles exceptions on its own and, by default, handling of undeliverable exceptions is a no-op. + * Guard potentially non-empty handlers against meaningless cancellation exceptions + */ + if (getCancellationException() !== cause) { + sink.error(cause) + } + } catch (e: Throwable) { + // In case of improper error implementation or fatal exceptions handleCoroutineException(context, cause) } } diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt index ee26455ec8..2562c9d3db 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* import org.hamcrest.core.* import org.junit.* @@ -130,4 +131,15 @@ class FluxTest : TestBase() { fun testIllegalArgumentException() { assertFailsWith { flux(Job()) { } } } + + @Test + fun testLeakedException() = runBlocking { + // Test exception is not reported to global handler + val flow = flux { throw TestException() }.asFlow() + repeat(2000) { + combine(flow, flow) { _, _ -> Unit } + .catch {} + .collect { } + } + } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt index 2283d45afc..223ba7be5d 100644 --- a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt @@ -5,13 +5,17 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* import org.hamcrest.core.* import org.junit.* import org.junit.Assert.* import org.reactivestreams.* import reactor.core.publisher.* +import reactor.util.context.* +import java.time.* import java.time.Duration.* +import java.util.function.* class MonoTest : TestBase() { @Before @@ -217,11 +221,13 @@ class MonoTest : TestBase() { fun testUnhandledException() = runTest { expect(1) var subscription: Subscription? = null - val mono = mono(currentDispatcher() + CoroutineExceptionHandler { _, t -> + val handler = BiFunction { t, _ -> assertTrue(t is TestException) expect(5) + t + } - }) { + val mono = mono(currentDispatcher()) { expect(4) subscription!!.cancel() // cancel our own subscription, so that delay will get cancelled try { @@ -229,7 +235,7 @@ class MonoTest : TestBase() { } finally { throw TestException() // would not be able to handle it since mono is disposed } - } + }.subscriberContext { Context.of("reactor.onOperatorError.local", handler) } mono.subscribe(object : Subscriber { override fun onSubscribe(s: Subscription) { expect(2) @@ -248,4 +254,35 @@ class MonoTest : TestBase() { fun testIllegalArgumentException() { assertFailsWith { mono(Job()) { } } } + + @Test + fun testExceptionAfterCancellation() = runTest { + // Test exception is not reported to global handler + Flux + .interval(ofMillis(1)) + .switchMap { + mono(coroutineContext) { + timeBomb().awaitFirst() + } + } + .onErrorReturn({ + expect(1) + true + }, 42) + .blockLast() + finish(2) + } + + private fun timeBomb() = Mono.delay(Duration.ofMillis(1)).doOnSuccess { throw Exception("something went wrong") } + + @Test + fun testLeakedException() = runBlocking { + // Test exception is not reported to global handler + val flow = mono { throw TestException() }.toFlux().asFlow() + repeat(10000) { + combine(flow, flow) { _, _ -> Unit } + .catch {} + .collect { } + } + } } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt index d76f12315b..a0c32f9f8a 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt @@ -5,10 +5,21 @@ package kotlinx.coroutines.rx2 import io.reactivex.functions.* +import io.reactivex.plugins.* import kotlinx.coroutines.* +import kotlin.coroutines.* internal class RxCancellable(private val job: Job) : Cancellable { override fun cancel() { job.cancel() } +} + +internal fun handleUndeliverableException(cause: Throwable, context: CoroutineContext) { + if (cause is CancellationException) return // Async CE should be completely ignored + try { + RxJavaPlugins.onError(cause) + } catch (e: Throwable) { + handleCoroutineException(context, cause) + } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt index c59b4bd6b5..ab96844c60 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt @@ -12,15 +12,9 @@ import kotlin.coroutines.* import kotlin.internal.* /** - * Creates cold [Completable] that runs a given [block] in a coroutine. + * Creates cold [Completable] that runs a given [block] in a coroutine and emits its result. * Every time the returned completable is subscribed, it starts a new coroutine. * Unsubscribing cancels running coroutine. - * - * | **Coroutine action** | **Signal to subscriber** - * | ------------------------------------- | ------------------------ - * | Completes successfully | `onCompleted` - * | Failure with exception or unsubscribe | `onError` - * * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. @@ -62,21 +56,19 @@ private class RxCompletableCoroutine( ) : AbstractCoroutine(parentContext, true) { override fun onCompleted(value: Unit) { try { - if (!subscriber.isDisposed) subscriber.onComplete() + subscriber.onComplete() } catch (e: Throwable) { - handleCoroutineException(context, e) + handleUndeliverableException(e, context) } } override fun onCancelled(cause: Throwable, handled: Boolean) { - if (!subscriber.isDisposed) { - try { - subscriber.onError(cause) - } catch (e: Throwable) { - handleCoroutineException(context, e) + try { + if (!subscriber.tryOnError(cause)) { + handleUndeliverableException(cause, context) } - } else if (!handled) { - handleCoroutineException(context, cause) + } catch (e: Throwable) { + handleUndeliverableException(e, context) } } } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index 8df6caeeed..bd369cad55 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -94,9 +94,13 @@ public fun Flow.asObservable() : Observable = Observable.create { emitter.onComplete() } catch (e: Throwable) { // 'create' provides safe emitter, so we can unconditionally call on* here if exception occurs in `onComplete` - if (e !is CancellationException) emitter.onError(e) - else emitter.onComplete() - + if (e !is CancellationException) { + if (!emitter.tryOnError(e)) { + handleUndeliverableException(e, coroutineContext) + } + } else { + emitter.onComplete() + } } } emitter.setCancellable(RxCancellable(job)) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt index 30a1ed7e92..7924a3f15c 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt @@ -16,17 +16,15 @@ import kotlin.internal.* /** * Creates cold [flowable][Flowable] that will run a given [block] in a coroutine. * Every time the returned flowable is subscribed, it starts a new coroutine. - * Coroutine emits items with `send`. Unsubscribing cancels running coroutine. + * + * Coroutine emits ([ObservableEmitter.onNext]) values with `send`, completes ([ObservableEmitter.onComplete]) + * when the coroutine completes or channel is explicitly closed and emits error ([ObservableEmitter.onError]) + * if coroutine throws an exception or closes channel with a cause. + * Unsubscribing cancels running coroutine. * * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that * `onNext` is not invoked concurrently. * - * | **Coroutine action** | **Signal to subscriber** - * | -------------------------------------------- | ------------------------ - * | `send` | `onNext` - * | Normal completion or `close` without cause | `onComplete` - * | Failure with exception or `close` with cause | `onError` - * * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. @@ -40,7 +38,7 @@ public fun rxFlowable( ): Flowable { require(context[Job] === null) { "Flowable context cannot contain job in it." + "Its lifecycle should be managed via Disposable handle. Had $context" } - return Flowable.fromPublisher(publishInternal(GlobalScope, context, block)) + return Flowable.fromPublisher(publishInternal(GlobalScope, context, RX_HANDLER, block)) } @Deprecated( @@ -52,4 +50,6 @@ public fun rxFlowable( public fun CoroutineScope.rxFlowable( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope.() -> Unit -): Flowable = Flowable.fromPublisher(publishInternal(this, context, block)) +): Flowable = Flowable.fromPublisher(publishInternal(this, context, RX_HANDLER, block)) + +private val RX_HANDLER: (Throwable, CoroutineContext) -> Unit = ::handleUndeliverableException \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt index 9f176e938c..9fb5f650f4 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt @@ -12,16 +12,10 @@ import kotlin.coroutines.* import kotlin.internal.* /** - * Creates cold [maybe][Maybe] that will run a given [block] in a coroutine. + * Creates cold [maybe][Maybe] that will run a given [block] in a coroutine and emits its result. + * If [block] result is `null`, [onComplete][MaybeObserver.onComplete] is invoked without a value. * Every time the returned observable is subscribed, it starts a new coroutine. - * Coroutine returns a single, possibly null value. Unsubscribing cancels running coroutine. - * - * | **Coroutine action** | **Signal to subscriber** - * | ------------------------------------- | ------------------------ - * | Returns a non-null value | `onSuccess` - * | Returns a null | `onComplete` - * | Failure with exception or unsubscribe | `onError` - * + * Unsubscribing cancels running coroutine. * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. @@ -62,24 +56,20 @@ private class RxMaybeCoroutine( private val subscriber: MaybeEmitter ) : AbstractCoroutine(parentContext, true) { override fun onCompleted(value: T) { - if (!subscriber.isDisposed) { - try { - if (value == null) subscriber.onComplete() else subscriber.onSuccess(value) - } catch(e: Throwable) { - handleCoroutineException(context, e) - } + try { + if (value == null) subscriber.onComplete() else subscriber.onSuccess(value) + } catch (e: Throwable) { + handleUndeliverableException(e, context) } } override fun onCancelled(cause: Throwable, handled: Boolean) { - if (!subscriber.isDisposed) { - try { - subscriber.onError(cause) - } catch (e: Throwable) { - handleCoroutineException(context, e) + try { + if (!subscriber.tryOnError(cause)) { + handleUndeliverableException(cause, context) } - } else if (!handled) { - handleCoroutineException(context, cause) + } catch (e: Throwable) { + handleUndeliverableException(e, context) } } } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 6ccf0f0bae..b8de66df08 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -19,16 +19,14 @@ import kotlin.internal.* /** * Creates cold [observable][Observable] that will run a given [block] in a coroutine. * Every time the returned observable is subscribed, it starts a new coroutine. - * Coroutine emits items with `send`. Unsubscribing cancels running coroutine. * - * Invocations of `send` are suspended appropriately to ensure that `onNext` is not invoked concurrently. - * Note that Rx 2.x [Observable] **does not support backpressure**. Use [rxFlowable]. + * Coroutine emits ([ObservableEmitter.onNext]) values with `send`, completes ([ObservableEmitter.onComplete]) + * when the coroutine completes or channel is explicitly closed and emits error ([ObservableEmitter.onError]) + * if coroutine throws an exception or closes channel with a cause. + * Unsubscribing cancels running coroutine. * - * | **Coroutine action** | **Signal to subscriber** - * | -------------------------------------------- | ------------------------ - * | `send` | `onNext` - * | Normal completion or `close` without cause | `onComplete` - * | Failure with exception or `close` with cause | `onError` + * Invocations of `send` are suspended appropriately to ensure that `onNext` is not invoked concurrently. + * Note that Rx 2.x [Observable] **does not support backpressure**. * * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. @@ -170,9 +168,9 @@ private class RxObservableCoroutine( * by coroutines machinery, anyway, they should not be present in regular program flow, * thus our goal here is just to expose it as soon as possible. */ - subscriber.onError(cause) + subscriber.tryOnError(cause) if (!handled && cause.isFatal()) { - handleCoroutineException(context, cause) + handleUndeliverableException(cause, context) } } else { @@ -180,7 +178,7 @@ private class RxObservableCoroutine( } } catch (e: Throwable) { // Unhandled exception (cannot handle in other way, since we are already complete) - handleCoroutineException(context, e) + handleUndeliverableException(e, context) } } } finally { @@ -208,4 +206,4 @@ internal fun Throwable.isFatal() = try { false } catch (e: Throwable) { true -} \ No newline at end of file +} diff --git a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt index 53fbaf6505..610a5bcd6d 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.rx2 @@ -17,7 +17,6 @@ public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = Sch /** * Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler]. - * @param scheduler a scheduler. */ public class SchedulerCoroutineDispatcher( /** diff --git a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt index f3573ee6eb..07088909b5 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt @@ -12,15 +12,9 @@ import kotlin.coroutines.* import kotlin.internal.* /** - * Creates cold [single][Single] that will run a given [block] in a coroutine. + * Creates cold [single][Single] that will run a given [block] in a coroutine and emits its result. * Every time the returned observable is subscribed, it starts a new coroutine. - * Coroutine returns a single value. Unsubscribing cancels running coroutine. - * - * | **Coroutine action** | **Signal to subscriber** - * | ------------------------------------- | ------------------------ - * | Returns a value | `onSuccess` - * | Failure with exception or unsubscribe | `onError` - * + * Unsubscribing cancels running coroutine. * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. @@ -62,21 +56,19 @@ private class RxSingleCoroutine( ) : AbstractCoroutine(parentContext, true) { override fun onCompleted(value: T) { try { - if (!subscriber.isDisposed) subscriber.onSuccess(value) + subscriber.onSuccess(value) } catch (e: Throwable) { - handleCoroutineException(context, e) + handleUndeliverableException(e, context) } } override fun onCancelled(cause: Throwable, handled: Boolean) { - if (!subscriber.isDisposed) { - try { - subscriber.onError(cause) - } catch (e: Throwable) { - handleCoroutineException(context, e) + try { + if (!subscriber.tryOnError(cause)) { + handleUndeliverableException(cause, context) } - } else if (!handled) { - handleCoroutineException(context, cause) + } catch (e: Throwable) { + handleUndeliverableException(e, context) } } } diff --git a/reactive/kotlinx-coroutines-rx2/test/Check.kt b/reactive/kotlinx-coroutines-rx2/test/Check.kt index 29eda6fa00..beb2c43a3d 100644 --- a/reactive/kotlinx-coroutines-rx2/test/Check.kt +++ b/reactive/kotlinx-coroutines-rx2/test/Check.kt @@ -5,6 +5,8 @@ package kotlinx.coroutines.rx2 import io.reactivex.* +import io.reactivex.functions.Consumer +import io.reactivex.plugins.* fun checkSingleValue( observable: Observable, @@ -64,3 +66,12 @@ fun checkErroneous( } } +inline fun withExceptionHandler(noinline handler: (Throwable) -> Unit, block: () -> Unit) { + val original = RxJavaPlugins.getErrorHandler() + RxJavaPlugins.setErrorHandler { handler(it) } + try { + block() + } finally { + RxJavaPlugins.setErrorHandler(original) + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt index 9a12bafb1d..04e869758c 100644 --- a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt @@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import io.reactivex.disposables.* +import io.reactivex.exceptions.* import kotlinx.coroutines.* import org.hamcrest.core.* import org.junit.* @@ -122,11 +123,11 @@ class CompletableTest : TestBase() { fun testUnhandledException() = runTest() { expect(1) var disposable: Disposable? = null - val eh = CoroutineExceptionHandler { _, t -> - assertTrue(t is TestException) + val handler = { e: Throwable -> + assertTrue(e is UndeliverableException && e.cause is TestException) expect(5) } - val completable = rxCompletable(currentDispatcher() + eh) { + val completable = rxCompletable(currentDispatcher()) { expect(4) disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled try { @@ -135,26 +136,40 @@ class CompletableTest : TestBase() { throw TestException() // would not be able to handle it since mono is disposed } } - completable.subscribe(object : CompletableObserver { - override fun onSubscribe(d: Disposable) { - expect(2) - disposable = d - } - override fun onComplete() { expectUnreached() } - override fun onError(t: Throwable) { expectUnreached() } - }) - expect(3) - yield() // run coroutine - finish(6) + withExceptionHandler(handler) { + completable.subscribe(object : CompletableObserver { + override fun onSubscribe(d: Disposable) { + expect(2) + disposable = d + } + + override fun onComplete() { + expectUnreached() + } + + override fun onError(t: Throwable) { + expectUnreached() + } + }) + expect(3) + yield() // run coroutine + finish(6) + } } @Test fun testFatalExceptionInSubscribe() = runTest { - rxCompletable(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) { - expect(1) - 42 - }.subscribe({ throw LinkageError() }) - finish(3) + val handler: (Throwable) -> Unit = { e -> + assertTrue(e is UndeliverableException && e.cause is LinkageError); expect(2) + } + + withExceptionHandler(handler) { + rxCompletable(Dispatchers.Unconfined) { + expect(1) + 42 + }.subscribe({ throw LinkageError() }) + finish(3) + } } @Test diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt index 4f3e7241c6..05b7ee92b6 100644 --- a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines.rx2 +import io.reactivex.exceptions.* import kotlinx.coroutines.* import org.junit.* import org.junit.Test @@ -16,15 +17,15 @@ class FlowableExceptionHandlingTest : TestBase() { ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") } - private inline fun ceh(expect: Int) = CoroutineExceptionHandler { _, t -> - assertTrue(t is T) + private inline fun handler(expect: Int) = { t: Throwable -> + assertTrue(t is UndeliverableException && t.cause is T) expect(expect) } private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() } @Test - fun testException() = runTest { + fun testException() = withExceptionHandler({ expectUnreached() }) { rxFlowable(Dispatchers.Unconfined + cehUnreached()) { expect(1) throw TestException() @@ -37,8 +38,8 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testFatalException() = runTest { - rxFlowable(Dispatchers.Unconfined + ceh(3)) { + fun testFatalException() = withExceptionHandler(handler(3)) { + rxFlowable(Dispatchers.Unconfined) { expect(1) throw LinkageError() }.subscribe({ @@ -50,7 +51,7 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testExceptionAsynchronous() = runTest { + fun testExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) { rxFlowable(Dispatchers.Unconfined + cehUnreached()) { expect(1) throw TestException() @@ -65,8 +66,8 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testFatalExceptionAsynchronous() = runTest { - rxFlowable(Dispatchers.Unconfined + ceh(3)) { + fun testFatalExceptionAsynchronous() = withExceptionHandler(handler(3)) { + rxFlowable(Dispatchers.Unconfined) { expect(1) throw LinkageError() }.publish() @@ -80,8 +81,8 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testFatalExceptionFromSubscribe() = runTest { - rxFlowable(Dispatchers.Unconfined + ceh(4)) { + fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler(4)) { + rxFlowable(Dispatchers.Unconfined) { expect(1) send(Unit) }.subscribe({ @@ -92,7 +93,7 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testExceptionFromSubscribe() = runTest { + fun testExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) { rxFlowable(Dispatchers.Unconfined + cehUnreached()) { expect(1) send(Unit) @@ -104,7 +105,7 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testAsynchronousExceptionFromSubscribe() = runTest { + fun testAsynchronousExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) { rxFlowable(Dispatchers.Unconfined + cehUnreached()) { expect(1) send(Unit) @@ -118,8 +119,8 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testAsynchronousFatalExceptionFromSubscribe() = runTest { - rxFlowable(Dispatchers.Unconfined + ceh(3)) { + fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler(3)) { + rxFlowable(Dispatchers.Unconfined) { expect(1) send(Unit) }.publish() diff --git a/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt b/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt new file mode 100644 index 0000000000..1430dbf381 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt @@ -0,0 +1,58 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import io.reactivex.* +import io.reactivex.exceptions.* +import io.reactivex.plugins.* +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.reactive.* +import org.junit.Test +import java.io.* +import kotlin.test.* + +// Check that exception is not leaked to the global exception handler +class LeakedExceptionTest : TestBase() { + + private val handler: (Throwable) -> Unit = + { assertTrue { it is UndeliverableException && it.cause is TestException } } + + @Test + fun testSingle() = withExceptionHandler(handler) { + val flow = rxSingle { throw TestException() }.toFlowable().asFlow() + runBlocking { + repeat(10000) { + combine(flow, flow) { _, _ -> Unit } + .catch {} + .collect { } + } + } + } + + @Test + fun testObservable() = withExceptionHandler(handler) { + val flow = rxObservable { throw TestException() }.toFlowable(BackpressureStrategy.BUFFER).asFlow() + runBlocking { + repeat(10000) { + combine(flow, flow) { _, _ -> Unit } + .catch {} + .collect { } + } + } + } + + @Test + fun testFlowable() = withExceptionHandler(handler) { + val flow = rxFlowable { throw TestException() }.asFlow() + runBlocking { + repeat(10000) { + combine(flow, flow) { _, _ -> Unit } + .catch {} + .collect { } + } + } + } +} diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt index 326c83e45c..deca961e6d 100644 --- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt @@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import io.reactivex.disposables.* +import io.reactivex.exceptions.* import io.reactivex.functions.* import io.reactivex.internal.functions.Functions.* import kotlinx.coroutines.* @@ -66,8 +67,8 @@ class MaybeTest : TestBase() { expectUnreached() }, { error -> expect(5) - Assert.assertThat(error, IsInstanceOf(RuntimeException::class.java)) - Assert.assertThat(error.message, IsEqual("OK")) + assertThat(error, IsInstanceOf(RuntimeException::class.java)) + assertThat(error.message, IsEqual("OK")) }) expect(3) yield() // to started coroutine @@ -251,11 +252,11 @@ class MaybeTest : TestBase() { fun testUnhandledException() = runTest { expect(1) var disposable: Disposable? = null - val eh = CoroutineExceptionHandler { _, t -> - assertTrue(t is TestException) + val handler = { e: Throwable -> + assertTrue(e is UndeliverableException && e.cause is TestException) expect(5) } - val maybe = rxMaybe(currentDispatcher() + eh) { + val maybe = rxMaybe(currentDispatcher()) { expect(4) disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled try { @@ -264,27 +265,45 @@ class MaybeTest : TestBase() { throw TestException() // would not be able to handle it since mono is disposed } } - maybe.subscribe(object : MaybeObserver { - override fun onSubscribe(d: Disposable) { - expect(2) - disposable = d - } - override fun onComplete() { expectUnreached() } - override fun onSuccess(t: Unit) { expectUnreached() } - override fun onError(t: Throwable) { expectUnreached() } - }) - expect(3) - yield() // run coroutine - finish(6) + withExceptionHandler(handler) { + maybe.subscribe(object : MaybeObserver { + override fun onSubscribe(d: Disposable) { + expect(2) + disposable = d + } + + override fun onComplete() { + expectUnreached() + } + + override fun onSuccess(t: Unit) { + expectUnreached() + } + + override fun onError(t: Throwable) { + expectUnreached() + } + }) + expect(3) + yield() // run coroutine + finish(6) + } } @Test fun testFatalExceptionInSubscribe() = runTest { - rxMaybe(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) { - expect(1) - 42 - }.subscribe({ throw LinkageError() }) - finish(3) + val handler = { e: Throwable -> + assertTrue(e is UndeliverableException && e.cause is LinkageError) + expect(2) + } + + withExceptionHandler(handler) { + rxMaybe(Dispatchers.Unconfined) { + expect(1) + 42 + }.subscribe({ throw LinkageError() }) + finish(3) + } } @Test diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt index 6d247cfab7..d6cdd3ca24 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines.rx2 +import io.reactivex.exceptions.* import kotlinx.coroutines.* import org.junit.* import org.junit.Test @@ -16,15 +17,15 @@ class ObservableExceptionHandlingTest : TestBase() { ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") } - private inline fun ceh(expect: Int) = CoroutineExceptionHandler { _, t -> - assertTrue(t is T) + private inline fun handler(expect: Int) = { t: Throwable -> + assertTrue(t is UndeliverableException && t.cause is T) expect(expect) } private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() } @Test - fun testException() = runTest { + fun testException() = withExceptionHandler({ expectUnreached() }) { rxObservable(Dispatchers.Unconfined + cehUnreached()) { expect(1) throw TestException() @@ -37,8 +38,8 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testFatalException() = runTest { - rxObservable(Dispatchers.Unconfined + ceh(3)) { + fun testFatalException() = withExceptionHandler(handler(3)) { + rxObservable(Dispatchers.Unconfined) { expect(1) throw LinkageError() }.subscribe({ @@ -50,7 +51,7 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testExceptionAsynchronous() = runTest { + fun testExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) { rxObservable(Dispatchers.Unconfined) { expect(1) throw TestException() @@ -65,8 +66,8 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testFatalExceptionAsynchronous() = runTest { - rxObservable(Dispatchers.Unconfined + ceh(3)) { + fun testFatalExceptionAsynchronous() = withExceptionHandler(handler(3)) { + rxObservable(Dispatchers.Unconfined) { expect(1) throw LinkageError() }.publish() @@ -80,8 +81,8 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testFatalExceptionFromSubscribe() = runTest { - rxObservable(Dispatchers.Unconfined + ceh(4)) { + fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler(4)) { + rxObservable(Dispatchers.Unconfined) { expect(1) send(Unit) }.subscribe({ @@ -92,7 +93,7 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testExceptionFromSubscribe() = runTest { + fun testExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) { rxObservable(Dispatchers.Unconfined) { expect(1) send(Unit) @@ -104,7 +105,7 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testAsynchronousExceptionFromSubscribe() = runTest { + fun testAsynchronousExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) { rxObservable(Dispatchers.Unconfined) { expect(1) send(Unit) @@ -118,8 +119,8 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testAsynchronousFatalExceptionFromSubscribe() = runTest { - rxObservable(Dispatchers.Unconfined + ceh(4)) { + fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler(4)) { + rxObservable(Dispatchers.Unconfined) { expect(1) send(Unit) }.publish() diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt index c71ef566ba..b9f6fe35a6 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt @@ -4,13 +4,22 @@ package kotlinx.coroutines.rx2 +import io.reactivex.* +import io.reactivex.plugins.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.hamcrest.core.* import org.junit.* import org.junit.Test +import java.util.concurrent.* import kotlin.test.* class ObservableTest : TestBase() { + @Before + fun setup() { + ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") + } + @Test fun testBasicSuccess() = runBlocking { expect(1) @@ -129,4 +138,28 @@ class ObservableTest : TestBase() { expect(4) } } + + @Test + fun testExceptionAfterCancellation() { + // Test that no exceptions were reported to the global EH (it will fail the test if so) + val handler = { e: Throwable -> + assertFalse(e is CancellationException) + } + withExceptionHandler(handler) { + RxJavaPlugins.setErrorHandler { + require(it !is CancellationException) + } + Observable + .interval(1, TimeUnit.MILLISECONDS) + .take(1000) + .switchMapSingle { + rxSingle { + timeBomb().await() + } + } + .blockingSubscribe({}, {}) + } + } + + private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS).doOnSuccess { throw TestException() } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt index 9251149375..d9581f86f9 100644 --- a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt @@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import io.reactivex.disposables.* +import io.reactivex.exceptions.* import io.reactivex.functions.* import kotlinx.coroutines.* import org.hamcrest.core.* @@ -201,13 +202,19 @@ class SingleTest : TestBase() { @Test fun testFatalExceptionInSubscribe() = runTest { - rxSingle(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> assertTrue(e is LinkageError); expect(2) }) { - expect(1) - 42 - }.subscribe(Consumer { - throw LinkageError() - }) - finish(3) + val handler = { e: Throwable -> + assertTrue(e is UndeliverableException && e.cause is LinkageError) + expect(2) + } + withExceptionHandler(handler) { + rxSingle(Dispatchers.Unconfined) { + expect(1) + 42 + }.subscribe(Consumer { + throw LinkageError() + }) + finish(3) + } } @Test @@ -223,11 +230,11 @@ class SingleTest : TestBase() { fun testUnhandledException() = runTest { expect(1) var disposable: Disposable? = null - val eh = CoroutineExceptionHandler { _, t -> - assertTrue(t is TestException) + val handler = { e: Throwable -> + assertTrue(e is UndeliverableException && e.cause is TestException) expect(5) } - val single = rxSingle(currentDispatcher() + eh) { + val single = rxSingle(currentDispatcher()) { expect(4) disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled try { @@ -236,16 +243,24 @@ class SingleTest : TestBase() { throw TestException() // would not be able to handle it since mono is disposed } } - single.subscribe(object : SingleObserver { - override fun onSubscribe(d: Disposable) { - expect(2) - disposable = d - } - override fun onSuccess(t: Unit) { expectUnreached() } - override fun onError(t: Throwable) { expectUnreached() } - }) - expect(3) - yield() // run coroutine - finish(6) + withExceptionHandler(handler) { + single.subscribe(object : SingleObserver { + override fun onSubscribe(d: Disposable) { + expect(2) + disposable = d + } + + override fun onSuccess(t: Unit) { + expectUnreached() + } + + override fun onError(t: Throwable) { + expectUnreached() + } + }) + expect(3) + yield() // run coroutine + finish(6) + } } }