Skip to content

[WIP] TCK for j.u.c.Flow types "directly" #398

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 31, 2017
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ test-results
test-tmp
*.class
gradle.properties
*.orig
1 change: 1 addition & 0 deletions .java-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(can remove later, makes jenv use jdk9)

6 changes: 5 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ subprojects {
}
}

if (name in ["reactive-streams", "reactive-streams-tck", "reactive-streams-examples", "reactive-streams-flow-bridge"]) {
if (name in ["reactive-streams",
"reactive-streams-tck",
"reactive-streams-tck-flow",
"reactive-streams-examples",
"reactive-streams-flow-bridge"]) {
apply plugin: "maven"
apply plugin: "signing"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ private ReactiveStreamsFlowBridge() {
* @return the equivalent Reactive Streams Publisher
*/
@SuppressWarnings("unchecked")
public static <T> org.reactivestreams.Publisher<T> toReactiveStreams(
public static <T> org.reactivestreams.Publisher<T> toPublisher(
Flow.Publisher<? extends T> flowPublisher) {
if (flowPublisher == null) {
throw new NullPointerException("flowPublisher");
}
if (flowPublisher instanceof org.reactivestreams.Publisher) {
return (org.reactivestreams.Publisher<T>)flowPublisher;
}
if (flowPublisher instanceof FlowPublisherFromReactive) {
return (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams);
}
if (flowPublisher instanceof org.reactivestreams.Publisher) {
return (org.reactivestreams.Publisher<T>)flowPublisher;
}
return new ReactivePublisherFromFlow<T>(flowPublisher);
}

Expand All @@ -50,21 +50,21 @@ public static <T> org.reactivestreams.Publisher<T> toReactiveStreams(
* @return the equivalent Flow Publisher
*/
@SuppressWarnings("unchecked")
public static <T> Flow.Publisher<T> toFlow(
public static <T> Flow.Publisher<T> toFlowPublisher(
org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher
) {
if (reactiveStreamsPublisher == null) {
throw new NullPointerException("reactiveStreamsPublisher");
}
if (reactiveStreamsPublisher instanceof Flow.Publisher) {
return (Flow.Publisher<T>)reactiveStreamsPublisher;
}
if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) {
return (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow);
}
if (reactiveStreamsPublisher instanceof Flow.Publisher) {
return (Flow.Publisher<T>)reactiveStreamsPublisher;
}
return new FlowPublisherFromReactive<T>(reactiveStreamsPublisher);
}

/**
* Converts a Flow Processor into a Reactive Streams Processor.
* @param <T> the input value type
Expand All @@ -73,18 +73,18 @@ public static <T> Flow.Publisher<T> toFlow(
* @return the equivalent Reactive Streams Processor
*/
@SuppressWarnings("unchecked")
public static <T, U> org.reactivestreams.Processor<T, U> toReactiveStreams(
public static <T, U> org.reactivestreams.Processor<T, U> toProcessor(
Flow.Processor<? super T, ? extends U> flowProcessor
) {
if (flowProcessor == null) {
throw new NullPointerException("flowProcessor");
}
if (flowProcessor instanceof org.reactivestreams.Processor) {
return (org.reactivestreams.Processor<T, U>)flowProcessor;
}
if (flowProcessor instanceof FlowToReactiveProcessor) {
return (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams);
}
if (flowProcessor instanceof org.reactivestreams.Processor) {
return (org.reactivestreams.Processor<T, U>)flowProcessor;
}
return new ReactiveToFlowProcessor<T, U>(flowProcessor);
}

Expand All @@ -96,18 +96,18 @@ public static <T, U> org.reactivestreams.Processor<T, U> toReactiveStreams(
* @return the equivalent Flow Processor
*/
@SuppressWarnings("unchecked")
public static <T, U> Flow.Processor<T, U> toFlow(
public static <T, U> Flow.Processor<T, U> toFlowProcessor(
org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor
) {
if (reactiveStreamsProcessor == null) {
throw new NullPointerException("reactiveStreamsProcessor");
}
if (reactiveStreamsProcessor instanceof Flow.Processor) {
return (Flow.Processor<T, U>)reactiveStreamsProcessor;
}
if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) {
return (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow);
}
if (reactiveStreamsProcessor instanceof Flow.Processor) {
return (Flow.Processor<T, U>)reactiveStreamsProcessor;
}
return new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor);
}

Expand All @@ -117,10 +117,17 @@ public static <T, U> Flow.Processor<T, U> toFlow(
* @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert
* @return the equivalent Flow Subscriber
*/
@SuppressWarnings("unchecked")
public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
if (reactiveStreamsSubscriber == null) {
throw new NullPointerException("reactiveStreamsSubscriber");
}
if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) {
return (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow;
}
if (reactiveStreamsSubscriber instanceof Flow.Subscriber) {
return (Flow.Subscriber<T>)reactiveStreamsSubscriber;
}
return new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber);
}

Expand All @@ -130,19 +137,26 @@ public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscr
* @param flowSubscriber the Flow Subscriber instance to convert
* @return the equivalent Reactive Streams Subscriber
*/
public static <T> org.reactivestreams.Subscriber<T> toReactiveStreamsSubscriber(Flow.Subscriber<T> flowSubscriber) {
@SuppressWarnings("unchecked")
public static <T> org.reactivestreams.Subscriber<T> toSubscriber(Flow.Subscriber<T> flowSubscriber) {
if (flowSubscriber == null) {
throw new NullPointerException("flowSubscriber");
}
if (flowSubscriber instanceof FlowToReactiveSubscriber) {
return (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams;
}
if (flowSubscriber instanceof org.reactivestreams.Subscriber) {
return (org.reactivestreams.Subscriber<T>)flowSubscriber;
}
return new ReactiveToFlowSubscriber<T>(flowSubscriber);
}

/**
* Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription.
*/
static final class FlowToReactiveSubscription implements Flow.Subscription {
private final org.reactivestreams.Subscription reactiveStreams;
final org.reactivestreams.Subscription reactiveStreams;

public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) {
this.reactiveStreams = reactive;
}
Expand All @@ -156,15 +170,15 @@ public void request(long n) {
public void cancel() {
reactiveStreams.cancel();
}

}

/**
* Wraps a Flow Subscription and converts the calls to a Reactive Streams Subscription.
*/
static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription {
private final Flow.Subscription flow;
final Flow.Subscription flow;

public ReactiveToFlowSubscription(Flow.Subscription flow) {
this.flow = flow;
}
Expand All @@ -178,25 +192,25 @@ public void request(long n) {
public void cancel() {
flow.cancel();
}


}

/**
* Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
* @param <T> the element type
*/
static final class FlowToReactiveSubscriber<T>
static final class FlowToReactiveSubscriber<T>
implements Flow.Subscriber<T> {
private final org.reactivestreams.Subscriber<? super T> reactiveStreams;
final org.reactivestreams.Subscriber<? super T> reactiveStreams;

public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) {
this.reactiveStreams = reactive;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
reactiveStreams.onSubscribe(new ReactiveToFlowSubscription(subscription));
reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
}

@Override
Expand All @@ -213,24 +227,24 @@ public void onError(Throwable throwable) {
public void onComplete() {
reactiveStreams.onComplete();
}

}

/**
* Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
* @param <T> the element type
*/
static final class ReactiveToFlowSubscriber<T>
static final class ReactiveToFlowSubscriber<T>
implements org.reactivestreams.Subscriber<T> {
private final Flow.Subscriber<? super T> flow;
final Flow.Subscriber<? super T> flow;

public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) {
this.flow = flow;
}

@Override
public void onSubscribe(org.reactivestreams.Subscription subscription) {
flow.onSubscribe(new FlowToReactiveSubscription(subscription));
flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
}

@Override
Expand All @@ -247,9 +261,9 @@ public void onError(Throwable throwable) {
public void onComplete() {
flow.onComplete();
}

}

/**
* Wraps a Flow Processor and forwards methods of the Reactive Streams Processor to it.
* @param <T> the input type
Expand All @@ -258,14 +272,14 @@ public void onComplete() {
static final class ReactiveToFlowProcessor<T, U>
implements org.reactivestreams.Processor<T, U> {
final Flow.Processor<? super T, ? extends U> flow;

public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) {
this.flow = flow;
}

@Override
public void onSubscribe(org.reactivestreams.Subscription s) {
flow.onSubscribe(new FlowToReactiveSubscription(s));
public void onSubscribe(org.reactivestreams.Subscription subscription) {
flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
}

@Override
Expand All @@ -285,14 +299,10 @@ public void onComplete() {

@Override
public void subscribe(org.reactivestreams.Subscriber<? super U> s) {
if (s == null) {
flow.subscribe(null);
return;
}
flow.subscribe(new FlowToReactiveSubscriber<U>(s));
flow.subscribe((s == null) ? null : new FlowToReactiveSubscriber<U>(s));
}
}

/**
* Wraps a Reactive Streams Processor and forwards methods of the Flow Processor to it.
* @param <T> the input type
Expand All @@ -301,14 +311,14 @@ public void subscribe(org.reactivestreams.Subscriber<? super U> s) {
static final class FlowToReactiveProcessor<T, U>
implements Flow.Processor<T, U> {
final org.reactivestreams.Processor<? super T, ? extends U> reactiveStreams;

public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) {
this.reactiveStreams = reactive;
}

@Override
public void onSubscribe(Flow.Subscription s) {
reactiveStreams.onSubscribe(new ReactiveToFlowSubscription(s));
public void onSubscribe(Flow.Subscription subscription) {
reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
}

@Override
Expand All @@ -328,11 +338,7 @@ public void onComplete() {

@Override
public void subscribe(Flow.Subscriber<? super U> s) {
if (s == null) {
reactiveStreams.subscribe(null);
return;
}
reactiveStreams.subscribe(new ReactiveToFlowSubscriber<U>(s));
reactiveStreams.subscribe((s == null) ? null : new ReactiveToFlowSubscriber<U>(s));
}
}

Expand All @@ -350,11 +356,7 @@ public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) {

@Override
public void subscribe(org.reactivestreams.Subscriber<? super T> reactive) {
if (reactive == null) {
flow.subscribe(null);
return;
}
flow.subscribe(new FlowToReactiveSubscriber<T>(reactive));
flow.subscribe((reactive == null) ? null : new FlowToReactiveSubscriber<T>(reactive));
}
}

Expand All @@ -372,12 +374,8 @@ public FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reac

@Override
public void subscribe(Flow.Subscriber<? super T> flow) {
if (flow == null) {
reactiveStreams.subscribe(null);
return;
}
reactiveStreams.subscribe(new ReactiveToFlowSubscriber<T>(flow));
reactiveStreams.subscribe((flow == null) ? null : new ReactiveToFlowSubscriber<T>(flow));
}
}

}
}
Loading