Skip to content

Fail-fast when trying to consume a channel multiple times #167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
elizarov opened this issue Nov 20, 2017 · 13 comments
Closed

Fail-fast when trying to consume a channel multiple times #167

elizarov opened this issue Nov 20, 2017 · 13 comments

Comments

@elizarov
Copy link
Contributor

Channels are designed to be used in fan-out fashion or in a pipeline fashion. Pipeline is conveniently performed with various operators like filter, map, etc (see PR #88) which are modeled after a Sequence. However, a sequence is a multi-shot abstraction, for the following is perfectly file:

val s: Sequence<T> = ...
s.filter { ... }.forEach { it.doSomething() }
s.map { ... }.forEach { it.doSomethingElse() }

However, a similar code will not work in the same way with a ReceiveChannel. ReceiveChannel is more like Iterator, than it is like a Sequence. Each element of the ReceiveChannel can be consumed only once. It is expected that this might be a common source of errors and misunderstanding, so the proposal is to make all the standard operators "fail-fast" when the channel is already being consumed by another operator. So, in the above example, if the types of s was to be replaced with ReceiveChannel<T>, then the invocation of s.map (which comes after s.filter) shall fail immediately.

P.S. It is a separate issue whether a truly Sequence-like (Rx-like) higher-order abstraction shall be introduced for channels.

@elizarov elizarov changed the title Fail-fast when trying to consume a channel multiple-times. Fail-fast when trying to consume a channel multiple times Nov 20, 2017
@ZakTaccardi
Copy link

just want to confirm that the following would still work:

val channel: ReceiveChannel<T> = ...

channel.filter { .. }
    .map { .. }
    .consumeEach { .. }

@bdavisx
Copy link
Contributor

bdavisx commented Jun 21, 2018

I'm confused by the description, will consumeEach only be able to be called once? If so how would you do a fan-out?

@elizarov
Copy link
Contributor Author

@bdavisx Thank you for observation. This indeed can be confusing and we should think on how to make it less confusing. They idea is that for fan-out you don't consume the channel. Instead, you simply invoke receive() to receive elements from it as you need.

@bdavisx
Copy link
Contributor

bdavisx commented Jun 21, 2018

Thanks and yes, very confusing because the current guide/examples fan out section doesn't use receive(), it uses multiple calls to channel.consumeEach().

@bdavisx
Copy link
Contributor

bdavisx commented Jun 22, 2018

I changed the doc to use an iterator instead of consumeEach() - and in the end it looks like the iterator uses receive(). I think this is a better example, but I'm not sure.

#406

@jcornaz
Copy link
Contributor

jcornaz commented Jun 26, 2018

Will a forEach extension function be provided in order to easily iterate over all elements, but not consume the channel?

@qwwdfsad
Copy link
Collaborator

qwwdfsad commented Jun 26, 2018

@jcornaz probably no, because it will be an additional source of confusion.
Users (who didn't read the documentation carefully) may start to pick "random" method. because it's not clear from the name whether consumeEach and forEach have any semantic difference.

Usually you need consumeEach, so it's a reasonable "default" for the library

@jcornaz
Copy link
Contributor

jcornaz commented Jun 26, 2018

Isn't the confusion already here? I don't think it is particularity clearer that for(elt in channel) has semantic difference with channel.consumeEach.

However I agree that user may see the forEach in the auto-completion and use it without understanding the danger of not consuming the channel...

@elizarov
Copy link
Contributor Author

elizarov commented Sep 7, 2018

The difference between "consuming" and "iterating in fan-out fashion" is indeed confusing and more design effort is needed to make it more explicit.

@dave08
Copy link

dave08 commented Sep 7, 2018

In regular collections we're all used to forEach being terminal and onEach not, why have consume in the first place and not reuse familiar concepts (apart from the suspending nature of consume which is anyways in the function's signature...)? Or maybe there's another aspect here that I didn't get?

@elizarov
Copy link
Contributor Author

elizarov commented Sep 7, 2018

Initially consumeEach was called forEach, but we had to rename it for better integration with Rx where there is already a forEach member which accepts non suspending code block which means that you cannot easily do nice Kotlin coroutines stuff there, e.g. this does not work:

observable.forEach {
    delay(1000)
    println("Received $it")
}

But this works:

observable.consumeEach {
    delay(1000)
    println("Received $it")
}

We can revert this decision, rename it back to forEach, and find a different name for extensions on various reactive classes and interfaces.

@efemoney
Copy link

efemoney commented Mar 4, 2019

How about a receiveEach to augment consumeEach?. That might solidify the mental model of consume vs receive. Personally would've prefer forEach and onEach but they are taken like @elizarov says.

@qwwdfsad
Copy link
Collaborator

Status update: won't be fixed.

There are valid use-cases with a fan-out style (thus concurrent consumeEach calls) and this change will add a subtle difference between iteration and consumption.
Additionally, tracking a closed state is a way too intrusive change for channels

qwwdfsad added a commit that referenced this issue May 20, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

7 participants