No way to check if there is a queued/buffered item awaiting in Flow #2193
I did actually find a way around this by changing the code to:
I'm not sure how ideal this is, since we now change the flow to a channel, or whether that has other implications for the rest of the application, but it does work in my initial tests. It would be great to hear whether this is the best way, or if there is some way to achieve this with pure flow.
I just tested this implementation with the But no matter what, it would be really great if this issue could be solved in some way.
A few notes about your code. I think this code is roughly equivalent to what you wrote:

```kotlin
@ExperimentalCoroutinesApi
fun <T : Any> Flow<T>.bufferedChunks(maxChunkSize: Int): Flow<List<T>> =
    // channelFlow rather than callbackFlow: there is no external callback to
    // unregister here, so no awaitClose is required.
    channelFlow {
        require(maxChunkSize >= 1) {
            "Max chunk size should be greater than 0 but was $maxChunkSize"
        }
        val bufferChunks = ArrayList<T>(maxChunkSize)
        val upstreamChannel = buffer(1).produceIn(this)
        val downstreamChannel = this
        while (!upstreamChannel.isClosedForReceive) {
            // Buffer is full: don't process any more upstream emissions until
            // the buffer has been emitted.
            if (bufferChunks.size >= maxChunkSize) {
                downstreamChannel.send(bufferChunks.toList())
                bufferChunks.clear()
            }
            // Wait for new upstream emissions, but also try to send any
            // buffered items.
            select<Unit> {
                upstreamChannel.onReceiveOrNull { item ->
                    // Null means the upstream completed while we were suspended,
                    // and the loop will terminate after this. Note that if you
                    // need T to be nullable, you'll need to wrap your upstream
                    // values in some sort of additional value to distinguish
                    // between actual null values and the close sentinel.
                    // Hopefully there will eventually be an onReceiveOrClosed
                    // method that makes this simpler.
                    if (item != null) {
                        bufferChunks += item
                    }
                }
                if (bufferChunks.isNotEmpty()) {
                    downstreamChannel.onSend(bufferChunks.toList()) {
                        bufferChunks.clear()
                    }
                }
            }
        }
        // After upstream completes, flush any remaining items.
        if (bufferChunks.isNotEmpty()) send(bufferChunks.toList())
    }.buffer(0)
```

That said, I think you could also write this with only one channel:

```kotlin
@ExperimentalCoroutinesApi
fun <T> Flow<T>.bufferedChunks(maxChunkSize: Int): Flow<List<T>> =
    channelFlow {
        coroutineScope {
            require(maxChunkSize >= 1) {
                "Max chunk size should be greater than 0 but was $maxChunkSize"
            }
            val bufferChunks = ArrayList<T>(maxChunkSize)
            val downstreamChannel = this@channelFlow
            var sendJob: Job? = null
            collect { item ->
                // Cancel the send job in case the downstream is slow.
                // Need to join on the job to synchronize access to the buffer.
                sendJob?.cancelAndJoin()
                bufferChunks += item
                // Cache the full status of the buffer, since it's not safe to
                // access the buffer after launching the coroutine below until
                // that coroutine has completed.
                val isBufferFull = bufferChunks.size >= maxChunkSize
                // Launch a coroutine to send, so we can still accept more
                // upstream emissions.
                sendJob = launch {
                    // Potentially executing on a different thread, but not racy,
                    // since the main coroutine will not touch the buffer until
                    // this job has completed.
                    downstreamChannel.send(bufferChunks.toList())
                    // Send has atomic cancellation: if the send succeeds, it
                    // will not throw a CancellationException even if the job
                    // was cancelled.
                    bufferChunks.clear()
                }
                if (isBufferFull) {
                    // Don't process any more upstream emissions until the
                    // buffer has been emitted.
                    sendJob!!.join()
                }
            }
        }
    }.buffer(0)
```
@zach-klippenstein thanks for the comments/improvements and possible solutions. I had started to think of a solution similar to number 2 as well, so it is great to see I was on the right track. I wonder if this use case could be made simpler than this solution, but otherwise we do have an answer now, and if the API is not likely to change, this issue could also be closed.
@zach-klippenstein It is unsettling to see a solution that relies on atomic cancellation in
I'm not a huge fan of that myself, even without the future plans. It's a little too "magic". Atomic cancellation is load-bearing here, but very subtle and not obvious unless you read all the docs.
@elizarov I am now making use of a solution very similar to the latest solution suggested by @zach-klippenstein, i.e. the one with the
You don't need two channels or the select expression for this case, and performance will be much higher. You can use the following approach for natural batching:
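The code sample in this comment did not survive extraction. As a rough sketch of the natural batching idea (not necessarily the snippet that was posted; the name `chunked` and the use of kotlinx.coroutines 1.5+ channel APIs are my assumptions): suspend for the first element of each batch, then drain whatever the producer has already queued, without suspending again.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Natural batching: one suspending receive per batch, then a non-suspending
// drain of everything already queued. Batch sizes adapt to the relative speed
// of producer and consumer. T is bounded to Any so that a null from
// getOrNull() unambiguously means "closed or empty".
@OptIn(ExperimentalCoroutinesApi::class)
fun <T : Any> Flow<T>.chunked(): Flow<List<T>> = flow {
    coroutineScope {
        // Run the upstream into a channel so it can be polled.
        val upstream = produceIn(this)
        while (true) {
            // Suspend until at least one element arrives (or upstream closes).
            val first = upstream.receiveCatching().getOrNull() ?: break
            val batch = mutableListOf(first)
            // Grab everything else that is already buffered, without suspending.
            while (true) {
                batch += upstream.tryReceive().getOrNull() ?: break
            }
            emit(batch)
        }
    }
}

fun main() = runBlocking {
    val batches = (1..10).asFlow().chunked().toList()
    // Every element arrives exactly once, in order; batch boundaries depend on timing.
    check(batches.flatten() == (1..10).toList())
}
```

With a fast collector each batch will typically contain a single element; multi-element batches only appear when the collector lags behind the producer.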
@Globegitter Did you end up using a solution from @elizarov? How is your experience? What about performance?
@pacher Not yet. My work is project-based, and the bug I described has not been high-priority enough, so I have currently been assigned to a different project. I do expect to come back to it in the future, however.
I think this method does not do buffering. I was trying it out in a project and the elements in the flow would not get buffered; I'm not sure if someone else has seen that. If I get a chance I will try to elaborate, but at least that was my experience copying the method verbatim.
@andguevara It is not buffering, it is batching. If your collector processes elements fast enough, there will be no buffering. Try to slow it down with some
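To see the difference, here is a small self-contained demo (my own illustration, not code from the thread; `drainInBatches` is a made-up name): a producer emits nine elements at a steady pace while the consumer drains the channel in natural batches and artificially sleeps between batches. With the delay, later batches contain several elements; remove it and each batch is a singleton.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Drain a channel in natural batches: one suspending receive, then take
// whatever is already queued, then simulate a slow consumer with a delay.
suspend fun drainInBatches(
    channel: ReceiveChannel<Int>,
    consumerDelayMs: Long,
): List<List<Int>> {
    val batches = mutableListOf<List<Int>>()
    while (true) {
        // Suspend for the first element of the batch (or stop when closed).
        val first = channel.receiveCatching().getOrNull() ?: break
        val batch = mutableListOf(first)
        // Take everything already queued, without suspending.
        while (true) {
            batch += channel.tryReceive().getOrNull() ?: break
        }
        batches += batch
        delay(consumerDelayMs) // the "slow collector"
    }
    return batches
}

fun main() = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    launch {
        repeat(9) {
            channel.send(it)
            delay(10)
        }
        channel.close()
    }
    // Batch boundaries depend on timing, but all nine elements come through in order.
    println(drainInBatches(channel, consumerDelayMs = 50))
}
```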
I have been implementing a natural buffering operator as discussed in #902 and just discovered an edge case issue.
The code we have is very similar to what I posted in the other issue:
This has been working very well for us, but here is the issue I uncovered: given a buffer that is not full, if I offer the buffer while the downstream consumer is busy, the code will wait until it collects an additional element before offering to the downstream consumer again, even though it may take a long time for that additional element to arrive, and the downstream consumer might become free in the meantime. So is there a way to check whether there is a queued item, so that I could offer the incomplete buffer until I collect a new item? Ideally my code could look something like this:
where `hasNoItemsQueued` would be a variable in the scope provided by the `collect` method. This is, by the way, just for illustration purposes; in the end I do not care how this would be implemented.

I have been able to get around this in a quite hacky/complicated way (just for a prototype), using an `onEach` that runs before the `collect` in a different context and sets a shared variable. But that does feel quite ugly, and I am not even sure that approach is free of issues.
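A hedged sketch of one answer (assuming kotlinx.coroutines 1.5+; names are illustrative): instead of checking whether an item is queued, race the receive against the send with `select`, along the lines of the select-based variant discussed earlier in this thread. The incomplete chunk is then offered the moment the downstream is free, and abandoned the moment a new element arrives first.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.select

@OptIn(ExperimentalCoroutinesApi::class)
fun <T : Any> Flow<T>.bufferedChunks(maxChunkSize: Int): Flow<List<T>> = channelFlow {
    require(maxChunkSize >= 1) { "maxChunkSize must be positive, was $maxChunkSize" }
    val upstream = buffer(1).produceIn(this)
    val downstream: SendChannel<List<T>> = this
    val chunk = ArrayList<T>(maxChunkSize)
    var upstreamOpen = true
    while (upstreamOpen) {
        // A full chunk must be flushed before taking more elements.
        if (chunk.size >= maxChunkSize) {
            downstream.send(chunk.toList())
            chunk.clear()
        }
        select<Unit> {
            // Whichever operation becomes possible first wins the race.
            upstream.onReceiveCatching { result ->
                val item = result.getOrNull()
                if (item != null) chunk += item else upstreamOpen = false
            }
            if (chunk.isNotEmpty()) {
                // Offer the (possibly incomplete) chunk as soon as the
                // downstream is ready to take it.
                downstream.onSend(chunk.toList()) { chunk.clear() }
            }
        }
    }
    // Flush whatever is left after the upstream completes.
    if (chunk.isNotEmpty()) downstream.send(chunk.toList())
}.buffer(0)

fun main() = runBlocking {
    val chunks = (1..10).asFlow().bufferedChunks(3).toList()
    // Order is preserved and no chunk exceeds the requested size.
    check(chunks.flatten() == (1..10).toList())
    check(chunks.all { it.size in 1..maxOf(1, 3) })
}
```

The `.buffer(0)` at the end keeps the downstream rendezvous so that `onSend` only succeeds when the collector is actually ready, which is exactly the "is the consumer free?" signal the question asks for.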