();
+ sp.closeExceptionally(new IOException());
+ return ReactiveStreamsFlowBridge.toReactiveStreams(sp);
+ }
+
+ @Override
+ public long maxElementsFromPublisher() {
+ return 100;
+ }
+
+}
\ No newline at end of file
diff --git a/flow-bridge/src/test/java/org/reactivestreams/TestEitherConsumer.java b/flow-bridge/src/test/java/org/reactivestreams/TestEitherConsumer.java
new file mode 100644
index 00000000..5b8d8e63
--- /dev/null
+++ b/flow-bridge/src/test/java/org/reactivestreams/TestEitherConsumer.java
@@ -0,0 +1,174 @@
+/************************************************************************
+ * Licensed under Public Domain (CC0) *
+ * *
+ * To the extent possible under law, the person who associated CC0 with *
+ * this code has waived all copyright and related or neighboring *
+ * rights to this code. *
+ * *
+ * You should have received a copy of the CC0 legalcode along with this *
+ * work. If not, see .*
+ ************************************************************************/
+
+package org.reactivestreams;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Flow;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class that provides basic state assertions on received elements and
+ * terminal signals from either a Reactive Streams Publisher or a
+ * Flow Publisher.
+ *
+ * As with standard {@link Subscriber}s, an instance of this class
+ * should be subscribed to at most one (Reactive Streams or
+ * Flow) Publisher.
+ *
+ * @param the element type
+ */
+class TestEitherConsumer implements Flow.Subscriber, Subscriber {
+
+ protected final List values;
+
+ protected final List errors;
+
+ protected int completions;
+
+ protected Flow.Subscription subscription;
+
+ protected Subscription subscriptionRs;
+
+ protected final CountDownLatch done;
+
+ final long initialRequest;
+
+ public TestEitherConsumer() {
+ this(Long.MAX_VALUE);
+ }
+
+ public TestEitherConsumer(long initialRequest) {
+ this.values = new ArrayList();
+ this.errors = new ArrayList();
+ this.done = new CountDownLatch(1);
+ this.initialRequest = initialRequest;
+ }
+
+ @Override
+ public final void onSubscribe(Flow.Subscription s) {
+ this.subscription = s;
+ s.request(initialRequest);
+ }
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ this.subscriptionRs = s;
+ s.request(initialRequest);
+ }
+
+ @Override
+ public void onNext(T item) {
+ values.add(item);
+ if (subscription == null && subscriptionRs == null) {
+ errors.add(new IllegalStateException("onSubscribe not called"));
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ errors.add(throwable);
+ if (subscription == null && subscriptionRs == null) {
+ errors.add(new IllegalStateException("onSubscribe not called"));
+ }
+ done.countDown();
+ }
+
+ @Override
+ public void onComplete() {
+ completions++;
+ if (subscription == null && subscriptionRs == null) {
+ errors.add(new IllegalStateException("onSubscribe not called"));
+ }
+ done.countDown();
+ }
+
+ public final void cancel() {
+ // FIXME implement deferred cancellation
+ }
+
+ public final List values() {
+ return values;
+ }
+
+ public final List errors() {
+ return errors;
+ }
+
+ public final int completions() {
+ return completions;
+ }
+
+ public final boolean await(long timeout, TimeUnit unit) throws InterruptedException {
+ return done.await(timeout, unit);
+ }
+
+ public final TestEitherConsumer assertResult(T... items) {
+ if (!values.equals(Arrays.asList(items))) {
+ throw new AssertionError("Expected: " + Arrays.toString(items) + ", Actual: " + values + ", Completions: " + completions);
+ }
+ if (completions != 1) {
+ throw new AssertionError("Not completed: " + completions);
+ }
+ return this;
+ }
+
+
+ public final TestEitherConsumer assertFailure(Class extends Throwable> errorClass, T... items) {
+ if (!values.equals(Arrays.asList(items))) {
+ throw new AssertionError("Expected: " + Arrays.toString(items) + ", Actual: " + values + ", Completions: " + completions);
+ }
+ if (completions != 0) {
+ throw new AssertionError("Completed: " + completions);
+ }
+ if (errors.isEmpty()) {
+ throw new AssertionError("No errors");
+ }
+ if (!errorClass.isInstance(errors.get(0))) {
+ AssertionError ae = new AssertionError("Wrong throwable");
+ ae.initCause(errors.get(0));
+ throw ae;
+ }
+ return this;
+ }
+
+ public final TestEitherConsumer awaitDone(long timeout, TimeUnit unit) {
+ try {
+ if (!done.await(timeout, unit)) {
+ subscription.cancel();
+ throw new RuntimeException("Timed out. Values: " + values.size()
+ + ", Errors: " + errors.size() + ", Completions: " + completions);
+ }
+ } catch (InterruptedException ex) {
+ throw new RuntimeException("Interrupted");
+ }
+ return this;
+ }
+
+ public final TestEitherConsumer assertRange(int start, int count) {
+ if (values.size() != count) {
+ throw new AssertionError("Expected: " + count + ", Actual: " + values.size());
+ }
+ for (int i = 0; i < count; i++) {
+ if ((Integer)values.get(i) != start + i) {
+ throw new AssertionError("Index: " + i + ", Expected: "
+ + (i + start) + ", Actual: " +values.get(i));
+ }
+ }
+ if (completions != 1) {
+ throw new AssertionError("Not completed: " + completions);
+ }
+ return this;
+ }
+}
\ No newline at end of file
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index 58385981..2c6137b8 100644
Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index d0773a4d..aeb99ed9 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
-#Tue Aug 19 09:12:00 AST 2014
+#Wed Jul 12 20:56:16 CEST 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-2.12-bin.zip
+distributionUrl=http\://services.gradle.org/distributions/gradle-4.0.1-bin.zip
diff --git a/gradlew b/gradlew
index 91a7e269..9d82f789 100755
--- a/gradlew
+++ b/gradlew
@@ -42,11 +42,6 @@ case "`uname`" in
;;
esac
-# For Cygwin, ensure paths are in UNIX format before anything is touched.
-if $cygwin ; then
- [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
-fi
-
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
@@ -61,9 +56,9 @@ while [ -h "$PRG" ] ; do
fi
done
SAVED="`pwd`"
-cd "`dirname \"$PRG\"`/" >&-
+cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
-cd "$SAVED" >&-
+cd "$SAVED" >/dev/null
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
@@ -114,6 +109,7 @@ fi
if $cygwin ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
+ JAVACMD=`cygpath --unix "$JAVACMD"`
# We build the pattern for arguments to be converted via cygpath
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
diff --git a/gradlew.bat b/gradlew.bat
index 8a0b282a..5f192121 100755
--- a/gradlew.bat
+++ b/gradlew.bat
@@ -46,7 +46,7 @@ echo location of your Java installation.
goto fail
:init
-@rem Get command-line arguments, handling Windowz variants
+@rem Get command-line arguments, handling Windows variants
if not "%OS%" == "Windows_NT" goto win9xME_args
if "%@eval[2+2]" == "4" goto 4NT_args
diff --git a/settings.gradle b/settings.gradle
index 61cdbf6d..01529608 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,9 +1,27 @@
rootProject.name = 'reactive-streams'
+
+def jdkFlow = false;
+
+try {
+ Class.forName("java.util.concurrent.Flow");
+ jdkFlow = true;
+ println("Java 9 Flow API found")
+} catch (Throwable ex) {
+ // Flow API not available
+ println("Java 9 Flow API not available")
+}
+
include ':reactive-streams'
include ':reactive-streams-tck'
include ':reactive-streams-examples'
+if (jdkFlow) {
+ include ':reactive-streams-flow-bridge'
+}
+
project(':reactive-streams').projectDir = "$rootDir/api" as File
project(':reactive-streams-tck').projectDir = "$rootDir/tck" as File
project(':reactive-streams-examples').projectDir = "$rootDir/examples" as File
-
+if (jdkFlow) {
+ project(':reactive-streams-flow-bridge').projectDir = "$rootDir/flow-bridge" as File
+}