From 376b1598e49955e28611add07a4e9dae431c0880 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 12 Oct 2017 13:31:28 +0200 Subject: [PATCH 1/2] Java 9 Flow bridge: add Subscriber converters --- .../ReactiveStreamsFlowBridge.java | 28 +++++++- .../ReactiveStreamsFlowBridgeTest.java | 65 +++++++++++++++++++ 2 files changed, 92 insertions(+), 1 deletion(-) diff --git a/flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java b/flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java index a6914df8..862ab39d 100644 --- a/flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java +++ b/flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java @@ -110,7 +110,33 @@ public static Flow.Processor toFlow( } return new FlowToReactiveProcessor(reactiveStreamsProcessor); } - + + /** + * Converts a Reactive Streams Subscriber into a Flow Subscriber. + * @param the input and output value type + * @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert + * @return the equivalent Flow Subscriber + */ + public static Flow.Subscriber toFlowSubscriber(org.reactivestreams.Subscriber reactiveStreamsSubscriber) { + if (reactiveStreamsSubscriber == null) { + throw new NullPointerException("reactiveStreamsSubscriber"); + } + return new FlowToReactiveSubscriber(reactiveStreamsSubscriber); + } + + /** + * Converts a Flow Subscriber into a Reactive Streams Subscriber. + * @param the input and output value type + * @param flowSubscriber the Flow Subscriber instance to convert + * @return the equivalent Flow Subscriber + */ + public static org.reactivestreams.Subscriber toReactiveStreamsSubscriber(Flow.Subscriber flowSubscriber) { + if (flowSubscriber == null) { + throw new NullPointerException("flowSubscriber"); + } + return new ReactiveToFlowSubscriber(flowSubscriber); + } + /** * Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription. */ diff --git a/flow-bridge/src/test/java/org/reactivestreams/ReactiveStreamsFlowBridgeTest.java b/flow-bridge/src/test/java/org/reactivestreams/ReactiveStreamsFlowBridgeTest.java index e1c76087..03573a0a 100644 --- a/flow-bridge/src/test/java/org/reactivestreams/ReactiveStreamsFlowBridgeTest.java +++ b/flow-bridge/src/test/java/org/reactivestreams/ReactiveStreamsFlowBridgeTest.java @@ -11,6 +11,7 @@ package org.reactivestreams; +import org.testng.Assert; import org.testng.annotations.Test; import java.io.IOException; @@ -110,4 +111,68 @@ public void execute(Runnable command) { tc.assertFailure(IOException.class, 1, 2, 3, 4, 5); } + + @Test + public void reactiveStreamsToFlowSubscriber() { + TestEitherConsumer tc = new TestEitherConsumer(); + + Flow.Subscriber fs = ReactiveStreamsFlowBridge.toFlowSubscriber(tc); + + final Object[] state = { null, null }; + + fs.onSubscribe(new Flow.Subscription() { + @Override + public void request(long n) { + state[0] = n; + } + + @Override + public void cancel() { + state[1] = true; + } + }); + + Assert.assertEquals(state[0], Long.MAX_VALUE); + + fs.onNext(1); + fs.onNext(2); + fs.onNext(3); + fs.onComplete(); + + tc.assertResult(1, 2, 3); + + Assert.assertNull(state[1]); + } + + @Test + public void flowToReactiveStreamsSubscriber() { + TestEitherConsumer tc = new TestEitherConsumer(); + + org.reactivestreams.Subscriber fs = ReactiveStreamsFlowBridge.toReactiveStreamsSubscriber(tc); + + final Object[] state = { null, null }; + + fs.onSubscribe(new org.reactivestreams.Subscription() { + @Override + public void request(long n) { + state[0] = n; + } + + @Override + public void cancel() { + state[1] = true; + } + }); + + Assert.assertEquals(state[0], Long.MAX_VALUE); + + fs.onNext(1); + fs.onNext(2); + fs.onNext(3); + fs.onComplete(); + + tc.assertResult(1, 2, 3); + + Assert.assertNull(state[1]); + } } From 18459a49c0e1911f238cdec1f17b0d99606b5270 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 12 Oct 2017 13:36:26 +0200 Subject: [PATCH 2/2] Fix return type javadoc --- .../java/org/reactivestreams/ReactiveStreamsFlowBridge.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java b/flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java index 862ab39d..bb8b320f 100644 --- a/flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java +++ b/flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java @@ -128,7 +128,7 @@ public static Flow.Subscriber toFlowSubscriber(org.reactivestreams.Subscr * Converts a Flow Subscriber into a Reactive Streams Subscriber. * @param the input and output value type * @param flowSubscriber the Flow Subscriber instance to convert - * @return the equivalent Flow Subscriber + * @return the equivalent Reactive Streams Subscriber */ public static org.reactivestreams.Subscriber toReactiveStreamsSubscriber(Flow.Subscriber flowSubscriber) { if (flowSubscriber == null) {