From 7c51b068dddb94a501bdcc20d94aa5d24b632494 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 21 Apr 2014 16:55:35 -0700 Subject: [PATCH 01/19] Source/Listener/Handle A simplified implementation where SPI and API combine. --- project/plugins.sbt | 2 + spi/.gitignore | 1 + .../main/java/org/reactivestreams/Handle.java | 29 ++++++ .../java/org/reactivestreams/Listener.java | 55 +++++++++++ .../main/java/org/reactivestreams/Source.java | 17 ++++ .../org/reactivestreams/api/Consumer.java | 21 ----- .../org/reactivestreams/api/Processor.java | 10 -- .../org/reactivestreams/api/Producer.java | 38 -------- .../org/reactivestreams/spi/Publisher.java | 16 ---- .../org/reactivestreams/spi/Subscriber.java | 43 --------- .../org/reactivestreams/spi/Subscription.java | 26 ------ tck/.gitignore | 1 + .../tck/IdentityProcessorVerification.java | 55 ++++++----- .../tck/PublisherVerification.java | 92 ++++++++++--------- .../tck/SubscriberVerification.java | 27 +++--- .../reactivestreams/tck/TestEnvironment.java | 46 +++++----- .../reactivestreams/tck/TestProcessor.java | 15 +++ 17 files changed, 233 insertions(+), 261 deletions(-) create mode 100644 spi/.gitignore create mode 100644 spi/src/main/java/org/reactivestreams/Handle.java create mode 100644 spi/src/main/java/org/reactivestreams/Listener.java create mode 100644 spi/src/main/java/org/reactivestreams/Source.java delete mode 100644 spi/src/main/java/org/reactivestreams/api/Consumer.java delete mode 100644 spi/src/main/java/org/reactivestreams/api/Processor.java delete mode 100644 spi/src/main/java/org/reactivestreams/api/Producer.java delete mode 100644 spi/src/main/java/org/reactivestreams/spi/Publisher.java delete mode 100644 spi/src/main/java/org/reactivestreams/spi/Subscriber.java delete mode 100644 spi/src/main/java/org/reactivestreams/spi/Subscription.java create mode 100644 tck/.gitignore create mode 100644 tck/src/main/java/org/reactivestreams/tck/TestProcessor.java diff --git a/project/plugins.sbt b/project/plugins.sbt index a5ddf282..0f403e99 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,3 +1,5 @@ +addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.4.0") + addSbtPlugin("de.johoop" % "sbt-testng-plugin" % "3.0.0") addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.4.0") diff --git a/spi/.gitignore b/spi/.gitignore new file mode 100644 index 00000000..5e56e040 --- /dev/null +++ b/spi/.gitignore @@ -0,0 +1 @@ +/bin diff --git a/spi/src/main/java/org/reactivestreams/Handle.java b/spi/src/main/java/org/reactivestreams/Handle.java new file mode 100644 index 00000000..7c812964 --- /dev/null +++ b/spi/src/main/java/org/reactivestreams/Handle.java @@ -0,0 +1,29 @@ +package org.reactivestreams; + +/** + * A {@link Handle} represents a one-to-one lifecycle of a {@link Listener} subscribing to a {@link Source}. + *

+ * It can only be used once by a single {@link Listener}. + *

+ * It is used to both signal desire for data and cancel demand (and allow resource cleanup). + * + */ +public interface Handle { + /** + * No events will be sent by a {@link Source} until demand is signaled via this method. + *

+ * It can be called however often and whenever needed. + *

+ * Whatever has been signalled can be sent by the {@link Source} so only signal demand for what can be safely handled. + * + * @param n + */ + public void request(int n); + + /** + * Request the {@link Source} to stop sending data and clean up resources. + *

+ * Data may still be sent to meet previously signalled demand after calling cancel as this request is asynchronous. + */ + public void cancel(); +} diff --git a/spi/src/main/java/org/reactivestreams/Listener.java b/spi/src/main/java/org/reactivestreams/Listener.java new file mode 100644 index 00000000..8f4b136e --- /dev/null +++ b/spi/src/main/java/org/reactivestreams/Listener.java @@ -0,0 +1,55 @@ +package org.reactivestreams; + +/** + * Will receive call to {@link #onListen(Handle)} once after passing an instance of {@link Listener} to {@link Source#listen(Listener)}. + *

+ * No further notifications will be received until {@link Handle#request(int)} is called. + *

+ * After signaling demand: + *

+ *

+ * Demand can be signalled via {@link Handle#request(int)} whenever the {@link Listener} instance is capable of handling more. + * + * @param + */ +public interface Listener { + /** + * Invoked after calling {@link Source#listen(Listener)}. + *

+ * No data will start flowing until {@link Handle#request(int)} is invoked. + *

+ * It is the resonsibility of this {@link Listener} instance to call {@link Handle#request(int)} whenever more data is wanted. + *

+ * The {@link Source} will send notifications only in response to {@link Handle#request(int)}. + * + * @param s + * {@link Handle} that allows requesting data via {@link Handle#request(int)} + */ + public void onListen(Handle s); + + /** + * Data notification sent by the {@link Source} in response to requests to {@link Handle#request(int)}. + * + * @param t + */ + public void onNext(T t); + + /** + * Failed terminal state. + *

+ * No further events will be sent even if {@link Handle#request(int)} is invoked again. + * + * @param t + */ + public void onError(Throwable t); + + /** + * Successful terminal state. + *

+ * No further events will be sent even if {@link Handle#request(int)} is invoked again. + */ + public void onComplete(); +} diff --git a/spi/src/main/java/org/reactivestreams/Source.java b/spi/src/main/java/org/reactivestreams/Source.java new file mode 100644 index 00000000..f4e0b19d --- /dev/null +++ b/spi/src/main/java/org/reactivestreams/Source.java @@ -0,0 +1,17 @@ +package org.reactivestreams; + +public interface Source { + + /** + * Request {@link Source} to start streaming data. + *

+ * This is a "factory method" and can be called multiple times, each time starting a new {@link Listener}. + *

+ * Each {@link Handle} will work for only a single {@link Listener}. + *

+ * A {@link Listener} should only subscribe once to a single {@link Source}. + * + * @param s + */ + public void listen(Listener s); +} diff --git a/spi/src/main/java/org/reactivestreams/api/Consumer.java b/spi/src/main/java/org/reactivestreams/api/Consumer.java deleted file mode 100644 index d9696edd..00000000 --- a/spi/src/main/java/org/reactivestreams/api/Consumer.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.reactivestreams.api; - -import org.reactivestreams.spi.Subscriber; - -/** - * A Consumer is the logical sink of elements of a given type. - * The underlying implementation is done by way of a {@link org.reactivestreams.spi.Subscriber Subscriber}. - * This interface is the user-level API for a sink while a Subscriber is the SPI. - *

- * Implementations of this interface will typically offer domain- or language-specific - * methods for transforming or otherwise interacting with the stream of elements. - */ -public interface Consumer { - - /** - * Get the underlying {@link org.reactivestreams.spi.Subscriber Subscriber} for this Consumer. This method should only be used by - * implementations of this API. - * @return the underlying subscriber for this consumer - */ - public Subscriber getSubscriber(); -} \ No newline at end of file diff --git a/spi/src/main/java/org/reactivestreams/api/Processor.java b/spi/src/main/java/org/reactivestreams/api/Processor.java deleted file mode 100644 index da522cd6..00000000 --- a/spi/src/main/java/org/reactivestreams/api/Processor.java +++ /dev/null @@ -1,10 +0,0 @@ -package org.reactivestreams.api; - -/** - * A Processor is a stand-alone representation of a transformation for - * elements from In to Out types. Implementations of this API will provide - * factory methods for creating Processors and connecting them to - * {@link org.reactivestreams.api.Producer Producer} and {@link org.reactivestreams.api.Consumer Consumer}. - */ -public interface Processor extends Consumer, Producer { -} diff --git a/spi/src/main/java/org/reactivestreams/api/Producer.java b/spi/src/main/java/org/reactivestreams/api/Producer.java deleted file mode 100644 index 3e8b5640..00000000 --- a/spi/src/main/java/org/reactivestreams/api/Producer.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.reactivestreams.api; - -import org.reactivestreams.spi.Publisher; - -/** - * A Producer is the logical source of elements of a given type. - * The underlying implementation is done by way of a {@link org.reactivestreams.spi.Publisher Publisher}. - * This interface is the user-level API for a source while a Publisher is the - * SPI. - *

- * Implementations of this interface will typically offer domain- or language-specific - * methods for transforming or otherwise interacting with the produced stream of elements. - */ -public interface Producer { - - /** - * Get the underlying {@link org.reactivestreams.spi.Publisher Publisher} for this Producer. This method should only be used by - * implementations of this API. - * @return the underlying publisher for this producer - */ - public Publisher getPublisher(); - - /** - * Connect the given consumer to this producer. This means that the - * Subscriber underlying the {@link org.reactivestreams.api.Consumer Consumer} subscribes to this Producer’s - * underlying {@link org.reactivestreams.spi.Publisher Publisher}, which will initiate the transfer of the produced - * stream of elements from producer to consumer until either of three things - * happen: - *

- *

- * @param consumer The consumer to register with this producer. - */ - public void produceTo(Consumer consumer); -} diff --git a/spi/src/main/java/org/reactivestreams/spi/Publisher.java b/spi/src/main/java/org/reactivestreams/spi/Publisher.java deleted file mode 100644 index 7a4a2154..00000000 --- a/spi/src/main/java/org/reactivestreams/spi/Publisher.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.reactivestreams.spi; - -/** - * A Publisher is a source of elements of a given type. One or more {@link org.reactivestreams.spi.Subscriber Subscriber} may be connected - * to this Publisher in order to receive the published elements, contingent on availability of these - * elements as well as the presence of demand signaled by the Subscriber via {@link org.reactivestreams.spi.Subscription#requestMore(int) requestMore}. - */ -public interface Publisher { - - /** - * Subscribe the given {@link org.reactivestreams.spi.Subscriber Subscriber} to this Publisher. A Subscriber can at most be subscribed once - * to a given Publisher, and to at most one Publisher in total. - * @param subscriber The subscriber to register with this publisher. - */ - public void subscribe(Subscriber subscriber); -} diff --git a/spi/src/main/java/org/reactivestreams/spi/Subscriber.java b/spi/src/main/java/org/reactivestreams/spi/Subscriber.java deleted file mode 100644 index c900229c..00000000 --- a/spi/src/main/java/org/reactivestreams/spi/Subscriber.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.reactivestreams.spi; - -/** - * A Subscriber receives elements from a {@link org.reactivestreams.spi.Publisher Publisher} based on the {@link org.reactivestreams.spi.Subscription Subscription} it has. - * The Publisher may supply elements as they become available, the Subscriber signals demand via - * {@link org.reactivestreams.spi.Subscription#requestMore(int) requestMore} and elements from when supply and demand are both present. - */ -public interface Subscriber { - - /** - * The {@link org.reactivestreams.spi.Publisher Publisher} generates a {@link org.reactivestreams.spi.Subscription Subscription} upon {@link org.reactivestreams.spi.Publisher#subscribe(org.reactivestreams.spi.Subscriber) subscribe} and passes - * it on to the Subscriber named there using this method. The Publisher may choose to reject - * the subscription request by calling {@link #onError onError} instead. - * @param subscription The subscription which connects this subscriber to its publisher. - */ - public void onSubscribe(Subscription subscription); - - /** - * The {@link org.reactivestreams.spi.Publisher Publisher} calls this method to pass one element to this Subscriber. The element - * must not be null. The Publisher must not call this method more often than - * the Subscriber has signaled demand for via the corresponding {@link org.reactivestreams.spi.Subscription Subscription}. - * @param element The element that is passed from publisher to subscriber. - */ - public void onNext(T element); - - /** - * The {@link org.reactivestreams.spi.Publisher Publisher} calls this method in order to signal that it terminated normally. - * No more elements will be forthcoming and none of the Subscriber’s methods will be called hereafter. - */ - public void onComplete(); - - /** - * The {@link org.reactivestreams.spi.Publisher Publisher} calls this method to signal that the stream of elements has failed - * and is being aborted. The Subscriber should abort its processing as soon as possible. - * No more elements will be forthcoming and none of the Subscriber’s methods will be called hereafter. - *

- * This method is not intended to pass validation errors or similar from Publisher to Subscriber - * in order to initiate an orderly shutdown of the exchange; it is intended only for fatal - * failure conditions which make it impossible to continue processing further elements. - * @param cause An exception which describes the reason for tearing down this stream. - */ - public void onError(Throwable cause); -} diff --git a/spi/src/main/java/org/reactivestreams/spi/Subscription.java b/spi/src/main/java/org/reactivestreams/spi/Subscription.java deleted file mode 100644 index 3c8b1f47..00000000 --- a/spi/src/main/java/org/reactivestreams/spi/Subscription.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.reactivestreams.spi; - -/** - * A Subscription models the relationship between a {@link org.reactivestreams.spi.Publisher Publisher} and a {@link org.reactivestreams.spi.Subscriber Subscriber}. - * The Subscriber receives a Subscription so that it can ask for elements to be delivered - * using {@link org.reactivestreams.spi.Subscription#requestMore(int) requestMore}. The Subscription can be disposed of by canceling it. - */ -public interface Subscription { - - /** - * Cancel this subscription. The {@link org.reactivestreams.spi.Publisher Publisher} to which produced this Subscription - * will eventually stop sending more elements to the {@link org.reactivestreams.spi.Subscriber Subscriber} which owns - * this Subscription. This may happen before the requested number of elements has - * been delivered, even if the Publisher would still have more elements. - */ - public void cancel(); - - /** - * Request more data from the {@link org.reactivestreams.spi.Publisher Publisher} which produced this Subscription. - * The number of requested elements is cumulative to the number requested previously. - * The Publisher may eventually publish up to the requested number of elements to - * the {@link org.reactivestreams.spi.Subscriber Subscriber} which owns this Subscription. - * @param elements The number of elements requested. - */ - public void requestMore(int elements); -} \ No newline at end of file diff --git a/tck/.gitignore b/tck/.gitignore new file mode 100644 index 00000000..5e56e040 --- /dev/null +++ b/tck/.gitignore @@ -0,0 +1 @@ +/bin diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index 99f18d78..f6e44213 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -1,18 +1,17 @@ package org.reactivestreams.tck; -import org.reactivestreams.api.Processor; -import org.reactivestreams.spi.Publisher; -import org.reactivestreams.spi.Subscriber; -import org.reactivestreams.spi.Subscription; +import java.util.HashSet; +import java.util.Set; + +import org.reactivestreams.Handle; +import org.reactivestreams.Listener; +import org.reactivestreams.Source; import org.reactivestreams.tck.TestEnvironment.ManualPublisher; import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; import org.reactivestreams.tck.TestEnvironment.Promise; import org.testng.annotations.Test; -import java.util.HashSet; -import java.util.Set; - public abstract class IdentityProcessorVerification { private final TestEnvironment env; @@ -45,29 +44,29 @@ public IdentityProcessorVerification(final TestEnvironment env, long publisherSh this.subscriberVerification = new SubscriberVerification(env) { @Override - Subscriber createSubscriber(SubscriberProbe probe) { + Listener createSubscriber(SubscriberProbe probe) { return IdentityProcessorVerification.this.createSubscriber(probe); } @Override - Publisher createHelperPublisher(int elements) { + Source createHelperPublisher(int elements) { return IdentityProcessorVerification.this.createHelperPublisher(elements); } }; publisherVerification = new PublisherVerification(env, publisherShutdownTimeoutMillis) { @Override - public Publisher createPublisher(int elements) { + public Source createPublisher(int elements) { return IdentityProcessorVerification.this.createPublisher(elements); } @Override - public Publisher createCompletedStatePublisher() { + public Source createCompletedStatePublisher() { return IdentityProcessorVerification.this.createCompletedStatePublisher(); } @Override - public Publisher createErrorStatePublisher() { + public Source createErrorStatePublisher() { return IdentityProcessorVerification.this.createErrorStatePublisher(); } @@ -79,7 +78,7 @@ public Publisher createErrorStatePublisher() { * It must create a Processor, which simply forwards all stream elements from its upstream * to its downstream. It must be able to internally buffer the given number of elements. */ - public abstract Processor createIdentityProcessor(int bufferSize); + public abstract TestProcessor createIdentityProcessor(int bufferSize); /** * Helper method required for running the Publisher rules against a Processor. @@ -88,28 +87,28 @@ public Publisher createErrorStatePublisher() { * The stream must not produce the same element twice (in case of an infinite stream this requirement * is relaxed to only apply to the elements that are actually requested during all tests). */ - public abstract Publisher createHelperPublisher(int elements); + public abstract Source createHelperPublisher(int elements); /** * Return a Publisher in {@code completed} state in order to run additional tests on it, * or {@code null} in order to skip them. */ - public abstract Publisher createCompletedStatePublisher(); + public abstract Source createCompletedStatePublisher(); /** * Return a Publisher in {@code error} state in order to run additional tests on it, * or {@code null} in order to skip them. */ - public abstract Publisher createErrorStatePublisher(); + public abstract Source createErrorStatePublisher(); ////////////////////// PUBLISHER RULES VERIFICATION /////////////////////////// // A Processor // must obey all Publisher rules on its producing side - public Publisher createPublisher(int elements) { - Processor processor = createIdentityProcessor(testBufferSize); - Publisher pub = createHelperPublisher(elements); - pub.subscribe(processor.getSubscriber()); + public Source createPublisher(int elements) { + TestProcessor processor = createIdentityProcessor(testBufferSize); + Source pub = createHelperPublisher(elements); + pub.listen(processor.getSubscriber()); return processor.getPublisher(); // we run the PublisherVerification against this } @@ -191,11 +190,11 @@ public void mustNotCallOnCompleteOrOnErrorMoreThanOncePerSubscriber() { // A Processor // must obey all Subscriber rules on its consuming side - public Subscriber createSubscriber(final SubscriberVerification.SubscriberProbe probe) { - Processor processor = createIdentityProcessor(testBufferSize); - processor.getPublisher().subscribe( - new Subscriber() { - public void onSubscribe(final Subscription subscription) { + public Listener createSubscriber(final SubscriberVerification.SubscriberProbe probe) { + TestProcessor processor = createIdentityProcessor(testBufferSize); + processor.getPublisher().listen( + new Listener() { + public void onListen(final Handle subscription) { probe.registerOnSubscribe( new SubscriberVerification.SubscriberPuppet() { public void triggerShutdown() { @@ -203,7 +202,7 @@ public void triggerShutdown() { } public void triggerRequestMore(int elements) { - subscription.requestMore(elements); + subscription.request(elements); } public void triggerCancel() { @@ -502,7 +501,7 @@ abstract class TestSetup extends ManualPublisher { private TestEnvironment.ManualSubscriber tees; // gives us access to an infinite stream of T values private Set seenTees = new HashSet(); - final Processor processor; + final TestProcessor processor; final int testBufferSize; public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException { @@ -510,7 +509,7 @@ public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedExce this.testBufferSize = testBufferSize; tees = env.newManualSubscriber(createHelperPublisher(0)); processor = createIdentityProcessor(testBufferSize); - subscribe(processor.getSubscriber()); + listen(processor.getSubscriber()); } public TestEnvironment.ManualSubscriber newSubscriber() throws InterruptedException { diff --git a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java index d296f814..ebded995 100644 --- a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java @@ -1,10 +1,7 @@ package org.reactivestreams.tck; -import org.reactivestreams.spi.Publisher; -import org.reactivestreams.spi.Subscription; -import org.reactivestreams.tck.support.Optional; -import org.testng.SkipException; -import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; @@ -14,9 +11,16 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import static org.reactivestreams.tck.TestEnvironment.*; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; +import org.reactivestreams.Handle; +import org.reactivestreams.Source; +import org.reactivestreams.tck.TestEnvironment.Latch; +import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; +import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; +import org.reactivestreams.tck.TestEnvironment.Promise; +import org.reactivestreams.tck.TestEnvironment.TestSubscriber; +import org.reactivestreams.tck.support.Optional; +import org.testng.SkipException; +import org.testng.annotations.Test; public abstract class PublisherVerification { @@ -33,19 +37,19 @@ public PublisherVerification(TestEnvironment env, long publisherShutdownTimeoutM * It must create a Publisher for a stream with exactly the given number of elements. * If `elements` is zero the produced stream must be infinite. */ - public abstract Publisher createPublisher(int elements); + public abstract Source createPublisher(int elements); /** * Return a Publisher in {@code completed} state in order to run additional tests on it, * or {@code null} in order to skip them. */ - public abstract Publisher createCompletedStatePublisher(); + public abstract Source createCompletedStatePublisher(); /** * Return a Publisher in {@code error} state in order to run additional tests on it, * or {@code null} in order to skip them. */ - public abstract Publisher createErrorStatePublisher(); + public abstract Source createErrorStatePublisher(); ////////////////////// TEST SETUP VERIFICATION /////////////////////////// @@ -53,7 +57,7 @@ public PublisherVerification(TestEnvironment env, long publisherShutdownTimeoutM public void createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable { activePublisherTest(3, new PublisherTestRun() { @Override - public void run(Publisher pub) throws InterruptedException { + public void run(Source pub) throws InterruptedException { TestEnvironment.ManualSubscriber sub = env.newManualSubscriber(pub); assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub)); assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 1 element", pub)); @@ -61,7 +65,7 @@ public void run(Publisher pub) throws InterruptedException { sub.requestEndOfStream(); } - Optional requestNextElementOrEndOfStream(Publisher pub, TestEnvironment.ManualSubscriber sub) throws InterruptedException { + Optional requestNextElementOrEndOfStream(Source pub, TestEnvironment.ManualSubscriber sub) throws InterruptedException { return sub.requestNextElementOrEndOfStream("Timeout while waiting for next element from Publisher" + pub); } @@ -79,16 +83,16 @@ Optional requestNextElementOrEndOfStream(Publisher pub, TestEnvironment.Ma public void publisherSubscribeWhenCompletedMustTriggerOnCompleteAndNotOnSubscribe() throws Throwable { completedPublisherTest(new PublisherTestRun() { @Override - public void run(final Publisher pub) throws InterruptedException { + public void run(final Source pub) throws InterruptedException { final Latch latch = new Latch(env); - pub.subscribe( + pub.listen( new TestEnvironment.TestSubscriber(env) { public void onComplete() { latch.assertOpen(String.format("Publisher %s called `onComplete` twice on new Subscriber", pub)); latch.close(); } - public void onSubscribe(Subscription subscription) { + public void onListen(Handle subscription) { env.flop(String.format("Publisher created by `createCompletedStatePublisher()` (%s) called `onSubscribe` on new Subscriber", pub)); } }); @@ -107,9 +111,9 @@ public void onSubscribe(Subscription subscription) { public void publisherSubscribeWhenInErrorStateMustTriggerOnErrorAndNotOnSubscribe() throws Throwable { errorPublisherTest(new PublisherTestRun() { @Override - public void run(final Publisher pub) throws InterruptedException { + public void run(final Source pub) throws InterruptedException { final Latch latch = new Latch(env); - pub.subscribe( + pub.listen( new TestEnvironment.TestSubscriber(env) { public void onError(Throwable cause) { latch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub)); @@ -135,7 +139,7 @@ public void onError(Throwable cause) { public void publisherSubscribeWhenInShutDownStateMustTriggerOnErrorAndNotOnSubscribe() throws Throwable { activePublisherTest(3, new PublisherTestRun() { @Override - public void run(final Publisher pub) throws InterruptedException { + public void run(final Source pub) throws InterruptedException { TestEnvironment.ManualSubscriber sub = env.newManualSubscriber(pub); sub.cancel(); @@ -145,7 +149,7 @@ public void run(final Publisher pub) throws InterruptedException { Thread.sleep(publisherShutdownTimeoutMillis); final Latch latch = new Latch(env); - pub.subscribe( + pub.listen( new TestEnvironment.TestSubscriber(env) { public void onError(Throwable cause) { latch.assertOpen(String.format("shut-down-state Publisher %s called `onError` twice on new Subscriber", pub)); @@ -165,12 +169,12 @@ public void onError(Throwable cause) { public void publisherSubscribeWhenActiveMustCallOnSubscribeFirst() throws Throwable { activePublisherTest(1, new PublisherTestRun() { @Override - public void run(Publisher pub) throws InterruptedException { + public void run(Source pub) throws InterruptedException { final Latch latch = new Latch(env); - final Subscription[] sub = {null}; - pub.subscribe( + final Handle[] sub = {null}; + pub.listen( new TestSubscriber(env) { - public void onSubscribe(Subscription subscription) { + public void onListen(Handle subscription) { latch.close(); sub[0] = subscription; } @@ -190,19 +194,19 @@ public void onSubscribe(Subscription subscription) { public void publisherSubscribeWhenActiveMustRejectDoubleSubscription() throws Throwable { activePublisherTest(1, new PublisherTestRun() { @Override - public void run(Publisher pub) throws InterruptedException { + public void run(Source pub) throws InterruptedException { final Latch latch = new Latch(env); final Promise errorCause = new Promise(env); TestSubscriber sub = new TestSubscriber(env) { - public void onSubscribe(Subscription subscription) { latch.close(); } + public void onListen(Handle subscription) { latch.close(); } public void onError(Throwable cause) { errorCause.complete(cause); } }; - pub.subscribe(sub); + pub.listen(sub); latch.expectClose(env.defaultTimeoutMillis(), "Active Publisher "+ pub+" did not call `onSubscribe` on first subscription request"); errorCause.assertUncompleted("Active Publisher "+ pub+" unexpectedly called `onError` on first subscription request"); latch.reOpen(); - pub.subscribe(sub); + pub.listen(sub); errorCause.expectCompletion(env.defaultTimeoutMillis(), "Active Publisher "+ pub+" did not call `onError` on double subscription request"); if(!IllegalStateException.class.isInstance(errorCause.value())) env.flop("Publisher " + pub + " called `onError` with " + errorCause.value() + " rather than an `IllegalStateException` on double subscription request"); @@ -219,10 +223,10 @@ public void run(Publisher pub) throws InterruptedException { public void subscriptionRequestMoreWhenCancelledMustIgnoreTheCall() throws Throwable { activePublisherTest(1, new PublisherTestRun() { @Override - public void run(Publisher pub) throws InterruptedException { + public void run(Source pub) throws InterruptedException { ManualSubscriber sub = env.newManualSubscriber(pub); sub.subscription.value().cancel(); - sub.subscription.value().requestMore(1); // must not throw + sub.subscription.value().request(1); // must not throw } }); } @@ -237,7 +241,7 @@ public void run(Publisher pub) throws InterruptedException { public void subscriptionRequestMoreMustResultInTheCorrectNumberOfProducedElements() throws Throwable { activePublisherTest(5, new PublisherTestRun() { @Override - public void run(Publisher pub) throws InterruptedException { + public void run(Source pub) throws InterruptedException { ManualSubscriber sub = env.newManualSubscriber(pub); @@ -262,7 +266,7 @@ public void run(Publisher pub) throws InterruptedException { public void subscriptionRequestMoreMustThrowIfArgumentIsNonPositive() throws Throwable { activePublisherTest(1, new PublisherTestRun() { @Override - public void run(Publisher pub) throws Throwable { + public void run(Source pub) throws Throwable { final ManualSubscriber sub = env.newManualSubscriber(pub); env.expectThrowingOf( @@ -271,7 +275,7 @@ public void run(Publisher pub) throws Throwable { new Runnable() { @Override public void run() { - sub.subscription.value().requestMore(-1); + sub.subscription.value().request(-1); } }); @@ -281,7 +285,7 @@ public void run() { new Runnable() { @Override public void run() { - sub.subscription.value().requestMore(0); + sub.subscription.value().request(0); } }); sub.cancel(); @@ -296,7 +300,7 @@ public void run() { public void subscriptionCancelWhenCancelledMustIgnoreCall() throws Throwable { activePublisherTest(1, new PublisherTestRun() { @Override - public void run(Publisher pub) throws InterruptedException { + public void run(Source pub) throws InterruptedException { ManualSubscriber sub = env.newManualSubscriber(pub); sub.subscription.value().cancel(); // first time must succeed sub.subscription.value().cancel(); // the second time must not throw @@ -311,7 +315,7 @@ public void run(Publisher pub) throws InterruptedException { public void onSubscriptionCancelThePublisherMustEventuallyCeaseToCallAnyMethodsOnTheSubscriber() throws Throwable { activePublisherTest(0, new PublisherTestRun() { @Override - public void run(Publisher pub) throws InterruptedException { + public void run(Source pub) throws InterruptedException { // infinite stream final AtomicBoolean drop = new AtomicBoolean(true); @@ -344,9 +348,9 @@ private interface Function { public void onSubscriptionCancelThePublisherMustEventuallyDropAllReferencesToTheSubscriber() throws Throwable { final ReferenceQueue> queue = new ReferenceQueue>(); - final Function, WeakReference>> run = new Function, WeakReference>>() { + final Function, WeakReference>> run = new Function, WeakReference>>() { @Override - public WeakReference> apply(Publisher pub) throws Exception { + public WeakReference> apply(Source pub) throws Exception { ManualSubscriber sub = env.newManualSubscriber(pub); WeakReference> ref = new WeakReference>(sub, queue); sub.requestMore(1); @@ -358,7 +362,7 @@ public WeakReference> apply(Publisher pub) throws Excepti activePublisherTest(3, new PublisherTestRun() { @Override - public void run(Publisher pub) throws Exception { + public void run(Source pub) throws Exception { WeakReference> ref = run.apply(pub); // cancel may be run asynchronously so we add a sleep before running the GC @@ -388,7 +392,7 @@ public void mustProduceTheSameElementsInTheSameSequenceForAllItsSubscribers() th activePublisherTest(5, new PublisherTestRun() { @Override - public void run(Publisher pub) throws InterruptedException { + public void run(Source pub) throws InterruptedException { ManualSubscriber sub1 = env.newManualSubscriber(pub); ManualSubscriber sub2 = env.newManualSubscriber(pub); ManualSubscriber sub3 = env.newManualSubscriber(pub); @@ -449,7 +453,7 @@ public void mustSupportAPendingElementCountUpToLongMaxValue() { public void mustCallOnCompleteOnASubscriberAfterHavingProducedTheFinalStreamElementToIt() throws Throwable { activePublisherTest(3, new PublisherTestRun() { @Override - public void run(Publisher pub) throws InterruptedException { + public void run(Source pub) throws InterruptedException { ManualSubscriber sub = env.newManualSubscriber(pub); sub.requestNextElement(); sub.requestNextElement(); @@ -488,11 +492,11 @@ public void mustNotCallOnCompleteOrOnErrorMoreThanOncePerSubscriber() { /////////////////////// TEST INFRASTRUCTURE ////////////////////// interface PublisherTestRun { - public void run(Publisher pub) throws Throwable; + public void run(Source pub) throws Throwable; } public void activePublisherTest(int elements, PublisherTestRun body) throws Throwable { - Publisher pub = createPublisher(elements); + Source pub = createPublisher(elements); body.run(pub); env.verifyNoAsyncErrors(); } @@ -505,7 +509,7 @@ public void errorPublisherTest(PublisherTestRun body) throws Throwable { potentiallyPendingTest(createErrorStatePublisher(), body); } - public void potentiallyPendingTest(Publisher pub, PublisherTestRun body) throws Throwable { + public void potentiallyPendingTest(Source pub, PublisherTestRun body) throws Throwable { if (pub != null) { body.run(pub); env.verifyNoAsyncErrors(); diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberVerification.java index b208ccff..8215677c 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberVerification.java @@ -1,12 +1,15 @@ package org.reactivestreams.tck; -import org.reactivestreams.spi.Publisher; -import org.reactivestreams.spi.Subscriber; -import org.reactivestreams.spi.Subscription; +import org.reactivestreams.Handle; +import org.reactivestreams.Listener; +import org.reactivestreams.Source; +import org.reactivestreams.tck.TestEnvironment.Latch; +import org.reactivestreams.tck.TestEnvironment.ManualPublisher; +import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; +import org.reactivestreams.tck.TestEnvironment.Promise; +import org.reactivestreams.tck.TestEnvironment.Receptacle; import org.testng.annotations.Test; -import static org.reactivestreams.tck.TestEnvironment.*; - public abstract class SubscriberVerification { private final TestEnvironment env; @@ -22,14 +25,14 @@ protected SubscriberVerification(TestEnvironment env) { * In order to be meaningfully testable your Subscriber must inform the given * `SubscriberProbe` of the respective events having been received. */ - abstract Subscriber createSubscriber(SubscriberProbe probe); + abstract Listener createSubscriber(SubscriberProbe probe); /** * Helper method required for generating test elements. * It must create a Publisher for a stream with exactly the given number of elements. * If `elements` is zero the produced stream must be infinite. */ - abstract Publisher createHelperPublisher(int elements); + abstract Source createHelperPublisher(int elements); ////////////////////// TEST SETUP VERIFICATION /////////////////////////// @@ -82,9 +85,9 @@ void onCompleteAndOnErrorMustAsynchronouslyScheduleAnEvent() { void mustNotAcceptAnOnSubscribeEventIfItAlreadyHasAnActiveSubscription() throws InterruptedException { new TestSetup(env) {{ // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail - sub().onSubscribe( - new Subscription() { - public void requestMore(int elements) { + sub().onListen( + new Handle() { + public void request(int elements) { env.flop(String.format("Subscriber %s illegally called `subscription.requestMore(%s)`", sub(), elements)); } @@ -201,11 +204,11 @@ public TestSetup(TestEnvironment env) throws InterruptedException { super(env); tees = env.newManualSubscriber(createHelperPublisher(0)); probe = new Probe(); - subscribe(createSubscriber(probe)); + listen(createSubscriber(probe)); probe.puppet.expectCompletion(env.defaultTimeoutMillis(), String.format("Subscriber %s did not `registerOnSubscribe`", sub())); } - Subscriber sub() { + Listener sub() { return subscriber.get(); } diff --git a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java index d6e8835f..bd7f5270 100644 --- a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java +++ b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java @@ -1,9 +1,6 @@ package org.reactivestreams.tck; -import org.reactivestreams.spi.Publisher; -import org.reactivestreams.spi.Subscriber; -import org.reactivestreams.spi.Subscription; -import org.reactivestreams.tck.support.Optional; +import static org.testng.Assert.fail; import java.util.LinkedList; import java.util.List; @@ -12,7 +9,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static org.testng.Assert.fail; +import org.reactivestreams.Handle; +import org.reactivestreams.Listener; +import org.reactivestreams.Source; +import org.reactivestreams.tck.support.Optional; public class TestEnvironment { public static final int TEST_BUFFER_SIZE = 16; @@ -60,21 +60,21 @@ public void expectThrowingOf(Class clazz, String errorM } } - public void subscribe(Publisher pub, TestSubscriber sub) throws InterruptedException { + public void subscribe(Source pub, TestSubscriber sub) throws InterruptedException { subscribe(pub, sub, defaultTimeoutMillis); } - public void subscribe(Publisher pub, TestSubscriber sub, long timeoutMillis) throws InterruptedException { - pub.subscribe(sub); + public void subscribe(Source pub, TestSubscriber sub, long timeoutMillis) throws InterruptedException { + pub.listen(sub); sub.subscription.expectCompletion(timeoutMillis, String.format("Could not subscribe %s to Publisher %s", sub, pub)); verifyNoAsyncErrors(); } - public ManualSubscriber newManualSubscriber(Publisher pub) throws InterruptedException { + public ManualSubscriber newManualSubscriber(Source pub) throws InterruptedException { return newManualSubscriber(pub, defaultTimeoutMillis()); } - public ManualSubscriber newManualSubscriber(Publisher pub, long timeoutMillis) throws InterruptedException { + public ManualSubscriber newManualSubscriber(Source pub, long timeoutMillis) throws InterruptedException { ManualSubscriberWithSubscriptionSupport sub = new ManualSubscriberWithSubscriptionSupport(this); subscribe(pub, sub, timeoutMillis); return sub; @@ -111,7 +111,7 @@ public void onComplete() { } } - public void onSubscribe(Subscription s) { + public void onListen(Handle s) { if (!subscription.isCompleted()) { subscription.complete(s); } else { @@ -128,14 +128,14 @@ public void onError(Throwable cause) { } } - static class TestSubscriber implements Subscriber { - volatile Promise subscription; + static class TestSubscriber implements Listener { + volatile Promise subscription; protected final TestEnvironment env; public TestSubscriber(TestEnvironment env) { this.env = env; - subscription = new Promise(env); + subscription = new Promise(env); } @Override @@ -153,14 +153,14 @@ public void onNext(T element) { env.flop(String.format("Unexpected Subscriber::onNext(%s)", element)); } - public void onSubscribe(Subscription subscription) { + public void onListen(Handle subscription) { env.flop(String.format("Unexpected Subscriber::onSubscribe(%s)", subscription)); } public void cancel() { if (subscription.isCompleted()) { subscription.value().cancel(); - subscription = new Promise(env); + subscription = new Promise(env); } else env.flop("Cannot cancel a subscription before having received it"); } } @@ -183,7 +183,7 @@ public void onComplete() { } void requestMore(int elements) { - subscription.value().requestMore(elements); + subscription.value().request(elements); } public T requestNextElement() throws InterruptedException { @@ -315,10 +315,10 @@ public void expectNone(long withinMillis) throws InterruptedException { } - static class ManualPublisher implements Publisher { + static class ManualPublisher implements Source { protected final TestEnvironment env; - Optional> subscriber = Optional.empty(); + Optional> subscriber = Optional.empty(); Receptacle requests; Latch cancelled; @@ -329,13 +329,13 @@ public ManualPublisher(TestEnvironment env) { } @Override - public void subscribe(Subscriber s) { + public void listen(Listener s) { if (subscriber.isEmpty()) { subscriber = Optional.of(s); - Subscription subs = new Subscription() { + Handle subs = new Handle() { @Override - public void requestMore(int elements) { + public void request(int elements) { requests.add(elements); } @@ -344,7 +344,7 @@ public void cancel() { cancelled.close(); } }; - s.onSubscribe(subs); + s.onListen(subs); } else { env.flop("TestPublisher doesn't support more than one Subscriber"); diff --git a/tck/src/main/java/org/reactivestreams/tck/TestProcessor.java b/tck/src/main/java/org/reactivestreams/tck/TestProcessor.java new file mode 100644 index 00000000..3d687141 --- /dev/null +++ b/tck/src/main/java/org/reactivestreams/tck/TestProcessor.java @@ -0,0 +1,15 @@ +package org.reactivestreams.tck; + +import org.reactivestreams.Listener; +import org.reactivestreams.Source; + +/** + * The TCK uses this to pull together the 2 sides of {@link Source} and {@link Listener}. + */ +public interface TestProcessor extends Source, Listener { + + Listener getSubscriber(); + + Source getPublisher(); + +} From 0124aaa9b17c1f7aaf6a0fdaf0cf312e2e68875a Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 21 Apr 2014 17:28:11 -0700 Subject: [PATCH 02/19] Examples --- .../example/multicast/MulticastExample.java | 23 ++++++ .../multicast/NeverEndingStockStream.java | 75 ++++++++++++++++++ .../example/multicast/Stock.java | 15 ++++ .../multicast/StockPricePublisher.java | 73 +++++++++++++++++ .../multicast/StockPriceSubscriber.java | 78 +++++++++++++++++++ .../InfiniteIncrementNumberPublisher.java | 52 +++++++++++++ .../NumberSubscriberThatHopsThreads.java | 64 +++++++++++++++ .../example/unicast/UnicastExample.java | 21 +++++ 8 files changed, 401 insertions(+) create mode 100644 spi/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java create mode 100644 spi/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java create mode 100644 spi/src/examples/java/org/reactivestreams/example/multicast/Stock.java create mode 100644 spi/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java create mode 100644 spi/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java create mode 100644 spi/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java create mode 100644 spi/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java create mode 100644 spi/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java diff --git a/spi/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java b/spi/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java new file mode 100644 index 00000000..260ddb81 --- /dev/null +++ b/spi/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java @@ -0,0 +1,23 @@ +package org.reactivestreams.example.multicast; + +import org.reactivestreams.Source; + +public class MulticastExample { + + /** + * Each subscribe will join an existing stream. + * + * @param args + * @throws InterruptedException + */ + public static void main(String... args) throws InterruptedException { + Source dataStream = new StockPricePublisher(); + + dataStream.listen(new StockPriceSubscriber(5, 500)); // 500ms on each event, infinite + dataStream.listen(new StockPriceSubscriber(10, 2000)); // 2000ms on each event, infinite + Thread.sleep(5000); + dataStream.listen(new StockPriceSubscriber(10, 111, 20)); // 111ms on each event, take 20 + Thread.sleep(5000); + dataStream.listen(new StockPriceSubscriber(10, 222, 20));// 222ms on each event, take 20 + } +} diff --git a/spi/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java b/spi/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java new file mode 100644 index 00000000..0086e8c1 --- /dev/null +++ b/spi/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java @@ -0,0 +1,75 @@ +package org.reactivestreams.example.multicast; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Simulate a network connection that is firing data at you. + *

+ * Purposefully not using the `Subscriber` and `Publisher` types to not confuse things with `StockPricePublisher` + */ +public class NeverEndingStockStream { + + private static final NeverEndingStockStream INSTANCE = new NeverEndingStockStream(); + + private NeverEndingStockStream() { + } + + // using array because it is far faster than list/set for iteration + // which is where most of the time in the tight loop will go (... well, beside object allocation) + private volatile Handler[] handlers = new Handler[0]; + + public static synchronized void addHandler(Handler handler) { + if (INSTANCE.handlers.length == 0) { + INSTANCE.handlers = new Handler[] { handler }; + } else { + Handler[] newHandlers = new Handler[INSTANCE.handlers.length + 1]; + System.arraycopy(INSTANCE.handlers, 0, newHandlers, 0, INSTANCE.handlers.length); + newHandlers[newHandlers.length - 1] = handler; + INSTANCE.handlers = newHandlers; + } + INSTANCE.startIfNeeded(); + } + + public static synchronized void removeHandler(Handler handler) { + // too lazy to do the array handling + HashSet set = new HashSet<>(Arrays.asList(INSTANCE.handlers)); + set.remove(handler); + INSTANCE.handlers = set.toArray(new Handler[set.size()]); + } + + public static interface Handler { + public void handle(Stock event); + } + + private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicLong stockIndex = new AtomicLong(); + + private void startIfNeeded() { + if (running.compareAndSet(false, true)) { + new Thread(new Runnable() { + + @Override + public void run() { + while (handlers.length > 0) { + long l = stockIndex.incrementAndGet(); + Stock s = new Stock(l); + for (Handler h : handlers) { + h.handle(s); + } + try { + // just so it is someone sane how fast this is moving + Thread.sleep(1); + } catch (InterruptedException e) { + } + } + running.set(false); + } + + }).start(); + } + } + +} diff --git a/spi/src/examples/java/org/reactivestreams/example/multicast/Stock.java b/spi/src/examples/java/org/reactivestreams/example/multicast/Stock.java new file mode 100644 index 00000000..431e60de --- /dev/null +++ b/spi/src/examples/java/org/reactivestreams/example/multicast/Stock.java @@ -0,0 +1,15 @@ +package org.reactivestreams.example.multicast; + +public class Stock { + + private final long l; + + public Stock(long l) { + this.l = l; + } + + public long getPrice() { + return l; + } + +} diff --git a/spi/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java b/spi/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java new file mode 100644 index 00000000..643c2995 --- /dev/null +++ b/spi/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java @@ -0,0 +1,73 @@ +package org.reactivestreams.example.multicast; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.reactivestreams.Handle; +import org.reactivestreams.Listener; +import org.reactivestreams.Source; +import org.reactivestreams.example.multicast.NeverEndingStockStream.Handler; + +/** + * Publisher of stock prices from a never ending stream. + *

+ * It will share a single underlying stream with as many subscribers as it receives. + *

+ * If the subscriber can not keep up, it will drop (different strategies could be implemented, configurable, etc). + */ +public class StockPricePublisher implements Source { + + @Override + public void listen(final Listener s) { + s.onListen(new Handle() { + + AtomicInteger capacity = new AtomicInteger(); + EventHandler handler = new EventHandler(s, capacity); + + @Override + public void request(int n) { + if (capacity.getAndAdd(n) == 0) { + // was at 0, so start up consumption again + startConsuming(); + } + } + + @Override + public void cancel() { + System.out.println("StockPricePublisher => Cancel Subscription"); + NeverEndingStockStream.removeHandler(handler); + } + + public void startConsuming() { + NeverEndingStockStream.addHandler(handler); + } + + }); + + } + + private static final class EventHandler implements Handler { + private final Listener s; + private final AtomicInteger capacity; + + private EventHandler(Listener s, AtomicInteger capacity) { + this.s = s; + this.capacity = capacity; + } + + @Override + public void handle(Stock event) { + int c = capacity.get(); + if (c == 0) { + // shortcut instead of doing decrement/increment loops while no capacity + return; + } + if (capacity.getAndDecrement() > 0) { + s.onNext(event); + } else { + // we just decremented below 0 so increment back one + capacity.incrementAndGet(); + } + } + } + +} diff --git a/spi/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java b/spi/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java new file mode 100644 index 00000000..1262068f --- /dev/null +++ b/spi/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java @@ -0,0 +1,78 @@ +package org.reactivestreams.example.multicast; + +import java.util.concurrent.ArrayBlockingQueue; + +import org.reactivestreams.Handle; +import org.reactivestreams.Listener; + +public class StockPriceSubscriber implements Listener { + + private final ArrayBlockingQueue buffer; + private final int delayPerStock; + private volatile boolean terminated = false; + private final int take; + + public StockPriceSubscriber(int bufferSize, int delayPerStock, int take) { + this.buffer = new ArrayBlockingQueue<>(bufferSize); + this.delayPerStock = delayPerStock; + this.take = take; + } + + public StockPriceSubscriber(int bufferSize, int delayPerStock) { + this(bufferSize, delayPerStock, -1); + } + + @Override + public void onListen(Handle s) { + System.out.println("StockPriceSubscriber.onSubscribe => request " + buffer.remainingCapacity()); + s.request(buffer.remainingCapacity()); + startAsyncWork(s); + } + + @Override + public void onNext(Stock t) { + buffer.add(t); + } + + @Override + public void onError(Throwable t) { + terminated = true; + throw new RuntimeException(t); + } + + @Override + public void onComplete() { + terminated = true; + } + + private void startAsyncWork(final Handle s) { + System.out.println("StockPriceSubscriber => Start new worker thread"); + /* don't write real code like this! just for quick demo */ + new Thread(new Runnable() { + public void run() { + int received = 0; + + while (!terminated) { + Stock v = buffer.poll(); + try { + Thread.sleep(delayPerStock); + } catch (Exception e) { + e.printStackTrace(); + } + if (buffer.size() < 3) { + s.request(buffer.remainingCapacity()); + } + if (v != null) { + received++; + System.out.println("StockPriceSubscriber[" + delayPerStock + "] => " + v.getPrice()); + if (take > 0 && received >= take) { + s.cancel(); + terminated = true; + } + } + } + } + }).start(); + } + +} diff --git a/spi/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java b/spi/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java new file mode 100644 index 00000000..dabbef77 --- /dev/null +++ b/spi/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java @@ -0,0 +1,52 @@ +package org.reactivestreams.example.unicast; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.reactivestreams.Handle; +import org.reactivestreams.Listener; +import org.reactivestreams.Source; + +class InfiniteIncrementNumberPublisher implements Source { + + @Override + public void listen(final Listener s) { + + final AtomicInteger i = new AtomicInteger(); + + Handle subscription = new Handle() { + + AtomicInteger capacity = new AtomicInteger(); + + @Override + public void request(int n) { + System.out.println("signalAdditionalDemand => " + n); + if (capacity.getAndAdd(n) == 0) { + // start sending again if it wasn't already running + send(); + } + } + + private void send() { + System.out.println("send => " + capacity.get()); + // this would normally use an eventloop, actor, whatever + new Thread(new Runnable() { + + public void run() { + do { + s.onNext(i.incrementAndGet()); + } while (capacity.decrementAndGet() > 0); + } + }).start(); + } + + @Override + public void cancel() { + capacity.set(-1); + } + + }; + + s.onListen(subscription); + + } +} \ No newline at end of file diff --git a/spi/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java b/spi/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java new file mode 100644 index 00000000..f0cc5137 --- /dev/null +++ b/spi/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java @@ -0,0 +1,64 @@ +package org.reactivestreams.example.unicast; + +import java.util.concurrent.ArrayBlockingQueue; + +import org.reactivestreams.Handle; +import org.reactivestreams.Listener; + +class NumberSubscriberThatHopsThreads implements Listener { + + final int BUFFER_SIZE = 10; + private final ArrayBlockingQueue buffer = new ArrayBlockingQueue<>(BUFFER_SIZE); + private volatile boolean terminated = false; + private final String token; + + NumberSubscriberThatHopsThreads(String token) { + this.token = token; + } + + @Override + public void onListen(Handle s) { + System.out.println("onSubscribe => request " + BUFFER_SIZE); + s.request(BUFFER_SIZE); + startAsyncWork(s); + } + + @Override + public void onNext(Integer t) { + buffer.add(t); + } + + @Override + public void onError(Throwable t) { + terminated = true; + throw new RuntimeException(t); + } + + @Override + public void onComplete() { + terminated = true; + } + + private void startAsyncWork(final Handle s) { + System.out.println("**** Start new worker thread"); + /* don't write real code like this! just for quick demo */ + new Thread(new Runnable() { + public void run() { + while (!terminated) { + Integer v = buffer.poll(); + try { + Thread.sleep(100); + } catch (Exception e) { + e.printStackTrace(); + } + if (buffer.size() < 3) { + s.request(BUFFER_SIZE - buffer.size()); + } + if (v != null) { + System.out.println(token + " => Did stuff with v: " + v); + } + } + } + }).start(); + } +} \ No newline at end of file diff --git a/spi/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java b/spi/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java new file mode 100644 index 00000000..f867de88 --- /dev/null +++ b/spi/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java @@ -0,0 +1,21 @@ +package org.reactivestreams.example.unicast; + +import org.reactivestreams.Source; + +public class UnicastExample { + + /** + * Each subscribe will start a new stream starting at 0. + * + * @param args + * @throws InterruptedException + */ + public static void main(String... args) throws InterruptedException { + Source dataStream = new InfiniteIncrementNumberPublisher(); + + dataStream.listen(new NumberSubscriberThatHopsThreads("A")); + Thread.sleep(2000); + dataStream.listen(new NumberSubscriberThatHopsThreads("B")); + } + +} From b7ba5f4151370172178c3eb6c0e8d5c0666d2ac8 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 21 Apr 2014 17:30:39 -0700 Subject: [PATCH 03/19] Publisher/Subscriber/Subscription Naming --- .../example/multicast/MulticastExample.java | 12 ++-- .../multicast/StockPricePublisher.java | 16 ++--- .../multicast/StockPriceSubscriber.java | 10 +-- .../InfiniteIncrementNumberPublisher.java | 14 ++-- .../NumberSubscriberThatHopsThreads.java | 10 +-- .../example/unicast/UnicastExample.java | 8 +-- .../main/java/org/reactivestreams/Handle.java | 29 -------- .../java/org/reactivestreams/Listener.java | 55 --------------- .../java/org/reactivestreams/Publisher.java | 17 +++++ .../main/java/org/reactivestreams/Source.java | 17 ----- .../java/org/reactivestreams/Subscriber.java | 55 +++++++++++++++ .../org/reactivestreams/Subscription.java | 29 ++++++++ .../tck/IdentityProcessorVerification.java | 38 +++++----- .../tck/PublisherVerification.java | 70 +++++++++---------- .../tck/SubscriberVerification.java | 18 ++--- .../reactivestreams/tck/TestEnvironment.java | 38 +++++----- .../reactivestreams/tck/TestProcessor.java | 12 ++-- 17 files changed, 224 insertions(+), 224 deletions(-) delete mode 100644 spi/src/main/java/org/reactivestreams/Handle.java delete mode 100644 spi/src/main/java/org/reactivestreams/Listener.java create mode 100644 spi/src/main/java/org/reactivestreams/Publisher.java delete mode 100644 spi/src/main/java/org/reactivestreams/Source.java create mode 100644 spi/src/main/java/org/reactivestreams/Subscriber.java create mode 100644 spi/src/main/java/org/reactivestreams/Subscription.java diff --git a/spi/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java b/spi/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java index 260ddb81..7c7b2a26 100644 --- a/spi/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java +++ b/spi/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java @@ -1,6 +1,6 @@ package org.reactivestreams.example.multicast; -import org.reactivestreams.Source; +import org.reactivestreams.Publisher; public class MulticastExample { @@ -11,13 +11,13 @@ public class MulticastExample { * @throws InterruptedException */ public static void main(String... args) throws InterruptedException { - Source dataStream = new StockPricePublisher(); + Publisher dataStream = new StockPricePublisher(); - dataStream.listen(new StockPriceSubscriber(5, 500)); // 500ms on each event, infinite - dataStream.listen(new StockPriceSubscriber(10, 2000)); // 2000ms on each event, infinite + dataStream.subscribe(new StockPriceSubscriber(5, 500)); // 500ms on each event, infinite + dataStream.subscribe(new StockPriceSubscriber(10, 2000)); // 2000ms on each event, infinite Thread.sleep(5000); - dataStream.listen(new StockPriceSubscriber(10, 111, 20)); // 111ms on each event, take 20 + dataStream.subscribe(new StockPriceSubscriber(10, 111, 20)); // 111ms on each event, take 20 Thread.sleep(5000); - dataStream.listen(new StockPriceSubscriber(10, 222, 20));// 222ms on each event, take 20 + dataStream.subscribe(new StockPriceSubscriber(10, 222, 20));// 222ms on each event, take 20 } } diff --git a/spi/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java b/spi/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java index 643c2995..78aa3e6f 100644 --- a/spi/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java +++ b/spi/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java @@ -2,9 +2,9 @@ import java.util.concurrent.atomic.AtomicInteger; -import org.reactivestreams.Handle; -import org.reactivestreams.Listener; -import org.reactivestreams.Source; +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Publisher; import org.reactivestreams.example.multicast.NeverEndingStockStream.Handler; /** @@ -14,11 +14,11 @@ *

* If the subscriber can not keep up, it will drop (different strategies could be implemented, configurable, etc). */ -public class StockPricePublisher implements Source { +public class StockPricePublisher implements Publisher { @Override - public void listen(final Listener s) { - s.onListen(new Handle() { + public void subscribe(final Subscriber s) { + s.onSubscribe(new Subscription() { AtomicInteger capacity = new AtomicInteger(); EventHandler handler = new EventHandler(s, capacity); @@ -46,10 +46,10 @@ public void startConsuming() { } private static final class EventHandler implements Handler { - private final Listener s; + private final Subscriber s; private final AtomicInteger capacity; - private EventHandler(Listener s, AtomicInteger capacity) { + private EventHandler(Subscriber s, AtomicInteger capacity) { this.s = s; this.capacity = capacity; } diff --git a/spi/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java b/spi/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java index 1262068f..e0a37813 100644 --- a/spi/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java +++ b/spi/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java @@ -2,10 +2,10 @@ import java.util.concurrent.ArrayBlockingQueue; -import org.reactivestreams.Handle; -import org.reactivestreams.Listener; +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; -public class StockPriceSubscriber implements Listener { +public class StockPriceSubscriber implements Subscriber { private final ArrayBlockingQueue buffer; private final int delayPerStock; @@ -23,7 +23,7 @@ public StockPriceSubscriber(int bufferSize, int delayPerStock) { } @Override - public void onListen(Handle s) { + public void onSubscribe(Subscription s) { System.out.println("StockPriceSubscriber.onSubscribe => request " + buffer.remainingCapacity()); s.request(buffer.remainingCapacity()); startAsyncWork(s); @@ -45,7 +45,7 @@ public void onComplete() { terminated = true; } - private void startAsyncWork(final Handle s) { + private void startAsyncWork(final Subscription s) { System.out.println("StockPriceSubscriber => Start new worker thread"); /* don't write real code like this! just for quick demo */ new Thread(new Runnable() { diff --git a/spi/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java b/spi/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java index dabbef77..450e0089 100644 --- a/spi/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java +++ b/spi/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java @@ -2,18 +2,18 @@ import java.util.concurrent.atomic.AtomicInteger; -import org.reactivestreams.Handle; -import org.reactivestreams.Listener; -import org.reactivestreams.Source; +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Publisher; -class InfiniteIncrementNumberPublisher implements Source { +class InfiniteIncrementNumberPublisher implements Publisher { @Override - public void listen(final Listener s) { + public void subscribe(final Subscriber s) { final AtomicInteger i = new AtomicInteger(); - Handle subscription = new Handle() { + Subscription subscription = new Subscription() { AtomicInteger capacity = new AtomicInteger(); @@ -46,7 +46,7 @@ public void cancel() { }; - s.onListen(subscription); + s.onSubscribe(subscription); } } \ No newline at end of file diff --git a/spi/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java b/spi/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java index f0cc5137..f0d594fa 100644 --- a/spi/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java +++ b/spi/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java @@ -2,10 +2,10 @@ import java.util.concurrent.ArrayBlockingQueue; -import org.reactivestreams.Handle; -import org.reactivestreams.Listener; +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; -class NumberSubscriberThatHopsThreads implements Listener { +class NumberSubscriberThatHopsThreads implements Subscriber { final int BUFFER_SIZE = 10; private final ArrayBlockingQueue buffer = new ArrayBlockingQueue<>(BUFFER_SIZE); @@ -17,7 +17,7 @@ class NumberSubscriberThatHopsThreads implements Listener { } @Override - public void onListen(Handle s) { + public void onSubscribe(Subscription s) { System.out.println("onSubscribe => request " + BUFFER_SIZE); s.request(BUFFER_SIZE); startAsyncWork(s); @@ -39,7 +39,7 @@ public void onComplete() { terminated = true; } - private void startAsyncWork(final Handle s) { + private void startAsyncWork(final Subscription s) { System.out.println("**** Start new worker thread"); /* don't write real code like this! just for quick demo */ new Thread(new Runnable() { diff --git a/spi/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java b/spi/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java index f867de88..59c7e197 100644 --- a/spi/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java +++ b/spi/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java @@ -1,6 +1,6 @@ package org.reactivestreams.example.unicast; -import org.reactivestreams.Source; +import org.reactivestreams.Publisher; public class UnicastExample { @@ -11,11 +11,11 @@ public class UnicastExample { * @throws InterruptedException */ public static void main(String... args) throws InterruptedException { - Source dataStream = new InfiniteIncrementNumberPublisher(); + Publisher dataStream = new InfiniteIncrementNumberPublisher(); - dataStream.listen(new NumberSubscriberThatHopsThreads("A")); + dataStream.subscribe(new NumberSubscriberThatHopsThreads("A")); Thread.sleep(2000); - dataStream.listen(new NumberSubscriberThatHopsThreads("B")); + dataStream.subscribe(new NumberSubscriberThatHopsThreads("B")); } } diff --git a/spi/src/main/java/org/reactivestreams/Handle.java b/spi/src/main/java/org/reactivestreams/Handle.java deleted file mode 100644 index 7c812964..00000000 --- a/spi/src/main/java/org/reactivestreams/Handle.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.reactivestreams; - -/** - * A {@link Handle} represents a one-to-one lifecycle of a {@link Listener} subscribing to a {@link Source}. - *

- * It can only be used once by a single {@link Listener}. - *

- * It is used to both signal desire for data and cancel demand (and allow resource cleanup). - * - */ -public interface Handle { - /** - * No events will be sent by a {@link Source} until demand is signaled via this method. - *

- * It can be called however often and whenever needed. - *

- * Whatever has been signalled can be sent by the {@link Source} so only signal demand for what can be safely handled. - * - * @param n - */ - public void request(int n); - - /** - * Request the {@link Source} to stop sending data and clean up resources. - *

- * Data may still be sent to meet previously signalled demand after calling cancel as this request is asynchronous. - */ - public void cancel(); -} diff --git a/spi/src/main/java/org/reactivestreams/Listener.java b/spi/src/main/java/org/reactivestreams/Listener.java deleted file mode 100644 index 8f4b136e..00000000 --- a/spi/src/main/java/org/reactivestreams/Listener.java +++ /dev/null @@ -1,55 +0,0 @@ -package org.reactivestreams; - -/** - * Will receive call to {@link #onListen(Handle)} once after passing an instance of {@link Listener} to {@link Source#listen(Listener)}. - *

- * No further notifications will be received until {@link Handle#request(int)} is called. - *

- * After signaling demand: - *

    - *
  • One or more invocations of {@link #onNext(Object)} up to the maximum number defined by {@link Handle#request(int)}
  • - *
  • Single invocation of {@link #onError(Throwable)} or {@link #onCompleted()} which signals a terminal state after which no further events will be sent. - *
- *

- * Demand can be signalled via {@link Handle#request(int)} whenever the {@link Listener} instance is capable of handling more. - * - * @param - */ -public interface Listener { - /** - * Invoked after calling {@link Source#listen(Listener)}. - *

- * No data will start flowing until {@link Handle#request(int)} is invoked. - *

- * It is the resonsibility of this {@link Listener} instance to call {@link Handle#request(int)} whenever more data is wanted. - *

- * The {@link Source} will send notifications only in response to {@link Handle#request(int)}. - * - * @param s - * {@link Handle} that allows requesting data via {@link Handle#request(int)} - */ - public void onListen(Handle s); - - /** - * Data notification sent by the {@link Source} in response to requests to {@link Handle#request(int)}. - * - * @param t - */ - public void onNext(T t); - - /** - * Failed terminal state. - *

- * No further events will be sent even if {@link Handle#request(int)} is invoked again. - * - * @param t - */ - public void onError(Throwable t); - - /** - * Successful terminal state. - *

- * No further events will be sent even if {@link Handle#request(int)} is invoked again. - */ - public void onComplete(); -} diff --git a/spi/src/main/java/org/reactivestreams/Publisher.java b/spi/src/main/java/org/reactivestreams/Publisher.java new file mode 100644 index 00000000..3e39e75f --- /dev/null +++ b/spi/src/main/java/org/reactivestreams/Publisher.java @@ -0,0 +1,17 @@ +package org.reactivestreams; + +public interface Publisher { + + /** + * Request {@link Publisher} to start streaming data. + *

+ * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscriber}. + *

+ * Each {@link Subscription} will work for only a single {@link Subscriber}. + *

+ * A {@link Subscriber} should only subscribe once to a single {@link Publisher}. + * + * @param s + */ + public void subscribe(Subscriber s); +} diff --git a/spi/src/main/java/org/reactivestreams/Source.java b/spi/src/main/java/org/reactivestreams/Source.java deleted file mode 100644 index f4e0b19d..00000000 --- a/spi/src/main/java/org/reactivestreams/Source.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.reactivestreams; - -public interface Source { - - /** - * Request {@link Source} to start streaming data. - *

- * This is a "factory method" and can be called multiple times, each time starting a new {@link Listener}. - *

- * Each {@link Handle} will work for only a single {@link Listener}. - *

- * A {@link Listener} should only subscribe once to a single {@link Source}. - * - * @param s - */ - public void listen(Listener s); -} diff --git a/spi/src/main/java/org/reactivestreams/Subscriber.java b/spi/src/main/java/org/reactivestreams/Subscriber.java new file mode 100644 index 00000000..dd02ba98 --- /dev/null +++ b/spi/src/main/java/org/reactivestreams/Subscriber.java @@ -0,0 +1,55 @@ +package org.reactivestreams; + +/** + * Will receive call to {@link #onSubscribe(Subscription)} once after passing an instance of {@link Subscriber} to {@link Publisher#subscribe(Subscriber)}. + *

+ * No further notifications will be received until {@link Subscription#request(int)} is called. + *

+ * After signaling demand: + *

    + *
  • One or more invocations of {@link #onNext(Object)} up to the maximum number defined by {@link Subscription#request(int)}
  • + *
  • Single invocation of {@link #onError(Throwable)} or {@link #onCompleted()} which signals a terminal state after which no further events will be sent. + *
+ *

+ * Demand can be signalled via {@link Subscription#request(int)} whenever the {@link Subscriber} instance is capable of handling more. + * + * @param + */ +public interface Subscriber { + /** + * Invoked after calling {@link Publisher#subscribe(Subscriber)}. + *

+ * No data will start flowing until {@link Subscription#request(int)} is invoked. + *

+ * It is the resonsibility of this {@link Subscriber} instance to call {@link Subscription#request(int)} whenever more data is wanted. + *

+ * The {@link Publisher} will send notifications only in response to {@link Subscription#request(int)}. + * + * @param s + * {@link Subscription} that allows requesting data via {@link Subscription#request(int)} + */ + public void onSubscribe(Subscription s); + + /** + * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(int)}. + * + * @param t + */ + public void onNext(T t); + + /** + * Failed terminal state. + *

+ * No further events will be sent even if {@link Subscription#request(int)} is invoked again. + * + * @param t + */ + public void onError(Throwable t); + + /** + * Successful terminal state. + *

+ * No further events will be sent even if {@link Subscription#request(int)} is invoked again. + */ + public void onComplete(); +} diff --git a/spi/src/main/java/org/reactivestreams/Subscription.java b/spi/src/main/java/org/reactivestreams/Subscription.java new file mode 100644 index 00000000..cf31e838 --- /dev/null +++ b/spi/src/main/java/org/reactivestreams/Subscription.java @@ -0,0 +1,29 @@ +package org.reactivestreams; + +/** + * A {@link Subscription} represents a one-to-one lifecycle of a {@link Subscriber} subscribing to a {@link Publisher}. + *

+ * It can only be used once by a single {@link Subscriber}. + *

+ * It is used to both signal desire for data and cancel demand (and allow resource cleanup). + * + */ +public interface Subscription { + /** + * No events will be sent by a {@link Publisher} until demand is signaled via this method. + *

+ * It can be called however often and whenever needed. + *

+ * Whatever has been signalled can be sent by the {@link Publisher} so only signal demand for what can be safely handled. + * + * @param n + */ + public void request(int n); + + /** + * Request the {@link Publisher} to stop sending data and clean up resources. + *

+ * Data may still be sent to meet previously signalled demand after calling cancel as this request is asynchronous. + */ + public void cancel(); +} diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index f6e44213..ca0e3200 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -3,9 +3,9 @@ import java.util.HashSet; import java.util.Set; -import org.reactivestreams.Handle; -import org.reactivestreams.Listener; -import org.reactivestreams.Source; +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Publisher; import org.reactivestreams.tck.TestEnvironment.ManualPublisher; import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; @@ -44,29 +44,29 @@ public IdentityProcessorVerification(final TestEnvironment env, long publisherSh this.subscriberVerification = new SubscriberVerification(env) { @Override - Listener createSubscriber(SubscriberProbe probe) { + Subscriber createSubscriber(SubscriberProbe probe) { return IdentityProcessorVerification.this.createSubscriber(probe); } @Override - Source createHelperPublisher(int elements) { + Publisher createHelperPublisher(int elements) { return IdentityProcessorVerification.this.createHelperPublisher(elements); } }; publisherVerification = new PublisherVerification(env, publisherShutdownTimeoutMillis) { @Override - public Source createPublisher(int elements) { + public Publisher createPublisher(int elements) { return IdentityProcessorVerification.this.createPublisher(elements); } @Override - public Source createCompletedStatePublisher() { + public Publisher createCompletedStatePublisher() { return IdentityProcessorVerification.this.createCompletedStatePublisher(); } @Override - public Source createErrorStatePublisher() { + public Publisher createErrorStatePublisher() { return IdentityProcessorVerification.this.createErrorStatePublisher(); } @@ -87,28 +87,28 @@ public Source createErrorStatePublisher() { * The stream must not produce the same element twice (in case of an infinite stream this requirement * is relaxed to only apply to the elements that are actually requested during all tests). */ - public abstract Source createHelperPublisher(int elements); + public abstract Publisher createHelperPublisher(int elements); /** * Return a Publisher in {@code completed} state in order to run additional tests on it, * or {@code null} in order to skip them. */ - public abstract Source createCompletedStatePublisher(); + public abstract Publisher createCompletedStatePublisher(); /** * Return a Publisher in {@code error} state in order to run additional tests on it, * or {@code null} in order to skip them. */ - public abstract Source createErrorStatePublisher(); + public abstract Publisher createErrorStatePublisher(); ////////////////////// PUBLISHER RULES VERIFICATION /////////////////////////// // A Processor // must obey all Publisher rules on its producing side - public Source createPublisher(int elements) { + public Publisher createPublisher(int elements) { TestProcessor processor = createIdentityProcessor(testBufferSize); - Source pub = createHelperPublisher(elements); - pub.listen(processor.getSubscriber()); + Publisher pub = createHelperPublisher(elements); + pub.subscribe(processor.getSubscriber()); return processor.getPublisher(); // we run the PublisherVerification against this } @@ -190,11 +190,11 @@ public void mustNotCallOnCompleteOrOnErrorMoreThanOncePerSubscriber() { // A Processor // must obey all Subscriber rules on its consuming side - public Listener createSubscriber(final SubscriberVerification.SubscriberProbe probe) { + public Subscriber createSubscriber(final SubscriberVerification.SubscriberProbe probe) { TestProcessor processor = createIdentityProcessor(testBufferSize); - processor.getPublisher().listen( - new Listener() { - public void onListen(final Handle subscription) { + processor.getPublisher().subscribe( + new Subscriber() { + public void onSubscribe(final Subscription subscription) { probe.registerOnSubscribe( new SubscriberVerification.SubscriberPuppet() { public void triggerShutdown() { @@ -509,7 +509,7 @@ public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedExce this.testBufferSize = testBufferSize; tees = env.newManualSubscriber(createHelperPublisher(0)); processor = createIdentityProcessor(testBufferSize); - listen(processor.getSubscriber()); + subscribe(processor.getSubscriber()); } public TestEnvironment.ManualSubscriber newSubscriber() throws InterruptedException { diff --git a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java index ebded995..a9990e1b 100644 --- a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java @@ -11,8 +11,8 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import org.reactivestreams.Handle; -import org.reactivestreams.Source; +import org.reactivestreams.Subscription; +import org.reactivestreams.Publisher; import org.reactivestreams.tck.TestEnvironment.Latch; import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; @@ -37,19 +37,19 @@ public PublisherVerification(TestEnvironment env, long publisherShutdownTimeoutM * It must create a Publisher for a stream with exactly the given number of elements. * If `elements` is zero the produced stream must be infinite. */ - public abstract Source createPublisher(int elements); + public abstract Publisher createPublisher(int elements); /** * Return a Publisher in {@code completed} state in order to run additional tests on it, * or {@code null} in order to skip them. */ - public abstract Source createCompletedStatePublisher(); + public abstract Publisher createCompletedStatePublisher(); /** * Return a Publisher in {@code error} state in order to run additional tests on it, * or {@code null} in order to skip them. */ - public abstract Source createErrorStatePublisher(); + public abstract Publisher createErrorStatePublisher(); ////////////////////// TEST SETUP VERIFICATION /////////////////////////// @@ -57,7 +57,7 @@ public PublisherVerification(TestEnvironment env, long publisherShutdownTimeoutM public void createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable { activePublisherTest(3, new PublisherTestRun() { @Override - public void run(Source pub) throws InterruptedException { + public void run(Publisher pub) throws InterruptedException { TestEnvironment.ManualSubscriber sub = env.newManualSubscriber(pub); assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub)); assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 1 element", pub)); @@ -65,7 +65,7 @@ public void run(Source pub) throws InterruptedException { sub.requestEndOfStream(); } - Optional requestNextElementOrEndOfStream(Source pub, TestEnvironment.ManualSubscriber sub) throws InterruptedException { + Optional requestNextElementOrEndOfStream(Publisher pub, TestEnvironment.ManualSubscriber sub) throws InterruptedException { return sub.requestNextElementOrEndOfStream("Timeout while waiting for next element from Publisher" + pub); } @@ -83,16 +83,16 @@ Optional requestNextElementOrEndOfStream(Source pub, TestEnvironment.Manua public void publisherSubscribeWhenCompletedMustTriggerOnCompleteAndNotOnSubscribe() throws Throwable { completedPublisherTest(new PublisherTestRun() { @Override - public void run(final Source pub) throws InterruptedException { + public void run(final Publisher pub) throws InterruptedException { final Latch latch = new Latch(env); - pub.listen( + pub.subscribe( new TestEnvironment.TestSubscriber(env) { public void onComplete() { latch.assertOpen(String.format("Publisher %s called `onComplete` twice on new Subscriber", pub)); latch.close(); } - public void onListen(Handle subscription) { + public void onSubscribe(Subscription subscription) { env.flop(String.format("Publisher created by `createCompletedStatePublisher()` (%s) called `onSubscribe` on new Subscriber", pub)); } }); @@ -111,9 +111,9 @@ public void onListen(Handle subscription) { public void publisherSubscribeWhenInErrorStateMustTriggerOnErrorAndNotOnSubscribe() throws Throwable { errorPublisherTest(new PublisherTestRun() { @Override - public void run(final Source pub) throws InterruptedException { + public void run(final Publisher pub) throws InterruptedException { final Latch latch = new Latch(env); - pub.listen( + pub.subscribe( new TestEnvironment.TestSubscriber(env) { public void onError(Throwable cause) { latch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub)); @@ -139,7 +139,7 @@ public void onError(Throwable cause) { public void publisherSubscribeWhenInShutDownStateMustTriggerOnErrorAndNotOnSubscribe() throws Throwable { activePublisherTest(3, new PublisherTestRun() { @Override - public void run(final Source pub) throws InterruptedException { + public void run(final Publisher pub) throws InterruptedException { TestEnvironment.ManualSubscriber sub = env.newManualSubscriber(pub); sub.cancel(); @@ -149,7 +149,7 @@ public void run(final Source pub) throws InterruptedException { Thread.sleep(publisherShutdownTimeoutMillis); final Latch latch = new Latch(env); - pub.listen( + pub.subscribe( new TestEnvironment.TestSubscriber(env) { public void onError(Throwable cause) { latch.assertOpen(String.format("shut-down-state Publisher %s called `onError` twice on new Subscriber", pub)); @@ -169,12 +169,12 @@ public void onError(Throwable cause) { public void publisherSubscribeWhenActiveMustCallOnSubscribeFirst() throws Throwable { activePublisherTest(1, new PublisherTestRun() { @Override - public void run(Source pub) throws InterruptedException { + public void run(Publisher pub) throws InterruptedException { final Latch latch = new Latch(env); - final Handle[] sub = {null}; - pub.listen( + final Subscription[] sub = {null}; + pub.subscribe( new TestSubscriber(env) { - public void onListen(Handle subscription) { + public void onSubscribe(Subscription subscription) { latch.close(); sub[0] = subscription; } @@ -194,19 +194,19 @@ public void onListen(Handle subscription) { public void publisherSubscribeWhenActiveMustRejectDoubleSubscription() throws Throwable { activePublisherTest(1, new PublisherTestRun() { @Override - public void run(Source pub) throws InterruptedException { + public void run(Publisher pub) throws InterruptedException { final Latch latch = new Latch(env); final Promise errorCause = new Promise(env); TestSubscriber sub = new TestSubscriber(env) { - public void onListen(Handle subscription) { latch.close(); } + public void onSubscribe(Subscription subscription) { latch.close(); } public void onError(Throwable cause) { errorCause.complete(cause); } }; - pub.listen(sub); + pub.subscribe(sub); latch.expectClose(env.defaultTimeoutMillis(), "Active Publisher "+ pub+" did not call `onSubscribe` on first subscription request"); errorCause.assertUncompleted("Active Publisher "+ pub+" unexpectedly called `onError` on first subscription request"); latch.reOpen(); - pub.listen(sub); + pub.subscribe(sub); errorCause.expectCompletion(env.defaultTimeoutMillis(), "Active Publisher "+ pub+" did not call `onError` on double subscription request"); if(!IllegalStateException.class.isInstance(errorCause.value())) env.flop("Publisher " + pub + " called `onError` with " + errorCause.value() + " rather than an `IllegalStateException` on double subscription request"); @@ -223,7 +223,7 @@ public void run(Source pub) throws InterruptedException { public void subscriptionRequestMoreWhenCancelledMustIgnoreTheCall() throws Throwable { activePublisherTest(1, new PublisherTestRun() { @Override - public void run(Source pub) throws InterruptedException { + public void run(Publisher pub) throws InterruptedException { ManualSubscriber sub = env.newManualSubscriber(pub); sub.subscription.value().cancel(); sub.subscription.value().request(1); // must not throw @@ -241,7 +241,7 @@ public void run(Source pub) throws InterruptedException { public void subscriptionRequestMoreMustResultInTheCorrectNumberOfProducedElements() throws Throwable { activePublisherTest(5, new PublisherTestRun() { @Override - public void run(Source pub) throws InterruptedException { + public void run(Publisher pub) throws InterruptedException { ManualSubscriber sub = env.newManualSubscriber(pub); @@ -266,7 +266,7 @@ public void run(Source pub) throws InterruptedException { public void subscriptionRequestMoreMustThrowIfArgumentIsNonPositive() throws Throwable { activePublisherTest(1, new PublisherTestRun() { @Override - public void run(Source pub) throws Throwable { + public void run(Publisher pub) throws Throwable { final ManualSubscriber sub = env.newManualSubscriber(pub); env.expectThrowingOf( @@ -300,7 +300,7 @@ public void run() { public void subscriptionCancelWhenCancelledMustIgnoreCall() throws Throwable { activePublisherTest(1, new PublisherTestRun() { @Override - public void run(Source pub) throws InterruptedException { + public void run(Publisher pub) throws InterruptedException { ManualSubscriber sub = env.newManualSubscriber(pub); sub.subscription.value().cancel(); // first time must succeed sub.subscription.value().cancel(); // the second time must not throw @@ -315,7 +315,7 @@ public void run(Source pub) throws InterruptedException { public void onSubscriptionCancelThePublisherMustEventuallyCeaseToCallAnyMethodsOnTheSubscriber() throws Throwable { activePublisherTest(0, new PublisherTestRun() { @Override - public void run(Source pub) throws InterruptedException { + public void run(Publisher pub) throws InterruptedException { // infinite stream final AtomicBoolean drop = new AtomicBoolean(true); @@ -348,9 +348,9 @@ private interface Function { public void onSubscriptionCancelThePublisherMustEventuallyDropAllReferencesToTheSubscriber() throws Throwable { final ReferenceQueue> queue = new ReferenceQueue>(); - final Function, WeakReference>> run = new Function, WeakReference>>() { + final Function, WeakReference>> run = new Function, WeakReference>>() { @Override - public WeakReference> apply(Source pub) throws Exception { + public WeakReference> apply(Publisher pub) throws Exception { ManualSubscriber sub = env.newManualSubscriber(pub); WeakReference> ref = new WeakReference>(sub, queue); sub.requestMore(1); @@ -362,7 +362,7 @@ public WeakReference> apply(Source pub) throws Exception activePublisherTest(3, new PublisherTestRun() { @Override - public void run(Source pub) throws Exception { + public void run(Publisher pub) throws Exception { WeakReference> ref = run.apply(pub); // cancel may be run asynchronously so we add a sleep before running the GC @@ -392,7 +392,7 @@ public void mustProduceTheSameElementsInTheSameSequenceForAllItsSubscribers() th activePublisherTest(5, new PublisherTestRun() { @Override - public void run(Source pub) throws InterruptedException { + public void run(Publisher pub) throws InterruptedException { ManualSubscriber sub1 = env.newManualSubscriber(pub); ManualSubscriber sub2 = env.newManualSubscriber(pub); ManualSubscriber sub3 = env.newManualSubscriber(pub); @@ -453,7 +453,7 @@ public void mustSupportAPendingElementCountUpToLongMaxValue() { public void mustCallOnCompleteOnASubscriberAfterHavingProducedTheFinalStreamElementToIt() throws Throwable { activePublisherTest(3, new PublisherTestRun() { @Override - public void run(Source pub) throws InterruptedException { + public void run(Publisher pub) throws InterruptedException { ManualSubscriber sub = env.newManualSubscriber(pub); sub.requestNextElement(); sub.requestNextElement(); @@ -492,11 +492,11 @@ public void mustNotCallOnCompleteOrOnErrorMoreThanOncePerSubscriber() { /////////////////////// TEST INFRASTRUCTURE ////////////////////// interface PublisherTestRun { - public void run(Source pub) throws Throwable; + public void run(Publisher pub) throws Throwable; } public void activePublisherTest(int elements, PublisherTestRun body) throws Throwable { - Source pub = createPublisher(elements); + Publisher pub = createPublisher(elements); body.run(pub); env.verifyNoAsyncErrors(); } @@ -509,7 +509,7 @@ public void errorPublisherTest(PublisherTestRun body) throws Throwable { potentiallyPendingTest(createErrorStatePublisher(), body); } - public void potentiallyPendingTest(Source pub, PublisherTestRun body) throws Throwable { + public void potentiallyPendingTest(Publisher pub, PublisherTestRun body) throws Throwable { if (pub != null) { body.run(pub); env.verifyNoAsyncErrors(); diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberVerification.java index 8215677c..f85df31e 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberVerification.java @@ -1,8 +1,8 @@ package org.reactivestreams.tck; -import org.reactivestreams.Handle; -import org.reactivestreams.Listener; -import org.reactivestreams.Source; +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Publisher; import org.reactivestreams.tck.TestEnvironment.Latch; import org.reactivestreams.tck.TestEnvironment.ManualPublisher; import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; @@ -25,14 +25,14 @@ protected SubscriberVerification(TestEnvironment env) { * In order to be meaningfully testable your Subscriber must inform the given * `SubscriberProbe` of the respective events having been received. */ - abstract Listener createSubscriber(SubscriberProbe probe); + abstract Subscriber createSubscriber(SubscriberProbe probe); /** * Helper method required for generating test elements. * It must create a Publisher for a stream with exactly the given number of elements. * If `elements` is zero the produced stream must be infinite. */ - abstract Source createHelperPublisher(int elements); + abstract Publisher createHelperPublisher(int elements); ////////////////////// TEST SETUP VERIFICATION /////////////////////////// @@ -85,8 +85,8 @@ void onCompleteAndOnErrorMustAsynchronouslyScheduleAnEvent() { void mustNotAcceptAnOnSubscribeEventIfItAlreadyHasAnActiveSubscription() throws InterruptedException { new TestSetup(env) {{ // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail - sub().onListen( - new Handle() { + sub().onSubscribe( + new Subscription() { public void request(int elements) { env.flop(String.format("Subscriber %s illegally called `subscription.requestMore(%s)`", sub(), elements)); } @@ -204,11 +204,11 @@ public TestSetup(TestEnvironment env) throws InterruptedException { super(env); tees = env.newManualSubscriber(createHelperPublisher(0)); probe = new Probe(); - listen(createSubscriber(probe)); + subscribe(createSubscriber(probe)); probe.puppet.expectCompletion(env.defaultTimeoutMillis(), String.format("Subscriber %s did not `registerOnSubscribe`", sub())); } - Listener sub() { + Subscriber sub() { return subscriber.get(); } diff --git a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java index bd7f5270..be33c78a 100644 --- a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java +++ b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java @@ -9,9 +9,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.reactivestreams.Handle; -import org.reactivestreams.Listener; -import org.reactivestreams.Source; +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Publisher; import org.reactivestreams.tck.support.Optional; public class TestEnvironment { @@ -60,21 +60,21 @@ public void expectThrowingOf(Class clazz, String errorM } } - public void subscribe(Source pub, TestSubscriber sub) throws InterruptedException { + public void subscribe(Publisher pub, TestSubscriber sub) throws InterruptedException { subscribe(pub, sub, defaultTimeoutMillis); } - public void subscribe(Source pub, TestSubscriber sub, long timeoutMillis) throws InterruptedException { - pub.listen(sub); + public void subscribe(Publisher pub, TestSubscriber sub, long timeoutMillis) throws InterruptedException { + pub.subscribe(sub); sub.subscription.expectCompletion(timeoutMillis, String.format("Could not subscribe %s to Publisher %s", sub, pub)); verifyNoAsyncErrors(); } - public ManualSubscriber newManualSubscriber(Source pub) throws InterruptedException { + public ManualSubscriber newManualSubscriber(Publisher pub) throws InterruptedException { return newManualSubscriber(pub, defaultTimeoutMillis()); } - public ManualSubscriber newManualSubscriber(Source pub, long timeoutMillis) throws InterruptedException { + public ManualSubscriber newManualSubscriber(Publisher pub, long timeoutMillis) throws InterruptedException { ManualSubscriberWithSubscriptionSupport sub = new ManualSubscriberWithSubscriptionSupport(this); subscribe(pub, sub, timeoutMillis); return sub; @@ -111,7 +111,7 @@ public void onComplete() { } } - public void onListen(Handle s) { + public void onSubscribe(Subscription s) { if (!subscription.isCompleted()) { subscription.complete(s); } else { @@ -128,14 +128,14 @@ public void onError(Throwable cause) { } } - static class TestSubscriber implements Listener { - volatile Promise subscription; + static class TestSubscriber implements Subscriber { + volatile Promise subscription; protected final TestEnvironment env; public TestSubscriber(TestEnvironment env) { this.env = env; - subscription = new Promise(env); + subscription = new Promise(env); } @Override @@ -153,14 +153,14 @@ public void onNext(T element) { env.flop(String.format("Unexpected Subscriber::onNext(%s)", element)); } - public void onListen(Handle subscription) { + public void onSubscribe(Subscription subscription) { env.flop(String.format("Unexpected Subscriber::onSubscribe(%s)", subscription)); } public void cancel() { if (subscription.isCompleted()) { subscription.value().cancel(); - subscription = new Promise(env); + subscription = new Promise(env); } else env.flop("Cannot cancel a subscription before having received it"); } } @@ -315,10 +315,10 @@ public void expectNone(long withinMillis) throws InterruptedException { } - static class ManualPublisher implements Source { + static class ManualPublisher implements Publisher { protected final TestEnvironment env; - Optional> subscriber = Optional.empty(); + Optional> subscriber = Optional.empty(); Receptacle requests; Latch cancelled; @@ -329,11 +329,11 @@ public ManualPublisher(TestEnvironment env) { } @Override - public void listen(Listener s) { + public void subscribe(Subscriber s) { if (subscriber.isEmpty()) { subscriber = Optional.of(s); - Handle subs = new Handle() { + Subscription subs = new Subscription() { @Override public void request(int elements) { requests.add(elements); @@ -344,7 +344,7 @@ public void cancel() { cancelled.close(); } }; - s.onListen(subs); + s.onSubscribe(subs); } else { env.flop("TestPublisher doesn't support more than one Subscriber"); diff --git a/tck/src/main/java/org/reactivestreams/tck/TestProcessor.java b/tck/src/main/java/org/reactivestreams/tck/TestProcessor.java index 3d687141..3c6f335c 100644 --- a/tck/src/main/java/org/reactivestreams/tck/TestProcessor.java +++ b/tck/src/main/java/org/reactivestreams/tck/TestProcessor.java @@ -1,15 +1,15 @@ package org.reactivestreams.tck; -import org.reactivestreams.Listener; -import org.reactivestreams.Source; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Publisher; /** - * The TCK uses this to pull together the 2 sides of {@link Source} and {@link Listener}. + * The TCK uses this to pull together the 2 sides of {@link Publisher} and {@link Subscriber}. */ -public interface TestProcessor extends Source, Listener { +public interface TestProcessor extends Publisher, Subscriber { - Listener getSubscriber(); + Subscriber getSubscriber(); - Source getPublisher(); + Publisher getPublisher(); } From 34742e960ecc3188364483cfcad463417acf4901 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 21 Apr 2014 17:36:02 -0700 Subject: [PATCH 04/19] Contract Clarification -------------------------- - `Subscriber` can be used once-and-only-once to subscribe to a `Publisher`. - a `Subscription` can be used once-and-only-once to represent a subscription by a `Subscriber` to a `Publisher`. - The `Publisher.subscribe` method can be called as many times as wanted as long as it is with a different `Subscriber` each time. It is up to the `Publisher` whether underlying streams are shared or not. - A `Publisher` can refuse subscriptions (calls to `subscribe`) if it is unable or unwilling to serve them (overwhelmed, fronting a single-use data sources, etc) and can do so by immediately calling `Subscriber.onError` on the `Subscriber` instance calling `subscribe`. - Events sent to a `Subscriber` can only be sent sequentially (no concurrent notifications). - Once an `onComplete` or `onError` is sent, no further events can be sent. - Once a `Subscription` is cancelled, the `Publisher` will stop sending events as soon as it can. - A `Publisher` will never send more `onNext` events than have been requested via the `Subscription.request/signalDemand` method. It can send less events than requested and end the subscription by emitting `onComplete` or `onError`. --- spi/src/main/java/org/reactivestreams/Subscriber.java | 2 +- spi/src/main/java/org/reactivestreams/Subscription.java | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/spi/src/main/java/org/reactivestreams/Subscriber.java b/spi/src/main/java/org/reactivestreams/Subscriber.java index dd02ba98..4d28b44a 100644 --- a/spi/src/main/java/org/reactivestreams/Subscriber.java +++ b/spi/src/main/java/org/reactivestreams/Subscriber.java @@ -21,7 +21,7 @@ public interface Subscriber { *

* No data will start flowing until {@link Subscription#request(int)} is invoked. *

- * It is the resonsibility of this {@link Subscriber} instance to call {@link Subscription#request(int)} whenever more data is wanted. + * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(int)} whenever more data is wanted. *

* The {@link Publisher} will send notifications only in response to {@link Subscription#request(int)}. * diff --git a/spi/src/main/java/org/reactivestreams/Subscription.java b/spi/src/main/java/org/reactivestreams/Subscription.java index cf31e838..9c665bdd 100644 --- a/spi/src/main/java/org/reactivestreams/Subscription.java +++ b/spi/src/main/java/org/reactivestreams/Subscription.java @@ -14,7 +14,10 @@ public interface Subscription { *

* It can be called however often and whenever needed. *

- * Whatever has been signalled can be sent by the {@link Publisher} so only signal demand for what can be safely handled. + * Whatever has been requested can be sent by the {@link Publisher} so only signal demand for what can be safely handled. + *

+ * A {@link Publisher} can send less than is requested if the stream ends but + * then must emit either {@link Subscriber#onError(Throwable)} or {@link Subscriber#onComplete()}. * * @param n */ From 9c037359b275855b218c0e2f1334fee29fee9ab0 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 21 Apr 2014 19:27:14 -0700 Subject: [PATCH 05/19] Update README with new Types and Contract --- README.md | 51 ++++++++++++++++++++++++++++----------------------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index ff044502..7d93e8db 100644 --- a/README.md +++ b/README.md @@ -34,13 +34,11 @@ In summary, Reactive Streams is a standard and specification for Stream-oriented The Reactive Streams specification consists of the following parts: -**The SPI** defines the interoperablility layer between different implementations. - -**The API** specifies the types that the users of Reactive Stream libraries use. +**The API** specifies the types to implement Reactive Streams and achieve interoperablility between different implementations. ***The Technology Compatibility Kit (TCK)*** is a standard test suite for conformance testing of implementations. -Implementations are free to implement additional features not covered by the specification as long as they conform to the API and SPI requirements and pass the tests in the TCK. +Implementations are free to implement additional features not covered by the specification as long as they conform to the API requirements and pass the tests in the TCK. #### Comparison with related technologies #### @@ -50,47 +48,54 @@ Compared to Rx, the SPI described here prescribes a mandatory, non-blocking way Iteratees are an abstraction used for consuming a stream, often for parsing it. In this sense they are not a stream transformation or combination tool in themselves. -### SPI Components ### - -The SPI consists of components that are required to be provided by Reactive Stream implementations but these interfaces should not be exposed to libraries or user code that *use* a Reactive Streams implementation. The reason for this is that the methods used on the SPI level have very strict and rather complex semantic requirements which are likely to be violated by end users. +### API Components ### -The components of the SPI are: +The API consists of the following components that are required to be provided by Reactive Stream implementations: - Publisher - Subscriber - Subscription -A *Publisher* is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s). A Publisher can serve multiple subscribers subscribed dynamically at various points in time. In the case of multiple subscribers the Publisher should respect the processing rates of all of its subscribers (possibly allowing for a bounded drift between them). It must eventually clean up its resources after all of its subscribers have been unsubscribed and shut down. A Publisher will typically support fanning out to multiple Subscribers in order to support the dynamic assembly of processing networks from building blocks that can freely be shared. +A *Publisher* is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s). -A *Subscriber* is a component that accepts a sequenced stream of elements provided by a Publisher. At any given time a Subscriber might be subscribed to at most one Publisher. It provides the callback onNext to be called by the upstream Producer, accepting an element that is to be asynchronously processed or enqueued without blocking the Producer. +- A `Publisher` will never send more `onNext` events than have been requested via the `Subscription.request` method. +- A `Publisher` can send less events that requested and end the `Subscription` by emitting `onComplete` or `onError`. +- Events sent to a `Subscriber` can only be sent sequentially (no concurrent notifications). +- If a `Publisher` fails it must emit an `onError`. +- If a `Publisher` terminates successfully (finite stream) it must emit an `onComplete`. +- If a `Publisher` terminates via either `onError` or `onComplete` it must `cancel` its `Subscription` +- Once a terminal state has occurred (`onError`, `onNext`) no further events can be sent. +- Upon receiving a `Subscription.cancel` request it should stop sending events as soon as it can. The `onError` and `onComplete` events are not needed if `Subscription.cancel` was initiated by the `Subscriber`. +- The `Publisher.subscribe` method can be called as many times as wanted as long as it is with a different `Subscriber` each time. It is up to the `Publisher` whether underlying streams are shared or not. In other words, a `Publisher` can support multi-subscribe and then choose whether each `Subscription` is unicast or multicast. +- A `Publisher` can refuse subscriptions (calls to `subscribe`) if it is unable or unwilling to serve them (overwhelmed, fronting a single-use data source, etc) and can do so by immediately calling `Subscriber.onError` on the `Subscriber` instance calling `subscribe`. -A Subscriber communicates demand to the Publisher via a *Subscription* which is passed to the Subscriber after the subscription has been established. The Subscription exposes the requestMore(int) method that is used by the Subscriber to signal demand to the Publisher. For each of its subscribers the Publisher obeys the following invariant: -*If N is the total number of demand tokens handed to the Publisher P by a Subscriber S during the time period up to a time T, then the number of onNext calls that are allowed to be performed by P on S before T must be less than or equal to N. The number of pending demand tokens must be tracked by the Producer separately for each of its subscribers.* +A *`Subscriber`* is a component that accepts a sequenced stream of elements provided by a `Publisher`. At any given time a `Subscriber` might be subscribed to at most one `Publisher`. It provides the callback `onNext` to be called by the upstream `Publisher`, accepting an element that is to be processed or enqueued without blocking the `Publisher`. -Subscribers that do not currently have an active subscription may subscribe to a Publisher. The only guarantee for subscribers attached at different points in time is that they all observe a common suffix of the stream, i.e. they receive the same elements after a certain point in time but it is not guaranteed that they see exactly the same number of elements. This obviously only holds if the subscriber does not cancel its subscription before the stream has been terminated. +- `Subscriber` can be used once-and-only-once to subscribe to a `Publisher`. -> In practice there is a difference between the guarantees that different publishers can provide for subscribers attached at different points in time. For example Publishers serving elements from a strict collection (“cold”) might guarantee that all subscribers see *exactly* the same elements (unless unsubscribed before completion) since they can replay the elements from the collection at any point in time. Other publishers might represent an ephemeral source of elements (e.g. a “hot” TCP stream) and keep only a limited output buffer to replay for future subscribers. -At any time the Publisher may signal that it is not able to provide more elements. This is done by invoking onComplete on its subscribers. +A `Subscriber` communicates demand to the `Publisher` via a *`Subscription`* which is passed to the `Subscriber` after the subscription has been established. The `Subscription` exposes the `request(int)` method that is used by the `Subscriber` to signal demand to the `Publisher`. -> For example a Publisher representing a strict collection signals completion to its subscriber after it provided all the elements. Now a later subscriber might still receive the whole collection before receiving onComplete. +- a `Subscription` can be used once-and-only-once to represent a subscription by a `Subscriber` to a `Publisher`. -### API components ### +For each of its subscribers the `Publisher` obeys the following invariant: -The purpose of the API is to provide the types that users interact with directly. SPI methods and interfaces should not be exposed expect for the purpose of writing Reactive Streams implementations. +*If N is the total number of demand tokens handed to the `Publisher` P by a `Subscriber` S during the time period up to a time T, then the number of `onNext` calls that are allowed to be performed by P on S before T must be less than or equal to N. The number of pending demand tokens must be tracked by the `Producer` separately for each of its subscribers.* -The API counterpart for Publisher is *Producer* and for Subscriber is *Consumer*. The combination of these two—a stream processing element with asynchronous input and output—is called *Processor*. +`Subscriber`s that do not currently have an active subscription may subscribe to a `Publisher`. The only guarantee for subscribers attached at different points in time is that they all observe a common suffix of the stream, i.e. they receive the same elements after a certain point in time but it is not guaranteed that they see exactly the same number of elements. This obviously only holds if the subscriber does not cancel its subscription before the stream has been terminated. + +> In practice there is a difference between the guarantees that different publishers can provide for subscribers attached at different points in time. For example Publishers serving elements from a strict collection (“cold”) might guarantee that all subscribers see *exactly* the same elements (unless unsubscribed before completion) since they can replay the elements from the collection at any point in time. Other publishers might represent an ephemeral source of elements (e.g. a “hot” TCP stream) and keep only a limited output buffer to replay for future subscribers. -The only operation supported by any Producer–Consumer pair is their ability to establish a connection for the purpose of transferring the stream of elements from Producer to Consumer; this is achieved by the method `produceTo()`. Concrete implementations of Reactive Streams are expected to offer a rich set of combinators and transformations, but these are not the subject of this specification. The reason is that implementations shall have the freedom to formulate the end-user API in an idiomatic fashion for the respective platform, language and use-case they target. +At any time the `Publisher` may signal that it is not able to provide more elements. This is done by invoking `onComplete` on its subscribers. -In addition there is one method each on Producer and Consumer to obtain a reference to the underlying Publisher or Subscriber, respectively. These are necessary for implementations, but is not to be considered end-user API. +> For example a `Publisher` representing a strict collection signals completion to its subscriber after it provided all the elements. Now a later subscriber might still receive the whole collection before receiving onComplete. ### Asynchronous processing ### -The Reactive Streams SPI prescribes that all processing of elements (onNext) or termination signals (onError, onComplete) happens outside of the execution stack of the Publisher. This is achieved by scheduling the processing to run asynchronously, possibly on a different thread. The Subscriber should make sure to minimize the amount of processing steps used to initiate this process, meaning that all its SPI-mandated methods shall return as quickly as possible. +The Reactive Streams API prescribes that all processing of elements (`onNext`) or termination signals (`onError`, `onComplete`) happens outside of the execution stack of the `Publisher`. This is achieved by scheduling the processing to run asynchronously, possibly on a different thread. The Subscriber should make sure to minimize the amount of processing steps used to initiate this process, meaning that all its API-mandated methods shall return as quickly as possible. Note that this does not mean synchronous processing is not permitted; see "Relationship to synchronous stream-processing" below. -In contrast to communicating back-pressure by blocking the publisher, a non-blocking solution needs to communicate demand through a dedicated control channel. This channel is provided by the Subscription: the subscriber controls the maximum amount of future elements it is willing receive by sending explicit demand tokens (by calling requestMore(int)). +In contrast to communicating back-pressure by blocking the publisher, a non-blocking solution needs to communicate demand through a dedicated control channel. This channel is provided by the `Subscription`: the subscriber controls the maximum amount of future elements it is willing receive by sending explicit demand tokens (by calling request(int)). #### Relationship to synchronous stream-processing #### From 992c75869653d1b2a3b33b80653933fc43a0fcef Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 23 Apr 2014 19:15:48 -0700 Subject: [PATCH 06/19] Update as per discussion with Roland in review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Line 61: The number of onNext events emitted by a Publisher to a Subscriber will at no point in time exceed the cumulative demand that has been signaled via that Subscriber’s Subscription. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 7d93e8db..8278ca4e 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,7 @@ The API consists of the following components that are required to be provided by A *Publisher* is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s). -- A `Publisher` will never send more `onNext` events than have been requested via the `Subscription.request` method. +- The number of `onNext` events emitted by a `Publisher` to a `Subscriber` will at no point in time exceed the cumulative demand that has been signaled via that `Subscriber`’s `Subscription`. - A `Publisher` can send less events that requested and end the `Subscription` by emitting `onComplete` or `onError`. - Events sent to a `Subscriber` can only be sent sequentially (no concurrent notifications). - If a `Publisher` fails it must emit an `onError`. From b11c4f8d6c955d72a822f07f59bed3b446641fcc Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 23 Apr 2014 19:25:33 -0700 Subject: [PATCH 07/19] Added protocol definition --- README.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 8278ca4e..5e949b5f 100644 --- a/README.md +++ b/README.md @@ -58,13 +58,19 @@ The API consists of the following components that are required to be provided by A *Publisher* is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s). +The protocol of a `Publisher`/`Subscriber` relationship is defined as: + +``` +onSubscribe onNext* (onError | onComplete)? +``` + - The number of `onNext` events emitted by a `Publisher` to a `Subscriber` will at no point in time exceed the cumulative demand that has been signaled via that `Subscriber`’s `Subscription`. - A `Publisher` can send less events that requested and end the `Subscription` by emitting `onComplete` or `onError`. - Events sent to a `Subscriber` can only be sent sequentially (no concurrent notifications). - If a `Publisher` fails it must emit an `onError`. - If a `Publisher` terminates successfully (finite stream) it must emit an `onComplete`. - If a `Publisher` terminates via either `onError` or `onComplete` it must `cancel` its `Subscription` -- Once a terminal state has occurred (`onError`, `onNext`) no further events can be sent. +- Once a terminal state has been signaled (`onError`, `onNext`) no further events can be sent. - Upon receiving a `Subscription.cancel` request it should stop sending events as soon as it can. The `onError` and `onComplete` events are not needed if `Subscription.cancel` was initiated by the `Subscriber`. - The `Publisher.subscribe` method can be called as many times as wanted as long as it is with a different `Subscriber` each time. It is up to the `Publisher` whether underlying streams are shared or not. In other words, a `Publisher` can support multi-subscribe and then choose whether each `Subscription` is unicast or multicast. - A `Publisher` can refuse subscriptions (calls to `subscribe`) if it is unable or unwilling to serve them (overwhelmed, fronting a single-use data source, etc) and can do so by immediately calling `Subscriber.onError` on the `Subscriber` instance calling `subscribe`. From df8a14b9002d0dae0dc190f9677c25d36793fd9e Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 23 Apr 2014 19:30:55 -0700 Subject: [PATCH 08/19] Rewording of behavior after unsubscribe --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 5e949b5f..c62db5c1 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,8 @@ onSubscribe onNext* (onError | onComplete)? - If a `Publisher` terminates successfully (finite stream) it must emit an `onComplete`. - If a `Publisher` terminates via either `onError` or `onComplete` it must `cancel` its `Subscription` - Once a terminal state has been signaled (`onError`, `onNext`) no further events can be sent. -- Upon receiving a `Subscription.cancel` request it should stop sending events as soon as it can. The `onError` and `onComplete` events are not needed if `Subscription.cancel` was initiated by the `Subscriber`. +- Upon receiving a `Subscription.cancel` request it should stop sending events as soon as it can. +- Calling `onError` or `onComplete` is not required after having received a `Subscription.cancel`. - The `Publisher.subscribe` method can be called as many times as wanted as long as it is with a different `Subscriber` each time. It is up to the `Publisher` whether underlying streams are shared or not. In other words, a `Publisher` can support multi-subscribe and then choose whether each `Subscription` is unicast or multicast. - A `Publisher` can refuse subscriptions (calls to `subscribe`) if it is unable or unwilling to serve them (overwhelmed, fronting a single-use data source, etc) and can do so by immediately calling `Subscriber.onError` on the `Subscriber` instance calling `subscribe`. From 715a155aefff9ba0e36edc9c92c6ad4a87acc65b Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 23 Apr 2014 19:35:24 -0700 Subject: [PATCH 09/19] A `Publisher` should not throw an `Exception` As per discussion with Roland. --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index c62db5c1..46f76f21 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,7 @@ onSubscribe onNext* (onError | onComplete)? - Calling `onError` or `onComplete` is not required after having received a `Subscription.cancel`. - The `Publisher.subscribe` method can be called as many times as wanted as long as it is with a different `Subscriber` each time. It is up to the `Publisher` whether underlying streams are shared or not. In other words, a `Publisher` can support multi-subscribe and then choose whether each `Subscription` is unicast or multicast. - A `Publisher` can refuse subscriptions (calls to `subscribe`) if it is unable or unwilling to serve them (overwhelmed, fronting a single-use data source, etc) and can do so by immediately calling `Subscriber.onError` on the `Subscriber` instance calling `subscribe`. +- A `Publisher` should not throw an `Exception`. The only legal way to signal failure or reject a `Subscription` is via the `Subscriber.onError` method. A *`Subscriber`* is a component that accepts a sequenced stream of elements provided by a `Publisher`. At any given time a `Subscriber` might be subscribed to at most one `Publisher`. It provides the callback `onNext` to be called by the upstream `Publisher`, accepting an element that is to be processed or enqueued without blocking the `Publisher`. From eba598c4f0b1db4f5e2db89e42d794ab6fd57112 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 23 Apr 2014 19:37:01 -0700 Subject: [PATCH 10/19] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 46f76f21..1e030bf6 100644 --- a/README.md +++ b/README.md @@ -74,8 +74,8 @@ onSubscribe onNext* (onError | onComplete)? - Upon receiving a `Subscription.cancel` request it should stop sending events as soon as it can. - Calling `onError` or `onComplete` is not required after having received a `Subscription.cancel`. - The `Publisher.subscribe` method can be called as many times as wanted as long as it is with a different `Subscriber` each time. It is up to the `Publisher` whether underlying streams are shared or not. In other words, a `Publisher` can support multi-subscribe and then choose whether each `Subscription` is unicast or multicast. -- A `Publisher` can refuse subscriptions (calls to `subscribe`) if it is unable or unwilling to serve them (overwhelmed, fronting a single-use data source, etc) and can do so by immediately calling `Subscriber.onError` on the `Subscriber` instance calling `subscribe`. -- A `Publisher` should not throw an `Exception`. The only legal way to signal failure or reject a `Subscription` is via the `Subscriber.onError` method. +- A `Publisher` can refuse subscriptions (calls to `subscribe`) if it is unable or unwilling to serve them (overwhelmed, fronting a single-use data source, etc) and can do so by calling `Subscriber.onError` instead of `Subscriber.onSubscribe` on the `Subscriber` instance calling `subscribe`. +- A `Publisher` should not throw an `Exception`. The only legal way to signal failure (or reject a `Subscription`) is via the `Subscriber.onError` method. A *`Subscriber`* is a component that accepts a sequenced stream of elements provided by a `Publisher`. At any given time a `Subscriber` might be subscribed to at most one `Publisher`. It provides the callback `onNext` to be called by the upstream `Publisher`, accepting an element that is to be processed or enqueued without blocking the `Publisher`. From 7e5fcf98fe853191b738d6edf09919d7cc8ecd52 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 23 Apr 2014 19:45:14 -0700 Subject: [PATCH 11/19] Attempt at clarifying request language Not thrilled with this ... but it's a starting point: - Calls from a `Subscriber` to `Subscription` such as `Subscription.request(int n)` must be dispatched asynchronously (separate thread, event loop, trampoline, etc) so as to not cause a StackOverflow since `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext` can recurse infinitely. --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 1e030bf6..26786e1e 100644 --- a/README.md +++ b/README.md @@ -85,7 +85,8 @@ A *`Subscriber`* is a component that accepts a sequenced stream of elements prov A `Subscriber` communicates demand to the `Publisher` via a *`Subscription`* which is passed to the `Subscriber` after the subscription has been established. The `Subscription` exposes the `request(int)` method that is used by the `Subscriber` to signal demand to the `Publisher`. -- a `Subscription` can be used once-and-only-once to represent a subscription by a `Subscriber` to a `Publisher`. +- A `Subscription` can be used once-and-only-once to represent a subscription by a `Subscriber` to a `Publisher`. +- Calls from a `Subscriber` to `Subscription` such as `Subscription.request(int n)` must be dispatched asynchronously (separate thread, event loop, trampoline, etc) so as to not cause a StackOverflow since `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext` can recurse infinitely. For each of its subscribers the `Publisher` obeys the following invariant: From 27b7ffe22de768e628934c216fcd409a4a846d48 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 23 Apr 2014 19:45:44 -0700 Subject: [PATCH 12/19] Remove duplicate --- project/plugins.sbt | 2 -- 1 file changed, 2 deletions(-) diff --git a/project/plugins.sbt b/project/plugins.sbt index 0f403e99..5128a075 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,5 +1,3 @@ addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.4.0") addSbtPlugin("de.johoop" % "sbt-testng-plugin" % "3.0.0") - -addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.4.0") From 0c1be8d50d98ba3cf094b3bc1d70fbf58828ba54 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 23 Apr 2014 19:50:20 -0700 Subject: [PATCH 13/19] Subscriber -> Subscription in Javadoc --- spi/src/main/java/org/reactivestreams/Publisher.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/spi/src/main/java/org/reactivestreams/Publisher.java b/spi/src/main/java/org/reactivestreams/Publisher.java index 3e39e75f..3e094965 100644 --- a/spi/src/main/java/org/reactivestreams/Publisher.java +++ b/spi/src/main/java/org/reactivestreams/Publisher.java @@ -5,11 +5,14 @@ public interface Publisher { /** * Request {@link Publisher} to start streaming data. *

- * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscriber}. + * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}. *

* Each {@link Subscription} will work for only a single {@link Subscriber}. *

* A {@link Subscriber} should only subscribe once to a single {@link Publisher}. + *

+ * If the {@link Publisher} rejects the subscription attempt or otherwise fails it will + * signal the error via {@link Subscriber#onError}. * * @param s */ From da058ccd0326d655ab2a62f7e6bc25165ef587bc Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 23 Apr 2014 20:28:54 -0700 Subject: [PATCH 14/19] Subscriber's Subscription in terminal state --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 26786e1e..de3fc1e2 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,7 @@ onSubscribe onNext* (onError | onComplete)? - Events sent to a `Subscriber` can only be sent sequentially (no concurrent notifications). - If a `Publisher` fails it must emit an `onError`. - If a `Publisher` terminates successfully (finite stream) it must emit an `onComplete`. -- If a `Publisher` terminates via either `onError` or `onComplete` it must `cancel` its `Subscription` +- If a Publisher signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` must be considered canceled. - Once a terminal state has been signaled (`onError`, `onNext`) no further events can be sent. - Upon receiving a `Subscription.cancel` request it should stop sending events as soon as it can. - Calling `onError` or `onComplete` is not required after having received a `Subscription.cancel`. From 7862230405d173657451c239fad9a4706ae0066d Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 24 Apr 2014 09:31:56 -0700 Subject: [PATCH 15/19] Update protocol specification ... to include onError/onSubscribe at beginning as per discussion on pull request. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index de3fc1e2..eef85d5d 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ A *Publisher* is a provider of a potentially unbounded number of sequenced eleme The protocol of a `Publisher`/`Subscriber` relationship is defined as: ``` -onSubscribe onNext* (onError | onComplete)? +onError | (onSubscribe onNext* (onError | onComplete)?) ``` - The number of `onNext` events emitted by a `Publisher` to a `Subscriber` will at no point in time exceed the cumulative demand that has been signaled via that `Subscriber`’s `Subscription`. From 08744a5ef1ee8325b2f948d3989030cfedd4cb07 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 24 Apr 2014 09:46:31 -0700 Subject: [PATCH 16/19] Moving Subscription.request async responsibility --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index eef85d5d..553e9429 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,7 @@ onError | (onSubscribe onNext* (onError | onComplete)?) - The `Publisher.subscribe` method can be called as many times as wanted as long as it is with a different `Subscriber` each time. It is up to the `Publisher` whether underlying streams are shared or not. In other words, a `Publisher` can support multi-subscribe and then choose whether each `Subscription` is unicast or multicast. - A `Publisher` can refuse subscriptions (calls to `subscribe`) if it is unable or unwilling to serve them (overwhelmed, fronting a single-use data source, etc) and can do so by calling `Subscriber.onError` instead of `Subscriber.onSubscribe` on the `Subscriber` instance calling `subscribe`. - A `Publisher` should not throw an `Exception`. The only legal way to signal failure (or reject a `Subscription`) is via the `Subscriber.onError` method. +- The `Subscription.request` method must behave asynchronously (separate thread, event loop, trampoline, etc) so as to not cause a StackOverflow since `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext` can recurse infinitely. This allows a `Subscriber` to directly invoke `Subscription.request` and isolate the async responsibility to the `Subscription` instance which has responsibility for scheduling events. A *`Subscriber`* is a component that accepts a sequenced stream of elements provided by a `Publisher`. At any given time a `Subscriber` might be subscribed to at most one `Publisher`. It provides the callback `onNext` to be called by the upstream `Publisher`, accepting an element that is to be processed or enqueued without blocking the `Publisher`. @@ -86,7 +87,7 @@ A *`Subscriber`* is a component that accepts a sequenced stream of elements prov A `Subscriber` communicates demand to the `Publisher` via a *`Subscription`* which is passed to the `Subscriber` after the subscription has been established. The `Subscription` exposes the `request(int)` method that is used by the `Subscriber` to signal demand to the `Publisher`. - A `Subscription` can be used once-and-only-once to represent a subscription by a `Subscriber` to a `Publisher`. -- Calls from a `Subscriber` to `Subscription` such as `Subscription.request(int n)` must be dispatched asynchronously (separate thread, event loop, trampoline, etc) so as to not cause a StackOverflow since `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext` can recurse infinitely. +- Calls from a `Subscriber` to `Subscription.request(int n)` can be made directly since it is the responsibility of `Subscription` to handle async dispatching. For each of its subscribers the `Publisher` obeys the following invariant: From 6343aebaa7b7da42db8e7f3a0f64a53e893ad31a Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 24 Apr 2014 11:11:21 -0700 Subject: [PATCH 17/19] /spi/ -> /api/ --- {spi => api}/.gitignore | 0 {spi => api}/build.sbt | 0 .../org/reactivestreams/example/multicast/MulticastExample.java | 0 .../reactivestreams/example/multicast/NeverEndingStockStream.java | 0 .../java/org/reactivestreams/example/multicast/Stock.java | 0 .../reactivestreams/example/multicast/StockPricePublisher.java | 0 .../reactivestreams/example/multicast/StockPriceSubscriber.java | 0 .../example/unicast/InfiniteIncrementNumberPublisher.java | 0 .../example/unicast/NumberSubscriberThatHopsThreads.java | 0 .../java/org/reactivestreams/example/unicast/UnicastExample.java | 0 {spi => api}/src/main/java/org/reactivestreams/Publisher.java | 0 {spi => api}/src/main/java/org/reactivestreams/Subscriber.java | 0 {spi => api}/src/main/java/org/reactivestreams/Subscription.java | 0 13 files changed, 0 insertions(+), 0 deletions(-) rename {spi => api}/.gitignore (100%) rename {spi => api}/build.sbt (100%) rename {spi => api}/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java (100%) rename {spi => api}/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java (100%) rename {spi => api}/src/examples/java/org/reactivestreams/example/multicast/Stock.java (100%) rename {spi => api}/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java (100%) rename {spi => api}/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java (100%) rename {spi => api}/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java (100%) rename {spi => api}/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java (100%) rename {spi => api}/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java (100%) rename {spi => api}/src/main/java/org/reactivestreams/Publisher.java (100%) rename {spi => api}/src/main/java/org/reactivestreams/Subscriber.java (100%) rename {spi => api}/src/main/java/org/reactivestreams/Subscription.java (100%) diff --git a/spi/.gitignore b/api/.gitignore similarity index 100% rename from spi/.gitignore rename to api/.gitignore diff --git a/spi/build.sbt b/api/build.sbt similarity index 100% rename from spi/build.sbt rename to api/build.sbt diff --git a/spi/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java b/api/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java similarity index 100% rename from spi/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java rename to api/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java diff --git a/spi/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java b/api/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java similarity index 100% rename from spi/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java rename to api/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java diff --git a/spi/src/examples/java/org/reactivestreams/example/multicast/Stock.java b/api/src/examples/java/org/reactivestreams/example/multicast/Stock.java similarity index 100% rename from spi/src/examples/java/org/reactivestreams/example/multicast/Stock.java rename to api/src/examples/java/org/reactivestreams/example/multicast/Stock.java diff --git a/spi/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java b/api/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java similarity index 100% rename from spi/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java rename to api/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java diff --git a/spi/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java b/api/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java similarity index 100% rename from spi/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java rename to api/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java diff --git a/spi/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java b/api/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java similarity index 100% rename from spi/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java rename to api/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java diff --git a/spi/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java b/api/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java similarity index 100% rename from spi/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java rename to api/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java diff --git a/spi/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java b/api/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java similarity index 100% rename from spi/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java rename to api/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java diff --git a/spi/src/main/java/org/reactivestreams/Publisher.java b/api/src/main/java/org/reactivestreams/Publisher.java similarity index 100% rename from spi/src/main/java/org/reactivestreams/Publisher.java rename to api/src/main/java/org/reactivestreams/Publisher.java diff --git a/spi/src/main/java/org/reactivestreams/Subscriber.java b/api/src/main/java/org/reactivestreams/Subscriber.java similarity index 100% rename from spi/src/main/java/org/reactivestreams/Subscriber.java rename to api/src/main/java/org/reactivestreams/Subscriber.java diff --git a/spi/src/main/java/org/reactivestreams/Subscription.java b/api/src/main/java/org/reactivestreams/Subscription.java similarity index 100% rename from spi/src/main/java/org/reactivestreams/Subscription.java rename to api/src/main/java/org/reactivestreams/Subscription.java From bb67400297871b97c7ed6ab741020f43a5fda623 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 24 Apr 2014 11:13:58 -0700 Subject: [PATCH 18/19] Remove comparison with other technologies. Focus on spec. --- README.md | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/README.md b/README.md index 553e9429..1b5bc4ed 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ The latest preview release is available on Maven Central as Handling streams of data—especially “live” data whose volume is not predetermined—requires special care in an asynchronous system. The most prominent issue is that resource consumption needs to be carefully controlled such that a fast data source does not overwhelm the stream destination. Asynchrony is needed in order to enable the parallel use of computing resources, on collaborating network hosts or multiple CPU cores within a single machine. -The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary—think passing elements on to another thread or thread-pool—while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, backpressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. The benefits of asynchronous processing would be negated if the communication of backpressure were synchronous (see also the [Reactive Manifesto](http://reactivemanifesto.org/)), therefore care has been taken to mandate fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation. +The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary – think passing elements on to another thread or thread-pool — while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, backpressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. The benefits of asynchronous processing would be negated if the communication of backpressure were synchronous (see also the [Reactive Manifesto](http://reactivemanifesto.org/)), therefore care has been taken to mandate fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation. It is the intention of this specification to allow the creation of many conforming implementations, which by virtue of abiding by the rules will be able to interoperate smoothly, preserving the aforementioned benefits and characteristics across the whole processing graph of a stream application. @@ -40,14 +40,6 @@ The Reactive Streams specification consists of the following parts: Implementations are free to implement additional features not covered by the specification as long as they conform to the API requirements and pass the tests in the TCK. -#### Comparison with related technologies #### - -In contrast to reactive streams described in this document, a Future represents exactly one element (or a failure) that is produced asynchronosly while streams can provide a potentially unbounded number of elements. - -Compared to Rx, the SPI described here prescribes a mandatory, non-blocking way to handle back-pressure and requires the processing of an element by a dowstream component to be dispatched asynchronously. - -Iteratees are an abstraction used for consuming a stream, often for parsing it. In this sense they are not a stream transformation or combination tool in themselves. - ### API Components ### The API consists of the following components that are required to be provided by Reactive Stream implementations: From ff0557c922cdb38e6ee1889a9dfa56d194a66d31 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 24 Apr 2014 11:35:29 -0700 Subject: [PATCH 19/19] Clarify Sync vs Async ... start of code examples --- README.md | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 1b5bc4ed..0d636109 100644 --- a/README.md +++ b/README.md @@ -93,15 +93,42 @@ At any time the `Publisher` may signal that it is not able to provide more eleme > For example a `Publisher` representing a strict collection signals completion to its subscriber after it provided all the elements. Now a later subscriber might still receive the whole collection before receiving onComplete. -### Asynchronous processing ### +### Asynchronous vs Synchronous Processing ### -The Reactive Streams API prescribes that all processing of elements (`onNext`) or termination signals (`onError`, `onComplete`) happens outside of the execution stack of the `Publisher`. This is achieved by scheduling the processing to run asynchronously, possibly on a different thread. The Subscriber should make sure to minimize the amount of processing steps used to initiate this process, meaning that all its API-mandated methods shall return as quickly as possible. Note that this does not mean synchronous processing is not permitted; see "Relationship to synchronous stream-processing" below. +The Reactive Streams API prescribes that all processing of elements (`onNext`) or termination signals (`onError`, `onComplete`) do not *block* the `Publisher`. Each of the `on*` handlers can process the events synchronously or asynchronously. -In contrast to communicating back-pressure by blocking the publisher, a non-blocking solution needs to communicate demand through a dedicated control channel. This channel is provided by the `Subscription`: the subscriber controls the maximum amount of future elements it is willing receive by sending explicit demand tokens (by calling request(int)). +For example, this `onNext` implementation does synchronous transformation and enqueues the result for further asynchronous processing: + +```java +void onNext(T t) { + queue.offer(transform(t)); +} +``` + +In a push-based model such as this doing asynchronous processing, back-pressure needs to be provided otherwise buffer bloat can occur. + +In contrast to communicating back-pressure by blocking the publisher, a non-blocking solution needs to communicate demand through a dedicated control channel. This channel is provided by the `Subscription`: the `Subscriber` controls the maximum amount of future elements it is willing receive by sending explicit demand tokens (by calling `request(int)`). + +Expanding on the `onNext` example above, as the queue is drained and processed asynchronously it would signal demand such as this: + +```java +// TODO replace with fully functioning code example rather than this pseudo-code snippet +void process() { + eventLoop.schedule(() -> { + T t; + while((t = queue.poll()) != null) { + doWork(t); + if(queue.size() < THRESHOLD) { + subscription.request(queue.capacity()); + } + } + }) +} +``` #### Relationship to synchronous stream-processing #### -This document describes asynchronous, non-blocking backpressure boundaries but in between those boundaries any kind of synchronous stream processing model is permitted. This is useful for performance optimization (eliminating inter-thread synchronization) and it conveniently transports backpressure implicitly (the calling method cannot continue while the call lasts). As an example consider a section consisting of three connected Processors, A, B and C: +This document defines a protocol for asynchronous, non-blocking backpressure boundaries but in between those boundaries any kind of synchronous stream processing model is permitted. This is useful for performance optimization (eliminating inter-thread synchronization) and it conveniently transports backpressure implicitly (the calling method cannot continue while the call lasts). As an example consider a section consisting of three connected Processors, A, B and C: (...) --> A[S1 --> S2] --> B[S3 --> S4 --> S5] --> C[S6] --> (...)