Skip to content

Commit 9284dee

Browse files
committed
Rule 3.12 verification in subscriber whitebox now really checks it
Resolves reactive-streams#115
1 parent 96a4a0a commit 9284dee

File tree

2 files changed

+42
-5
lines changed

2 files changed

+42
-5
lines changed

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

-3
Original file line numberDiff line numberDiff line change
@@ -386,9 +386,6 @@ public void onSubscribe(final Subscription subscription) {
386386
if (subs.isCompleted()) subscription.cancel(); // the Probe must also pass subscriber verification
387387

388388
probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() {
389-
public void triggerShutdown() {
390-
subscription.cancel();
391-
}
392389

393390
@Override
394391
public void triggerRequest(long elements) {

tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+42-2
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,17 @@ protected SubscriberWhiteboxVerification(TestEnvironment env) {
5252
*/
5353
public abstract Publisher<T> createHelperPublisher(long elements);
5454

55+
/**
56+
* Used to break possibly infinite wait-loops.
57+
* Some Rules use the "eventually stop signalling" wording, which requires the test to spin accepting {@code onNext}
58+
* signals until no more are signalled. In these tests, this value will be used as upper bound on the number of spin iterations.
59+
*
60+
* Override this method in case your implementation synchronously signals very large batches before reacting to cancellation (for example).
61+
*/
62+
public long maxOnNextSignalsInTest() {
63+
return 100;
64+
}
65+
5566
////////////////////// TEST ENV CLEANUP /////////////////////////////////////
5667

5768
@BeforeMethod
@@ -437,8 +448,33 @@ public void spec312_cancelMustRequestThePublisherToEventuallyStopSignaling() thr
437448
subscriberTest(new TestStageTestRun() {
438449
@Override
439450
public void run(WhiteboxTestStage stage) throws InterruptedException {
451+
final int requestedElements = 5;
452+
453+
stage.puppet().triggerRequest(requestedElements);
454+
stage.expectRequest();
455+
stage.signalNext();
456+
457+
// cancelling is NOT REQUIRED to stop the stream immediatly, example: there may be signals in flight
440458
stage.puppet().signalCancel();
441-
stage.expectCancelling();
459+
460+
stage.probe().expectNext();
461+
long onNextSignals = 1;
462+
463+
boolean stillBeingSignalled;
464+
do {
465+
// depends on Rule 3.6 https://github.com/reactive-streams/reactive-streams#3.12
466+
// requesting from a cancelled subscription MUST be a NOPs
467+
stage.puppet().triggerRequest(1);
468+
stage.signalNext(); // signalling will stop, since Subscription cancelled
469+
470+
try {
471+
stage.probe().expectNone();
472+
stillBeingSignalled = false;
473+
} catch (Throwable th) {
474+
onNextSignals += 1;
475+
stillBeingSignalled = true;
476+
}
477+
} while (stillBeingSignalled && onNextSignals < maxOnNextSignalsInTest());
442478

443479
stage.verifyNoAsyncErrors();
444480
}
@@ -518,7 +554,7 @@ public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
518554

519555
public class WhiteboxTestStage extends ManualPublisher<T> {
520556
public Publisher<T> pub;
521-
public ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
557+
public ManualSubscriber<T> tees; // gives us access to a stream T values
522558
public WhiteboxSubscriberProbe<T> probe;
523559

524560
public T lastT = null;
@@ -546,6 +582,10 @@ public SubscriberPuppet puppet() {
546582
return probe.puppet();
547583
}
548584

585+
public WhiteboxSubscriberProbe<T> probe() {
586+
return probe;
587+
}
588+
549589
public Publisher<T> createHelperPublisher(long elements) {
550590
return SubscriberWhiteboxVerification.this.createHelperPublisher(elements);
551591
}

0 commit comments

Comments
 (0)