From 79e16698a205064662ccc834bfd7db9c9f69b8b9 Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Wed, 1 Oct 2014 22:23:42 +0900 Subject: [PATCH] Rule 3.12 verification in subscriber whitebox now really checks it This verification is now moved to PublisherVerification, as it's a publishers behavior (*it* must stop emiting, not the subscriber). Resolves #115 --- .../tck/IdentityProcessorVerification.java | 13 +++--- .../tck/PublisherVerification.java | 42 +++++++++++++++++++ .../tck/SubscriberBlackboxVerification.java | 6 --- .../tck/SubscriberWhiteboxVerification.java | 31 +++++++------- .../reactivestreams/tck/TestEnvironment.java | 11 ++++- 5 files changed, 72 insertions(+), 31 deletions(-) diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index 1e55407c..7834476e 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -324,6 +324,11 @@ public void spec309_requestNegativeNumberMustThrowIllegalArgumentException() thr publisherVerification.spec309_requestNegativeNumberMustThrowIllegalArgumentException(); } + @Test + public void spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable { + publisherVerification.spec312_cancelMustMakeThePublisherToEventuallyStopSignaling(); + } + @Test public void spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable { publisherVerification.spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber(); @@ -386,9 +391,6 @@ public void onSubscribe(final Subscription subscription) { if (subs.isCompleted()) subscription.cancel(); // the Probe must also pass subscriber verification probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() { - public void triggerShutdown() { - subscription.cancel(); - } @Override public void triggerRequest(long elements) { @@ -583,11 +585,6 @@ public void spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exce subscriberVerification.spec311_requestMaySynchronouslyCallOnCompleteOrOnError(); } - @Test - public void spec312_cancelMustRequestThePublisherToEventuallyStopSignaling() throws Throwable { - subscriberVerification.spec312_cancelMustRequestThePublisherToEventuallyStopSignaling(); - } - @Test public void spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception { subscriberVerification.spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists(); diff --git a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java index 39876412..dfede81f 100644 --- a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java @@ -19,6 +19,7 @@ import static org.reactivestreams.tck.Annotations.*; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; /** @@ -656,6 +657,47 @@ public void run() { }); } + // Verifies rule: https://github.com/reactive-streams/reactive-streams#3.12 + @Required @Test + public void spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable { + final int elementsCount = 20; + + activePublisherTest(elementsCount, new PublisherTestRun() { + @Override + public void run(Publisher pub) throws Throwable { + final ManualSubscriber sub = env.newManualSubscriber(pub); + + sub.request(10); + sub.request(5); + final int totalDemand = 10 + 5; + + sub.cancel(); + + sub.nextElement(); + int onNextsSignalled = 1; + + boolean stillBeingSignalled; + do { + // put asyncError if onNext signal received + sub.expectNone(); + Throwable error = env.dropAsyncError(); + + if (error == null) { + stillBeingSignalled = false; + } else { + onNextsSignalled += 1; + stillBeingSignalled = true; + } + } while (stillBeingSignalled && onNextsSignalled < totalDemand); + + assertTrue(onNextsSignalled <= totalDemand, + String.format("Publisher signalled [%d] elements, which is more than the signalled demand: %d", onNextsSignalled, totalDemand)); + } + }); + + env.verifyNoAsyncErrors(); + } + // Verifies rule: https://github.com/reactive-streams/reactive-streams#3.13 @Required @Test public void spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable { diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java index eb71a2b4..cfb1b61e 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java @@ -335,12 +335,6 @@ public void spec311_blackbox_requestMaySynchronouslyCallOnCompleteOrOnError() th notVerified(); // cannot be meaningfully tested, or can it? } - // Verifies rule: https://github.com/reactive-streams/reactive-streams#3.12 - @Required @Test - public void spec312_blackbox_cancelMustRequestThePublisherToEventuallyStopSignaling() throws Throwable { - notVerified(); // cannot be meaningfully tested as black box, or can it? - } - // Verifies rule: https://github.com/reactive-streams/reactive-streams#3.14 @NotVerified @Test public void spec314_blackbox_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception { diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java index a01ef4d5..cee14af8 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java @@ -52,6 +52,17 @@ protected SubscriberWhiteboxVerification(TestEnvironment env) { */ public abstract Publisher createHelperPublisher(long elements); + /** + * Used to break possibly infinite wait-loops. + * Some Rules use the "eventually stop signalling" wording, which requires the test to spin accepting {@code onNext} + * signals until no more are signalled. In these tests, this value will be used as upper bound on the number of spin iterations. + * + * Override this method in case your implementation synchronously signals very large batches before reacting to cancellation (for example). + */ + public long maxOnNextSignalsInTest() { + return 100; + } + ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// @BeforeMethod @@ -431,20 +442,6 @@ public void spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exce notVerified(); // cannot be meaningfully tested, or can it? } - // Verifies rule: https://github.com/reactive-streams/reactive-streams#3.12 - @Required @Test - public void spec312_cancelMustRequestThePublisherToEventuallyStopSignaling() throws Throwable { - subscriberTest(new TestStageTestRun() { - @Override - public void run(WhiteboxTestStage stage) throws InterruptedException { - stage.puppet().signalCancel(); - stage.expectCancelling(); - - stage.verifyNoAsyncErrors(); - } - }); - } - // Verifies rule: https://github.com/reactive-streams/reactive-streams#3.14 @NotVerified @Test public void spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception { @@ -518,7 +515,7 @@ public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable { public class WhiteboxTestStage extends ManualPublisher { public Publisher pub; - public ManualSubscriber tees; // gives us access to an infinite stream of T values + public ManualSubscriber tees; // gives us access to a stream T values public WhiteboxSubscriberProbe probe; public T lastT = null; @@ -546,6 +543,10 @@ public SubscriberPuppet puppet() { return probe.puppet(); } + public WhiteboxSubscriberProbe probe() { + return probe; + } + public Publisher createHelperPublisher(long elements) { return SubscriberWhiteboxVerification.this.createHelperPublisher(elements); } diff --git a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java index 4a554d48..f6dd2eb3 100644 --- a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java +++ b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java @@ -128,6 +128,14 @@ public void clearAsyncErrors() { asyncErrors.clear(); } + public Throwable dropAsyncError() { + try { + return asyncErrors.remove(0); + } catch (IndexOutOfBoundsException ex) { + return null; + } + } + public void verifyNoAsyncErrors(long delay) { try { Thread.sleep(delay); @@ -620,7 +628,6 @@ public boolean isCompleted() { return _value != null; } - /** * Allows using expectCompletion to await for completion of the value and complete it _then_ */ @@ -761,7 +768,7 @@ public void expectError(long timeoutMillis, String errorMsg) throws Exception { } } - void expectNone(long withinMillis, String errorMsgPrefix) throws InterruptedException { + public void expectNone(long withinMillis, String errorMsgPrefix) throws InterruptedException { Thread.sleep(withinMillis); Optional value = abq.poll();