Skip to content

Rule 3.12 verification in subscriber whitebox now really checks it #121

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,11 @@ public void spec309_requestNegativeNumberMustThrowIllegalArgumentException() thr
publisherVerification.spec309_requestNegativeNumberMustThrowIllegalArgumentException();
}

@Test
public void spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
publisherVerification.spec312_cancelMustMakeThePublisherToEventuallyStopSignaling();
}

@Test
public void spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
publisherVerification.spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber();
Expand Down Expand Up @@ -386,9 +391,6 @@ public void onSubscribe(final Subscription subscription) {
if (subs.isCompleted()) subscription.cancel(); // the Probe must also pass subscriber verification

probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() {
public void triggerShutdown() {
subscription.cancel();
}

@Override
public void triggerRequest(long elements) {
Expand Down Expand Up @@ -583,11 +585,6 @@ public void spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exce
subscriberVerification.spec311_requestMaySynchronouslyCallOnCompleteOrOnError();
}

@Test
public void spec312_cancelMustRequestThePublisherToEventuallyStopSignaling() throws Throwable {
subscriberVerification.spec312_cancelMustRequestThePublisherToEventuallyStopSignaling();
}

@Test
public void spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
subscriberVerification.spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.reactivestreams.tck.Annotations.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

/**
Expand Down Expand Up @@ -656,6 +657,47 @@ public void run() {
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.12
@Required @Test
public void spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
final int elementsCount = 20;

activePublisherTest(elementsCount, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
final ManualSubscriber<T> sub = env.newManualSubscriber(pub);

sub.request(10);
sub.request(5);
final int totalDemand = 10 + 5;

sub.cancel();

sub.nextElement();
int onNextsSignalled = 1;

boolean stillBeingSignalled;
do {
// put asyncError if onNext signal received
sub.expectNone();
Throwable error = env.dropAsyncError();

if (error == null) {
stillBeingSignalled = false;
} else {
onNextsSignalled += 1;
stillBeingSignalled = true;
}
} while (stillBeingSignalled && onNextsSignalled < totalDemand);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

much better!


assertTrue(onNextsSignalled <= totalDemand,
String.format("Publisher signalled [%d] elements, which is more than the signalled demand: %d", onNextsSignalled, totalDemand));
}
});

env.verifyNoAsyncErrors();
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.13
@Required @Test
public void spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,6 @@ public void spec311_blackbox_requestMaySynchronouslyCallOnCompleteOrOnError() th
notVerified(); // cannot be meaningfully tested, or can it?
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.12
@Required @Test
public void spec312_blackbox_cancelMustRequestThePublisherToEventuallyStopSignaling() throws Throwable {
notVerified(); // cannot be meaningfully tested as black box, or can it?
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.14
@NotVerified @Test
public void spec314_blackbox_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ protected SubscriberWhiteboxVerification(TestEnvironment env) {
*/
public abstract Publisher<T> createHelperPublisher(long elements);

/**
* Used to break possibly infinite wait-loops.
* Some Rules use the "eventually stop signalling" wording, which requires the test to spin accepting {@code onNext}
* signals until no more are signalled. In these tests, this value will be used as upper bound on the number of spin iterations.
*
* Override this method in case your implementation synchronously signals very large batches before reacting to cancellation (for example).
*/
public long maxOnNextSignalsInTest() {
return 100;
}

////////////////////// TEST ENV CLEANUP /////////////////////////////////////

@BeforeMethod
Expand Down Expand Up @@ -431,20 +442,6 @@ public void spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exce
notVerified(); // cannot be meaningfully tested, or can it?
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.12
@Required @Test
public void spec312_cancelMustRequestThePublisherToEventuallyStopSignaling() throws Throwable {
subscriberTest(new TestStageTestRun() {
@Override
public void run(WhiteboxTestStage stage) throws InterruptedException {
stage.puppet().signalCancel();
stage.expectCancelling();

stage.verifyNoAsyncErrors();
}
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.14
@NotVerified @Test
public void spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
Expand Down Expand Up @@ -518,7 +515,7 @@ public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {

public class WhiteboxTestStage extends ManualPublisher<T> {
public Publisher<T> pub;
public ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
public ManualSubscriber<T> tees; // gives us access to a stream T values
public WhiteboxSubscriberProbe<T> probe;

public T lastT = null;
Expand Down Expand Up @@ -546,6 +543,10 @@ public SubscriberPuppet puppet() {
return probe.puppet();
}

public WhiteboxSubscriberProbe<T> probe() {
return probe;
}

public Publisher<T> createHelperPublisher(long elements) {
return SubscriberWhiteboxVerification.this.createHelperPublisher(elements);
}
Expand Down
11 changes: 9 additions & 2 deletions tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ public void clearAsyncErrors() {
asyncErrors.clear();
}

public Throwable dropAsyncError() {
try {
return asyncErrors.remove(0);
} catch (IndexOutOfBoundsException ex) {
return null;
}
}

public void verifyNoAsyncErrors(long delay) {
try {
Thread.sleep(delay);
Expand Down Expand Up @@ -620,7 +628,6 @@ public boolean isCompleted() {
return _value != null;
}


/**
* Allows using expectCompletion to await for completion of the value and complete it _then_
*/
Expand Down Expand Up @@ -761,7 +768,7 @@ public void expectError(long timeoutMillis, String errorMsg) throws Exception {
}
}

void expectNone(long withinMillis, String errorMsgPrefix) throws InterruptedException {
public void expectNone(long withinMillis, String errorMsgPrefix) throws InterruptedException {
Thread.sleep(withinMillis);
Optional<T> value = abq.poll();

Expand Down