-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Decouple asFlow from batchSize and move it to buffer instead, promote… #1279
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
… it to experimental
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've optimized implementation by reusing Publisher.openSubscription infrastructure, so that publisher.asFlow().produceIn(scope) now works via a single channel and there is generally less code due to reuse. Also fixed few minor style & doc issues. Please, take a look.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't formally approve this PR because I am its creator.
The code changes look good, feel free to merge.
But in general, I don't think that it is worth to optimize pretty marginal use-case while losing code readability and its straightforwardness.
Now the implementation of PublisherAsFlow
is much harder to reason about, it exposes more internal API and uses compiler hacks that can stop working at any moment.
I was not looking to optimize this case, but to have a single |
Double checked... that is not the case... |
Anyway, this solution would not work with |
* Fixed the case of Publisher.asFlow().conflate(). * Fixed the cast of Publisher.asFlow().buffer(UNLIMITED) to request Long.MAX_VALUE items as a reactive-streams way to indicate "all" * Use channel.receiveOrNull instead of for loop iteration (it is more efficient) * Calling Publisher.asFlow().produceIn(...) uses a single channel and is implemented via Publisher.openSubscription() * Added tests for conflation and for produceIn cancellation
Ok. Having studied |
@qwwdfsad Please take a look again. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, will squash and merge soon
… it to experimental