From f3377211544147fc2f00cf9d487635e3e80e4ddf Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 6 Dec 2017 14:38:48 +0100 Subject: [PATCH 1/4] Fix the IdentityFlowProcessorVerification class --- .../IdentityFlowProcessorVerification.java | 35 +- .../tck/flow/LockstepFlowProcessorTest.java | 338 ++++++++++++++++++ 2 files changed, 345 insertions(+), 28 deletions(-) create mode 100644 tck-flow/src/test/java/org/reactivestreams/tck/flow/LockstepFlowProcessorTest.java diff --git a/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java b/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java index 4e899afe..ff36fa2d 100644 --- a/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java +++ b/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java @@ -11,14 +11,14 @@ package org.reactivestreams.tck.flow; -import org.reactivestreams.Processor; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; +import org.reactivestreams.*; import org.reactivestreams.tck.IdentityProcessorVerification; import org.reactivestreams.tck.TestEnvironment; import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules; import org.reactivestreams.tck.flow.support.PublisherVerificationRules; +import java.util.concurrent.Flow; + public abstract class IdentityFlowProcessorVerification extends IdentityProcessorVerification implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules { @@ -34,39 +34,18 @@ public IdentityFlowProcessorVerification(TestEnvironment env, long publisherRefe super(env, publisherReferenceGCTimeoutMillis, processorBufferSize); } - protected abstract Publisher createFailedFlowPublisher(); - - protected abstract Processor createIdentityFlowProcessor(int bufferSize); - - protected abstract Subscriber createFlowSubscriber(FlowSubscriberWhiteboxVerification.WhiteboxSubscriberProbe probe); + protected abstract Flow.Publisher createFailedFlowPublisher(); - protected abstract Publisher createFlowHelperPublisher(long elements); - - protected abstract Publisher createFlowPublisher(long elements); - - @Override - public final Publisher createHelperPublisher(long elements) { - return createFlowHelperPublisher(elements); - } + protected abstract Flow.Processor createIdentityFlowProcessor(int bufferSize); @Override public final Processor createIdentityProcessor(int bufferSize) { - return createIdentityFlowProcessor(bufferSize); + return FlowAdapters.toProcessor(createIdentityFlowProcessor(bufferSize)); } @Override public final Publisher createFailedPublisher() { - return createFailedFlowPublisher(); - } - - @Override - public final Publisher createPublisher(long elements) { - return createFlowPublisher(elements); - } - - @Override - public final Subscriber createSubscriber(FlowSubscriberWhiteboxVerification.WhiteboxSubscriberProbe probe) { - return createFlowSubscriber(probe); + return FlowAdapters.toPublisher(createFailedFlowPublisher()); } } diff --git a/tck-flow/src/test/java/org/reactivestreams/tck/flow/LockstepFlowProcessorTest.java b/tck-flow/src/test/java/org/reactivestreams/tck/flow/LockstepFlowProcessorTest.java new file mode 100644 index 00000000..d232cab8 --- /dev/null +++ b/tck-flow/src/test/java/org/reactivestreams/tck/flow/LockstepFlowProcessorTest.java @@ -0,0 +1,338 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.tck.flow; + +import org.reactivestreams.*; +import org.reactivestreams.tck.*; +import org.testng.annotations.Test; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +@Test +public class LockstepFlowProcessorTest extends IdentityFlowProcessorVerification { + + public LockstepFlowProcessorTest() { + super(new TestEnvironment()); + } + @Override + public Flow.Processor createIdentityFlowProcessor(int bufferSize) { + return new LockstepProcessor(); + } + + @Override + public Flow.Publisher createFailedFlowPublisher() { + LockstepProcessor proc = new LockstepProcessor(); + proc.onError(new Exception()); + return proc; + } + + @Override + public ExecutorService publisherExecutorService() { + return Executors.newCachedThreadPool(); + } + + @Override + public Integer createElement(int element) { + return element; + } + + @Override + public long maxSupportedSubscribers() { + return 2; + } + + @Override + public boolean doesCoordinatedEmission() { + return true; + } + + static final class LockstepProcessor implements Flow.Processor { + + final AtomicReference[]> subscribers = + new AtomicReference[]>(EMPTY); + + static final LockstepSubscription[] EMPTY = new LockstepSubscription[0]; + static final LockstepSubscription[] TERMINATED = new LockstepSubscription[0]; + + volatile boolean done; + Throwable error; + + final AtomicReference upstream = + new AtomicReference(); + + final AtomicReferenceArray queue = + new AtomicReferenceArray(BUFFER_MASK + 1); + + final AtomicLong producerIndex = new AtomicLong(); + + final AtomicLong consumerIndex = new AtomicLong(); + + final AtomicInteger wip = new AtomicInteger(); + + static final int BUFFER_MASK = 127; + + int consumed; + + @Override + public void subscribe(Flow.Subscriber s) { + LockstepSubscription subscription = new LockstepSubscription(s, this); + s.onSubscribe(subscription); + if (add(subscription)) { + if (subscription.isCancelled()) { + remove(subscription); + } else { + drain(); + } + } else { + Throwable ex = error; + if (ex != null) { + s.onError(ex); + } else { + s.onComplete(); + } + } + } + + boolean add(LockstepSubscription sub) { + for (;;) { + LockstepSubscription[] a = subscribers.get(); + if (a == TERMINATED) { + return false; + } + int n = a.length; + LockstepSubscription[] b = new LockstepSubscription[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = sub; + if (subscribers.compareAndSet(a, b)) { + return true; + } + } + } + + void remove(LockstepSubscription sub) { + for (;;) { + LockstepSubscription[] a = subscribers.get(); + int n = a.length; + + if (n == 0) { + break; + } + + int j = -1; + for (int i = 0; i < n; i++) { + if (a[i] == sub) { + j = i; + break; + } + } + + if (j < 0) { + break; + } + LockstepSubscription[] b; + if (n == 1) { + b = TERMINATED; + } else { + b = new LockstepSubscription[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + } + if (subscribers.compareAndSet(a, b)) { + if (b == TERMINATED) { + Flow.Subscription s = upstream.getAndSet(CancelledSubscription.INSTANCE); + if (s != null) { + s.cancel(); + } + } + break; + } + } + } + + @Override + public void onSubscribe(Flow.Subscription s) { + if (upstream.compareAndSet(null, s)) { + s.request(BUFFER_MASK + 1); + } else { + s.cancel(); + } + } + + @Override + public void onNext(T t) { + if (t == null) { + throw new NullPointerException("t == null"); + } + long pi = producerIndex.get(); + queue.lazySet((int)pi & BUFFER_MASK, t); + producerIndex.lazySet(pi + 1); + drain(); + } + + @Override + public void onError(Throwable t) { + if (t == null) { + throw new NullPointerException("t == null"); + } + error = t; + done = true; + drain(); + } + + @Override + public void onComplete() { + done = true; + drain(); + } + + void drain() { + if (wip.getAndIncrement() != 0) { + return; + } + + int limit = (BUFFER_MASK + 1) - ((BUFFER_MASK + 1) >> 2); + int missed = 1; + for (;;) { + + for (;;) { + LockstepSubscription[] subscribers = this.subscribers.get(); + int n = subscribers.length; + + long ci = consumerIndex.get(); + + boolean d = done; + boolean empty = producerIndex.get() == ci; + + if (d) { + Throwable ex = error; + if (ex != null) { + for (LockstepSubscription sub : this.subscribers.getAndSet(TERMINATED)) { + sub.subscriber.onError(ex); + } + break; + } else if (empty) { + for (LockstepSubscription sub : this.subscribers.getAndSet(TERMINATED)) { + sub.subscriber.onComplete(); + } + break; + } + } + + if (n != 0 && !empty) { + long ready = Long.MAX_VALUE; + int c = 0; + for (LockstepSubscription sub : subscribers) { + long req = sub.get(); + if (req != Long.MIN_VALUE) { + ready = Math.min(ready, req - sub.emitted); + c++; + } + } + + if (ready != 0 && c != 0) { + int offset = (int) ci & BUFFER_MASK; + T value = queue.get(offset); + queue.lazySet(offset, null); + consumerIndex.lazySet(ci + 1); + + for (LockstepSubscription sub : subscribers) { + sub.subscriber.onNext(value); + sub.emitted++; + } + + if (++consumed == limit) { + consumed = 0; + upstream.get().request(limit); + } + } else { + break; + } + } else { + break; + } + } + + missed = wip.addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + static final class LockstepSubscription extends AtomicLong + implements Flow.Subscription { + + final Flow.Subscriber subscriber; + + final LockstepProcessor parent; + + long emitted; + + LockstepSubscription(Flow.Subscriber subscriber, LockstepProcessor parent) { + this.subscriber = subscriber; + this.parent = parent; + } + + @Override + public void request(long n) { + if (n <= 0L) { + cancel(); + subscriber.onError(new IllegalArgumentException("ยง3.9 violated: positive request amount required")); + return; + } + for (;;) { + long current = get(); + if (current == Long.MIN_VALUE || current == Long.MAX_VALUE) { + break; + } + + long updated = current + n; + if (updated < 0L) { + updated = Long.MAX_VALUE; + } + if (compareAndSet(current, updated)) { + parent.drain(); + break; + } + } + } + + @Override + public void cancel() { + if (getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) { + parent.remove(this); + parent.drain(); + } + } + + boolean isCancelled() { + return get() == Long.MIN_VALUE; + } + } + } + + enum CancelledSubscription implements Flow.Subscription { + + INSTANCE; + + @Override + public void request(long n) { + // Subscription already cancelled + } + + @Override + public void cancel() { + // Subscription already cancelled + } + } +} From 4d14e4b04de52b56868ff854d31669673db264ac Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 6 Dec 2017 14:43:39 +0100 Subject: [PATCH 2/4] Replicate Javadoc from the bridged original methods --- .../flow/IdentityFlowProcessorVerification.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java b/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java index ff36fa2d..ebeeedee 100644 --- a/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java +++ b/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java @@ -34,8 +34,23 @@ public IdentityFlowProcessorVerification(TestEnvironment env, long publisherRefe super(env, publisherReferenceGCTimeoutMillis, processorBufferSize); } + /** + * By implementing this method, additional TCK tests concerning a "failed" Flow publishers will be run. + * + * The expected behaviour of the {@link Flow.Publisher} returned by this method is hand out a subscription, + * followed by signalling {@code onError} on it, as specified by Rule 1.9. + * + * If you ignore these additional tests, return {@code null} from this method. + */ protected abstract Flow.Publisher createFailedFlowPublisher(); + /** + * This is the main method you must implement in your test incarnation. + * It must create a {@link Flow.Publisher}, which simply forwards all stream elements from its upstream + * to its downstream. It must be able to internally buffer the given number of elements. + * + * @param bufferSize number of elements the processor is required to be able to buffer. + */ protected abstract Flow.Processor createIdentityFlowProcessor(int bufferSize); @Override From 2538c5ec823081c62b85ccd5610ef9179f3b35bb Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 6 Dec 2017 15:27:35 +0100 Subject: [PATCH 3/4] Fix Javadoc mentioning Publisher instead of Processor --- .../tck/flow/IdentityFlowProcessorVerification.java | 2 +- .../org/reactivestreams/tck/IdentityProcessorVerification.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java b/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java index ebeeedee..b3e0830b 100644 --- a/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java +++ b/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java @@ -46,7 +46,7 @@ public IdentityFlowProcessorVerification(TestEnvironment env, long publisherRefe /** * This is the main method you must implement in your test incarnation. - * It must create a {@link Flow.Publisher}, which simply forwards all stream elements from its upstream + * It must create a {@link Flow.Processor}, which simply forwards all stream elements from its upstream * to its downstream. It must be able to internally buffer the given number of elements. * * @param bufferSize number of elements the processor is required to be able to buffer. diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index 56456ac2..e6e5e7e0 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -128,7 +128,7 @@ public boolean skipStochasticTests() { /** * This is the main method you must implement in your test incarnation. - * It must create a Publisher, which simply forwards all stream elements from its upstream + * It must create a {@link Processor}, which simply forwards all stream elements from its upstream * to its downstream. It must be able to internally buffer the given number of elements. * * @param bufferSize number of elements the processor is required to be able to buffer. From d1679233ff72b871980ee4a915f0d42352e3b726 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 7 Dec 2017 09:41:04 +0100 Subject: [PATCH 4/4] Adjust wording on createFailedPublisher --- .../tck/flow/IdentityFlowProcessorVerification.java | 2 +- .../org/reactivestreams/tck/IdentityProcessorVerification.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java b/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java index b3e0830b..f4479e71 100644 --- a/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java +++ b/tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java @@ -40,7 +40,7 @@ public IdentityFlowProcessorVerification(TestEnvironment env, long publisherRefe * The expected behaviour of the {@link Flow.Publisher} returned by this method is hand out a subscription, * followed by signalling {@code onError} on it, as specified by Rule 1.9. * - * If you ignore these additional tests, return {@code null} from this method. + * If you want to ignore these additional tests, return {@code null} from this method. */ protected abstract Flow.Publisher createFailedFlowPublisher(); diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index e6e5e7e0..361e0821 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -141,7 +141,7 @@ public boolean skipStochasticTests() { * The expected behaviour of the {@link Publisher} returned by this method is hand out a subscription, * followed by signalling {@code onError} on it, as specified by Rule 1.9. * - * If you ignore these additional tests, return {@code null} from this method. + * If you want to ignore these additional tests, return {@code null} from this method. */ public abstract Publisher createFailedPublisher();