2.x: inconsistent error handling in onErrorResumeNext after completion #6571

Closed
davideberlein opened this issue Jul 15, 2019 · 9 comments

@davideberlein

davideberlein commented Jul 15, 2019

Tested using RxJava v2.2.10

Use Case:

We want to handle a known exception that can occur anywhere in a long observable stream using onErrorResumeNext.
The exception is thrown due to asynchronous behavior: a resource is closed, which triggers a takeUntil; however, because of an asynchronous operation (observeOn) in the stream, an operator may still access the closed resource, causing the exception.

Problem

  1. The observable emits a value
  2. During the execution of an operator A (in this case map) the takeUntil is triggered
  3. The subscription completes
  4. Operator A throws an Exception
  5. XXX

Depending on which operators sit between the operator that throws the exception and onErrorResumeNext, RxJava behaves differently in step 5, which prevents us from handling the error properly:

  • doOnNext, map, concatMap and probably most others: onErrorResumeNext is called and handles the error
  • switchMap: the Exception is completely swallowed, neither onErrorResumeNext is called nor an UndeliverableException is thrown or printed anywhere.
  • flatMap: An UndeliverableException is thrown.

For us, all cases except the flatMap behavior are okay.

Example:

final Subject<String> takeUntil$ = BehaviorSubject.create();
Observable.just("Some value")
		.map(e -> {
			takeUntil$.onNext("complete");
			throw new RuntimeException("Exception after unsubscribe");
		})
		// Add any of these to test the different cases:
		//.doOnNext(e -> System.out.println("On Next"))
		//.map(v -> v)
		//.concatMap(o -> Observable.just(o))
		//.switchMap(o -> Observable.just(o))
		.flatMap(o -> Observable.just(o))
		.onErrorResumeNext(e -> {
			System.out.println(System.currentTimeMillis() + " onErrorResumeNext GOT ERROR: " + e);
			return Observable.empty();
		})
		.takeUntil(takeUntil$)
		.subscribe(
			v -> System.out.println("GOT NEXT" + v),
			v -> System.out.println("Subscribe GOT ERROR" + v),
			() -> System.out.println(System.currentTimeMillis() + " COMPLETED"));

Exception Printed with flatMap:

io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.RuntimeException: Exception after unsubscribe
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:349)
    at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onError(ObservableFlatMap.java:290)
    at io.reactivex.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:100)
    at io.reactivex.internal.observers.BasicFuseableObserver.fail(BasicFuseableObserver.java:110)
    at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:61)
    at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
    at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
    at io.reactivex.Observable.subscribe(Observable.java:10981)
    at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
    at io.reactivex.Observable.subscribe(Observable.java:10981)
    at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
    at io.reactivex.Observable.subscribe(Observable.java:10981)
    at io.reactivex.internal.operators.observable.ObservableOnErrorNext.subscribeActual(ObservableOnErrorNext.java:38)
    at io.reactivex.Observable.subscribe(Observable.java:10981)
    at io.reactivex.internal.operators.observable.ObservableTakeUntil.subscribeActual(ObservableTakeUntil.java:41)
    at io.reactivex.Observable.subscribe(Observable.java:10981)
    at io.reactivex.Observable.subscribe(Observable.java:10967)
    at io.reactivex.Observable.subscribe(Observable.java:10927)
    at test.Main.main(Main.java:29)
Caused by: java.lang.RuntimeException: Exception after unsubscribe
    at test.Main.lambda$0(Main.java:17)
    at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
    ... 14 more

@akarnokd
Member

With operators that are async boundaries themselves, signaling an UndeliverableException is to be expected. I'm not sure why switchMap doesn't do that but I'll investigate.

As for the map-onErrorResumeNext case, synchronous operators don't react eagerly to cancellation and thus some side-effects on the error channel may be triggered. If this is not desired, the onTerminateDetach operator will break the signal channels and suppress signals passing through upon cancellation.

The fundamental problem, though, is the API that crashes when it gets cancelled/interrupted. Such APIs are very common due to the vast number of blocking APIs that have been created over the decades. One solution is to not let them crash into RxJava.

Handling item or resource lifecycle is beyond the scope of RxJava. The using operator may provide some means for cleanup, but there is no direct support for preventing use-after-cleanup. Practically, you'd have to confine the use of the resource to a thread and send computations to this thread via the flow, so that a terminal state can't be bypassed by further computations.
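
The thread-confinement idea can be sketched with plain JDK classes. This is only an illustration under assumptions: `ConfinedResourceDemo`, `Resource`, and `run` are hypothetical names, and a single-threaded `ExecutorService` stands in for whatever scheduler the flow would actually use. Because `close()` is queued on the same thread as every read, a later task sees the terminal state instead of racing with it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: all access to the resource, including close(),
// happens on one owner thread, so use-after-close cannot race.
class ConfinedResourceDemo {

    // Stand-in for a resource that throws when used after close().
    static final class Resource {
        private boolean closed; // read and written only on the owner thread

        boolean isClosed() {
            return closed;
        }

        String read(String key) {
            if (closed) {
                throw new IllegalStateException("resource closed");
            }
            return "value-of-" + key;
        }

        void close() {
            closed = true;
        }
    }

    static List<String> run() {
        ExecutorService owner = Executors.newSingleThreadExecutor();
        Resource resource = new Resource();
        List<String> log = new CopyOnWriteArrayList<>();

        // Tasks run strictly in submission order on the single owner thread.
        owner.execute(() -> log.add(resource.read("a")));
        owner.execute(() -> {
            resource.close();
            log.add("closed");
        });
        // Queued after close(): the task checks the terminal state instead of crashing.
        owner.execute(() -> log.add(resource.isClosed() ? "skipped" : resource.read("b")));

        owner.shutdown();
        try {
            if (!owner.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("owner thread did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [value-of-a, closed, skipped]
    }
}
```

In an RxJava flow, the equivalent would presumably be to route both the resource access and the cleanup through one single-threaded scheduler; the sketch only shows why the ordering guarantee removes the race.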

@davideberlein
Author

@akarnokd thanks for the fast response. I understand your reasoning; however, RxJava makes it really hard to do proper central error handling for this sort of API / resource, which is very unfortunate.
I guess for the moment we have to add onErrorResumeNext before every flatMap to make sure the error isn't propagated to the global error handler because moving the access to the resource to a synchronized thread isn't currently possible. Also adding try / catches to every access isn't a very elegant solution since we basically don't care if the Exception is thrown and simply want to complete gracefully in this case.

@davideberlein
Author

@akarnokd another thing I was wondering about: why does concatMap propagate the error down the stream, whereas flatMap and switchMap (after your fix) delegate to the global error handler?
I understand that map is a synchronous operator simply handing the error down stream, but concatMap isn't synchronous so shouldn't it behave like the other 2?

@akarnokd
Member

> why does concatMap propagate the error down the stream

I'm not sure, I have to check them.

@akarnokd
Member

> why does concatMap propagate the error down the stream

concatMap talks more or less directly to the downstream and since cancel doesn't terminate the error aggregation, you get an error pass-through. The other operators use a so-called drain loop with explicit is-cancelled testing and thus were ignoring errors. The fix for all of them is the same though.
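
The difference between the two styles can be modelled roughly as follows. This is a simplified, single-threaded sketch and not RxJava's operator code: `CancelCheckSketch` and all of its members are hypothetical names, and the atomic work-in-progress counter and volatile fields that a real drain loop needs are left out. It only shows why an error raised after cancellation passes through in one style and is dropped in the other.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified, single-threaded model; NOT RxJava's actual implementation.
class CancelCheckSketch {
    final Queue<String> queue = new ArrayDeque<>();
    final List<String> downstream = new ArrayList<>(); // records signals that reach downstream
    boolean cancelled;
    Throwable error;

    // Direct style: the error is handed downstream without consulting the cancelled flag.
    void directOnError(Throwable ex) {
        downstream.add("onError: " + ex.getMessage());
    }

    // Drain-loop style: the error is only stored; the loop decides what to emit.
    void drainOnError(Throwable ex) {
        error = ex;
        drain();
    }

    void drain() {
        for (;;) {
            if (cancelled) {
                // Explicit is-cancelled test: pending items and the stored error are dropped.
                queue.clear();
                return;
            }
            String item = queue.poll();
            if (item == null) {
                if (error != null) {
                    downstream.add("onError: " + error.getMessage());
                }
                return;
            }
            downstream.add("onNext: " + item);
        }
    }

    // Cancels first (as takeUntil does in the report), then signals the error.
    static List<String> signalsAfterCancel(boolean useDrainLoop) {
        CancelCheckSketch op = new CancelCheckSketch();
        op.cancelled = true;
        RuntimeException ex = new RuntimeException("Exception after unsubscribe");
        if (useDrainLoop) {
            op.drainOnError(ex);
        } else {
            op.directOnError(ex);
        }
        return op.downstream;
    }

    public static void main(String[] args) {
        System.out.println(signalsAfterCancel(false)); // prints [onError: Exception after unsubscribe]
        System.out.println(signalsAfterCancel(true));  // prints []
    }
}
```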

@davideberlein
Author

davideberlein commented Jul 17, 2019

Thanks for checking!

> The fix for all of them is the same though.

Sorry, I didn't quite understand which fix you mean here?

@akarnokd
Member

#6572

@davideberlein
Author

Okay, I agree!

@akarnokd
Member

akarnokd commented Aug 5, 2019

I don't think the error handling can be resolved in 2.x because of how drastically the behavior would change. 3.x will be more thorough in this regard.

@akarnokd akarnokd closed this as completed Aug 5, 2019