diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/RangePublisher.java b/examples/src/main/java/org/reactivestreams/example/unicast/RangePublisher.java new file mode 100644 index 00000000..67c3103d --- /dev/null +++ b/examples/src/main/java/org/reactivestreams/example/unicast/RangePublisher.java @@ -0,0 +1,242 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.example.unicast; + +import org.reactivestreams.*; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * A synchronous implementation of the {@link Publisher} that can + * be subscribed to multiple times and each individual subscription + * will receive range of monotonically increasing integer values on demand. + */ +public final class RangePublisher implements Publisher { + + /** The starting value of the range. */ + final int start; + + /** The number of items to emit. */ + final int count; + + /** + * Constructs a RangePublisher instance with the given start and count values + * that yields a sequence of [start, start + count). + * @param start the starting value of the range + * @param count the number of items to emit + */ + public RangePublisher(int start, int count) { + this.start = start; + this.count = count; + } + + @Override + public void subscribe(Subscriber subscriber) { + // As per rule 1.11, we have decided to support multiple subscribers + // in a unicast configuration for this `Publisher` implementation. + + // As per rule 1.09, we need to throw a `java.lang.NullPointerException` + // if the `Subscriber` is `null` + if (subscriber == null) throw null; + + // As per 2.13, this method must return normally (i.e. not throw). + try { + subscriber.onSubscribe(new RangeSubscription(subscriber, start, start + count)); + } catch (Throwable ex) { + new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 " + + "by throwing an exception from onSubscribe.", ex) + // When onSubscribe fails this way, we don't know what state the + // subscriber is thus calling onError may cause more crashes. + .printStackTrace(); + } + } + + /** + * A Subscription implementation that holds the current downstream + * requested amount and responds to the downstream's request() and + * cancel() calls. + */ + static final class RangeSubscription + // We are using this `AtomicLong` to make sure that this `Subscription` + // doesn't run concurrently with itself, which would violate rule 1.3 + // among others (no concurrent notifications). + // The atomic transition from 0L to N > 0L will ensure this. + extends AtomicLong implements Subscription { + + private static final long serialVersionUID = -9000845542177067735L; + + /** The Subscriber we are emitting integer values to. */ + final Subscriber downstream; + + /** The end index (exclusive). */ + final int end; + + /** + * The current index and within the [start, start + count) range that + * will be emitted as downstream.onNext(). + */ + int index; + + /** + * Indicates the emission should stop. + */ + volatile boolean cancelled; + + /** + * Holds onto the IllegalArgumentException (containing the offending stacktrace) + * indicating there was a non-positive request() call from the downstream. + */ + volatile Throwable invalidRequest; + + /** + * Constructs a stateful RangeSubscription that emits signals to the given + * downstream from an integer range of [start, end). + * @param downstream the Subscriber receiving the integer values and the completion signal. + * @param start the first integer value emitted, start of the range + * @param end the end of the range, exclusive + */ + RangeSubscription(Subscriber downstream, int start, int end) { + this.downstream = downstream; + this.index = start; + this.end = end; + } + + // This method will register inbound demand from our `Subscriber` and + // validate it against rule 3.9 and rule 3.17 + @Override + public void request(long n) { + // Non-positive requests should be honored with IllegalArgumentException + if (n <= 0L) { + invalidRequest = new IllegalArgumentException("ยง3.9: non-positive requests are not allowed!"); + n = 1; + } + // Downstream requests are cumulative and may come from any thread + for (;;) { + long requested = get(); + long update = requested + n; + // As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` + // we treat the signalled demand as "effectively unbounded" + if (update < 0L) { + update = Long.MAX_VALUE; + } + // atomically update the current requested amount + if (compareAndSet(requested, update)) { + // if there was no prior request amount, we start the emission loop + if (requested == 0L) { + emit(update); + } + break; + } + } + } + + // This handles cancellation requests, and is idempotent, thread-safe and not + // synchronously performing heavy computations as specified in rule 3.5 + @Override + public void cancel() { + // Indicate to the emission loop it should stop. + cancelled = true; + } + + void emit(long currentRequested) { + // Load fields to avoid re-reading them from memory due to volatile accesses in the loop. + Subscriber downstream = this.downstream; + int index = this.index; + int end = this.end; + int emitted = 0; + + try { + for (; ; ) { + // Check if there was an invalid request and then report its exception + // as mandated by rule 3.9. The stacktrace in it should + // help locate the faulty logic in the Subscriber. + Throwable invalidRequest = this.invalidRequest; + if (invalidRequest != null) { + // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6 + cancelled = true; + + downstream.onError(invalidRequest); + return; + } + + // Loop while the index hasn't reached the end and we haven't + // emitted all that's been requested + while (index != end && emitted != currentRequested) { + // to make sure that we follow rule 1.8, 3.6 and 3.7 + // We stop if cancellation was requested. + if (cancelled) { + return; + } + + downstream.onNext(index); + + // Increment the index for the next possible emission. + index++; + // Increment the emitted count to prevent overflowing the downstream. + emitted++; + } + + // If the index reached the end, we complete the downstream. + if (index == end) { + // to make sure that we follow rule 1.8, 3.6 and 3.7 + // Unless cancellation was requested by the last onNext. + if (!cancelled) { + // We need to consider this `Subscription` as cancelled as per rule 1.6 + // Note, however, that this state is not observable from the outside + // world and since we leave the loop with requested > 0L, any + // further request() will never trigger the loop. + cancelled = true; + + downstream.onComplete(); + } + return; + } + + // Did the requested amount change while we were looping? + long freshRequested = get(); + if (freshRequested == currentRequested) { + // Save where the loop has left off: the next value to be emitted + this.index = index; + // Atomically subtract the previously requested (also emitted) amount + currentRequested = addAndGet(-currentRequested); + // If there was no new request in between get() and addAndGet(), we simply quit + // The next 0 to N transition in request() will trigger the next emission loop. + if (currentRequested == 0L) { + break; + } + // Looks like there were more async requests, reset the emitted count and continue. + emitted = 0; + } else { + // Yes, avoid the atomic subtraction and resume. + // emitted != currentRequest in this case and index + // still points to the next value to be emitted + currentRequested = freshRequested; + } + } + } catch (Throwable ex) { + // We can only get here if `onNext`, `onError` or `onComplete` threw, and they + // are not allowed to according to 2.13, so we can only cancel and log here. + // If `onError` throws an exception, this is a spec violation according to rule 1.9, + // and all we can do is to log it. + + // Make sure that we are cancelled, since we cannot do anything else + // since the `Subscriber` is faulty. + cancelled = true; + + // We can't report the failure to onError as the Subscriber is unreliable. + (new IllegalStateException(downstream + " violated the Reactive Streams rule 2.13 by " + + "throwing an exception from onNext, onError or onComplete.", ex)) + .printStackTrace(); + } + } + } +} diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/RangePublisherTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/RangePublisherTest.java new file mode 100644 index 00000000..964d395e --- /dev/null +++ b/examples/src/test/java/org/reactivestreams/example/unicast/RangePublisherTest.java @@ -0,0 +1,32 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.example.unicast; + +import org.reactivestreams.Publisher; +import org.reactivestreams.example.unicast.RangePublisher; +import org.reactivestreams.tck.*; + +public class RangePublisherTest extends PublisherVerification { + public RangePublisherTest() { + super(new TestEnvironment(50, 50)); + } + + @Override + public Publisher createPublisher(long elements) { + return new RangePublisher(1, (int)elements); + } + + @Override + public Publisher createFailedPublisher() { + return null; + } +}