Skip to content

Commit db96849

Browse files
viktorklangktoso
authored andcommitted
Fixing wrapping and unwrapping of the wrappers themselves.
1 parent 327ff4d commit db96849

File tree

2 files changed

+63
-67
lines changed

2 files changed

+63
-67
lines changed

flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java

+40-44
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@ public static <T> org.reactivestreams.Publisher<T> toReactiveStreams(
3434
if (flowPublisher == null) {
3535
throw new NullPointerException("flowPublisher");
3636
}
37-
if (flowPublisher instanceof org.reactivestreams.Publisher) {
38-
return (org.reactivestreams.Publisher<T>) flowPublisher;
39-
}
4037
if (flowPublisher instanceof FlowPublisherFromReactive) {
4138
return (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams);
39+
if (flowPublisher instanceof org.reactivestreams.Publisher) {
40+
}
41+
return (org.reactivestreams.Publisher<T>)flowPublisher;
4242
}
4343
return new ReactivePublisherFromFlow<T>(flowPublisher);
4444
}
@@ -56,12 +56,12 @@ public static <T> Flow.Publisher<T> toFlow(
5656
if (reactiveStreamsPublisher == null) {
5757
throw new NullPointerException("reactiveStreamsPublisher");
5858
}
59-
if (reactiveStreamsPublisher instanceof Flow.Publisher) {
60-
return (Flow.Publisher<T>) reactiveStreamsPublisher;
61-
}
6259
if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) {
6360
return (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow);
6461
}
62+
if (reactiveStreamsPublisher instanceof Flow.Publisher) {
63+
return (Flow.Publisher<T>)reactiveStreamsPublisher;
64+
}
6565
return new FlowPublisherFromReactive<T>(reactiveStreamsPublisher);
6666
}
6767

@@ -79,12 +79,12 @@ public static <T, U> org.reactivestreams.Processor<T, U> toReactiveStreams(
7979
if (flowProcessor == null) {
8080
throw new NullPointerException("flowProcessor");
8181
}
82-
if (flowProcessor instanceof org.reactivestreams.Processor) {
83-
return (org.reactivestreams.Processor<T, U>) flowProcessor;
84-
}
8582
if (flowProcessor instanceof FlowToReactiveProcessor) {
8683
return (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams);
8784
}
85+
if (flowProcessor instanceof org.reactivestreams.Processor) {
86+
return (org.reactivestreams.Processor<T, U>)flowProcessor;
87+
}
8888
return new ReactiveToFlowProcessor<T, U>(flowProcessor);
8989
}
9090

@@ -102,12 +102,12 @@ public static <T, U> Flow.Processor<T, U> toFlow(
102102
if (reactiveStreamsProcessor == null) {
103103
throw new NullPointerException("reactiveStreamsProcessor");
104104
}
105-
if (reactiveStreamsProcessor instanceof Flow.Processor) {
106-
return (Flow.Processor<T, U>) reactiveStreamsProcessor;
107-
}
108105
if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) {
109106
return (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow);
110107
}
108+
if (reactiveStreamsProcessor instanceof Flow.Processor) {
109+
return (Flow.Processor<T, U>)reactiveStreamsProcessor;
110+
}
111111
return new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor);
112112
}
113113

@@ -117,10 +117,17 @@ public static <T, U> Flow.Processor<T, U> toFlow(
117117
* @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert
118118
* @return the equivalent Flow Subscriber
119119
*/
120+
@SuppressWarnings("unchecked")
120121
public static <T> Flow.Subscriber<T> toFlow(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
121122
if (reactiveStreamsSubscriber == null) {
122123
throw new NullPointerException("reactiveStreamsSubscriber");
123124
}
125+
if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) {
126+
return (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow;
127+
}
128+
if (reactiveStreamsSubscriber instanceof Flow.Subscriber) {
129+
return (Flow.Subscriber<T>)reactiveStreamsSubscriber;
130+
}
124131
return new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber);
125132
}
126133

@@ -130,18 +137,25 @@ public static <T> Flow.Subscriber<T> toFlow(org.reactivestreams.Subscriber<T> re
130137
* @param flowSubscriber the Flow Subscriber instance to convert
131138
* @return the equivalent Reactive Streams Subscriber
132139
*/
140+
@SuppressWarnings("unchecked")
133141
public static <T> org.reactivestreams.Subscriber<T> toReactiveStreams(Flow.Subscriber<T> flowSubscriber) {
134142
if (flowSubscriber == null) {
135143
throw new NullPointerException("flowSubscriber");
136144
}
145+
if (flowSubscriber instanceof FlowToReactiveSubscriber) {
146+
return (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams;
147+
}
148+
if (flowSubscriber instanceof org.reactivestreams.Subscriber) {
149+
return (org.reactivestreams.Subscriber<T>)flowSubscriber;
150+
}
137151
return new ReactiveToFlowSubscriber<T>(flowSubscriber);
138152
}
139153

140154
/**
141155
* Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription.
142156
*/
143157
static final class FlowToReactiveSubscription implements Flow.Subscription {
144-
private final org.reactivestreams.Subscription reactiveStreams;
158+
final org.reactivestreams.Subscription reactiveStreams;
145159

146160
public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) {
147161
this.reactiveStreams = reactive;
@@ -163,7 +177,7 @@ public void cancel() {
163177
* Wraps a Flow Subscription and converts the calls to a Reactive Streams Subscription.
164178
*/
165179
static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription {
166-
private final Flow.Subscription flow;
180+
final Flow.Subscription flow;
167181

168182
public ReactiveToFlowSubscription(Flow.Subscription flow) {
169183
this.flow = flow;
@@ -188,16 +202,15 @@ public void cancel() {
188202
*/
189203
static final class FlowToReactiveSubscriber<T>
190204
implements Flow.Subscriber<T> {
191-
private final org.reactivestreams.Subscriber<? super T> reactiveStreams;
205+
final org.reactivestreams.Subscriber<? super T> reactiveStreams;
192206

193207
public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) {
194-
if (reactive == null) throw null;
195208
this.reactiveStreams = reactive;
196209
}
197210

198211
@Override
199212
public void onSubscribe(Flow.Subscription subscription) {
200-
reactiveStreams.onSubscribe(new ReactiveToFlowSubscription(subscription));
213+
reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
201214
}
202215

203216
@Override
@@ -223,15 +236,15 @@ public void onComplete() {
223236
*/
224237
static final class ReactiveToFlowSubscriber<T>
225238
implements org.reactivestreams.Subscriber<T> {
226-
private final Flow.Subscriber<? super T> flow;
239+
final Flow.Subscriber<? super T> flow;
227240

228241
public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) {
229242
this.flow = flow;
230243
}
231244

232245
@Override
233246
public void onSubscribe(org.reactivestreams.Subscription subscription) {
234-
flow.onSubscribe(new FlowToReactiveSubscription(subscription));
247+
flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
235248
}
236249

237250
@Override
@@ -261,13 +274,12 @@ static final class ReactiveToFlowProcessor<T, U>
261274
final Flow.Processor<? super T, ? extends U> flow;
262275

263276
public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) {
264-
if (flow == null) throw null;
265277
this.flow = flow;
266278
}
267279

268280
@Override
269-
public void onSubscribe(org.reactivestreams.Subscription s) {
270-
flow.onSubscribe(new FlowToReactiveSubscription(s));
281+
public void onSubscribe(org.reactivestreams.Subscription subscription) {
282+
flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
271283
}
272284

273285
@Override
@@ -287,11 +299,7 @@ public void onComplete() {
287299

288300
@Override
289301
public void subscribe(org.reactivestreams.Subscriber<? super U> s) {
290-
if (s == null) {
291-
flow.subscribe(null);
292-
return;
293-
}
294-
flow.subscribe(new FlowToReactiveSubscriber<U>(s));
302+
flow.subscribe((s == null) ? null : new FlowToReactiveSubscriber<U>(s));
295303
}
296304
}
297305

@@ -309,8 +317,8 @@ public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extend
309317
}
310318

311319
@Override
312-
public void onSubscribe(Flow.Subscription s) {
313-
reactiveStreams.onSubscribe(new ReactiveToFlowSubscription(s));
320+
public void onSubscribe(Flow.Subscription subscription) {
321+
reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
314322
}
315323

316324
@Override
@@ -330,11 +338,7 @@ public void onComplete() {
330338

331339
@Override
332340
public void subscribe(Flow.Subscriber<? super U> s) {
333-
if (s == null) {
334-
reactiveStreams.subscribe(null);
335-
return;
336-
}
337-
reactiveStreams.subscribe(new ReactiveToFlowSubscriber<U>(s));
341+
reactiveStreams.subscribe((s == null) ? null : new ReactiveToFlowSubscriber<U>(s));
338342
}
339343
}
340344

@@ -352,11 +356,7 @@ public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) {
352356

353357
@Override
354358
public void subscribe(org.reactivestreams.Subscriber<? super T> reactive) {
355-
if (reactive == null) {
356-
flow.subscribe(null);
357-
return;
358-
}
359-
flow.subscribe(new FlowToReactiveSubscriber<T>(reactive));
359+
flow.subscribe((reactive == null) ? null : new FlowToReactiveSubscriber<T>(reactive));
360360
}
361361
}
362362

@@ -374,11 +374,7 @@ public FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reac
374374

375375
@Override
376376
public void subscribe(Flow.Subscriber<? super T> flow) {
377-
if (flow == null) {
378-
reactiveStreams.subscribe(null);
379-
return;
380-
}
381-
reactiveStreams.subscribe(new ReactiveToFlowSubscriber<T>(flow));
377+
reactiveStreams.subscribe((flow == null) ? null : new ReactiveToFlowSubscriber<T>(flow));
382378
}
383379
}
384380

tck/src/test/java/org/reactivestreams/tck/BrokenExampleBlackboxTest.java

+23-23
Original file line numberDiff line numberDiff line change
@@ -48,33 +48,33 @@ public Integer createElement(int element) {
4848
public Subscriber<Integer> createSubscriber() {
4949
return new MySubscriber<Integer>();
5050
}
51-
52-
public class MySubscriber<T> implements Subscriber<T> {
53-
private Subscription subscription;
54-
55-
@Override
51+
52+
public class MySubscriber<T> implements Subscriber<T> {
53+
private Subscription subscription;
54+
55+
@Override
5656
public void onSubscribe(Subscription subscription) {
5757
if (this.subscription == null) {
5858
this.subscription = subscription;
59-
subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded
59+
subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded
6060
} else subscription.cancel();
61-
}
62-
63-
@Override
61+
}
62+
63+
@Override
6464
public void onNext(T item) {
6565
if (item == null) throw null;
66-
System.out.println("Got : " + item);
67-
subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded
68-
}
69-
70-
@Override
71-
public void onError(Throwable t) {
72-
t.printStackTrace();
73-
}
74-
75-
@Override
76-
public void onComplete() {
77-
System.out.println("Done");
78-
}
79-
}
66+
System.out.println("Got : " + item);
67+
subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded
68+
}
69+
70+
@Override
71+
public void onError(Throwable t) {
72+
t.printStackTrace();
73+
}
74+
75+
@Override
76+
public void onComplete() {
77+
System.out.println("Done");
78+
}
79+
}
8080
}

0 commit comments

Comments
 (0)