ReceiveChannel.asFlow operator #1340
Comments
I've run up against this a few times, where I know I will only ever have a single collector (so the lack of multicasting is not an issue), but I still want to use the operators (or integrate with an API that requires
If such an operator were to be added to the library, I think it would need to be clearly named and/or clearly documented to make it obvious that multiple concurrent collectors would not get the same items. With that documented, I think it would be fine as long as #1261 lands, since a lot of use cases would probably want to multicast the channel before doing anything else with it.
* This is a consuming conversion -- the resulting flow can be collected just once and the channel is closed after the first collect.
* The implementation is made efficient (without iterators) using a new internal ReceiveChannel.consumeEachTo function which also ensures that the reference to the last emitted value is not retained.
* AbstractChannel implementation is optimized to avoid code duplication in different receive methods.
Fixes #1340
Fixes #1333
The proposal is to name it
* This is a consuming conversion -- the resulting flow can be collected just once and the channel is closed after the first collect.
* The implementation is made efficient (without iterators) using a new internal ReceiveChannel.consumeEachTo function which also ensures that the reference to the last emitted value is not retained (does not leak).
* AbstractChannel implementation is optimized to avoid code duplication in different receive methods (receive and receiveOrNull) and also shares code with new receiveInternal that is used for an efficient consumeEachTo implementation.
Fixes #1340
Fixes #1333
Should
@fvasco For
You are right, @elizarov,
* This is a consuming conversion -- the resulting flow can be collected just once and the channel is closed after the first collect.
* The implementation is made efficient via emitAll extension.
* Experimental FlowCollector.emitAll extension is introduced.
* It is based on the (internal) Channel.receiveOrClose and ensures that the reference to the last emitted value is not retained (does not leak).
Fixes #1340
Fixes #1333
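The "collected just once" semantics described in these commit messages can be illustrated with a minimal sketch. It assumes the conversion is the one exposed in released kotlinx.coroutines as `ReceiveChannel.consumeAsFlow` (the thread as captured here does not show the final name), and `collectTwice` is a hypothetical helper written only for this illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects the same channel-backed flow twice and
// returns the items from the first attempt plus whether the second failed.
fun collectTwice(): Pair<List<Int>, Boolean> = runBlocking {
    val channel = Channel<Int>()

    // Producer coroutine: sends three items, then closes the channel.
    launch {
        for (i in 1..3) channel.send(i)
        channel.close()
    }

    // Assumption: consumeAsFlow is the consuming conversion discussed above.
    val flow = channel.consumeAsFlow()

    // The first collection drains the channel.
    val first = flow.toList()

    // A second collection of the same flow is rejected.
    val secondFailed = try {
        flow.toList()
        false
    } catch (e: IllegalStateException) {
        true
    }

    first to secondFailed
}
```

Calling `collectTwice()` should yield the three items from the first collection and `true` for the second attempt, which is why a flow obtained this way only suits a single collector.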
A missing piece of the Flow puzzle is a connection between regular ReceiveChannel and Flow (note that we already have a pretty non-obvious ReceiveChannel.asFlux) with a proper migration path for channel operators. For example, all operators on top of channels are deprecated, but users have no clear migration path from their sequence of channel operators to flow.
The main focus here is to provide clear consumption semantics and decide whether we want to introduce such a primitive at all.
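As a rough sketch of what such a migration path could look like, a chain of deprecated channel operators can be rewritten as a single conversion to a flow followed by the equivalent flow operators. The example assumes `consumeAsFlow` from released kotlinx.coroutines as the conversion; the pipeline and the `evenSquares` name are hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical pipeline: where code once chained filter/map directly on the
// channel, it converts the channel once and uses the flow operators instead.
fun evenSquares(): List<Int> = runBlocking {
    val channel = Channel<Int>()

    // Producer coroutine: sends 1..6, then closes the channel.
    launch {
        for (i in 1..6) channel.send(i)
        channel.close()
    }

    channel.consumeAsFlow()          // assumption: the consuming conversion
        .filter { it % 2 == 0 }      // keep even numbers
        .map { it * it }             // square them
        .toList()                    // terminal operator; drains the channel
}
```

Here `evenSquares()` returns `[4, 16, 36]`. Because the conversion consumes the channel, the channel should not be read from anywhere else once the flow is collected.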