Skip to content

Commit be98041

Browse files
committed
Rule 3.12 verification in subscriber whitebox now really checks it
This verification is now moved to PublisherVerification, as it's a publishers behavior (*it* must stop emiting, not the subscriber). Resolves reactive-streams#115
1 parent 96a4a0a commit be98041

5 files changed

+68
-31
lines changed

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+5-8
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,11 @@ public void spec309_requestNegativeNumberMustThrowIllegalArgumentException() thr
324324
publisherVerification.spec309_requestNegativeNumberMustThrowIllegalArgumentException();
325325
}
326326

327+
@Test
328+
public void spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
329+
publisherVerification.spec312_cancelMustMakeThePublisherToEventuallyStopSignaling();
330+
}
331+
327332
@Test
328333
public void spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
329334
publisherVerification.spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber();
@@ -386,9 +391,6 @@ public void onSubscribe(final Subscription subscription) {
386391
if (subs.isCompleted()) subscription.cancel(); // the Probe must also pass subscriber verification
387392

388393
probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() {
389-
public void triggerShutdown() {
390-
subscription.cancel();
391-
}
392394

393395
@Override
394396
public void triggerRequest(long elements) {
@@ -583,11 +585,6 @@ public void spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exce
583585
subscriberVerification.spec311_requestMaySynchronouslyCallOnCompleteOrOnError();
584586
}
585587

586-
@Test
587-
public void spec312_cancelMustRequestThePublisherToEventuallyStopSignaling() throws Throwable {
588-
subscriberVerification.spec312_cancelMustRequestThePublisherToEventuallyStopSignaling();
589-
}
590-
591588
@Test
592589
public void spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
593590
subscriberVerification.spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists();

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+42
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,48 @@ public void run() {
656656
});
657657
}
658658

659+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.12
660+
@Required @Test
661+
public void spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
662+
final int elementsCount = 20;
663+
664+
activePublisherTest(elementsCount, new PublisherTestRun<T>() {
665+
@Override
666+
public void run(Publisher<T> pub) throws Throwable {
667+
final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
668+
669+
sub.request(10);
670+
sub.request(5);
671+
final int totalDemand = 10 + 5;
672+
673+
sub.cancel(); // must keep subscription so we may accept
674+
675+
sub.nextElement();
676+
int onNextSignals = 1;
677+
678+
boolean stillBeingSignalled;
679+
do {
680+
try {
681+
sub.expectNone();
682+
env.verifyNoAsyncErrors();
683+
684+
stillBeingSignalled = false;
685+
} catch (Throwable th) {
686+
onNextSignals += 1;
687+
env.dropAsyncError();
688+
689+
stillBeingSignalled = true;
690+
}
691+
} while (stillBeingSignalled && onNextSignals < elementsCount);
692+
693+
assertTrue(onNextSignals <= totalDemand,
694+
String.format("Publisher signalled [%d] elements, which is more than the signalled demand: %d", onNextSignals, totalDemand));
695+
}
696+
});
697+
698+
env.verifyNoAsyncErrors();
699+
}
700+
659701
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.13
660702
@Required @Test
661703
public void spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {

tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java

-6
Original file line numberDiff line numberDiff line change
@@ -335,12 +335,6 @@ public void spec311_blackbox_requestMaySynchronouslyCallOnCompleteOrOnError() th
335335
notVerified(); // cannot be meaningfully tested, or can it?
336336
}
337337

338-
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.12
339-
@Required @Test
340-
public void spec312_blackbox_cancelMustRequestThePublisherToEventuallyStopSignaling() throws Throwable {
341-
notVerified(); // cannot be meaningfully tested as black box, or can it?
342-
}
343-
344338
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.14
345339
@NotVerified @Test
346340
public void spec314_blackbox_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {

tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+16-15
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,17 @@ protected SubscriberWhiteboxVerification(TestEnvironment env) {
5252
*/
5353
public abstract Publisher<T> createHelperPublisher(long elements);
5454

55+
/**
56+
* Used to break possibly infinite wait-loops.
57+
* Some Rules use the "eventually stop signalling" wording, which requires the test to spin accepting {@code onNext}
58+
* signals until no more are signalled. In these tests, this value will be used as upper bound on the number of spin iterations.
59+
*
60+
* Override this method in case your implementation synchronously signals very large batches before reacting to cancellation (for example).
61+
*/
62+
public long maxOnNextSignalsInTest() {
63+
return 100;
64+
}
65+
5566
////////////////////// TEST ENV CLEANUP /////////////////////////////////////
5667

5768
@BeforeMethod
@@ -431,20 +442,6 @@ public void spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exce
431442
notVerified(); // cannot be meaningfully tested, or can it?
432443
}
433444

434-
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.12
435-
@Required @Test
436-
public void spec312_cancelMustRequestThePublisherToEventuallyStopSignaling() throws Throwable {
437-
subscriberTest(new TestStageTestRun() {
438-
@Override
439-
public void run(WhiteboxTestStage stage) throws InterruptedException {
440-
stage.puppet().signalCancel();
441-
stage.expectCancelling();
442-
443-
stage.verifyNoAsyncErrors();
444-
}
445-
});
446-
}
447-
448445
// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.14
449446
@NotVerified @Test
450447
public void spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
@@ -518,7 +515,7 @@ public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
518515

519516
public class WhiteboxTestStage extends ManualPublisher<T> {
520517
public Publisher<T> pub;
521-
public ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
518+
public ManualSubscriber<T> tees; // gives us access to a stream T values
522519
public WhiteboxSubscriberProbe<T> probe;
523520

524521
public T lastT = null;
@@ -546,6 +543,10 @@ public SubscriberPuppet puppet() {
546543
return probe.puppet();
547544
}
548545

546+
public WhiteboxSubscriberProbe<T> probe() {
547+
return probe;
548+
}
549+
549550
public Publisher<T> createHelperPublisher(long elements) {
550551
return SubscriberWhiteboxVerification.this.createHelperPublisher(elements);
551552
}

tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ public void clearAsyncErrors() {
128128
asyncErrors.clear();
129129
}
130130

131+
public void dropAsyncError() {
132+
asyncErrors.remove(0);
133+
}
134+
131135
public void verifyNoAsyncErrors(long delay) {
132136
try {
133137
Thread.sleep(delay);
@@ -620,7 +624,6 @@ public boolean isCompleted() {
620624
return _value != null;
621625
}
622626

623-
624627
/**
625628
* Allows using expectCompletion to await for completion of the value and complete it _then_
626629
*/
@@ -761,7 +764,7 @@ public void expectError(long timeoutMillis, String errorMsg) throws Exception {
761764
}
762765
}
763766

764-
void expectNone(long withinMillis, String errorMsgPrefix) throws InterruptedException {
767+
public void expectNone(long withinMillis, String errorMsgPrefix) throws InterruptedException {
765768
Thread.sleep(withinMillis);
766769
Optional<T> value = abq.poll();
767770

0 commit comments

Comments
 (0)