
StateFlow-like primitive for big states with delta updates #3316

Open
elizarov opened this issue Jun 7, 2022 · 5 comments
@elizarov
Contributor

elizarov commented Jun 7, 2022

A common problem appears when writing applications with a state flow whose state is a fairly big object, for example a list of events or other entities, a map, or something similar. These kinds of states are usually transferred over the network as a one-time snapshot followed by incremental updates, and there is no ready-to-use primitive to represent them as a StateFlow.

A solution shall cleanly address two sides of the problem:

Ingress: turning a snapshot of the current state plus a sequence of updates into some kind of Flow representation.
Egress: turning a Flow representation into a snapshot of a current state, plus a sequence of updates.

A trivial solution is to use StateFlow<State> as a flow representation, which might be acceptable for moderately-sized state objects. In this representation, the ingress problem is solved like this:

val flow: StateFlow<State> = flow<State> {
    var currentState = getCurrentStateSnapshot()
    emit(currentState)
    forEachIncomingUpdate { update -> 
        currentState = currentState.apply(update) // compute updated state
        emit(currentState) // emit updated state
    }
}.stateIn(scope) // convert into StateFlow<State>

And the egress problem can be solved like this:

var lastState = emptyState() // domain-specific empty state
flow.collect { state -> 
    val update = state.diffFrom(lastState) // compute delta between states
    processUpdate(update)
    lastState = state
}

This implementation assumes that we have fun State.diffFrom(other: State): Update that computes the difference between two states and represents it as an update. This is usually possible and would work fine as long as the states are not very big. However, this does not scale easily. If the state is big (for example, a list of hundreds of thousands of items), then the typically quadratic implementations of the apply and diffFrom functions will make it too slow to be used in practice.
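To make the cost concrete, here is a minimal sketch of the two functions for a map-shaped state. The State and Update types below are hypothetical and chosen only for illustration, and the function is named applyUpdate rather than apply to avoid confusion with the standard library's apply. Even this linear version touches the whole state on every update, however small the change is.

```kotlin
// Hypothetical domain types, chosen only for illustration.
typealias State = Map<String, Int>

data class Update(val put: Map<String, Int>, val removed: Set<String>)

// Applies an update to a state. The whole map is copied, so the cost grows with the state size.
fun State.applyUpdate(update: Update): State =
    (this - update.removed) + update.put

// Computes the update that turns `other` into `this`. Both maps are scanned in full.
fun State.diffFrom(other: State): Update {
    val put = this.filter { (key, value) -> other[key] != value }
    val removed = other.keys - this.keys
    return Update(put, removed)
}

fun main() {
    val old: State = mapOf("a" to 1, "b" to 2, "c" to 3)
    val new: State = mapOf("a" to 1, "b" to 20, "d" to 4)
    val update = new.diffFrom(old)
    println(update)
    // Applying the computed delta to the old state reproduces the new state.
    check(old.applyUpdate(update) == new)
}
```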

An efficient design will work somewhat like a combination of StateFlow<State> and SharedFlow<Update>. It will rely on the apply function only and will internally maintain a current state snapshot to be delivered to new subscribers, followed by the corresponding updates in sync. To make this kind of design representable as a Flow, I would suggest limiting ourselves to the case where State <: Update (the value space of states is a subset of the value space of updates) or, put another way, only to cases where sending a State snapshot is just one kind of Update operation. We will represent the result as a Flow<Update> and will need a new name for this primitive. Let's call it, tentatively, UpdateFlow for now.

A sketch of the corresponding API might look like this. On ingress side one writes:

val flow: UpdateFlow<Update> = flow<Update> { 
    emit(getCurrentStateSnapshot()) // Note: State <: Update
    forEachIncomingUpdate { update -> emit(update) }
}.updateIn(state, ::apply) // convert into UpdateFlow<Update>

The apply function will need to have the type (Update, Update) -> Update, that is, a function capable of merging a snapshot with an update, or an arbitrary pair of updates, into a single update. It will be used internally to conflate updates for slow subscribers.
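As a rough illustration of what such a merge function could look like, here is a sketch for a map-shaped state. The Snapshot and Delta types and the name merge are assumptions made for this example and are not part of any proposed API. The only property it aims for is that merge(a, b) has the same effect as applying a and then b.

```kotlin
// Hypothetical model: a snapshot is just one kind of update (State <: Update).
sealed interface Update
data class Snapshot(val entries: Map<String, Int>) : Update
data class Delta(val put: Map<String, Int>, val removed: Set<String>) : Update

// Merges two consecutive updates into one with the same effect as `first` followed by `second`.
fun merge(first: Update, second: Update): Update = when (second) {
    // A later snapshot replaces everything that came before it.
    is Snapshot -> second
    is Delta -> when (first) {
        // Snapshot + delta: fold the delta into the snapshot.
        is Snapshot -> Snapshot((first.entries - second.removed) + second.put)
        // Delta + delta: later puts and removals win over earlier ones.
        is Delta -> Delta(
            put = (first.put - second.removed) + second.put,
            removed = (first.removed - second.put.keys) + second.removed,
        )
    }
}

fun main() {
    val snapshot = Snapshot(mapOf("a" to 1, "b" to 2))
    val d1 = Delta(put = mapOf("c" to 3), removed = setOf("a"))
    val d2 = Delta(put = mapOf("a" to 10), removed = setOf("c"))
    // A slow subscriber that has not yet seen d1 and d2 can receive one conflated delta.
    println(merge(d1, d2))
    // A new subscriber receives a single snapshot with both deltas already folded in.
    println(merge(merge(snapshot, d1), d2))
}
```

With a function of this shape, the primitive can keep one up-to-date snapshot for new subscribers and one pending update per slow subscriber, instead of an unbounded queue.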

On egress side one writes:

flow.collect { update -> 
    // Note: the first update is guaranteed to be a snapshot
    processUpdate(update) 
}

All names in this "design" are TBD.

@dovchinnikov
Contributor

We have use cases for this in IJ and would be very glad to see this in the library.

  1. In IJ we provide extension points, each plugin/module can register an extension. As plugins can be loaded or unloaded dynamically, it's very convenient to receive a stream (I intentionally omit flow at the moment) of updates. I'm not really fond of State <: Update as it will require handling initial State differently, but it will only happen once. Each new subscriber should initially receive a bunch of Added updates as if it was subscribed from the very beginning before the extensions appeared in the collection, and then continue receiving Added/Removed as the time goes on.
sealed interface Update<T>  {
  class Added<T>(val value: T) : Update<T>
  class Removed<T>(val value: T) : Update<T>
}
val ep: ExtensionPoint = ...
launch {
  ep.extensions.subscribe { update: Update -> 
    when (update) {
      is Added -> // 
      is Removed -> // 
    }
  }
}

It would be great to have a capability to squash an Added-Removed pair into nothing, e.g. if a new subscriber has not yet received an Added update, and a Removed update with the same value is already in the queue.

  2. In editor tabs we have to compute diff between two lists to yield an Added update.

  3. In navigation popups we don't use coroutines yet, but we'd like to have a view model which is updatable from a BG thread, and which is rendered on EDT incrementally, i.e. handles additions/removals.

  4. In remote dev we have UI models representing lists/tables/etc, and they are expected to be transferred over the network exactly as you suggest.

I'll not list every use case here, but we do have a few with the same semantics: a collection + updates of it.

It may be useful to receive custom data in updates, like Added update over a list should contain an index, so it might be more useful to design a primitive, on top of which various custom models can be built.

Ironically, the closest interface we have for list + updates is javax.swing.ListModel with its javax.swing.event.ListDataListener.

@elizarov
Contributor Author

elizarov commented Jun 29, 2023

@dovchinnikov Let me elaborate how I see your use-case 1 encoded in the strawman design I've sketched.

Here I'd actually use List<Update> as the type in the update flow. You will provide a function that takes a pair of such List<Update> objects and combines them into a single List<Update> object. The simplest implementation just concatenates the two lists, but in an optimized version you'll implement squashing of Add-Remove pairs in the combined list. This will ensure that the first list of updates that any subscriber receives consists of Add operations only.

In the end, you'll get Flow<List<Update>> that you'll process like this:

extensions.onEach { updates: List<Update> -> 
    for (update in updates) when (update) {
      is Added -> // 
      is Removed -> // 
    }        
}
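A possible shape for the combining function with squashing is sketched below. The name combine is hypothetical, the Update hierarchy mirrors the one from the earlier comment with data classes added for equality, and the sketch assumes a set-like collection in which updates for different values can be reordered freely.

```kotlin
sealed interface Update<T>
data class Added<T>(val value: T) : Update<T>
data class Removed<T>(val value: T) : Update<T>

// Concatenates two batches, dropping every Added that is cancelled by a later Removed of the same value.
fun <T> combine(first: List<Update<T>>, second: List<Update<T>>): List<Update<T>> {
    val result = ArrayList<Update<T>>(first.size + second.size)
    for (update in first + second) {
        when (update) {
            is Added -> result.add(update)
            is Removed -> {
                // Look for a pending Added of the same value; if one exists, the pair cancels out.
                val index = result.indexOfLast { it is Added && it.value == update.value }
                if (index >= 0) result.removeAt(index) else result.add(update)
            }
        }
    }
    return result
}

fun main() {
    val pending: List<Update<String>> = listOf(Added("git"), Added("maven"))
    val incoming: List<Update<String>> = listOf(Removed("git"), Added("gradle"))
    // "git" was added and removed before the subscriber saw it, so neither update is delivered.
    println(combine(pending, incoming))
}
```

The linear search per Removed keeps the sketch short; a real implementation would probably index pending additions by value.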

@dovchinnikov
Contributor

Looks good, I'd like to try this in real life

@pablichjenkov

What about DiffFlow for the name?

@fvasco
Contributor

fvasco commented Oct 12, 2023

I share this use case, but we need three kinds of operations: ADD, REMOVE, and UPDATE.
Obviously it is possible to design UPDATE as REMOVE+ADD, however this choice introduces application and performance issues. For example, if I want to deposit money in the bank, I don't want to close my current account and open a new one.
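A minimal sketch of what a keyed update type with an explicit UPDATE could look like follows. All names here (Change, Updated, AccountsView) are hypothetical and only illustrate the bank account example: a deposit changes the balance without the subscriber ever seeing the account closed and reopened.

```kotlin
// Hypothetical keyed change type with an explicit in-place update.
sealed interface Change<K, V>
data class Added<K, V>(val key: K, val value: V) : Change<K, V>
data class Removed<K, V>(val key: K) : Change<K, V>
data class Updated<K, V>(val key: K, val value: V) : Change<K, V>

// A subscriber that tracks balances and counts account lifecycle events.
class AccountsView {
    val balances = LinkedHashMap<String, Long>()
    var opened = 0
    var closed = 0

    fun process(change: Change<String, Long>) {
        when (change) {
            is Added -> { balances[change.key] = change.value; opened++ }
            is Removed -> { balances.remove(change.key); closed++ }
            // The account keeps its identity; only the balance changes.
            is Updated -> balances[change.key] = change.value
        }
    }
}

fun main() {
    val view = AccountsView()
    view.process(Added("alice", 100L))
    view.process(Updated("alice", 150L)) // a deposit, not a close followed by a reopen
    println("balance=${view.balances["alice"]}, opened=${view.opened}, closed=${view.closed}")
}
```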
