Skip to content

Implementing renaming proposal for the interop with j.u.c.Flow #402

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ subprojects {
"reactive-streams-tck",
"reactive-streams-tck-flow",
"reactive-streams-examples",
"reactive-streams-flow-bridge"]) {
"reactive-streams-flow-adapters"]) {
apply plugin: "maven"
apply plugin: "signing"

Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion flow-bridge/build.gradle → flow-adapters/build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
description = 'reactive-streams-flow-bridge'
description = 'reactive-streams-flow-adapters'

dependencies {
compile project(':reactive-streams')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
/**
* Bridge between Reactive Streams API and the Java 9 {@link java.util.concurrent.Flow} API.
*/
public final class ReactiveStreamsFlowBridge {
public final class FlowAdapters {
/** Utility class. */
private ReactiveStreamsFlowBridge() {
private FlowAdapters() {
throw new IllegalStateException("No instances!");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class ReactiveStreamsFlowBridgeTest {
public class FlowAdaptersTest {
@Test
public void reactiveToFlowNormal() {
MulticastPublisher<Integer> p = new MulticastPublisher<Integer>(new Executor() {
Expand All @@ -31,7 +31,7 @@ public void execute(Runnable command) {

TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();

ReactiveStreamsFlowBridge.toFlowPublisher(p).subscribe(tc);
FlowAdapters.toFlowPublisher(p).subscribe(tc);

p.offer(1);
p.offer(2);
Expand All @@ -54,7 +54,7 @@ public void execute(Runnable command) {

TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();

ReactiveStreamsFlowBridge.toFlowPublisher(p).subscribe(tc);
FlowAdapters.toFlowPublisher(p).subscribe(tc);

p.offer(1);
p.offer(2);
Expand All @@ -77,7 +77,7 @@ public void execute(Runnable command) {

TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();

ReactiveStreamsFlowBridge.toPublisher(p).subscribe(tc);
FlowAdapters.toPublisher(p).subscribe(tc);

p.submit(1);
p.submit(2);
Expand All @@ -100,7 +100,7 @@ public void execute(Runnable command) {

TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();

ReactiveStreamsFlowBridge.toPublisher(p).subscribe(tc);
FlowAdapters.toPublisher(p).subscribe(tc);

p.submit(1);
p.submit(2);
Expand All @@ -116,7 +116,7 @@ public void execute(Runnable command) {
public void reactiveStreamsToFlowSubscriber() {
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();

Flow.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toFlowSubscriber(tc);
Flow.Subscriber<Integer> fs = FlowAdapters.toFlowSubscriber(tc);

final Object[] state = { null, null };

Expand Down Expand Up @@ -148,7 +148,7 @@ public void cancel() {
public void flowToReactiveStreamsSubscriber() {
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();

org.reactivestreams.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toSubscriber(tc);
org.reactivestreams.Subscriber<Integer> fs = FlowAdapters.toSubscriber(tc);

final Object[] state = { null, null };

Expand Down Expand Up @@ -192,8 +192,8 @@ public void stableConversionForSubscriber() {
@Override public void onComplete() {};
};

Assert.assertSame(ReactiveStreamsFlowBridge.toSubscriber(ReactiveStreamsFlowBridge.toFlowSubscriber(rsSub)), rsSub);
Assert.assertSame(ReactiveStreamsFlowBridge.toFlowSubscriber(ReactiveStreamsFlowBridge.toSubscriber(fSub)), fSub);
Assert.assertSame(FlowAdapters.toSubscriber(FlowAdapters.toFlowSubscriber(rsSub)), rsSub);
Assert.assertSame(FlowAdapters.toFlowSubscriber(FlowAdapters.toSubscriber(fSub)), fSub);
}

@Test
Expand All @@ -214,8 +214,8 @@ public void stableConversionForProcessor() {
@Override public void subscribe(Flow.Subscriber s) {};
};

Assert.assertSame(ReactiveStreamsFlowBridge.toProcessor(ReactiveStreamsFlowBridge.toFlowProcessor(rsPro)), rsPro);
Assert.assertSame(ReactiveStreamsFlowBridge.toFlowProcessor(ReactiveStreamsFlowBridge.toProcessor(fPro)), fPro);
Assert.assertSame(FlowAdapters.toProcessor(FlowAdapters.toFlowProcessor(rsPro)), rsPro);
Assert.assertSame(FlowAdapters.toFlowProcessor(FlowAdapters.toProcessor(fPro)), fPro);
}

@Test
Expand All @@ -228,7 +228,7 @@ public void stableConversionForPublisher() {
@Override public void subscribe(Flow.Subscriber s) {};
};

Assert.assertSame(ReactiveStreamsFlowBridge.toPublisher(ReactiveStreamsFlowBridge.toFlowPublisher(rsPub)), rsPub);
Assert.assertSame(ReactiveStreamsFlowBridge.toFlowPublisher(ReactiveStreamsFlowBridge.toPublisher(fPub)), fPub);
Assert.assertSame(FlowAdapters.toPublisher(FlowAdapters.toFlowPublisher(rsPub)), rsPub);
Assert.assertSame(FlowAdapters.toFlowPublisher(FlowAdapters.toPublisher(fPub)), fPub);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ public void run() {
sp.close();
}
}).start();
return ReactiveStreamsFlowBridge.toPublisher(sp);
return FlowAdapters.toPublisher(sp);
}

@Override
public Publisher<Integer> createFailedPublisher() {
final SubmissionPublisher<Integer> sp = new SubmissionPublisher<Integer>();
sp.closeExceptionally(new IOException());
return ReactiveStreamsFlowBridge.toPublisher(sp);
return FlowAdapters.toPublisher(sp);
}

@Override
Expand Down
14 changes: 7 additions & 7 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,29 @@ try {
Class.forName("java.util.concurrent.Flow")
jdkFlow = true
println(ANSI_GREEN + " INFO: ------------------ JDK9 classes detected ---------------------------------" + ANSI_RESET)
println(ANSI_GREEN + " INFO: Java 9 Flow API found; Including [flow-bridge, tck-flow] in build. " + ANSI_RESET)
println(ANSI_GREEN + " INFO: Java 9 Flow API found; Including [flow-adapters, tck-flow] in build. " + ANSI_RESET)
println(ANSI_GREEN + " INFO: --------------------------------------------------------------------------" + ANSI_RESET)
} catch (Throwable ex) {
// Flow API not available
println(ANSI_RED + "WARNING: ------------------ JDK9 classes NOT detected -----------------------------" + ANSI_RESET)
println(ANSI_RED + "WARNING: Java 9 Flow API not found; Not including [flow-bridge, tck-flow] in build." + ANSI_RESET)
println(ANSI_RED + "WARNING: In order to execute the complete test-suite run the build using JDK9+. " + ANSI_RESET)
println(ANSI_RED + "WARNING: --------------------------------------------------------------------------" + ANSI_RESET)
println(ANSI_RED + "WARNING: -------------------- JDK9 classes NOT detected -----------------------------" + ANSI_RESET)
println(ANSI_RED + "WARNING: Java 9 Flow API not found; Not including [flow-adapters, tck-flow] in build." + ANSI_RESET)
println(ANSI_RED + "WARNING: In order to execute the complete test-suite run the build using JDK9+. " + ANSI_RESET)
println(ANSI_RED + "WARNING: ----------------------------------------------------------------------------" + ANSI_RESET)
}

include ':reactive-streams'
include ':reactive-streams-tck'
include ':reactive-streams-examples'

if (jdkFlow) {
include ':reactive-streams-flow-bridge'
include ':reactive-streams-flow-adapters'
include ':reactive-streams-tck-flow'
}

project(':reactive-streams').projectDir = "$rootDir/api" as File
project(':reactive-streams-tck').projectDir = "$rootDir/tck" as File
project(':reactive-streams-examples').projectDir = "$rootDir/examples" as File
if (jdkFlow) {
project(':reactive-streams-flow-bridge').projectDir = "$rootDir/flow-bridge" as File
project(':reactive-streams-flow-adapters').projectDir = "$rootDir/flow-adapters" as File
project(':reactive-streams-tck-flow').projectDir = "$rootDir/tck-flow" as File
}
2 changes: 1 addition & 1 deletion tck-flow/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ description = 'reactive-streams-tck-flow'
dependencies {
compile group: 'org.testng', name: 'testng', version:'5.14.10'
compile project(':reactive-streams-tck')
compile project(':reactive-streams-flow-bridge')
compile project(':reactive-streams-flow-adapters')
}
test.useTestNG()
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
package org.reactivestreams.tck.flow;

import org.reactivestreams.Publisher;
import org.reactivestreams.ReactiveStreamsFlowBridge;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.tck.PublisherVerification;
import org.reactivestreams.tck.TestEnvironment;

Expand All @@ -36,7 +36,7 @@ public FlowPublisherVerification(TestEnvironment env) {
@Override
final public Publisher<T> createPublisher(long elements) {
final Flow.Publisher<T> flowPublisher = createFlowPublisher(elements);
return ReactiveStreamsFlowBridge.toPublisher(flowPublisher);
return FlowAdapters.toPublisher(flowPublisher);
}
/**
* This is the main method you must implement in your test incarnation.
Expand All @@ -49,7 +49,7 @@ final public Publisher<T> createPublisher(long elements) {
final public Publisher<T> createFailedPublisher() {
final Flow.Publisher<T> failed = createFailedFlowPublisher();
if (failed == null) return null; // because `null` means "SKIP" in createFailedPublisher
else return ReactiveStreamsFlowBridge.toPublisher(failed);
else return FlowAdapters.toPublisher(failed);
}
/**
* By implementing this method, additional TCK tests concerning a "failed" publishers will be run.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package org.reactivestreams.tck.flow;

import org.reactivestreams.ReactiveStreamsFlowBridge;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.SubscriberBlackboxVerification;
Expand Down Expand Up @@ -40,7 +40,7 @@ protected FlowSubscriberBlackboxVerification(TestEnvironment env) {

@Override
public final void triggerRequest(Subscriber<? super T> subscriber) {
triggerFlowRequest(ReactiveStreamsFlowBridge.toFlowSubscriber(subscriber));
triggerFlowRequest(FlowAdapters.toFlowSubscriber(subscriber));
}
/**
* Override this method if the {@link java.util.concurrent.Flow.Subscriber} implementation you are verifying
Expand All @@ -54,7 +54,7 @@ public void triggerFlowRequest(Flow.Subscriber<? super T> subscriber) {

@Override
public final Subscriber<T> createSubscriber() {
return ReactiveStreamsFlowBridge.<T>toSubscriber(createFlowSubscriber());
return FlowAdapters.<T>toSubscriber(createFlowSubscriber());
}
/**
* This is the main method you must implement in your test incarnation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package org.reactivestreams.tck.flow;

import org.reactivestreams.ReactiveStreamsFlowBridge;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Subscriber;
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
import org.reactivestreams.tck.TestEnvironment;
Expand All @@ -35,7 +35,7 @@ protected FlowSubscriberWhiteboxVerification(TestEnvironment env) {

@Override
final public Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe) {
return ReactiveStreamsFlowBridge.toSubscriber(createFlowSubscriber(probe));
return FlowAdapters.toSubscriber(createFlowSubscriber(probe));
}
/**
* This is the main method you must implement in your test incarnation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package org.reactivestreams.tck.flow;

import org.reactivestreams.ReactiveStreamsFlowBridge;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.example.unicast.AsyncIterablePublisher;
import java.util.concurrent.Flow.Publisher;

Expand Down Expand Up @@ -41,7 +41,7 @@ public EmptyLazyFlowPublisherTest() {

@Override
public Publisher<Integer> createFlowPublisher(long elements) {
return ReactiveStreamsFlowBridge.toFlowPublisher(
return FlowAdapters.toFlowPublisher(
new AsyncIterablePublisher<Integer>(Collections.<Integer>emptyList(), ex)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;

import org.reactivestreams.ReactiveStreamsFlowBridge;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.example.unicast.AsyncIterablePublisher;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -42,7 +42,7 @@ public SingleElementFlowPublisherTest() {

@Override
public Flow.Publisher<Integer> createFlowPublisher(long elements) {
return ReactiveStreamsFlowBridge.toFlowPublisher(new AsyncIterablePublisher<Integer>(Collections.singleton(1), ex));
return FlowAdapters.toFlowPublisher(new AsyncIterablePublisher<Integer>(Collections.singleton(1), ex));
}

@Override
Expand Down