Skip to content

Commit 79e1669

Browse files
committed
Rule 3.12 verification in subscriber whitebox now really checks it
This verification is now moved to PublisherVerification, as it's a publishers behavior (*it* must stop emiting, not the subscriber). Resolves reactive-streams#115
1 parent 96a4a0a commit 79e1669

5 files changed

+72
-31
lines changed

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+5-8
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,11 @@ public void spec309_requestNegativeNumberMustThrowIllegalArgumentException() thr
324324
publisherVerification.spec309_requestNegativeNumberMustThrowIllegalArgumentException();
325325
}
326326

327+
@Test
328+
public void spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
329+
publisherVerification.spec312_cancelMustMakeThePublisherToEventuallyStopSignaling();
330+
}
331+
327332
@Test
328333
public void spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
329334
publisherVerification.spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber();
@@ -386,9 +391,6 @@ public void onSubscribe(final Subscription subscription) {
386391
if (subs.isCompleted()) subscription.cancel(); // the Probe must also pass subscriber verification
387392

388393
probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() {
389-
public void triggerShutdown() {
390-
subscription.cancel();
391-
}
392394

393395
@Override
394396
public void triggerRequest(long elements) {
@@ -583,11 +585,6 @@ public void spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exce
583585
subscriberVerification.spec311_requestMaySynchronouslyCallOnCompleteOrOnError();
584586
}
585587

586-
@Test
587-
public void spec312_cancelMustRequestThePublisherToEventuallyStopSignaling() throws Throwable {
588-
subscriberVerification.spec312_cancelMustRequestThePublisherToEventuallyStopSignaling();
589-
}
590-
591588
@Test
592589
public void spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
593590
subscriberVerification.spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists();

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+42
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.reactivestreams.tck.Annotations.*;
2121
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertFalse;
2223
import static org.testng.Assert.assertTrue;
2324

2425
/**
@@ -656,6 +657,47 @@ public void run() {
656657
});
657658
}
658659

660+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.12
661+
@Required @Test
662+
public void spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
663+
final int elementsCount = 20;
664+
665+
activePublisherTest(elementsCount, new PublisherTestRun<T>() {
666+
@Override
667+
public void run(Publisher<T> pub) throws Throwable {
668+
final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
669+
670+
sub.request(10);
671+
sub.request(5);
672+
final int totalDemand = 10 + 5;
673+
674+
sub.cancel();
675+
676+
sub.nextElement();
677+
int onNextsSignalled = 1;
678+
679+
boolean stillBeingSignalled;
680+
do {
681+
// put asyncError if onNext signal received
682+
sub.expectNone();
683+
Throwable error = env.dropAsyncError();
684+
685+
if (error == null) {
686+
stillBeingSignalled = false;
687+
} else {
688+
onNextsSignalled += 1;
689+
stillBeingSignalled = true;
690+
}
691+
} while (stillBeingSignalled && onNextsSignalled < totalDemand);
692+
693+
assertTrue(onNextsSignalled <= totalDemand,
694+
String.format("Publisher signalled [%d] elements, which is more than the signalled demand: %d", onNextsSignalled, totalDemand));
695+
}
696+
});
697+
698+
env.verifyNoAsyncErrors();
699+
}
700+
659701
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.13
660702
@Required @Test
661703
public void spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {

tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java

-6
Original file line numberDiff line numberDiff line change
@@ -335,12 +335,6 @@ public void spec311_blackbox_requestMaySynchronouslyCallOnCompleteOrOnError() th
335335
notVerified(); // cannot be meaningfully tested, or can it?
336336
}
337337

338-
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.12
339-
@Required @Test
340-
public void spec312_blackbox_cancelMustRequestThePublisherToEventuallyStopSignaling() throws Throwable {
341-
notVerified(); // cannot be meaningfully tested as black box, or can it?
342-
}
343-
344338
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.14
345339
@NotVerified @Test
346340
public void spec314_blackbox_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {

tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+16-15
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,17 @@ protected SubscriberWhiteboxVerification(TestEnvironment env) {
5252
*/
5353
public abstract Publisher<T> createHelperPublisher(long elements);
5454

55+
/**
56+
* Used to break possibly infinite wait-loops.
57+
* Some Rules use the "eventually stop signalling" wording, which requires the test to spin accepting {@code onNext}
58+
* signals until no more are signalled. In these tests, this value will be used as upper bound on the number of spin iterations.
59+
*
60+
* Override this method in case your implementation synchronously signals very large batches before reacting to cancellation (for example).
61+
*/
62+
public long maxOnNextSignalsInTest() {
63+
return 100;
64+
}
65+
5566
////////////////////// TEST ENV CLEANUP /////////////////////////////////////
5667

5768
@BeforeMethod
@@ -431,20 +442,6 @@ public void spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exce
431442
notVerified(); // cannot be meaningfully tested, or can it?
432443
}
433444

434-
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.12
435-
@Required @Test
436-
public void spec312_cancelMustRequestThePublisherToEventuallyStopSignaling() throws Throwable {
437-
subscriberTest(new TestStageTestRun() {
438-
@Override
439-
public void run(WhiteboxTestStage stage) throws InterruptedException {
440-
stage.puppet().signalCancel();
441-
stage.expectCancelling();
442-
443-
stage.verifyNoAsyncErrors();
444-
}
445-
});
446-
}
447-
448445
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.14
449446
@NotVerified @Test
450447
public void spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
@@ -518,7 +515,7 @@ public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
518515

519516
public class WhiteboxTestStage extends ManualPublisher<T> {
520517
public Publisher<T> pub;
521-
public ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
518+
public ManualSubscriber<T> tees; // gives us access to a stream T values
522519
public WhiteboxSubscriberProbe<T> probe;
523520

524521
public T lastT = null;
@@ -546,6 +543,10 @@ public SubscriberPuppet puppet() {
546543
return probe.puppet();
547544
}
548545

546+
public WhiteboxSubscriberProbe<T> probe() {
547+
return probe;
548+
}
549+
549550
public Publisher<T> createHelperPublisher(long elements) {
550551
return SubscriberWhiteboxVerification.this.createHelperPublisher(elements);
551552
}

tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,14 @@ public void clearAsyncErrors() {
128128
asyncErrors.clear();
129129
}
130130

131+
public Throwable dropAsyncError() {
132+
try {
133+
return asyncErrors.remove(0);
134+
} catch (IndexOutOfBoundsException ex) {
135+
return null;
136+
}
137+
}
138+
131139
public void verifyNoAsyncErrors(long delay) {
132140
try {
133141
Thread.sleep(delay);
@@ -620,7 +628,6 @@ public boolean isCompleted() {
620628
return _value != null;
621629
}
622630

623-
624631
/**
625632
* Allows using expectCompletion to await for completion of the value and complete it _then_
626633
*/
@@ -761,7 +768,7 @@ public void expectError(long timeoutMillis, String errorMsg) throws Exception {
761768
}
762769
}
763770

764-
void expectNone(long withinMillis, String errorMsgPrefix) throws InterruptedException {
771+
public void expectNone(long withinMillis, String errorMsgPrefix) throws InterruptedException {
765772
Thread.sleep(withinMillis);
766773
Optional<T> value = abq.poll();
767774

0 commit comments

Comments
 (0)