Skip to content

Commit e35584b

Browse files
committed
Renames the converters to "toX" for RS and "toFlowX" for Flow.
Fixes so that the dist url for gradle is http iso https (TravisCI bug?) Adds regression test for bridge converters.
1 parent db96849 commit e35584b

File tree

9 files changed

+79
-23
lines changed

9 files changed

+79
-23
lines changed

flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,15 @@ private ReactiveStreamsFlowBridge() {
2929
* @return the equivalent Reactive Streams Publisher
3030
*/
3131
@SuppressWarnings("unchecked")
32-
public static <T> org.reactivestreams.Publisher<T> toReactiveStreams(
32+
public static <T> org.reactivestreams.Publisher<T> toPublisher(
3333
Flow.Publisher<? extends T> flowPublisher) {
3434
if (flowPublisher == null) {
3535
throw new NullPointerException("flowPublisher");
3636
}
3737
if (flowPublisher instanceof FlowPublisherFromReactive) {
3838
return (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams);
39-
if (flowPublisher instanceof org.reactivestreams.Publisher) {
4039
}
40+
if (flowPublisher instanceof org.reactivestreams.Publisher) {
4141
return (org.reactivestreams.Publisher<T>)flowPublisher;
4242
}
4343
return new ReactivePublisherFromFlow<T>(flowPublisher);
@@ -50,7 +50,7 @@ public static <T> org.reactivestreams.Publisher<T> toReactiveStreams(
5050
* @return the equivalent Flow Publisher
5151
*/
5252
@SuppressWarnings("unchecked")
53-
public static <T> Flow.Publisher<T> toFlow(
53+
public static <T> Flow.Publisher<T> toFlowPublisher(
5454
org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher
5555
) {
5656
if (reactiveStreamsPublisher == null) {
@@ -73,7 +73,7 @@ public static <T> Flow.Publisher<T> toFlow(
7373
* @return the equivalent Reactive Streams Processor
7474
*/
7575
@SuppressWarnings("unchecked")
76-
public static <T, U> org.reactivestreams.Processor<T, U> toReactiveStreams(
76+
public static <T, U> org.reactivestreams.Processor<T, U> toProcessor(
7777
Flow.Processor<? super T, ? extends U> flowProcessor
7878
) {
7979
if (flowProcessor == null) {
@@ -96,7 +96,7 @@ public static <T, U> org.reactivestreams.Processor<T, U> toReactiveStreams(
9696
* @return the equivalent Flow Processor
9797
*/
9898
@SuppressWarnings("unchecked")
99-
public static <T, U> Flow.Processor<T, U> toFlow(
99+
public static <T, U> Flow.Processor<T, U> toFlowProcessor(
100100
org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor
101101
) {
102102
if (reactiveStreamsProcessor == null) {
@@ -118,7 +118,7 @@ public static <T, U> Flow.Processor<T, U> toFlow(
118118
* @return the equivalent Flow Subscriber
119119
*/
120120
@SuppressWarnings("unchecked")
121-
public static <T> Flow.Subscriber<T> toFlow(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
121+
public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
122122
if (reactiveStreamsSubscriber == null) {
123123
throw new NullPointerException("reactiveStreamsSubscriber");
124124
}
@@ -138,7 +138,7 @@ public static <T> Flow.Subscriber<T> toFlow(org.reactivestreams.Subscriber<T> re
138138
* @return the equivalent Reactive Streams Subscriber
139139
*/
140140
@SuppressWarnings("unchecked")
141-
public static <T> org.reactivestreams.Subscriber<T> toReactiveStreams(Flow.Subscriber<T> flowSubscriber) {
141+
public static <T> org.reactivestreams.Subscriber<T> toSubscriber(Flow.Subscriber<T> flowSubscriber) {
142142
if (flowSubscriber == null) {
143143
throw new NullPointerException("flowSubscriber");
144144
}

flow-bridge/src/test/java/org/reactivestreams/ReactiveStreamsFlowBridgeTest.java

+62-6
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public void execute(Runnable command) {
3131

3232
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();
3333

34-
ReactiveStreamsFlowBridge.toFlow(p).subscribe(tc);
34+
ReactiveStreamsFlowBridge.toFlowPublisher(p).subscribe(tc);
3535

3636
p.offer(1);
3737
p.offer(2);
@@ -54,7 +54,7 @@ public void execute(Runnable command) {
5454

5555
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();
5656

57-
ReactiveStreamsFlowBridge.toFlow(p).subscribe(tc);
57+
ReactiveStreamsFlowBridge.toFlowPublisher(p).subscribe(tc);
5858

5959
p.offer(1);
6060
p.offer(2);
@@ -77,7 +77,7 @@ public void execute(Runnable command) {
7777

7878
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();
7979

80-
ReactiveStreamsFlowBridge.toReactiveStreams(p).subscribe(tc);
80+
ReactiveStreamsFlowBridge.toPublisher(p).subscribe(tc);
8181

8282
p.submit(1);
8383
p.submit(2);
@@ -100,7 +100,7 @@ public void execute(Runnable command) {
100100

101101
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();
102102

103-
ReactiveStreamsFlowBridge.toReactiveStreams(p).subscribe(tc);
103+
ReactiveStreamsFlowBridge.toPublisher(p).subscribe(tc);
104104

105105
p.submit(1);
106106
p.submit(2);
@@ -116,7 +116,7 @@ public void execute(Runnable command) {
116116
public void reactiveStreamsToFlowSubscriber() {
117117
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();
118118

119-
Flow.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toFlow(tc);
119+
Flow.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toFlowSubscriber(tc);
120120

121121
final Object[] state = { null, null };
122122

@@ -148,7 +148,7 @@ public void cancel() {
148148
public void flowToReactiveStreamsSubscriber() {
149149
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();
150150

151-
org.reactivestreams.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toReactiveStreams(tc);
151+
org.reactivestreams.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toSubscriber(tc);
152152

153153
final Object[] state = { null, null };
154154

@@ -175,4 +175,60 @@ public void cancel() {
175175

176176
Assert.assertNull(state[1]);
177177
}
178+
179+
@Test
180+
public void stableConversionForSubscriber() {
181+
Subscriber<Integer> rsSub = new Subscriber<Integer>() {
182+
@Override public void onSubscribe(Subscription s) {};
183+
@Override public void onNext(Integer i) {};
184+
@Override public void onError(Throwable t) {};
185+
@Override public void onComplete() {};
186+
};
187+
188+
Flow.Subscriber<Integer> fSub = new Flow.Subscriber<Integer>() {
189+
@Override public void onSubscribe(Flow.Subscription s) {};
190+
@Override public void onNext(Integer i) {};
191+
@Override public void onError(Throwable t) {};
192+
@Override public void onComplete() {};
193+
};
194+
195+
Assert.assertSame(ReactiveStreamsFlowBridge.toSubscriber(ReactiveStreamsFlowBridge.toFlowSubscriber(rsSub)), rsSub);
196+
Assert.assertSame(ReactiveStreamsFlowBridge.toFlowSubscriber(ReactiveStreamsFlowBridge.toSubscriber(fSub)), fSub);
197+
}
198+
199+
@Test
200+
public void stableConversionForProcessor() {
201+
Processor<Integer, Integer> rsPro = new Processor<Integer, Integer>() {
202+
@Override public void onSubscribe(Subscription s) {};
203+
@Override public void onNext(Integer i) {};
204+
@Override public void onError(Throwable t) {};
205+
@Override public void onComplete() {};
206+
@Override public void subscribe(Subscriber s) {};
207+
};
208+
209+
Flow.Processor<Integer, Integer> fPro = new Flow.Processor<Integer, Integer>() {
210+
@Override public void onSubscribe(Flow.Subscription s) {};
211+
@Override public void onNext(Integer i) {};
212+
@Override public void onError(Throwable t) {};
213+
@Override public void onComplete() {};
214+
@Override public void subscribe(Flow.Subscriber s) {};
215+
};
216+
217+
Assert.assertSame(ReactiveStreamsFlowBridge.toProcessor(ReactiveStreamsFlowBridge.toFlowProcessor(rsPro)), rsPro);
218+
Assert.assertSame(ReactiveStreamsFlowBridge.toFlowProcessor(ReactiveStreamsFlowBridge.toProcessor(fPro)), fPro);
219+
}
220+
221+
@Test
222+
public void stableConversionForPublisher() {
223+
Publisher<Integer> rsPub = new Publisher<Integer>() {
224+
@Override public void subscribe(Subscriber s) {};
225+
};
226+
227+
Flow.Publisher<Integer> fPub = new Flow.Publisher<Integer>() {
228+
@Override public void subscribe(Flow.Subscriber s) {};
229+
};
230+
231+
Assert.assertSame(ReactiveStreamsFlowBridge.toPublisher(ReactiveStreamsFlowBridge.toFlowPublisher(rsPub)), rsPub);
232+
Assert.assertSame(ReactiveStreamsFlowBridge.toFlowPublisher(ReactiveStreamsFlowBridge.toPublisher(fPub)), fPub);
233+
}
178234
}

flow-bridge/src/test/java/org/reactivestreams/SubmissionPublisherTckTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,14 @@ public void run() {
4040
sp.close();
4141
}
4242
}).start();
43-
return ReactiveStreamsFlowBridge.toReactiveStreams(sp);
43+
return ReactiveStreamsFlowBridge.toPublisher(sp);
4444
}
4545

4646
@Override
4747
public Publisher<Integer> createFailedPublisher() {
4848
final SubmissionPublisher<Integer> sp = new SubmissionPublisher<Integer>();
4949
sp.closeExceptionally(new IOException());
50-
return ReactiveStreamsFlowBridge.toReactiveStreams(sp);
50+
return ReactiveStreamsFlowBridge.toPublisher(sp);
5151
}
5252

5353
@Override

gradle/wrapper/gradle-wrapper.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
33
distributionPath=wrapper/dists
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists
6-
distributionUrl=https\://services.gradle.org/distributions/gradle-4.0.1-bin.zip
6+
distributionUrl=http\://services.gradle.org/distributions/gradle-4.0.1-bin.zip

tck-flow/src/main/java/org/reactivestreams/tck/flow/FlowPublisherVerification.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public FlowPublisherVerification(TestEnvironment env) {
3636
@Override
3737
final public Publisher<T> createPublisher(long elements) {
3838
final Flow.Publisher<T> flowPublisher = createFlowPublisher(elements);
39-
return ReactiveStreamsFlowBridge.toReactiveStreams(flowPublisher);
39+
return ReactiveStreamsFlowBridge.toPublisher(flowPublisher);
4040
}
4141
/**
4242
* This is the main method you must implement in your test incarnation.
@@ -49,7 +49,7 @@ final public Publisher<T> createPublisher(long elements) {
4949
final public Publisher<T> createFailedPublisher() {
5050
final Flow.Publisher<T> failed = createFailedFlowPublisher();
5151
if (failed == null) return null; // because `null` means "SKIP" in createFailedPublisher
52-
else return ReactiveStreamsFlowBridge.toReactiveStreams(failed);
52+
else return ReactiveStreamsFlowBridge.toPublisher(failed);
5353
}
5454
/**
5555
* By implementing this method, additional TCK tests concerning a "failed" publishers will be run.

tck-flow/src/main/java/org/reactivestreams/tck/flow/FlowSubscriberBlackboxVerification.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ protected FlowSubscriberBlackboxVerification(TestEnvironment env) {
4040

4141
@Override
4242
public final void triggerRequest(Subscriber<? super T> subscriber) {
43-
triggerFlowRequest(ReactiveStreamsFlowBridge.toFlow(subscriber));
43+
triggerFlowRequest(ReactiveStreamsFlowBridge.toFlowSubscriber(subscriber));
4444
}
4545
/**
4646
* Override this method if the {@link java.util.concurrent.Flow.Subscriber} implementation you are verifying
@@ -54,7 +54,7 @@ public void triggerFlowRequest(Flow.Subscriber<? super T> subscriber) {
5454

5555
@Override
5656
public final Subscriber<T> createSubscriber() {
57-
return ReactiveStreamsFlowBridge.<T>toReactiveStreams(createFlowSubscriber());
57+
return ReactiveStreamsFlowBridge.<T>toSubscriber(createFlowSubscriber());
5858
}
5959
/**
6060
* This is the main method you must implement in your test incarnation.

tck-flow/src/main/java/org/reactivestreams/tck/flow/FlowSubscriberWhiteboxVerification.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ protected FlowSubscriberWhiteboxVerification(TestEnvironment env) {
3535

3636
@Override
3737
final public Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe) {
38-
return ReactiveStreamsFlowBridge.toReactiveStreams(createFlowSubscriber(probe));
38+
return ReactiveStreamsFlowBridge.toSubscriber(createFlowSubscriber(probe));
3939
}
4040
/**
4141
* This is the main method you must implement in your test incarnation.

tck-flow/src/test/java/org/reactivestreams/tck/flow/EmptyLazyFlowPublisherTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public EmptyLazyFlowPublisherTest() {
4141

4242
@Override
4343
public Publisher<Integer> createFlowPublisher(long elements) {
44-
return ReactiveStreamsFlowBridge.toFlow(
44+
return ReactiveStreamsFlowBridge.toFlowPublisher(
4545
new AsyncIterablePublisher<Integer>(Collections.<Integer>emptyList(), ex)
4646
);
4747
}

tck-flow/src/test/java/org/reactivestreams/tck/flow/SingleElementFlowPublisherTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public SingleElementFlowPublisherTest() {
4242

4343
@Override
4444
public Flow.Publisher<Integer> createFlowPublisher(long elements) {
45-
return ReactiveStreamsFlowBridge.toFlow(new AsyncIterablePublisher<Integer>(Collections.singleton(1), ex));
45+
return ReactiveStreamsFlowBridge.toFlowPublisher(new AsyncIterablePublisher<Integer>(Collections.singleton(1), ex));
4646
}
4747

4848
@Override

0 commit comments

Comments
 (0)