Skip to content

Commit a1f6475

Browse files
authored
Merge pull request #2 from reactive-streams/wip-jdk9-flow-√
Renames the converters to "toX" for RS and "toFlowX" for Flow.
2 parents db96849 + e35584b commit a1f6475

File tree

9 files changed

+79
-23
lines changed

9 files changed

+79
-23
lines changed

flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,15 @@ private ReactiveStreamsFlowBridge() {
2929
* @return the equivalent Reactive Streams Publisher
3030
*/
3131
@SuppressWarnings("unchecked")
32-
public static <T> org.reactivestreams.Publisher<T> toReactiveStreams(
32+
public static <T> org.reactivestreams.Publisher<T> toPublisher(
3333
Flow.Publisher<? extends T> flowPublisher) {
3434
if (flowPublisher == null) {
3535
throw new NullPointerException("flowPublisher");
3636
}
3737
if (flowPublisher instanceof FlowPublisherFromReactive) {
3838
return (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams);
39-
if (flowPublisher instanceof org.reactivestreams.Publisher) {
4039
}
40+
if (flowPublisher instanceof org.reactivestreams.Publisher) {
4141
return (org.reactivestreams.Publisher<T>)flowPublisher;
4242
}
4343
return new ReactivePublisherFromFlow<T>(flowPublisher);
@@ -50,7 +50,7 @@ public static <T> org.reactivestreams.Publisher<T> toReactiveStreams(
5050
* @return the equivalent Flow Publisher
5151
*/
5252
@SuppressWarnings("unchecked")
53-
public static <T> Flow.Publisher<T> toFlow(
53+
public static <T> Flow.Publisher<T> toFlowPublisher(
5454
org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher
5555
) {
5656
if (reactiveStreamsPublisher == null) {
@@ -73,7 +73,7 @@ public static <T> Flow.Publisher<T> toFlow(
7373
* @return the equivalent Reactive Streams Processor
7474
*/
7575
@SuppressWarnings("unchecked")
76-
public static <T, U> org.reactivestreams.Processor<T, U> toReactiveStreams(
76+
public static <T, U> org.reactivestreams.Processor<T, U> toProcessor(
7777
Flow.Processor<? super T, ? extends U> flowProcessor
7878
) {
7979
if (flowProcessor == null) {
@@ -96,7 +96,7 @@ public static <T, U> org.reactivestreams.Processor<T, U> toReactiveStreams(
9696
* @return the equivalent Flow Processor
9797
*/
9898
@SuppressWarnings("unchecked")
99-
public static <T, U> Flow.Processor<T, U> toFlow(
99+
public static <T, U> Flow.Processor<T, U> toFlowProcessor(
100100
org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor
101101
) {
102102
if (reactiveStreamsProcessor == null) {
@@ -118,7 +118,7 @@ public static <T, U> Flow.Processor<T, U> toFlow(
118118
* @return the equivalent Flow Subscriber
119119
*/
120120
@SuppressWarnings("unchecked")
121-
public static <T> Flow.Subscriber<T> toFlow(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
121+
public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
122122
if (reactiveStreamsSubscriber == null) {
123123
throw new NullPointerException("reactiveStreamsSubscriber");
124124
}
@@ -138,7 +138,7 @@ public static <T> Flow.Subscriber<T> toFlow(org.reactivestreams.Subscriber<T> re
138138
* @return the equivalent Reactive Streams Subscriber
139139
*/
140140
@SuppressWarnings("unchecked")
141-
public static <T> org.reactivestreams.Subscriber<T> toReactiveStreams(Flow.Subscriber<T> flowSubscriber) {
141+
public static <T> org.reactivestreams.Subscriber<T> toSubscriber(Flow.Subscriber<T> flowSubscriber) {
142142
if (flowSubscriber == null) {
143143
throw new NullPointerException("flowSubscriber");
144144
}

flow-bridge/src/test/java/org/reactivestreams/ReactiveStreamsFlowBridgeTest.java

+62-6
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public void execute(Runnable command) {
3131

3232
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();
3333

34-
ReactiveStreamsFlowBridge.toFlow(p).subscribe(tc);
34+
ReactiveStreamsFlowBridge.toFlowPublisher(p).subscribe(tc);
3535

3636
p.offer(1);
3737
p.offer(2);
@@ -54,7 +54,7 @@ public void execute(Runnable command) {
5454

5555
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();
5656

57-
ReactiveStreamsFlowBridge.toFlow(p).subscribe(tc);
57+
ReactiveStreamsFlowBridge.toFlowPublisher(p).subscribe(tc);
5858

5959
p.offer(1);
6060
p.offer(2);
@@ -77,7 +77,7 @@ public void execute(Runnable command) {
7777

7878
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();
7979

80-
ReactiveStreamsFlowBridge.toReactiveStreams(p).subscribe(tc);
80+
ReactiveStreamsFlowBridge.toPublisher(p).subscribe(tc);
8181

8282
p.submit(1);
8383
p.submit(2);
@@ -100,7 +100,7 @@ public void execute(Runnable command) {
100100

101101
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();
102102

103-
ReactiveStreamsFlowBridge.toReactiveStreams(p).subscribe(tc);
103+
ReactiveStreamsFlowBridge.toPublisher(p).subscribe(tc);
104104

105105
p.submit(1);
106106
p.submit(2);
@@ -116,7 +116,7 @@ public void execute(Runnable command) {
116116
public void reactiveStreamsToFlowSubscriber() {
117117
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();
118118

119-
Flow.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toFlow(tc);
119+
Flow.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toFlowSubscriber(tc);
120120

121121
final Object[] state = { null, null };
122122

@@ -148,7 +148,7 @@ public void cancel() {
148148
public void flowToReactiveStreamsSubscriber() {
149149
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();
150150

151-
org.reactivestreams.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toReactiveStreams(tc);
151+
org.reactivestreams.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toSubscriber(tc);
152152

153153
final Object[] state = { null, null };
154154

@@ -175,4 +175,60 @@ public void cancel() {
175175

176176
Assert.assertNull(state[1]);
177177
}
178+
179+
@Test
180+
public void stableConversionForSubscriber() {
181+
Subscriber<Integer> rsSub = new Subscriber<Integer>() {
182+
@Override public void onSubscribe(Subscription s) {};
183+
@Override public void onNext(Integer i) {};
184+
@Override public void onError(Throwable t) {};
185+
@Override public void onComplete() {};
186+
};
187+
188+
Flow.Subscriber<Integer> fSub = new Flow.Subscriber<Integer>() {
189+
@Override public void onSubscribe(Flow.Subscription s) {};
190+
@Override public void onNext(Integer i) {};
191+
@Override public void onError(Throwable t) {};
192+
@Override public void onComplete() {};
193+
};
194+
195+
Assert.assertSame(ReactiveStreamsFlowBridge.toSubscriber(ReactiveStreamsFlowBridge.toFlowSubscriber(rsSub)), rsSub);
196+
Assert.assertSame(ReactiveStreamsFlowBridge.toFlowSubscriber(ReactiveStreamsFlowBridge.toSubscriber(fSub)), fSub);
197+
}
198+
199+
@Test
200+
public void stableConversionForProcessor() {
201+
Processor<Integer, Integer> rsPro = new Processor<Integer, Integer>() {
202+
@Override public void onSubscribe(Subscription s) {};
203+
@Override public void onNext(Integer i) {};
204+
@Override public void onError(Throwable t) {};
205+
@Override public void onComplete() {};
206+
@Override public void subscribe(Subscriber s) {};
207+
};
208+
209+
Flow.Processor<Integer, Integer> fPro = new Flow.Processor<Integer, Integer>() {
210+
@Override public void onSubscribe(Flow.Subscription s) {};
211+
@Override public void onNext(Integer i) {};
212+
@Override public void onError(Throwable t) {};
213+
@Override public void onComplete() {};
214+
@Override public void subscribe(Flow.Subscriber s) {};
215+
};
216+
217+
Assert.assertSame(ReactiveStreamsFlowBridge.toProcessor(ReactiveStreamsFlowBridge.toFlowProcessor(rsPro)), rsPro);
218+
Assert.assertSame(ReactiveStreamsFlowBridge.toFlowProcessor(ReactiveStreamsFlowBridge.toProcessor(fPro)), fPro);
219+
}
220+
221+
@Test
222+
public void stableConversionForPublisher() {
223+
Publisher<Integer> rsPub = new Publisher<Integer>() {
224+
@Override public void subscribe(Subscriber s) {};
225+
};
226+
227+
Flow.Publisher<Integer> fPub = new Flow.Publisher<Integer>() {
228+
@Override public void subscribe(Flow.Subscriber s) {};
229+
};
230+
231+
Assert.assertSame(ReactiveStreamsFlowBridge.toPublisher(ReactiveStreamsFlowBridge.toFlowPublisher(rsPub)), rsPub);
232+
Assert.assertSame(ReactiveStreamsFlowBridge.toFlowPublisher(ReactiveStreamsFlowBridge.toPublisher(fPub)), fPub);
233+
}
178234
}

flow-bridge/src/test/java/org/reactivestreams/SubmissionPublisherTckTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,14 @@ public void run() {
4040
sp.close();
4141
}
4242
}).start();
43-
return ReactiveStreamsFlowBridge.toReactiveStreams(sp);
43+
return ReactiveStreamsFlowBridge.toPublisher(sp);
4444
}
4545

4646
@Override
4747
public Publisher<Integer> createFailedPublisher() {
4848
final SubmissionPublisher<Integer> sp = new SubmissionPublisher<Integer>();
4949
sp.closeExceptionally(new IOException());
50-
return ReactiveStreamsFlowBridge.toReactiveStreams(sp);
50+
return ReactiveStreamsFlowBridge.toPublisher(sp);
5151
}
5252

5353
@Override

gradle/wrapper/gradle-wrapper.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
33
distributionPath=wrapper/dists
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists
6-
distributionUrl=https\://services.gradle.org/distributions/gradle-4.0.1-bin.zip
6+
distributionUrl=http\://services.gradle.org/distributions/gradle-4.0.1-bin.zip

tck-flow/src/main/java/org/reactivestreams/tck/flow/FlowPublisherVerification.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public FlowPublisherVerification(TestEnvironment env) {
3636
@Override
3737
final public Publisher<T> createPublisher(long elements) {
3838
final Flow.Publisher<T> flowPublisher = createFlowPublisher(elements);
39-
return ReactiveStreamsFlowBridge.toReactiveStreams(flowPublisher);
39+
return ReactiveStreamsFlowBridge.toPublisher(flowPublisher);
4040
}
4141
/**
4242
* This is the main method you must implement in your test incarnation.
@@ -49,7 +49,7 @@ final public Publisher<T> createPublisher(long elements) {
4949
final public Publisher<T> createFailedPublisher() {
5050
final Flow.Publisher<T> failed = createFailedFlowPublisher();
5151
if (failed == null) return null; // because `null` means "SKIP" in createFailedPublisher
52-
else return ReactiveStreamsFlowBridge.toReactiveStreams(failed);
52+
else return ReactiveStreamsFlowBridge.toPublisher(failed);
5353
}
5454
/**
5555
* By implementing this method, additional TCK tests concerning a "failed" publishers will be run.

tck-flow/src/main/java/org/reactivestreams/tck/flow/FlowSubscriberBlackboxVerification.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ protected FlowSubscriberBlackboxVerification(TestEnvironment env) {
4040

4141
@Override
4242
public final void triggerRequest(Subscriber<? super T> subscriber) {
43-
triggerFlowRequest(ReactiveStreamsFlowBridge.toFlow(subscriber));
43+
triggerFlowRequest(ReactiveStreamsFlowBridge.toFlowSubscriber(subscriber));
4444
}
4545
/**
4646
* Override this method if the {@link java.util.concurrent.Flow.Subscriber} implementation you are verifying
@@ -54,7 +54,7 @@ public void triggerFlowRequest(Flow.Subscriber<? super T> subscriber) {
5454

5555
@Override
5656
public final Subscriber<T> createSubscriber() {
57-
return ReactiveStreamsFlowBridge.<T>toReactiveStreams(createFlowSubscriber());
57+
return ReactiveStreamsFlowBridge.<T>toSubscriber(createFlowSubscriber());
5858
}
5959
/**
6060
* This is the main method you must implement in your test incarnation.

tck-flow/src/main/java/org/reactivestreams/tck/flow/FlowSubscriberWhiteboxVerification.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ protected FlowSubscriberWhiteboxVerification(TestEnvironment env) {
3535

3636
@Override
3737
final public Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe) {
38-
return ReactiveStreamsFlowBridge.toReactiveStreams(createFlowSubscriber(probe));
38+
return ReactiveStreamsFlowBridge.toSubscriber(createFlowSubscriber(probe));
3939
}
4040
/**
4141
* This is the main method you must implement in your test incarnation.

tck-flow/src/test/java/org/reactivestreams/tck/flow/EmptyLazyFlowPublisherTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public EmptyLazyFlowPublisherTest() {
4141

4242
@Override
4343
public Publisher<Integer> createFlowPublisher(long elements) {
44-
return ReactiveStreamsFlowBridge.toFlow(
44+
return ReactiveStreamsFlowBridge.toFlowPublisher(
4545
new AsyncIterablePublisher<Integer>(Collections.<Integer>emptyList(), ex)
4646
);
4747
}

tck-flow/src/test/java/org/reactivestreams/tck/flow/SingleElementFlowPublisherTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public SingleElementFlowPublisherTest() {
4242

4343
@Override
4444
public Flow.Publisher<Integer> createFlowPublisher(long elements) {
45-
return ReactiveStreamsFlowBridge.toFlow(new AsyncIterablePublisher<Integer>(Collections.singleton(1), ex));
45+
return ReactiveStreamsFlowBridge.toFlowPublisher(new AsyncIterablePublisher<Integer>(Collections.singleton(1), ex));
4646
}
4747

4848
@Override

0 commit comments

Comments
 (0)