rxAwait calls the uncaught exception handler #252

Closed
PaulWoitaschek opened this issue Feb 20, 2018 · 21 comments

@PaulWoitaschek
Contributor

Why does the following code call the uncaught exception handler?

Since an onError is supplied, I expect it to be called instead.

import io.reactivex.Observable
import io.reactivex.Single
import kotlinx.coroutines.experimental.rx2.await
import kotlinx.coroutines.experimental.rx2.rxSingle
import java.io.IOException
import java.util.concurrent.TimeUnit

fun main(args: Array<String>) {
  Observable
    .interval(1, TimeUnit.MILLISECONDS)
    .switchMapSingle {
      rxSingle {
        timeBomb().await()
      }
    }
    .subscribe({}, {})

  Thread.sleep(1000)
}

private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS)
  .doOnSuccess { throw IOException("") }
@jcornaz
Contributor

jcornaz commented Feb 20, 2018

I may be wrong, but I think this is more of an Rx issue. I have the same problem without coroutines:

import io.reactivex.Observable
import io.reactivex.Single
import java.util.concurrent.TimeUnit

fun main(args: Array<String>) {
  Observable
      .interval(1, TimeUnit.MILLISECONDS)
      .switchMapSingle {
        timeBomb()
      }
      .blockingSubscribe(
          { println("element = $it") },
          { println("error: ${it.message}") }
      )
}

private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS)
    .doOnSuccess { throw Exception("something went wrong") }

@PaulWoitaschek
Contributor Author

PaulWoitaschek commented Feb 20, 2018

Nope, that one calls the error handler set via RxJavaPlugins.setErrorHandler with an UndeliverableException. My example calls the thread's UncaughtExceptionHandler.

@jcornaz
Contributor

jcornaz commented Feb 21, 2018

Yes indeed, sorry.

However, Rx shouldn't throw an UndeliverableException if onError is implemented (which is the case in my example).

So what is your expected behavior?
Do you want the exception to be delivered to the Rx error handler instead of the UncaughtExceptionHandler?
Or do you want only onError to be called, with no uncaught exception? (In this case even the plain Rx code doesn't work.)

@PaulWoitaschek
Contributor Author

I expect exceptions that can't be delivered inside the RxJava chain to be delivered to the RxJavaPlugins.

@akarnokd What do you think is the correct behavior here?

@akarnokd

You have switchMapSingle, which cancels the currently outstanding Single when switching. At this point the await crashes, and since that Single is no longer the current one, the failure counts as an undeliverable exception.

Many blocking (IO) APIs respond to async cancellations with exceptions so that their blocking awaiters can be unblocked and resumed with a failure. I assume coroutines do the same minus the blocking part.
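
A minimal sketch of the situation described above, assuming RxJava 2 on the classpath. The subscriber is disposed first (standing in for switchMapSingle switching away), and only then does the source fail. The function name `lateErrorAfterDispose` is made up for illustration, and the global handler is reset at the end so the sketch does not leak state.

```kotlin
import io.reactivex.Single
import io.reactivex.SingleEmitter
import io.reactivex.exceptions.UndeliverableException
import io.reactivex.plugins.RxJavaPlugins
import java.io.IOException
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper: returns whatever reached the global Rx error handler (null if nothing did).
fun lateErrorAfterDispose(): Throwable? {
    val received = AtomicReference<Throwable?>()
    RxJavaPlugins.setErrorHandler { received.set(it) }

    var emitter: SingleEmitter<Int>? = null
    val disposable = Single.create<Int> { emitter = it }
        .subscribe({ println("onSuccess: $it") }, { println("onError: $it") })

    disposable.dispose()                       // stands in for switchMapSingle switching away
    emitter!!.onError(IOException("too late")) // the failure arrives after disposal

    RxJavaPlugins.setErrorHandler(null)        // restore the default behavior
    return received.get()
}

fun main() {
    val error = lateErrorAfterDispose()
    println("wrapped as UndeliverableException: ${error is UndeliverableException}")
    println("original cause: ${error?.cause}")
}
```

In this sketch the subscriber's onError is never called: the handler receives an UndeliverableException whose cause is the original IOException.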

@elizarov
Contributor

I'm not sure if there is anything that can be done in coroutines. However, I find it related to issue #251. Maybe we can find a common solution to both.

@PaulWoitaschek
Contributor Author

Well, in RxJava, exceptions that cannot be delivered get wrapped in an UndeliverableException and forwarded to the error handler by RxJavaPlugins#onError.

This is nice because I can set an error handler and my app doesn't crash.

I expect the coroutine to catch that exception if it's not deliverable and forward it to the plugins too.
Otherwise I would need a generic try-catch around every place where coroutines and Rx interoperate.

@elizarov
Contributor

The current approach in kotlinx.coroutines is that all undeliverable (uncaught) exceptions are forwarded to the CoroutineExceptionHandler, which you can explicitly install in your context to prevent crashes. However, the desire is to have a design that does not result in "racy" uncaught exceptions (ones that occur only when certain events happen at exactly the right time).

@PaulWoitaschek
Contributor Author

Ah that's nice. So what about installing this by default in all rxXYZ methods?

object RxCoroutineExceptionHandler : AbstractCoroutineContextElement(CoroutineExceptionHandler),
  CoroutineExceptionHandler {

  override fun handleException(context: CoroutineContext, exception: Throwable) {
    RxJavaPlugins.onError(exception)
  }
}

So it can be used like

fun <T> rxSingle(
  context: CoroutineContext = DefaultDispatcher,
  parent: Job? = null,
  block: suspend CoroutineScope.() -> T
): Single<T> = Single.create { subscriber ->
    val newContext = newCoroutineContext(context + RxCoroutineExceptionHandler, parent)
    val coroutine = RxSingleCoroutine(newContext, subscriber)
    subscriber.setCancellable(coroutine)
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}

@elizarov
Contributor

I don't know if that is the right approach. I'm wary of statics. This way, the behavior of some library code that uses rxSingle will non-trivially depend on some static initialization that was done elsewhere. I'd concentrate on fixing the core issue of racy exceptions.

@PaulWoitaschek
Contributor Author

Any news on this? This makes coroutines-rx integration quite painful for me.
I always have try-catch blocks because otherwise my app crashes unpredictably.

@elizarov
Contributor

elizarov commented Aug 7, 2018

I've revisited the topic starter's code and I still don't see what can be done about it in kotlinx.coroutines. The timeBomb function has a bug in it: it crashes inside the doOnSuccess handler, which, in the Rx world, results in io.reactivex.exceptions.UndeliverableException: java.io.IOException. In the Rx world all such exceptions are routed to the Rx error handler. So if I add the following code at the beginning of main:

    RxJavaPlugins.setErrorHandler { e ->
        println("ErrorHandler received exception $e")
    }

then I see that it is being invoked from time to time.

How is that related to kotlinx.coroutines and what can we possibly do about it?

@PaulWoitaschek
Contributor Author

If that were the case I'd be happy.

For me the error handler isn't invoked:

import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.plugins.RxJavaPlugins
import kotlinx.coroutines.experimental.rx2.await
import kotlinx.coroutines.experimental.rx2.rxSingle
import java.io.IOException
import java.util.concurrent.TimeUnit

fun main(args: Array<String>) {
  RxJavaPlugins.setErrorHandler {
    println("error handler called")
  }
  Observable
      .interval(1, TimeUnit.MILLISECONDS)
      .switchMapSingle {
        rxSingle {
          timeBomb().await()
        }
      }
      .subscribe({}, {})

  Thread.sleep(1000)
}

private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS)
    .doOnSuccess { throw IOException("") }

It crashes the current thread:

Exception in thread "RxComputationThreadPool-6" java.io.IOException: 
	at CRKt$timeBomb$1.accept(CR.kt:26)
	at CRKt$timeBomb$1.accept(CR.kt)
	at io.reactivex.internal.operators.single.SingleDoOnSuccess$DoOnSuccess.onSuccess(SingleDoOnSuccess.java:53)
	at io.reactivex.internal.operators.single.SingleTimer$TimerDisposable.run(SingleTimer.java:56)
	at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
	at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Exception in thread "RxComputationThreadPool-3" java.io.IOException: 
	at CRKt$timeBomb$1.accept(CR.kt:26)
	at CRKt$timeBomb$1.accept(CR.kt)
	at io.reactivex.internal.operators.single.SingleDoOnSuccess$DoOnSuccess.onSuccess(SingleDoOnSuccess.java:53)
	at io.reactivex.internal.operators.single.SingleTimer$TimerDisposable.run(SingleTimer.java:56)
	at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
	at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Exception in thread "RxComputationThreadPool-3" java.io.IOException: 
	at CRKt$timeBomb$1.accept(CR.kt:26)
	at CRKt$timeBomb$1.accept(CR.kt)
	at io.reactivex.internal.operators.single.SingleDoOnSuccess$DoOnSuccess.onSuccess(SingleDoOnSuccess.java:53)
	at io.reactivex.internal.operators.single.SingleTimer$TimerDisposable.run(SingleTimer.java:56)
	at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
	at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

@PaulWoitaschek
Contributor Author

Actually, it was not happening all the time. This code reproduces it reliably for me:

fun main(args: Array<String>) {
  RxJavaPlugins.setErrorHandler {
    println("error handler called")
  }
  val timeBomb = Single.timer(1, TimeUnit.NANOSECONDS)
      .doOnSuccess { throw IOException("") }
  Observable
      .interval(1, TimeUnit.MILLISECONDS)
      .switchMapSingle {
        rxSingle {
          timeBomb.await()
        }.onErrorReturnItem(1)
      }
      .test()
      .await()
}

@elizarov elizarov self-assigned this Aug 14, 2018
@elizarov
Contributor

elizarov commented Aug 14, 2018

Thanks a lot for this reproducer. It indeed clearly shows a problem. The solution is not obvious, though. What I can recommend so far is to install a CoroutineExceptionHandler that delegates to your RxJavaPlugins error handler:

val coroutineHandler = CoroutineExceptionHandler { _, t ->
    RxJavaPlugins.getErrorHandler().accept(t)
}
....
rxSingle(coroutineHandler) { ... }

That will ensure that your "async exceptions" are handled in the same way regardless of whether they were caught by Rx or by coroutines machinery.

@akarnokd

For v2, it is RxJavaPlugins.onError(exception);.

@PaulWoitaschek
Contributor Author

Thanks, I have been doing that for some time now.

It is crucial to catch CancellationExceptions, otherwise they end up in your RxJava error handler.

import io.reactivex.plugins.RxJavaPlugins
import kotlinx.coroutines.experimental.CancellationException
import kotlinx.coroutines.experimental.CoroutineExceptionHandler
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
import kotlin.coroutines.experimental.CoroutineContext

object RxCoroutineExceptionHandler : AbstractCoroutineContextElement(CoroutineExceptionHandler),
  CoroutineExceptionHandler {

  override fun handleException(context: CoroutineContext, exception: Throwable) {
    if (exception is CancellationException) return
    RxJavaPlugins.onError(exception)
  }
}

My initial suggestion was to install this by default in the rxXYZ functions. What about that?

@elizarov
Contributor

We could use a default coroutine exception handler for the rxXXX functions that delegates to the Rx error handler. I wonder if that could break anything else.

@qwwdfsad
Collaborator

qwwdfsad commented Aug 20, 2018

In general, it's not an rx-specific issue and the same problem arises in any cancellable asynchronous API. And there is no good solution: one is to ignore unhandled exceptions (which can lead to a lot of hidden/undebuggable bugs), the other is to always handle them and crash related coroutines (which can lead to a lot of pain with try-catch machinery).

Thus, unconditionally installing a default Rx handler can lead to undesired behaviour. Moreover, it's easy to forget the Rx handler when launching nested coroutines that are not Rx-based.

We may do the following:

  1. Introduce RxCoroutineExceptionHandler
  2. After #410 ("Change default for global vs child coroutines by scoping coroutine builder (structured concurrency)"), introduce RxScope with a pre-installed RxCoroutineExceptionHandler.

Then you can choose between (standard) GlobalScope.rxSingle { ... } and RxScope.rxSingle { ... }
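
To illustrate the point that this affects any cancellable asynchronous API and not just Rx, here is a stdlib-only sketch. `CancellableRequest` and its two callbacks are hypothetical names, not part of RxJava or kotlinx.coroutines. A failure that arrives after cancellation has no consumer left, so the API has to either drop it or route it somewhere else.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical minimal cancellable request; not part of RxJava or kotlinx.coroutines.
class CancellableRequest(
    private val onError: (Throwable) -> Unit,          // the regular consumer
    private val onUndeliverable: (Throwable) -> Unit   // fallback once the consumer is gone
) {
    private val finished = AtomicBoolean(false)

    fun cancel() {
        finished.set(true)
    }

    fun fail(cause: Throwable) {
        // compareAndSet makes "not yet cancelled?" and "claim the delivery" one atomic step.
        if (finished.compareAndSet(false, true)) onError(cause) else onUndeliverable(cause)
    }
}

fun main() {
    val delivered = mutableListOf<String>()
    val undeliverable = mutableListOf<String>()
    val request = CancellableRequest(
        { delivered += it.message ?: "" },
        { undeliverable += it.message ?: "" }
    )

    request.cancel()                                     // the consumer goes away first
    request.fail(IllegalStateException("late failure"))  // then the operation fails

    println("delivered=$delivered undeliverable=$undeliverable")
    // prints "delivered=[] undeliverable=[late failure]"
}
```

Whether `onUndeliverable` ignores the exception or crashes is exactly the trade-off described above.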

@fabioCollini

Hey everyone, any updates on this issue? We had the same problem described here and fixed it by using the RxCoroutineExceptionHandler in all our rxSingle/rxCompletable invocations.

Is this the best way to solve it? Should RxCoroutineExceptionHandler be included in the library as the default exception handler for all the methods that transform a suspending method into an Rx object?

qwwdfsad added a commit that referenced this issue Oct 29, 2019
…tegrations

Use tryOnError in RxJava to make exception delivery check-and-act race free.
Deliver undeliverable exceptions via RxJavaPlugins instead of handleCoroutineException.
This is a deliberate choice for multiple reasons:
    * When using Rx (whether with coroutines or not), undeliverable exceptions are inevitable and users should hook into RxJavaPlugins anyway. We don't want to force them to use an Rx-specific CoroutineExceptionHandler all over the place
    * Undeliverable exceptions provide an additional helpful stack trace and a proper way to distinguish them from other unhandled exceptions
    * Be consistent with Reactor, where we don't have try* and thus cannot provide a completely consistent experience with CEH (at least, without wrapping all the subscribers)

Do the same in the Reactor integration, but without try*: Reactor has no notion of undeliverable exceptions and handles them via Operators.* on its own.

Also, get rid of ASCII tables that do not render properly in IDEA

Fixes #252
Fixes #1614
@qwwdfsad
Collaborator

@fabioCollini yes, I've made the fix for 1.3.3.
The current solution is to report undeliverable exceptions to RxJavaPlugins, so @jcornaz's example without coroutines and the original one behave similarly.
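
A rough sketch of the pattern named in the commit message, assuming RxJava 2. This is not the library's actual code: the extension function `failOrReport` is a made-up name, and skipping CancellationException is an assumption borrowed from the handler posted earlier in this thread. `tryOnError` and `RxJavaPlugins.onError` are existing RxJava 2 calls.

```kotlin
import io.reactivex.Single
import io.reactivex.SingleEmitter
import io.reactivex.plugins.RxJavaPlugins
import java.io.IOException
import java.util.concurrent.CancellationException

// Hypothetical helper, not the library's implementation.
fun <T> SingleEmitter<T>.failOrReport(cause: Throwable) {
    // tryOnError checks for disposal and delivers in a single step, unlike a separate
    // isDisposed check followed by onError, which can interleave with dispose().
    if (tryOnError(cause)) return
    // Assumption: cancellation is expected after a dispose, so it is not reported.
    if (cause is CancellationException) return
    RxJavaPlugins.onError(cause)
}

fun main() {
    val reported = mutableListOf<Throwable>()
    RxJavaPlugins.setErrorHandler { reported += it }

    var emitter: SingleEmitter<Int>? = null
    val disposable = Single.create<Int> { emitter = it }
        .subscribe({}, { println("onError: $it") })

    disposable.dispose()
    emitter!!.failOrReport(CancellationException("cancelled")) // dropped silently
    emitter!!.failOrReport(IOException("boom"))                // goes to the Rx error handler

    println("reported to RxJavaPlugins: ${reported.size}")
    RxJavaPlugins.setErrorHandler(null) // restore the default behavior
}
```

With this shape, an application only needs one hook, RxJavaPlugins.setErrorHandler, for late failures from both plain Rx sources and coroutine-backed ones.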

qwwdfsad added a commit that referenced this issue Dec 5, 2019