Skip to content

Allow request-coordinating Processor implementations to pass the TCK #414

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 30, 2017
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,16 @@ public long maxSupportedSubscribers() {
return Long.MAX_VALUE;
}

/**
* Override this method and return {@code true} if the {@link Processor} returned by the
* {@link #createIdentityProcessor(int)} coordinates its {@link Subscriber}s
* request amounts and only delivers onNext signals if all Subscribers have
* indicated (via their Subscription#request(long)) they are ready to receive elements.
*/
public boolean doesCoordinatedEmission() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since most devs probably won't ever notice this method, or know that they should enable/disable it—what do you think about including a hint about it in case the tests which use it fail?

(Sorry, this comment was on the wrong line before)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documenting it more in the README of the TCK could be a good step?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add some description to it in a couple of hours.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return false;
}

////////////////////// TEST ENV CLEANUP /////////////////////////////////////

@BeforeMethod
Expand Down Expand Up @@ -401,8 +411,27 @@ public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue()
publisherVerification.required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue();
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.4
// for multiple subscribers

/**
* Asks for a {@code Processor} that supports at least 2 {@code Subscriber}s at once and checks if two {@code Subscriber}s
* receive the same items and a terminal {@code Exception}.
* <p>
* If the {@code Processor} requests and/or emits items only when all of its {@code Subscriber}s have requested,
* override {@link #doesCoordinatedEmission()} and return {@code true} to indicate this property.
* <p>
* <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#1.4'>1.4</a> with multiple
* {@code Subscriber}s.
* <p>
* The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2.
* <p>
* If this test fails, the following could be checked within the {@code Publisher} implementation:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we wanted to include a hint about doesCoordinatedEmission either when running the test case, or when the testcase fails, what would you propose?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could introduce an overload to expectNextElement that takes an error message that hints about that method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds very good! (the hint override)

* <ul>
* <li>The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.</li>
* <li>The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or
* else override {@link #doesCoordinatedEmission()} and return {@code true} to indicate the test {@code Subscriber}s
* both have to request first.</li>
* </ul>
*/
@Test
public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable {
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
Expand All @@ -415,17 +444,42 @@ public TestSetup apply(Long aLong) throws Throwable {
final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
env.subscribe(processor, sub2);

sub1.request(1);
expectRequest();
final T x = sendNextTFromUpstream();
expectNextElement(sub1, x);
sub1.request(1);
final Exception ex = new RuntimeException("Test exception");

// sub1 has received one element, and has one demand pending
// sub2 has not yet requested anything
if (doesCoordinatedEmission()) {
sub1.request(1);
sub2.request(1);

final Exception ex = new RuntimeException("Test exception");
expectRequest();

final T x = sendNextTFromUpstream();

expectNextElement(sub1, x);
expectNextElement(sub2, x);

sub1.request(1);
sub2.request(1);
Copy link
Contributor

@viktorklang viktorklang Nov 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't expectRequest() work after these two?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should. I'll update.

} else {
sub1.request(1);

expectRequest(env.defaultTimeoutMillis(),
"If the Processor coordinates requests/emissions when having multiple Subscribers"
+ " at once, please override doesCoordinatedEmission() to return true in this "
+ "IdentityProcessorVerification to allow this test to pass.");

final T x = sendNextTFromUpstream();
expectNextElement(sub1, x,
"If the Processor coordinates requests/emissions when having multiple Subscribers"
+ " at once, please override doesCoordinatedEmission() to return true in this "
+ "IdentityProcessorVerification to allow this test to pass.");

sub1.request(1);

// sub1 has received one element, and has one demand pending
// sub2 has not yet requested anything
}
sendError(ex);

sub1.expectError(ex);
sub2.expectError(ex);

Expand Down Expand Up @@ -643,8 +697,26 @@ public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscr

/////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////

// A Processor
// must trigger `requestFromUpstream` for elements that have been requested 'long ago'
/**
* Asks for a {@code Processor} that supports at least 2 {@code Subscriber}s at once and checks requests
* from {@code Subscriber}s will eventually lead to requests towards the upstream of the {@code Processor}.
* <p>
* If the {@code Processor} requests and/or emits items only when all of its {@code Subscriber}s have requested,
* override {@link #doesCoordinatedEmission()} and return {@code true} to indicate this property.
* <p>
* <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#1.4'>2.1</a> with multiple
* {@code Subscriber}s.
* <p>
* The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2.
* <p>
* If this test fails, the following could be checked within the {@code Publisher} implementation:
* <ul>
* <li>The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.</li>
* <li>The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or
* else override {@link #doesCoordinatedEmission()} and return {@code true} to indicate the test {@code Subscriber}s
* both have to request first.</li>
* </ul>
*/
@Test
public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Throwable {
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
Expand Down Expand Up @@ -673,13 +745,25 @@ public TestSetup apply(Long subscribers) throws Throwable {
// sub1 now has 18 pending
// sub2 has 0 pending

final T z = sendNextTFromUpstream();
expectNextElement(sub1, z);
sub2.expectNone(); // since sub2 hasn't requested anything yet
if (doesCoordinatedEmission()) {
sub2.expectNone(); // since sub2 hasn't requested anything yet

sub2.request(1);

sub2.request(1);
expectNextElement(sub2, z);
final T z = sendNextTFromUpstream();
expectNextElement(sub1, z);
expectNextElement(sub2, z);
} else {
final T z = sendNextTFromUpstream();
expectNextElement(sub1, z,
"If the Processor coordinates requests/emissions when having multiple Subscribers"
+ " at once, please override doesCoordinatedEmission() to return true in this "
+ "IdentityProcessorVerification to allow this test to pass.");
sub2.expectNone(); // since sub2 hasn't requested anything yet

sub2.request(1);
expectNextElement(sub2, z);
}
if (totalRequests == 3) {
expectRequest();
}
Expand Down Expand Up @@ -748,6 +832,13 @@ public void expectNextElement(ManualSubscriber<T> sub, T expected) throws Interr
}
}

public void expectNextElement(ManualSubscriber<T> sub, T expected, String errorMessageAddendum) throws InterruptedException {
final T elem = sub.nextElement(String.format("timeout while awaiting %s. %s", expected, errorMessageAddendum));
if (!elem.equals(expected)) {
env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected));
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice 👍


public T sendNextTFromUpstream() throws InterruptedException {
final T x = nextT();
sendNext(x);
Expand Down
11 changes: 11 additions & 0 deletions tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,17 @@ public long expectRequest(long timeoutMillis) throws InterruptedException {
}
}


public long expectRequest(long timeoutMillis, String errorMessageAddendum) throws InterruptedException {
long requested = requests.next(timeoutMillis, String.format("Did not receive expected `request` call. %s", errorMessageAddendum));
if (requested <= 0) {
return env.<Long>flopAndFail(String.format("Requests cannot be zero or negative but received request(%s)", requested));
} else {
pendingDemand += requested;
return requested;
}
}

public void expectExactRequest(long expected) throws InterruptedException {
expectExactRequest(expected, env.defaultTimeoutMillis());
}
Expand Down
Loading