From 89bc5f17ef368534fadf70f1bb24ef54bd0e2bee Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Thu, 19 Mar 2015 15:50:41 -0700 Subject: [PATCH 1/4] Proposal for TCP server at all layers (core, transport & rxjava1) Based on various discussions around different issues, I am proposing a fresh design for a TCP server in this PR. I have implemented a TCP server at all layers in reactive-ipc, i.e. core, transport(netty) and rxjava1. I have also removed all code which isn't related to the implementation. We can discuss those utilities in isolation as and when required to be added. Although, not used, I have kept the Buffer and related classes as that aspect is not yet discussed to conclusion. There are two samples each for netty transport and rxjava1 layer available to see how will a typical interaction look like in modules ripc-transport-netty4-examples and ripc-rxjava1-examples Backpressure has not been implemented yet. --- build.gradle | 50 ++- .../ripc/core/NamedDaemonThreadFactory.java | 26 -- .../main/java/io/ripc/core/Specification.java | 21 -- .../main/java/io/ripc/core/package-info.java | 3 - .../java/io/ripc/internal/Publishers.java | 48 +++ .../java/io/ripc/{core => }/io/Buffer.java | 2 +- .../test/java/io/ripc/core/package-info.java | 3 - .../java/io/ripc/protocol/tcp/Connection.java | 48 ++- .../protocol/tcp/ConnectionPublisher.java | 9 - .../java/io/ripc/protocol/tcp/TcpHandler.java | 8 + .../io/ripc/protocol/tcp/TcpInterceptor.java | 7 + .../java/io/ripc/protocol/tcp/TcpServer.java | 88 +++++ .../rx/protocol/tcp/RxTcpServerSample.java | 36 ++ .../main/java/io/ripc}/rx/package-info.java | 0 .../io/ripc/rx/protocol/tcp/RxConnection.java | 64 ++++ .../io/ripc/rx/protocol/tcp/RxTcpHandler.java | 8 + .../rx/protocol/tcp/RxTcpInterceptor.java | 6 + .../io/ripc/rx/protocol/tcp/RxTcpServer.java | 69 ++++ .../test/java/io/ripc}/rx/package-info.java | 0 .../transport/netty4/tcp/TcpServerSample.java | 36 ++ .../ripc/transport/netty4/ByteBufBuffer.java | 331 ------------------ .../ChannelInboundHandlerSubscription.java | 96 ----- .../tcp/ChannelInitializerSubscription.java | 59 ---- .../netty4/tcp/ChannelToConnectionBridge.java | 183 ++++++++++ .../transport/netty4/tcp/ConnectionImpl.java | 90 +++++ .../transport/netty4/tcp/Netty4TcpServer.java | 95 +++++ .../netty4/tcp/server/NettyTcpServer.java | 71 ---- .../tcp/server/NettyTcpServerConnection.java | 52 --- .../NettyTcpServerIntegrationTests.java | 77 ---- settings.gradle | 4 +- 30 files changed, 826 insertions(+), 764 deletions(-) delete mode 100644 ripc-core/src/main/java/io/ripc/core/NamedDaemonThreadFactory.java delete mode 100644 ripc-core/src/main/java/io/ripc/core/Specification.java delete mode 100644 ripc-core/src/main/java/io/ripc/core/package-info.java create mode 100644 ripc-core/src/main/java/io/ripc/internal/Publishers.java rename ripc-core/src/main/java/io/ripc/{core => }/io/Buffer.java (99%) delete mode 100644 ripc-core/src/test/java/io/ripc/core/package-info.java delete mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/ConnectionPublisher.java create mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpHandler.java create mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpInterceptor.java create mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpServer.java create mode 100644 ripc-rxjava1-examples/src/main/java/io/rpc/rx/protocol/tcp/RxTcpServerSample.java rename {ripc-composition-rxjava1/src/main/java/io/ripc/composition => ripc-rxjava1/src/main/java/io/ripc}/rx/package-info.java (100%) create mode 100644 ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxConnection.java create mode 100644 ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpHandler.java create mode 100644 ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpInterceptor.java create mode 100644 ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpServer.java rename {ripc-composition-rxjava1/src/test/java/io/ripc/composition => ripc-rxjava1/src/test/java/io/ripc}/rx/package-info.java (100%) create mode 100644 ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/TcpServerSample.java delete mode 100644 ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/ByteBufBuffer.java delete mode 100644 ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInboundHandlerSubscription.java delete mode 100644 ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInitializerSubscription.java create mode 100644 ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelToConnectionBridge.java create mode 100644 ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ConnectionImpl.java create mode 100644 ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java delete mode 100644 ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java delete mode 100644 ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java delete mode 100644 ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java diff --git a/build.gradle b/build.gradle index 8991c63..4c5bd3a 100644 --- a/build.gradle +++ b/build.gradle @@ -48,11 +48,6 @@ allprojects { "-Xlint:-rawtypes" // TODO enable and fix warnings ] - compileJava { - sourceCompatibility = 1.7 - targetCompatibility = 1.7 - } - compileTestJava { sourceCompatibility = 1.8 targetCompatibility = 1.8 @@ -87,9 +82,29 @@ project('ripc-protocol-tcp') { dependencies { // ripc-core compile project(":ripc-core") + compile "org.slf4j:slf4j-api:1.7.6" + testCompile 'io.reactivex:rxjava:1.0.8' + testCompile 'io.reactivex:rxjava-reactive-streams:0.3.0' } } +project('ripc-transport-netty4-examples') { + description = 'Reactive IPC TCP Component examples' + + compileTestJava { + sourceCompatibility = 1.8 + targetCompatibility = 1.8 + } + + dependencies { + // ripc-core + compile project(":ripc-transport-netty4") + compile 'io.reactivex:rxjava:1.0.8' + compile 'io.reactivex:rxjava-reactive-streams:0.3.0' + compile 'org.slf4j:slf4j-simple:1.7.6' + } +} + project('ripc-transport-netty4') { description = 'Reactive IPC Netty 4.x Transport Implementation' dependencies { @@ -98,17 +113,32 @@ project('ripc-transport-netty4') { // Netty compile "io.netty:netty-all:$nettyVersion" + compile "org.slf4j:slf4j-api:1.7.6" } } -project('ripc-composition-rxjava1') { +project('ripc-rxjava1') { description = 'Reactive IPC Composition Layer Implementation' dependencies { // ripc-tcp - compile project(":ripc-transport-netty4") + compile project(":ripc-protocol-tcp") + compile 'io.reactivex:rxjava:1.0.8' + compile 'io.reactivex:rxjava-reactive-streams:0.3.0' + } +} - // RxJava 1.0 - compile "io.reactivex:rxjava:$rxjava1Version" +project('ripc-rxjava1-examples') { + description = 'Reactive IPC Composition Layer examples' + + compileTestJava { + sourceCompatibility = 1.8 + targetCompatibility = 1.8 + } + + dependencies { + // ripc-tcp + compile project(":ripc-rxjava1") + compile project(":ripc-transport-netty4-examples") } } @@ -125,7 +155,7 @@ configure(rootProject) { it.tasks.getByName("jar") } } - options.memberLevel = org.gradle.external.javadoc.JavadocMemberLevel.PROTECTED + options.memberLevel = JavadocMemberLevel.PROTECTED options.author = true options.header = rootProject.description //options.overview = "src/api/overview.html" diff --git a/ripc-core/src/main/java/io/ripc/core/NamedDaemonThreadFactory.java b/ripc-core/src/main/java/io/ripc/core/NamedDaemonThreadFactory.java deleted file mode 100644 index 2f00e07..0000000 --- a/ripc-core/src/main/java/io/ripc/core/NamedDaemonThreadFactory.java +++ /dev/null @@ -1,26 +0,0 @@ -package io.ripc.core; - -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicLong; - -/** - * Created by jbrisbin on 3/10/15. - */ -public class NamedDaemonThreadFactory implements ThreadFactory { - - private final AtomicLong counter = new AtomicLong(1); - private final String prefix; - - public NamedDaemonThreadFactory(String prefix) { - this.prefix = prefix; - } - - @Override - public Thread newThread(Runnable r) { - String name = prefix + "-" + counter.getAndIncrement(); - Thread t = new Thread(r, name); - t.setDaemon(true); - return t; - } - -} diff --git a/ripc-core/src/main/java/io/ripc/core/Specification.java b/ripc-core/src/main/java/io/ripc/core/Specification.java deleted file mode 100644 index 4cd7786..0000000 --- a/ripc-core/src/main/java/io/ripc/core/Specification.java +++ /dev/null @@ -1,21 +0,0 @@ -package io.ripc.core; - -import org.reactivestreams.Subscriber; - -/** - * Created by jbrisbin on 3/10/15. - */ -public abstract class Specification { - - protected Specification() { - } - - public static boolean spec_3_9_verifyPositiveDemand(long demand, Subscriber subscriber) { - if (demand > 0) { - return true; - } - subscriber.onError(new IllegalArgumentException("Spec 3.9: Request signals must be a positive number.")); - return false; - } - -} diff --git a/ripc-core/src/main/java/io/ripc/core/package-info.java b/ripc-core/src/main/java/io/ripc/core/package-info.java deleted file mode 100644 index e1c3537..0000000 --- a/ripc-core/src/main/java/io/ripc/core/package-info.java +++ /dev/null @@ -1,3 +0,0 @@ -/** - * - */ diff --git a/ripc-core/src/main/java/io/ripc/internal/Publishers.java b/ripc-core/src/main/java/io/ripc/internal/Publishers.java new file mode 100644 index 0000000..3cc5787 --- /dev/null +++ b/ripc-core/src/main/java/io/ripc/internal/Publishers.java @@ -0,0 +1,48 @@ +package io.ripc.internal; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** + * Temporary utility class for creating and transforming {@link Publisher}s. + */ +public class Publishers { + + public static Publisher just(final T value) { + return new Publisher() { + @Override + public void subscribe(final Subscriber s) { + s.onSubscribe(new Subscription() { + @Override + public void request(long n) { + s.onNext(value); + s.onComplete(); + } + + @Override + public void cancel() { + } + }); + } + }; + } + + public static Publisher error(final Throwable t) { + return new Publisher() { + @Override + public void subscribe(final Subscriber s) { + s.onSubscribe(new Subscription() { + @Override + public void request(long n) { + s.onError(t); + } + + @Override + public void cancel() { + } + }); + } + }; + } +} diff --git a/ripc-core/src/main/java/io/ripc/core/io/Buffer.java b/ripc-core/src/main/java/io/ripc/io/Buffer.java similarity index 99% rename from ripc-core/src/main/java/io/ripc/core/io/Buffer.java rename to ripc-core/src/main/java/io/ripc/io/Buffer.java index 8158e1b..e05b6bc 100644 --- a/ripc-core/src/main/java/io/ripc/core/io/Buffer.java +++ b/ripc-core/src/main/java/io/ripc/io/Buffer.java @@ -1,4 +1,4 @@ -package io.ripc.core.io; +package io.ripc.io; import java.nio.ByteBuffer; import java.nio.charset.CharsetDecoder; diff --git a/ripc-core/src/test/java/io/ripc/core/package-info.java b/ripc-core/src/test/java/io/ripc/core/package-info.java deleted file mode 100644 index e1c3537..0000000 --- a/ripc-core/src/test/java/io/ripc/core/package-info.java +++ /dev/null @@ -1,3 +0,0 @@ -/** - * - */ diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/Connection.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/Connection.java index 398396d..64224e5 100644 --- a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/Connection.java +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/Connection.java @@ -1,13 +1,53 @@ package io.ripc.protocol.tcp; -import io.ripc.core.io.Buffer; import org.reactivestreams.Publisher; /** - * Created by jbrisbin on 3/10/15. + * An abstraction for a TCP connection. + * + * @param The type of objects read from this connection. + * @param The type of objects written to this connection. */ -public interface Connection extends Publisher> { +public interface Connection extends Publisher { - void write(Publisher> data); + /** + * Writes the passed stream of {@code data} and returns the result as a {@link Publisher}. All items emitted by + * this stream are flushed on completion of the stream. + * + * @param data Data stream to write. + * + * @return Result of write. + */ + Publisher write(Publisher data); + /** + * Writes the passed stream of {@code data} and returns the result as a {@link Publisher}. All written items are + * flushed whenever the passed {@code flushSelector} returns {@code true} + * + * @param data Data stream to write. + * @param flushSelector Selector that is invoked after every emitted item is written. If this selector returns + * {@code true} then all items written till now are flushed. + * + * @return Result of write. + */ + Publisher write(Publisher data, FlushSelector flushSelector); + + /** + * A function that is used for determining when a flush has to be invoked on the underlying channel. + * + * @param Type of items emitted by the stream using this selector. + */ + interface FlushSelector { + + /** + * Selects whether flush should be invoked on the channel. + * + * @param count The index of this item. Starts with 1. + * @param lastWrittenItem Item which was last written before calling this selector. + * + * @return {@code true} if flush should be invoked. + */ + boolean select(long count, W lastWrittenItem); + + } } diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/ConnectionPublisher.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/ConnectionPublisher.java deleted file mode 100644 index 3b583b8..0000000 --- a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/ConnectionPublisher.java +++ /dev/null @@ -1,9 +0,0 @@ -package io.ripc.protocol.tcp; - -import org.reactivestreams.Publisher; - -/** - * Created by jbrisbin on 3/10/15. - */ -public interface ConnectionPublisher extends Publisher> { -} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpHandler.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpHandler.java new file mode 100644 index 0000000..1f461a0 --- /dev/null +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpHandler.java @@ -0,0 +1,8 @@ +package io.ripc.protocol.tcp; + +import org.reactivestreams.Publisher; + +public interface TcpHandler { + + Publisher handle(Connection connection); +} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpInterceptor.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpInterceptor.java new file mode 100644 index 0000000..ebcc3f7 --- /dev/null +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpInterceptor.java @@ -0,0 +1,7 @@ +package io.ripc.protocol.tcp; + +public interface TcpInterceptor { + + TcpHandler intercept(TcpHandler handler); + +} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpServer.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpServer.java new file mode 100644 index 0000000..d942a3e --- /dev/null +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpServer.java @@ -0,0 +1,88 @@ +package io.ripc.protocol.tcp; + +import io.ripc.internal.Publishers; +import org.reactivestreams.Publisher; + +import java.util.concurrent.atomic.AtomicBoolean; + +public abstract class TcpServer { + + @SuppressWarnings("rawtypes") + private LazyTcpHandler lazyHandler; + protected final TcpHandler thisHandler; + protected final AtomicBoolean started; + + protected TcpServer() { + lazyHandler = new LazyTcpHandler<>(); + thisHandler = null; + this.started = new AtomicBoolean(); + } + + protected TcpServer(TcpHandler handler) { + thisHandler = handler; + this.started = new AtomicBoolean(); + } + + public TcpServer intercept(TcpInterceptor interceptor) { + if (null == thisHandler) { + @SuppressWarnings("unchecked") + TcpServer toReturn = newServer(interceptor.intercept(lazyHandler)); + toReturn.lazyHandler = lazyHandler; + return toReturn; + } else { + TcpServer toReturn = newServer(interceptor.intercept(thisHandler)); + toReturn.lazyHandler = lazyHandler; + return toReturn; + } + } + + public final TcpServer start(TcpHandler handler) { + if (!started.compareAndSet(false, true)) { + throw new IllegalStateException("Server already started"); + } + + if (null == thisHandler) { + doStart(handler); + } else { + lazyHandler.start(handler); + doStart(thisHandler); + } + return this; + } + + public final void startAndAwait(TcpHandler handler) { + start(handler); + awaitShutdown(); + } + + public final boolean shutdown() { + return !started.compareAndSet(true, false) || doShutdown(); + } + + public abstract void awaitShutdown(); + + public abstract boolean doShutdown(); + + protected abstract TcpServer newServer(TcpHandler handler); + + protected abstract TcpServer doStart(TcpHandler handler); + + private static class LazyTcpHandler implements TcpHandler { + + private TcpHandler delegate; + + @Override + public Publisher handle(Connection connection) { + if (null == delegate) { + return Publishers.error(new IllegalStateException("Handler not initialized.")); + } else { + return delegate.handle(connection); + } + } + + @SuppressWarnings("unchecked") + private void start(TcpHandler handler) { + delegate = handler; + } + } +} diff --git a/ripc-rxjava1-examples/src/main/java/io/rpc/rx/protocol/tcp/RxTcpServerSample.java b/ripc-rxjava1-examples/src/main/java/io/rpc/rx/protocol/tcp/RxTcpServerSample.java new file mode 100644 index 0000000..bba9cb6 --- /dev/null +++ b/ripc-rxjava1-examples/src/main/java/io/rpc/rx/protocol/tcp/RxTcpServerSample.java @@ -0,0 +1,36 @@ +package io.rpc.rx.protocol.tcp; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.ripc.protocol.tcp.TcpServer; +import io.ripc.rx.protocol.tcp.RxTcpInterceptor; +import io.ripc.rx.protocol.tcp.RxTcpServer; +import io.ripc.transport.netty4.tcp.Netty4TcpServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.nio.charset.Charset.*; + +public class RxTcpServerSample { + + private static final Logger logger = LoggerFactory.getLogger(RxTcpServerSample.class); + + public static RxTcpInterceptor log() { + return handler -> input -> { + logger.error("Received a new connection."); + return handler.handle(input); + }; + } + + public static void main(String[] args) { + + TcpServer transport = Netty4TcpServer.create(0); + + RxTcpServer.create(transport) + .intercept(log()) + .startAndAwait(connection -> connection.write(connection.map(bb -> { + String msg = "Hello " + bb.toString(defaultCharset()); + return Unpooled.buffer().writeBytes(msg.getBytes()); + }), (count, item) -> true)); + } +} \ No newline at end of file diff --git a/ripc-composition-rxjava1/src/main/java/io/ripc/composition/rx/package-info.java b/ripc-rxjava1/src/main/java/io/ripc/rx/package-info.java similarity index 100% rename from ripc-composition-rxjava1/src/main/java/io/ripc/composition/rx/package-info.java rename to ripc-rxjava1/src/main/java/io/ripc/rx/package-info.java diff --git a/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxConnection.java b/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxConnection.java new file mode 100644 index 0000000..ccd1564 --- /dev/null +++ b/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxConnection.java @@ -0,0 +1,64 @@ +package io.ripc.rx.protocol.tcp; + +import io.ripc.protocol.tcp.Connection; +import io.ripc.protocol.tcp.Connection.FlushSelector; +import rx.Observable; +import rx.RxReactiveStreams; +import rx.Subscriber; + +import static rx.RxReactiveStreams.*; + +/** + * An adapter for {@link Connection} representated as an {@link Observable} + * + * @param The type of objects read from this connection. + * @param The type of objects written to this connection. + */ +public class RxConnection extends Observable { + + private final Connection delegate; + + protected RxConnection(final Connection delegate) { + super(new OnSubscribe() { + @Override + public void call(Subscriber subscriber) { + RxReactiveStreams.subscribe(delegate, subscriber); + } + }); + this.delegate = delegate; + } + + /** + * Writes the passed stream of {@code data} and returns the result as an {@link Observable}. All items emitted by + * this stream are flushed on completion of the stream. + * + * @param data Data stream to write. + * + * @return Result of write. + */ + public Observable write(Observable data) { + return toObservable(delegate.write(toPublisher(data))); + } + + /** + * Writes the passed stream of {@code data} and returns the result as an {@link Observable}. All written items are + * flushed whenever the passed {@code flushSelector} returns {@code true} + * + * @param data Data stream to write. + * @param flushSelector Selector that is invoked after every emitted item is written. If this selector returns + * {@code true} then all items written till now are flushed. + * + * @return Result of write. + */ + public Observable write(Observable data, FlushSelector flushSelector) { + return toObservable(delegate.write(toPublisher(data), flushSelector)); + } + + Connection getDelegate() { + return delegate; + } + + public static RxConnection create(Connection delegate) { + return new RxConnection<>(delegate); + } +} diff --git a/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpHandler.java b/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpHandler.java new file mode 100644 index 0000000..9126b0c --- /dev/null +++ b/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpHandler.java @@ -0,0 +1,8 @@ +package io.ripc.rx.protocol.tcp; + +import rx.Observable; + +public interface RxTcpHandler { + + Observable handle(RxConnection connection); +} diff --git a/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpInterceptor.java b/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpInterceptor.java new file mode 100644 index 0000000..aae1261 --- /dev/null +++ b/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpInterceptor.java @@ -0,0 +1,6 @@ +package io.ripc.rx.protocol.tcp; + +public interface RxTcpInterceptor { + + RxTcpHandler intercept(RxTcpHandler handler); +} diff --git a/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpServer.java b/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpServer.java new file mode 100644 index 0000000..5ffad7d --- /dev/null +++ b/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpServer.java @@ -0,0 +1,69 @@ +package io.ripc.rx.protocol.tcp; + +import io.ripc.protocol.tcp.Connection; +import io.ripc.protocol.tcp.TcpHandler; +import io.ripc.protocol.tcp.TcpInterceptor; +import io.ripc.protocol.tcp.TcpServer; +import org.reactivestreams.Publisher; +import rx.Observable; +import rx.RxReactiveStreams; + +public final class RxTcpServer { + + private final TcpServer transport; + + private RxTcpServer(final TcpServer transport) { + this.transport = transport; + } + + public RxTcpServer intercept(final RxTcpInterceptor interceptor) { + return new RxTcpServer<>(transport.intercept(new TcpInterceptor() { + @Override + public TcpHandler intercept(final TcpHandler rsHandler) { + /*Create once, not per connection*/ + final RxTcpHandler rxHandler = interceptor.intercept(new RxTcpHandler() { + @Override + public Observable handle(RxConnection connection) { + return RxReactiveStreams.toObservable(rsHandler.handle(connection.getDelegate())); + } + }); + + return new TcpHandler() { + @Override + public Publisher handle(Connection connection) { + return RxReactiveStreams.toPublisher(rxHandler.handle(RxConnection.create(connection))); + } + }; + } + })); + } + + public RxTcpServer start(final RxTcpHandler handler) { + + transport.start(new TcpHandler() { + @Override + public Publisher handle(Connection connection) { + return RxReactiveStreams.toPublisher(handler.handle(new RxConnection<>(connection))); + } + }); + + return this; + } + + public void startAndAwait(RxTcpHandler handler) { + start(handler); + transport.awaitShutdown(); + } + + public final boolean shutdown() { + return transport.shutdown(); + } + + public void awaitShutdown() { + transport.awaitShutdown(); + } + + public static RxTcpServer create(TcpServer transport) { + return new RxTcpServer<>(transport); + } +} diff --git a/ripc-composition-rxjava1/src/test/java/io/ripc/composition/rx/package-info.java b/ripc-rxjava1/src/test/java/io/ripc/rx/package-info.java similarity index 100% rename from ripc-composition-rxjava1/src/test/java/io/ripc/composition/rx/package-info.java rename to ripc-rxjava1/src/test/java/io/ripc/rx/package-info.java diff --git a/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/TcpServerSample.java b/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/TcpServerSample.java new file mode 100644 index 0000000..84a156a --- /dev/null +++ b/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/TcpServerSample.java @@ -0,0 +1,36 @@ +package io.ripc.transport.netty4.tcp; + + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.ripc.internal.Publishers; +import io.ripc.protocol.tcp.TcpInterceptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.nio.charset.Charset.defaultCharset; +import static rx.RxReactiveStreams.*; + +public class TcpServerSample { + + private static final Logger logger = LoggerFactory.getLogger(TcpServerSample.class); + + public static TcpInterceptor log() { + return handler -> input -> { + logger.error("Received a new connection."); + return handler.handle(input); + }; + } + + public static void main(String[] args) { + Netty4TcpServer.create(0) + .intercept(log()) + .start(connection -> + toPublisher(toObservable(connection) + .flatMap(byteBuf -> { + String msg = "Hello " + byteBuf.toString(defaultCharset()); + ByteBuf toWrite = Unpooled.buffer().writeBytes(msg.getBytes()); + return toObservable(connection.write(Publishers.just(toWrite))); + }))); + } +} \ No newline at end of file diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/ByteBufBuffer.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/ByteBufBuffer.java deleted file mode 100644 index 06e2747..0000000 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/ByteBufBuffer.java +++ /dev/null @@ -1,331 +0,0 @@ -package io.ripc.transport.netty4; - -import io.netty.buffer.ByteBuf; -import io.ripc.core.io.Buffer; - -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; -import java.util.Iterator; -import java.util.List; - -/** - * Created by jbrisbin on 3/10/15. - */ -public class ByteBufBuffer implements Buffer { - - private final ByteBuf buf; - private final boolean writable; - - public ByteBufBuffer(ByteBuf buf, boolean writable) { - this.buf = buf; - this.writable = writable; - } - - @Override - public int position() { - return (writable ? buf.writerIndex() : buf.readerIndex()); - } - - @Override - public Buffer position(int pos) { - return null; - } - - @Override - public int limit() { - return (writable ? buf.writableBytes() : buf.readableBytes()); - } - - @Override - public Buffer limit(int limit) { - return null; - } - - @Override - public int capacity() { - return buf.capacity(); - } - - @Override - public Buffer capacity(int capacity) { - buf.capacity(capacity); - return this; - } - - @Override - public int remaining() { - return capacity() - position(); - } - - @Override - public Buffer skip(int len) { - return null; - } - - @Override - public Buffer clear() { - return null; - } - - @Override - public Buffer compact() { - return null; - } - - @Override - public Buffer flip() { - return null; - } - - @Override - public Buffer rewind() { - return null; - } - - @Override - public Buffer rewind(int len) { - return null; - } - - @Override - public Buffer clone() { - return null; - } - - @Override - public Buffer copy() { - return new ByteBufBuffer(buf.copy(), writable); - } - - @Override - public Buffer slice(int start, int len) { - return new ByteBufBuffer(buf.copy(start, len), writable); - } - - @Override - public Iterable> split(byte delimiter) { - return null; - } - - @Override - public Iterable> split(byte delimiter, boolean stripDelimiter) { - return null; - } - - @Override - public Iterable> split(byte delimiter, - boolean stripDelimiter, - List> preallocatedList) { - return null; - } - - @Override - public Iterable> split(Buffer delimiter) { - return null; - } - - @Override - public Iterable> split(Buffer delimiter, boolean stripDelimiter) { - return null; - } - - @Override - public Iterable> split(Buffer delimiter, - boolean stripDelimiter, - List> preallocatedList) { - return null; - } - - @Override - public Buffer prepend(ByteBuf data) { - return null; - } - - @Override - public Buffer prepend(Buffer buffer) { - return null; - } - - @Override - public Buffer prepend(ByteBuffer buffer) { - return null; - } - - @Override - public Buffer prepend(CharSequence chars) { - return null; - } - - @Override - public Buffer prepend(byte[] bytes) { - return null; - } - - @Override - public Buffer prepend(byte b) { - return null; - } - - @Override - public Buffer prepend(char c) { - return null; - } - - @Override - public Buffer prepend(short s) { - return null; - } - - @Override - public Buffer prepend(int i) { - return null; - } - - @Override - public Buffer prepend(long l) { - return null; - } - - @Override - public Buffer append(ByteBuf data) { - return null; - } - - @Override - public Buffer append(Buffer buffer) { - return null; - } - - @Override - public Buffer append(ByteBuffer buffer) { - return null; - } - - @Override - public Buffer append(CharSequence chars) { - return null; - } - - @Override - public Buffer append(byte[] bytes) { - return null; - } - - @Override - public Buffer append(byte b) { - return null; - } - - @Override - public Buffer append(char c) { - return null; - } - - @Override - public Buffer append(short s) { - return null; - } - - @Override - public Buffer append(int i) { - return null; - } - - @Override - public Buffer append(long l) { - return null; - } - - @Override - public byte readByte() { - return 0; - } - - @Override - public void readBytes(byte[] bytes) { - - } - - @Override - public short readShort() { - return 0; - } - - @Override - public int readInt() { - return 0; - } - - @Override - public float readFloat() { - return 0; - } - - @Override - public double readDouble() { - return 0; - } - - @Override - public long readLong() { - return 0; - } - - @Override - public char readChar() { - return 0; - } - - @Override - public void readChars(char[] chars) { - - } - - @Override - public String readString() { - CharsetDecoder decoder = Charset.defaultCharset().newDecoder(); - try { - CharBuffer cb = decoder.decode(buf.nioBuffer()); - return cb.toString(); - } catch (CharacterCodingException e) { - throw new IllegalStateException(e); - } - } - - @Override - public String readString(CharsetDecoder decoder) { - return null; - } - - @Override - public ByteBuf get() { - return buf; - } - - @Override - public void close() throws Exception { - - } - - @Override - public int compareTo(Buffer o) { - return 0; - } - - @Override - public Iterator> iterator() { - return null; - } - - @Override - public String toString() { - return "ByteBufBuffer{" + - "buf=" + buf + - ", writable=" + writable + - '}'; - } - -} diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInboundHandlerSubscription.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInboundHandlerSubscription.java deleted file mode 100644 index 920f817..0000000 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInboundHandlerSubscription.java +++ /dev/null @@ -1,96 +0,0 @@ -package io.ripc.transport.netty4.tcp; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.ripc.core.Specification; -import io.ripc.core.io.Buffer; -import io.ripc.transport.netty4.ByteBufBuffer; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -/** - * Created by jbrisbin on 3/10/15. - */ -public class ChannelInboundHandlerSubscription extends ChannelInboundHandlerAdapter implements Subscription { - - private final Channel channel; - private final Subscriber> subscriber; - - private volatile long pending = 0; - - public ChannelInboundHandlerSubscription(Channel channel, Subscriber> subscriber) { - this.channel = channel; - this.subscriber = subscriber; - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - if (pending < 1) { - super.exceptionCaught(ctx, cause); - return; - } - subscriber.onError(cause); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - if (pending < 1) { - super.channelInactive(ctx); - return; - } - subscriber.onComplete(); - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - if (pending < 1) { - super.channelReadComplete(ctx); - } - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (pending < 1) { - super.channelRead(ctx, msg); - return; - } - - ByteBuf buf = (ByteBuf) msg; - try { - subscriber.onNext(new ByteBufBuffer(buf, false)); - synchronized (this) { - pending--; - } - channel.read(); - } catch (Throwable t) { - subscriber.onError(t); - } - } - - @Override - public void request(long demand) { - if (!Specification.spec_3_9_verifyPositiveDemand(demand, subscriber)) { - return; - } - - synchronized (this) { - if (demand < Long.MAX_VALUE) { - pending += demand; - } else { - pending = Long.MAX_VALUE; - } - } - channel.read(); - } - - @Override - public void cancel() { - synchronized (this) { - pending = -1; - } - channel.close(); - } - -} diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInitializerSubscription.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInitializerSubscription.java deleted file mode 100644 index e10951e..0000000 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInitializerSubscription.java +++ /dev/null @@ -1,59 +0,0 @@ -package io.ripc.transport.netty4.tcp; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; -import io.ripc.core.Specification; -import io.ripc.protocol.tcp.Connection; -import io.ripc.transport.netty4.tcp.server.NettyTcpServerConnection; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -/** - * Created by jbrisbin on 3/10/15. - */ -public class ChannelInitializerSubscription extends ChannelInitializer implements Subscription { - - private final Subscriber> subscriber; - - private volatile long pending = 0l; - - public ChannelInitializerSubscription(Subscriber> subscriber) { - this.subscriber = subscriber; - } - - @Override - protected void initChannel(SocketChannel ch) throws Exception { - if (pending < 1) { - return; - } - NettyTcpServerConnection conn = new NettyTcpServerConnection(ch); - try { - subscriber.onNext(conn); - synchronized (this) { - pending--; - } - } catch (Throwable t) { - subscriber.onError(t); - } - } - - @Override - public void request(long demand) { - if (!Specification.spec_3_9_verifyPositiveDemand(demand, subscriber)) { - return; - } - synchronized (this) { - pending += demand; - } - } - - @Override - public void cancel() { - synchronized (this) { - pending = -1; - } - - } - -} diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelToConnectionBridge.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelToConnectionBridge.java new file mode 100644 index 0000000..14d70e3 --- /dev/null +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelToConnectionBridge.java @@ -0,0 +1,183 @@ +package io.ripc.transport.netty4.tcp; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.util.ReferenceCountUtil; +import io.ripc.protocol.tcp.Connection; +import io.ripc.protocol.tcp.TcpHandler; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A bridge between netty's {@link Channel} and {@link Connection}. It has the following responsibilities: + * + *
    +
  • Create a new {@link Connection} instance when the channel is active and forwards it to the configured + {@link TcpHandler}.
  • +
  • Reads any data from the channel and forwards it to the {@link Subscriber} attached via the event + {@link ChannelToConnectionBridge.ConnectionInputSubscriberEvent}
  • +
  • Accepts writes of {@link Publisher} on the channel and translates the items emitted from that publisher to the + channel.
  • +
+ * + * @param The type of objects read from the underneath channel. + * @param The type of objects read written to the underneath channel. + */ +public class ChannelToConnectionBridge extends ChannelDuplexHandler { + + private static final Logger logger = LoggerFactory.getLogger(ChannelToConnectionBridge.class); + + private final TcpHandler handler; + private ConnectionImpl conn; + private Subscriber inputSubscriber; /*Populated via event ConnectionInputSubscriberEvent*/ + + public ChannelToConnectionBridge(TcpHandler handler) { + this.handler = handler; + } + + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + conn = new ConnectionImpl<>(ctx.channel()); + handler.handle(conn) + .subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + // Void, no op + } + + @Override + public void onNext(Void aVoid) { + // Void, no op + } + + @Override + public void onError(Throwable t) { + logger.error("Error processing connection. Closing the channel.", t); + ctx.channel().close(); + } + + @Override + public void onComplete() { + ctx.channel().close(); + } + }); + } + + @SuppressWarnings("unchecked") + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (null == conn || null == inputSubscriber) { + logger.error("No connection input subscriber available. Disposing data."); + ReferenceCountUtil.release(msg); + return; + } + + try { + inputSubscriber.onNext((R) msg); + } catch (ClassCastException e) { + logger.error("Invalid message type read from the pipeline.", e); + inputSubscriber.onError(e); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + if (null != conn && inputSubscriber != null) { + inputSubscriber.onComplete(); + } + super.channelInactive(ctx); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof ConnectionInputSubscriberEvent) { + @SuppressWarnings("unchecked") + ConnectionInputSubscriberEvent subscriberEvent = (ConnectionInputSubscriberEvent) evt; + if (null == inputSubscriber) { + inputSubscriber = subscriberEvent.getInputSubscriber(); + } else { + inputSubscriber.onError(new IllegalStateException("Only one connection input subscriber allowed.")); + } + } + super.userEventTriggered(ctx, evt); + } + + @Override + public void write(final ChannelHandlerContext ctx, Object msg, final ChannelPromise promise) throws Exception { + if (msg instanceof Publisher) { + @SuppressWarnings("unchecked") + final Publisher data = (Publisher) msg; + + data.subscribe(new Subscriber() { + + // TODO: Needs to be fixed to wire all futures to the promise of the Publisher write. + private ChannelFuture lastWriteFuture; + + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); // TODO: Backpressure + } + + @Override + public void onNext(W w) { + lastWriteFuture = ctx.channel().write(w); + } + + @Override + public void onError(Throwable t) { + onTerminate(); + } + + @Override + public void onComplete() { + onTerminate(); + } + + private void onTerminate() { + ctx.channel().flush(); + lastWriteFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + promise.trySuccess(); + } else { + promise.tryFailure(future.cause()); + } + } + }); + } + }); + } else { + super.write(ctx, msg, promise); + } + } + + /** + * An event to attach a {@link Subscriber} to the {@link Connection} created by {@link ChannelToConnectionBridge} + * + * @param + */ + public static final class ConnectionInputSubscriberEvent { + + private final Subscriber inputSubscriber; + + public ConnectionInputSubscriberEvent(Subscriber inputSubscriber) { + if (null == inputSubscriber) { + throw new IllegalArgumentException("Connection input subscriber must not be null."); + } + this.inputSubscriber = inputSubscriber; + } + + public Subscriber getInputSubscriber() { + return inputSubscriber; + } + } +} diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ConnectionImpl.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ConnectionImpl.java new file mode 100644 index 0000000..7ed21c7 --- /dev/null +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ConnectionImpl.java @@ -0,0 +1,90 @@ +package io.ripc.transport.netty4.tcp; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.ripc.protocol.tcp.Connection; +import io.ripc.transport.netty4.tcp.ChannelToConnectionBridge.ConnectionInputSubscriberEvent; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +public class ConnectionImpl implements Connection { + + private final Channel nettyChannel; + + public ConnectionImpl(Channel nettyChannel) { + this.nettyChannel = nettyChannel; + } + + @Override + public Publisher write(final Publisher data) { + return new Publisher() { + @Override + public void subscribe(Subscriber s) { + bridgeFutureToSub(nettyChannel.write(data), s); + } + }; + } + + @Override + public Publisher write(final Publisher data, final FlushSelector flushSelector) { + return new Publisher() { + @Override + public void subscribe(Subscriber s) { + + final Publisher flushAwareData = new Publisher() { + @Override + public void subscribe(final Subscriber subscriber) { + data.subscribe(new Subscriber() { + private long itemCount; + + @Override + public void onSubscribe(Subscription subscription) { + subscriber.onSubscribe(subscription); + } + + @Override + public void onNext(W w) { + subscriber.onNext(w); + if (flushSelector.select(++itemCount, w)) { + nettyChannel.flush(); + } + } + + @Override + public void onError(Throwable t) { + subscriber.onError(t); + } + + @Override + public void onComplete() { + subscriber.onComplete(); + } + }); + } + }; + + bridgeFutureToSub(nettyChannel.write(flushAwareData), s); + } + }; + } + + @Override + public void subscribe(Subscriber s) { + nettyChannel.pipeline().fireUserEventTriggered(new ConnectionInputSubscriberEvent<>(s)); + } + + private void bridgeFutureToSub(ChannelFuture future, final Subscriber s) { + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + s.onComplete(); + } else { + s.onError(future.cause()); + } + } + }); + } +} diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java new file mode 100644 index 0000000..c527b37 --- /dev/null +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java @@ -0,0 +1,95 @@ +package io.ripc.transport.netty4.tcp; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.ripc.protocol.tcp.TcpHandler; +import io.ripc.protocol.tcp.TcpServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +public class Netty4TcpServer extends TcpServer { + + private static final Logger logger = LoggerFactory.getLogger(Netty4TcpServer.class); + + private final int port; + private ServerBootstrap bootstrap; + private ChannelFuture bindFuture; + + protected Netty4TcpServer(int port) { + this.port = port; + bootstrap = new ServerBootstrap() + .group(new NioEventLoopGroup()) + .channel(NioServerSocketChannel.class); + } + + protected Netty4TcpServer(int port, TcpHandler handler) { + super(handler); + this.port = port; + bootstrap = new ServerBootstrap() + .group(new NioEventLoopGroup()) + .channel(NioServerSocketChannel.class); + } + + @Override + protected Netty4TcpServer doStart(final TcpHandler handler) { + bootstrap.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast("server_handler", new ChannelToConnectionBridge<>(handler)); + } + }); + + try { + bindFuture = bootstrap.bind(port).sync(); + if (!bindFuture.isSuccess()) { + throw new RuntimeException(bindFuture.cause()); + } + SocketAddress localAddress = bindFuture.channel().localAddress(); + if (localAddress instanceof InetSocketAddress) { + logger.info("Started server at port: " + ((InetSocketAddress) localAddress).getPort()); + } + + } catch (InterruptedException e) { + e.printStackTrace(); + } + + return this; + } + + @Override + public void awaitShutdown() { + try { + bindFuture.channel().closeFuture().await(); + } catch (InterruptedException e) { + Thread.interrupted(); // Reset the interrupted status + logger.error("Interrupted while waiting for the server socket to close.", e); + } + } + + @Override + public boolean doShutdown() { + try { + bindFuture.channel().close().sync(); + return true; + } catch (InterruptedException e) { + logger.error("Failed to shutdown the server.", e); + return false; + } + } + + public static TcpServer create(int port) { + return new Netty4TcpServer<>(port); + } + + @Override + protected TcpServer newServer(TcpHandler handler) { + return new Netty4TcpServer<>(port, handler); + } +} diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java deleted file mode 100644 index 01a1c6e..0000000 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java +++ /dev/null @@ -1,71 +0,0 @@ -package io.ripc.transport.netty4.tcp.server; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.ripc.core.NamedDaemonThreadFactory; -import io.ripc.protocol.tcp.Connection; -import io.ripc.protocol.tcp.ConnectionPublisher; -import io.ripc.transport.netty4.tcp.ChannelInitializerSubscription; -import org.reactivestreams.Subscriber; - -import java.util.ArrayList; -import java.util.List; - -/** - * Created by jbrisbin on 3/10/15. - */ -public class NettyTcpServer extends ChannelInitializer implements ConnectionPublisher { - - private final List subscriptions = new ArrayList<>(); - - private final ServerBootstrap bootstrap; - - public NettyTcpServer(ServerBootstrap bootstrap) { - this.bootstrap = bootstrap; - } - - public static ConnectionPublisher listen(int port) { - ServerBootstrap b = new ServerBootstrap(); - - int threads = Runtime.getRuntime().availableProcessors(); - EventLoopGroup ioGroup = new NioEventLoopGroup(threads, new NamedDaemonThreadFactory("netty-io")); - EventLoopGroup workerGroup = new NioEventLoopGroup(threads, new NamedDaemonThreadFactory("netty-worker")); - b.group(ioGroup, workerGroup); - - b.channel(NioServerSocketChannel.class); - - NettyTcpServer server = new NettyTcpServer(b); - b.childHandler(server); - - b.bind(port); - - return server; - } - - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.config().setAutoRead(false); - ch.config().setAllocator(PooledByteBufAllocator.DEFAULT); - - synchronized (subscriptions) { - for (ChannelInitializerSubscription sub : subscriptions) { - ch.pipeline().addLast(sub); - } - } - } - - @Override - public void subscribe(Subscriber> s) { - ChannelInitializerSubscription sub = new ChannelInitializerSubscription(s); - synchronized (subscriptions) { - subscriptions.add(sub); - } - s.onSubscribe(sub); - } -} diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java deleted file mode 100644 index 8e92363..0000000 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java +++ /dev/null @@ -1,52 +0,0 @@ -package io.ripc.transport.netty4.tcp.server; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; -import io.ripc.core.io.Buffer; -import io.ripc.protocol.tcp.Connection; -import io.ripc.transport.netty4.tcp.ChannelInboundHandlerSubscription; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; - -import java.util.ArrayList; -import java.util.List; - -/** - * Created by jbrisbin on 3/10/15. - */ -public class NettyTcpServerConnection implements Connection { - - private final List>> subscribers = new ArrayList<>(); - - private final Channel channel; - - public NettyTcpServerConnection(Channel channel) { - this.channel = channel; - } - - @Override - public void write(Publisher> data) { - - } - - @Override - public void subscribe(Subscriber> s) { - ChannelInboundHandlerSubscription sub = new ChannelInboundHandlerSubscription(channel, s); - synchronized (this) { - subscribers.add(s); - } - s.onSubscribe(sub); - channel.pipeline() - .addLast(new LoggingHandler(LogLevel.DEBUG), sub); - } - - @Override - public String toString() { - return "NettyTcpServerConnection{" + - "channel=" + channel + - '}'; - } - -} diff --git a/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java b/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java deleted file mode 100644 index aaea4df..0000000 --- a/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java +++ /dev/null @@ -1,77 +0,0 @@ -package io.ripc.transport.netty4.tcp.server; - -import io.netty.buffer.ByteBuf; -import io.ripc.core.io.Buffer; -import io.ripc.protocol.tcp.Connection; -import io.ripc.protocol.tcp.ConnectionPublisher; -import org.junit.Test; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * Created by jbrisbin on 3/10/15. - */ -public class NettyTcpServerIntegrationTests { - - private static final Logger LOG = LoggerFactory.getLogger(NettyTcpServerIntegrationTests.class); - - @Test - public void canStartNettyTcpServer() throws InterruptedException { - ConnectionPublisher server = NettyTcpServer.listen(3000); - CountDownLatch latch = new CountDownLatch(1); - - server.subscribe(new Subscriber>() { - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(Connection conn) { - LOG.debug("new connection={}", conn); - - conn.subscribe(new Subscriber>() { - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(Buffer buffer) { - LOG.debug("data received: {}", buffer); - } - - @Override - public void onError(Throwable t) { - - } - - @Override - public void onComplete() { - LOG.debug("connection closed"); - latch.countDown(); - } - }); - } - - @Override - public void onError(Throwable t) { - t.printStackTrace(); - } - - @Override - public void onComplete() { - } - }); - - while (!latch.await(1, TimeUnit.SECONDS)) { - Thread.sleep(1000); - } - } - -} diff --git a/settings.gradle b/settings.gradle index 4c827f1..88a432e 100644 --- a/settings.gradle +++ b/settings.gradle @@ -3,4 +3,6 @@ rootProject.name = 'reactive-ipc' include 'ripc-core', 'ripc-protocol-tcp', 'ripc-transport-netty4', - 'ripc-composition-rxjava1' + 'ripc-transport-netty4-examples', + 'ripc-rxjava1', + 'ripc-rxjava1-examples' From 70956d498cda47ef7ee1654947788cd1efed0367 Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Fri, 27 Mar 2015 16:16:04 -0700 Subject: [PATCH 2/4] Adding a short-circuit interception sample. --- .../transport/netty4/tcp/TcpServerSample.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/TcpServerSample.java b/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/TcpServerSample.java index 84a156a..e790228 100644 --- a/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/TcpServerSample.java +++ b/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/TcpServerSample.java @@ -8,6 +8,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.atomic.AtomicLong; + import static java.nio.charset.Charset.defaultCharset; import static rx.RxReactiveStreams.*; @@ -22,9 +24,23 @@ public static TcpInterceptor log() { }; } + public static TcpInterceptor shortCircuitAltConnection() { + return handler -> { + final AtomicLong connCounter = new AtomicLong(); + return input -> { + if (connCounter.incrementAndGet() % 2 == 0) { + logger.error("Short-circuiting further processing."); + return input.write(Publishers.just(Unpooled.buffer().writeBytes("Go Away!!! \n".getBytes()))); + } + return handler.handle(input); + }; + }; + } + public static void main(String[] args) { Netty4TcpServer.create(0) .intercept(log()) + .intercept(shortCircuitAltConnection()) .start(connection -> toPublisher(toObservable(connection) .flatMap(byteBuf -> { From 44b860b0587d21464c952d94971737963bfe3d48 Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Thu, 23 Apr 2015 11:06:43 -0700 Subject: [PATCH 3/4] Removed Interceptors and Flush semantics. Based on the various discussions on github, there is no agreement as of now on how to do flush and interception. Removing them for now so that we can iterate on this code to add those features incrementally. Additional changes: - Update rx-reactive-streams version. - Rename `Connection` to `TcpConnection` --- build.gradle | 6 +- .../java/io/ripc/protocol/tcp/Connection.java | 53 ----------- .../io/ripc/protocol/tcp/TcpConnection.java | 23 +++++ .../java/io/ripc/protocol/tcp/TcpHandler.java | 2 +- .../java/io/ripc/protocol/tcp/TcpServer.java | 51 +---------- .../rx/protocol/tcp/RxTcpServerSample.java | 25 ++---- .../io/ripc/rx/protocol/tcp/RxConnection.java | 52 ++++++----- .../io/ripc/rx/protocol/tcp/RxTcpServer.java | 30 +------ .../transport/netty4/tcp/TcpServerSample.java | 31 +------ .../netty4/tcp/ChannelToConnectionBridge.java | 11 ++- .../transport/netty4/tcp/ConnectionImpl.java | 90 ------------------- .../transport/netty4/tcp/Netty4TcpServer.java | 14 +-- .../netty4/tcp/TcpConnectionImpl.java | 46 ++++++++++ 13 files changed, 116 insertions(+), 318 deletions(-) delete mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/Connection.java create mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnection.java delete mode 100644 ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ConnectionImpl.java create mode 100644 ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/TcpConnectionImpl.java diff --git a/build.gradle b/build.gradle index 4c5bd3a..3f33bb5 100644 --- a/build.gradle +++ b/build.gradle @@ -84,7 +84,7 @@ project('ripc-protocol-tcp') { compile project(":ripc-core") compile "org.slf4j:slf4j-api:1.7.6" testCompile 'io.reactivex:rxjava:1.0.8' - testCompile 'io.reactivex:rxjava-reactive-streams:0.3.0' + testCompile 'io.reactivex:rxjava-reactive-streams:0.5.0' } } @@ -100,7 +100,7 @@ project('ripc-transport-netty4-examples') { // ripc-core compile project(":ripc-transport-netty4") compile 'io.reactivex:rxjava:1.0.8' - compile 'io.reactivex:rxjava-reactive-streams:0.3.0' + compile 'io.reactivex:rxjava-reactive-streams:0.5.0' compile 'org.slf4j:slf4j-simple:1.7.6' } } @@ -123,7 +123,7 @@ project('ripc-rxjava1') { // ripc-tcp compile project(":ripc-protocol-tcp") compile 'io.reactivex:rxjava:1.0.8' - compile 'io.reactivex:rxjava-reactive-streams:0.3.0' + compile 'io.reactivex:rxjava-reactive-streams:0.5.0' } } diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/Connection.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/Connection.java deleted file mode 100644 index 64224e5..0000000 --- a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/Connection.java +++ /dev/null @@ -1,53 +0,0 @@ -package io.ripc.protocol.tcp; - -import org.reactivestreams.Publisher; - -/** - * An abstraction for a TCP connection. - * - * @param The type of objects read from this connection. - * @param The type of objects written to this connection. - */ -public interface Connection extends Publisher { - - /** - * Writes the passed stream of {@code data} and returns the result as a {@link Publisher}. All items emitted by - * this stream are flushed on completion of the stream. - * - * @param data Data stream to write. - * - * @return Result of write. - */ - Publisher write(Publisher data); - - /** - * Writes the passed stream of {@code data} and returns the result as a {@link Publisher}. All written items are - * flushed whenever the passed {@code flushSelector} returns {@code true} - * - * @param data Data stream to write. - * @param flushSelector Selector that is invoked after every emitted item is written. If this selector returns - * {@code true} then all items written till now are flushed. - * - * @return Result of write. - */ - Publisher write(Publisher data, FlushSelector flushSelector); - - /** - * A function that is used for determining when a flush has to be invoked on the underlying channel. - * - * @param Type of items emitted by the stream using this selector. - */ - interface FlushSelector { - - /** - * Selects whether flush should be invoked on the channel. - * - * @param count The index of this item. Starts with 1. - * @param lastWrittenItem Item which was last written before calling this selector. - * - * @return {@code true} if flush should be invoked. - */ - boolean select(long count, W lastWrittenItem); - - } -} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnection.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnection.java new file mode 100644 index 0000000..808f399 --- /dev/null +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnection.java @@ -0,0 +1,23 @@ +package io.ripc.protocol.tcp; + +import org.reactivestreams.Publisher; + +/** + * An abstraction for a TCP connection. + * + * @param The type of objects read from this connection. + * @param The type of objects written to this connection. + */ +public interface TcpConnection extends Publisher { + + /** + * Writes the passed stream of {@code data} and returns the result as a {@link Publisher}. All items emitted by + * this stream are flushed on completion of the stream. + * + * @param data Data stream to write. + * + * @return Result of write. + */ + Publisher write(Publisher data); + +} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpHandler.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpHandler.java index 1f461a0..c094ff2 100644 --- a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpHandler.java +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpHandler.java @@ -4,5 +4,5 @@ public interface TcpHandler { - Publisher handle(Connection connection); + Publisher handle(TcpConnection connection); } diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpServer.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpServer.java index d942a3e..d171532 100644 --- a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpServer.java +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpServer.java @@ -1,52 +1,23 @@ package io.ripc.protocol.tcp; -import io.ripc.internal.Publishers; -import org.reactivestreams.Publisher; - import java.util.concurrent.atomic.AtomicBoolean; public abstract class TcpServer { - @SuppressWarnings("rawtypes") - private LazyTcpHandler lazyHandler; protected final TcpHandler thisHandler; protected final AtomicBoolean started; protected TcpServer() { - lazyHandler = new LazyTcpHandler<>(); thisHandler = null; this.started = new AtomicBoolean(); } - protected TcpServer(TcpHandler handler) { - thisHandler = handler; - this.started = new AtomicBoolean(); - } - - public TcpServer intercept(TcpInterceptor interceptor) { - if (null == thisHandler) { - @SuppressWarnings("unchecked") - TcpServer toReturn = newServer(interceptor.intercept(lazyHandler)); - toReturn.lazyHandler = lazyHandler; - return toReturn; - } else { - TcpServer toReturn = newServer(interceptor.intercept(thisHandler)); - toReturn.lazyHandler = lazyHandler; - return toReturn; - } - } - public final TcpServer start(TcpHandler handler) { if (!started.compareAndSet(false, true)) { throw new IllegalStateException("Server already started"); } - if (null == thisHandler) { - doStart(handler); - } else { - lazyHandler.start(handler); - doStart(thisHandler); - } + doStart(handler); return this; } @@ -63,26 +34,6 @@ public final boolean shutdown() { public abstract boolean doShutdown(); - protected abstract TcpServer newServer(TcpHandler handler); - protected abstract TcpServer doStart(TcpHandler handler); - private static class LazyTcpHandler implements TcpHandler { - - private TcpHandler delegate; - - @Override - public Publisher handle(Connection connection) { - if (null == delegate) { - return Publishers.error(new IllegalStateException("Handler not initialized.")); - } else { - return delegate.handle(connection); - } - } - - @SuppressWarnings("unchecked") - private void start(TcpHandler handler) { - delegate = handler; - } - } } diff --git a/ripc-rxjava1-examples/src/main/java/io/rpc/rx/protocol/tcp/RxTcpServerSample.java b/ripc-rxjava1-examples/src/main/java/io/rpc/rx/protocol/tcp/RxTcpServerSample.java index bba9cb6..39237f1 100644 --- a/ripc-rxjava1-examples/src/main/java/io/rpc/rx/protocol/tcp/RxTcpServerSample.java +++ b/ripc-rxjava1-examples/src/main/java/io/rpc/rx/protocol/tcp/RxTcpServerSample.java @@ -3,34 +3,23 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.ripc.protocol.tcp.TcpServer; -import io.ripc.rx.protocol.tcp.RxTcpInterceptor; import io.ripc.rx.protocol.tcp.RxTcpServer; import io.ripc.transport.netty4.tcp.Netty4TcpServer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import rx.Observable; import static java.nio.charset.Charset.*; public class RxTcpServerSample { - private static final Logger logger = LoggerFactory.getLogger(RxTcpServerSample.class); - - public static RxTcpInterceptor log() { - return handler -> input -> { - logger.error("Received a new connection."); - return handler.handle(input); - }; - } - - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException { TcpServer transport = Netty4TcpServer.create(0); RxTcpServer.create(transport) - .intercept(log()) - .startAndAwait(connection -> connection.write(connection.map(bb -> { - String msg = "Hello " + bb.toString(defaultCharset()); - return Unpooled.buffer().writeBytes(msg.getBytes()); - }), (count, item) -> true)); + .startAndAwait(connection -> connection.flatMap(bb -> { + String msgStr = "Hello " + bb.toString(defaultCharset()); + ByteBuf msg = Unpooled.buffer().writeBytes(msgStr.getBytes()); + return connection.write(Observable.just(msg).doOnCompleted(() -> System.out.println("Done!"))); + })); } } \ No newline at end of file diff --git a/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxConnection.java b/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxConnection.java index ccd1564..b1cceb4 100644 --- a/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxConnection.java +++ b/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxConnection.java @@ -1,28 +1,29 @@ package io.ripc.rx.protocol.tcp; -import io.ripc.protocol.tcp.Connection; -import io.ripc.protocol.tcp.Connection.FlushSelector; +import io.ripc.protocol.tcp.TcpConnection; import rx.Observable; -import rx.RxReactiveStreams; import rx.Subscriber; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.internal.reactivestreams.SubscriberAdapter; import static rx.RxReactiveStreams.*; /** - * An adapter for {@link Connection} representated as an {@link Observable} + * An adapter for {@link io.ripc.protocol.tcp.TcpConnection} representated as an {@link Observable} * * @param The type of objects read from this connection. * @param The type of objects written to this connection. */ public class RxConnection extends Observable { - private final Connection delegate; + private final TcpConnection delegate; - protected RxConnection(final Connection delegate) { + protected RxConnection(final TcpConnection delegate) { super(new OnSubscribe() { @Override public void call(Subscriber subscriber) { - RxReactiveStreams.subscribe(delegate, subscriber); + delegate.subscribe(new SubscriberAdapter<>(subscriber)); } }); this.delegate = delegate; @@ -37,28 +38,25 @@ public void call(Subscriber subscriber) { * @return Result of write. */ public Observable write(Observable data) { - return toObservable(delegate.write(toPublisher(data))); - } - - /** - * Writes the passed stream of {@code data} and returns the result as an {@link Observable}. All written items are - * flushed whenever the passed {@code flushSelector} returns {@code true} - * - * @param data Data stream to write. - * @param flushSelector Selector that is invoked after every emitted item is written. If this selector returns - * {@code true} then all items written till now are flushed. - * - * @return Result of write. - */ - public Observable write(Observable data, FlushSelector flushSelector) { - return toObservable(delegate.write(toPublisher(data), flushSelector)); - } - - Connection getDelegate() { - return delegate; + return toObservable(delegate.write(toPublisher(data.doOnSubscribe(new Action0() { + @Override + public void call() { + System.out.println("Subscribed"); + } + }).doOnNext(new Action1() { + @Override + public void call(W w) { + System.out.println(w); + } + }).doOnTerminate(new Action0() { + @Override + public void call() { + System.out.println("COmpleted"); + } + })))); } - public static RxConnection create(Connection delegate) { + public static RxConnection create(TcpConnection delegate) { return new RxConnection<>(delegate); } } diff --git a/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpServer.java b/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpServer.java index 5ffad7d..11c7606 100644 --- a/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpServer.java +++ b/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpServer.java @@ -1,11 +1,9 @@ package io.ripc.rx.protocol.tcp; -import io.ripc.protocol.tcp.Connection; +import io.ripc.protocol.tcp.TcpConnection; import io.ripc.protocol.tcp.TcpHandler; -import io.ripc.protocol.tcp.TcpInterceptor; import io.ripc.protocol.tcp.TcpServer; import org.reactivestreams.Publisher; -import rx.Observable; import rx.RxReactiveStreams; public final class RxTcpServer { @@ -16,34 +14,12 @@ private RxTcpServer(final TcpServer transport) { this.transport = transport; } - public RxTcpServer intercept(final RxTcpInterceptor interceptor) { - return new RxTcpServer<>(transport.intercept(new TcpInterceptor() { - @Override - public TcpHandler intercept(final TcpHandler rsHandler) { - /*Create once, not per connection*/ - final RxTcpHandler rxHandler = interceptor.intercept(new RxTcpHandler() { - @Override - public Observable handle(RxConnection connection) { - return RxReactiveStreams.toObservable(rsHandler.handle(connection.getDelegate())); - } - }); - - return new TcpHandler() { - @Override - public Publisher handle(Connection connection) { - return RxReactiveStreams.toPublisher(rxHandler.handle(RxConnection.create(connection))); - } - }; - } - })); - } - public RxTcpServer start(final RxTcpHandler handler) { transport.start(new TcpHandler() { @Override - public Publisher handle(Connection connection) { - return RxReactiveStreams.toPublisher(handler.handle(new RxConnection<>(connection))); + public Publisher handle(TcpConnection connection) { + return RxReactiveStreams.toPublisher(handler.handle(RxConnection.create(connection))); } }); diff --git a/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/TcpServerSample.java b/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/TcpServerSample.java index e790228..c7eddfd 100644 --- a/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/TcpServerSample.java +++ b/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/TcpServerSample.java @@ -4,43 +4,14 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.ripc.internal.Publishers; -import io.ripc.protocol.tcp.TcpInterceptor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.concurrent.atomic.AtomicLong; - -import static java.nio.charset.Charset.defaultCharset; +import static java.nio.charset.Charset.*; import static rx.RxReactiveStreams.*; public class TcpServerSample { - private static final Logger logger = LoggerFactory.getLogger(TcpServerSample.class); - - public static TcpInterceptor log() { - return handler -> input -> { - logger.error("Received a new connection."); - return handler.handle(input); - }; - } - - public static TcpInterceptor shortCircuitAltConnection() { - return handler -> { - final AtomicLong connCounter = new AtomicLong(); - return input -> { - if (connCounter.incrementAndGet() % 2 == 0) { - logger.error("Short-circuiting further processing."); - return input.write(Publishers.just(Unpooled.buffer().writeBytes("Go Away!!! \n".getBytes()))); - } - return handler.handle(input); - }; - }; - } - public static void main(String[] args) { Netty4TcpServer.create(0) - .intercept(log()) - .intercept(shortCircuitAltConnection()) .start(connection -> toPublisher(toObservable(connection) .flatMap(byteBuf -> { diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelToConnectionBridge.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelToConnectionBridge.java index 14d70e3..329752a 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelToConnectionBridge.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelToConnectionBridge.java @@ -7,7 +7,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.util.ReferenceCountUtil; -import io.ripc.protocol.tcp.Connection; import io.ripc.protocol.tcp.TcpHandler; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -16,10 +15,10 @@ import org.slf4j.LoggerFactory; /** - * A bridge between netty's {@link Channel} and {@link Connection}. It has the following responsibilities: + * A bridge between netty's {@link Channel} and {@link io.ripc.protocol.tcp.TcpConnection}. It has the following responsibilities: * *
    -
  • Create a new {@link Connection} instance when the channel is active and forwards it to the configured +
  • Create a new {@link io.ripc.protocol.tcp.TcpConnection} instance when the channel is active and forwards it to the configured {@link TcpHandler}.
  • Reads any data from the channel and forwards it to the {@link Subscriber} attached via the event {@link ChannelToConnectionBridge.ConnectionInputSubscriberEvent}
  • @@ -35,7 +34,7 @@ public class ChannelToConnectionBridge extends ChannelDuplexHandler { private static final Logger logger = LoggerFactory.getLogger(ChannelToConnectionBridge.class); private final TcpHandler handler; - private ConnectionImpl conn; + private TcpConnectionImpl conn; private Subscriber inputSubscriber; /*Populated via event ConnectionInputSubscriberEvent*/ public ChannelToConnectionBridge(TcpHandler handler) { @@ -45,7 +44,7 @@ public ChannelToConnectionBridge(TcpHandler handler) { @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); - conn = new ConnectionImpl<>(ctx.channel()); + conn = new TcpConnectionImpl<>(ctx.channel()); handler.handle(conn) .subscribe(new Subscriber() { @Override @@ -161,7 +160,7 @@ public void operationComplete(ChannelFuture future) throws Exception { } /** - * An event to attach a {@link Subscriber} to the {@link Connection} created by {@link ChannelToConnectionBridge} + * An event to attach a {@link Subscriber} to the {@link io.ripc.protocol.tcp.TcpConnection} created by {@link ChannelToConnectionBridge} * * @param */ diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ConnectionImpl.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ConnectionImpl.java deleted file mode 100644 index 7ed21c7..0000000 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ConnectionImpl.java +++ /dev/null @@ -1,90 +0,0 @@ -package io.ripc.transport.netty4.tcp; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.ripc.protocol.tcp.Connection; -import io.ripc.transport.netty4.tcp.ChannelToConnectionBridge.ConnectionInputSubscriberEvent; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -public class ConnectionImpl implements Connection { - - private final Channel nettyChannel; - - public ConnectionImpl(Channel nettyChannel) { - this.nettyChannel = nettyChannel; - } - - @Override - public Publisher write(final Publisher data) { - return new Publisher() { - @Override - public void subscribe(Subscriber s) { - bridgeFutureToSub(nettyChannel.write(data), s); - } - }; - } - - @Override - public Publisher write(final Publisher data, final FlushSelector flushSelector) { - return new Publisher() { - @Override - public void subscribe(Subscriber s) { - - final Publisher flushAwareData = new Publisher() { - @Override - public void subscribe(final Subscriber subscriber) { - data.subscribe(new Subscriber() { - private long itemCount; - - @Override - public void onSubscribe(Subscription subscription) { - subscriber.onSubscribe(subscription); - } - - @Override - public void onNext(W w) { - subscriber.onNext(w); - if (flushSelector.select(++itemCount, w)) { - nettyChannel.flush(); - } - } - - @Override - public void onError(Throwable t) { - subscriber.onError(t); - } - - @Override - public void onComplete() { - subscriber.onComplete(); - } - }); - } - }; - - bridgeFutureToSub(nettyChannel.write(flushAwareData), s); - } - }; - } - - @Override - public void subscribe(Subscriber s) { - nettyChannel.pipeline().fireUserEventTriggered(new ConnectionInputSubscriberEvent<>(s)); - } - - private void bridgeFutureToSub(ChannelFuture future, final Subscriber s) { - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - s.onComplete(); - } else { - s.onError(future.cause()); - } - } - }); - } -} diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java index c527b37..a8d8ab8 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java @@ -29,14 +29,6 @@ protected Netty4TcpServer(int port) { .channel(NioServerSocketChannel.class); } - protected Netty4TcpServer(int port, TcpHandler handler) { - super(handler); - this.port = port; - bootstrap = new ServerBootstrap() - .group(new NioEventLoopGroup()) - .channel(NioServerSocketChannel.class); - } - @Override protected Netty4TcpServer doStart(final TcpHandler handler) { bootstrap.childHandler(new ChannelInitializer() { @@ -57,7 +49,7 @@ protected void initChannel(Channel ch) throws Exception { } } catch (InterruptedException e) { - e.printStackTrace(); + logger.error("Error waiting for binding server port: " + port, e); } return this; @@ -88,8 +80,4 @@ public static TcpServer create(int port) { return new Netty4TcpServer<>(port); } - @Override - protected TcpServer newServer(TcpHandler handler) { - return new Netty4TcpServer<>(port, handler); - } } diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/TcpConnectionImpl.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/TcpConnectionImpl.java new file mode 100644 index 0000000..000df58 --- /dev/null +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/TcpConnectionImpl.java @@ -0,0 +1,46 @@ +package io.ripc.transport.netty4.tcp; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.ripc.protocol.tcp.TcpConnection; +import io.ripc.transport.netty4.tcp.ChannelToConnectionBridge.ConnectionInputSubscriberEvent; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +public class TcpConnectionImpl implements TcpConnection { + + private final Channel nettyChannel; + + public TcpConnectionImpl(Channel nettyChannel) { + this.nettyChannel = nettyChannel; + } + + @Override + public Publisher write(final Publisher data) { + return new Publisher() { + @Override + public void subscribe(Subscriber s) { + bridgeFutureToSub(nettyChannel.write(data), s); + } + }; + } + + @Override + public void subscribe(Subscriber s) { + nettyChannel.pipeline().fireUserEventTriggered(new ConnectionInputSubscriberEvent<>(s)); + } + + private void bridgeFutureToSub(ChannelFuture future, final Subscriber s) { + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + s.onComplete(); + } else { + s.onError(future.cause()); + } + } + }); + } +} From 19c863940ca420bbaccc9b04ebf7ca877b0fe9b1 Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Mon, 27 Apr 2015 10:17:47 -0700 Subject: [PATCH 4/4] Fixed buid file and removed debug statements --- build.gradle | 266 +++++++++--------- .../rx/protocol/tcp/RxTcpServerSample.java | 2 +- .../io/ripc/rx/protocol/tcp/RxConnection.java | 19 +- 3 files changed, 132 insertions(+), 155 deletions(-) diff --git a/build.gradle b/build.gradle index 3f33bb5..d1d37f9 100644 --- a/build.gradle +++ b/build.gradle @@ -1,176 +1,170 @@ ext { - gradleVersion = '2.3' + gradleVersion = '2.3' - // Libraries - reactiveStreamsVersion = '1.0.0.RC3' + // Libraries + reactiveStreamsVersion = '1.0.0.RC5' - // Logging - slf4jVersion = '1.7.10' - logbackVersion = '1.1.2' + // Logging + slf4jVersion = '1.7.10' + logbackVersion = '1.1.2' - // Network Transports - nettyVersion = '4.0.26.Final' + // Network Transports + nettyVersion = '4.0.26.Final' - // Composition Libraries - rxjava1Version = '1.0.7' + // Composition Libraries + rxjava1Version = '1.0.8' + rxjavaRsVersion = '0.5.0' - // Testing - mockitoVersion = '1.10.19' - junitVersion = '4.12' + // Testing + mockitoVersion = '1.10.19' + junitVersion = '4.12' + hamcrestVersion = '1.3' - javadocLinks = [ - "http://docs.oracle.com/javase/7/docs/api/", - "http://www.reactive-streams.org/reactive-streams-1.0.0.RC3-javadoc/" - ] as String[] + javadocLinks = [ + "http://docs.oracle.com/javase/7/docs/api/", + "http://www.reactive-streams.org/reactive-streams-1.0.0.RC3-javadoc/" + ] as String[] } allprojects { - apply plugin: 'java' - - [compileJava, compileTestJava]*.options*.compilerArgs = [ - "-Xlint:varargs", - "-Xlint:cast", - "-Xlint:classfile", - "-Xlint:dep-ann", - "-Xlint:divzero", - "-Xlint:empty", - "-Xlint:finally", - "-Xlint:overrides", - "-Xlint:path", - "-Xlint:processing", - "-Xlint:static", - "-Xlint:try", - "-Xlint:deprecation", - "-Xlint:unchecked", - "-Xlint:-serial", // intentionally disabled - "-Xlint:-options", // intentionally disabled - "-Xlint:-fallthrough", // intentionally disabled - "-Xlint:-rawtypes" // TODO enable and fix warnings - ] - - compileTestJava { - sourceCompatibility = 1.8 - targetCompatibility = 1.8 - } - - repositories { - jcenter() - mavenCentral() - } + apply plugin: 'java' + + [compileJava, compileTestJava]*.options*.compilerArgs = [ + "-Xlint:varargs", + "-Xlint:cast", + "-Xlint:classfile", + "-Xlint:dep-ann", + "-Xlint:divzero", + "-Xlint:empty", + "-Xlint:finally", + "-Xlint:overrides", + "-Xlint:path", + "-Xlint:processing", + "-Xlint:static", + "-Xlint:try", + "-Xlint:deprecation", + "-Xlint:unchecked", + "-Xlint:-serial", // intentionally disabled + "-Xlint:-options", // intentionally disabled + "-Xlint:-fallthrough", // intentionally disabled + "-Xlint:-rawtypes" // TODO enable and fix warnings + ] + + compileJava { + sourceCompatibility = 1.7 + targetCompatibility = 1.7 + } + + compileTestJava { + sourceCompatibility = 1.8 + targetCompatibility = 1.8 + } + + repositories { + jcenter() + mavenCentral() + } } subprojects { subproject -> - dependencies { - // Testing - testCompile "junit:junit:$junitVersion", - "org.hamcrest:hamcrest-library:1.3", - "org.slf4j:slf4j-api:$slf4jVersion" - testRuntime "ch.qos.logback:logback-classic:$logbackVersion" - } + dependencies { + // Testing + testCompile "junit:junit:$junitVersion", + "org.hamcrest:hamcrest-library:$hamcrestVersion", + "org.slf4j:slf4j-api:$slf4jVersion" + testRuntime "ch.qos.logback:logback-classic:$logbackVersion" + } } project('ripc-core') { - description = 'Reactive IPC Core Components' - dependencies { - // Reactive Streams - compile "org.reactivestreams:reactive-streams:$reactiveStreamsVersion" - } + description = 'Reactive IPC Core Components' + dependencies { + // Reactive Streams + compile "org.reactivestreams:reactive-streams:$reactiveStreamsVersion" + } } project('ripc-protocol-tcp') { - description = 'Reactive IPC TCP Components' - dependencies { - // ripc-core - compile project(":ripc-core") - compile "org.slf4j:slf4j-api:1.7.6" - testCompile 'io.reactivex:rxjava:1.0.8' - testCompile 'io.reactivex:rxjava-reactive-streams:0.5.0' - } + description = 'Reactive IPC TCP Components' + dependencies { + // ripc-core + compile project(":ripc-core") + compile "org.slf4j:slf4j-api:$slf4jVersion" + } } project('ripc-transport-netty4-examples') { - description = 'Reactive IPC TCP Component examples' - - compileTestJava { - sourceCompatibility = 1.8 - targetCompatibility = 1.8 - } + description = 'Reactive IPC TCP Component examples' dependencies { - // ripc-core - compile project(":ripc-transport-netty4") - compile 'io.reactivex:rxjava:1.0.8' - compile 'io.reactivex:rxjava-reactive-streams:0.5.0' - compile 'org.slf4j:slf4j-simple:1.7.6' + // ripc-core + compile project(":ripc-transport-netty4") + compile "io.reactivex:rxjava:$rxjava1Version" + compile "io.reactivex:rxjava-reactive-streams:$rxjavaRsVersion" + compile "org.slf4j:slf4j-simple:$slf4jVersion" } } project('ripc-transport-netty4') { - description = 'Reactive IPC Netty 4.x Transport Implementation' - dependencies { - // ripc-tcp - compile project(":ripc-protocol-tcp") - - // Netty - compile "io.netty:netty-all:$nettyVersion" - compile "org.slf4j:slf4j-api:1.7.6" - } + description = 'Reactive IPC Netty 4.x Transport Implementation' + dependencies { + // ripc-tcp + compile project(":ripc-protocol-tcp") + + // Netty + compile "io.netty:netty-codec:$nettyVersion" + } } project('ripc-rxjava1') { - description = 'Reactive IPC Composition Layer Implementation' - dependencies { - // ripc-tcp - compile project(":ripc-protocol-tcp") - compile 'io.reactivex:rxjava:1.0.8' - compile 'io.reactivex:rxjava-reactive-streams:0.5.0' - } + description = 'Reactive IPC Composition Layer Implementation' + dependencies { + // ripc-tcp + compile project(":ripc-protocol-tcp") + compile "io.reactivex:rxjava:$rxjava1Version" + compile "io.reactivex:rxjava-reactive-streams:$rxjavaRsVersion" + } } project('ripc-rxjava1-examples') { - description = 'Reactive IPC Composition Layer examples' + description = 'Reactive IPC Composition Layer examples' - compileTestJava { - sourceCompatibility = 1.8 - targetCompatibility = 1.8 + dependencies { + // ripc-tcp + compile project(":ripc-rxjava1") + compile project(":ripc-transport-netty4-examples") } - - dependencies { - // ripc-tcp - compile project(":ripc-rxjava1") - compile project(":ripc-transport-netty4-examples") - } } configure(rootProject) { - description = "Reactive IPC" - - task api(type: Javadoc) { - group = "Documentation" - description = "Generates aggregated Javadoc API documentation." - title = "${rootProject.description} ${version} API" - - dependsOn { - subprojects.collect { - it.tasks.getByName("jar") - } - } - options.memberLevel = JavadocMemberLevel.PROTECTED - options.author = true - options.header = rootProject.description - //options.overview = "src/api/overview.html" - options.stylesheetFile = file("src/api/stylesheet.css") - options.links(project.ext.javadocLinks) - - source subprojects.collect { project -> - project.sourceSets.main.allJava - } - - maxMemory = "1024m" - destinationDir = new File(buildDir, "api") - - doFirst { - classpath = files(subprojects.collect { it.sourceSets.main.compileClasspath }) - } - } + description = "Reactive IPC" + + task api(type: Javadoc) { + group = "Documentation" + description = "Generates aggregated Javadoc API documentation." + title = "${rootProject.description} ${version} API" + + dependsOn { + subprojects.collect { + it.tasks.getByName("jar") + } + } + options.memberLevel = JavadocMemberLevel.PROTECTED + options.author = true + options.header = rootProject.description + //options.overview = "src/api/overview.html" + options.stylesheetFile = file("src/api/stylesheet.css") + options.links(project.ext.javadocLinks) + + source subprojects.collect { project -> + project.sourceSets.main.allJava + } + + maxMemory = "1024m" + destinationDir = new File(buildDir, "api") + + doFirst { + classpath = files(subprojects.collect { it.sourceSets.main.compileClasspath }) + } + } } diff --git a/ripc-rxjava1-examples/src/main/java/io/rpc/rx/protocol/tcp/RxTcpServerSample.java b/ripc-rxjava1-examples/src/main/java/io/rpc/rx/protocol/tcp/RxTcpServerSample.java index 39237f1..6ecb384 100644 --- a/ripc-rxjava1-examples/src/main/java/io/rpc/rx/protocol/tcp/RxTcpServerSample.java +++ b/ripc-rxjava1-examples/src/main/java/io/rpc/rx/protocol/tcp/RxTcpServerSample.java @@ -19,7 +19,7 @@ public static void main(String[] args) throws InterruptedException { .startAndAwait(connection -> connection.flatMap(bb -> { String msgStr = "Hello " + bb.toString(defaultCharset()); ByteBuf msg = Unpooled.buffer().writeBytes(msgStr.getBytes()); - return connection.write(Observable.just(msg).doOnCompleted(() -> System.out.println("Done!"))); + return connection.write(Observable.just(msg)); })); } } \ No newline at end of file diff --git a/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxConnection.java b/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxConnection.java index b1cceb4..780a244 100644 --- a/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxConnection.java +++ b/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxConnection.java @@ -3,8 +3,6 @@ import io.ripc.protocol.tcp.TcpConnection; import rx.Observable; import rx.Subscriber; -import rx.functions.Action0; -import rx.functions.Action1; import rx.internal.reactivestreams.SubscriberAdapter; import static rx.RxReactiveStreams.*; @@ -38,22 +36,7 @@ public void call(Subscriber subscriber) { * @return Result of write. */ public Observable write(Observable data) { - return toObservable(delegate.write(toPublisher(data.doOnSubscribe(new Action0() { - @Override - public void call() { - System.out.println("Subscribed"); - } - }).doOnNext(new Action1() { - @Override - public void call(W w) { - System.out.println(w); - } - }).doOnTerminate(new Action0() { - @Override - public void call() { - System.out.println("COmpleted"); - } - })))); + return toObservable(delegate.write(toPublisher(data))); } public static RxConnection create(TcpConnection delegate) {