Skip to content

Commit d0807a0

Browse files
ktosoviktorklang
authored andcommitted
[WIP] TCK for j.u.c.Flow types "directly" (#398)
* Add JDK9 TCK, using adapters * Fixing wrapping and unwrapping of the wrappers themselves. * Renames the converters to "toX" for RS and "toFlowX" for Flow. Fixes so that the dist url for gradle is http iso https (TravisCI bug?) Adds regression test for bridge converters. * fix formatting * cleanup
1 parent 7021f18 commit d0807a0

File tree

50 files changed

+1791
-257
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+1791
-257
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,4 @@ test-results
2626
test-tmp
2727
*.class
2828
gradle.properties
29+
*.orig

.java-version

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9

build.gradle

+5-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,11 @@ subprojects {
4646
}
4747
}
4848

49-
if (name in ["reactive-streams", "reactive-streams-tck", "reactive-streams-examples", "reactive-streams-flow-bridge"]) {
49+
if (name in ["reactive-streams",
50+
"reactive-streams-tck",
51+
"reactive-streams-tck-flow",
52+
"reactive-streams-examples",
53+
"reactive-streams-flow-bridge"]) {
5054
apply plugin: "maven"
5155
apply plugin: "signing"
5256

flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java

+64-66
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,17 @@ private ReactiveStreamsFlowBridge() {
2929
* @return the equivalent Reactive Streams Publisher
3030
*/
3131
@SuppressWarnings("unchecked")
32-
public static <T> org.reactivestreams.Publisher<T> toReactiveStreams(
32+
public static <T> org.reactivestreams.Publisher<T> toPublisher(
3333
Flow.Publisher<? extends T> flowPublisher) {
3434
if (flowPublisher == null) {
3535
throw new NullPointerException("flowPublisher");
3636
}
37-
if (flowPublisher instanceof org.reactivestreams.Publisher) {
38-
return (org.reactivestreams.Publisher<T>)flowPublisher;
39-
}
4037
if (flowPublisher instanceof FlowPublisherFromReactive) {
4138
return (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams);
4239
}
40+
if (flowPublisher instanceof org.reactivestreams.Publisher) {
41+
return (org.reactivestreams.Publisher<T>)flowPublisher;
42+
}
4343
return new ReactivePublisherFromFlow<T>(flowPublisher);
4444
}
4545

@@ -50,21 +50,21 @@ public static <T> org.reactivestreams.Publisher<T> toReactiveStreams(
5050
* @return the equivalent Flow Publisher
5151
*/
5252
@SuppressWarnings("unchecked")
53-
public static <T> Flow.Publisher<T> toFlow(
53+
public static <T> Flow.Publisher<T> toFlowPublisher(
5454
org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher
5555
) {
5656
if (reactiveStreamsPublisher == null) {
5757
throw new NullPointerException("reactiveStreamsPublisher");
5858
}
59-
if (reactiveStreamsPublisher instanceof Flow.Publisher) {
60-
return (Flow.Publisher<T>)reactiveStreamsPublisher;
61-
}
6259
if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) {
6360
return (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow);
6461
}
62+
if (reactiveStreamsPublisher instanceof Flow.Publisher) {
63+
return (Flow.Publisher<T>)reactiveStreamsPublisher;
64+
}
6565
return new FlowPublisherFromReactive<T>(reactiveStreamsPublisher);
6666
}
67-
67+
6868
/**
6969
* Converts a Flow Processor into a Reactive Streams Processor.
7070
* @param <T> the input value type
@@ -73,18 +73,18 @@ public static <T> Flow.Publisher<T> toFlow(
7373
* @return the equivalent Reactive Streams Processor
7474
*/
7575
@SuppressWarnings("unchecked")
76-
public static <T, U> org.reactivestreams.Processor<T, U> toReactiveStreams(
76+
public static <T, U> org.reactivestreams.Processor<T, U> toProcessor(
7777
Flow.Processor<? super T, ? extends U> flowProcessor
7878
) {
7979
if (flowProcessor == null) {
8080
throw new NullPointerException("flowProcessor");
8181
}
82-
if (flowProcessor instanceof org.reactivestreams.Processor) {
83-
return (org.reactivestreams.Processor<T, U>)flowProcessor;
84-
}
8582
if (flowProcessor instanceof FlowToReactiveProcessor) {
8683
return (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams);
8784
}
85+
if (flowProcessor instanceof org.reactivestreams.Processor) {
86+
return (org.reactivestreams.Processor<T, U>)flowProcessor;
87+
}
8888
return new ReactiveToFlowProcessor<T, U>(flowProcessor);
8989
}
9090

@@ -96,18 +96,18 @@ public static <T, U> org.reactivestreams.Processor<T, U> toReactiveStreams(
9696
* @return the equivalent Flow Processor
9797
*/
9898
@SuppressWarnings("unchecked")
99-
public static <T, U> Flow.Processor<T, U> toFlow(
99+
public static <T, U> Flow.Processor<T, U> toFlowProcessor(
100100
org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor
101101
) {
102102
if (reactiveStreamsProcessor == null) {
103103
throw new NullPointerException("reactiveStreamsProcessor");
104104
}
105-
if (reactiveStreamsProcessor instanceof Flow.Processor) {
106-
return (Flow.Processor<T, U>)reactiveStreamsProcessor;
107-
}
108105
if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) {
109106
return (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow);
110107
}
108+
if (reactiveStreamsProcessor instanceof Flow.Processor) {
109+
return (Flow.Processor<T, U>)reactiveStreamsProcessor;
110+
}
111111
return new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor);
112112
}
113113

@@ -117,10 +117,17 @@ public static <T, U> Flow.Processor<T, U> toFlow(
117117
* @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert
118118
* @return the equivalent Flow Subscriber
119119
*/
120+
@SuppressWarnings("unchecked")
120121
public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
121122
if (reactiveStreamsSubscriber == null) {
122123
throw new NullPointerException("reactiveStreamsSubscriber");
123124
}
125+
if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) {
126+
return (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow;
127+
}
128+
if (reactiveStreamsSubscriber instanceof Flow.Subscriber) {
129+
return (Flow.Subscriber<T>)reactiveStreamsSubscriber;
130+
}
124131
return new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber);
125132
}
126133

@@ -130,19 +137,26 @@ public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscr
130137
* @param flowSubscriber the Flow Subscriber instance to convert
131138
* @return the equivalent Reactive Streams Subscriber
132139
*/
133-
public static <T> org.reactivestreams.Subscriber<T> toReactiveStreamsSubscriber(Flow.Subscriber<T> flowSubscriber) {
140+
@SuppressWarnings("unchecked")
141+
public static <T> org.reactivestreams.Subscriber<T> toSubscriber(Flow.Subscriber<T> flowSubscriber) {
134142
if (flowSubscriber == null) {
135143
throw new NullPointerException("flowSubscriber");
136144
}
145+
if (flowSubscriber instanceof FlowToReactiveSubscriber) {
146+
return (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams;
147+
}
148+
if (flowSubscriber instanceof org.reactivestreams.Subscriber) {
149+
return (org.reactivestreams.Subscriber<T>)flowSubscriber;
150+
}
137151
return new ReactiveToFlowSubscriber<T>(flowSubscriber);
138152
}
139153

140154
/**
141155
* Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription.
142156
*/
143157
static final class FlowToReactiveSubscription implements Flow.Subscription {
144-
private final org.reactivestreams.Subscription reactiveStreams;
145-
158+
final org.reactivestreams.Subscription reactiveStreams;
159+
146160
public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) {
147161
this.reactiveStreams = reactive;
148162
}
@@ -156,15 +170,15 @@ public void request(long n) {
156170
public void cancel() {
157171
reactiveStreams.cancel();
158172
}
159-
173+
160174
}
161-
175+
162176
/**
163177
* Wraps a Flow Subscription and converts the calls to a Reactive Streams Subscription.
164178
*/
165179
static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription {
166-
private final Flow.Subscription flow;
167-
180+
final Flow.Subscription flow;
181+
168182
public ReactiveToFlowSubscription(Flow.Subscription flow) {
169183
this.flow = flow;
170184
}
@@ -178,25 +192,25 @@ public void request(long n) {
178192
public void cancel() {
179193
flow.cancel();
180194
}
181-
182-
195+
196+
183197
}
184-
198+
185199
/**
186200
* Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
187201
* @param <T> the element type
188202
*/
189-
static final class FlowToReactiveSubscriber<T>
203+
static final class FlowToReactiveSubscriber<T>
190204
implements Flow.Subscriber<T> {
191-
private final org.reactivestreams.Subscriber<? super T> reactiveStreams;
192-
205+
final org.reactivestreams.Subscriber<? super T> reactiveStreams;
206+
193207
public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) {
194208
this.reactiveStreams = reactive;
195209
}
196210

197211
@Override
198212
public void onSubscribe(Flow.Subscription subscription) {
199-
reactiveStreams.onSubscribe(new ReactiveToFlowSubscription(subscription));
213+
reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
200214
}
201215

202216
@Override
@@ -213,24 +227,24 @@ public void onError(Throwable throwable) {
213227
public void onComplete() {
214228
reactiveStreams.onComplete();
215229
}
216-
230+
217231
}
218232

219233
/**
220234
* Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
221235
* @param <T> the element type
222236
*/
223-
static final class ReactiveToFlowSubscriber<T>
237+
static final class ReactiveToFlowSubscriber<T>
224238
implements org.reactivestreams.Subscriber<T> {
225-
private final Flow.Subscriber<? super T> flow;
226-
239+
final Flow.Subscriber<? super T> flow;
240+
227241
public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) {
228242
this.flow = flow;
229243
}
230244

231245
@Override
232246
public void onSubscribe(org.reactivestreams.Subscription subscription) {
233-
flow.onSubscribe(new FlowToReactiveSubscription(subscription));
247+
flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
234248
}
235249

236250
@Override
@@ -247,9 +261,9 @@ public void onError(Throwable throwable) {
247261
public void onComplete() {
248262
flow.onComplete();
249263
}
250-
264+
251265
}
252-
266+
253267
/**
254268
* Wraps a Flow Processor and forwards methods of the Reactive Streams Processor to it.
255269
* @param <T> the input type
@@ -258,14 +272,14 @@ public void onComplete() {
258272
static final class ReactiveToFlowProcessor<T, U>
259273
implements org.reactivestreams.Processor<T, U> {
260274
final Flow.Processor<? super T, ? extends U> flow;
261-
275+
262276
public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) {
263277
this.flow = flow;
264278
}
265279

266280
@Override
267-
public void onSubscribe(org.reactivestreams.Subscription s) {
268-
flow.onSubscribe(new FlowToReactiveSubscription(s));
281+
public void onSubscribe(org.reactivestreams.Subscription subscription) {
282+
flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
269283
}
270284

271285
@Override
@@ -285,14 +299,10 @@ public void onComplete() {
285299

286300
@Override
287301
public void subscribe(org.reactivestreams.Subscriber<? super U> s) {
288-
if (s == null) {
289-
flow.subscribe(null);
290-
return;
291-
}
292-
flow.subscribe(new FlowToReactiveSubscriber<U>(s));
302+
flow.subscribe((s == null) ? null : new FlowToReactiveSubscriber<U>(s));
293303
}
294304
}
295-
305+
296306
/**
297307
* Wraps a Reactive Streams Processor and forwards methods of the Flow Processor to it.
298308
* @param <T> the input type
@@ -301,14 +311,14 @@ public void subscribe(org.reactivestreams.Subscriber<? super U> s) {
301311
static final class FlowToReactiveProcessor<T, U>
302312
implements Flow.Processor<T, U> {
303313
final org.reactivestreams.Processor<? super T, ? extends U> reactiveStreams;
304-
314+
305315
public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) {
306316
this.reactiveStreams = reactive;
307317
}
308318

309319
@Override
310-
public void onSubscribe(Flow.Subscription s) {
311-
reactiveStreams.onSubscribe(new ReactiveToFlowSubscription(s));
320+
public void onSubscribe(Flow.Subscription subscription) {
321+
reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
312322
}
313323

314324
@Override
@@ -328,11 +338,7 @@ public void onComplete() {
328338

329339
@Override
330340
public void subscribe(Flow.Subscriber<? super U> s) {
331-
if (s == null) {
332-
reactiveStreams.subscribe(null);
333-
return;
334-
}
335-
reactiveStreams.subscribe(new ReactiveToFlowSubscriber<U>(s));
341+
reactiveStreams.subscribe((s == null) ? null : new ReactiveToFlowSubscriber<U>(s));
336342
}
337343
}
338344

@@ -350,11 +356,7 @@ public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) {
350356

351357
@Override
352358
public void subscribe(org.reactivestreams.Subscriber<? super T> reactive) {
353-
if (reactive == null) {
354-
flow.subscribe(null);
355-
return;
356-
}
357-
flow.subscribe(new FlowToReactiveSubscriber<T>(reactive));
359+
flow.subscribe((reactive == null) ? null : new FlowToReactiveSubscriber<T>(reactive));
358360
}
359361
}
360362

@@ -372,12 +374,8 @@ public FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reac
372374

373375
@Override
374376
public void subscribe(Flow.Subscriber<? super T> flow) {
375-
if (flow == null) {
376-
reactiveStreams.subscribe(null);
377-
return;
378-
}
379-
reactiveStreams.subscribe(new ReactiveToFlowSubscriber<T>(flow));
377+
reactiveStreams.subscribe((flow == null) ? null : new ReactiveToFlowSubscriber<T>(flow));
380378
}
381379
}
382380

383-
}
381+
}

0 commit comments

Comments
 (0)