Channel onUndeliveredElement doesn't work #3330

Closed
artemkaxboy opened this issue Jun 20, 2022 · 6 comments

@artemkaxboy

I wanted to use a buffered channel, but was disappointed to find that onUndeliveredElement doesn't work as expected.

When I use a channel with capacity = 1, like this:

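import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

fun main() {
    val capacity = 1

    // On overflow, drop the oldest buffered element and report it via the callback.
    val channel = Channel<Int>(
        capacity = capacity,
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
        onUndeliveredElement = { value -> println("Dropped value: $value") }
    )

    runBlocking {
        (1..3).forEach { value ->
            channel.send(value)
            println("Sent value: $value")
        }
    }
}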

It works exactly as I want. Output:

Sent value: 1
Dropped value: 1
Sent value: 2
Dropped value: 2
Sent value: 3

So I expected the same behavior with a greater capacity of 2:

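import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

fun main() {
    val capacity = 2

    // Same setup as before; only the capacity differs.
    val channel = Channel<Int>(
        capacity = capacity,
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
        onUndeliveredElement = { value -> println("Dropped value: $value") }
    )

    runBlocking {
        (1..3).forEach { value ->
            channel.send(value)
            println("Sent value: $value")
        }
    }
}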

But onUndeliveredElement is never called in that case:

Sent value: 1
Sent value: 2
Sent value: 3

BufferOverflow.DROP_LATEST doesn't work either.

@qwwdfsad
Collaborator

This is indeed a bug, but it seems to be only fixable in 1.7.0.

The story is the following:

  • BufferOverflow doesn't acknowledge onUndeliveredElement in any channel at all, except CONFLATED.
  • The Channel() function has an internal optimization: if capacity is 1 and the strategy is DROP_OLDEST, it picks CONFLATED as the underlying implementation because it's semantically the same.

So we have to fix all channels except CONFLATED, which could potentially be considered a breaking change for most of the channels.
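
To make the two points above concrete, here is a toy model of the selection logic as described in this comment. It is only a sketch: `Overflow`, `Impl`, `pickImpl`, and `reportsDroppedElements` are hypothetical names invented for illustration, not the actual kotlinx.coroutines sources. It also assumes the strategy in question is DROP_OLDEST, as in the report's example, since that is the one that behaves like a conflated channel.

```kotlin
// Toy model only: hypothetical names, not the real kotlinx.coroutines internals.
enum class Overflow { SUSPEND, DROP_OLDEST, DROP_LATEST }
enum class Impl { CONFLATED, BUFFERED }

// Capacity 1 with DROP_OLDEST keeps only the newest element,
// so it can be served by the conflated implementation.
fun pickImpl(capacity: Int, overflow: Overflow): Impl =
    if (capacity == 1 && overflow == Overflow.DROP_OLDEST) Impl.CONFLATED else Impl.BUFFERED

// Pre-fix behavior as described in this thread:
// only the conflated implementation reported dropped elements.
fun reportsDroppedElements(impl: Impl): Boolean = impl == Impl.CONFLATED

fun main() {
    for (capacity in 1..2) {
        val impl = pickImpl(capacity, Overflow.DROP_OLDEST)
        println("capacity=$capacity -> $impl, callback on drop: ${reportsDroppedElements(impl)}")
    }
}
```

Under this model, capacity = 1 lands on the conflated path and the callback fires, while capacity = 2 lands on the buffered path and the callback is silently skipped, which matches the two outputs in the report.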

@qwwdfsad
Collaborator

While we're evaluating it, could you folks please describe your use case for this?

@sakex

sakex commented Jun 22, 2022

My use case from #3327 is that I am using a bounded channel in a thread that receives some data, processes it, then sends it over the network. Sometimes the network takes longer than expected, so the channel reaches its maximum capacity and the BufferOverflow strategy drops the oldest data. I would like to be able to monitor the number of dropped messages for analysis and log the lost data.

Edit: I'm wondering whether this would be better as a callback, or whether the send call should return the dismissed element.

@artemkaxboy
Author

We have almost the same use case. We receive some data from the network and send items to a channel while a background worker processes them. When we have too many messages, we'd like to drop the oldest. We planned to use the callback for metrics and logging.
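
For readers with the same use case, here is a minimal sketch of what "drop the oldest and report it" could look like outside of kotlinx.coroutines. `DropOldestBuffer`, `offer`, `poll`, and `droppedCount` are hypothetical names, not part of any library API, and the class is not thread-safe as written. It shows both shapes discussed in this thread: a callback for dropped elements, and a send-like call that returns the evicted element.

```kotlin
// Hypothetical helper for illustration; not part of kotlinx.coroutines and not thread-safe.
class DropOldestBuffer<T : Any>(
    private val capacity: Int,
    private val onDropped: (T) -> Unit = {}
) {
    init {
        require(capacity > 0) { "capacity must be positive" }
    }

    private val items = ArrayDeque<T>(capacity)

    // Number of elements evicted so far, usable as a simple metric.
    var droppedCount = 0
        private set

    // Adds an element. Returns the evicted oldest element, or null if nothing was dropped.
    fun offer(element: T): T? {
        val evicted = if (items.size == capacity) items.removeFirst() else null
        items.addLast(element)
        if (evicted != null) {
            droppedCount++
            onDropped(evicted)
        }
        return evicted
    }

    // Removes and returns the oldest buffered element, or null if the buffer is empty.
    fun poll(): T? = items.removeFirstOrNull()
}

fun main() {
    val buffer = DropOldestBuffer<Int>(capacity = 2) { println("Dropped value: $it") }
    (1..3).forEach { value ->
        buffer.offer(value)
        println("Sent value: $value")
    }
    println("Dropped so far: ${buffer.droppedCount}")
}
```

With capacity = 2, the third `offer` evicts the value 1, so the callback fires once and `droppedCount` ends at 1. A real consumer would still need synchronization or a channel around this buffer.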

@qwwdfsad
Collaborator

Thanks, folks!

Those are not the use cases we expected when designing this API, but they are definitely ones we'd like to support.

qwwdfsad added a commit that referenced this issue Jan 25, 2023
Marked as "fixes 3330" to automatically close the issue yet it's the original PR that fixed it

Fixes #3330

@qwwdfsad
Collaborator

qwwdfsad commented Mar 9, 2023

Fixed by #3621

@qwwdfsad qwwdfsad closed this as completed Mar 9, 2023