Skip to content

Suspending broadcast flow emission #1901

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
fluidsonic opened this issue Apr 4, 2020 · 7 comments
Closed

Suspending broadcast flow emission #1901

fluidsonic opened this issue Apr 4, 2020 · 7 comments

Comments

@fluidsonic
Copy link

fluidsonic commented Apr 4, 2020

I propose to add a Flow operation that broadcasts upstream values downstream without involving a BroadcastChannel and thus suspending the emitter until the broadcast for a value is complete.

Status quo on suspending emitters

  • (1) emit()ing a value to a Flow suspends the emitter until the collector has returned from the collection lambda for that value.
  • (2) If we broadcast Flow values using .broadcastIn(scope).asFlow() the emitter is no longer suspended until the collector(s) have returned from the collection lambda for that value.
  • (3) There is no easy way to emit() a value to multiple collectors where the emitter remains suspended until all collectors have processed that value.

Example output

(1) Emitting two values [1, 2] to one collector looks like this:

1: before emit
1: processing started
1: processing completed
1: after emit
2: before emit
2: processing started
2: processing completed
2: after emit

(2) Emitting two values [1, 2] to multiple collectors through a BroadcastChannel looks like this:

1: before emit
1: after emit
2: before emit
2: after emit
1: processing A started
1: processing B started
1: processing A completed
1: processing B completed
2: processing A started
2: processing B started
2: processing A completed
2: processing B completed

(3) The desired behavior is that two values [1, 2] emitted to multiple collectors looks like this:

1: before emit
1: processing A started
1: processing A completed
1: processing B started
1: processing B completed
1: after emit
2: before emit
2: processing A started
2: processing A completed
2: processing B started
2: processing B completed
2: after emit

Use case

My use case for (3) are events where the event emitter has to wait for all event subscribers to fully process the event.

Example

  • AuthenticationManager has a Flow<WipeDataEvent>.
  • When a WipeDataEvent is emitted by the AuthenticationManager all subscribers must wipe the data they have stored.
  • Only after all subscribers have completed wiping their data the AuthenticationManager can continue with its work, e.g. the sign-out/sign-in process.

.broadcast() example for (3)

The following code is very naive. It merely illustrates how the behavior in the example output for (3) can be achieved.

  • Whether downstream collectors are executed in parallel or serially could be specified by introducing a parameter to .broadcast().
  • Whether the emitter is suspended until all downstream collectors have completed could be specified by introducing another parameter. Non-suspending would behave similarly to .broadcastIn(scope).asFlow() but not involve a channel.
fun <T> Flow<T>.broadcast(): Flow<T> =
    BroadcastFlow(this)

@OptIn(FlowPreview::class)
class BroadcastFlow<T>(
    private val flow: Flow<T>
) : AbstractFlow<T>() {

    private val collectors: MutableList<Pair<FlowCollector<T>, CoroutineContext>> = mutableListOf()
    private var completion: CompletableDeferred<Unit>? = null
    private val mutex = Mutex()


    override suspend fun collectSafely(collector: FlowCollector<T>) {
        mutex.lock()

        val completion = completion
  
        collectors += collector to coroutineContext

        try {
            if (completion != null) {
                mutex.unlock()

                completion.await()
            } else {
                val completion = CompletableDeferred<Unit>()
                this@BroadcastFlow.completion = completion

                mutex.unlock()

                try {
                    flow.collect { value ->
                        mutex.withLock { collectors.toList() }
                            .forEach { (collector, coroutineContext) ->
                                withContext(coroutineContext) {
                                    collector.emit(value)
                                }
                            }
                    }

                    completion.complete(Unit)
                } catch (e: Throwable) {
                    completion.completeExceptionally(e)

                    throw e
                }
            }
        } finally {
            mutex.withLock {
                collectors.removeIf { it.first === collector }
            }
        }
    }
}

A fully working example which creates all of the output mentioned above can be found here:
https://gist.github.com/fluidsonic/f7b2b0084f184932ea3be4cf0074496a

@fluidsonic fluidsonic changed the title Synchronous broadcast flow Suspending broadcast flow emission Apr 4, 2020
@elizarov
Copy link
Contributor

elizarov commented Apr 7, 2020

What is your use-case? What kind of application scenario calls for this kind of behavior?

@fluidsonic
Copy link
Author

At the moment I would be beneficial in the sign-in and sign-out processes of an Android app.

  1. An AuthenticationManager performs sign-in/sign-out operations (verify credentials with API, save token, etc.).
  2. When done, it sends respective events (UserSignedIn / UserSignedOut) to subscribers. They now have a chance for set-up work (e.g. fetch & save user object to DB on sign-in) or clean-up work (e.g. wipe all user data in DB, memory & disk on sign-out) before the AuthenticationManager completes the process and gives control back to the UI.
  3. Once everyone had their chance to set things up, the UI can safely transition to logged-in/out state.

If I write that without subscriptions, the AuthenticationManager (or an intermediate component) would have to know all components that need to perform some work synchronously upon sign-in/sign-out.

If I use a synchronous Flow, synchronous event bus or similar then the AuthenticationManager just needs to know that there are some components that need to do some related work and waits for them to finish.

I've heard that a Flow.share() operator is planned that I think covers such a use case already?

@elizarov
Copy link
Contributor

elizarov commented Apr 7, 2020

I'm not sure that Flow (which is cold and reactive) is a good fit for this kind of use-case to start with. Have you tried just doing a regular suspending callbacks? E.g. define the property in your AuthenticationManager:

val authProcessors = ArrayList<suspend (AuthEvent) -> Unit>()

Here, I suppose that AuthEvent is a super class for UserSignedIn and UserSignedOut. Now, in your subscribers that should do some required processing of those events, write:

authManager.authProcessors += { event -> ... process event }

In the code of the AuthenticationManager itself, when you have an authentication event and need to wait until every component has processed it, you just write:

authProcessors.forEach { it(event) }

This looks pretty straigtforward and simple to me. Would it work for you?

@fluidsonic
Copy link
Author

fluidsonic commented Apr 8, 2020

That for sure would work.
But I'd have to build things on top which I have for free in Flow, like unsubscribing when the CoroutineScope is closed for example.

I just wonder why I can use Flow to

  • flow events to one consumer synchronously
  • flow events to one consumer asynchronously (through channel or by using launch/etc.)
  • flow events to many consumers asynchronously (through channel or by using launch/etc.)

but not

  • flow events to many consumers synchronously

It just feels weird that I have to make an exception here and use Flow for all the other cases.

Maybe Flows aren't a good idea for events in general?
Or the synchronous single-consumer case isn't something I should rely on?

@elizarov
Copy link
Contributor

elizarov commented Apr 8, 2020

Flow for events is a complicated story, indeed. There was a big discussion in #1082 and one outcome for that is that we might introduce some kind of EventFlow that would exactly cover your use-case of flowing to many consumers synchronously.

@elizarov
Copy link
Contributor

Here is a design for SharedFlow that covers this use-case for suspending broadcast flow emission. See #2034

@elizarov
Copy link
Contributor

It looks like it is now safe to close this issue, as its use-cases are fully accounted for by #2034.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants