From 2dfe6d570576933460acf1024507d0ca3ab741af Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 3 Oct 2017 13:10:45 +0200 Subject: [PATCH 1/4] Example synchronous range Publisher --- .../example/lazycast/RangePublisher.java | 187 ++++++++++++++++++ .../example/lazycast/RangePublisherTest.java | 31 +++ 2 files changed, 218 insertions(+) create mode 100644 examples/src/main/java/org/reactivestreams/example/lazycast/RangePublisher.java create mode 100644 examples/src/test/java/org/reactivestreams/example/lazycast/RangePublisherTest.java diff --git a/examples/src/main/java/org/reactivestreams/example/lazycast/RangePublisher.java b/examples/src/main/java/org/reactivestreams/example/lazycast/RangePublisher.java new file mode 100644 index 00000000..f23a3f91 --- /dev/null +++ b/examples/src/main/java/org/reactivestreams/example/lazycast/RangePublisher.java @@ -0,0 +1,187 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.example.lazycast; + +import org.reactivestreams.*; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * A synchronous implementation of the {@link Publisher} that can + * be subscribed to multiple times and each individual subscription + * will receive range of monotonically increasing integer values on demand. + */ +public final class RangePublisher implements Publisher { + + /** The starting value of the range. */ + final int start; + + /** The number of items to emit. */ + final int count; + + /** + * Constructs a RangePublisher instance with the given start and count values + * that yields a sequence of [start, start + count). + * @param start the starting value of the range + * @param count the number of items to emit + */ + public RangePublisher(int start, int count) { + this.start = start; + this.count = count; + } + + @Override + public void subscribe(Subscriber s) { + s.onSubscribe(new RangeSubscription(s, start, start + count)); + } + + /** + * A Subscription implementation that holds the current downstream + * requested amount and responds to the downstream's request() and + * cancel() calls. + */ + static final class RangeSubscription extends AtomicLong implements Subscription { + + private static final long serialVersionUID = -9000845542177067735L; + + /** The Subscriber we are emitting integer values to. */ + final Subscriber downstream; + + /** The end index (exclusive). */ + final int end; + + /** + * The current index and within the [start, start + count) range that + * will be emitted as downstream.onNext(). + */ + int index; + + /** + * Indicates the emission should stop. + */ + volatile boolean cancelled; + + /** + * Holds onto the IllegalArgumentException (containing the offending stacktrace) + * indicating there was a non-positive request() call from the downstream. + */ + volatile Throwable invalidRequest; + + /** + * Constructs a stateful RangeSubscription that emits signals to the given + * downstream from an integer range of [start, end). + * @param downstream the Subscriber receiving the integer values and the completion signal. + * @param start the first integer value emitted, start of the range + * @param end the end of the range, exclusive + */ + RangeSubscription(Subscriber downstream, int start, int end) { + this.downstream = downstream; + this.index = start; + this.end = end; + } + + @Override + public void request(long n) { + // Non-positive requests should be honored with IllegalArgumentException + if (n <= 0L) { + invalidRequest = new IllegalArgumentException("ยง3.9: non-positive requests are not allowed!"); + n = 1; + } + // Downstream requests are cumulative and may come from any thread + for (;;) { + long requested = get(); + long update = requested + n; + // cap the amount at Long.MAX_VALUE + if (update < 0L) { + update = Long.MAX_VALUE; + } + // atomically update the current requested amount + if (compareAndSet(requested, update)) { + // if there was no prior request amount, we start the emission loop + if (requested == 0L) { + emit(update); + } + break; + } + } + } + + @Override + public void cancel() { + // Indicate to the emission loop it should stop. + cancelled = true; + } + + void emit(long currentRequested) { + // Load fields to avoid re-reading them from memory due to volatile accesses in the loop. + Subscriber downstream = this.downstream; + int index = this.index; + int end = this.end; + int emitted = 0; + + for (;;) { + // Check if there was an invalid request and then report it. + Throwable invalidRequest = this.invalidRequest; + if (invalidRequest != null) { + downstream.onError(invalidRequest); + return; + } + + // Loop while the index hasn't reached the end and we haven't + // emitted all that's been requested + while (index != end && emitted != currentRequested) { + // We stop if cancellation was requested + if (cancelled) { + return; + } + + downstream.onNext(index); + + // Increment the index for the next possible emission. + index++; + // Increment the emitted count to prevent overflowing the downstream. + emitted++; + } + + // If the index reached the end, we complete the downstream. + if (index == end) { + // Unless cancellation was requested by the last onNext. + if (!cancelled) { + downstream.onComplete(); + } + return; + } + + // Did the requested amount change while we were looping? + long freshRequested = get(); + if (freshRequested == currentRequested) { + // Save where the loop has left off: the next value to be emitted + this.index = index; + // Atomically subtract the previously requested (also emitted) amount + currentRequested = addAndGet(-currentRequested); + // If there was no new request in between get() and addAndGet(), we simply quit + // The next 0 to N transition in request() will trigger the next emission loop. + if (currentRequested == 0L) { + break; + } + // Looks like there were more async requests, reset the emitted count and continue. + emitted = 0; + } else { + // Yes, avoid the atomic subtraction and resume. + // emitted != currentRequest in this case and index + // still points to the next value to be emitted + currentRequested = freshRequested; + } + } + } + } +} diff --git a/examples/src/test/java/org/reactivestreams/example/lazycast/RangePublisherTest.java b/examples/src/test/java/org/reactivestreams/example/lazycast/RangePublisherTest.java new file mode 100644 index 00000000..fc976b11 --- /dev/null +++ b/examples/src/test/java/org/reactivestreams/example/lazycast/RangePublisherTest.java @@ -0,0 +1,31 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.example.lazycast; + +import org.reactivestreams.Publisher; +import org.reactivestreams.tck.*; + +public class RangePublisherTest extends PublisherVerification { + public RangePublisherTest() { + super(new TestEnvironment(50, 50)); + } + + @Override + public Publisher createPublisher(long elements) { + return new RangePublisher(1, (int)elements); + } + + @Override + public Publisher createFailedPublisher() { + return null; + } +} From 0178bafeddf353c14816aaf34528dbf49b0ed5ab Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 3 Oct 2017 13:32:50 +0200 Subject: [PATCH 2/4] Udpated with rule numbers in comments --- .../example/lazycast/RangePublisher.java | 149 ++++++++++++------ 1 file changed, 101 insertions(+), 48 deletions(-) diff --git a/examples/src/main/java/org/reactivestreams/example/lazycast/RangePublisher.java b/examples/src/main/java/org/reactivestreams/example/lazycast/RangePublisher.java index f23a3f91..fdd29009 100644 --- a/examples/src/main/java/org/reactivestreams/example/lazycast/RangePublisher.java +++ b/examples/src/main/java/org/reactivestreams/example/lazycast/RangePublisher.java @@ -40,8 +40,24 @@ public RangePublisher(int start, int count) { } @Override - public void subscribe(Subscriber s) { - s.onSubscribe(new RangeSubscription(s, start, start + count)); + public void subscribe(Subscriber subscriber) { + // As per rule 1.11, we have decided to support multiple subscribers + // in a unicast configuration for this `Publisher` implementation. + + // As per rule 1.09, we need to throw a `java.lang.NullPointerException` + // if the `Subscriber` is `null` + if (subscriber == null) throw null; + + // As per 2.13, this method must return normally (i.e. not throw). + try { + subscriber.onSubscribe(new RangeSubscription(subscriber, start, start + count)); + } catch (Throwable ex) { + new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 " + + "by throwing an exception from onSubscribe.", ex) + // When onSubscribe fails this way, we don't know what state the + // subscriber is thus calling onError may cause more crashes. + .printStackTrace(); + } } /** @@ -49,7 +65,12 @@ public void subscribe(Subscriber s) { * requested amount and responds to the downstream's request() and * cancel() calls. */ - static final class RangeSubscription extends AtomicLong implements Subscription { + static final class RangeSubscription + // We are using this `AtomicLong` to make sure that this `Subscription` + // doesn't run concurrently with itself, which would violate rule 1.3 + // among others (no concurrent notifications). + // The atomic transition from 0L to N > 0L will ensure this. + extends AtomicLong implements Subscription { private static final long serialVersionUID = -9000845542177067735L; @@ -89,6 +110,8 @@ static final class RangeSubscription extends AtomicLong implements Subscription this.end = end; } + // This method will register inbound demand from our `Subscriber` and + // validate it against rule 3.9 and rule 3.17 @Override public void request(long n) { // Non-positive requests should be honored with IllegalArgumentException @@ -100,7 +123,8 @@ public void request(long n) { for (;;) { long requested = get(); long update = requested + n; - // cap the amount at Long.MAX_VALUE + // As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` + // we treat the signalled demand as "effectively unbounded" if (update < 0L) { update = Long.MAX_VALUE; } @@ -115,6 +139,8 @@ public void request(long n) { } } + // This handles cancellation requests, and is idempotent, thread-safe and not + // synchronously performing heavy computations as specified in rule 3.5 @Override public void cancel() { // Indicate to the emission loop it should stop. @@ -128,59 +154,86 @@ void emit(long currentRequested) { int end = this.end; int emitted = 0; - for (;;) { - // Check if there was an invalid request and then report it. - Throwable invalidRequest = this.invalidRequest; - if (invalidRequest != null) { - downstream.onError(invalidRequest); - return; - } + try { + for (; ; ) { + // Check if there was an invalid request and then report it. + Throwable invalidRequest = this.invalidRequest; + if (invalidRequest != null) { + // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6 + cancelled = true; - // Loop while the index hasn't reached the end and we haven't - // emitted all that's been requested - while (index != end && emitted != currentRequested) { - // We stop if cancellation was requested - if (cancelled) { + downstream.onError(invalidRequest); return; } - downstream.onNext(index); - - // Increment the index for the next possible emission. - index++; - // Increment the emitted count to prevent overflowing the downstream. - emitted++; - } + // Loop while the index hasn't reached the end and we haven't + // emitted all that's been requested + while (index != end && emitted != currentRequested) { + // to make sure that we follow rule 1.8, 3.6 and 3.7 + // We stop if cancellation was requested. + if (cancelled) { + return; + } + + downstream.onNext(index); + + // Increment the index for the next possible emission. + index++; + // Increment the emitted count to prevent overflowing the downstream. + emitted++; + } - // If the index reached the end, we complete the downstream. - if (index == end) { - // Unless cancellation was requested by the last onNext. - if (!cancelled) { - downstream.onComplete(); + // If the index reached the end, we complete the downstream. + if (index == end) { + // to make sure that we follow rule 1.8, 3.6 and 3.7 + // Unless cancellation was requested by the last onNext. + if (!cancelled) { + // We need to consider this `Subscription` as cancelled as per rule 1.6 + // Note, however, that this state is not observable from the outside + // world and since we leave the loop with requested > 0L, any + // further request() will never trigger the loop. + cancelled = true; + + downstream.onComplete(); + } + return; } - return; - } - // Did the requested amount change while we were looping? - long freshRequested = get(); - if (freshRequested == currentRequested) { - // Save where the loop has left off: the next value to be emitted - this.index = index; - // Atomically subtract the previously requested (also emitted) amount - currentRequested = addAndGet(-currentRequested); - // If there was no new request in between get() and addAndGet(), we simply quit - // The next 0 to N transition in request() will trigger the next emission loop. - if (currentRequested == 0L) { - break; + // Did the requested amount change while we were looping? + long freshRequested = get(); + if (freshRequested == currentRequested) { + // Save where the loop has left off: the next value to be emitted + this.index = index; + // Atomically subtract the previously requested (also emitted) amount + currentRequested = addAndGet(-currentRequested); + // If there was no new request in between get() and addAndGet(), we simply quit + // The next 0 to N transition in request() will trigger the next emission loop. + if (currentRequested == 0L) { + break; + } + // Looks like there were more async requests, reset the emitted count and continue. + emitted = 0; + } else { + // Yes, avoid the atomic subtraction and resume. + // emitted != currentRequest in this case and index + // still points to the next value to be emitted + currentRequested = freshRequested; } - // Looks like there were more async requests, reset the emitted count and continue. - emitted = 0; - } else { - // Yes, avoid the atomic subtraction and resume. - // emitted != currentRequest in this case and index - // still points to the next value to be emitted - currentRequested = freshRequested; } + } catch (Throwable ex) { + // We can only get here if `onNext`, `onError` or `onComplete` threw, and they + // are not allowed to according to 2.13, so we can only cancel and log here. + // If `onError` throws an exception, this is a spec violation according to rule 1.9, + // and all we can do is to log it. + + // Make sure that we are cancelled, since we cannot do anything else + // since the `Subscriber` is faulty. + cancelled = true; + + // We can't report the failure to onError as the Subscriber is unreliable. + (new IllegalStateException(downstream + " violated the Reactive Streams rule 2.13 by " + + "throwing an exception from onNext, onError or onComplete.", ex)) + .printStackTrace(); } } } From 0fad86f504a1e496999de6cc260192e50f44b534 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 3 Oct 2017 13:43:54 +0200 Subject: [PATCH 3/4] Mentioning rule 3.9 again in emit() --- .../org/reactivestreams/example/lazycast/RangePublisher.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/src/main/java/org/reactivestreams/example/lazycast/RangePublisher.java b/examples/src/main/java/org/reactivestreams/example/lazycast/RangePublisher.java index fdd29009..02e76dff 100644 --- a/examples/src/main/java/org/reactivestreams/example/lazycast/RangePublisher.java +++ b/examples/src/main/java/org/reactivestreams/example/lazycast/RangePublisher.java @@ -156,7 +156,9 @@ void emit(long currentRequested) { try { for (; ; ) { - // Check if there was an invalid request and then report it. + // Check if there was an invalid request and then report its exception + // as mandated by rule 3.9. The stacktrace in it should + // help locate the faulty logic in the Subscriber. Throwable invalidRequest = this.invalidRequest; if (invalidRequest != null) { // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6 From e7eaa6b7c96b64c92032d2699e350aafd409c6e5 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 3 Oct 2017 14:43:09 +0200 Subject: [PATCH 4/4] Move classes to the unicast package. --- .../example/{lazycast => unicast}/RangePublisher.java | 2 +- .../example/{lazycast => unicast}/RangePublisherTest.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) rename examples/src/main/java/org/reactivestreams/example/{lazycast => unicast}/RangePublisher.java (99%) rename examples/src/test/java/org/reactivestreams/example/{lazycast => unicast}/RangePublisherTest.java (92%) diff --git a/examples/src/main/java/org/reactivestreams/example/lazycast/RangePublisher.java b/examples/src/main/java/org/reactivestreams/example/unicast/RangePublisher.java similarity index 99% rename from examples/src/main/java/org/reactivestreams/example/lazycast/RangePublisher.java rename to examples/src/main/java/org/reactivestreams/example/unicast/RangePublisher.java index 02e76dff..67c3103d 100644 --- a/examples/src/main/java/org/reactivestreams/example/lazycast/RangePublisher.java +++ b/examples/src/main/java/org/reactivestreams/example/unicast/RangePublisher.java @@ -9,7 +9,7 @@ * work. If not, see .* ************************************************************************/ -package org.reactivestreams.example.lazycast; +package org.reactivestreams.example.unicast; import org.reactivestreams.*; diff --git a/examples/src/test/java/org/reactivestreams/example/lazycast/RangePublisherTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/RangePublisherTest.java similarity index 92% rename from examples/src/test/java/org/reactivestreams/example/lazycast/RangePublisherTest.java rename to examples/src/test/java/org/reactivestreams/example/unicast/RangePublisherTest.java index fc976b11..964d395e 100644 --- a/examples/src/test/java/org/reactivestreams/example/lazycast/RangePublisherTest.java +++ b/examples/src/test/java/org/reactivestreams/example/unicast/RangePublisherTest.java @@ -9,9 +9,10 @@ * work. If not, see .* ************************************************************************/ -package org.reactivestreams.example.lazycast; +package org.reactivestreams.example.unicast; import org.reactivestreams.Publisher; +import org.reactivestreams.example.unicast.RangePublisher; import org.reactivestreams.tck.*; public class RangePublisherTest extends PublisherVerification {