From c97be991f2a508bfcf882439318214cfd223a7ae Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 27 Mar 2015 13:01:18 +0100 Subject: [PATCH] Fixes #233 by implementing support for triggered demand in in the SubscriberBlackboxVerification --- .../tck/SubscriberBlackboxVerification.java | 12 ++ .../SyncTriggeredDemandSubscriberTest.java | 47 +++++++ ...TriggeredDemandSubscriberWhiteboxTest.java | 77 +++++++++++ .../SyncTriggeredDemandSubscriber.java | 123 ++++++++++++++++++ 4 files changed, 259 insertions(+) create mode 100644 tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberTest.java create mode 100644 tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberWhiteboxTest.java create mode 100644 tck/src/test/java/org/reactivestreams/tck/support/SyncTriggeredDemandSubscriber.java diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java index cb02800a..5f132aec 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java @@ -48,6 +48,16 @@ protected SubscriberBlackboxVerification(TestEnvironment env) { */ public abstract Subscriber createSubscriber(); + /** + * Override this method if the Subscriber implementation you are verifying + * needs an external signal before it signals demand to its Publisher. + * + * By default this method does nothing. + */ + public void triggerRequest(final Subscriber subscriber) { + + } + // ENV SETUP /** @@ -74,6 +84,7 @@ public void required_spec201_blackbox_mustSignalDemandViaSubscriptionRequest() t blackboxSubscriberTest(new BlackboxTestStageTestRun() { @Override public void run(BlackboxTestStage stage) throws InterruptedException { + triggerRequest(stage.subProxy().sub()); final long n = stage.expectRequest();// assuming subscriber wants to consume elements... // should cope with up to requested number of elements @@ -248,6 +259,7 @@ public void run(BlackboxTestStage stage) throws Throwable { final BlackboxSubscriberProxy probe = stage.createBlackboxSubscriberProxy(env, sub); pub.subscribe(probe); + triggerRequest(sub); probe.expectCompletion(); probe.expectNone(); diff --git a/tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberTest.java b/tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberTest.java new file mode 100644 index 00000000..0b099fcf --- /dev/null +++ b/tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberTest.java @@ -0,0 +1,47 @@ +package org.reactivestreams.tck; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.reactivestreams.tck.SubscriberBlackboxVerification; +import org.reactivestreams.tck.TestEnvironment; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import org.reactivestreams.tck.support.SyncTriggeredDemandSubscriber; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Test // Must be here for TestNG to find and run this, do not remove +public class SyncTriggeredDemandSubscriberTest extends SubscriberBlackboxVerification { + + private ExecutorService e; + @BeforeClass void before() { e = Executors.newFixedThreadPool(4); } + @AfterClass void after() { if (e != null) e.shutdown(); } + + public SyncTriggeredDemandSubscriberTest() { + super(new TestEnvironment()); + } + + @Override public void triggerRequest(final Subscriber subscriber) { + ((SyncTriggeredDemandSubscriber)subscriber).triggerDemand(1); + } + + @Override public Subscriber createSubscriber() { + return new SyncTriggeredDemandSubscriber() { + private long acc; + @Override protected long foreach(final Integer element) { + acc += element; + return 1; + } + + @Override public void onComplete() { + } + }; + } + + @Override public Integer createElement(int element) { + return element; + } +} diff --git a/tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberWhiteboxTest.java b/tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberWhiteboxTest.java new file mode 100644 index 00000000..fb85a434 --- /dev/null +++ b/tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberWhiteboxTest.java @@ -0,0 +1,77 @@ +package org.reactivestreams.tck; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.reactivestreams.tck.SubscriberBlackboxVerification; +import org.reactivestreams.tck.SubscriberWhiteboxVerification; +import org.reactivestreams.tck.TestEnvironment; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import org.reactivestreams.tck.support.SyncTriggeredDemandSubscriber; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Test // Must be here for TestNG to find and run this, do not remove +public class SyncTriggeredDemandSubscriberWhiteboxTest extends SubscriberWhiteboxVerification { + + private ExecutorService e; + @BeforeClass void before() { e = Executors.newFixedThreadPool(4); } + @AfterClass void after() { if (e != null) e.shutdown(); } + + public SyncTriggeredDemandSubscriberWhiteboxTest() { + super(new TestEnvironment()); + } + + @Override + public Subscriber createSubscriber(final WhiteboxSubscriberProbe probe) { + return new SyncTriggeredDemandSubscriber() { + @Override + public void onSubscribe(final Subscription s) { + super.onSubscribe(s); + + probe.registerOnSubscribe(new SubscriberPuppet() { + @Override + public void triggerRequest(long elements) { + s.request(elements); + } + + @Override + public void signalCancel() { + s.cancel(); + } + }); + } + + @Override + public void onNext(Integer element) { + super.onNext(element); + probe.registerOnNext(element); + } + + @Override + public void onError(Throwable cause) { + super.onError(cause); + probe.registerOnError(cause); + } + + @Override + public void onComplete() { + super.onComplete(); + probe.registerOnComplete(); + } + + @Override + protected long foreach(Integer element) { + return 1; + } + }; + } + + @Override public Integer createElement(int element) { + return element; + } + +} diff --git a/tck/src/test/java/org/reactivestreams/tck/support/SyncTriggeredDemandSubscriber.java b/tck/src/test/java/org/reactivestreams/tck/support/SyncTriggeredDemandSubscriber.java new file mode 100644 index 00000000..00a3ea6c --- /dev/null +++ b/tck/src/test/java/org/reactivestreams/tck/support/SyncTriggeredDemandSubscriber.java @@ -0,0 +1,123 @@ +package org.reactivestreams.tck.support; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** + * SyncTriggeredDemandSubscriber is an implementation of Reactive Streams `Subscriber`, + * it runs synchronously (on the Publisher's thread) and requests demand triggered from + * "the outside" using its `triggerDemand` method and from "the inside" using the return + * value of its user-defined `foreach` method which is invoked to process each element. + * + * NOTE: The code below uses a lot of try-catches to show the reader where exceptions can be expected, and where they are forbidden. + */ +public abstract class SyncTriggeredDemandSubscriber implements Subscriber { + private Subscription subscription; // Obeying rule 3.1, we make this private! + private boolean done = false; + + @Override public void onSubscribe(final Subscription s) { + // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null` + if (s == null) throw null; + + if (subscription != null) { // If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully + try { + s.cancel(); // Cancel the additional subscription + } catch(final Throwable t) { + //Subscription.cancel is not allowed to throw an exception, according to rule 3.15 + (new IllegalStateException(s + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err); + } + } else { + // We have to assign it locally before we use it, if we want to be a synchronous `Subscriber` + // Because according to rule 3.10, the Subscription is allowed to call `onNext` synchronously from within `request` + subscription = s; + } + } + + /** + * Requests the provided number of elements from the `Subscription` of this `Subscriber`. + * NOTE: This makes no attempt at thread safety so only invoke it once from the outside to initiate the demand. + * @return `true` if successful and `false` if not (either due to no `Subscription` or due to exceptions thrown) + */ + public boolean triggerDemand(final long n) { + final Subscription s = subscription; + if (s == null) return false; + else { + try { + s.request(n); + } catch(final Throwable t) { + // Subscription.request is not allowed to throw according to rule 3.16 + (new IllegalStateException(s + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)).printStackTrace(System.err); + return false; + } + return true; + } + } + + @Override public void onNext(final T element) { + if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec + (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onNext prior to onSubscribe.")).printStackTrace(System.err); + } else { + // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null` + if (element == null) throw null; + + if (!done) { // If we aren't already done + try { + final long need = foreach(element); + if (need > 0) triggerDemand(need); + else if (need == 0) {} + else { + done(); + } + } catch (final Throwable t) { + done(); + try { + onError(t); + } catch (final Throwable t2) { + //Subscriber.onError is not allowed to throw an exception, according to rule 2.13 + (new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err); + } + } + } + } + } + + // Showcases a convenience method to idempotently marking the Subscriber as "done", so we don't want to process more elements + // herefor we also need to cancel our `Subscription`. + private void done() { + //On this line we could add a guard against `!done`, but since rule 3.7 says that `Subscription.cancel()` is idempotent, we don't need to. + done = true; // If we `foreach` throws an exception, let's consider ourselves done (not accepting more elements) + try { + subscription.cancel(); // Cancel the subscription + } catch(final Throwable t) { + //Subscription.cancel is not allowed to throw an exception, according to rule 3.15 + (new IllegalStateException(subscription + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err); + } + } + + // This method is left as an exercise to the reader/extension point + // Don't forget to call `triggerDemand` at the end if you are interested in more data, + // a return value of < 0 indicates that the subscription should be cancelled, + // a value of 0 indicates that there is no current need, + // a value of > 0 indicates the current need. + protected abstract long foreach(final T element); + + @Override public void onError(final Throwable t) { + if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec + (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err); + } else { + // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null` + if (t == null) throw null; + // Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3 + // And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4 + } + } + + @Override public void onComplete() { + if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec + (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err); + } else { + // Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3 + // And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4 + } + } +} \ No newline at end of file