-
Notifications
You must be signed in to change notification settings - Fork 534
Allow request-coordinating Processor implementations to pass the TCK #414
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
2b1c7e2
09fe154
62d385c
984063e
721002d
14e821b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -189,6 +189,16 @@ public long maxSupportedSubscribers() { | |
return Long.MAX_VALUE; | ||
} | ||
|
||
/** | ||
* Override this method and return {@code true} if the {@link Processor} returned by the | ||
* {@link #createIdentityProcessor(int)} coordinates its {@link Subscriber}s | ||
* request amounts and only delivers onNext signals if all Subscribers have | ||
* indicated (via their Subscription#request(long)) they are ready to receive elements. | ||
*/ | ||
public boolean doesCoordinatedEmission() { | ||
return false; | ||
} | ||
|
||
////////////////////// TEST ENV CLEANUP ///////////////////////////////////// | ||
|
||
@BeforeMethod | ||
|
@@ -401,8 +411,27 @@ public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() | |
publisherVerification.required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue(); | ||
} | ||
|
||
// Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.4 | ||
// for multiple subscribers | ||
|
||
/** | ||
* Asks for a {@code Processor} that supports at least 2 {@code Subscriber}s at once and checks if two {@code Subscriber}s | ||
* receive the same items and a terminal {@code Exception}. | ||
* <p> | ||
* If the {@code Processor} requests and/or emits items only when all of its {@code Subscriber}s have requested, | ||
* override {@link #doesCoordinatedEmission()} and return {@code true} to indicate this property. | ||
* <p> | ||
* <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#1.4'>1.4</a> with multiple | ||
* {@code Subscriber}s. | ||
* <p> | ||
* The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2. | ||
* <p> | ||
* If this test fails, the following could be checked within the {@code Publisher} implementation: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we wanted to include a hint about There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could introduce an overload to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That sounds very good! (the hint override) |
||
* <ul> | ||
* <li>The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.</li> | ||
* <li>The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or | ||
* else override {@link #doesCoordinatedEmission()} and return {@code true} to indicate the test {@code Subscriber}s | ||
* both have to request first.</li> | ||
* </ul> | ||
*/ | ||
@Test | ||
public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable { | ||
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() { | ||
|
@@ -415,17 +444,42 @@ public TestSetup apply(Long aLong) throws Throwable { | |
final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env); | ||
env.subscribe(processor, sub2); | ||
|
||
sub1.request(1); | ||
expectRequest(); | ||
final T x = sendNextTFromUpstream(); | ||
expectNextElement(sub1, x); | ||
sub1.request(1); | ||
final Exception ex = new RuntimeException("Test exception"); | ||
|
||
// sub1 has received one element, and has one demand pending | ||
// sub2 has not yet requested anything | ||
if (doesCoordinatedEmission()) { | ||
sub1.request(1); | ||
sub2.request(1); | ||
|
||
final Exception ex = new RuntimeException("Test exception"); | ||
expectRequest(); | ||
|
||
final T x = sendNextTFromUpstream(); | ||
|
||
expectNextElement(sub1, x); | ||
expectNextElement(sub2, x); | ||
|
||
sub1.request(1); | ||
sub2.request(1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it should. I'll update. |
||
} else { | ||
sub1.request(1); | ||
|
||
expectRequest(env.defaultTimeoutMillis(), | ||
"If the Processor coordinates requests/emissions when having multiple Subscribers" | ||
+ " at once, please override doesCoordinatedEmission() to return true in this " | ||
+ "IdentityProcessorVerification to allow this test to pass."); | ||
|
||
final T x = sendNextTFromUpstream(); | ||
expectNextElement(sub1, x, | ||
"If the Processor coordinates requests/emissions when having multiple Subscribers" | ||
+ " at once, please override doesCoordinatedEmission() to return true in this " | ||
+ "IdentityProcessorVerification to allow this test to pass."); | ||
|
||
sub1.request(1); | ||
|
||
// sub1 has received one element, and has one demand pending | ||
// sub2 has not yet requested anything | ||
} | ||
sendError(ex); | ||
|
||
sub1.expectError(ex); | ||
sub2.expectError(ex); | ||
|
||
|
@@ -643,8 +697,26 @@ public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscr | |
|
||
/////////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////// | ||
|
||
// A Processor | ||
// must trigger `requestFromUpstream` for elements that have been requested 'long ago' | ||
/** | ||
* Asks for a {@code Processor} that supports at least 2 {@code Subscriber}s at once and checks requests | ||
* from {@code Subscriber}s will eventually lead to requests towards the upstream of the {@code Processor}. | ||
* <p> | ||
* If the {@code Processor} requests and/or emits items only when all of its {@code Subscriber}s have requested, | ||
* override {@link #doesCoordinatedEmission()} and return {@code true} to indicate this property. | ||
* <p> | ||
* <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#1.4'>2.1</a> with multiple | ||
* {@code Subscriber}s. | ||
* <p> | ||
* The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2. | ||
* <p> | ||
* If this test fails, the following could be checked within the {@code Publisher} implementation: | ||
* <ul> | ||
* <li>The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.</li> | ||
* <li>The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or | ||
* else override {@link #doesCoordinatedEmission()} and return {@code true} to indicate the test {@code Subscriber}s | ||
* both have to request first.</li> | ||
* </ul> | ||
*/ | ||
@Test | ||
public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Throwable { | ||
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() { | ||
|
@@ -673,13 +745,25 @@ public TestSetup apply(Long subscribers) throws Throwable { | |
// sub1 now has 18 pending | ||
// sub2 has 0 pending | ||
|
||
final T z = sendNextTFromUpstream(); | ||
expectNextElement(sub1, z); | ||
sub2.expectNone(); // since sub2 hasn't requested anything yet | ||
if (doesCoordinatedEmission()) { | ||
sub2.expectNone(); // since sub2 hasn't requested anything yet | ||
|
||
sub2.request(1); | ||
|
||
sub2.request(1); | ||
expectNextElement(sub2, z); | ||
final T z = sendNextTFromUpstream(); | ||
expectNextElement(sub1, z); | ||
expectNextElement(sub2, z); | ||
} else { | ||
final T z = sendNextTFromUpstream(); | ||
expectNextElement(sub1, z, | ||
"If the Processor coordinates requests/emissions when having multiple Subscribers" | ||
+ " at once, please override doesCoordinatedEmission() to return true in this " | ||
+ "IdentityProcessorVerification to allow this test to pass."); | ||
sub2.expectNone(); // since sub2 hasn't requested anything yet | ||
|
||
sub2.request(1); | ||
expectNextElement(sub2, z); | ||
} | ||
if (totalRequests == 3) { | ||
expectRequest(); | ||
} | ||
|
@@ -748,6 +832,13 @@ public void expectNextElement(ManualSubscriber<T> sub, T expected) throws Interr | |
} | ||
} | ||
|
||
public void expectNextElement(ManualSubscriber<T> sub, T expected, String errorMessageAddendum) throws InterruptedException { | ||
final T elem = sub.nextElement(String.format("timeout while awaiting %s. %s", expected, errorMessageAddendum)); | ||
if (!elem.equals(expected)) { | ||
env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected)); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice 👍 |
||
|
||
public T sendNextTFromUpstream() throws InterruptedException { | ||
final T x = nextT(); | ||
sendNext(x); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since most devs probably won't ever notice this method, or know that they should enable/disable it—what do you think about including a hint about it in case the tests which use it fail?
(Sorry, this comment was on the wrong line before)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Documenting it more in the README of the TCK could be a good step?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can add some description to it in a couple of hours.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.