
Decouple asFlow from batchSize and move it to buffer instead, promote it to experimental #1279


Merged: 3 commits, Jun 26, 2019

Conversation

@qwwdfsad (Member)

@qwwdfsad qwwdfsad requested a review from elizarov June 19, 2019 13:03
@qwwdfsad qwwdfsad marked this pull request as ready for review June 19, 2019 16:02
@elizarov (Contributor) left a comment:

I've optimized the implementation by reusing the Publisher.openSubscription infrastructure, so that publisher.asFlow().produceIn(scope) now works via a single channel, and there is generally less code due to reuse. I also fixed a few minor style & doc issues. Please take a look.

@qwwdfsad (Member, Author) left a comment:

Can't formally approve this PR because I am its creator.

The code changes look good, feel free to merge.
But in general, I don't think it is worth optimizing a pretty marginal use case at the cost of code readability and straightforwardness.
The implementation of PublisherAsFlow is now much harder to reason about: it exposes more internal API and relies on compiler hacks that could stop working at any moment.

@elizarov (Contributor) commented Jun 20, 2019:

I was not looking to optimize this case, but to have a single Subscriber implementation. They are hard to get right, so having just one such implementation is easier in terms of testing and future maintenance. However, the Subscriber implementation behind openSubscription (which I've used) is definitely more complex. Moreover, it has different behavior: it uses the batch size only for the initial request call and from then on requests just one element at a time. That is less efficient than the original Subscriber implementation in this PR and may be worth a look.
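To make the trade-off concrete, here is a runnable sketch of the batch-requesting idea: prefetch batchSize items up front and re-request a full batch only when the current one is exhausted. The interfaces and names below are simplified stand-ins for the Reactive Streams types, chosen for illustration; this is not the kotlinx.coroutines code.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchRequestDemo {
    // Minimal stand-ins for org.reactivestreams.{Subscription,Subscriber}.
    interface Subscription { void request(long n); }
    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T item);
        void onComplete();
    }

    // Consumer side: requests a full batch up front and again each time the
    // batch is used up, instead of making one request(1) call per element.
    static class BatchingSubscriber implements Subscriber<Integer> {
        final long batchSize;
        long consumedInBatch = 0;
        Subscription subscription;

        BatchingSubscriber(long batchSize) { this.batchSize = batchSize; }

        public void onSubscribe(Subscription s) {
            subscription = s;
            s.request(batchSize);                   // initial prefetch
        }

        public void onNext(Integer item) {
            if (++consumedInBatch == batchSize) {   // batch exhausted
                consumedInBatch = 0;
                subscription.request(batchSize);    // re-request a full batch
            }
        }

        public void onComplete() { }
    }

    // Toy synchronous publisher that honors demand and logs request() calls.
    static List<Long> run(int numItems, long batchSize) {
        List<Long> requestLog = new ArrayList<>();
        long[] demand = {0};
        Subscriber<Integer> sub = new BatchingSubscriber(batchSize);
        sub.onSubscribe(n -> { requestLog.add(n); demand[0] += n; });
        int emitted = 0;
        while (emitted < numItems && demand[0] > 0) {
            demand[0]--;
            sub.onNext(emitted++);                  // onNext may add new demand
        }
        if (emitted == numItems) sub.onComplete();
        return requestLog;
    }

    public static void main(String[] args) {
        // 10 items with batchSize = 4 -> only three request() calls
        System.out.println(run(10, 4));             // prints [4, 4, 4]
    }
}
```

With 10 items and a batch size of 4 the log shows three request(4) calls; the strategy described in the comment above (batch size for the initial request only, then one element at a time) would instead make one batch request followed by a request(1) call per element.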

@elizarov (Contributor):

Double-checked: that is not the case. SubscriptionChannel has similar batch-requesting logic (it requests a full batch once the current one is exhausted), but it has weird behavior with respect to the batch. I'll look into how valid it is.

@elizarov (Contributor):

Anyway, this solution would not work with publisher.asFlow().conflate(). Needs to be fixed.
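For readers unfamiliar with why conflate() is a problem here, below is a minimal model of the semantics it requires: the producer is never blocked by a slow consumer, which instead observes only the latest value through a single-slot buffer. All names are illustrative; this is a toy sketch of conflation, not the PublisherAsFlow code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class ConflationDemo {
    // A conflated hand-off is a one-element buffer that the producer
    // overwrites freely; a slow consumer sees only the most recent value.
    static <T> List<T> conflatedReplay(List<T> produced, int consumeEvery) {
        AtomicReference<T> slot = new AtomicReference<>();  // one-element buffer
        List<T> seen = new ArrayList<>();
        for (int i = 0; i < produced.size(); i++) {
            slot.set(produced.get(i));              // producer never suspends
            if ((i + 1) % consumeEvery == 0) {      // consumer wakes up late
                seen.add(slot.getAndSet(null));     // takes only the latest
            }
        }
        T last = slot.get();
        if (last != null) seen.add(last);           // drain the final value
        return seen;
    }

    public static void main(String[] args) {
        // 7 values produced, consumer keeps up with only every third one:
        // intermediate values 1, 2, 4, 5 are conflated away
        System.out.println(conflatedReplay(List.of(1, 2, 3, 4, 5, 6, 7), 3));
        // prints [3, 6, 7]
    }
}
```

A strictly batch-driven request pipeline conflicts with this because conflation must keep accepting elements regardless of how many the consumer has taken.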

* Fixed the case of Publisher.asFlow().conflate().
* Fixed the case of Publisher.asFlow().buffer(UNLIMITED) to request
  Long.MAX_VALUE items as the reactive-streams way to indicate "all"
* Use channel.receiveOrNull instead of for loop iteration
  (it is more efficient)
* Calling Publisher.asFlow().produceIn(...) uses a single channel
  and is implemented via Publisher.openSubscription()
* Added tests for conflation and for produceIn cancellation
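The Long.MAX_VALUE convention in the second bullet can be illustrated with a tiny demand counter. To my understanding of the Reactive Streams specification, a request of Long.MAX_VALUE (or any accumulated demand that would overflow it) may be treated as effectively unbounded; the sketch below is illustrative code under that assumption, not the library's implementation.

```java
public class UnlimitedDemandDemo {
    // Accumulate outstanding demand, clamping at Long.MAX_VALUE: once demand
    // overflows (or was requested as Long.MAX_VALUE directly), the publisher
    // may treat it as "send everything".
    static long addDemand(long current, long n) {
        long sum = current + n;
        return sum < 0 ? Long.MAX_VALUE : sum;  // overflow -> effectively unbounded
    }

    public static void main(String[] args) {
        // buffer(UNLIMITED) maps to a single request(Long.MAX_VALUE)
        System.out.println(addDemand(0, Long.MAX_VALUE) == Long.MAX_VALUE);
        // once unbounded, further requests keep the demand clamped
        System.out.println(addDemand(Long.MAX_VALUE, 1) == Long.MAX_VALUE);
    }
}
```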
@elizarov (Contributor) commented Jun 20, 2019:

Ok. Having studied openSubscription in detail, I've mostly reverted to the original implementation of the subscriber for publisher.asFlow(), because it is slightly more efficient. I've also fixed support for conflate() and properly integrated with the reactive contract for "requests everything" (it shall be Long.MAX_VALUE!). Added tests for various cases.

@elizarov (Contributor):

@qwwdfsad Please take a look again.

@qwwdfsad (Member, Author) left a comment:

LGTM, will squash and merge soon

@qwwdfsad qwwdfsad merged commit 502610e into develop Jun 26, 2019
@qwwdfsad qwwdfsad deleted the as-flow-buffer branch June 26, 2019 09:09