Skip to content

Allow request-coordinating Processor implementations to pass the TCK #414

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 30, 2017

Conversation

akarnokd
Copy link
Contributor

@akarnokd akarnokd commented Nov 23, 2017

This PR fixes the TCK to support request-coordinating Processor implementations. These kinds of Processors may either

  • coordinate the request amounts of their Subscribers and request only from upstream when all Subscribers requested something; or
  • coordinate emissions, requesting a bounded amount upfront from the upstream and then emitting only when all Subscribers have requested something.

From the downstream Subscribers' perspective, both manifest as lack of emissions.

The IdentityProcessorVerification.doesCoordinatedEmission() added will affect

  • required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError
  • required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo

By adjusting the test request pattern to request on both Subscribers before asserting on events received.

The LockstepProcessorTest contains the LockstepProcessor implementation which coordinates emissions when there are multiple Subscribers.

Replaces #287.
Original issue #284.

* request amounts and only delivers onNext signals if all Subscribers have
* indicated (via their Subscription#request(long)) they are ready to receive elements.
*/
public boolean doesCoordinatedEmission() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since most devs probably won't ever notice this method, or know that they should enable/disable it—what do you think about including a hint about it in case the tests which use it fail?

(Sorry, this comment was on the wrong line before)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documenting it more in the README of the TCK could be a good step?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add some description to it in a couple of hours.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

sub1.request(1);
sub2.request(1);

sendError(ex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps put sendError(ex); after the branch (since it is the last statement in each "leg" of the branch.

expectNextElement(sub1, z);
expectNextElement(sub2, z);

if (totalRequests == 3) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is the same behavior on each side of the branch, perhaps put it after the branch?

* <p>
* The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2.
* <p>
* If this test fails, the following could be checked within the {@code Publisher} implementation:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we wanted to include a hint about doesCoordinatedEmission either when running the test case, or when the testcase fails, what would you propose?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could introduce an overload to expectNextElement that takes an error message that hints about that method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds very good! (the hint override)

expectNextElement(sub2, x);

sub1.request(1);
sub2.request(1);
Copy link
Contributor

@viktorklang viktorklang Nov 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't expectRequest() work after these two?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should. I'll update.

@viktorklang
Copy link
Contributor

@akarnokd Thanks for taking a stab at this issue!

@ktoso, @reactive-streams/contributors Anyone else have feedback on this?

final T z = sendNextTFromUpstream();
expectNextElement(sub1, z,
"If the Processor coordinates requests/emissions when having multiple Subscribers"
+ " at once, please override doesCoordinatedEmission() in this "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"please override doesCoordinatedEmission() to return true in this"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

@ktoso
Copy link
Contributor

ktoso commented Nov 27, 2017

I do want to review this, please don't merge yet.
Esp. I'm worried about the use of coordinated omission -- I don't think this is an instance of that, and the term is used a bit misleadingly. Coordinated omission is about a measurement problem where the measuring-party is not measuring the real latencies due to backpressure from stressed system, I don't think that a Processor that handles many subscribers sis the same thing.

I want to review the thing in depth to be able to reply more precisely though

@viktorklang
Copy link
Contributor

@ktoso Are you conflating "coordinated omission" with "coordinated emission"?

In any case, perhaps there's a better term, I'm all for improvements when it comes to names :)

@ktoso
Copy link
Contributor

ktoso commented Nov 27, 2017

Not conflating, but misreading what was in the PR it seems 😉
Yeah emission is fine, but perhaps a bit too close to that one...

Anyway, want to review this but completely under-water today, hope to get to it tomorrow

@viktorklang
Copy link
Contributor

@ktoso No worries! :-)

if (!elem.equals(expected)) {
env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected));
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice 👍

@viktorklang
Copy link
Contributor

@ktoso @reactive-streams/contributors Ready for merge?

Copy link
Contributor

@ktoso ktoso left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good as far as I can see, thanks for implementing it @akarnokd 👍

@viktorklang
Copy link
Contributor

VERY nice, @akarnokd! Merging!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants