Skip to content

Commit 6a0bf30

Browse files
committed
Rule 3.12 verification in subscriber whitebox now really checks it
Resolves reactive-streams#115
1 parent 96a4a0a commit 6a0bf30

File tree

2 files changed

+28
-5
lines changed

2 files changed

+28
-5
lines changed

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

-3
Original file line numberDiff line numberDiff line change
@@ -386,9 +386,6 @@ public void onSubscribe(final Subscription subscription) {
386386
if (subs.isCompleted()) subscription.cancel(); // the Probe must also pass subscriber verification
387387

388388
probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() {
389-
public void triggerShutdown() {
390-
subscription.cancel();
391-
}
392389

393390
@Override
394391
public void triggerRequest(long elements) {

tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+28-2
Original file line numberDiff line numberDiff line change
@@ -437,8 +437,30 @@ public void spec312_cancelMustRequestThePublisherToEventuallyStopSignaling() thr
437437
subscriberTest(new TestStageTestRun() {
438438
@Override
439439
public void run(WhiteboxTestStage stage) throws InterruptedException {
440+
final int requestedElements = 5;
441+
442+
stage.puppet().triggerRequest(requestedElements);
443+
stage.expectRequest();
444+
stage.signalNext();
445+
446+
// cancelling is NOT REQUIRED to stop the stream immediatly, example: there may be signals in flight
440447
stage.puppet().signalCancel();
441-
stage.expectCancelling();
448+
449+
stage.probe().expectNext();
450+
boolean stillBeingSignalled;
451+
do {
452+
// depends on Rule 3.6 https://github.com/reactive-streams/reactive-streams#3.12
453+
// requesting from a cancelled subscription MUST be a NOPs
454+
stage.puppet().triggerRequest(1);
455+
stage.signalNext(); // signalling will stop, since Subscription cancelled
456+
457+
try {
458+
stage.probe().expectNone();
459+
stillBeingSignalled = false;
460+
} catch (Throwable th) {
461+
stillBeingSignalled = true;
462+
}
463+
} while (stillBeingSignalled);
442464

443465
stage.verifyNoAsyncErrors();
444466
}
@@ -518,7 +540,7 @@ public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
518540

519541
public class WhiteboxTestStage extends ManualPublisher<T> {
520542
public Publisher<T> pub;
521-
public ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
543+
public ManualSubscriber<T> tees; // gives us access to a stream T values
522544
public WhiteboxSubscriberProbe<T> probe;
523545

524546
public T lastT = null;
@@ -546,6 +568,10 @@ public SubscriberPuppet puppet() {
546568
return probe.puppet();
547569
}
548570

571+
public WhiteboxSubscriberProbe<T> probe() {
572+
return probe;
573+
}
574+
549575
public Publisher<T> createHelperPublisher(long elements) {
550576
return SubscriberWhiteboxVerification.this.createHelperPublisher(elements);
551577
}

0 commit comments

Comments
 (0)