Skip to content

Commit fc8a2f4

Browse files
committed
Restructuring the adapters to use requireNonNull & single returns
1 parent 138c473 commit fc8a2f4

File tree

1 file changed

+79
-83
lines changed

1 file changed

+79
-83
lines changed

api/src/main/java9/org/reactivestreams/FlowAdapters.java

+79-83
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
package org.reactivestreams;
1313

1414
import java.util.concurrent.Flow;
15+
import static java.util.Objects.requireNonNull;
1516

1617
/**
1718
* Bridge between Reactive Streams API and the Java 9 {@link java.util.concurrent.Flow} API.
@@ -31,16 +32,16 @@ private FlowAdapters() {
3132
@SuppressWarnings("unchecked")
3233
public static <T> org.reactivestreams.Publisher<T> toPublisher(
3334
Flow.Publisher<? extends T> flowPublisher) {
34-
if (flowPublisher == null) {
35-
throw new NullPointerException("flowPublisher");
36-
}
35+
requireNonNull(flowPublisher, "flowPublisher");
36+
final org.reactivestreams.Publisher<T> publisher;
3737
if (flowPublisher instanceof FlowPublisherFromReactive) {
38-
return (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams);
39-
}
40-
if (flowPublisher instanceof org.reactivestreams.Publisher) {
41-
return (org.reactivestreams.Publisher<T>)flowPublisher;
38+
publisher = (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams);
39+
} else if (flowPublisher instanceof org.reactivestreams.Publisher) {
40+
publisher = (org.reactivestreams.Publisher<T>)flowPublisher;
41+
} else {
42+
publisher = new ReactivePublisherFromFlow<T>(flowPublisher);
4243
}
43-
return new ReactivePublisherFromFlow<T>(flowPublisher);
44+
return publisher;
4445
}
4546

4647
/**
@@ -53,16 +54,16 @@ public static <T> org.reactivestreams.Publisher<T> toPublisher(
5354
public static <T> Flow.Publisher<T> toFlowPublisher(
5455
org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher
5556
) {
56-
if (reactiveStreamsPublisher == null) {
57-
throw new NullPointerException("reactiveStreamsPublisher");
58-
}
57+
requireNonNull(reactiveStreamsPublisher, "reactiveStreamsPublisher");
58+
final Flow.Publisher<T> flowPublisher;
5959
if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) {
60-
return (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow);
61-
}
62-
if (reactiveStreamsPublisher instanceof Flow.Publisher) {
63-
return (Flow.Publisher<T>)reactiveStreamsPublisher;
60+
flowPublisher = (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow);
61+
} else if (reactiveStreamsPublisher instanceof Flow.Publisher) {
62+
flowPublisher = (Flow.Publisher<T>)reactiveStreamsPublisher;
63+
} else {
64+
flowPublisher = new FlowPublisherFromReactive<T>(reactiveStreamsPublisher);
6465
}
65-
return new FlowPublisherFromReactive<T>(reactiveStreamsPublisher);
66+
return flowPublisher;
6667
}
6768

6869
/**
@@ -76,16 +77,16 @@ public static <T> Flow.Publisher<T> toFlowPublisher(
7677
public static <T, U> org.reactivestreams.Processor<T, U> toProcessor(
7778
Flow.Processor<? super T, ? extends U> flowProcessor
7879
) {
79-
if (flowProcessor == null) {
80-
throw new NullPointerException("flowProcessor");
81-
}
80+
requireNonNull(flowProcessor, "flowProcessor");
81+
final org.reactivestreams.Processor<T, U> processor;
8282
if (flowProcessor instanceof FlowToReactiveProcessor) {
83-
return (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams);
84-
}
85-
if (flowProcessor instanceof org.reactivestreams.Processor) {
86-
return (org.reactivestreams.Processor<T, U>)flowProcessor;
83+
processor = (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams);
84+
} else if (flowProcessor instanceof org.reactivestreams.Processor) {
85+
processor = (org.reactivestreams.Processor<T, U>)flowProcessor;
86+
} else {
87+
processor = new ReactiveToFlowProcessor<T, U>(flowProcessor);
8788
}
88-
return new ReactiveToFlowProcessor<T, U>(flowProcessor);
89+
return processor;
8990
}
9091

9192
/**
@@ -99,16 +100,16 @@ public static <T, U> org.reactivestreams.Processor<T, U> toProcessor(
99100
public static <T, U> Flow.Processor<T, U> toFlowProcessor(
100101
org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor
101102
) {
102-
if (reactiveStreamsProcessor == null) {
103-
throw new NullPointerException("reactiveStreamsProcessor");
104-
}
103+
requireNonNull(reactiveStreamsProcessor, "reactiveStreamsProcessor");
104+
final Flow.Processor<T, U> flowProcessor;
105105
if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) {
106-
return (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow);
106+
flowProcessor = (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow);
107+
} else if (reactiveStreamsProcessor instanceof Flow.Processor) {
108+
flowProcessor = (Flow.Processor<T, U>)reactiveStreamsProcessor;
109+
} else {
110+
flowProcessor = new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor);
107111
}
108-
if (reactiveStreamsProcessor instanceof Flow.Processor) {
109-
return (Flow.Processor<T, U>)reactiveStreamsProcessor;
110-
}
111-
return new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor);
112+
return flowProcessor;
112113
}
113114

114115
/**
@@ -119,16 +120,16 @@ public static <T, U> Flow.Processor<T, U> toFlowProcessor(
119120
*/
120121
@SuppressWarnings("unchecked")
121122
public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
122-
if (reactiveStreamsSubscriber == null) {
123-
throw new NullPointerException("reactiveStreamsSubscriber");
124-
}
123+
requireNonNull(reactiveStreamsSubscriber, "reactiveStreamsSubscriber");
124+
final Flow.Subscriber<T> flowSubscriber;
125125
if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) {
126-
return (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow;
127-
}
128-
if (reactiveStreamsSubscriber instanceof Flow.Subscriber) {
129-
return (Flow.Subscriber<T>)reactiveStreamsSubscriber;
126+
flowSubscriber = (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow;
127+
} else if (reactiveStreamsSubscriber instanceof Flow.Subscriber) {
128+
flowSubscriber = (Flow.Subscriber<T>)reactiveStreamsSubscriber;
129+
} else {
130+
flowSubscriber = new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber);
130131
}
131-
return new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber);
132+
return flowSubscriber;
132133
}
133134

134135
/**
@@ -139,16 +140,16 @@ public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscr
139140
*/
140141
@SuppressWarnings("unchecked")
141142
public static <T> org.reactivestreams.Subscriber<T> toSubscriber(Flow.Subscriber<T> flowSubscriber) {
142-
if (flowSubscriber == null) {
143-
throw new NullPointerException("flowSubscriber");
144-
}
143+
requireNonNull(flowSubscriber, "flowSubscriber");
144+
final org.reactivestreams.Subscriber<T> subscriber;
145145
if (flowSubscriber instanceof FlowToReactiveSubscriber) {
146-
return (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams;
147-
}
148-
if (flowSubscriber instanceof org.reactivestreams.Subscriber) {
149-
return (org.reactivestreams.Subscriber<T>)flowSubscriber;
146+
subscriber = (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams;
147+
} else if (flowSubscriber instanceof org.reactivestreams.Subscriber) {
148+
subscriber = (org.reactivestreams.Subscriber<T>)flowSubscriber;
149+
} else {
150+
subscriber = new ReactiveToFlowSubscriber<T>(flowSubscriber);
150151
}
151-
return new ReactiveToFlowSubscriber<T>(flowSubscriber);
152+
return subscriber;
152153
}
153154

154155
/**
@@ -157,12 +158,12 @@ public static <T> org.reactivestreams.Subscriber<T> toSubscriber(Flow.Subscriber
157158
static final class FlowToReactiveSubscription implements Flow.Subscription {
158159
final org.reactivestreams.Subscription reactiveStreams;
159160

160-
public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) {
161+
public FlowToReactiveSubscription(final org.reactivestreams.Subscription reactive) {
161162
this.reactiveStreams = reactive;
162163
}
163164

164165
@Override
165-
public void request(long n) {
166+
public void request(final long n) {
166167
reactiveStreams.request(n);
167168
}
168169

@@ -179,12 +180,12 @@ public void cancel() {
179180
static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription {
180181
final Flow.Subscription flow;
181182

182-
public ReactiveToFlowSubscription(Flow.Subscription flow) {
183+
public ReactiveToFlowSubscription(final Flow.Subscription flow) {
183184
this.flow = flow;
184185
}
185186

186187
@Override
187-
public void request(long n) {
188+
public void request(final long n) {
188189
flow.request(n);
189190
}
190191

@@ -200,26 +201,25 @@ public void cancel() {
200201
* Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
201202
* @param <T> the element type
202203
*/
203-
static final class FlowToReactiveSubscriber<T>
204-
implements Flow.Subscriber<T> {
204+
static final class FlowToReactiveSubscriber<T> implements Flow.Subscriber<T> {
205205
final org.reactivestreams.Subscriber<? super T> reactiveStreams;
206206

207-
public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) {
207+
public FlowToReactiveSubscriber(final org.reactivestreams.Subscriber<? super T> reactive) {
208208
this.reactiveStreams = reactive;
209209
}
210210

211211
@Override
212-
public void onSubscribe(Flow.Subscription subscription) {
212+
public void onSubscribe(final Flow.Subscription subscription) {
213213
reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
214214
}
215215

216216
@Override
217-
public void onNext(T item) {
217+
public void onNext(final T item) {
218218
reactiveStreams.onNext(item);
219219
}
220220

221221
@Override
222-
public void onError(Throwable throwable) {
222+
public void onError(final Throwable throwable) {
223223
reactiveStreams.onError(throwable);
224224
}
225225

@@ -234,26 +234,25 @@ public void onComplete() {
234234
* Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
235235
* @param <T> the element type
236236
*/
237-
static final class ReactiveToFlowSubscriber<T>
238-
implements org.reactivestreams.Subscriber<T> {
237+
static final class ReactiveToFlowSubscriber<T> implements org.reactivestreams.Subscriber<T> {
239238
final Flow.Subscriber<? super T> flow;
240239

241-
public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) {
240+
public ReactiveToFlowSubscriber(final Flow.Subscriber<? super T> flow) {
242241
this.flow = flow;
243242
}
244243

245244
@Override
246-
public void onSubscribe(org.reactivestreams.Subscription subscription) {
245+
public void onSubscribe(final org.reactivestreams.Subscription subscription) {
247246
flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
248247
}
249248

250249
@Override
251-
public void onNext(T item) {
250+
public void onNext(final T item) {
252251
flow.onNext(item);
253252
}
254253

255254
@Override
256-
public void onError(Throwable throwable) {
255+
public void onError(final Throwable throwable) {
257256
flow.onError(throwable);
258257
}
259258

@@ -269,26 +268,25 @@ public void onComplete() {
269268
* @param <T> the input type
270269
* @param <U> the output type
271270
*/
272-
static final class ReactiveToFlowProcessor<T, U>
273-
implements org.reactivestreams.Processor<T, U> {
271+
static final class ReactiveToFlowProcessor<T, U> implements org.reactivestreams.Processor<T, U> {
274272
final Flow.Processor<? super T, ? extends U> flow;
275273

276-
public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) {
274+
public ReactiveToFlowProcessor(final Flow.Processor<? super T, ? extends U> flow) {
277275
this.flow = flow;
278276
}
279277

280278
@Override
281-
public void onSubscribe(org.reactivestreams.Subscription subscription) {
279+
public void onSubscribe(final org.reactivestreams.Subscription subscription) {
282280
flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
283281
}
284282

285283
@Override
286-
public void onNext(T t) {
284+
public void onNext(final T t) {
287285
flow.onNext(t);
288286
}
289287

290288
@Override
291-
public void onError(Throwable t) {
289+
public void onError(final Throwable t) {
292290
flow.onError(t);
293291
}
294292

@@ -298,7 +296,7 @@ public void onComplete() {
298296
}
299297

300298
@Override
301-
public void subscribe(org.reactivestreams.Subscriber<? super U> s) {
299+
public void subscribe(final org.reactivestreams.Subscriber<? super U> s) {
302300
flow.subscribe((s == null) ? null : new FlowToReactiveSubscriber<U>(s));
303301
}
304302
}
@@ -308,26 +306,25 @@ public void subscribe(org.reactivestreams.Subscriber<? super U> s) {
308306
* @param <T> the input type
309307
* @param <U> the output type
310308
*/
311-
static final class FlowToReactiveProcessor<T, U>
312-
implements Flow.Processor<T, U> {
309+
static final class FlowToReactiveProcessor<T, U> implements Flow.Processor<T, U> {
313310
final org.reactivestreams.Processor<? super T, ? extends U> reactiveStreams;
314311

315-
public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) {
312+
public FlowToReactiveProcessor(final org.reactivestreams.Processor<? super T, ? extends U> reactive) {
316313
this.reactiveStreams = reactive;
317314
}
318315

319316
@Override
320-
public void onSubscribe(Flow.Subscription subscription) {
317+
public void onSubscribe(final Flow.Subscription subscription) {
321318
reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
322319
}
323320

324321
@Override
325-
public void onNext(T t) {
322+
public void onNext(final T t) {
326323
reactiveStreams.onNext(t);
327324
}
328325

329326
@Override
330-
public void onError(Throwable t) {
327+
public void onError(final Throwable t) {
331328
reactiveStreams.onError(t);
332329
}
333330

@@ -337,7 +334,7 @@ public void onComplete() {
337334
}
338335

339336
@Override
340-
public void subscribe(Flow.Subscriber<? super U> s) {
337+
public void subscribe(final Flow.Subscriber<? super U> s) {
341338
reactiveStreams.subscribe((s == null) ? null : new ReactiveToFlowSubscriber<U>(s));
342339
}
343340
}
@@ -347,15 +344,14 @@ public void subscribe(Flow.Subscriber<? super U> s) {
347344
* @param <T> the element type
348345
*/
349346
static final class ReactivePublisherFromFlow<T> implements org.reactivestreams.Publisher<T> {
350-
351347
final Flow.Publisher<? extends T> flow;
352348

353-
public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) {
349+
public ReactivePublisherFromFlow(final Flow.Publisher<? extends T> flowPublisher) {
354350
this.flow = flowPublisher;
355351
}
356352

357353
@Override
358-
public void subscribe(org.reactivestreams.Subscriber<? super T> reactive) {
354+
public void subscribe(final org.reactivestreams.Subscriber<? super T> reactive) {
359355
flow.subscribe((reactive == null) ? null : new FlowToReactiveSubscriber<T>(reactive));
360356
}
361357
}
@@ -368,12 +364,12 @@ static final class FlowPublisherFromReactive<T> implements Flow.Publisher<T> {
368364

369365
final org.reactivestreams.Publisher<? extends T> reactiveStreams;
370366

371-
public FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reactivePublisher) {
367+
public FlowPublisherFromReactive(final org.reactivestreams.Publisher<? extends T> reactivePublisher) {
372368
this.reactiveStreams = reactivePublisher;
373369
}
374370

375371
@Override
376-
public void subscribe(Flow.Subscriber<? super T> flow) {
372+
public void subscribe(final Flow.Subscriber<? super T> flow) {
377373
reactiveStreams.subscribe((flow == null) ? null : new ReactiveToFlowSubscriber<T>(flow));
378374
}
379375
}

0 commit comments

Comments
 (0)