SharedFlow didn't cancel or throw exception with callbackFlow #2368

Closed
waqasakram117 opened this issue Nov 5, 2020 · 5 comments
waqasakram117 commented Nov 5, 2020

When I use SharedFlow with callbackFlow, an exception is neither propagated to the parent scope nor does the flow cancel itself. I tried to cancel the SharedFlow in case of failure. Here is my code snippet:

fun adapt(call: Call<T>): SharedFlow<T> {
    val scope = CoroutineScope(dispatcher)
    return callbackFlow<T> {
        call.enqueue(object : Callback<T> {
            override fun onResponse(call: Call<T>, response: Response<T>) {
                sendBlocking(response.body())
                close()
            }

            override fun onFailure(call: Call<T>, t: Throwable) {
                close(t)
            }
        })
        awaitClose()
    }.catch {
        /**
         * The exception thrown in the failure case is caught here.
         * I try to cancel the Job so the SharedFlow can terminate, but that doesn't work.
         * Catching the exception here does keep my app from crashing, though.
         */
        scope.cancel()
    }.shareIn(scope, SharingStarted.Lazily, 1)
}

I used it like this:

runBlocking {
    // On success, this gets the first value and the coroutine completes.
    // On failure, this waits for the first item forever and never completes.
    val result = adapt(someCall).first()
}

Here is another way I tried to accomplish the same thing:

fun adapt(call: Call<T>): SharedFlow<T> {
    val scope = CoroutineScope(dispatcher)
    return callbackFlow<T> {
        call.enqueue(object : Callback<T> {
            override fun onResponse(call: Call<T>, response: Response<T>) {
                sendBlocking(response.body())
                close()
            }

            override fun onFailure(call: Call<T>, t: Throwable) {
                close(t)
            }
        })
        awaitClose()
    }.shareIn(scope, SharingStarted.Lazily, 1).catch {
        /**
         * This catch block is never invoked in case of failure.
         * I try to cancel the Job so the SharedFlow can terminate its coroutine.
         */
        currentCoroutineContext().cancel()
    }
}

I used this the same way as before, but this time the app crashes on failure:

runBlocking {
    // On success, this gets the first value and the coroutine completes.
    // On failure, the app crashes.
    val result = adapt(someCall).first()
}

Calls such as sharedFlow.collect {}, first(), or take(1) stay suspended forever. I want to cancel them in case of failure.

Contributor

elizarov commented Nov 5, 2020

Sorry, but I cannot figure out what you are trying to achieve. Do you run into the problem with catch before shareIn or with catch after shareIn? Can you please provide self-contained reproducer code that demonstrates what you are trying to do, and explain what you expected this code to do?

@waqasakram117
Author

@elizarov I added more details. Could you please check it now?

Contributor

elizarov commented Nov 5, 2020

SharedFlow never completes (see more in the docs).

To achieve what you need, you can do something like this:

val flow = callbackFlow { ... }
    .catch { e ->
        // catch before the shareIn operator (!!!)
        emit(ErrorValue) // emit some special error value to indicate that an error happened
    }
    .shareIn(....)
    .takeWhile { it != ErrorValue } // take values until the error value arrives

Because you use the takeWhile operator, the resulting flow will complete.

Does it help?

P.S. In the future we plan to provide some ready-to-use operators for that. See #2092
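
For readers who want to run the pattern from this reply, here is a minimal sketch. It assumes a hypothetical `Outcome` wrapper as the error value, and a plain `flow { }` named `request` standing in for the `callbackFlow` from the question; neither name comes from the issue or from kotlinx.coroutines. Note that with `takeWhile` the flow completes without an element on failure, so `first()` would throw `NoSuchElementException`; the sketch uses `firstOrNull()` there instead.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.runBlocking

// Hypothetical wrapper (not a library type): carries a value or the upstream error.
sealed class Outcome<out T> {
    data class Success<T>(val value: T) : Outcome<T>()
    data class Failure(val error: Throwable) : Outcome<Nothing>()
}

// Stand-in for the callbackFlow in the question: one value, or a failure.
fun request(fail: Boolean): Flow<String> = flow {
    if (fail) throw IllegalStateException("network down")
    emit("payload")
}

// catch sits before shareIn, so the failure becomes an ordinary emitted value.
fun sharedOutcome(scope: CoroutineScope, fail: Boolean): SharedFlow<Outcome<String>> =
    request(fail)
        .map<String, Outcome<String>> { Outcome.Success(it) }
        .catch { e -> emit(Outcome.Failure(e)) }
        .shareIn(scope, SharingStarted.Lazily, replay = 1)

fun main(): Unit = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)

    // first() returns in both cases because an error still produces an element.
    println(sharedOutcome(scope, fail = false).first())
    println(sharedOutcome(scope, fail = true).first())

    // With takeWhile, the flow completes when the error value arrives,
    // so firstOrNull() returns instead of suspending forever.
    println(sharedOutcome(scope, fail = true).takeWhile { it is Outcome.Success }.firstOrNull())

    scope.cancel()
}
```

Keeping the error inside the emitted value lets the collector decide what to do with it, which is the design choice here; the `takeWhile` variant simply ends the collection.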

Author

waqasakram117 commented Nov 5, 2020

Yes, this is a nice trick, but I wonder why this doesn't work:

val flow = callbackFlow { ... }
    .shareIn(....)
    .catch(...) // this won't work

catch after shareIn didn't catch anything, and the app crashes.
And this:

val flow = callbackFlow { ... }
    .catch { e ->
        scope.cancel()
    }
    .shareIn(scope,.,.)

didn't work either.

Contributor

elizarov commented Nov 5, 2020

catch on SharedFlow is meaningless, since SharedFlow never completes. I'll add the corresponding Lint checks with warnings to explain it.
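
The behaviour described in this thread can be observed with a small sketch. It assumes a stand-in upstream named `failingUpstream` and a hypothetical `Report` holder, neither of which appears in the issue. The upstream failure is delivered to the scope passed to `shareIn` (here via a `CoroutineExceptionHandler`), while a `catch` placed after `shareIn` sees nothing and the collector stays suspended until the timeout. The shared flow is typed as `Flow` so that `catch` resolves to the ordinary operator, since library versions with the Lint check mentioned above may warn about `catch` on a `SharedFlow`.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical holder for what each party observed.
data class Report(val scopeSaw: Throwable?, val catchSaw: Throwable?, val value: String?)

// Stand-in for a callbackFlow whose callback reports a failure.
fun failingUpstream(): Flow<String> = flow { throw IllegalStateException("network down") }

suspend fun demo(): Report {
    val scopeSaw = CompletableDeferred<Throwable>()
    // The sharing coroutine runs in this scope, so its failure lands in this handler.
    val handler = CoroutineExceptionHandler { _, e -> scopeSaw.complete(e) }
    val scope = CoroutineScope(Dispatchers.Default + handler)

    var catchSaw: Throwable? = null
    val shared: Flow<String> = failingUpstream().shareIn(scope, SharingStarted.Lazily, replay = 1)

    // The collector is never told about the upstream failure; only the timeout ends the wait.
    val value = withTimeoutOrNull(500L) {
        shared
            .catch { e -> if (e !is CancellationException) catchSaw = e }
            .first()
    }
    return Report(withTimeoutOrNull(500L) { scopeSaw.await() }, catchSaw, value)
}

fun main(): Unit = runBlocking {
    val report = demo()
    println("scope handler saw: ${report.scopeSaw}")
    println("catch after shareIn saw: ${report.catchSaw}")
    println("collected value: ${report.value}")
}
```

Without a handler in the scope, the same exception goes to the platform's default uncaught-exception handling, which is consistent with the crash reported for the second variant in the question.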
