-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Unable to get intended replay configuration with completing shared flows #2890
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
Here's a minimal repro playground demonstrating the issue https://pl.kotl.in/tC25RSijl You can see that without the |
Thanks for a detailed write-up with a self-contained example. This is definitely a design issue that we have to solve when working on #2092. Alas, I do not know a simple solution out-of-the-box right now. |
I have since come up with an albeit horrible workaround that I believe fulfils the requirements that I am after with a sentinel null value and some buffer trickery. I'm unconvinced this is the cleanest solution, but it passes my tests! the salient bit: /**
* this awkward chain handles the materialized completion in a special way so that the _replay_ count is correctly
* preserved. Without it, and a `replay = 1` for example, the completion event would be replayed, but not the
* last Notification.Result. To side-step this, a sentinel null is emitted _before_ each Notification.Result that
* would be buffered. When the error or completion result comes through, the oldest sentinel value will be bumped
* off the buffer, crucially preserving the actual result that wants to be replayed in the buffer, _and_ the new
* terminating notification. This sentinel value is just stripped out before dematerializing.
* The downside is that the actual buffer will have every second value as a pointless null, but there is no simpler
* workaround found yet.
* @see https://github.com/Kotlin/kotlinx.coroutines/issues/2890
*/
// set the buffer replay to double the requested replay size to fit the sentinel nulls, prevent overflow.
val bufferReplayIncludingSentinel = if (replay == 0) 1 else (replay.coerceAtMost(Int.MAX_VALUE/2) * 2)
return materialize()
.transform {
// note the different ordering of emission of the sentinel null for the special replay = 0 case - this is
// due to the buffer size being 1 for that case, and the sentinel value must be emitted before the
// notification value in order to correctly empty the buffer of useful values
if (replay == 0) {
emit(it)
}
if (it is Notification.Result) {
emit(null)
}
if (replay != 0){
emit(it)
}
}
.shareIn(coroutineScope, started, bufferReplayIncludingSentinel)
.filterNotNull()
.dematerialize() |
Thank you for the workaround, I have tried it myself and unfortunately there is still one blocker for us - when setting a Our usecase is: An example of what would be desired, using Rx, is @Test
fun sharingFlowable(): Unit = runBlocking {
lateinit var publisher: PublishProcessor<String>
val refCountedFlowable = Flowable.defer {
publisher = PublishProcessor.create()
publisher
}
.replay(2)
.refCount(1000, MILLISECONDS)
.asFlow()
refCountedFlowable.test {
publisher.onNext("A")
awaitItem() shouldBeEqualTo "A"
publisher.onNext("B")
awaitItem() shouldBeEqualTo "B"
publisher.onNext("C")
awaitItem() shouldBeEqualTo "C"
}
// The replay buffer and upstream subscription still exists as no upstream complete was fired and we are
// within the refCount cooldown
refCountedFlowable.test {
awaitItem() shouldBeEqualTo "B"
awaitItem() shouldBeEqualTo "C"
publisher.onNext("D")
awaitItem() shouldBeEqualTo "D"
}
delay(1001)
// The replay buffer was reset and the upstream subscription cancelled as we had no subscribers for longer than
// the refCount cooldown
refCountedFlowable.test {
expectNoEvents()
publisher.onNext("D")
awaitItem() shouldBeEqualTo "D"
publisher.onComplete()
awaitComplete()
}
// The upstream complete immediately reset the replay buffer and terminated the upstream subscription
// despite the refCount cooldown, and we've now initiated a new upstream subscription
refCountedFlowable.test {
expectNoEvents()
publisher.onNext("X")
awaitItem() shouldBeEqualTo "X"
publisher.onComplete()
awaitComplete()
}
} Unfortunately I've tried implementing this myself by materializing and using existing operators to recombine, but haven't managed to get anything working yet. |
I did eventually manage to write a version that appears to pass all of our requirements fun <T : Any?> Flow<T>.shareInCompleting(
scope: CoroutineScope,
started: SharingStarted,
replay: Int,
): Flow<T> {
val reference = AtomicReference<SharedFlow<Action<T>>>()
return flow {
fun compute(): SharedFlow<Action<T>> {
while (true) {
val current = reference.get()
if (current != null) return current
val sharedFlow = materialize()
.transformWhile {
emit(it)
it is Update
}
.onCompletion { reference.set(null) }
.shareIn(scope, started, replay)
if (reference.compareAndSet(null, sharedFlow)) return sharedFlow
}
}
val flow = compute()
emitAll(flow.dematerialize())
}
} |
I have a requirement to have a flow that is converted to a
SharedFlow
as multiple downstream subscribers consume the result, but the originating flow is particularly expensive so multiple collection is unacceptable.Using
shareIn
solves this issue, however a requirement is that the downstream subscribers must also know of the completion status of the original flow. I've implemented amaterialize
&demateralize
function as mentioned in both #2751 & #2034 (comment)I've wrapped this up in a generalised function
shareUntilComplete
with the same signature asshareIn
, there remains however a significant problem with implementing thereplay
functionality - the gist of the function body is as follows:return materialize() .shareIn(coroutineScope, started, replay) .dematerialize()
The issue is that the completion symbol will count towards the
replay
allocation, and I can't simply doreplay + 1
, because a subscriber that joins before completion will end up in one more than the intended replay count from buffer, and if you just doreplay
directly, then subscribers before completion will get the expected result, but late subscribers would only get an empty completing flow (because the only thing in cache was the completion notification).Is there a simple solution I'm overlooking here?
The text was updated successfully, but these errors were encountered: