-
Notifications
You must be signed in to change notification settings - Fork 534
Allow request-coordinating Processor implementations to pass the TCK #414
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow request-coordinating Processor implementations to pass the TCK #414
Conversation
* request amounts and only delivers onNext signals if all Subscribers have | ||
* indicated (via their Subscription#request(long)) they are ready to receive elements. | ||
*/ | ||
public boolean doesCoordinatedEmission() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since most devs probably won't ever notice this method, or know that they should enable/disable it—what do you think about including a hint about it in case the tests which use it fail?
(Sorry, this comment was on the wrong line before)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Documenting it more in the README of the TCK could be a good step?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can add some description to it in a couple of hours.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
sub1.request(1); | ||
sub2.request(1); | ||
|
||
sendError(ex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps put sendError(ex);
after the branch (since it is the last statement in each "leg" of the branch.
expectNextElement(sub1, z); | ||
expectNextElement(sub2, z); | ||
|
||
if (totalRequests == 3) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is the same behavior on each side of the branch, perhaps put it after the branch?
* <p> | ||
* The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2. | ||
* <p> | ||
* If this test fails, the following could be checked within the {@code Publisher} implementation: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we wanted to include a hint about doesCoordinatedEmission
either when running the test case, or when the testcase fails, what would you propose?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could introduce an overload to expectNextElement
that takes an error message that hints about that method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sounds very good! (the hint override)
expectNextElement(sub2, x); | ||
|
||
sub1.request(1); | ||
sub2.request(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't expectRequest()
work after these two?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it should. I'll update.
final T z = sendNextTFromUpstream(); | ||
expectNextElement(sub1, z, | ||
"If the Processor coordinates requests/emissions when having multiple Subscribers" | ||
+ " at once, please override doesCoordinatedEmission() in this " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"please override doesCoordinatedEmission() to return true
in this"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
I do want to review this, please don't merge yet. I want to review the thing in depth to be able to reply more precisely though |
@ktoso Are you conflating "coordinated omission" with "coordinated emission"? In any case, perhaps there's a better term, I'm all for improvements when it comes to names :) |
Not conflating, but misreading what was in the PR it seems 😉 Anyway, want to review this but completely under-water today, hope to get to it tomorrow |
@ktoso No worries! :-) |
if (!elem.equals(expected)) { | ||
env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice 👍
@ktoso @reactive-streams/contributors Ready for merge? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good as far as I can see, thanks for implementing it @akarnokd 👍
VERY nice, @akarnokd! Merging! |
This PR fixes the TCK to support request-coordinating
Processor
implementations. These kinds ofProcessor
s may eitherSubscriber
s and request only from upstream when allSubscriber
s requested something; orSubscriber
s have requested something.From the downstream
Subscriber
s' perspective, both manifest as lack of emissions.The
IdentityProcessorVerification.doesCoordinatedEmission()
added will affectrequired_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError
required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo
By adjusting the test request pattern to request on both
Subscriber
s before asserting on events received.The
LockstepProcessorTest
contains theLockstepProcessor
implementation which coordinates emissions when there are multipleSubscriber
s.Replaces #287.
Original issue #284.