Skip to content

Commit ef145a7

Browse files
committed
more WIP
1 parent 6507209 commit ef145a7

File tree

10 files changed

+184
-303
lines changed

10 files changed

+184
-303
lines changed

flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java

+21-21
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public static <T> Flow.Publisher<T> toFlow(
6464
}
6565
return new FlowPublisherFromReactive<T>(reactiveStreamsPublisher);
6666
}
67-
67+
6868
/**
6969
* Converts a Flow Processor into a Reactive Streams Processor.
7070
* @param <T> the input value type
@@ -117,7 +117,7 @@ public static <T, U> Flow.Processor<T, U> toFlow(
117117
* @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert
118118
* @return the equivalent Flow Subscriber
119119
*/
120-
public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
120+
public static <T> Flow.Subscriber<T> toFlow(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
121121
if (reactiveStreamsSubscriber == null) {
122122
throw new NullPointerException("reactiveStreamsSubscriber");
123123
}
@@ -130,7 +130,7 @@ public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscr
130130
* @param flowSubscriber the Flow Subscriber instance to convert
131131
* @return the equivalent Reactive Streams Subscriber
132132
*/
133-
public static <T> org.reactivestreams.Subscriber<T> toReactiveStreamsSubscriber(Flow.Subscriber<T> flowSubscriber) {
133+
public static <T> org.reactivestreams.Subscriber<T> toReactiveStreams(Flow.Subscriber<T> flowSubscriber) {
134134
if (flowSubscriber == null) {
135135
throw new NullPointerException("flowSubscriber");
136136
}
@@ -142,7 +142,7 @@ public static <T> org.reactivestreams.Subscriber<T> toReactiveStreamsSubscriber(
142142
*/
143143
static final class FlowToReactiveSubscription implements Flow.Subscription {
144144
private final org.reactivestreams.Subscription reactiveStreams;
145-
145+
146146
public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) {
147147
this.reactiveStreams = reactive;
148148
}
@@ -156,15 +156,15 @@ public void request(long n) {
156156
public void cancel() {
157157
reactiveStreams.cancel();
158158
}
159-
159+
160160
}
161-
161+
162162
/**
163163
* Wraps a Flow Subscription and converts the calls to a Reactive Streams Subscription.
164164
*/
165165
static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription {
166166
private final Flow.Subscription flow;
167-
167+
168168
public ReactiveToFlowSubscription(Flow.Subscription flow) {
169169
this.flow = flow;
170170
}
@@ -178,18 +178,18 @@ public void request(long n) {
178178
public void cancel() {
179179
flow.cancel();
180180
}
181-
182-
181+
182+
183183
}
184-
184+
185185
/**
186186
* Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
187187
* @param <T> the element type
188188
*/
189-
static final class FlowToReactiveSubscriber<T>
189+
static final class FlowToReactiveSubscriber<T>
190190
implements Flow.Subscriber<T> {
191191
private final org.reactivestreams.Subscriber<? super T> reactiveStreams;
192-
192+
193193
public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) {
194194
this.reactiveStreams = reactive;
195195
}
@@ -213,17 +213,17 @@ public void onError(Throwable throwable) {
213213
public void onComplete() {
214214
reactiveStreams.onComplete();
215215
}
216-
216+
217217
}
218218

219219
/**
220220
* Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
221221
* @param <T> the element type
222222
*/
223-
static final class ReactiveToFlowSubscriber<T>
223+
static final class ReactiveToFlowSubscriber<T>
224224
implements org.reactivestreams.Subscriber<T> {
225225
private final Flow.Subscriber<? super T> flow;
226-
226+
227227
public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) {
228228
this.flow = flow;
229229
}
@@ -247,9 +247,9 @@ public void onError(Throwable throwable) {
247247
public void onComplete() {
248248
flow.onComplete();
249249
}
250-
250+
251251
}
252-
252+
253253
/**
254254
* Wraps a Flow Processor and forwards methods of the Reactive Streams Processor to it.
255255
* @param <T> the input type
@@ -258,7 +258,7 @@ public void onComplete() {
258258
static final class ReactiveToFlowProcessor<T, U>
259259
implements org.reactivestreams.Processor<T, U> {
260260
final Flow.Processor<? super T, ? extends U> flow;
261-
261+
262262
public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) {
263263
this.flow = flow;
264264
}
@@ -292,7 +292,7 @@ public void subscribe(org.reactivestreams.Subscriber<? super U> s) {
292292
flow.subscribe(new FlowToReactiveSubscriber<U>(s));
293293
}
294294
}
295-
295+
296296
/**
297297
* Wraps a Reactive Streams Processor and forwards methods of the Flow Processor to it.
298298
* @param <T> the input type
@@ -301,7 +301,7 @@ public void subscribe(org.reactivestreams.Subscriber<? super U> s) {
301301
static final class FlowToReactiveProcessor<T, U>
302302
implements Flow.Processor<T, U> {
303303
final org.reactivestreams.Processor<? super T, ? extends U> reactiveStreams;
304-
304+
305305
public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) {
306306
this.reactiveStreams = reactive;
307307
}
@@ -380,4 +380,4 @@ public void subscribe(Flow.Subscriber<? super T> flow) {
380380
}
381381
}
382382

383-
}
383+
}

flow-bridge/src/test/java/org/reactivestreams/ReactiveStreamsFlowBridgeTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public void execute(Runnable command) {
116116
public void reactiveStreamsToFlowSubscriber() {
117117
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();
118118

119-
Flow.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toFlowSubscriber(tc);
119+
Flow.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toFlow(tc);
120120

121121
final Object[] state = { null, null };
122122

@@ -148,7 +148,7 @@ public void cancel() {
148148
public void flowToReactiveStreamsSubscriber() {
149149
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();
150150

151-
org.reactivestreams.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toReactiveStreamsSubscriber(tc);
151+
org.reactivestreams.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toReactiveStreams(tc);
152152

153153
final Object[] state = { null, null };
154154

tck-flow/src/main/java/org/reactivestreams/tck/flow/FlowPublisherVerification.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ final public Publisher<T> createFailedPublisher() {
5454
/**
5555
* By implementing this method, additional TCK tests concerning a "failed" publishers will be run.
5656
*
57-
* The expected behaviour of the {@link Publisher} returned by this method is hand out a subscription,
57+
* The expected behaviour of the {@link Flow.Publisher} returned by this method is hand out a subscription,
5858
* followed by signalling {@code onError} on it, as specified by Rule 1.9.
5959
*
6060
* If you ignore these additional tests, return {@code null} from this method.

tck-flow/src/main/java/org/reactivestreams/tck/flow/FlowSubscriberBlackboxVerification.java

+10-10
Original file line numberDiff line numberDiff line change
@@ -37,28 +37,28 @@ protected FlowSubscriberBlackboxVerification(TestEnvironment env) {
3737
super(env);
3838
}
3939

40+
@Override
41+
public final void triggerRequest(Subscriber<? super T> subscriber) {
42+
triggerFlowRequest(ReactiveStreamsFlowBridge.toFlow(subscriber));
43+
}
4044
/**
4145
* Override this method if the {@link java.util.concurrent.Flow.Subscriber} implementation you are verifying
4246
* needs an external signal before it signals demand to its Publisher.
4347
*
4448
* By default this method does nothing.
4549
*/
46-
abstract public void triggerFlowRequest(Flow.Subscriber<? super T> subscriber);
50+
public void triggerFlowRequest(Flow.Subscriber<? super T> subscriber) {
51+
// this method is intentionally left blank
52+
}
4753

4854
@Override
49-
public final void triggerRequest(Subscriber<? super T> subscriber) {
50-
triggerFlowRequest(ReactiveStreamsFlowBridge.toFlow(subscriber));
55+
public final Subscriber<T> createSubscriber() {
56+
return ReactiveStreamsFlowBridge.<T>toReactiveStreams(createFlowSubscriber());
5157
}
52-
5358
/**
5459
* This is the main method you must implement in your test incarnation.
55-
* It must create a new {@link java.util.concurrent.Flow.Subscriber} instance to be subjected to the testing logic.
60+
* It must create a new {@link Flow.Subscriber} instance to be subjected to the testing logic.
5661
*/
5762
abstract public Flow.Subscriber<T> createFlowSubscriber();
5863

59-
@Override
60-
public final Subscriber<T> createSubscriber() {
61-
return ReactiveStreamsFlowBridge.<T>toReactiveStreams(createFlowSubscriber());
62-
}
63-
6464
}

tck-flow/src/main/java/org/reactivestreams/tck/flow/FlowSubscriberWhiteboxVerification.java

+16-6
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,20 @@
1111

1212
package org.reactivestreams.tck.flow;
1313

14+
import org.reactivestreams.ReactiveStreamsFlowBridge;
1415
import org.reactivestreams.Subscriber;
1516
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
1617
import org.reactivestreams.tck.TestEnvironment;
1718
import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules;
1819

20+
import java.util.concurrent.Flow;
21+
1922
/**
20-
* Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription} specification rules.
23+
* Provides whitebox style tests for verifying {@link java.util.concurrent.Flow.Subscriber}
24+
* and {@link java.util.concurrent.Flow.Subscription} specification rules.
2125
*
22-
* @see org.reactivestreams.Subscriber
23-
* @see org.reactivestreams.Subscription
26+
* @see java.util.concurrent.Flow.Subscriber
27+
* @see java.util.concurrent.Flow.Subscription
2428
*/
2529
public abstract class FlowSubscriberWhiteboxVerification<T> extends SubscriberWhiteboxVerification<T>
2630
implements SubscriberWhiteboxVerificationRules {
@@ -31,8 +35,14 @@ protected FlowSubscriberWhiteboxVerification(TestEnvironment env) {
3135

3236
@Override
3337
final public Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe) {
34-
return createFlowSubscriber(probe);
38+
return ReactiveStreamsFlowBridge.toReactiveStreams(createFlowSubscriber(probe));
3539
}
36-
37-
protected abstract Subscriber<T> createFlowSubscriber(WhiteboxSubscriberProbe<T> probe);
40+
/**
41+
* This is the main method you must implement in your test incarnation.
42+
* It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
43+
*
44+
* In order to be meaningfully testable your Subscriber must inform the given
45+
* `WhiteboxSubscriberProbe` of the respective events having been received.
46+
*/
47+
protected abstract Flow.Subscriber<T> createFlowSubscriber(WhiteboxSubscriberProbe<T> probe);
3848
}

0 commit comments

Comments
 (0)