From 9747535301f6a543151e35cdf8c8c4aad47c17c3 Mon Sep 17 00:00:00 2001 From: Jon Brisbin Date: Thu, 26 Mar 2015 11:49:28 -0500 Subject: [PATCH 01/14] Revamp code to support dynamic backpressure and to reflect discussions around changing the role of the TcpConnection. --- build.gradle | 5 +- .../src/main/java/io/ripc/core/Consumer.java | 11 - .../java/io/ripc/core/DemandCalculator.java | 10 + .../src/main/java/io/ripc/core/Function.java | 9 - .../main/java/io/ripc/core/Publishers.java | 17 + .../java/io/ripc/core/SingletonPublisher.java | 50 +++ .../src/main/java/io/ripc/core/Supplier.java | 9 - .../src/main/java/io/ripc/core/io/Buffer.java | 129 ------- .../java/io/ripc/core/io/BufferSupplier.java | 9 - .../AbstractTcpConnectionEventHandler.java | 48 +++ .../protocol/tcp/CompleteEventHandler.java | 12 + .../java/io/ripc/protocol/tcp/Connection.java | 19 - .../io/ripc/protocol/tcp/TcpConnection.java | 16 + .../tcp/TcpConnectionEventHandler.java | 20 ++ ...Handler.java => TcpConnectionHandler.java} | 9 +- .../ripc/transport/netty4/ByteBufBuffer.java | 331 ------------------ .../netty4}/NamedDaemonThreadFactory.java | 2 +- .../ChannelInboundHandlerSubscription.java | 10 +- ...NettyChannelTcpConnectionEventHandler.java | 85 +++++ .../netty4/tcp/server/NettyTcpServer.java | 9 +- .../tcp/server/NettyTcpServerConnection.java | 116 ++++-- .../NettyTcpServerIntegrationTests.java | 101 +++--- 22 files changed, 409 insertions(+), 618 deletions(-) delete mode 100644 ripc-core/src/main/java/io/ripc/core/Consumer.java create mode 100644 ripc-core/src/main/java/io/ripc/core/DemandCalculator.java delete mode 100644 ripc-core/src/main/java/io/ripc/core/Function.java create mode 100644 ripc-core/src/main/java/io/ripc/core/Publishers.java create mode 100644 ripc-core/src/main/java/io/ripc/core/SingletonPublisher.java delete mode 100644 ripc-core/src/main/java/io/ripc/core/Supplier.java delete mode 100644 ripc-core/src/main/java/io/ripc/core/io/Buffer.java delete mode 100644 ripc-core/src/main/java/io/ripc/core/io/BufferSupplier.java create mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/AbstractTcpConnectionEventHandler.java create mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/CompleteEventHandler.java delete mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/Connection.java create mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnection.java create mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnectionEventHandler.java rename ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/{ConnectionHandler.java => TcpConnectionHandler.java} (67%) delete mode 100644 ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/ByteBufBuffer.java rename {ripc-core/src/main/java/io/ripc/core => ripc-transport-netty4/src/main/java/io/ripc/transport/netty4}/NamedDaemonThreadFactory.java (94%) create mode 100644 ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/NettyChannelTcpConnectionEventHandler.java diff --git a/build.gradle b/build.gradle index 8991c63..5c96b99 100644 --- a/build.gradle +++ b/build.gradle @@ -6,13 +6,13 @@ ext { // Logging slf4jVersion = '1.7.10' - logbackVersion = '1.1.2' + logbackVersion = '1.1.3' // Network Transports nettyVersion = '4.0.26.Final' // Composition Libraries - rxjava1Version = '1.0.7' + rxjava1Version = '1.0.8' // Testing mockitoVersion = '1.10.19' @@ -69,6 +69,7 @@ subprojects { subproject -> // Testing testCompile "junit:junit:$junitVersion", "org.hamcrest:hamcrest-library:1.3", + "org.mockito:mockito-core:$mockitoVersion", "org.slf4j:slf4j-api:$slf4jVersion" testRuntime "ch.qos.logback:logback-classic:$logbackVersion" } diff --git a/ripc-core/src/main/java/io/ripc/core/Consumer.java b/ripc-core/src/main/java/io/ripc/core/Consumer.java deleted file mode 100644 index d1bbbbd..0000000 --- a/ripc-core/src/main/java/io/ripc/core/Consumer.java +++ /dev/null @@ -1,11 +0,0 @@ -package io.ripc.core; - -/** - * Simple functional interface for accepting objects via callback. - */ -@FunctionalInterface -public interface Consumer { - - void accept(T obj); - -} diff --git a/ripc-core/src/main/java/io/ripc/core/DemandCalculator.java b/ripc-core/src/main/java/io/ripc/core/DemandCalculator.java new file mode 100644 index 0000000..1b4e2c7 --- /dev/null +++ b/ripc-core/src/main/java/io/ripc/core/DemandCalculator.java @@ -0,0 +1,10 @@ +package io.ripc.core; + +/** + * Created by jbrisbin on 3/26/15. + */ +public interface DemandCalculator { + + long calculateDemand(long pending); + +} diff --git a/ripc-core/src/main/java/io/ripc/core/Function.java b/ripc-core/src/main/java/io/ripc/core/Function.java deleted file mode 100644 index dcb2f41..0000000 --- a/ripc-core/src/main/java/io/ripc/core/Function.java +++ /dev/null @@ -1,9 +0,0 @@ -package io.ripc.core; - -/** - * Simple functional interface for applying transformations to objects. - */ -@FunctionalInterface -public interface Function { - V apply(T obj); -} diff --git a/ripc-core/src/main/java/io/ripc/core/Publishers.java b/ripc-core/src/main/java/io/ripc/core/Publishers.java new file mode 100644 index 0000000..03476ef --- /dev/null +++ b/ripc-core/src/main/java/io/ripc/core/Publishers.java @@ -0,0 +1,17 @@ +package io.ripc.core; + +import org.reactivestreams.Publisher; + +/** + * Created by jbrisbin on 3/26/15. + */ +public final class Publishers { + + private Publishers() { + } + + public static Publisher just(final T obj) { + return new SingletonPublisher<>(obj); + } + +} diff --git a/ripc-core/src/main/java/io/ripc/core/SingletonPublisher.java b/ripc-core/src/main/java/io/ripc/core/SingletonPublisher.java new file mode 100644 index 0000000..b3ce506 --- /dev/null +++ b/ripc-core/src/main/java/io/ripc/core/SingletonPublisher.java @@ -0,0 +1,50 @@ +package io.ripc.core; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Created by jbrisbin on 3/26/15. + */ +public class SingletonPublisher implements Publisher, DemandCalculator { + + private final AtomicBoolean requested = new AtomicBoolean(false); + + private final T value; + + public SingletonPublisher(T value) { + this.value = value; + } + + @Override + public long calculateDemand(long pending) { + return (requested.get() ? -1 : 1); + } + + @Override + public void subscribe(final Subscriber subscriber) { + subscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + if (!Specification.spec_3_9_verifyPositiveDemand(n, subscriber)) { + return; + } + if (requested.compareAndSet(false, true)) { + if (null != value) { + subscriber.onNext(value); + } + subscriber.onComplete(); + } + } + + @Override + public void cancel() { + requested.set(true); + } + }); + } + +} diff --git a/ripc-core/src/main/java/io/ripc/core/Supplier.java b/ripc-core/src/main/java/io/ripc/core/Supplier.java deleted file mode 100644 index 14e147a..0000000 --- a/ripc-core/src/main/java/io/ripc/core/Supplier.java +++ /dev/null @@ -1,9 +0,0 @@ -package io.ripc.core; - -/** - * A simple functional interface to provide instances of a given object. - */ -@FunctionalInterface -public interface Supplier { - T get(); -} diff --git a/ripc-core/src/main/java/io/ripc/core/io/Buffer.java b/ripc-core/src/main/java/io/ripc/core/io/Buffer.java deleted file mode 100644 index 8158e1b..0000000 --- a/ripc-core/src/main/java/io/ripc/core/io/Buffer.java +++ /dev/null @@ -1,129 +0,0 @@ -package io.ripc.core.io; - -import java.nio.ByteBuffer; -import java.nio.charset.CharsetDecoder; -import java.util.List; - -/** - * Common abstraction to provide additional functionality beyond a traditional {@link java.nio.ByteBuffer} while not - * restricting the dedicated functionality provided by concrete implementations from various transport libraries that - * might offer features like zero-copy. - *

- * A {@code Buffer} can be anything. It is not limited to byte buffers. A {@code Buffer} could represent realized - * objects descended from raw data. - *

- */ -public interface Buffer extends Cloneable, - AutoCloseable, - Comparable>, - Iterable> { - - int position(); - - Buffer position(int pos); - - int limit(); - - Buffer limit(int limit); - - int capacity(); - - Buffer capacity(int capacity); - - int remaining(); - - Buffer skip(int len); - - Buffer clear(); - - Buffer compact(); - - Buffer flip(); - - Buffer rewind(); - - Buffer rewind(int len); - - Buffer clone(); - - Buffer copy(); - - Buffer slice(int start, int len); - - Iterable> split(byte delimiter); - - Iterable> split(byte delimiter, boolean stripDelimiter); - - Iterable> split(byte delimiter, boolean stripDelimiter, List> preallocatedList); - - Iterable> split(Buffer delimiter); - - Iterable> split(Buffer delimiter, boolean stripDelimiter); - - Iterable> split(Buffer delimiter, boolean stripDelimiter, List> preallocatedList); - - Buffer prepend(B data); - - Buffer prepend(Buffer buffer); - - Buffer prepend(ByteBuffer buffer); - - Buffer prepend(CharSequence chars); - - Buffer prepend(byte[] bytes); - - Buffer prepend(byte b); - - Buffer prepend(char c); - - Buffer prepend(short s); - - Buffer prepend(int i); - - Buffer prepend(long l); - - Buffer append(B data); - - Buffer append(Buffer buffer); - - Buffer append(ByteBuffer buffer); - - Buffer append(CharSequence chars); - - Buffer append(byte[] bytes); - - Buffer append(byte b); - - Buffer append(char c); - - Buffer append(short s); - - Buffer append(int i); - - Buffer append(long l); - - byte readByte(); - - void readBytes(byte[] bytes); - - short readShort(); - - int readInt(); - - float readFloat(); - - double readDouble(); - - long readLong(); - - char readChar(); - - void readChars(char[] chars); - - String readString(); - - String readString(CharsetDecoder decoder); - - B get(); - -} diff --git a/ripc-core/src/main/java/io/ripc/core/io/BufferSupplier.java b/ripc-core/src/main/java/io/ripc/core/io/BufferSupplier.java deleted file mode 100644 index f55b339..0000000 --- a/ripc-core/src/main/java/io/ripc/core/io/BufferSupplier.java +++ /dev/null @@ -1,9 +0,0 @@ -package io.ripc.core.io; - -import io.ripc.core.Supplier; - -/** - * Created by jbrisbin on 3/18/15. - */ -public interface BufferSupplier extends Supplier> { -} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/AbstractTcpConnectionEventHandler.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/AbstractTcpConnectionEventHandler.java new file mode 100644 index 0000000..7a7a0e1 --- /dev/null +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/AbstractTcpConnectionEventHandler.java @@ -0,0 +1,48 @@ +package io.ripc.protocol.tcp; + +/** + * Created by jbrisbin on 3/26/15. + */ +public abstract class AbstractTcpConnectionEventHandler implements TcpConnectionEventHandler, CompleteEventHandler { + + @Override + public void onOpen(TcpConnection connection) { + // NO-OP + } + + @Override + public void onClose(TcpConnection connection) { + // NO-OP + } + + @Override + public void onAbort(TcpConnection connection) { + // NO-OP + } + + @Override + public void onError(TcpConnection connection, Throwable cause) { + // NO-OP + } + + @Override + public void onReadable(TcpConnection connection) { + // NO-OP + } + + @Override + public void onWritable(TcpConnection connection) { + // NO-OP + } + + @Override + public boolean onReadComplete(TcpConnection connection) { + return true; + } + + @Override + public boolean onWriteComplete(TcpConnection connection, Object msg) { + return false; + } + +} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/CompleteEventHandler.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/CompleteEventHandler.java new file mode 100644 index 0000000..abdffba --- /dev/null +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/CompleteEventHandler.java @@ -0,0 +1,12 @@ +package io.ripc.protocol.tcp; + +/** + * Created by jbrisbin on 3/26/15. + */ +public interface CompleteEventHandler { + + boolean onReadComplete(TcpConnection connection); + + boolean onWriteComplete(TcpConnection connection, Object msg); + +} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/Connection.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/Connection.java deleted file mode 100644 index 30d9f0e..0000000 --- a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/Connection.java +++ /dev/null @@ -1,19 +0,0 @@ -package io.ripc.protocol.tcp; - -import io.ripc.core.io.Buffer; -import org.reactivestreams.Publisher; - -/** - * A {@code Connection} is a Reactive Streams {@link org.reactivestreams.Publisher} that provides subscribers with - * inbound data and exposes the {@link #write(org.reactivestreams.Publisher)} method for sending outbound data. - */ -public interface Connection extends Publisher> { - - /** - * Send outbound data using the Reactive Streams {@code Publisher} contract. - * - * @param data publisher of outbound data - */ - void write(Publisher> data); - -} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnection.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnection.java new file mode 100644 index 0000000..0114df5 --- /dev/null +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnection.java @@ -0,0 +1,16 @@ +package io.ripc.protocol.tcp; + +import org.reactivestreams.Publisher; + +/** + * A {@code Connection} provides a reader for inbound data and a writer for outbound. + */ +public interface TcpConnection { + + TcpConnection eventHandler(TcpConnectionEventHandler eventHandler); + + Publisher reader(); + + TcpConnection writer(Publisher sink); + +} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnectionEventHandler.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnectionEventHandler.java new file mode 100644 index 0000000..b2bd7ec --- /dev/null +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnectionEventHandler.java @@ -0,0 +1,20 @@ +package io.ripc.protocol.tcp; + +/** + * Created by jbrisbin on 3/26/15. + */ +public interface TcpConnectionEventHandler { + + void onOpen(TcpConnection connection); + + void onClose(TcpConnection connection); + + void onAbort(TcpConnection connection); + + void onError(TcpConnection connection, Throwable cause); + + void onReadable(TcpConnection connection); + + void onWritable(TcpConnection connection); + +} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/ConnectionHandler.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnectionHandler.java similarity index 67% rename from ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/ConnectionHandler.java rename to ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnectionHandler.java index 49c391b..0965910 100644 --- a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/ConnectionHandler.java +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnectionHandler.java @@ -1,12 +1,13 @@ package io.ripc.protocol.tcp; -import io.ripc.core.Consumer; -import io.ripc.core.Supplier; - /** * A {@code ConnectionHandler} is responsible for composing a Reactive Streams pipeline(s) when a new connection is * received by the server. Implementations will compose an appropriate pipeline based on capabilities and server * configuration. */ -public interface ConnectionHandler extends Supplier>> { +@FunctionalInterface +public interface TcpConnectionHandler { + + void handle(TcpConnection connection); + } diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/ByteBufBuffer.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/ByteBufBuffer.java deleted file mode 100644 index 06e2747..0000000 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/ByteBufBuffer.java +++ /dev/null @@ -1,331 +0,0 @@ -package io.ripc.transport.netty4; - -import io.netty.buffer.ByteBuf; -import io.ripc.core.io.Buffer; - -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; -import java.util.Iterator; -import java.util.List; - -/** - * Created by jbrisbin on 3/10/15. - */ -public class ByteBufBuffer implements Buffer { - - private final ByteBuf buf; - private final boolean writable; - - public ByteBufBuffer(ByteBuf buf, boolean writable) { - this.buf = buf; - this.writable = writable; - } - - @Override - public int position() { - return (writable ? buf.writerIndex() : buf.readerIndex()); - } - - @Override - public Buffer position(int pos) { - return null; - } - - @Override - public int limit() { - return (writable ? buf.writableBytes() : buf.readableBytes()); - } - - @Override - public Buffer limit(int limit) { - return null; - } - - @Override - public int capacity() { - return buf.capacity(); - } - - @Override - public Buffer capacity(int capacity) { - buf.capacity(capacity); - return this; - } - - @Override - public int remaining() { - return capacity() - position(); - } - - @Override - public Buffer skip(int len) { - return null; - } - - @Override - public Buffer clear() { - return null; - } - - @Override - public Buffer compact() { - return null; - } - - @Override - public Buffer flip() { - return null; - } - - @Override - public Buffer rewind() { - return null; - } - - @Override - public Buffer rewind(int len) { - return null; - } - - @Override - public Buffer clone() { - return null; - } - - @Override - public Buffer copy() { - return new ByteBufBuffer(buf.copy(), writable); - } - - @Override - public Buffer slice(int start, int len) { - return new ByteBufBuffer(buf.copy(start, len), writable); - } - - @Override - public Iterable> split(byte delimiter) { - return null; - } - - @Override - public Iterable> split(byte delimiter, boolean stripDelimiter) { - return null; - } - - @Override - public Iterable> split(byte delimiter, - boolean stripDelimiter, - List> preallocatedList) { - return null; - } - - @Override - public Iterable> split(Buffer delimiter) { - return null; - } - - @Override - public Iterable> split(Buffer delimiter, boolean stripDelimiter) { - return null; - } - - @Override - public Iterable> split(Buffer delimiter, - boolean stripDelimiter, - List> preallocatedList) { - return null; - } - - @Override - public Buffer prepend(ByteBuf data) { - return null; - } - - @Override - public Buffer prepend(Buffer buffer) { - return null; - } - - @Override - public Buffer prepend(ByteBuffer buffer) { - return null; - } - - @Override - public Buffer prepend(CharSequence chars) { - return null; - } - - @Override - public Buffer prepend(byte[] bytes) { - return null; - } - - @Override - public Buffer prepend(byte b) { - return null; - } - - @Override - public Buffer prepend(char c) { - return null; - } - - @Override - public Buffer prepend(short s) { - return null; - } - - @Override - public Buffer prepend(int i) { - return null; - } - - @Override - public Buffer prepend(long l) { - return null; - } - - @Override - public Buffer append(ByteBuf data) { - return null; - } - - @Override - public Buffer append(Buffer buffer) { - return null; - } - - @Override - public Buffer append(ByteBuffer buffer) { - return null; - } - - @Override - public Buffer append(CharSequence chars) { - return null; - } - - @Override - public Buffer append(byte[] bytes) { - return null; - } - - @Override - public Buffer append(byte b) { - return null; - } - - @Override - public Buffer append(char c) { - return null; - } - - @Override - public Buffer append(short s) { - return null; - } - - @Override - public Buffer append(int i) { - return null; - } - - @Override - public Buffer append(long l) { - return null; - } - - @Override - public byte readByte() { - return 0; - } - - @Override - public void readBytes(byte[] bytes) { - - } - - @Override - public short readShort() { - return 0; - } - - @Override - public int readInt() { - return 0; - } - - @Override - public float readFloat() { - return 0; - } - - @Override - public double readDouble() { - return 0; - } - - @Override - public long readLong() { - return 0; - } - - @Override - public char readChar() { - return 0; - } - - @Override - public void readChars(char[] chars) { - - } - - @Override - public String readString() { - CharsetDecoder decoder = Charset.defaultCharset().newDecoder(); - try { - CharBuffer cb = decoder.decode(buf.nioBuffer()); - return cb.toString(); - } catch (CharacterCodingException e) { - throw new IllegalStateException(e); - } - } - - @Override - public String readString(CharsetDecoder decoder) { - return null; - } - - @Override - public ByteBuf get() { - return buf; - } - - @Override - public void close() throws Exception { - - } - - @Override - public int compareTo(Buffer o) { - return 0; - } - - @Override - public Iterator> iterator() { - return null; - } - - @Override - public String toString() { - return "ByteBufBuffer{" + - "buf=" + buf + - ", writable=" + writable + - '}'; - } - -} diff --git a/ripc-core/src/main/java/io/ripc/core/NamedDaemonThreadFactory.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/NamedDaemonThreadFactory.java similarity index 94% rename from ripc-core/src/main/java/io/ripc/core/NamedDaemonThreadFactory.java rename to ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/NamedDaemonThreadFactory.java index 2f00e07..2267918 100644 --- a/ripc-core/src/main/java/io/ripc/core/NamedDaemonThreadFactory.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/NamedDaemonThreadFactory.java @@ -1,4 +1,4 @@ -package io.ripc.core; +package io.ripc.transport.netty4; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInboundHandlerSubscription.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInboundHandlerSubscription.java index b8e35e1..d34fa05 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInboundHandlerSubscription.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInboundHandlerSubscription.java @@ -5,8 +5,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.ripc.core.Specification; -import io.ripc.core.io.Buffer; -import io.ripc.transport.netty4.ByteBufBuffer; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -20,12 +18,12 @@ public class ChannelInboundHandlerSubscription extends ChannelInboundHandlerAdap private static final AtomicLongFieldUpdater PEND_UPD = AtomicLongFieldUpdater.newUpdater(ChannelInboundHandlerSubscription.class, "pending"); - private final Channel channel; - private final Subscriber> subscriber; + private final Channel channel; + private final Subscriber subscriber; private volatile long pending = 0; - public ChannelInboundHandlerSubscription(Channel channel, Subscriber> subscriber) { + public ChannelInboundHandlerSubscription(Channel channel, Subscriber subscriber) { this.channel = channel; this.subscriber = subscriber; } @@ -64,7 +62,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception ByteBuf buf = (ByteBuf) msg; try { - subscriber.onNext(new ByteBufBuffer(buf, false)); + subscriber.onNext(buf); PEND_UPD.decrementAndGet(this); //channel.read(); } catch (Throwable t) { diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/NettyChannelTcpConnectionEventHandler.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/NettyChannelTcpConnectionEventHandler.java new file mode 100644 index 0000000..c160466 --- /dev/null +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/NettyChannelTcpConnectionEventHandler.java @@ -0,0 +1,85 @@ +package io.ripc.transport.netty4.tcp; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.ripc.protocol.tcp.CompleteEventHandler; +import io.ripc.protocol.tcp.TcpConnection; +import io.ripc.protocol.tcp.TcpConnectionEventHandler; + +/** + * Created by jbrisbin on 3/26/15. + */ +public class NettyChannelTcpConnectionEventHandler extends ChannelDuplexHandler { + + private final TcpConnectionEventHandler eventHandler; + private final TcpConnection connection; + private final CompleteEventHandler completeEventHandler; + + public NettyChannelTcpConnectionEventHandler(TcpConnectionEventHandler eventHandler, TcpConnection connection) { + this.eventHandler = eventHandler; + this.connection = connection; + this.completeEventHandler = eventHandler instanceof CompleteEventHandler + ? (CompleteEventHandler) eventHandler + : null; + } + + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise future) throws Exception { + eventHandler.onAbort(connection); + super.disconnect(ctx, future); + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception { + eventHandler.onClose(connection); + super.close(ctx, future); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (null != completeEventHandler) { + promise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess() && completeEventHandler.onWriteComplete(connection, msg)) { + ctx.flush(); + } + } + }); + } + super.write(ctx, msg, promise); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + eventHandler.onOpen(connection); + eventHandler.onReadable(connection); + super.channelActive(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + eventHandler.onError(connection, cause); + super.exceptionCaught(ctx, cause); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + eventHandler.onWritable(connection); + super.channelWritabilityChanged(ctx); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + if (null != completeEventHandler) { + if (!completeEventHandler.onReadComplete(connection)) { + ctx.close(); + } + } + super.channelReadComplete(ctx); + } + +} diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java index 0795251..55ceff4 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java @@ -1,7 +1,6 @@ package io.ripc.transport.netty4.tcp.server; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; @@ -9,8 +8,8 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LoggingHandler; -import io.ripc.core.NamedDaemonThreadFactory; -import io.ripc.protocol.tcp.ConnectionHandler; +import io.ripc.protocol.tcp.TcpConnectionHandler; +import io.ripc.transport.netty4.NamedDaemonThreadFactory; /** * Created by jbrisbin on 3/10/15. @@ -23,7 +22,7 @@ public NettyTcpServer(ServerBootstrap bootstrap) { this.bootstrap = bootstrap; } - public static NettyTcpServer listen(int port, ConnectionHandler handler) { + public static NettyTcpServer listen(int port, TcpConnectionHandler handler) { ServerBootstrap b = new ServerBootstrap(); int threads = Runtime.getRuntime().availableProcessors(); @@ -43,7 +42,7 @@ protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); NettyTcpServerConnection conn = new NettyTcpServerConnection(ch); - handler.get().accept(conn); + handler.handle(conn); } }); diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java index 79146c7..f5118b9 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java @@ -1,36 +1,58 @@ package io.ripc.transport.netty4.tcp.server; -import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; -import io.ripc.core.io.Buffer; -import io.ripc.protocol.tcp.Connection; +import io.ripc.core.DemandCalculator; +import io.ripc.protocol.tcp.TcpConnection; +import io.ripc.protocol.tcp.TcpConnectionEventHandler; import io.ripc.transport.netty4.tcp.ChannelInboundHandlerSubscription; +import io.ripc.transport.netty4.tcp.NettyChannelTcpConnectionEventHandler; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + /** * Represents a Netty Channel. */ -public class NettyTcpServerConnection implements Connection { +public class NettyTcpServerConnection implements TcpConnection { + + private static final AtomicLongFieldUpdater PENDING_UPD + = AtomicLongFieldUpdater.newUpdater(WriteSubscriber.class, "pending"); + + private final Channel channel; + private final ReadPublisher readPublisher; - private final Channel channel; + private TcpConnectionEventHandler eventHandler; public NettyTcpServerConnection(Channel channel) { this.channel = channel; + this.readPublisher = new ReadPublisher(channel); } @Override - public void write(Publisher> data) { - data.subscribe(new WriteSubscriber()); + public TcpConnection eventHandler(TcpConnectionEventHandler eventHandler) { + if (null != this.eventHandler) { + throw new IllegalArgumentException(TcpConnectionEventHandler.class.getSimpleName() + + " already set on this connection"); + } + this.eventHandler = eventHandler; + channel.pipeline().addLast(new NettyChannelTcpConnectionEventHandler(eventHandler, this)); + return this; } @Override - public void subscribe(Subscriber> s) { - ChannelInboundHandlerSubscription sub = new ChannelInboundHandlerSubscription(channel, s); - s.onSubscribe(sub); - channel.pipeline() - .addLast(sub); + public Publisher reader() { + return readPublisher; + } + + @Override + public TcpConnection writer(Publisher sink) { + DemandCalculator demandCalculator = DemandCalculator.class.isAssignableFrom(sink.getClass()) + ? (DemandCalculator) sink + : null; + sink.subscribe(new WriteSubscriber(demandCalculator)); + return this; } @Override @@ -40,35 +62,79 @@ public String toString() { '}'; } - private final class WriteSubscriber implements Subscriber> { - Subscription subscription; + private final static class ReadPublisher implements Publisher { + private final Channel channel; + + public ReadPublisher(Channel channel) { + this.channel = channel; + } + + @Override + public void subscribe(Subscriber subscriber) { + ChannelInboundHandlerSubscription sub = new ChannelInboundHandlerSubscription(channel, subscriber); + subscriber.onSubscribe(sub); + channel.pipeline().addLast(sub); + } + } + + private final class WriteSubscriber implements Subscriber { + private final Runnable subscriptionRequest; + + private Subscription subscription; + + volatile long pending = 0L; + + private WriteSubscriber(DemandCalculator demandCalculator) { + this.subscriptionRequest = new Runnable() { + @Override + public void run() { + final long toRequest; + if (null != demandCalculator) { + toRequest = demandCalculator.calculateDemand(pending); + } else { + toRequest = 1L; + } + + if (toRequest == Long.MAX_VALUE) { + PENDING_UPD.set(WriteSubscriber.this, Long.MAX_VALUE); + subscription.request(Long.MAX_VALUE); + } else if (toRequest > 0) { + PENDING_UPD.addAndGet(WriteSubscriber.this, toRequest); + subscription.request(pending); + } + } + }; + } @Override - public void onSubscribe(Subscription s) { - if (null != subscription) { - s.cancel(); + public void onSubscribe(Subscription subscription) { + if (null != this.subscription) { + subscription.cancel(); return; } - (subscription = s).request(1); + + this.subscription = subscription; + subscriptionRequest.run(); } @Override - public void onNext(Buffer buffer) { - channel.write(buffer.get()); - // This causes a StackOverflowError - //subscription.request(1); - // This doesn't - channel.eventLoop().execute(() -> subscription.request(1)); + public void onNext(Object msg) { + channel.write(msg); + PENDING_UPD.decrementAndGet(this); + channel.eventLoop().execute(subscriptionRequest); } @Override public void onError(Throwable t) { - + if (null != eventHandler) { + eventHandler.onError(NettyTcpServerConnection.this, t); + } } @Override public void onComplete() { channel.flush(); + channel.close(); } } diff --git a/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java b/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java index 84a4e7f..f593266 100644 --- a/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java +++ b/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java @@ -1,11 +1,11 @@ package io.ripc.transport.netty4.tcp.server; -import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.ripc.core.io.Buffer; -import io.ripc.transport.netty4.ByteBufBuffer; +import io.ripc.core.Publishers; +import io.ripc.protocol.tcp.AbstractTcpConnectionEventHandler; +import io.ripc.protocol.tcp.TcpConnection; +import io.ripc.protocol.tcp.TcpConnectionEventHandler; import org.junit.Test; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.slf4j.Logger; @@ -25,60 +25,45 @@ public class NettyTcpServerIntegrationTests { public void canStartNettyTcpServer() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - NettyTcpServer server = NettyTcpServer.listen(3000, () -> conn -> { - conn.subscribe(new Subscriber>() { - private Subscription subscription; - - @Override - public void onSubscribe(Subscription s) { - if (null != subscription) { - s.cancel(); - return; - } - (this.subscription = s).request(1); - } - - @Override - public void onNext(Buffer buffer) { - subscription.request(1); - } - - @Override - public void onError(Throwable t) { - t.printStackTrace(); - } - - @Override - public void onComplete() { - ByteBufBuffer out = new ByteBufBuffer(Unpooled.wrappedBuffer("Hello World".getBytes()), true); - - conn.write(new Publisher>() { - @Override - public void subscribe(Subscriber> s) { - s.onSubscribe(new Subscription() { - boolean written; - - @Override - public void request(long n) { - if (!written) { - s.onNext(out); - written = true; - } else { - s.onComplete(); - latch.countDown(); - } - } - - @Override - public void cancel() { - - } - }); - } - }); - } - }); - }); + TcpConnectionEventHandler eventHandler = new AbstractTcpConnectionEventHandler() { + @Override + public void onOpen(TcpConnection connection) { + connection.reader() + .subscribe(new Subscriber() { + Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + if (null != subscription) { + s.cancel(); + return; + } + this.subscription = s; + s.request(1); + } + + @Override + public void onNext(Object o) { + LOG.info("got msg: {}", o); + connection.writer(Publishers.just(Unpooled.wrappedBuffer("Hello World!".getBytes()))); + subscription.request(1); + } + + @Override + public void onError(Throwable t) { + LOG.error(t.getMessage(), t); + } + + @Override + public void onComplete() { + LOG.info("complete"); + latch.countDown(); + } + }); + } + }; + + NettyTcpServer server = NettyTcpServer.listen(3000, connection -> connection.eventHandler(eventHandler)); while (!latch.await(1, TimeUnit.SECONDS)) { Thread.sleep(1000); From 225218d75f114f637fb80cc04094bea35e13f8a1 Mon Sep 17 00:00:00 2001 From: Jon Brisbin Date: Thu, 26 Mar 2015 14:06:29 -0500 Subject: [PATCH 02/14] Change event handling to be "listener" based by recognizing specific listener types that are subclasses of a marker interface. --- .../AbstractTcpConnectionEventHandler.java | 48 ----------- .../protocol/tcp/CompleteEventHandler.java | 12 --- .../tcp/TcpConnectionEventHandler.java | 20 ----- .../tcp/{ => connection}/TcpConnection.java | 8 +- .../TcpConnectionEventListener.java | 7 ++ .../TcpConnectionHandler.java | 3 +- .../listener/WriteCompleteListener.java | 13 +++ ...NettyChannelTcpConnectionEventHandler.java | 85 ------------------- ...ConnectionEventListenerChannelHandler.java | 61 +++++++++++++ .../netty4/tcp/server/NettyTcpServer.java | 2 +- .../tcp/server/NettyTcpServerConnection.java | 38 +++++---- .../NettyTcpServerIntegrationTests.java | 23 ++--- 12 files changed, 119 insertions(+), 201 deletions(-) delete mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/AbstractTcpConnectionEventHandler.java delete mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/CompleteEventHandler.java delete mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnectionEventHandler.java rename ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/{ => connection}/TcpConnection.java (56%) create mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnectionEventListener.java rename ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/{ => connection}/TcpConnectionHandler.java (86%) create mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/listener/WriteCompleteListener.java delete mode 100644 ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/NettyChannelTcpConnectionEventHandler.java create mode 100644 ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/TcpConnectionEventListenerChannelHandler.java diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/AbstractTcpConnectionEventHandler.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/AbstractTcpConnectionEventHandler.java deleted file mode 100644 index 7a7a0e1..0000000 --- a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/AbstractTcpConnectionEventHandler.java +++ /dev/null @@ -1,48 +0,0 @@ -package io.ripc.protocol.tcp; - -/** - * Created by jbrisbin on 3/26/15. - */ -public abstract class AbstractTcpConnectionEventHandler implements TcpConnectionEventHandler, CompleteEventHandler { - - @Override - public void onOpen(TcpConnection connection) { - // NO-OP - } - - @Override - public void onClose(TcpConnection connection) { - // NO-OP - } - - @Override - public void onAbort(TcpConnection connection) { - // NO-OP - } - - @Override - public void onError(TcpConnection connection, Throwable cause) { - // NO-OP - } - - @Override - public void onReadable(TcpConnection connection) { - // NO-OP - } - - @Override - public void onWritable(TcpConnection connection) { - // NO-OP - } - - @Override - public boolean onReadComplete(TcpConnection connection) { - return true; - } - - @Override - public boolean onWriteComplete(TcpConnection connection, Object msg) { - return false; - } - -} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/CompleteEventHandler.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/CompleteEventHandler.java deleted file mode 100644 index abdffba..0000000 --- a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/CompleteEventHandler.java +++ /dev/null @@ -1,12 +0,0 @@ -package io.ripc.protocol.tcp; - -/** - * Created by jbrisbin on 3/26/15. - */ -public interface CompleteEventHandler { - - boolean onReadComplete(TcpConnection connection); - - boolean onWriteComplete(TcpConnection connection, Object msg); - -} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnectionEventHandler.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnectionEventHandler.java deleted file mode 100644 index b2bd7ec..0000000 --- a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnectionEventHandler.java +++ /dev/null @@ -1,20 +0,0 @@ -package io.ripc.protocol.tcp; - -/** - * Created by jbrisbin on 3/26/15. - */ -public interface TcpConnectionEventHandler { - - void onOpen(TcpConnection connection); - - void onClose(TcpConnection connection); - - void onAbort(TcpConnection connection); - - void onError(TcpConnection connection, Throwable cause); - - void onReadable(TcpConnection connection); - - void onWritable(TcpConnection connection); - -} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnection.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnection.java similarity index 56% rename from ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnection.java rename to ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnection.java index 0114df5..3ead9b3 100644 --- a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnection.java +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnection.java @@ -1,4 +1,4 @@ -package io.ripc.protocol.tcp; +package io.ripc.protocol.tcp.connection; import org.reactivestreams.Publisher; @@ -7,10 +7,10 @@ */ public interface TcpConnection { - TcpConnection eventHandler(TcpConnectionEventHandler eventHandler); - Publisher reader(); - TcpConnection writer(Publisher sink); + TcpConnection writer(Publisher writer); + + TcpConnection addListener(TcpConnectionEventListener listener); } diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnectionEventListener.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnectionEventListener.java new file mode 100644 index 0000000..74a59a1 --- /dev/null +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnectionEventListener.java @@ -0,0 +1,7 @@ +package io.ripc.protocol.tcp.connection; + +/** + * Created by jbrisbin on 3/26/15. + */ +public interface TcpConnectionEventListener { +} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnectionHandler.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnectionHandler.java similarity index 86% rename from ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnectionHandler.java rename to ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnectionHandler.java index 0965910..df6cb77 100644 --- a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnectionHandler.java +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnectionHandler.java @@ -1,11 +1,10 @@ -package io.ripc.protocol.tcp; +package io.ripc.protocol.tcp.connection; /** * A {@code ConnectionHandler} is responsible for composing a Reactive Streams pipeline(s) when a new connection is * received by the server. Implementations will compose an appropriate pipeline based on capabilities and server * configuration. */ -@FunctionalInterface public interface TcpConnectionHandler { void handle(TcpConnection connection); diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/listener/WriteCompleteListener.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/listener/WriteCompleteListener.java new file mode 100644 index 0000000..a32b4ef --- /dev/null +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/listener/WriteCompleteListener.java @@ -0,0 +1,13 @@ +package io.ripc.protocol.tcp.connection.listener; + +import io.ripc.protocol.tcp.connection.TcpConnection; +import io.ripc.protocol.tcp.connection.TcpConnectionEventListener; + +/** + * Created by jbrisbin on 3/26/15. + */ +public interface WriteCompleteListener extends TcpConnectionEventListener { + + boolean writeComplete(TcpConnection connection, long count, Object msg); + +} diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/NettyChannelTcpConnectionEventHandler.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/NettyChannelTcpConnectionEventHandler.java deleted file mode 100644 index c160466..0000000 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/NettyChannelTcpConnectionEventHandler.java +++ /dev/null @@ -1,85 +0,0 @@ -package io.ripc.transport.netty4.tcp; - -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; -import io.ripc.protocol.tcp.CompleteEventHandler; -import io.ripc.protocol.tcp.TcpConnection; -import io.ripc.protocol.tcp.TcpConnectionEventHandler; - -/** - * Created by jbrisbin on 3/26/15. - */ -public class NettyChannelTcpConnectionEventHandler extends ChannelDuplexHandler { - - private final TcpConnectionEventHandler eventHandler; - private final TcpConnection connection; - private final CompleteEventHandler completeEventHandler; - - public NettyChannelTcpConnectionEventHandler(TcpConnectionEventHandler eventHandler, TcpConnection connection) { - this.eventHandler = eventHandler; - this.connection = connection; - this.completeEventHandler = eventHandler instanceof CompleteEventHandler - ? (CompleteEventHandler) eventHandler - : null; - } - - @Override - public void disconnect(ChannelHandlerContext ctx, ChannelPromise future) throws Exception { - eventHandler.onAbort(connection); - super.disconnect(ctx, future); - } - - @Override - public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception { - eventHandler.onClose(connection); - super.close(ctx, future); - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (null != completeEventHandler) { - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess() && completeEventHandler.onWriteComplete(connection, msg)) { - ctx.flush(); - } - } - }); - } - super.write(ctx, msg, promise); - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - eventHandler.onOpen(connection); - eventHandler.onReadable(connection); - super.channelActive(ctx); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - eventHandler.onError(connection, cause); - super.exceptionCaught(ctx, cause); - } - - @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - eventHandler.onWritable(connection); - super.channelWritabilityChanged(ctx); - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - if (null != completeEventHandler) { - if (!completeEventHandler.onReadComplete(connection)) { - ctx.close(); - } - } - super.channelReadComplete(ctx); - } - -} diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/TcpConnectionEventListenerChannelHandler.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/TcpConnectionEventListenerChannelHandler.java new file mode 100644 index 0000000..07f9a7d --- /dev/null +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/TcpConnectionEventListenerChannelHandler.java @@ -0,0 +1,61 @@ +package io.ripc.transport.netty4.tcp; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.ripc.protocol.tcp.connection.TcpConnection; +import io.ripc.protocol.tcp.connection.listener.WriteCompleteListener; + +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + +/** + * Created by jbrisbin on 3/26/15. + */ +public class TcpConnectionEventListenerChannelHandler extends ChannelDuplexHandler { + + private static final AtomicLongFieldUpdater WRITTEN_UPD + = AtomicLongFieldUpdater.newUpdater(TcpConnectionEventListenerChannelHandler.class, "writtenSinceLastFlush"); + + private final TcpConnection connection; + + private volatile long writtenSinceLastFlush = 0L; + + private WriteCompleteListener writeCompleteListener; + + public TcpConnectionEventListenerChannelHandler(TcpConnection connection) { + this.connection = connection; + } + + public WriteCompleteListener getWriteCompleteListener() { + return writeCompleteListener; + } + + public void setWriteCompleteListener(WriteCompleteListener writeCompleteListener) { + this.writeCompleteListener = writeCompleteListener; + } + + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + super.flush(ctx); + WRITTEN_UPD.set(this, 0L); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (null != writeCompleteListener) { + promise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess() && writeCompleteListener.writeComplete(connection, writtenSinceLastFlush, msg)) { + ctx.flush(); + } + } + }); + } + super.write(ctx, msg, promise); + WRITTEN_UPD.incrementAndGet(this); + } + +} diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java index 55ceff4..f81eb6e 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java @@ -8,7 +8,7 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LoggingHandler; -import io.ripc.protocol.tcp.TcpConnectionHandler; +import io.ripc.protocol.tcp.connection.TcpConnectionHandler; import io.ripc.transport.netty4.NamedDaemonThreadFactory; /** diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java index f5118b9..46bda94 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java @@ -2,10 +2,11 @@ import io.netty.channel.Channel; import io.ripc.core.DemandCalculator; -import io.ripc.protocol.tcp.TcpConnection; -import io.ripc.protocol.tcp.TcpConnectionEventHandler; +import io.ripc.protocol.tcp.connection.TcpConnection; +import io.ripc.protocol.tcp.connection.TcpConnectionEventListener; +import io.ripc.protocol.tcp.connection.listener.WriteCompleteListener; import io.ripc.transport.netty4.tcp.ChannelInboundHandlerSubscription; -import io.ripc.transport.netty4.tcp.NettyChannelTcpConnectionEventHandler; +import io.ripc.transport.netty4.tcp.TcpConnectionEventListenerChannelHandler; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -23,21 +24,24 @@ public class NettyTcpServerConnection implements TcpConnection { private final Channel channel; private final ReadPublisher readPublisher; - private TcpConnectionEventHandler eventHandler; - public NettyTcpServerConnection(Channel channel) { this.channel = channel; this.readPublisher = new ReadPublisher(channel); } @Override - public TcpConnection eventHandler(TcpConnectionEventHandler eventHandler) { - if (null != this.eventHandler) { - throw new IllegalArgumentException(TcpConnectionEventHandler.class.getSimpleName() - + " already set on this connection"); + public TcpConnection addListener(TcpConnectionEventListener listener) { + TcpConnectionEventListenerChannelHandler handler = + channel.pipeline().get(TcpConnectionEventListenerChannelHandler.class); + if (null == handler) { + handler = new TcpConnectionEventListenerChannelHandler(this); + channel.pipeline().addLast(handler); + } + + if (WriteCompleteListener.class.isAssignableFrom(listener.getClass())) { + handler.setWriteCompleteListener((WriteCompleteListener) listener); } - this.eventHandler = eventHandler; - channel.pipeline().addLast(new NettyChannelTcpConnectionEventHandler(eventHandler, this)); + return this; } @@ -47,11 +51,11 @@ public Publisher reader() { } @Override - public TcpConnection writer(Publisher sink) { - DemandCalculator demandCalculator = DemandCalculator.class.isAssignableFrom(sink.getClass()) - ? (DemandCalculator) sink + public TcpConnection writer(Publisher writer) { + DemandCalculator demandCalculator = DemandCalculator.class.isAssignableFrom(writer.getClass()) + ? (DemandCalculator) writer : null; - sink.subscribe(new WriteSubscriber(demandCalculator)); + writer.subscribe(new WriteSubscriber(demandCalculator)); return this; } @@ -126,9 +130,7 @@ public void onNext(Object msg) { @Override public void onError(Throwable t) { - if (null != eventHandler) { - eventHandler.onError(NettyTcpServerConnection.this, t); - } + channel.close(); } @Override diff --git a/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java b/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java index f593266..2dd7730 100644 --- a/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java +++ b/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java @@ -2,9 +2,8 @@ import io.netty.buffer.Unpooled; import io.ripc.core.Publishers; -import io.ripc.protocol.tcp.AbstractTcpConnectionEventHandler; -import io.ripc.protocol.tcp.TcpConnection; -import io.ripc.protocol.tcp.TcpConnectionEventHandler; +import io.ripc.protocol.tcp.connection.TcpConnection; +import io.ripc.protocol.tcp.connection.listener.WriteCompleteListener; import org.junit.Test; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -25,10 +24,16 @@ public class NettyTcpServerIntegrationTests { public void canStartNettyTcpServer() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - TcpConnectionEventHandler eventHandler = new AbstractTcpConnectionEventHandler() { + WriteCompleteListener writeCompleteListener = new WriteCompleteListener() { @Override - public void onOpen(TcpConnection connection) { - connection.reader() + public boolean writeComplete(TcpConnection connection, long count, Object msg) { + return count > 0; + } + }; + + NettyTcpServer server = NettyTcpServer.listen(3000, connection -> + connection.addListener(writeCompleteListener) + .reader() .subscribe(new Subscriber() { Subscription subscription; @@ -59,11 +64,7 @@ public void onComplete() { LOG.info("complete"); latch.countDown(); } - }); - } - }; - - NettyTcpServer server = NettyTcpServer.listen(3000, connection -> connection.eventHandler(eventHandler)); + })); while (!latch.await(1, TimeUnit.SECONDS)) { Thread.sleep(1000); From ece7303cc294b1b9d7e6049685db46b356bb2205 Mon Sep 17 00:00:00 2001 From: Jon Brisbin Date: Thu, 26 Mar 2015 15:12:37 -0500 Subject: [PATCH 03/14] Update javadoc to document what components do and what their intended role is. --- .../java/io/ripc/core/DemandCalculator.java | 11 ++- .../main/java/io/ripc/core/EventListener.java | 8 ++ .../src/main/java/io/ripc/core/Handler.java | 12 +++ .../java/io/ripc/core/SingletonPublisher.java | 4 +- .../main/java/io/ripc/core/Specification.java | 2 +- .../tcp/connection/TcpConnection.java | 23 ++++- .../TcpConnectionEventListener.java | 7 -- .../tcp/connection/TcpConnectionHandler.java | 12 --- .../listener/ReadCompleteListener.java | 23 +++++ .../listener/WriteCompleteListener.java | 16 +++- .../ChannelInboundHandlerSubscription.java | 8 +- ....java => EventListenerChannelHandler.java} | 29 ++++-- .../netty4/tcp/server/NettyTcpServer.java | 5 +- .../tcp/server/NettyTcpServerConnection.java | 42 ++++++--- .../NettyTcpServerIntegrationTests.java | 90 ++++++++++--------- 15 files changed, 204 insertions(+), 88 deletions(-) create mode 100644 ripc-core/src/main/java/io/ripc/core/EventListener.java create mode 100644 ripc-core/src/main/java/io/ripc/core/Handler.java delete mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnectionEventListener.java delete mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnectionHandler.java create mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/listener/ReadCompleteListener.java rename ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/{TcpConnectionEventListenerChannelHandler.java => EventListenerChannelHandler.java} (59%) diff --git a/ripc-core/src/main/java/io/ripc/core/DemandCalculator.java b/ripc-core/src/main/java/io/ripc/core/DemandCalculator.java index 1b4e2c7..f928d21 100644 --- a/ripc-core/src/main/java/io/ripc/core/DemandCalculator.java +++ b/ripc-core/src/main/java/io/ripc/core/DemandCalculator.java @@ -1,10 +1,19 @@ package io.ripc.core; /** - * Created by jbrisbin on 3/26/15. + * A {@code DemandCalculator} implementation is responsible for calculating what the demand value should be to send to + * {@link org.reactivestreams.Subscription#request(long)}. */ public interface DemandCalculator { + /** + * Calculate demand based on current pending backlog. A value {@code < 1} means "don't make any new requests" since + * values less than {@code 1} are illegal according to the Reactive Streams spec. A value of {@code >= 1} means "use + * this value as the demand". + * + * @param pending outstanding backlog of previous demand accumulations + * @return < 1 to indicate no requests should be performed, >= 1 to indicate positive demand + */ long calculateDemand(long pending); } diff --git a/ripc-core/src/main/java/io/ripc/core/EventListener.java b/ripc-core/src/main/java/io/ripc/core/EventListener.java new file mode 100644 index 0000000..a8fd298 --- /dev/null +++ b/ripc-core/src/main/java/io/ripc/core/EventListener.java @@ -0,0 +1,8 @@ +package io.ripc.core; + +/** + * An {@code EventListener} is a generic component that has no default behavior. Components are meant to use specific + * subclasses of this interface to intercept events and perform appropriate behavior. + */ +public interface EventListener { +} diff --git a/ripc-core/src/main/java/io/ripc/core/Handler.java b/ripc-core/src/main/java/io/ripc/core/Handler.java new file mode 100644 index 0000000..45a1b92 --- /dev/null +++ b/ripc-core/src/main/java/io/ripc/core/Handler.java @@ -0,0 +1,12 @@ +package io.ripc.core; + +/** + * A {@code Handler} takes an arbitrary object as input and "handles" it. + */ +public interface Handler { + /** + * Handle the given object (do something useful with it). + * @param obj + */ + void handle(T obj); +} diff --git a/ripc-core/src/main/java/io/ripc/core/SingletonPublisher.java b/ripc-core/src/main/java/io/ripc/core/SingletonPublisher.java index b3ce506..474050d 100644 --- a/ripc-core/src/main/java/io/ripc/core/SingletonPublisher.java +++ b/ripc-core/src/main/java/io/ripc/core/SingletonPublisher.java @@ -7,7 +7,9 @@ import java.util.concurrent.atomic.AtomicBoolean; /** - * Created by jbrisbin on 3/26/15. + * A {@code SingletonPublisher} provides a single value only once and then calls {@code onComplete}. If the value is + * {@code null}, then the {@link org.reactivestreams.Subscriber#onNext(Object)} is not called but {@link + * org.reactivestreams.Subscriber#onComplete()} is. */ public class SingletonPublisher implements Publisher, DemandCalculator { diff --git a/ripc-core/src/main/java/io/ripc/core/Specification.java b/ripc-core/src/main/java/io/ripc/core/Specification.java index 4cd7786..994b01c 100644 --- a/ripc-core/src/main/java/io/ripc/core/Specification.java +++ b/ripc-core/src/main/java/io/ripc/core/Specification.java @@ -3,7 +3,7 @@ import org.reactivestreams.Subscriber; /** - * Created by jbrisbin on 3/10/15. + * Helper class to encapsulate various checks required by the Reactive Streams Specification. */ public abstract class Specification { diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnection.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnection.java index 3ead9b3..a94a635 100644 --- a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnection.java +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnection.java @@ -1,5 +1,6 @@ package io.ripc.protocol.tcp.connection; +import io.ripc.core.EventListener; import org.reactivestreams.Publisher; /** @@ -7,10 +8,30 @@ */ public interface TcpConnection { + /** + * Receive the {@link org.reactivestreams.Publisher} which will be producing inbound data. This is intended to be + * composed into a more sophisticated processing pipeline by a higher layer of composition helpers. + * + * @return {@code this} + */ Publisher reader(); + /** + * Set the write {@link org.reactivestreams.Publisher} that will be used to send outbound data to the peer. Can only + * be invoked once. + * + * @param writer the {@link org.reactivestreams.Publisher} which will produce outbound data + * @return {@code this} + */ TcpConnection writer(Publisher writer); - TcpConnection addListener(TcpConnectionEventListener listener); + /** + * Add a type of {@link io.ripc.core.EventListener} to the connection which will respond to the events for which that + * specific listener type is responsible. What's available varies by protocol and server implementation. + * + * @param listener + * @return {@code this} + */ + TcpConnection addListener(EventListener listener); } diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnectionEventListener.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnectionEventListener.java deleted file mode 100644 index 74a59a1..0000000 --- a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnectionEventListener.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.ripc.protocol.tcp.connection; - -/** - * Created by jbrisbin on 3/26/15. - */ -public interface TcpConnectionEventListener { -} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnectionHandler.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnectionHandler.java deleted file mode 100644 index df6cb77..0000000 --- a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnectionHandler.java +++ /dev/null @@ -1,12 +0,0 @@ -package io.ripc.protocol.tcp.connection; - -/** - * A {@code ConnectionHandler} is responsible for composing a Reactive Streams pipeline(s) when a new connection is - * received by the server. Implementations will compose an appropriate pipeline based on capabilities and server - * configuration. - */ -public interface TcpConnectionHandler { - - void handle(TcpConnection connection); - -} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/listener/ReadCompleteListener.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/listener/ReadCompleteListener.java new file mode 100644 index 0000000..ad75c0a --- /dev/null +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/listener/ReadCompleteListener.java @@ -0,0 +1,23 @@ +package io.ripc.protocol.tcp.connection.listener; + +import io.ripc.core.EventListener; +import io.ripc.protocol.tcp.connection.TcpConnection; + +/** + * An {@code EventListener} implementation that is used to receive notifications that an IO channel has completed the + * read of all available data. + */ +public interface ReadCompleteListener extends EventListener { + + /** + * Invoked when all data has been read from the given {@link io.ripc.protocol.tcp.connection.TcpConnection}. + * Returning + * {@code true} here means "I'm finished with the connection you may close it now" and returning {@code false} means + * "leave the connection open". + * + * @param connection the connection on which reads are complete + * @return {@code true} to close the connection, {@code false} to leave it open + */ + boolean readComplete(TcpConnection connection); + +} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/listener/WriteCompleteListener.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/listener/WriteCompleteListener.java index a32b4ef..e59b958 100644 --- a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/listener/WriteCompleteListener.java +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/listener/WriteCompleteListener.java @@ -1,13 +1,23 @@ package io.ripc.protocol.tcp.connection.listener; +import io.ripc.core.EventListener; import io.ripc.protocol.tcp.connection.TcpConnection; -import io.ripc.protocol.tcp.connection.TcpConnectionEventListener; /** - * Created by jbrisbin on 3/26/15. + * An {@code EventListener} implementation that is used to receive notifications that writes have been successfully + * completed on the underlying IO channel. */ -public interface WriteCompleteListener extends TcpConnectionEventListener { +public interface WriteCompleteListener extends EventListener { + /** + * Invoked every time a write to the IO channel is complete. Returning {@code false} here means "don't do any + * flushing" and returning {@code true} means "flush the underlying IO channel". + * + * @param connection the connection on which the write was made + * @param count the number of items that have been written since the last flush + * @param msg the last value written + * @return {@code true} to perform a flush of the channel, {@code false} otherwise + */ boolean writeComplete(TcpConnection connection, long count, Object msg); } diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInboundHandlerSubscription.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInboundHandlerSubscription.java index d34fa05..58ed8bd 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInboundHandlerSubscription.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInboundHandlerSubscription.java @@ -1,6 +1,5 @@ package io.ripc.transport.netty4.tcp; -import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -11,7 +10,8 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; /** - * Created by jbrisbin on 3/10/15. + * A {@code ChannelInboundHandlerAdapter} that is responsible for propagating data from the IO channel to the read + * {@link org.reactivestreams.Subscriber}. */ public class ChannelInboundHandlerSubscription extends ChannelInboundHandlerAdapter implements Subscription { @@ -60,11 +60,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception return; } - ByteBuf buf = (ByteBuf) msg; try { - subscriber.onNext(buf); + subscriber.onNext(msg); PEND_UPD.decrementAndGet(this); - //channel.read(); } catch (Throwable t) { subscriber.onError(t); } diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/TcpConnectionEventListenerChannelHandler.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/EventListenerChannelHandler.java similarity index 59% rename from ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/TcpConnectionEventListenerChannelHandler.java rename to ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/EventListenerChannelHandler.java index 07f9a7d..52dda94 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/TcpConnectionEventListenerChannelHandler.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/EventListenerChannelHandler.java @@ -6,28 +6,39 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.ripc.protocol.tcp.connection.TcpConnection; +import io.ripc.protocol.tcp.connection.listener.ReadCompleteListener; import io.ripc.protocol.tcp.connection.listener.WriteCompleteListener; import java.util.concurrent.atomic.AtomicLongFieldUpdater; /** - * Created by jbrisbin on 3/26/15. + * Listens for events on the Netty IO channel and invokes the appropriate {@link io.ripc.core.EventListener} for that + * type of event. */ -public class TcpConnectionEventListenerChannelHandler extends ChannelDuplexHandler { +public class EventListenerChannelHandler extends ChannelDuplexHandler { - private static final AtomicLongFieldUpdater WRITTEN_UPD - = AtomicLongFieldUpdater.newUpdater(TcpConnectionEventListenerChannelHandler.class, "writtenSinceLastFlush"); + private static final AtomicLongFieldUpdater WRITTEN_UPD + = AtomicLongFieldUpdater.newUpdater(EventListenerChannelHandler.class, "writtenSinceLastFlush"); private final TcpConnection connection; private volatile long writtenSinceLastFlush = 0L; + private ReadCompleteListener readCompleteListener; private WriteCompleteListener writeCompleteListener; - public TcpConnectionEventListenerChannelHandler(TcpConnection connection) { + public EventListenerChannelHandler(TcpConnection connection) { this.connection = connection; } + public ReadCompleteListener getReadCompleteListener() { + return readCompleteListener; + } + + public void setReadCompleteListener(ReadCompleteListener readCompleteListener) { + this.readCompleteListener = readCompleteListener; + } + public WriteCompleteListener getWriteCompleteListener() { return writeCompleteListener; } @@ -58,4 +69,12 @@ public void operationComplete(ChannelFuture future) throws Exception { WRITTEN_UPD.incrementAndGet(this); } + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + super.channelReadComplete(ctx); + if (null != readCompleteListener && readCompleteListener.readComplete(connection)) { + ctx.close(); + } + } + } diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java index f81eb6e..231c028 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java @@ -8,7 +8,8 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LoggingHandler; -import io.ripc.protocol.tcp.connection.TcpConnectionHandler; +import io.ripc.core.Handler; +import io.ripc.protocol.tcp.connection.TcpConnection; import io.ripc.transport.netty4.NamedDaemonThreadFactory; /** @@ -22,7 +23,7 @@ public NettyTcpServer(ServerBootstrap bootstrap) { this.bootstrap = bootstrap; } - public static NettyTcpServer listen(int port, TcpConnectionHandler handler) { + public static NettyTcpServer listen(int port, Handler handler) { ServerBootstrap b = new ServerBootstrap(); int threads = Runtime.getRuntime().availableProcessors(); diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java index 46bda94..4e023b0 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java @@ -2,11 +2,12 @@ import io.netty.channel.Channel; import io.ripc.core.DemandCalculator; +import io.ripc.core.EventListener; import io.ripc.protocol.tcp.connection.TcpConnection; -import io.ripc.protocol.tcp.connection.TcpConnectionEventListener; +import io.ripc.protocol.tcp.connection.listener.ReadCompleteListener; import io.ripc.protocol.tcp.connection.listener.WriteCompleteListener; import io.ripc.transport.netty4.tcp.ChannelInboundHandlerSubscription; -import io.ripc.transport.netty4.tcp.TcpConnectionEventListenerChannelHandler; +import io.ripc.transport.netty4.tcp.EventListenerChannelHandler; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -14,7 +15,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; /** - * Represents a Netty Channel. + * Represents a Reactive IPC {@code TcpConnection}. */ public class NettyTcpServerConnection implements TcpConnection { @@ -24,24 +25,34 @@ public class NettyTcpServerConnection implements TcpConnection { private final Channel channel; private final ReadPublisher readPublisher; + private WriteSubscriber writeSubscriber; + public NettyTcpServerConnection(Channel channel) { this.channel = channel; this.readPublisher = new ReadPublisher(channel); } @Override - public TcpConnection addListener(TcpConnectionEventListener listener) { - TcpConnectionEventListenerChannelHandler handler = - channel.pipeline().get(TcpConnectionEventListenerChannelHandler.class); + public TcpConnection addListener(EventListener listener) { + EventListenerChannelHandler handler = + channel.pipeline().get(EventListenerChannelHandler.class); if (null == handler) { - handler = new TcpConnectionEventListenerChannelHandler(this); + handler = new EventListenerChannelHandler(this); channel.pipeline().addLast(handler); } - if (WriteCompleteListener.class.isAssignableFrom(listener.getClass())) { + Class listenerType = listener.getClass(); + + // Assign WriteCompleteListener that will be notified of successful writes. + if (WriteCompleteListener.class.isAssignableFrom(listenerType)) { handler.setWriteCompleteListener((WriteCompleteListener) listener); } + // Assign a ReadCompleteListener that will be notified when all data has been read. + if (ReadCompleteListener.class.isAssignableFrom(listenerType)) { + handler.setReadCompleteListener((ReadCompleteListener) listener); + } + return this; } @@ -52,10 +63,15 @@ public Publisher reader() { @Override public TcpConnection writer(Publisher writer) { + if (null != writeSubscriber) { + throw new IllegalStateException("A writer has already been set on this connection"); + } + DemandCalculator demandCalculator = DemandCalculator.class.isAssignableFrom(writer.getClass()) ? (DemandCalculator) writer : null; - writer.subscribe(new WriteSubscriber(demandCalculator)); + this.writeSubscriber = new WriteSubscriber(demandCalculator); + writer.subscribe(writeSubscriber); return this; } @@ -99,7 +115,7 @@ public void run() { toRequest = 1L; } - if (toRequest == Long.MAX_VALUE) { + if (toRequest == Long.MAX_VALUE && pending != Long.MAX_VALUE) { PENDING_UPD.set(WriteSubscriber.this, Long.MAX_VALUE); subscription.request(Long.MAX_VALUE); } else if (toRequest > 0) { @@ -118,13 +134,17 @@ public void onSubscribe(Subscription subscription) { } this.subscription = subscription; + // Request for any writes right away. subscriptionRequest.run(); } @Override public void onNext(Object msg) { + // Write to the channel for every onNext signal. channel.write(msg); + // Decrement pending counter to show we've handled this write. PENDING_UPD.decrementAndGet(this); + // We'll get a StackOverflowError if we don't schedule this request. channel.eventLoop().execute(subscriptionRequest); } @@ -135,7 +155,9 @@ public void onError(Throwable t) { @Override public void onComplete() { + // Write completes always result in a flush. channel.flush(); + // This is a terminal state, so close the underlying channel. channel.close(); } } diff --git a/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java b/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java index 2dd7730..a45b067 100644 --- a/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java +++ b/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java @@ -2,9 +2,10 @@ import io.netty.buffer.Unpooled; import io.ripc.core.Publishers; -import io.ripc.protocol.tcp.connection.TcpConnection; +import io.ripc.protocol.tcp.connection.listener.ReadCompleteListener; import io.ripc.protocol.tcp.connection.listener.WriteCompleteListener; import org.junit.Test; +import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.slf4j.Logger; @@ -14,7 +15,7 @@ import java.util.concurrent.TimeUnit; /** - * Created by jbrisbin on 3/10/15. + * Integration tests that check the Netty implementation of Reactive IPC. */ public class NettyTcpServerIntegrationTests { @@ -24,47 +25,56 @@ public class NettyTcpServerIntegrationTests { public void canStartNettyTcpServer() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - WriteCompleteListener writeCompleteListener = new WriteCompleteListener() { - @Override - public boolean writeComplete(TcpConnection connection, long count, Object msg) { - return count > 0; - } + // This is going to start writing as soon as a request is made, so don't attach it right away. + Publisher writer = Publishers.just(Unpooled.wrappedBuffer("Hello World!".getBytes())); + + WriteCompleteListener writeCompleteListener = (connection, count, msg) -> { + // Flush after every write. + return count > 0; + }; + + ReadCompleteListener readCompleteListener = (connection) -> { + // only add output writer after everything's been read from the client + connection.writer(writer); + // don't close the connection when you've read everything + return false; }; NettyTcpServer server = NettyTcpServer.listen(3000, connection -> - connection.addListener(writeCompleteListener) - .reader() - .subscribe(new Subscriber() { - Subscription subscription; - - @Override - public void onSubscribe(Subscription s) { - if (null != subscription) { - s.cancel(); - return; - } - this.subscription = s; - s.request(1); - } - - @Override - public void onNext(Object o) { - LOG.info("got msg: {}", o); - connection.writer(Publishers.just(Unpooled.wrappedBuffer("Hello World!".getBytes()))); - subscription.request(1); - } - - @Override - public void onError(Throwable t) { - LOG.error(t.getMessage(), t); - } - - @Override - public void onComplete() { - LOG.info("complete"); - latch.countDown(); - } - })); + connection + .addListener(writeCompleteListener) + .addListener(readCompleteListener) + .reader() + .subscribe(new Subscriber() { + Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + if (null != subscription) { + s.cancel(); + return; + } + this.subscription = s; + s.request(1); + } + + @Override + public void onNext(Object o) { + LOG.info("got msg: {}", o); + subscription.request(1); + } + + @Override + public void onError(Throwable t) { + LOG.error(t.getMessage(), t); + } + + @Override + public void onComplete() { + LOG.info("complete"); + latch.countDown(); + } + })); while (!latch.await(1, TimeUnit.SECONDS)) { Thread.sleep(1000); From 5061ab52e471beb716b69fd968c0d7c7a2fa53eb Mon Sep 17 00:00:00 2001 From: Jon Brisbin Date: Fri, 27 Mar 2015 11:10:24 -0500 Subject: [PATCH 04/14] Explicitly use `Object` rather than `?` since the generics on the RS Subscriber interface are `? super T` which, when used with `Publisher`, become `Subscriber` --- .../java/io/ripc/protocol/tcp/connection/TcpConnection.java | 4 ++-- .../transport/netty4/tcp/server/NettyTcpServerConnection.java | 4 ++-- .../netty4/tcp/server/NettyTcpServerIntegrationTests.java | 3 ++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnection.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnection.java index a94a635..40d937a 100644 --- a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnection.java +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnection.java @@ -14,7 +14,7 @@ public interface TcpConnection { * * @return {@code this} */ - Publisher reader(); + Publisher reader(); /** * Set the write {@link org.reactivestreams.Publisher} that will be used to send outbound data to the peer. Can only @@ -23,7 +23,7 @@ public interface TcpConnection { * @param writer the {@link org.reactivestreams.Publisher} which will produce outbound data * @return {@code this} */ - TcpConnection writer(Publisher writer); + TcpConnection writer(Publisher writer); /** * Add a type of {@link io.ripc.core.EventListener} to the connection which will respond to the events for which that diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java index 4e023b0..e427895 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java @@ -57,12 +57,12 @@ public TcpConnection addListener(EventListener listener) { } @Override - public Publisher reader() { + public Publisher reader() { return readPublisher; } @Override - public TcpConnection writer(Publisher writer) { + public TcpConnection writer(Publisher writer) { if (null != writeSubscriber) { throw new IllegalStateException("A writer has already been set on this connection"); } diff --git a/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java b/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java index a45b067..bf0dca0 100644 --- a/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java +++ b/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java @@ -21,12 +21,13 @@ public class NettyTcpServerIntegrationTests { private static final Logger LOG = LoggerFactory.getLogger(NettyTcpServerIntegrationTests.class); + @SuppressWarnings("unchecked") @Test public void canStartNettyTcpServer() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); // This is going to start writing as soon as a request is made, so don't attach it right away. - Publisher writer = Publishers.just(Unpooled.wrappedBuffer("Hello World!".getBytes())); + Publisher writer = Publishers.just(Unpooled.wrappedBuffer("Hello World!".getBytes())); WriteCompleteListener writeCompleteListener = (connection, count, msg) -> { // Flush after every write. From e36c46170568fbdd83b819ad0ae4cf6566c569cc Mon Sep 17 00:00:00 2001 From: Jon Brisbin Date: Fri, 27 Mar 2015 11:11:01 -0500 Subject: [PATCH 05/14] Added Reactor composition layer for example usage. --- build.gradle | 13 ++++++ .../reactor/tcp/ReactorTcpServer.java | 43 +++++++++++++++++++ .../tcp/connection/ReactorTcpConnection.java | 31 +++++++++++++ .../ReactorTcpServerIntegrationTests.java | 42 ++++++++++++++++++ settings.gradle | 3 +- 5 files changed, 131 insertions(+), 1 deletion(-) create mode 100644 ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java create mode 100644 ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java create mode 100644 ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java diff --git a/build.gradle b/build.gradle index 5c96b99..a2e6afc 100644 --- a/build.gradle +++ b/build.gradle @@ -13,6 +13,7 @@ ext { // Composition Libraries rxjava1Version = '1.0.8' + reactorVersion = '2.0.0.RELEASE' // Testing mockitoVersion = '1.10.19' @@ -59,6 +60,7 @@ allprojects { } repositories { + maven { url 'http://repo.spring.io/libs-release' } jcenter() mavenCentral() } @@ -113,6 +115,17 @@ project('ripc-composition-rxjava1') { } } +project('ripc-composition-reactor') { + description = 'Reactive IPC Composition Layer Implementation' + dependencies { + // ripc-tcp + compile project(":ripc-transport-netty4") + + // Reactor + compile "io.projectreactor:reactor-stream:$reactorVersion" + } +} + configure(rootProject) { description = "Reactive IPC" diff --git a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java new file mode 100644 index 0000000..b35bc4f --- /dev/null +++ b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java @@ -0,0 +1,43 @@ +package io.ripc.composition.reactor.tcp; + +import io.ripc.composition.reactor.tcp.connection.ReactorTcpConnection; +import io.ripc.transport.netty4.tcp.server.NettyTcpServer; +import org.reactivestreams.Subscriber; +import reactor.rx.Stream; +import reactor.rx.broadcast.Broadcaster; + +/** + * Created by jbrisbin on 3/27/15. + */ +public class ReactorTcpServer extends Stream> { + + private final Broadcaster> connections; + + private NettyTcpServer server; + + public ReactorTcpServer(Broadcaster> connections) { + this.connections = connections; + } + + public static ReactorTcpServer listen(int port, Class type) { + Broadcaster> connections = Broadcaster.create(); + ReactorTcpServer server = new ReactorTcpServer<>(connections); + + server.server = NettyTcpServer.listen(port, connection -> { + ReactorTcpConnection conn = new ReactorTcpConnection<>(connection, type); + server.connections.onNext(conn); + }); + + return server; + } + + public void shutdown() { + server.shutdown(); + } + + @Override + public void subscribe(Subscriber> s) { + connections.subscribe(s); + } + +} diff --git a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java new file mode 100644 index 0000000..6c78ef7 --- /dev/null +++ b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java @@ -0,0 +1,31 @@ +package io.ripc.composition.reactor.tcp.connection; + +import io.ripc.protocol.tcp.connection.TcpConnection; +import reactor.rx.Stream; +import reactor.rx.Streams; +import reactor.rx.broadcast.Broadcaster; + +/** + * Created by jbrisbin on 3/27/15. + */ +public class ReactorTcpConnection { + + private final Stream in; + private final Broadcaster out; + + @SuppressWarnings("unchecked") + public ReactorTcpConnection(TcpConnection connection, Class type) { + this.in = Streams.wrap(connection.reader()).map(type::cast); + this.out = Broadcaster.create(); + connection.writer(out.map(Object.class::cast)); + } + + public Stream in() { + return in; + } + + public Broadcaster out() { + return out; + } + +} diff --git a/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java b/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java new file mode 100644 index 0000000..d7bea05 --- /dev/null +++ b/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java @@ -0,0 +1,42 @@ +package io.ripc.composition.reactor; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.ripc.composition.reactor.tcp.ReactorTcpServer; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; + +/** + * Created by jbrisbin on 3/27/15. + */ +public class ReactorTcpServerIntegrationTests { + + private static final Logger LOG = LoggerFactory.getLogger(ReactorTcpServerIntegrationTests.class); + + @Test + public void reactorTcpServerAcceptsData() throws InterruptedException { + ReactorTcpServer.listen(3000, ByteBuf.class) + .log("connection") + .consume(conn -> { + conn.in() + .log("in") + .map(buf -> buf.toString(Charset.defaultCharset())) + .consume(s -> { + LOG.info("received {}: {}", s.getClass(), s); + + conn.out().onComplete(); + }); + + conn.out() + .onNext(Unpooled.wrappedBuffer("Hello World!".getBytes())); + }); + + while (true) { + Thread.sleep(500); + } + } + +} diff --git a/settings.gradle b/settings.gradle index 4c827f1..a821f99 100644 --- a/settings.gradle +++ b/settings.gradle @@ -3,4 +3,5 @@ rootProject.name = 'reactive-ipc' include 'ripc-core', 'ripc-protocol-tcp', 'ripc-transport-netty4', - 'ripc-composition-rxjava1' + 'ripc-composition-rxjava1', + 'ripc-composition-reactor' From f9795cb64356b08b922ca1e606e522291035ec97 Mon Sep 17 00:00:00 2001 From: Jon Brisbin Date: Fri, 27 Mar 2015 11:40:07 -0500 Subject: [PATCH 06/14] Change to having the Publisher passed in rather than returning a Broadcaster. --- .../tcp/connection/ReactorTcpConnection.java | 13 ++++++------- .../reactor/ReactorTcpServerIntegrationTests.java | 8 ++++---- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java index 6c78ef7..6150890 100644 --- a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java +++ b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java @@ -3,29 +3,28 @@ import io.ripc.protocol.tcp.connection.TcpConnection; import reactor.rx.Stream; import reactor.rx.Streams; -import reactor.rx.broadcast.Broadcaster; /** * Created by jbrisbin on 3/27/15. */ public class ReactorTcpConnection { - private final Stream in; - private final Broadcaster out; + private final TcpConnection connection; + private final Stream in; @SuppressWarnings("unchecked") public ReactorTcpConnection(TcpConnection connection, Class type) { + this.connection = connection; this.in = Streams.wrap(connection.reader()).map(type::cast); - this.out = Broadcaster.create(); - connection.writer(out.map(Object.class::cast)); } public Stream in() { return in; } - public Broadcaster out() { - return out; + public ReactorTcpConnection out(Stream out) { + connection.writer(out.map(Object.class::cast)); + return this; } } diff --git a/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java b/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java index d7bea05..b77dc77 100644 --- a/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java +++ b/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java @@ -6,6 +6,7 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.rx.Streams; import java.nio.charset.Charset; @@ -26,12 +27,11 @@ public void reactorTcpServerAcceptsData() throws InterruptedException { .map(buf -> buf.toString(Charset.defaultCharset())) .consume(s -> { LOG.info("received {}: {}", s.getClass(), s); - - conn.out().onComplete(); }); - conn.out() - .onNext(Unpooled.wrappedBuffer("Hello World!".getBytes())); + conn.out(Streams.just("Hello World!") + .log("out") + .map(s -> Unpooled.wrappedBuffer(s.getBytes()))); }); while (true) { From 00f3a85fb875dfb98ac26ebce98c2dc9342af54a Mon Sep 17 00:00:00 2001 From: Jon Brisbin Date: Fri, 27 Mar 2015 13:53:53 -0500 Subject: [PATCH 07/14] Experiment with having write notifications sent to a Stream returned from out() in the Reactor compo layer. --- .../reactor/tcp/ReactorTcpServer.java | 8 +++----- .../tcp/connection/ReactorTcpConnection.java | 17 +++++++++++++++-- .../ReactorTcpServerIntegrationTests.java | 19 +++---------------- 3 files changed, 21 insertions(+), 23 deletions(-) diff --git a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java index b35bc4f..a9fab4f 100644 --- a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java +++ b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java @@ -11,17 +11,15 @@ */ public class ReactorTcpServer extends Stream> { - private final Broadcaster> connections; + private final Broadcaster> connections = Broadcaster.create(); private NettyTcpServer server; - public ReactorTcpServer(Broadcaster> connections) { - this.connections = connections; + public ReactorTcpServer() { } public static ReactorTcpServer listen(int port, Class type) { - Broadcaster> connections = Broadcaster.create(); - ReactorTcpServer server = new ReactorTcpServer<>(connections); + ReactorTcpServer server = new ReactorTcpServer<>(); server.server = NettyTcpServer.listen(port, connection -> { ReactorTcpConnection conn = new ReactorTcpConnection<>(connection, type); diff --git a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java index 6150890..aba18d2 100644 --- a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java +++ b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java @@ -1,30 +1,43 @@ package io.ripc.composition.reactor.tcp.connection; import io.ripc.protocol.tcp.connection.TcpConnection; +import io.ripc.protocol.tcp.connection.listener.WriteCompleteListener; import reactor.rx.Stream; import reactor.rx.Streams; +import reactor.rx.broadcast.Broadcaster; /** * Created by jbrisbin on 3/27/15. */ public class ReactorTcpConnection { + private final Broadcaster writeComplete = Broadcaster.create(); + private final TcpConnection connection; private final Stream in; @SuppressWarnings("unchecked") public ReactorTcpConnection(TcpConnection connection, Class type) { this.connection = connection; + this.in = Streams.wrap(connection.reader()).map(type::cast); + this.connection.addListener(new WriteCompleteListener() { + @SuppressWarnings("unchecked") + @Override + public boolean writeComplete(TcpConnection connection, long count, Object msg) { + writeComplete.onNext(type.cast(msg)); + return false; + } + }); } public Stream in() { return in; } - public ReactorTcpConnection out(Stream out) { + public Stream out(Stream out) { connection.writer(out.map(Object.class::cast)); - return this; + return writeComplete; } } diff --git a/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java b/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java index b77dc77..6355786 100644 --- a/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java +++ b/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java @@ -1,14 +1,10 @@ package io.ripc.composition.reactor; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.ripc.composition.reactor.tcp.ReactorTcpServer; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.rx.Streams; - -import java.nio.charset.Charset; /** * Created by jbrisbin on 3/27/15. @@ -21,18 +17,9 @@ public class ReactorTcpServerIntegrationTests { public void reactorTcpServerAcceptsData() throws InterruptedException { ReactorTcpServer.listen(3000, ByteBuf.class) .log("connection") - .consume(conn -> { - conn.in() - .log("in") - .map(buf -> buf.toString(Charset.defaultCharset())) - .consume(s -> { - LOG.info("received {}: {}", s.getClass(), s); - }); - - conn.out(Streams.just("Hello World!") - .log("out") - .map(s -> Unpooled.wrappedBuffer(s.getBytes()))); - }); + .consume(conn -> conn.out(conn.in().log("in")) + .log("confirmation") + .consume(buf -> LOG.info("write confirmed: {}", buf))); while (true) { Thread.sleep(500); From aff20d1a285a51d8932f6a45c1eecdc52c9531bd Mon Sep 17 00:00:00 2001 From: Jon Brisbin Date: Fri, 27 Mar 2015 14:54:22 -0500 Subject: [PATCH 08/14] Only decrement if pending != Long.MAX_VALUE --- .../netty4/tcp/ChannelInboundHandlerSubscription.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInboundHandlerSubscription.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInboundHandlerSubscription.java index 58ed8bd..11a2882 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInboundHandlerSubscription.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInboundHandlerSubscription.java @@ -62,7 +62,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception try { subscriber.onNext(msg); - PEND_UPD.decrementAndGet(this); + if (pending < Long.MAX_VALUE) { + PEND_UPD.decrementAndGet(this); + } } catch (Throwable t) { subscriber.onError(t); } From fd0ace9435c3633fc6655dc10b08e15cbe3b86a4 Mon Sep 17 00:00:00 2001 From: Jon Brisbin Date: Fri, 27 Mar 2015 14:58:18 -0500 Subject: [PATCH 09/14] Introduce the Interceptor and add an interface for TcpServer. Allows composition layer code to intercept the connection and provide their own type of connection to be handled by the handler. --- .../reactor/tcp/ReactorTcpServer.java | 19 +++--- .../ReactorTcpServerIntegrationTests.java | 24 ++++++-- .../main/java/io/ripc/core/Interceptor.java | 16 +++++ .../java/io/ripc/protocol/tcp/TcpServer.java | 38 ++++++++++++ .../netty4/tcp/server/NettyTcpServer.java | 60 ++++++++++++++----- .../NettyTcpServerIntegrationTests.java | 9 ++- 6 files changed, 132 insertions(+), 34 deletions(-) create mode 100644 ripc-core/src/main/java/io/ripc/core/Interceptor.java create mode 100644 ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpServer.java diff --git a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java index a9fab4f..d45c5e6 100644 --- a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java +++ b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java @@ -1,6 +1,7 @@ package io.ripc.composition.reactor.tcp; import io.ripc.composition.reactor.tcp.connection.ReactorTcpConnection; +import io.ripc.protocol.tcp.TcpServer; import io.ripc.transport.netty4.tcp.server.NettyTcpServer; import org.reactivestreams.Subscriber; import reactor.rx.Stream; @@ -13,22 +14,20 @@ public class ReactorTcpServer extends Stream> { private final Broadcaster> connections = Broadcaster.create(); - private NettyTcpServer server; + private TcpServer> server; - public ReactorTcpServer() { + public ReactorTcpServer(int port, Class type) { + this.server = NettyTcpServer.listen(port) + .intercept(conn -> new ReactorTcpConnection<>(conn, type)) + .handler(connections::onNext); + this.server.start(); } public static ReactorTcpServer listen(int port, Class type) { - ReactorTcpServer server = new ReactorTcpServer<>(); - - server.server = NettyTcpServer.listen(port, connection -> { - ReactorTcpConnection conn = new ReactorTcpConnection<>(connection, type); - server.connections.onNext(conn); - }); - - return server; + return new ReactorTcpServer<>(port, type); } + public void shutdown() { server.shutdown(); } diff --git a/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java b/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java index 6355786..2cbbf40 100644 --- a/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java +++ b/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java @@ -6,6 +6,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + /** * Created by jbrisbin on 3/27/15. */ @@ -15,15 +18,24 @@ public class ReactorTcpServerIntegrationTests { @Test public void reactorTcpServerAcceptsData() throws InterruptedException { - ReactorTcpServer.listen(3000, ByteBuf.class) - .log("connection") - .consume(conn -> conn.out(conn.in().log("in")) - .log("confirmation") - .consume(buf -> LOG.info("write confirmed: {}", buf))); + CountDownLatch latch = new CountDownLatch(1); + + ReactorTcpServer server = ReactorTcpServer.listen(3000, ByteBuf.class); + + server.log("connection") + .consume(conn -> conn.out(conn.in() + .log("in") + .observeComplete(v -> latch.countDown())) + .log("confirmation") + .consume(buf -> LOG.info("write confirmed: {}", buf))); - while (true) { + while (!latch.await(1, TimeUnit.SECONDS)) { Thread.sleep(500); } + + Thread.sleep(500); + + server.shutdown(); } } diff --git a/ripc-core/src/main/java/io/ripc/core/Interceptor.java b/ripc-core/src/main/java/io/ripc/core/Interceptor.java new file mode 100644 index 0000000..d78c815 --- /dev/null +++ b/ripc-core/src/main/java/io/ripc/core/Interceptor.java @@ -0,0 +1,16 @@ +package io.ripc.core; + +/** + * An {@code Interceptor} takes one object, intercepts it, and transforms it into something else. + */ +public interface Interceptor { + + /** + * Transform T to V. + * + * @param obj + * @return + */ + V intercept(T obj); + +} diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpServer.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpServer.java new file mode 100644 index 0000000..5e00b9f --- /dev/null +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpServer.java @@ -0,0 +1,38 @@ +package io.ripc.protocol.tcp; + +import io.ripc.core.Handler; +import io.ripc.core.Interceptor; + +/** + * A {@code TcpServer} is a comopnent that listens for incoming TCP connections. + */ +public interface TcpServer { + + /** + * Transform the type of connection we're using for this server into something else. + * + * @param interceptor + * @param + * @return a new {@code TcpServer} of a different type + */ + TcpServer intercept(Interceptor interceptor); + + /** + * Set the handler that will be used to handle new connections. + * + * @param handler + * @return {@code this} + */ + TcpServer handler(Handler handler); + + /** + * Start this server and bind to the socket. + */ + void start(); + + /** + * Gracefully shutdown any resources allocated by this server. + */ + void shutdown(); + +} diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java index 231c028..e0e6724 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java @@ -3,38 +3,59 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LoggingHandler; import io.ripc.core.Handler; +import io.ripc.core.Interceptor; +import io.ripc.protocol.tcp.TcpServer; import io.ripc.protocol.tcp.connection.TcpConnection; import io.ripc.transport.netty4.NamedDaemonThreadFactory; /** - * Created by jbrisbin on 3/10/15. + * An implementation of {@link io.ripc.protocol.tcp.TcpServer} that uses Netty. */ -public class NettyTcpServer { +public class NettyTcpServer implements TcpServer { + private final int port; + private final Interceptor interceptor; private final ServerBootstrap bootstrap; - public NettyTcpServer(ServerBootstrap bootstrap) { + private Handler handler; + + public NettyTcpServer(int port, Interceptor interceptor, ServerBootstrap bootstrap) { + this.port = port; + this.interceptor = interceptor; this.bootstrap = bootstrap; } - public static NettyTcpServer listen(int port, Handler handler) { - ServerBootstrap b = new ServerBootstrap(); - + public static NettyTcpServer listen(int port) { int threads = Runtime.getRuntime().availableProcessors(); - EventLoopGroup ioGroup = new NioEventLoopGroup(threads, new NamedDaemonThreadFactory("netty-io")); - EventLoopGroup workerGroup = new NioEventLoopGroup(threads, new NamedDaemonThreadFactory("netty-worker")); - b.group(ioGroup, workerGroup); - b.channel(NioServerSocketChannel.class); + ServerBootstrap b = new ServerBootstrap() + .group(new NioEventLoopGroup(threads, new NamedDaemonThreadFactory("netty-io")), + new NioEventLoopGroup(threads, new NamedDaemonThreadFactory("netty-worker"))) + .channel(NioServerSocketChannel.class); + + return new NettyTcpServer<>(port, null, b); + } + + @SuppressWarnings("unchecked") + @Override + public TcpServer intercept(Interceptor interceptor) { + return new NettyTcpServer<>(port, interceptor, bootstrap); + } + + @Override + public NettyTcpServer handler(Handler handler) { + if (null != this.handler) { + throw new IllegalArgumentException("A handler has already been set on this TcpServer"); + } + this.handler = handler; - NettyTcpServer server = new NettyTcpServer(b); - b.childHandler(new ChannelInitializer() { + this.bootstrap.childHandler(new ChannelInitializer() { + @SuppressWarnings("unchecked") @Override protected void initChannel(SocketChannel ch) throws Exception { ch.config().setAutoRead(false); @@ -43,13 +64,20 @@ protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); NettyTcpServerConnection conn = new NettyTcpServerConnection(ch); - handler.handle(conn); + if (null != interceptor) { + handler.handle((C) interceptor.intercept(conn)); + } else { + handler.handle((C) conn); + } } }); - b.bind(port); + return this; + } - return server; + @Override + public void start() { + bootstrap.bind(port); } public void shutdown() { diff --git a/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java b/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java index bf0dca0..1ebe94d 100644 --- a/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java +++ b/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java @@ -41,8 +41,9 @@ public void canStartNettyTcpServer() throws InterruptedException { return false; }; - NettyTcpServer server = NettyTcpServer.listen(3000, connection -> - connection + NettyTcpServer server = NettyTcpServer + .listen(3000) + .handler(connection -> connection .addListener(writeCompleteListener) .addListener(readCompleteListener) .reader() @@ -77,10 +78,14 @@ public void onComplete() { } })); + server.start(); + while (!latch.await(1, TimeUnit.SECONDS)) { Thread.sleep(1000); } + Thread.sleep(1000); + server.shutdown(); } From 93f8a4606d317d1d350ffb569665b2e2d2525130 Mon Sep 17 00:00:00 2001 From: Jon Brisbin Date: Fri, 27 Mar 2015 15:45:01 -0500 Subject: [PATCH 10/14] Split the Reactor connection into two types, 1 for read, 1 for write, rather than forcing them to be symmetric as they were previously. --- .../reactor/tcp/ReactorTcpServer.java | 18 +++++++++--------- .../tcp/connection/ReactorTcpConnection.java | 16 ++++++++-------- .../ReactorTcpServerIntegrationTests.java | 4 ++-- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java index d45c5e6..87a4f67 100644 --- a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java +++ b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java @@ -8,23 +8,23 @@ import reactor.rx.broadcast.Broadcaster; /** - * Created by jbrisbin on 3/27/15. + * An implementation of a TCP server that emits new connections as they are created. */ -public class ReactorTcpServer extends Stream> { +public class ReactorTcpServer extends Stream> { - private final Broadcaster> connections = Broadcaster.create(); + private final Broadcaster> connections = Broadcaster.create(); - private TcpServer> server; + private TcpServer> server; - public ReactorTcpServer(int port, Class type) { + public ReactorTcpServer(int port, Class readType, Class writeType) { this.server = NettyTcpServer.listen(port) - .intercept(conn -> new ReactorTcpConnection<>(conn, type)) + .intercept(conn -> new ReactorTcpConnection<>(conn, readType, writeType)) .handler(connections::onNext); this.server.start(); } - public static ReactorTcpServer listen(int port, Class type) { - return new ReactorTcpServer<>(port, type); + public static ReactorTcpServer listen(int port, Class readType, Class writeType) { + return new ReactorTcpServer<>(port, readType, writeType); } @@ -33,7 +33,7 @@ public void shutdown() { } @Override - public void subscribe(Subscriber> s) { + public void subscribe(Subscriber> s) { connections.subscribe(s); } diff --git a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java index aba18d2..4347807 100644 --- a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java +++ b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java @@ -9,33 +9,33 @@ /** * Created by jbrisbin on 3/27/15. */ -public class ReactorTcpConnection { +public class ReactorTcpConnection { - private final Broadcaster writeComplete = Broadcaster.create(); + private final Broadcaster writeComplete = Broadcaster.create(); private final TcpConnection connection; - private final Stream in; + private final Stream in; @SuppressWarnings("unchecked") - public ReactorTcpConnection(TcpConnection connection, Class type) { + public ReactorTcpConnection(TcpConnection connection, Class readType, Class writeType) { this.connection = connection; - this.in = Streams.wrap(connection.reader()).map(type::cast); + this.in = Streams.wrap(connection.reader()).map(readType::cast); this.connection.addListener(new WriteCompleteListener() { @SuppressWarnings("unchecked") @Override public boolean writeComplete(TcpConnection connection, long count, Object msg) { - writeComplete.onNext(type.cast(msg)); + writeComplete.onNext(writeType.cast(msg)); return false; } }); } - public Stream in() { + public Stream in() { return in; } - public Stream out(Stream out) { + public Stream out(Stream out) { connection.writer(out.map(Object.class::cast)); return writeComplete; } diff --git a/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java b/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java index 2cbbf40..4901335 100644 --- a/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java +++ b/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java @@ -10,7 +10,7 @@ import java.util.concurrent.TimeUnit; /** - * Created by jbrisbin on 3/27/15. + * Integration tests for Reactor implementations of RIPC components. */ public class ReactorTcpServerIntegrationTests { @@ -20,7 +20,7 @@ public class ReactorTcpServerIntegrationTests { public void reactorTcpServerAcceptsData() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - ReactorTcpServer server = ReactorTcpServer.listen(3000, ByteBuf.class); + ReactorTcpServer server = ReactorTcpServer.listen(3000, ByteBuf.class, ByteBuf.class); server.log("connection") .consume(conn -> conn.out(conn.in() From a70fd682b8b34cbce4d6a4883cf017b93c199826 Mon Sep 17 00:00:00 2001 From: Jon Brisbin Date: Mon, 30 Mar 2015 08:02:37 -0500 Subject: [PATCH 11/14] Transfer signals from the writer onComplete and onError to the write notification Stream --- .../reactor/tcp/connection/ReactorTcpConnection.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java index 4347807..f1b5694 100644 --- a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java +++ b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java @@ -36,7 +36,9 @@ public Stream in() { } public Stream out(Stream out) { - connection.writer(out.map(Object.class::cast)); + connection.writer(out.observeComplete(v -> writeComplete.onComplete()) + .when(Throwable.class, writeComplete::onError) + .map(Object.class::cast)); return writeComplete; } From c8aeafea4bfabea6b0e8ec50b04f7365674b37ab Mon Sep 17 00:00:00 2001 From: Jon Brisbin Date: Mon, 30 Mar 2015 08:03:28 -0500 Subject: [PATCH 12/14] Move server.start() into subscribe and make the start method idempotent. --- .../io/ripc/composition/reactor/tcp/ReactorTcpServer.java | 3 +-- .../reactor/ReactorTcpServerIntegrationTests.java | 7 ++----- .../ripc/transport/netty4/tcp/server/NettyTcpServer.java | 8 +++++++- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java index 87a4f67..d31a979 100644 --- a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java +++ b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java @@ -20,20 +20,19 @@ public ReactorTcpServer(int port, Class readType, Class writeType) { this.server = NettyTcpServer.listen(port) .intercept(conn -> new ReactorTcpConnection<>(conn, readType, writeType)) .handler(connections::onNext); - this.server.start(); } public static ReactorTcpServer listen(int port, Class readType, Class writeType) { return new ReactorTcpServer<>(port, readType, writeType); } - public void shutdown() { server.shutdown(); } @Override public void subscribe(Subscriber> s) { + server.start(); connections.subscribe(s); } diff --git a/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java b/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java index 4901335..7488081 100644 --- a/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java +++ b/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java @@ -24,17 +24,14 @@ public void reactorTcpServerAcceptsData() throws InterruptedException { server.log("connection") .consume(conn -> conn.out(conn.in() - .log("in") - .observeComplete(v -> latch.countDown())) + .log("in")) .log("confirmation") + .observeComplete(v -> latch.countDown()) .consume(buf -> LOG.info("write confirmed: {}", buf))); while (!latch.await(1, TimeUnit.SECONDS)) { - Thread.sleep(500); } - Thread.sleep(500); - server.shutdown(); } diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java index e0e6724..072cacd 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java @@ -2,6 +2,7 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; @@ -22,6 +23,8 @@ public class NettyTcpServer implements TcpServer { private final Interceptor interceptor; private final ServerBootstrap bootstrap; + private ChannelFuture startFuture; + private Handler handler; public NettyTcpServer(int port, Interceptor interceptor, ServerBootstrap bootstrap) { @@ -77,7 +80,10 @@ protected void initChannel(SocketChannel ch) throws Exception { @Override public void start() { - bootstrap.bind(port); + if (null != startFuture) { + return; + } + startFuture = bootstrap.bind(port); } public void shutdown() { From 31996680f0057c7bf3776b82e8f2bb6bc830be8d Mon Sep 17 00:00:00 2001 From: Jon Brisbin Date: Mon, 30 Mar 2015 17:01:12 -0500 Subject: [PATCH 13/14] Add RxReactiveStreams --- build.gradle | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index a2e6afc..237a25a 100644 --- a/build.gradle +++ b/build.gradle @@ -111,7 +111,8 @@ project('ripc-composition-rxjava1') { compile project(":ripc-transport-netty4") // RxJava 1.0 - compile "io.reactivex:rxjava:$rxjava1Version" + compile "io.reactivex:rxjava:$rxjava1Version", + 'io.reactivex:rxjava-reactive-streams:0.3.0' } } From 1aff428e5cf96ac728399a281a92c884d598555a Mon Sep 17 00:00:00 2001 From: Jon Brisbin Date: Mon, 30 Mar 2015 17:03:32 -0500 Subject: [PATCH 14/14] Added Reactor Codec support. Also added a ChannelActiveListener for configuring Netty channels. --- .../tcp/ReactorBufferChannelHandler.java | 37 ++++++++++++++++ .../reactor/tcp/ReactorTcpServer.java | 10 +++-- .../tcp/connection/ReactorTcpConnection.java | 44 +++++++++++++++---- .../ReactorTcpServerIntegrationTests.java | 4 +- .../listener/ChannelActiveListener.java | 11 +++++ .../tcp/EventListenerChannelHandler.java | 18 ++++++++ .../tcp/server/NettyTcpServerConnection.java | 8 +++- 7 files changed, 116 insertions(+), 16 deletions(-) create mode 100644 ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorBufferChannelHandler.java create mode 100644 ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/listener/ChannelActiveListener.java diff --git a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorBufferChannelHandler.java b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorBufferChannelHandler.java new file mode 100644 index 0000000..fe1e962 --- /dev/null +++ b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorBufferChannelHandler.java @@ -0,0 +1,37 @@ +package io.ripc.composition.reactor.tcp; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import reactor.io.buffer.Buffer; + +/** + * Created by jbrisbin on 3/30/15. + */ +@ChannelHandler.Sharable +public class ReactorBufferChannelHandler extends ChannelDuplexHandler { + + public static final ChannelDuplexHandler INSTANCE = new ReactorBufferChannelHandler(); + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (Buffer.class.isAssignableFrom(msg.getClass())) { + Buffer reactorBuf = (Buffer) msg; + ByteBuf nettyBuf = ctx.alloc().buffer(reactorBuf.capacity()); + nettyBuf.writeBytes(reactorBuf.asBytes()); + msg = nettyBuf; + } + super.write(ctx, msg, promise); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (ByteBuf.class.isAssignableFrom(msg.getClass())) { + msg = new Buffer(((ByteBuf) msg).nioBuffer()); + } + super.channelRead(ctx, msg); + } + +} diff --git a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java index d31a979..5500573 100644 --- a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java +++ b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/ReactorTcpServer.java @@ -4,6 +4,8 @@ import io.ripc.protocol.tcp.TcpServer; import io.ripc.transport.netty4.tcp.server.NettyTcpServer; import org.reactivestreams.Subscriber; +import reactor.io.buffer.Buffer; +import reactor.io.codec.Codec; import reactor.rx.Stream; import reactor.rx.broadcast.Broadcaster; @@ -16,14 +18,14 @@ public class ReactorTcpServer extends Stream> { private TcpServer> server; - public ReactorTcpServer(int port, Class readType, Class writeType) { + public ReactorTcpServer(int port, Codec codec) { this.server = NettyTcpServer.listen(port) - .intercept(conn -> new ReactorTcpConnection<>(conn, readType, writeType)) + .intercept(conn -> new ReactorTcpConnection<>(conn, codec)) .handler(connections::onNext); } - public static ReactorTcpServer listen(int port, Class readType, Class writeType) { - return new ReactorTcpServer<>(port, readType, writeType); + public static ReactorTcpServer listen(int port, Codec codec) { + return new ReactorTcpServer<>(port, codec); } public void shutdown() { diff --git a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java index f1b5694..95acd65 100644 --- a/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java +++ b/ripc-composition-reactor/src/main/java/io/ripc/composition/reactor/tcp/connection/ReactorTcpConnection.java @@ -1,7 +1,13 @@ package io.ripc.composition.reactor.tcp.connection; +import io.netty.channel.ChannelHandlerContext; +import io.ripc.composition.reactor.tcp.ReactorBufferChannelHandler; import io.ripc.protocol.tcp.connection.TcpConnection; import io.ripc.protocol.tcp.connection.listener.WriteCompleteListener; +import io.ripc.transport.netty4.listener.ChannelActiveListener; +import reactor.fn.Function; +import reactor.io.buffer.Buffer; +import reactor.io.codec.Codec; import reactor.rx.Stream; import reactor.rx.Streams; import reactor.rx.broadcast.Broadcaster; @@ -11,34 +17,54 @@ */ public class ReactorTcpConnection { - private final Broadcaster writeComplete = Broadcaster.create(); + private final Broadcaster writeComplete = Broadcaster.create(); - private final TcpConnection connection; - private final Stream in; + private final TcpConnection connection; + private final Codec codec; + private final Stream in; @SuppressWarnings("unchecked") - public ReactorTcpConnection(TcpConnection connection, Class readType, Class writeType) { + public ReactorTcpConnection(TcpConnection connection, Codec codec) { this.connection = connection; + this.codec = codec; - this.in = Streams.wrap(connection.reader()).map(readType::cast); this.connection.addListener(new WriteCompleteListener() { @SuppressWarnings("unchecked") @Override public boolean writeComplete(TcpConnection connection, long count, Object msg) { - writeComplete.onNext(writeType.cast(msg)); - return false; + writeComplete.onNext(msg); + return true; } }); + + this.connection.addListener(new ChannelActiveListener() { + @Override + public void channelActive(ChannelHandlerContext ctx) { + ctx.pipeline().addBefore("reactive-ipc-inbound", + "byteBufToBufferHandler", + ReactorBufferChannelHandler.INSTANCE); + } + }); + + this.in = Streams.wrap(connection.reader()) + .map(new Function() { + Function decoder = (null != codec ? codec.decoder() : null); + + @Override + public R apply(Object o) { + return (null != decoder ? decoder.apply((Buffer) o) : (R) o); + } + }); } public Stream in() { return in; } - public Stream out(Stream out) { + public Stream out(Stream out) { connection.writer(out.observeComplete(v -> writeComplete.onComplete()) .when(Throwable.class, writeComplete::onError) - .map(Object.class::cast)); + .map(codec.encoder())); return writeComplete; } diff --git a/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java b/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java index 7488081..82b11fc 100644 --- a/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java +++ b/ripc-composition-reactor/src/test/java/io/ripc/composition/reactor/ReactorTcpServerIntegrationTests.java @@ -1,10 +1,10 @@ package io.ripc.composition.reactor; -import io.netty.buffer.ByteBuf; import io.ripc.composition.reactor.tcp.ReactorTcpServer; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.io.codec.StandardCodecs; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -20,7 +20,7 @@ public class ReactorTcpServerIntegrationTests { public void reactorTcpServerAcceptsData() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - ReactorTcpServer server = ReactorTcpServer.listen(3000, ByteBuf.class, ByteBuf.class); + ReactorTcpServer server = ReactorTcpServer.listen(3000, StandardCodecs.STRING_CODEC); server.log("connection") .consume(conn -> conn.out(conn.in() diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/listener/ChannelActiveListener.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/listener/ChannelActiveListener.java new file mode 100644 index 0000000..88bc8e9 --- /dev/null +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/listener/ChannelActiveListener.java @@ -0,0 +1,11 @@ +package io.ripc.transport.netty4.listener; + +import io.netty.channel.ChannelHandlerContext; +import io.ripc.core.EventListener; + +/** + * Created by jbrisbin on 3/30/15. + */ +public interface ChannelActiveListener extends EventListener { + void channelActive(ChannelHandlerContext ctx); +} diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/EventListenerChannelHandler.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/EventListenerChannelHandler.java index 52dda94..d3e708b 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/EventListenerChannelHandler.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/EventListenerChannelHandler.java @@ -8,6 +8,7 @@ import io.ripc.protocol.tcp.connection.TcpConnection; import io.ripc.protocol.tcp.connection.listener.ReadCompleteListener; import io.ripc.protocol.tcp.connection.listener.WriteCompleteListener; +import io.ripc.transport.netty4.listener.ChannelActiveListener; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -26,6 +27,7 @@ public class EventListenerChannelHandler extends ChannelDuplexHandler { private ReadCompleteListener readCompleteListener; private WriteCompleteListener writeCompleteListener; + private ChannelActiveListener channelActiveListener; public EventListenerChannelHandler(TcpConnection connection) { this.connection = connection; @@ -47,6 +49,22 @@ public void setWriteCompleteListener(WriteCompleteListener writeCompleteListener this.writeCompleteListener = writeCompleteListener; } + public ChannelActiveListener getChannelActiveListener() { + return channelActiveListener; + } + + public void setChannelActiveListener(ChannelActiveListener channelActiveListener) { + this.channelActiveListener = channelActiveListener; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + if (null != channelActiveListener) { + channelActiveListener.channelActive(ctx); + } + super.channelActive(ctx); + } + @Override public void flush(ChannelHandlerContext ctx) throws Exception { super.flush(ctx); diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java index e427895..38eafb5 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java @@ -6,6 +6,7 @@ import io.ripc.protocol.tcp.connection.TcpConnection; import io.ripc.protocol.tcp.connection.listener.ReadCompleteListener; import io.ripc.protocol.tcp.connection.listener.WriteCompleteListener; +import io.ripc.transport.netty4.listener.ChannelActiveListener; import io.ripc.transport.netty4.tcp.ChannelInboundHandlerSubscription; import io.ripc.transport.netty4.tcp.EventListenerChannelHandler; import org.reactivestreams.Publisher; @@ -53,6 +54,11 @@ public TcpConnection addListener(EventListener listener) { handler.setReadCompleteListener((ReadCompleteListener) listener); } + // Assign a ChannelActiveListener that will be notified when a Channel becomes active + if (ChannelActiveListener.class.isAssignableFrom(listenerType)) { + handler.setChannelActiveListener((ChannelActiveListener) listener); + } + return this; } @@ -93,7 +99,7 @@ public ReadPublisher(Channel channel) { public void subscribe(Subscriber subscriber) { ChannelInboundHandlerSubscription sub = new ChannelInboundHandlerSubscription(channel, subscriber); subscriber.onSubscribe(sub); - channel.pipeline().addLast(sub); + channel.pipeline().addLast("reactive-ipc-inbound", sub); } }