diff --git a/.travis.yml b/.travis.yml index 1a2d1004..4e945266 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,13 +1,29 @@ language: java sudo: required +dist: trusty +#group: edge + script: - ./gradlew check cache: directories: - $HOME/.gradle -jdk: - - oraclejdk8 - - openjdk6 + +before_install: + - export GRADLE_OPTS=-Xmx1024m + +matrix: + include: + - jdk: openjdk7 + - jdk: oraclejdk8 # JDK 1.8.0_131-b11 + - jdk: oraclejdk9 + +# Don't let Travis CI execute './gradlew assemble' by default +# From https://github.com/reactive-streams/reactive-streams-jvm/pull/383 +install: +# Display Gradle, JVM and other versions + - ./gradlew -version + env: global: - TERM=dumb diff --git a/build.gradle b/build.gradle index f57c047a..d5440998 100644 --- a/build.gradle +++ b/build.gradle @@ -42,11 +42,11 @@ subprojects { instructionReplace "Bundle-Vendor", "Reactive Streams SIG" instructionReplace "Bundle-Description", "Reactive Streams API" instructionReplace "Bundle-DocURL", "http://reactive-streams.org" - instructionReplace "Bundle-Version", "1.0.1-RC2" + instructionReplace "Bundle-Version", "1.0.1" } } - if (name in ["reactive-streams", "reactive-streams-tck", "reactive-streams-examples"]) { + if (name in ["reactive-streams", "reactive-streams-tck", "reactive-streams-examples", "reactive-streams-flow-bridge"]) { apply plugin: "maven" apply plugin: "signing" diff --git a/flow-bridge/.gitignore b/flow-bridge/.gitignore new file mode 100644 index 00000000..7447f89a --- /dev/null +++ b/flow-bridge/.gitignore @@ -0,0 +1 @@ +/bin \ No newline at end of file diff --git a/flow-bridge/build.gradle b/flow-bridge/build.gradle new file mode 100644 index 00000000..13debe28 --- /dev/null +++ b/flow-bridge/build.gradle @@ -0,0 +1,13 @@ +description = 'reactive-streams-flow-bridge' + +dependencies { + compile project(':reactive-streams') + + testCompile project(':reactive-streams-tck') + testCompile group: 'org.testng', name: 'testng', version: '5.14.10' +} +test.useTestNG() + +javadoc { + options.links("http://download.java.net/java/jdk9/docs/api") +} \ No newline at end of file diff --git a/flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java b/flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java new file mode 100644 index 00000000..a6914df8 --- /dev/null +++ b/flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java @@ -0,0 +1,357 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams; + +import java.util.concurrent.Flow; + +/** + * Bridge between Reactive Streams API and the Java 9 {@link java.util.concurrent.Flow} API. + */ +public final class ReactiveStreamsFlowBridge { + /** Utility class. */ + private ReactiveStreamsFlowBridge() { + throw new IllegalStateException("No instances!"); + } + + /** + * Converts a Flow Publisher into a Reactive Streams Publisher. + * @param the element type + * @param flowPublisher the source Flow Publisher to convert + * @return the equivalent Reactive Streams Publisher + */ + @SuppressWarnings("unchecked") + public static org.reactivestreams.Publisher toReactiveStreams( + Flow.Publisher flowPublisher) { + if (flowPublisher == null) { + throw new NullPointerException("flowPublisher"); + } + if (flowPublisher instanceof org.reactivestreams.Publisher) { + return (org.reactivestreams.Publisher)flowPublisher; + } + if (flowPublisher instanceof FlowPublisherFromReactive) { + return (org.reactivestreams.Publisher)(((FlowPublisherFromReactive)flowPublisher).reactiveStreams); + } + return new ReactivePublisherFromFlow(flowPublisher); + } + + /** + * Converts a Reactive Streams Publisher into a Flow Publisher. + * @param the element type + * @param reactiveStreamsPublisher the source Reactive Streams Publisher to convert + * @return the equivalent Flow Publisher + */ + @SuppressWarnings("unchecked") + public static Flow.Publisher toFlow( + org.reactivestreams.Publisher reactiveStreamsPublisher + ) { + if (reactiveStreamsPublisher == null) { + throw new NullPointerException("reactiveStreamsPublisher"); + } + if (reactiveStreamsPublisher instanceof Flow.Publisher) { + return (Flow.Publisher)reactiveStreamsPublisher; + } + if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) { + return (Flow.Publisher)(((ReactivePublisherFromFlow)reactiveStreamsPublisher).flow); + } + return new FlowPublisherFromReactive(reactiveStreamsPublisher); + } + + /** + * Converts a Flow Processor into a Reactive Streams Processor. + * @param the input value type + * @param the output value type + * @param flowProcessor the source Flow Processor to convert + * @return the equivalent Reactive Streams Processor + */ + @SuppressWarnings("unchecked") + public static org.reactivestreams.Processor toReactiveStreams( + Flow.Processor flowProcessor + ) { + if (flowProcessor == null) { + throw new NullPointerException("flowProcessor"); + } + if (flowProcessor instanceof org.reactivestreams.Processor) { + return (org.reactivestreams.Processor)flowProcessor; + } + if (flowProcessor instanceof FlowToReactiveProcessor) { + return (org.reactivestreams.Processor)(((FlowToReactiveProcessor)flowProcessor).reactiveStreams); + } + return new ReactiveToFlowProcessor(flowProcessor); + } + + /** + * Converts a Reactive Streams Processor into a Flow Processor. + * @param the input value type + * @param the output value type + * @param reactiveStreamsProcessor the source Reactive Streams Processor to convert + * @return the equivalent Flow Processor + */ + @SuppressWarnings("unchecked") + public static Flow.Processor toFlow( + org.reactivestreams.Processor reactiveStreamsProcessor + ) { + if (reactiveStreamsProcessor == null) { + throw new NullPointerException("reactiveStreamsProcessor"); + } + if (reactiveStreamsProcessor instanceof Flow.Processor) { + return (Flow.Processor)reactiveStreamsProcessor; + } + if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) { + return (Flow.Processor)(((ReactiveToFlowProcessor)reactiveStreamsProcessor).flow); + } + return new FlowToReactiveProcessor(reactiveStreamsProcessor); + } + + /** + * Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription. + */ + static final class FlowToReactiveSubscription implements Flow.Subscription { + private final org.reactivestreams.Subscription reactiveStreams; + + public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) { + this.reactiveStreams = reactive; + } + + @Override + public void request(long n) { + reactiveStreams.request(n); + } + + @Override + public void cancel() { + reactiveStreams.cancel(); + } + + } + + /** + * Wraps a Flow Subscription and converts the calls to a Reactive Streams Subscription. + */ + static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription { + private final Flow.Subscription flow; + + public ReactiveToFlowSubscription(Flow.Subscription flow) { + this.flow = flow; + } + + @Override + public void request(long n) { + flow.request(n); + } + + @Override + public void cancel() { + flow.cancel(); + } + + + } + + /** + * Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it. + * @param the element type + */ + static final class FlowToReactiveSubscriber + implements Flow.Subscriber { + private final org.reactivestreams.Subscriber reactiveStreams; + + public FlowToReactiveSubscriber(org.reactivestreams.Subscriber reactive) { + this.reactiveStreams = reactive; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + reactiveStreams.onSubscribe(new ReactiveToFlowSubscription(subscription)); + } + + @Override + public void onNext(T item) { + reactiveStreams.onNext(item); + } + + @Override + public void onError(Throwable throwable) { + reactiveStreams.onError(throwable); + } + + @Override + public void onComplete() { + reactiveStreams.onComplete(); + } + + } + + /** + * Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it. + * @param the element type + */ + static final class ReactiveToFlowSubscriber + implements org.reactivestreams.Subscriber { + private final Flow.Subscriber flow; + + public ReactiveToFlowSubscriber(Flow.Subscriber flow) { + this.flow = flow; + } + + @Override + public void onSubscribe(org.reactivestreams.Subscription subscription) { + flow.onSubscribe(new FlowToReactiveSubscription(subscription)); + } + + @Override + public void onNext(T item) { + flow.onNext(item); + } + + @Override + public void onError(Throwable throwable) { + flow.onError(throwable); + } + + @Override + public void onComplete() { + flow.onComplete(); + } + + } + + /** + * Wraps a Flow Processor and forwards methods of the Reactive Streams Processor to it. + * @param the input type + * @param the output type + */ + static final class ReactiveToFlowProcessor + implements org.reactivestreams.Processor { + final Flow.Processor flow; + + public ReactiveToFlowProcessor(Flow.Processor flow) { + this.flow = flow; + } + + @Override + public void onSubscribe(org.reactivestreams.Subscription s) { + flow.onSubscribe(new FlowToReactiveSubscription(s)); + } + + @Override + public void onNext(T t) { + flow.onNext(t); + } + + @Override + public void onError(Throwable t) { + flow.onError(t); + } + + @Override + public void onComplete() { + flow.onComplete(); + } + + @Override + public void subscribe(org.reactivestreams.Subscriber s) { + if (s == null) { + flow.subscribe(null); + return; + } + flow.subscribe(new FlowToReactiveSubscriber(s)); + } + } + + /** + * Wraps a Reactive Streams Processor and forwards methods of the Flow Processor to it. + * @param the input type + * @param the output type + */ + static final class FlowToReactiveProcessor + implements Flow.Processor { + final org.reactivestreams.Processor reactiveStreams; + + public FlowToReactiveProcessor(org.reactivestreams.Processor reactive) { + this.reactiveStreams = reactive; + } + + @Override + public void onSubscribe(Flow.Subscription s) { + reactiveStreams.onSubscribe(new ReactiveToFlowSubscription(s)); + } + + @Override + public void onNext(T t) { + reactiveStreams.onNext(t); + } + + @Override + public void onError(Throwable t) { + reactiveStreams.onError(t); + } + + @Override + public void onComplete() { + reactiveStreams.onComplete(); + } + + @Override + public void subscribe(Flow.Subscriber s) { + if (s == null) { + reactiveStreams.subscribe(null); + return; + } + reactiveStreams.subscribe(new ReactiveToFlowSubscriber(s)); + } + } + + /** + * Reactive Streams Publisher that wraps a Flow Publisher. + * @param the element type + */ + static final class ReactivePublisherFromFlow implements org.reactivestreams.Publisher { + + final Flow.Publisher flow; + + public ReactivePublisherFromFlow(Flow.Publisher flowPublisher) { + this.flow = flowPublisher; + } + + @Override + public void subscribe(org.reactivestreams.Subscriber reactive) { + if (reactive == null) { + flow.subscribe(null); + return; + } + flow.subscribe(new FlowToReactiveSubscriber(reactive)); + } + } + + /** + * Flow Publisher that wraps a Reactive Streams Publisher. + * @param the element type + */ + static final class FlowPublisherFromReactive implements Flow.Publisher { + + final org.reactivestreams.Publisher reactiveStreams; + + public FlowPublisherFromReactive(org.reactivestreams.Publisher reactivePublisher) { + this.reactiveStreams = reactivePublisher; + } + + @Override + public void subscribe(Flow.Subscriber flow) { + if (flow == null) { + reactiveStreams.subscribe(null); + return; + } + reactiveStreams.subscribe(new ReactiveToFlowSubscriber(flow)); + } + } + +} \ No newline at end of file diff --git a/flow-bridge/src/test/java/org/reactivestreams/MulticastPublisher.java b/flow-bridge/src/test/java/org/reactivestreams/MulticastPublisher.java new file mode 100644 index 00000000..31f00543 --- /dev/null +++ b/flow-bridge/src/test/java/org/reactivestreams/MulticastPublisher.java @@ -0,0 +1,358 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams; + +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.util.Objects; +import java.util.concurrent.Executor; +import java.util.concurrent.Flow; +import java.util.concurrent.Flow.*; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.atomic.*; + +final class MulticastPublisher implements Publisher, AutoCloseable { + + final Executor executor; + final int bufferSize; + + final AtomicBoolean done = new AtomicBoolean(); + Throwable error; + + static final InnerSubscription[] EMPTY = new InnerSubscription[0]; + static final InnerSubscription[] TERMINATED = new InnerSubscription[0]; + + + final AtomicReference[]> subscribers = new AtomicReference[]>(); + + public MulticastPublisher() { + this(ForkJoinPool.commonPool(), Flow.defaultBufferSize()); + } + + @SuppressWarnings("unchecked") + public MulticastPublisher(Executor executor, int bufferSize) { + if ((bufferSize & (bufferSize - 1)) != 0) { + throw new IllegalArgumentException("Please provide a power-of-two buffer size"); + } + this.executor = executor; + this.bufferSize = bufferSize; + subscribers.setRelease(EMPTY); + } + + public boolean offer(T item) { + Objects.requireNonNull(item, "item is null"); + + InnerSubscription[] a = subscribers.get(); + synchronized (this) { + for (InnerSubscription inner : a) { + if (inner.isFull()) { + return false; + } + } + for (InnerSubscription inner : a) { + inner.offer(item); + inner.drain(executor); + } + } + + return true; + } + + @SuppressWarnings("unchecked") + public void complete() { + if (done.compareAndSet(false, true)) { + for (InnerSubscription inner : subscribers.getAndSet(TERMINATED)) { + inner.done = true; + inner.drain(executor); + } + } + } + + @SuppressWarnings("unchecked") + public void completeExceptionally(Throwable error) { + if (done.compareAndSet(false, true)) { + this.error = error; + for (InnerSubscription inner : subscribers.getAndSet(TERMINATED)) { + inner.error = error; + inner.done = true; + inner.drain(executor); + } + } + } + + @Override + public void close() { + complete(); + } + + @Override + public void subscribe(Subscriber subscriber) { + Objects.requireNonNull(subscriber, "subscriber is null"); + InnerSubscription inner = new InnerSubscription(subscriber, bufferSize, this); + if (!add(inner)) { + Throwable ex = error; + if (ex != null) { + inner.error = ex; + } + inner.done = true; + } + inner.drain(executor); + } + + public boolean hasSubscribers() { + return subscribers.get().length != 0; + } + + boolean add(InnerSubscription inner) { + + for (;;) { + InnerSubscription[] a = subscribers.get(); + if (a == TERMINATED) { + return false; + } + + int n = a.length; + @SuppressWarnings("unchecked") + InnerSubscription[] b = new InnerSubscription[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = inner; + if (subscribers.compareAndSet(a, b)) { + return true; + } + } + } + + @SuppressWarnings("unchecked") + void remove(InnerSubscription inner) { + for (;;) { + InnerSubscription[] a = subscribers.get(); + int n = a.length; + if (n == 0) { + break; + } + + int j = -1; + for (int i = 0; i < n; i++) { + if (a[i] == inner) { + j = i; + break; + } + } + if (j < 0) { + break; + } + InnerSubscription[] b; + if (n == 1) { + b = EMPTY; + } else { + b = new InnerSubscription[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + } + if (subscribers.compareAndSet(a, b)) { + break; + } + } + } + + static final class InnerSubscription implements Subscription, Runnable { + + final Subscriber actual; + final MulticastPublisher parent; + final AtomicReferenceArray queue; + final int mask; + + volatile boolean badRequest; + final AtomicBoolean cancelled = new AtomicBoolean(); + + volatile boolean done; + Throwable error; + + boolean subscribed; + long emitted; + + final AtomicLong requested = new AtomicLong(); + + final AtomicInteger wip = new AtomicInteger(); + + final AtomicLong producerIndex = new AtomicLong(); + + final AtomicLong consumerIndex = new AtomicLong(); + + InnerSubscription(Subscriber actual, int bufferSize, MulticastPublisher parent) { + this.actual = actual; + this.queue = new AtomicReferenceArray(bufferSize); + this.parent = parent; + this.mask = bufferSize - 1; + } + + void offer(T item) { + AtomicReferenceArray q = queue; + int m = mask; + long pi = producerIndex.get(); + int offset = (int)(pi) & m; + + q.setRelease(offset, item); + producerIndex.setRelease(pi + 1); + } + + T poll() { + AtomicReferenceArray q = queue; + int m = mask; + long ci = consumerIndex.get(); + + int offset = (int)(ci) & m; + T o = q.getAcquire(offset); + if (o != null) { + q.setRelease(offset, null); + consumerIndex.setRelease(ci + 1); + } + return o; + } + + boolean isFull() { + return 1 + mask + consumerIndex.get() == producerIndex.get(); + } + + void drain(Executor executor) { + if (wip.getAndAdd(1) == 0) { + executor.execute(this); + } + } + + @Override + public void request(long n) { + if (n <= 0L) { + badRequest = true; + done = true; + } else { + for (;;) { + long r = requested.get(); + long u = r + n; + if (u < 0) { + u = Long.MAX_VALUE; + } + if (requested.compareAndSet(r, u)) { + break; + } + } + } + drain(parent.executor); + } + + @Override + public void cancel() { + if (cancelled.compareAndSet(false, true)) { + parent.remove(this); + } + } + + void clear() { + error = null; + while (poll() != null) ; + } + + @Override + public void run() { + int missed = 1; + Subscriber a = actual; + + outer: + for (;;) { + + if (subscribed) { + if (cancelled.get()) { + clear(); + } else { + long r = requested.get(); + long e = emitted; + + while (e != r) { + if (cancelled.get()) { + continue outer; + } + + boolean d = done; + + if (d) { + Throwable ex = error; + if (ex != null) { + cancelled.setRelease(true); + a.onError(ex); + continue outer; + } + if (badRequest) { + cancelled.setRelease(true); + parent.remove(this); + a.onError(new IllegalArgumentException("§3.9 violated: request was not positive")); + continue outer; + } + } + + T v = poll(); + boolean empty = v == null; + + if (d && empty) { + cancelled.setRelease(true); + a.onComplete(); + break; + } + + if (empty) { + break; + } + + a.onNext(v); + + e++; + } + + if (e == r) { + if (cancelled.get()) { + continue outer; + } + if (done) { + Throwable ex = error; + if (ex != null) { + cancelled.setRelease(true); + a.onError(ex); + } else + if (badRequest) { + cancelled.setRelease(true); + a.onError(new IllegalArgumentException("§3.9 violated: request was not positive")); + } else + if (producerIndex == consumerIndex) { + cancelled.setRelease(true); + a.onComplete(); + } + } + } + + emitted = e; + } + } else { + subscribed = true; + a.onSubscribe(this); + } + + int w = wip.get(); + if (missed == w) { + w = wip.getAndAdd(-missed); + if (missed == w) { + break; + } + } + missed = w; + } + } + } +} \ No newline at end of file diff --git a/flow-bridge/src/test/java/org/reactivestreams/ReactiveStreamsFlowBridgeTest.java b/flow-bridge/src/test/java/org/reactivestreams/ReactiveStreamsFlowBridgeTest.java new file mode 100644 index 00000000..e1c76087 --- /dev/null +++ b/flow-bridge/src/test/java/org/reactivestreams/ReactiveStreamsFlowBridgeTest.java @@ -0,0 +1,113 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams; + +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.concurrent.Executor; +import java.util.concurrent.Flow; +import java.util.concurrent.SubmissionPublisher; + +public class ReactiveStreamsFlowBridgeTest { + @Test + public void reactiveToFlowNormal() { + MulticastPublisher p = new MulticastPublisher(new Executor() { + @Override + public void execute(Runnable command) { + command.run(); + } + }, Flow.defaultBufferSize()); + + TestEitherConsumer tc = new TestEitherConsumer(); + + ReactiveStreamsFlowBridge.toFlow(p).subscribe(tc); + + p.offer(1); + p.offer(2); + p.offer(3); + p.offer(4); + p.offer(5); + p.complete(); + + tc.assertRange(1, 5); + } + + @Test + public void reactiveToFlowError() { + MulticastPublisher p = new MulticastPublisher(new Executor() { + @Override + public void execute(Runnable command) { + command.run(); + } + }, Flow.defaultBufferSize()); + + TestEitherConsumer tc = new TestEitherConsumer(); + + ReactiveStreamsFlowBridge.toFlow(p).subscribe(tc); + + p.offer(1); + p.offer(2); + p.offer(3); + p.offer(4); + p.offer(5); + p.completeExceptionally(new IOException()); + + tc.assertFailure(IOException.class, 1, 2, 3, 4, 5); + } + + @Test + public void flowToReactiveNormal() { + SubmissionPublisher p = new SubmissionPublisher(new Executor() { + @Override + public void execute(Runnable command) { + command.run(); + } + }, Flow.defaultBufferSize()); + + TestEitherConsumer tc = new TestEitherConsumer(); + + ReactiveStreamsFlowBridge.toReactiveStreams(p).subscribe(tc); + + p.submit(1); + p.submit(2); + p.submit(3); + p.submit(4); + p.submit(5); + p.close(); + + tc.assertRange(1, 5); + } + + @Test + public void flowToReactiveError() { + SubmissionPublisher p = new SubmissionPublisher(new Executor() { + @Override + public void execute(Runnable command) { + command.run(); + } + }, Flow.defaultBufferSize()); + + TestEitherConsumer tc = new TestEitherConsumer(); + + ReactiveStreamsFlowBridge.toReactiveStreams(p).subscribe(tc); + + p.submit(1); + p.submit(2); + p.submit(3); + p.submit(4); + p.submit(5); + p.closeExceptionally(new IOException()); + + tc.assertFailure(IOException.class, 1, 2, 3, 4, 5); + } +} diff --git a/flow-bridge/src/test/java/org/reactivestreams/SubmissionPublisherTckTest.java b/flow-bridge/src/test/java/org/reactivestreams/SubmissionPublisherTckTest.java new file mode 100644 index 00000000..cc342924 --- /dev/null +++ b/flow-bridge/src/test/java/org/reactivestreams/SubmissionPublisherTckTest.java @@ -0,0 +1,58 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams; + +import org.reactivestreams.tck.PublisherVerification; +import org.reactivestreams.tck.TestEnvironment; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.concurrent.SubmissionPublisher; + +@Test +public class SubmissionPublisherTckTest extends PublisherVerification { + + public SubmissionPublisherTckTest() { + super(new TestEnvironment(300)); + } + + @Override + public Publisher createPublisher(final long elements) { + final SubmissionPublisher sp = new SubmissionPublisher(); + new Thread(new Runnable() { + @Override + public void run() { + while (!sp.hasSubscribers()) { + Thread.yield(); + } + for (int i = 0; i < elements; i++) { + sp.submit(i); + } + sp.close(); + } + }).start(); + return ReactiveStreamsFlowBridge.toReactiveStreams(sp); + } + + @Override + public Publisher createFailedPublisher() { + final SubmissionPublisher sp = new SubmissionPublisher(); + sp.closeExceptionally(new IOException()); + return ReactiveStreamsFlowBridge.toReactiveStreams(sp); + } + + @Override + public long maxElementsFromPublisher() { + return 100; + } + +} \ No newline at end of file diff --git a/flow-bridge/src/test/java/org/reactivestreams/TestEitherConsumer.java b/flow-bridge/src/test/java/org/reactivestreams/TestEitherConsumer.java new file mode 100644 index 00000000..5b8d8e63 --- /dev/null +++ b/flow-bridge/src/test/java/org/reactivestreams/TestEitherConsumer.java @@ -0,0 +1,174 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Flow; +import java.util.concurrent.TimeUnit; + +/** + * Class that provides basic state assertions on received elements and + * terminal signals from either a Reactive Streams Publisher or a + * Flow Publisher. + *

+ * As with standard {@link Subscriber}s, an instance of this class + * should be subscribed to at most one (Reactive Streams or + * Flow) Publisher. + *

+ * @param the element type + */ +class TestEitherConsumer implements Flow.Subscriber, Subscriber { + + protected final List values; + + protected final List errors; + + protected int completions; + + protected Flow.Subscription subscription; + + protected Subscription subscriptionRs; + + protected final CountDownLatch done; + + final long initialRequest; + + public TestEitherConsumer() { + this(Long.MAX_VALUE); + } + + public TestEitherConsumer(long initialRequest) { + this.values = new ArrayList(); + this.errors = new ArrayList(); + this.done = new CountDownLatch(1); + this.initialRequest = initialRequest; + } + + @Override + public final void onSubscribe(Flow.Subscription s) { + this.subscription = s; + s.request(initialRequest); + } + + @Override + public void onSubscribe(Subscription s) { + this.subscriptionRs = s; + s.request(initialRequest); + } + + @Override + public void onNext(T item) { + values.add(item); + if (subscription == null && subscriptionRs == null) { + errors.add(new IllegalStateException("onSubscribe not called")); + } + } + + @Override + public void onError(Throwable throwable) { + errors.add(throwable); + if (subscription == null && subscriptionRs == null) { + errors.add(new IllegalStateException("onSubscribe not called")); + } + done.countDown(); + } + + @Override + public void onComplete() { + completions++; + if (subscription == null && subscriptionRs == null) { + errors.add(new IllegalStateException("onSubscribe not called")); + } + done.countDown(); + } + + public final void cancel() { + // FIXME implement deferred cancellation + } + + public final List values() { + return values; + } + + public final List errors() { + return errors; + } + + public final int completions() { + return completions; + } + + public final boolean await(long timeout, TimeUnit unit) throws InterruptedException { + return done.await(timeout, unit); + } + + public final TestEitherConsumer assertResult(T... items) { + if (!values.equals(Arrays.asList(items))) { + throw new AssertionError("Expected: " + Arrays.toString(items) + ", Actual: " + values + ", Completions: " + completions); + } + if (completions != 1) { + throw new AssertionError("Not completed: " + completions); + } + return this; + } + + + public final TestEitherConsumer assertFailure(Class errorClass, T... items) { + if (!values.equals(Arrays.asList(items))) { + throw new AssertionError("Expected: " + Arrays.toString(items) + ", Actual: " + values + ", Completions: " + completions); + } + if (completions != 0) { + throw new AssertionError("Completed: " + completions); + } + if (errors.isEmpty()) { + throw new AssertionError("No errors"); + } + if (!errorClass.isInstance(errors.get(0))) { + AssertionError ae = new AssertionError("Wrong throwable"); + ae.initCause(errors.get(0)); + throw ae; + } + return this; + } + + public final TestEitherConsumer awaitDone(long timeout, TimeUnit unit) { + try { + if (!done.await(timeout, unit)) { + subscription.cancel(); + throw new RuntimeException("Timed out. Values: " + values.size() + + ", Errors: " + errors.size() + ", Completions: " + completions); + } + } catch (InterruptedException ex) { + throw new RuntimeException("Interrupted"); + } + return this; + } + + public final TestEitherConsumer assertRange(int start, int count) { + if (values.size() != count) { + throw new AssertionError("Expected: " + count + ", Actual: " + values.size()); + } + for (int i = 0; i < count; i++) { + if ((Integer)values.get(i) != start + i) { + throw new AssertionError("Index: " + i + ", Expected: " + + (i + start) + ", Actual: " +values.get(i)); + } + } + if (completions != 1) { + throw new AssertionError("Not completed: " + completions); + } + return this; + } +} \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 58385981..2c6137b8 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index d0773a4d..aeb99ed9 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Tue Aug 19 09:12:00 AST 2014 +#Wed Jul 12 20:56:16 CEST 2017 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-2.12-bin.zip +distributionUrl=http\://services.gradle.org/distributions/gradle-4.0.1-bin.zip diff --git a/gradlew b/gradlew index 91a7e269..9d82f789 100755 --- a/gradlew +++ b/gradlew @@ -42,11 +42,6 @@ case "`uname`" in ;; esac -# For Cygwin, ensure paths are in UNIX format before anything is touched. -if $cygwin ; then - [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"` -fi - # Attempt to set APP_HOME # Resolve links: $0 may be a link PRG="$0" @@ -61,9 +56,9 @@ while [ -h "$PRG" ] ; do fi done SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" >&- +cd "`dirname \"$PRG\"`/" >/dev/null APP_HOME="`pwd -P`" -cd "$SAVED" >&- +cd "$SAVED" >/dev/null CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar @@ -114,6 +109,7 @@ fi if $cygwin ; then APP_HOME=`cygpath --path --mixed "$APP_HOME"` CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` # We build the pattern for arguments to be converted via cygpath ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` diff --git a/gradlew.bat b/gradlew.bat index 8a0b282a..5f192121 100755 --- a/gradlew.bat +++ b/gradlew.bat @@ -46,7 +46,7 @@ echo location of your Java installation. goto fail :init -@rem Get command-line arguments, handling Windowz variants +@rem Get command-line arguments, handling Windows variants if not "%OS%" == "Windows_NT" goto win9xME_args if "%@eval[2+2]" == "4" goto 4NT_args diff --git a/settings.gradle b/settings.gradle index 61cdbf6d..01529608 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,9 +1,27 @@ rootProject.name = 'reactive-streams' + +def jdkFlow = false; + +try { + Class.forName("java.util.concurrent.Flow"); + jdkFlow = true; + println("Java 9 Flow API found") +} catch (Throwable ex) { + // Flow API not available + println("Java 9 Flow API not available") +} + include ':reactive-streams' include ':reactive-streams-tck' include ':reactive-streams-examples' +if (jdkFlow) { + include ':reactive-streams-flow-bridge' +} + project(':reactive-streams').projectDir = "$rootDir/api" as File project(':reactive-streams-tck').projectDir = "$rootDir/tck" as File project(':reactive-streams-examples').projectDir = "$rootDir/examples" as File - +if (jdkFlow) { + project(':reactive-streams-flow-bridge').projectDir = "$rootDir/flow-bridge" as File +}