Skip to content

Java 9 Flow bridge: add Subscriber converters #399

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,33 @@ public static <T, U> Flow.Processor<T, U> toFlow(
}
return new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor);
}


/**
* Converts a Reactive Streams Subscriber into a Flow Subscriber.
* @param <T> the input and output value type
* @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert
* @return the equivalent Flow Subscriber
*/
public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
if (reactiveStreamsSubscriber == null) {
throw new NullPointerException("reactiveStreamsSubscriber");
}
return new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber);
}

/**
* Converts a Flow Subscriber into a Reactive Streams Subscriber.
* @param <T> the input and output value type
* @param flowSubscriber the Flow Subscriber instance to convert
* @return the equivalent Flow Subscriber
*/
public static <T> org.reactivestreams.Subscriber<T> toReactiveStreamsSubscriber(Flow.Subscriber<T> flowSubscriber) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I wonder actually; now it's onconsistent since most methods are toFlow and to ReactiveStreams and this one adds the specific type name - should we align them to the same style?

The overloaded version works fine too I think, just toX everywhere, but I have no strong preference WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there might be an ambiguity problem with toFlow(Processor) when trying to convert a Processor, requiring the user to cast. For consistency though, a postfix could be added to the other methods.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the intended usage is to import static then I think it's fine to harmonize on "toReactiveStreamsX" and "toFlowX". Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah good catch that a processor would be ambigious then... I'd avoid having 2 methods to do the same, so I guess for consistency we should do the long names everywhere.

if (flowSubscriber == null) {
throw new NullPointerException("flowSubscriber");
}
return new ReactiveToFlowSubscriber<T>(flowSubscriber);
}

/**
* Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package org.reactivestreams;

import org.testng.Assert;
import org.testng.annotations.Test;

import java.io.IOException;
Expand Down Expand Up @@ -110,4 +111,68 @@ public void execute(Runnable command) {

tc.assertFailure(IOException.class, 1, 2, 3, 4, 5);
}

@Test
public void reactiveStreamsToFlowSubscriber() {
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();

Flow.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toFlowSubscriber(tc);

final Object[] state = { null, null };

fs.onSubscribe(new Flow.Subscription() {
@Override
public void request(long n) {
state[0] = n;
}

@Override
public void cancel() {
state[1] = true;
}
});

Assert.assertEquals(state[0], Long.MAX_VALUE);

fs.onNext(1);
fs.onNext(2);
fs.onNext(3);
fs.onComplete();

tc.assertResult(1, 2, 3);

Assert.assertNull(state[1]);
}

@Test
public void flowToReactiveStreamsSubscriber() {
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();

org.reactivestreams.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toReactiveStreamsSubscriber(tc);

final Object[] state = { null, null };

fs.onSubscribe(new org.reactivestreams.Subscription() {
@Override
public void request(long n) {
state[0] = n;
}

@Override
public void cancel() {
state[1] = true;
}
});

Assert.assertEquals(state[0], Long.MAX_VALUE);

fs.onNext(1);
fs.onNext(2);
fs.onNext(3);
fs.onComplete();

tc.assertResult(1, 2, 3);

Assert.assertNull(state[1]);
}
}