Unable to get intended replay configuration with completing shared flows #2890

Open
zakhenry opened this issue Aug 21, 2021 · 5 comments
@zakhenry
I have a requirement to have a flow that is converted to a SharedFlow as multiple downstream subscribers consume the result, but the originating flow is particularly expensive so multiple collection is unacceptable.

Using shareIn solves this issue; however, the downstream subscribers must also learn the completion status of the original flow. I've implemented materialize and dematerialize functions, as mentioned in both #2751 and #2034 (comment):

As a side-effect of this error-handling design decision, the SharedFlow never completes. A call collect { ... } on a SharedFlow must be canceled to terminate it. However, if completion is needed, then it can always be materialized by a special emitted value. A collector can apply takeWhile operator to complete the resulting flow when this special value is encountered.
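
To make the quoted advice concrete, here is a minimal sketch of completing a collection with takeWhile. The Event, Value and Done types are hypothetical names invented for this illustration, and a MutableSharedFlow is filled by hand so that the result does not depend on timing.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical event type: Done plays the role of the "special emitted value".
sealed interface Event
data class Value(val value: Int) : Event
object Done : Event

suspend fun collectUntilDone(): List<Event> {
    // replay = 3 lets this demo emit before anyone subscribes without suspending.
    val shared = MutableSharedFlow<Event>(replay = 3)
    shared.emit(Value(1))
    shared.emit(Value(2))
    shared.emit(Done)
    // Without takeWhile this collection would never finish.
    return shared.takeWhile { it !is Done }.toList()
}

fun main() = runBlocking {
    println(collectUntilDone()) // [Value(value=1), Value(value=2)]
}
```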

I've wrapped this up in a generalised function, shareUntilComplete, with the same signature as shareIn. However, a significant problem remains with implementing the replay functionality. The gist of the function body is as follows:

return materialize()
        .shareIn(coroutineScope, started, replay)
        .dematerialize()
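
The materialize and dematerialize helpers are not shown in this thread, so the following is only a sketch of what they might look like. Only the name Notification.Result appears later in the thread; Notification.Error and Notification.Complete are assumed names.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical wrapper type (Error and Complete are assumed names).
sealed interface Notification<out T> {
    data class Result<T>(val value: T) : Notification<T>
    data class Error(val cause: Throwable) : Notification<Nothing>
    object Complete : Notification<Nothing>
}

// Turn values, failure and normal completion into ordinary emissions.
fun <T> Flow<T>.materialize(): Flow<Notification<T>> =
    map<T, Notification<T>> { Notification.Result(it) }
        .onCompletion { cause -> if (cause == null) emit(Notification.Complete) }
        .catch { emit(Notification.Error(it)) }

// Reverse the mapping: rethrow on Error, finish on Complete.
fun <T> Flow<Notification<T>>.dematerialize(): Flow<T> = transformWhile { notification ->
    when (notification) {
        is Notification.Result -> { emit(notification.value); true }
        is Notification.Error -> throw notification.cause
        is Notification.Complete -> false
    }
}

fun main() = runBlocking {
    // Round trip through both operators.
    println(flowOf(1, 2, 3).materialize().dematerialize().toList()) // [1, 2, 3]
}
```

The final transformWhile is what lets a collector of the otherwise never-completing shared flow terminate once the completion notification arrives.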

The issue is that the completion symbol counts towards the replay allocation. I can't simply use replay + 1, because a subscriber that joins before completion would then receive one more value from the buffer than the intended replay count. If I use replay directly, subscribers that join before completion get the expected result, but late subscribers get only an empty completing flow, because the only thing in the cache is the completion notification.
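
The off-by-one can be seen by inspecting the replay cache directly. This sketch fills a MutableSharedFlow by hand instead of going through shareIn, to keep the result deterministic. The Notification type is a hypothetical stand-in for the materialized values.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.runBlocking

// Hypothetical materialized value type (names assumed for this illustration).
sealed interface Notification<out T> {
    data class Result<T>(val value: T) : Notification<T>
    object Complete : Notification<Nothing> {
        override fun toString() = "Complete"
    }
}

// Returns the replay cache as seen before completion and after completion.
suspend fun replaySnapshots(replay: Int): Pair<List<Notification<Int>>, List<Notification<Int>>> {
    val shared = MutableSharedFlow<Notification<Int>>(replay = replay)
    shared.emit(Notification.Result(1))
    shared.emit(Notification.Result(2))
    val during = shared.replayCache // what a subscriber joining now would be replayed
    shared.emit(Notification.Complete)
    val late = shared.replayCache   // what a subscriber joining after completion would be replayed
    return during to late
}

fun main() = runBlocking {
    // Intended replay = 1: the late subscriber sees only the completion.
    println(replaySnapshots(replay = 1)) // ([Result(value=2)], [Complete])
    // replay + 1: the late subscriber is fine, but the "during" subscriber gets two values.
    println(replaySnapshots(replay = 2)) // ([Result(value=1), Result(value=2)], [Result(value=2), Complete])
}
```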

Is there a simple solution I'm overlooking here?

@zakhenry
Author

Here's a minimal repro playground demonstrating the issue: https://pl.kotl.in/tC25RSijl

You can see that without the replay + 1, the late subscriber receives only the completion notification from the cache, not the actual value they were interested in. With the replay + 1, the "during" subscriber immediately receives two values on subscription when they were only interested in one (and the buffer is oversized by 1 too).

@qwwdfsad
Collaborator

Thanks for a detailed write-up with a self-contained example. This is definitely a design issue that we have to solve when working on #2092.

Alas, I do not know a simple solution out-of-the-box right now.

@zakhenry
Author

I have since come up with an admittedly horrible workaround that I believe fulfils my requirements, using a sentinel null value and some buffer trickery. I'm unconvinced this is the cleanest solution, but it passes my tests!

https://pl.kotl.in/ARnUWIxZI

The salient bit:

    /**
     * this awkward chain handles the materialized completion in a special way so that the _replay_ count is correctly
     * preserved. Without it, and a `replay = 1` for example, the completion event would be replayed, but not the
     * last Notification.Result. To side-step this, a sentinel null is emitted _before_ each Notification.Result that
     * would be buffered. When the error or completion result comes through, the oldest sentinel value will be bumped
     * off the buffer, crucially preserving the actual result that wants to be replayed in the buffer, _and_ the new
     * terminating notification. This sentinel value is just stripped out before dematerializing.
     * The downside is that the actual buffer will have every second value as a pointless null, but there is no simpler
     * workaround found yet.
     * @see https://github.com/Kotlin/kotlinx.coroutines/issues/2890
     */
    // Set the buffer replay to double the requested replay size to fit the sentinel nulls; coerceAtMost prevents
    // integer overflow when doubling.
    val bufferReplayIncludingSentinel = if (replay == 0) 1 else (replay.coerceAtMost(Int.MAX_VALUE / 2) * 2)
    return materialize()
        .transform {

            // Note the different ordering of emission of the sentinel null for the special replay = 0 case: the
            // buffer size is 1 in that case, so the sentinel must be emitted after the notification value in order
            // to evict it and leave the buffer empty of useful values.
            if (replay == 0) {
                emit(it)
            }
            if (it is Notification.Result) {
                emit(null)
            }
            if (replay != 0) {
                emit(it)
            }
        }
        .shareIn(coroutineScope, started, bufferReplayIncludingSentinel)
        .filterNotNull()
        .dematerialize()
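
The eviction order behind this trick can be checked by hand. The sketch below assumes a requested replay of 1 (so a buffer of 2), uses plain strings as stand-ins for the notifications, and fills a MutableSharedFlow directly rather than going through shareIn.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.runBlocking

// Returns what would be replayed (sentinels stripped) before and after completion.
suspend fun sentinelSnapshots(requestedReplay: Int = 1): Pair<List<String>, List<String>> {
    val shared = MutableSharedFlow<String?>(replay = requestedReplay * 2)
    shared.emit(null)          // sentinel before each result
    shared.emit("Result(1)")
    shared.emit(null)
    shared.emit("Result(2)")
    val during = shared.replayCache.filterNotNull()
    shared.emit("Complete")    // evicts the oldest entry, which is a sentinel
    val late = shared.replayCache.filterNotNull()
    return during to late
}

fun main() = runBlocking {
    val (during, late) = sentinelSnapshots()
    println(during) // [Result(2)]
    println(late)   // [Result(2), Complete]
}
```

Both subscribers now see exactly one replayed result, and the late one additionally sees the completion.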

@rossdanderson
rossdanderson commented Mar 15, 2022

Thank you for the workaround. I have tried it myself and unfortunately there is still one blocker for us: when a stopTimeoutMillis value is set in SharingStarted, a later subscriber within this time period does not trigger a resubscription to the upstream and simply gets the last replay values and the previous completion/error.
In comparison, Rx's replay(1).refCount(1000, MILLISECONDS) immediately resets the refCount cooldown and clears the replay buffer when the upstream errors or completes, and I cannot find a Flow equivalent.

Our use case is:
We want to share expensive streams, typically with a replay of 1.
We tend to use stopTimeoutMillis to keep the stream around for a little while because, once requested, streams are likely to be requested again in the near future, and this saves us the setup cost.
These streams may complete (due to some upstream entitlement change, for example) or error (due to some unforeseen issue or network conditions). Often we would like to communicate the problem to the user and let them initiate the retry, rather than retrying with a fixed delay or the like. We would rather avoid passing around wrapper objects to replace the existing complete/error semantics, for reasons of both memory overhead and code cleanliness: performing combine across many materialized streams quickly becomes very messy.
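
For reference, the sharing setup described above might look roughly like the sketch below. shareWithCooldown is a hypothetical helper name, and setting replayExpirationMillis = 0 is an assumption made so that the replay cache is dropped once the cooldown elapses. It covers only the cooldown part of the use case and does not propagate upstream completion or errors to subscribers.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: share with replay = 1 and keep the upstream alive for a cooldown period.
fun <T> Flow<T>.shareWithCooldown(scope: CoroutineScope, cooldownMillis: Long = 1_000): SharedFlow<T> =
    shareIn(
        scope = scope,
        started = SharingStarted.WhileSubscribed(
            stopTimeoutMillis = cooldownMillis, // keep collecting upstream this long after the last subscriber leaves
            replayExpirationMillis = 0,         // then drop the replay cache immediately
        ),
        replay = 1,
    )

fun main() = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)
    val shared = flowOf("A").shareWithCooldown(scope)
    println(shared.first())
    scope.cancel()
}
```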

An example of the desired behaviour, using Rx:

    @Test
    fun sharingFlowable(): Unit = runBlocking {
        lateinit var publisher: PublishProcessor<String>

        val refCountedFlowable = Flowable.defer {
            publisher = PublishProcessor.create()
            publisher
        }
            .replay(2)
            .refCount(1000, MILLISECONDS)
            .asFlow()

        refCountedFlowable.test {
            publisher.onNext("A")
            awaitItem() shouldBeEqualTo "A"

            publisher.onNext("B")
            awaitItem() shouldBeEqualTo "B"

            publisher.onNext("C")
            awaitItem() shouldBeEqualTo "C"
        }

        // The replay buffer and upstream subscription still exists as no upstream complete was fired and we are
        // within the refCount cooldown

        refCountedFlowable.test {
            awaitItem() shouldBeEqualTo "B"
            awaitItem() shouldBeEqualTo "C"

            publisher.onNext("D")
            awaitItem() shouldBeEqualTo "D"
        }

        delay(1001)

        // The replay buffer was reset and the upstream subscription cancelled as we had no subscribers for longer than
        // the refCount cooldown

        refCountedFlowable.test {
            expectNoEvents()

            publisher.onNext("D")
            awaitItem() shouldBeEqualTo "D"


            publisher.onComplete()
            awaitComplete()
        }

        // The upstream complete immediately reset the replay buffer and terminated the upstream subscription
        // despite the refCount cooldown, and we've now initiated a new upstream subscription

        refCountedFlowable.test {
            expectNoEvents()

            publisher.onNext("X")
            awaitItem() shouldBeEqualTo "X"

            publisher.onComplete()
            awaitComplete()
        }
    }

Unfortunately I've tried implementing this myself by materializing and using existing operators to recombine, but haven't managed to get anything working yet.

@rossdanderson
rossdanderson commented Mar 21, 2022

I did eventually manage to write a version that appears to pass all of our requirements:

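import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*

// Action, Update, materialize() and dematerialize() are helpers that are not shown in this comment.
fun <T> Flow<T>.shareInCompleting(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int,
): Flow<T> {
    // Holds the currently active shared flow, or null when a new one must be created.
    val reference = AtomicReference<SharedFlow<Action<T>>>()

    return flow {
        fun compute(): SharedFlow<Action<T>> {
            while (true) {
                val current = reference.get()
                if (current != null) return current

                val sharedFlow = materialize()
                    // Forward every action, and stop collecting once a non-Update (terminal) action has been emitted.
                    .transformWhile {
                        emit(it)
                        it is Update
                    }
                    // Once the upstream has terminated, clear the reference so the next collector starts afresh.
                    .onCompletion { reference.set(null) }
                    .shareIn(scope, started, replay)

                // Publish only if no other collector won the race; otherwise loop and use theirs.
                if (reference.compareAndSet(null, sharedFlow)) return sharedFlow
            }
        }

        val flow = compute()

        emitAll(flow.dematerialize())
    }
}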
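
The get-or-create loop in compute() can be looked at in isolation. The sketch below reproduces only that pattern with a hypothetical ResettableLazy class and no coroutines involved: readers share the current instance until something resets it, and a compare-and-set decides which newly created instance wins a race.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper illustrating the compare-and-set publish/reset pattern.
class ResettableLazy<T : Any>(private val create: () -> T) {
    private val reference = AtomicReference<T?>(null)

    fun get(): T {
        while (true) {
            // Reuse the published instance if there is one.
            reference.get()?.let { return it }
            // Otherwise create a candidate and try to publish it.
            val candidate = create()
            if (reference.compareAndSet(null, candidate)) return candidate
            // Another thread published first: loop and return its instance instead.
        }
    }

    // Corresponds to onCompletion { reference.set(null) } in the function above.
    fun reset() = reference.set(null)
}

fun main() {
    var created = 0
    val lazy = ResettableLazy { ++created }
    println(lazy.get()) // 1
    println(lazy.get()) // 1 (reused)
    lazy.reset()
    println(lazy.get()) // 2 (recreated after reset)
}
```

One consequence of this pattern is that a candidate may be created and then discarded when another caller publishes first.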
