From 1a6520cf86d772dbf0bdab2a73d8c1fb37c6d6bf Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 5 Jun 2015 16:03:36 -0400 Subject: [PATCH 1/5] Add ReactorTcpHandler --- .../reactor/protocol/tcp/ReactorTcpServerSample.java | 5 +++-- .../reactor/protocol/tcp/ReactorTcpConnection.java | 1 - .../ripc/reactor/protocol/tcp/ReactorTcpHandler.java | 8 ++++++++ .../ripc/reactor/protocol/tcp/ReactorTcpServer.java | 12 ++++++++++-- 4 files changed, 21 insertions(+), 5 deletions(-) create mode 100644 ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpHandler.java diff --git a/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServerSample.java b/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServerSample.java index 430d785..bfd6835 100644 --- a/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServerSample.java +++ b/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServerSample.java @@ -12,14 +12,15 @@ * Created by jbrisbin on 5/28/15. */ public class ReactorTcpServerSample { + public static void main(String... args) throws InterruptedException { TcpServer transport = Netty4TcpServer.create(0); ReactorTcpServer.create(transport) - .start(ch -> ch.flatMap(bb -> { + .start(connection -> connection.flatMap(bb -> { String msgStr = "Hello " + bb.toString(Charset.defaultCharset()) + "!"; ByteBuf msg = Unpooled.buffer().writeBytes(msgStr.getBytes()); - return ch.writeWith(Streams.just(msg)); + return connection.writeWith(Streams.just(msg)); })); } diff --git a/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpConnection.java b/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpConnection.java index 3d469e1..cc2092f 100644 --- a/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpConnection.java +++ b/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpConnection.java @@ -13,7 +13,6 @@ public class ReactorTcpConnection extends Stream { private final TcpConnection transport; public ReactorTcpConnection(TcpConnection transport) { - super(); this.transport = transport; } diff --git a/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpHandler.java b/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpHandler.java new file mode 100644 index 0000000..4611873 --- /dev/null +++ b/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpHandler.java @@ -0,0 +1,8 @@ +package io.ripc.reactor.protocol.tcp; + +import org.reactivestreams.Publisher; +import reactor.fn.Function; + +public interface ReactorTcpHandler extends Function, Publisher> { + +} \ No newline at end of file diff --git a/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServer.java b/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServer.java index e6580c0..94e0df7 100644 --- a/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServer.java +++ b/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServer.java @@ -1,5 +1,7 @@ package io.ripc.reactor.protocol.tcp; +import io.ripc.protocol.tcp.TcpConnection; +import io.ripc.protocol.tcp.TcpHandler; import io.ripc.protocol.tcp.TcpServer; import org.reactivestreams.Publisher; import reactor.Environment; @@ -20,8 +22,14 @@ public class ReactorTcpServer { this.transport = transport; } - public ReactorTcpServer start(Function, Publisher> handler) { - transport.startAndAwait(conn -> handler.apply(new ReactorTcpConnection<>(conn))); + public ReactorTcpServer start(ReactorTcpHandler handler) { + + transport.startAndAwait(new TcpHandler() { + @Override + public Publisher handle(TcpConnection connection) { + return handler.apply(new ReactorTcpConnection<>(connection)); + } + }); return this; } From 1b0d85eb873ab8e2dfa177f3e56f335c2acb6228 Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Tue, 16 Jun 2015 15:47:56 +0200 Subject: [PATCH 2/5] Fix JDK source/target levels and deps in build Here is the JDK level policy enforced by the Gradle build: * in core modules, sources should be JDK 1.7 compatible * in core modules, tests should be JDK1.8 compatible * in example modules, sources+tests should be JDK 1.8 compatible This commit fixes the gradle build configuration and a missing `final` keyword in a core implementation class. This change also switch the `reactor-net` dependency to a `reactor-stream` dep. --- build.gradle | 14 +++++++++++++- .../reactor/protocol/tcp/ReactorTcpServer.java | 2 +- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index a3b4c07..607af84 100644 --- a/build.gradle +++ b/build.gradle @@ -27,6 +27,10 @@ ext { ] as String[] } +def exampleProjects = [ project(':ripc-transport-netty4-examples'), + project(':ripc-rxjava1-examples'), + project(':ripc-reactor-examples') ] + allprojects { apply plugin: 'java' @@ -78,6 +82,14 @@ subprojects { subproject -> } } +configure(exampleProjects) { + compileJava { + sourceCompatibility = 1.8 + targetCompatibility = 1.8 + } +} + + project('ripc-core') { description = 'Reactive IPC Core Components' dependencies { @@ -131,7 +143,7 @@ project('ripc-reactor') { dependencies { // ripc-tcp compile project(":ripc-protocol-tcp"), - "io.projectreactor:reactor-net:$reactorVersion" + "io.projectreactor:reactor-stream:$reactorVersion" } } diff --git a/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServer.java b/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServer.java index 94e0df7..d239f4f 100644 --- a/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServer.java +++ b/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServer.java @@ -22,7 +22,7 @@ public class ReactorTcpServer { this.transport = transport; } - public ReactorTcpServer start(ReactorTcpHandler handler) { + public ReactorTcpServer start(final ReactorTcpHandler handler) { transport.startAndAwait(new TcpHandler() { @Override From 2dbddd65cd7f0b6f473ab8ccee86c54057886a33 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 29 Jun 2015 15:29:46 -0400 Subject: [PATCH 3/5] Add sample code with Netty codecs --- build.gradle | 7 +- .../handler/codec/json/JsonObjectDecoder.java | 216 ++++++++++++++++ .../reactor/protocol/tcp/CodecSample.java | 151 +++++++++++ .../protocol/tcp/ReactorTcpServerSample.java | 45 +++- .../handler/codec/json/JsonObjectDecoder.java | 214 ++++++++++++++++ .../transport/netty4/tcp/CodecSample.java | 238 ++++++++++++++++++ .../transport/netty4/tcp/Netty4TcpServer.java | 13 + 7 files changed, 876 insertions(+), 8 deletions(-) create mode 100644 ripc-reactor-examples/src/main/java/io/netty/handler/codec/json/JsonObjectDecoder.java create mode 100644 ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/CodecSample.java create mode 100644 ripc-transport-netty4-examples/src/main/java/io/netty/handler/codec/json/JsonObjectDecoder.java create mode 100644 ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/CodecSample.java diff --git a/build.gradle b/build.gradle index 607af84..47cd6a9 100644 --- a/build.gradle +++ b/build.gradle @@ -14,7 +14,7 @@ ext { // Composition Libraries rxjava1Version = '1.0.8' rxjavaRsVersion = '0.5.0' - reactorVersion = '2.0.3.RELEASE' + reactorVersion = '2.1.0.BUILD-SNAPSHOT' // Testing mockitoVersion = '1.10.19' @@ -113,6 +113,7 @@ project('ripc-transport-netty4-examples') { dependencies { // ripc-core compile project(":ripc-transport-netty4"), + "com.fasterxml.jackson.core:jackson-databind:2.5.4", "io.reactivex:rxjava:$rxjava1Version", "io.reactivex:rxjava-reactive-streams:$rxjavaRsVersion" runtime "ch.qos.logback:logback-classic:$logbackVersion" @@ -163,8 +164,8 @@ project('ripc-reactor-examples') { dependencies { // ripc-tcp compile project(":ripc-reactor"), - project(":ripc-transport-netty4") - + project(":ripc-transport-netty4"), + "com.fasterxml.jackson.core:jackson-databind:2.5.4" runtime "ch.qos.logback:logback-classic:$logbackVersion" } } diff --git a/ripc-reactor-examples/src/main/java/io/netty/handler/codec/json/JsonObjectDecoder.java b/ripc-reactor-examples/src/main/java/io/netty/handler/codec/json/JsonObjectDecoder.java new file mode 100644 index 0000000..765de16 --- /dev/null +++ b/ripc-reactor-examples/src/main/java/io/netty/handler/codec/json/JsonObjectDecoder.java @@ -0,0 +1,216 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.json; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.channel.ChannelHandler; +import io.netty.handler.codec.CorruptedFrameException; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.channel.ChannelPipeline; + +import java.util.List; + +/** + * NOTE: This class was copied from Netty 4.1 sources. + * + * Splits a byte stream of JSON objects and arrays into individual objects/arrays and passes them up the + * {@link ChannelPipeline}. + * + * This class does not do any real parsing or validation. A sequence of bytes is considered a JSON object/array + * if it contains a matching number of opening and closing braces/brackets. It's up to a subsequent + * {@link ChannelHandler} to parse the JSON text into a more usable form i.e. a POJO. + */ +public class JsonObjectDecoder extends ByteToMessageDecoder { + + private static final int ST_CORRUPTED = -1; + private static final int ST_INIT = 0; + private static final int ST_DECODING_NORMAL = 1; + private static final int ST_DECODING_ARRAY_STREAM = 2; + + private int openBraces; + private int idx; + + private int state; + private boolean insideString; + + private final int maxObjectLength; + private final boolean streamArrayElements; + + public JsonObjectDecoder() { + // 1 MB + this(1024 * 1024); + } + + public JsonObjectDecoder(int maxObjectLength) { + this(maxObjectLength, false); + } + + public JsonObjectDecoder(boolean streamArrayElements) { + this(1024 * 1024, streamArrayElements); + } + + /** + * @param maxObjectLength maximum number of bytes a JSON object/array may use (including braces and all). + * Objects exceeding this length are dropped and an {@link TooLongFrameException} + * is thrown. + * @param streamArrayElements if set to true and the "top level" JSON object is an array, each of its entries + * is passed through the pipeline individually and immediately after it was fully + * received, allowing for arrays with "infinitely" many elements. + * + */ + public JsonObjectDecoder(int maxObjectLength, boolean streamArrayElements) { + if (maxObjectLength < 1) { + throw new IllegalArgumentException("maxObjectLength must be a positive int"); + } + this.maxObjectLength = maxObjectLength; + this.streamArrayElements = streamArrayElements; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + if (state == ST_CORRUPTED) { + in.skipBytes(in.readableBytes()); + return; + } + + // index of next byte to process. + int idx = this.idx; + int wrtIdx = in.writerIndex(); + + if (wrtIdx > maxObjectLength) { + // buffer size exceeded maxObjectLength; discarding the complete buffer. + in.skipBytes(in.readableBytes()); + reset(); + throw new TooLongFrameException( + "object length exceeds " + maxObjectLength + ": " + wrtIdx + " bytes discarded"); + } + + for (/* use current idx */; idx < wrtIdx; idx++) { + byte c = in.getByte(idx); + if (state == ST_DECODING_NORMAL) { + decodeByte(c, in, idx); + + // All opening braces/brackets have been closed. That's enough to conclude + // that the JSON object/array is complete. + if (openBraces == 0) { + ByteBuf json = extractObject(ctx, in, in.readerIndex(), idx + 1 - in.readerIndex()); + if (json != null) { + out.add(json); + } + + // The JSON object/array was extracted => discard the bytes from + // the input buffer. + in.readerIndex(idx + 1); + // Reset the object state to get ready for the next JSON object/text + // coming along the byte stream. + reset(); + } + } else if (state == ST_DECODING_ARRAY_STREAM) { + decodeByte(c, in, idx); + + if (!insideString && (openBraces == 1 && c == ',' || openBraces == 0 && c == ']')) { + // skip leading spaces. No range check is needed and the loop will terminate + // because the byte at position idx is not a whitespace. + for (int i = in.readerIndex(); Character.isWhitespace(in.getByte(i)); i++) { + in.skipBytes(1); + } + + // skip trailing spaces. + int idxNoSpaces = idx - 1; + while (idxNoSpaces >= in.readerIndex() && Character.isWhitespace(in.getByte(idxNoSpaces))) { + idxNoSpaces--; + } + + ByteBuf json = extractObject(ctx, in, in.readerIndex(), idxNoSpaces + 1 - in.readerIndex()); + if (json != null) { + out.add(json); + } + + in.readerIndex(idx + 1); + + if (c == ']') { + reset(); + } + } + // JSON object/array detected. Accumulate bytes until all braces/brackets are closed. + } else if (c == '{' || c == '[') { + initDecoding(c); + + if (state == ST_DECODING_ARRAY_STREAM) { + // Discard the array bracket + in.skipBytes(1); + } + // Discard leading spaces in front of a JSON object/array. + } else if (Character.isWhitespace(c)) { + in.skipBytes(1); + } else { + state = ST_CORRUPTED; + throw new CorruptedFrameException( + "invalid JSON received at byte position " + idx + ": " + ByteBufUtil.hexDump(in)); + } + } + + if (in.readableBytes() == 0) { + this.idx = 0; + } else { + this.idx = idx; + } + } + + /** + * Override this method if you want to filter the json objects/arrays that get passed through the pipeline. + */ + @SuppressWarnings("UnusedParameters") + protected ByteBuf extractObject(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) { + return buffer.slice(index, length).retain(); + } + + private void decodeByte(byte c, ByteBuf in, int idx) { + if ((c == '{' || c == '[') && !insideString) { + openBraces++; + } else if ((c == '}' || c == ']') && !insideString) { + openBraces--; + } else if (c == '"') { + // start of a new JSON string. It's necessary to detect strings as they may + // also contain braces/brackets and that could lead to incorrect results. + if (!insideString) { + insideString = true; + // If the double quote wasn't escaped then this is the end of a string. + } else if (in.getByte(idx - 1) != '\\') { + insideString = false; + } + } + } + + private void initDecoding(byte openingBrace) { + openBraces = 1; + if (openingBrace == '[' && streamArrayElements) { + state = ST_DECODING_ARRAY_STREAM; + } else { + state = ST_DECODING_NORMAL; + } + } + + private void reset() { + insideString = false; + state = ST_INIT; + openBraces = 0; + } +} \ No newline at end of file diff --git a/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/CodecSample.java b/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/CodecSample.java new file mode 100644 index 0000000..1890b93 --- /dev/null +++ b/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/CodecSample.java @@ -0,0 +1,151 @@ +package io.ripc.reactor.protocol.tcp; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPromise; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.handler.codec.json.JsonObjectDecoder; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.util.CharsetUtil; +import io.ripc.protocol.tcp.TcpServer; +import io.ripc.transport.netty4.tcp.Netty4TcpServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.rx.Streams; + +import java.nio.charset.Charset; + +public class CodecSample { + + private static final Logger LOG = LoggerFactory.getLogger(CodecSample.class); + + + public static void main(String... args) throws InterruptedException { + //runLineBasedFrameDecoder(); + echoJsonStreamDecoding(); + } + + private static void runLineBasedFrameDecoder() { + + TcpServer transport = Netty4TcpServer.create( + 0, + new ChannelInitializer() { + @Override + protected void initChannel(Channel channel) throws Exception { + int bufferSize = 1; + ChannelConfig config = channel.config(); + config.setOption(ChannelOption.SO_RCVBUF, bufferSize); + config.setOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferSize)); + channel.pipeline().addFirst( + new LineBasedFrameDecoder(256), + new StringDecoder(CharsetUtil.UTF_8), + new StringEncoder(CharsetUtil.UTF_8)); + } + }); + + ReactorTcpServer.create(transport).start(connection -> { + connection.log("input") + .observeComplete(v -> LOG.info("Connection input complete")) + .capacity(1) + .consume(line -> { + String response = "Hello " + line + "\n"; + Streams.wrap(connection.writeWith(Streams.just(response))).consume(); + }); + return Streams.never(); + }); + } + + private static void echoJsonStreamDecoding() { + + TcpServer transport = Netty4TcpServer.create( + 0, + new ChannelInitializer() { + @Override + protected void initChannel(Channel channel) throws Exception { + channel.pipeline().addFirst( + new JsonObjectDecoder(), + new JacksonJsonCodec()); + } + }); + + ReactorTcpServer.create(transport) + .start(connection -> { + connection.log("input") + .observeComplete(v -> LOG.info("Connection input complete")) + .capacity(1) + .consume(person -> { + person = new Person(person.getLastName(), person.getFirstName()); + Streams.wrap(connection.writeWith(Streams.just(person))).consume(); + }); + return Streams.never(); + }); + + } + + private static class JacksonJsonCodec extends ChannelDuplexHandler { + + private final ObjectMapper mapper = new ObjectMapper(); + + @Override + public void channelRead(ChannelHandlerContext context, Object message) throws Exception { + if (message instanceof ByteBuf) { + Charset charset = Charset.defaultCharset(); + message = this.mapper.readValue(((ByteBuf) message).toString(charset), Person.class); + } + super.channelRead(context, message); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (msg instanceof Person) { + byte[] buff = mapper.writeValueAsBytes(msg); + ByteBuf bb = ctx.alloc().buffer(buff.length); + bb.writeBytes(buff); + msg = bb; + } + super.write(ctx, msg, promise); + } + } + + private static class Person { + + private String firstName; + + private String lastName; + + public Person() { + } + + public Person(String firstName, String lastName) { + this.firstName = firstName; + this.lastName = lastName; + } + + public String getFirstName() { + return firstName; + } + + public Person setFirstName(String firstName) { + this.firstName = firstName; + return this; + } + + public String getLastName() { + return lastName; + } + + public Person setLastName(String lastName) { + this.lastName = lastName; + return this; + } + } + +} diff --git a/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServerSample.java b/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServerSample.java index bfd6835..e0ab929 100644 --- a/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServerSample.java +++ b/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServerSample.java @@ -4,6 +4,8 @@ import io.netty.buffer.Unpooled; import io.ripc.protocol.tcp.TcpServer; import io.ripc.transport.netty4.tcp.Netty4TcpServer; +import reactor.rx.Promise; +import reactor.rx.Promises; import reactor.rx.Streams; import java.nio.charset.Charset; @@ -15,13 +17,46 @@ public class ReactorTcpServerSample { public static void main(String... args) throws InterruptedException { TcpServer transport = Netty4TcpServer.create(0); +// echo(transport); + echoWithQuitCommand(transport); + } + + /** + * Keep echoing until the client goes away. + */ + private static void echo(TcpServer transport) { + ReactorTcpServer.create(transport) + .start(connection -> { + connection.flatMap(inByteBuf -> { + String text = "Hello " + inByteBuf.toString(Charset.defaultCharset()); + ByteBuf outByteBuf = Unpooled.buffer().writeBytes(text.getBytes()); + return connection.writeWith(Streams.just(outByteBuf)); + }).consume(); + return Streams.never(); + }); + } + /** + * Keep echoing until the client sends "quite". + */ + private static void echoWithQuitCommand(TcpServer transport) { ReactorTcpServer.create(transport) - .start(connection -> connection.flatMap(bb -> { - String msgStr = "Hello " + bb.toString(Charset.defaultCharset()) + "!"; - ByteBuf msg = Unpooled.buffer().writeBytes(msgStr.getBytes()); - return connection.writeWith(Streams.just(msg)); - })); + .start(connection -> { + Promise promise = Promises.prepare(); + connection.flatMap(inByteBuf -> { + String input = inByteBuf.toString(Charset.defaultCharset()).trim(); + if ("quit".equalsIgnoreCase(input)) { + promise.onComplete(); + return promise; + } + else { + String text = "Hello " + inByteBuf.toString(Charset.defaultCharset()); + ByteBuf outByteBuf = Unpooled.buffer().writeBytes(text.getBytes()); + return connection.writeWith(Streams.just(outByteBuf)); + } + }).consume(); + return promise; + }); } } diff --git a/ripc-transport-netty4-examples/src/main/java/io/netty/handler/codec/json/JsonObjectDecoder.java b/ripc-transport-netty4-examples/src/main/java/io/netty/handler/codec/json/JsonObjectDecoder.java new file mode 100644 index 0000000..3bbce65 --- /dev/null +++ b/ripc-transport-netty4-examples/src/main/java/io/netty/handler/codec/json/JsonObjectDecoder.java @@ -0,0 +1,214 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.json; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.CorruptedFrameException; +import io.netty.handler.codec.TooLongFrameException; + +import java.util.List; + +/** + * NOTE: This class was copied from Netty 4.1 sources. + * + * Splits a byte stream of JSON objects and arrays into individual objects/arrays and passes them up the + * {@link io.netty.channel.ChannelPipeline}. + * + * This class does not do any real parsing or validation. A sequence of bytes is considered a JSON object/array + * if it contains a matching number of opening and closing braces/brackets. It's up to a subsequent + * {@link io.netty.channel.ChannelHandler} to parse the JSON text into a more usable form i.e. a POJO. + */ +public class JsonObjectDecoder extends ByteToMessageDecoder { + + private static final int ST_CORRUPTED = -1; + private static final int ST_INIT = 0; + private static final int ST_DECODING_NORMAL = 1; + private static final int ST_DECODING_ARRAY_STREAM = 2; + + private int openBraces; + private int idx; + + private int state; + private boolean insideString; + + private final int maxObjectLength; + private final boolean streamArrayElements; + + public JsonObjectDecoder() { + // 1 MB + this(1024 * 1024); + } + + public JsonObjectDecoder(int maxObjectLength) { + this(maxObjectLength, false); + } + + public JsonObjectDecoder(boolean streamArrayElements) { + this(1024 * 1024, streamArrayElements); + } + + /** + * @param maxObjectLength maximum number of bytes a JSON object/array may use (including braces and all). + * Objects exceeding this length are dropped and an {@link io.netty.handler.codec.TooLongFrameException} + * is thrown. + * @param streamArrayElements if set to true and the "top level" JSON object is an array, each of its entries + * is passed through the pipeline individually and immediately after it was fully + * received, allowing for arrays with "infinitely" many elements. + * + */ + public JsonObjectDecoder(int maxObjectLength, boolean streamArrayElements) { + if (maxObjectLength < 1) { + throw new IllegalArgumentException("maxObjectLength must be a positive int"); + } + this.maxObjectLength = maxObjectLength; + this.streamArrayElements = streamArrayElements; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + if (state == ST_CORRUPTED) { + in.skipBytes(in.readableBytes()); + return; + } + + // index of next byte to process. + int idx = this.idx; + int wrtIdx = in.writerIndex(); + + if (wrtIdx > maxObjectLength) { + // buffer size exceeded maxObjectLength; discarding the complete buffer. + in.skipBytes(in.readableBytes()); + reset(); + throw new TooLongFrameException( + "object length exceeds " + maxObjectLength + ": " + wrtIdx + " bytes discarded"); + } + + for (/* use current idx */; idx < wrtIdx; idx++) { + byte c = in.getByte(idx); + if (state == ST_DECODING_NORMAL) { + decodeByte(c, in, idx); + + // All opening braces/brackets have been closed. That's enough to conclude + // that the JSON object/array is complete. + if (openBraces == 0) { + ByteBuf json = extractObject(ctx, in, in.readerIndex(), idx + 1 - in.readerIndex()); + if (json != null) { + out.add(json); + } + + // The JSON object/array was extracted => discard the bytes from + // the input buffer. + in.readerIndex(idx + 1); + // Reset the object state to get ready for the next JSON object/text + // coming along the byte stream. + reset(); + } + } else if (state == ST_DECODING_ARRAY_STREAM) { + decodeByte(c, in, idx); + + if (!insideString && (openBraces == 1 && c == ',' || openBraces == 0 && c == ']')) { + // skip leading spaces. No range check is needed and the loop will terminate + // because the byte at position idx is not a whitespace. + for (int i = in.readerIndex(); Character.isWhitespace(in.getByte(i)); i++) { + in.skipBytes(1); + } + + // skip trailing spaces. + int idxNoSpaces = idx - 1; + while (idxNoSpaces >= in.readerIndex() && Character.isWhitespace(in.getByte(idxNoSpaces))) { + idxNoSpaces--; + } + + ByteBuf json = extractObject(ctx, in, in.readerIndex(), idxNoSpaces + 1 - in.readerIndex()); + if (json != null) { + out.add(json); + } + + in.readerIndex(idx + 1); + + if (c == ']') { + reset(); + } + } + // JSON object/array detected. Accumulate bytes until all braces/brackets are closed. + } else if (c == '{' || c == '[') { + initDecoding(c); + + if (state == ST_DECODING_ARRAY_STREAM) { + // Discard the array bracket + in.skipBytes(1); + } + // Discard leading spaces in front of a JSON object/array. + } else if (Character.isWhitespace(c)) { + in.skipBytes(1); + } else { + state = ST_CORRUPTED; + throw new CorruptedFrameException( + "invalid JSON received at byte position " + idx + ": " + ByteBufUtil.hexDump(in)); + } + } + + if (in.readableBytes() == 0) { + this.idx = 0; + } else { + this.idx = idx; + } + } + + /** + * Override this method if you want to filter the json objects/arrays that get passed through the pipeline. + */ + @SuppressWarnings("UnusedParameters") + protected ByteBuf extractObject(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) { + return buffer.slice(index, length).retain(); + } + + private void decodeByte(byte c, ByteBuf in, int idx) { + if ((c == '{' || c == '[') && !insideString) { + openBraces++; + } else if ((c == '}' || c == ']') && !insideString) { + openBraces--; + } else if (c == '"') { + // start of a new JSON string. It's necessary to detect strings as they may + // also contain braces/brackets and that could lead to incorrect results. + if (!insideString) { + insideString = true; + // If the double quote wasn't escaped then this is the end of a string. + } else if (in.getByte(idx - 1) != '\\') { + insideString = false; + } + } + } + + private void initDecoding(byte openingBrace) { + openBraces = 1; + if (openingBrace == '[' && streamArrayElements) { + state = ST_DECODING_ARRAY_STREAM; + } else { + state = ST_DECODING_NORMAL; + } + } + + private void reset() { + insideString = false; + state = ST_INIT; + openBraces = 0; + } +} \ No newline at end of file diff --git a/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/CodecSample.java b/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/CodecSample.java new file mode 100644 index 0000000..be24c62 --- /dev/null +++ b/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/CodecSample.java @@ -0,0 +1,238 @@ +package io.ripc.transport.netty4.tcp; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPromise; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.handler.codec.json.JsonObjectDecoder; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.util.CharsetUtil; +import io.ripc.internal.Publishers; +import io.ripc.protocol.tcp.TcpServer; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.concurrent.CompletableFuture; + +public class CodecSample { + + private static final Logger LOG = LoggerFactory.getLogger(CodecSample.class); + + + public static void main(String... args) { + echoWithLineBasedFrameDecoder(); +// echoJsonStreamDecoding(); + } + + private static void echoWithLineBasedFrameDecoder() { + + TcpServer server = Netty4TcpServer.create( + 0, + new ChannelInitializer() { + @Override + protected void initChannel(Channel channel) throws Exception { + int bufferSize = 1024; + ChannelConfig config = channel.config(); + config.setOption(ChannelOption.SO_RCVBUF, bufferSize); + config.setOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferSize)); + channel.pipeline().addFirst( + new LineBasedFrameDecoder(256), + new StringDecoder(CharsetUtil.UTF_8), + new StringEncoder(CharsetUtil.UTF_8)); + } + }); + + server.start(conn -> { + conn.subscribe(new Subscriber() { + + private Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + this.subscription = s; + LOG.info("requesting 1..."); + s.request(1); + } + + @Override + public void onNext(String s) { + LOG.info("onNext: {}", s); + conn.write(Publishers.just("Hello " + s + "\n")) + .subscribe(new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + } + + @Override + public void onNext(Void aVoid) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + LOG.info("requesting 1..."); + subscription.request(1); + } + }); + } + + @Override + public void onError(Throwable t) { + LOG.error(t.getMessage(), t); + } + + @Override + public void onComplete() { + LOG.info("onComplete"); + } + }); + + return Publishers.just(null); + }); + + server.awaitShutdown(); + } + + private static void echoJsonStreamDecoding() { + + TcpServer server = Netty4TcpServer.create( + 0, + new ChannelInitializer() { + @Override + protected void initChannel(Channel channel) throws Exception { + channel.pipeline() + .addFirst(new JsonObjectDecoder(), + new JacksonJsonCodec()); + } + }); + + server.start(conn -> { + conn.subscribe(new Subscriber() { + + private Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + this.subscription = s; + LOG.info("requesting 1..."); + s.request(1); + } + + @Override + public void onNext(Person p) { + LOG.info("onNext: {}", p); + conn.write(Publishers.just(new Person(p.getLastName(), p.getFirstName()))) + .subscribe(new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + } + + @Override + public void onNext(Void aVoid) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + LOG.info("requesting 1..."); + subscription.request(1); + } + }); + } + + @Override + public void onError(Throwable t) { + LOG.error(t.getMessage(), t); + } + + @Override + public void onComplete() { + LOG.info("onComplete"); + } + }); + + return Publishers.just(null); + }); + + server.awaitShutdown(); + } + + private static class JacksonJsonCodec extends ChannelDuplexHandler { + + private final ObjectMapper mapper = new ObjectMapper(); + + @Override + public void channelRead(ChannelHandlerContext context, Object message) throws Exception { + if (message instanceof ByteBuf) { + Charset charset = Charset.defaultCharset(); + message = this.mapper.readValue(((ByteBuf) message).toString(charset), Person.class); + } + super.channelRead(context, message); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (msg instanceof Person) { + byte[] buff = mapper.writeValueAsBytes(msg); + ByteBuf bb = ctx.alloc().buffer(buff.length); + bb.writeBytes(buff); + msg = bb; + } + super.write(ctx, msg, promise); + } + } + + private static class Person { + + private String firstName; + + private String lastName; + + public Person() { + } + + public Person(String firstName, String lastName) { + this.firstName = firstName; + this.lastName = lastName; + } + + public String getFirstName() { + return firstName; + } + + public Person setFirstName(String firstName) { + this.firstName = firstName; + return this; + } + + public String getLastName() { + return lastName; + } + + public Person setLastName(String lastName) { + this.lastName = lastName; + return this; + } + } + +} diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java index a8d8ab8..217cfaf 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java @@ -19,11 +19,17 @@ public class Netty4TcpServer extends TcpServer { private static final Logger logger = LoggerFactory.getLogger(Netty4TcpServer.class); private final int port; + private final ChannelInitializer initializer; private ServerBootstrap bootstrap; private ChannelFuture bindFuture; protected Netty4TcpServer(int port) { + this(port, null); + } + + protected Netty4TcpServer(int port, ChannelInitializer initializer) { this.port = port; + this.initializer = initializer; bootstrap = new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class); @@ -34,6 +40,9 @@ protected Netty4TcpServer doStart(final TcpHandler handler) { bootstrap.childHandler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { + if (initializer != null) { + ch.pipeline().addLast(initializer); + } ch.pipeline().addLast("server_handler", new ChannelToConnectionBridge<>(handler)); } }); @@ -80,4 +89,8 @@ public static TcpServer create(int port) { return new Netty4TcpServer<>(port); } + public static TcpServer create(int port, ChannelInitializer initializer) { + return new Netty4TcpServer(port, initializer); + } + } From 17441f087490e1b01c55e631ea13e49866ae5a26 Mon Sep 17 00:00:00 2001 From: Sebastien Deleuze Date: Wed, 1 Jul 2015 11:36:59 +0200 Subject: [PATCH 4/5] Add unit tests Fixes #34 --- build.gradle | 13 ++++ .../java/io/ripc/internal/Publishers.java | 6 +- .../java/io/ripc/protocol/tcp/TcpServer.java | 2 + .../protocol/tcp/ReactorTcpServerSample.java | 4 +- .../protocol/tcp/ReactorTcpServer.java | 15 +++++ .../ripc/reactor/ReactorTcpServerTests.java | 62 +++++++++++++++++++ .../io/ripc/rx/protocol/tcp/RxTcpServer.java | 4 ++ .../java/io/ripc/rx/RxTcpServerTests.java | 58 +++++++++++++++++ .../java/io/ripc/test/SocketTestUtils.java | 52 ++++++++++++++++ .../transport/netty4/tcp/Netty4TcpServer.java | 10 ++- .../ripc/transport/netty4/TcpServerTests.java | 62 +++++++++++++++++++ settings.gradle | 3 +- 12 files changed, 284 insertions(+), 7 deletions(-) create mode 100644 ripc-reactor/src/test/java/io/ripc/reactor/ReactorTcpServerTests.java create mode 100644 ripc-rxjava1/src/test/java/io/ripc/rx/RxTcpServerTests.java create mode 100644 ripc-test/src/main/java/io/ripc/test/SocketTestUtils.java create mode 100644 ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/TcpServerTests.java diff --git a/build.gradle b/build.gradle index 47cd6a9..d8cfa3b 100644 --- a/build.gradle +++ b/build.gradle @@ -126,6 +126,7 @@ project('ripc-transport-netty4') { // ripc-tcp compile project(":ripc-protocol-tcp"), "io.netty:netty-all:$nettyVersion" + testCompile project(":ripc-test") } } @@ -136,6 +137,8 @@ project('ripc-rxjava1') { compile project(":ripc-protocol-tcp"), "io.reactivex:rxjava:$rxjava1Version", "io.reactivex:rxjava-reactive-streams:$rxjavaRsVersion" + testCompile project(":ripc-transport-netty4"), + project(":ripc-test") } } @@ -145,6 +148,8 @@ project('ripc-reactor') { // ripc-tcp compile project(":ripc-protocol-tcp"), "io.projectreactor:reactor-stream:$reactorVersion" + testCompile project(":ripc-transport-netty4"), + project(":ripc-test") } } @@ -170,6 +175,14 @@ project('ripc-reactor-examples') { } } +project('ripc-test') { + description = 'Reactive IPC Test Components' + dependencies { + // Reactive Streams + compile "org.reactivestreams:reactive-streams:$reactiveStreamsVersion" + } +} + configure(rootProject) { description = "Reactive IPC" diff --git a/ripc-core/src/main/java/io/ripc/internal/Publishers.java b/ripc-core/src/main/java/io/ripc/internal/Publishers.java index 3cc5787..7f5718f 100644 --- a/ripc-core/src/main/java/io/ripc/internal/Publishers.java +++ b/ripc-core/src/main/java/io/ripc/internal/Publishers.java @@ -9,14 +9,16 @@ */ public class Publishers { - public static Publisher just(final T value) { + public static Publisher just(final T... values) { return new Publisher() { @Override public void subscribe(final Subscriber s) { s.onSubscribe(new Subscription() { @Override public void request(long n) { - s.onNext(value); + for (T value : values) { + s.onNext(value); + } s.onComplete(); } diff --git a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpServer.java b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpServer.java index d171532..5747d4e 100644 --- a/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpServer.java +++ b/ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpServer.java @@ -36,4 +36,6 @@ public final boolean shutdown() { protected abstract TcpServer doStart(TcpHandler handler); + public abstract int getPort(); + } diff --git a/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServerSample.java b/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServerSample.java index e0ab929..8b6c17e 100644 --- a/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServerSample.java +++ b/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServerSample.java @@ -26,7 +26,7 @@ public static void main(String... args) throws InterruptedException { */ private static void echo(TcpServer transport) { ReactorTcpServer.create(transport) - .start(connection -> { + .startAndAwait(connection -> { connection.flatMap(inByteBuf -> { String text = "Hello " + inByteBuf.toString(Charset.defaultCharset()); ByteBuf outByteBuf = Unpooled.buffer().writeBytes(text.getBytes()); @@ -59,4 +59,4 @@ private static void echoWithQuitCommand(TcpServer transport) { }); } -} +} \ No newline at end of file diff --git a/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServer.java b/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServer.java index d239f4f..5612e9b 100644 --- a/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServer.java +++ b/ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServer.java @@ -24,6 +24,17 @@ public class ReactorTcpServer { public ReactorTcpServer start(final ReactorTcpHandler handler) { + transport.start(new TcpHandler() { + @Override + public Publisher handle(TcpConnection connection) { + return handler.apply(new ReactorTcpConnection<>(connection)); + } + }); + return this; + } + + public ReactorTcpServer startAndAwait(final ReactorTcpHandler handler) { + transport.startAndAwait(new TcpHandler() { @Override public Publisher handle(TcpConnection connection) { @@ -39,6 +50,10 @@ public boolean shutdown() { return b; } + public int getPort() { + return transport.getPort(); + } + public static ReactorTcpServer create(TcpServer transport) { return new ReactorTcpServer<>(transport); } diff --git a/ripc-reactor/src/test/java/io/ripc/reactor/ReactorTcpServerTests.java b/ripc-reactor/src/test/java/io/ripc/reactor/ReactorTcpServerTests.java new file mode 100644 index 0000000..1e70063 --- /dev/null +++ b/ripc-reactor/src/test/java/io/ripc/reactor/ReactorTcpServerTests.java @@ -0,0 +1,62 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.ripc.reactor; + +import java.io.IOException; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.ripc.reactor.protocol.tcp.ReactorTcpServer; +import io.ripc.test.SocketTestUtils; +import io.ripc.transport.netty4.tcp.Netty4TcpServer; +import org.junit.After; +import static org.junit.Assert.assertEquals; +import org.junit.Before; +import org.junit.Test; +import reactor.rx.Promise; +import reactor.rx.Promises; +import reactor.rx.Streams; + +public class ReactorTcpServerTests { + + private ReactorTcpServer reactorServer; + + @Before + public void setup() { + reactorServer = ReactorTcpServer.create(Netty4TcpServer.create(0)); + } + + @After + public void tearDown() { + reactorServer.shutdown(); + } + + @Test + public void writeSingleValue() throws IOException { + reactorServer.start(connection -> connection.writeWith(Streams.just(Unpooled.buffer().writeBytes("test".getBytes())))); + assertEquals("test", SocketTestUtils.read("localhost", reactorServer.getPort())); + } + + @Test + public void writeMultipleValues() throws IOException { + Promise chunk1 = Promises.success(Unpooled.buffer().writeBytes("This is".getBytes())); + Promise chunk2 = Promises.success(Unpooled.buffer().writeBytes(" a test!".getBytes())); + reactorServer.start(connection -> connection.writeWith(Streams.concat(chunk1, chunk2))); + assertEquals("This is a test!", SocketTestUtils.read("localhost", reactorServer.getPort())); + } + +} diff --git a/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpServer.java b/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpServer.java index 11c7606..b526cef 100644 --- a/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpServer.java +++ b/ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpServer.java @@ -39,6 +39,10 @@ public void awaitShutdown() { transport.awaitShutdown(); } + public int getPort() { + return transport.getPort(); + } + public static RxTcpServer create(TcpServer transport) { return new RxTcpServer<>(transport); } diff --git a/ripc-rxjava1/src/test/java/io/ripc/rx/RxTcpServerTests.java b/ripc-rxjava1/src/test/java/io/ripc/rx/RxTcpServerTests.java new file mode 100644 index 0000000..a4d3128 --- /dev/null +++ b/ripc-rxjava1/src/test/java/io/ripc/rx/RxTcpServerTests.java @@ -0,0 +1,58 @@ +package io.ripc.rx;/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.ripc.rx.protocol.tcp.RxTcpServer; +import io.ripc.test.SocketTestUtils; +import io.ripc.transport.netty4.tcp.Netty4TcpServer; +import org.junit.After; +import static org.junit.Assert.assertEquals; +import org.junit.Before; +import org.junit.Test; +import rx.Observable; + +public class RxTcpServerTests { + + private RxTcpServer rxServer; + + @Before + public void setup() { + rxServer = RxTcpServer.create(Netty4TcpServer.create(0)); + } + + @After + public void tearDown() { + rxServer.shutdown(); + } + + @Test + public void writeSingleValue() throws IOException { + rxServer.start(connection -> connection.write(Observable.just(Unpooled.buffer().writeBytes("test".getBytes())))); + assertEquals("test", SocketTestUtils.read("localhost", rxServer.getPort())); + } + + @Test + public void writeMultipleValues() throws IOException { + Observable chunk1 = Observable.just(Unpooled.buffer().writeBytes("This is".getBytes())); + Observable chunk2 = Observable.just(Unpooled.buffer().writeBytes(" a test!".getBytes())); + rxServer.start(connection -> connection.write(Observable.merge(chunk1, chunk2))); + assertEquals("This is a test!", SocketTestUtils.read("localhost", rxServer.getPort())); + } + +} diff --git a/ripc-test/src/main/java/io/ripc/test/SocketTestUtils.java b/ripc-test/src/main/java/io/ripc/test/SocketTestUtils.java new file mode 100644 index 0000000..6914f7c --- /dev/null +++ b/ripc-test/src/main/java/io/ripc/test/SocketTestUtils.java @@ -0,0 +1,52 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.ripc.test; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.Socket; + +public class SocketTestUtils { + + public static String read(String host, int port) { + return read(host, port, null); + } + + public static String read(String host, int port, String dataToSend) { + try { + Socket socket = new Socket(host, port); + InputStreamReader reader = new InputStreamReader(socket.getInputStream()); + if (dataToSend != null) { + DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream()); + outputStream.writeBytes(dataToSend); + } + StringBuilder content = new StringBuilder(); + int c = reader.read(); + while (c != -1) { + content.append((char)c); + c = reader.read(); + } + reader.close(); + return content.toString(); + } + catch (IOException e) { + throw new IllegalStateException(e); + } + } + +} diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java index 217cfaf..389bc29 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java @@ -18,7 +18,7 @@ public class Netty4TcpServer extends TcpServer { private static final Logger logger = LoggerFactory.getLogger(Netty4TcpServer.class); - private final int port; + private int port; private final ChannelInitializer initializer; private ServerBootstrap bootstrap; private ChannelFuture bindFuture; @@ -54,7 +54,8 @@ protected void initChannel(Channel ch) throws Exception { } SocketAddress localAddress = bindFuture.channel().localAddress(); if (localAddress instanceof InetSocketAddress) { - logger.info("Started server at port: " + ((InetSocketAddress) localAddress).getPort()); + port = ((InetSocketAddress) localAddress).getPort(); + logger.info("Started server at port: " + port); } } catch (InterruptedException e) { @@ -85,6 +86,11 @@ public boolean doShutdown() { } } + @Override + public int getPort() { + return port; + } + public static TcpServer create(int port) { return new Netty4TcpServer<>(port); } diff --git a/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/TcpServerTests.java b/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/TcpServerTests.java new file mode 100644 index 0000000..ce1e645 --- /dev/null +++ b/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/TcpServerTests.java @@ -0,0 +1,62 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.ripc.transport.netty4; + +import java.io.IOException; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.ripc.internal.Publishers; +import io.ripc.protocol.tcp.TcpServer; +import io.ripc.test.SocketTestUtils; +import io.ripc.transport.netty4.tcp.Netty4TcpServer; +import org.junit.After; +import static org.junit.Assert.assertEquals; +import org.junit.Before; +import org.junit.Test; + +public class TcpServerTests { + + private TcpServer server; + + @Before + public void setup() { + server = Netty4TcpServer.create(0); + } + + @After + public void tearDown() { + server.shutdown(); + } + + @Test + public void writeSingleValue() throws IOException { + server.start(connection -> connection.write(Publishers.just(Unpooled.buffer().writeBytes("test".getBytes())))); + assertEquals("test", SocketTestUtils.read("localhost", server.getPort())); + } + + @Test + public void writeMultipleValues() throws IOException { + server.start(connection -> { + ByteBuf chunk1 = Unpooled.buffer().writeBytes("This is".getBytes()); + ByteBuf chunk2 = Unpooled.buffer().writeBytes(" a test!".getBytes()); + return connection.write(Publishers.just(chunk1, chunk2)); + }); + assertEquals("This is a test!", SocketTestUtils.read("localhost", server.getPort())); + } + +} diff --git a/settings.gradle b/settings.gradle index 5aa324b..d726466 100644 --- a/settings.gradle +++ b/settings.gradle @@ -7,4 +7,5 @@ include 'ripc-core', 'ripc-reactor', 'ripc-rxjava1', 'ripc-reactor-examples', - 'ripc-rxjava1-examples' + 'ripc-rxjava1-examples', + 'ripc-test' From 32f1dc4707149fbce90362c84c8080faf139c464 Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Fri, 3 Jul 2015 14:49:32 +0100 Subject: [PATCH 5/5] Port PublisherFactory (Lite version) over from Reactor to have a TCK ready Publishers.just Move Publishers to common test dependency ripc-test --- build.gradle | 8 + .../protocol/tcp/ReactorTcpServerSample.java | 45 +-- .../main/java/io/ripc/test}/Publishers.java | 35 +- .../ripc/test/internal/PublisherFactory.java | 347 ++++++++++++++++++ .../test/internal/SubscriberWithContext.java | 109 ++++++ .../transport/netty4/tcp/CodecSample.java | 4 +- .../transport/netty4/tcp/TcpServerSample.java | 2 +- .../netty4/tcp/ChannelToConnectionBridge.java | 98 +++-- .../transport/netty4/tcp/Netty4TcpServer.java | 1 + .../netty4/tcp/TcpConnectionImpl.java | 13 + .../ripc/transport/netty4/TcpServerTests.java | 2 +- 11 files changed, 579 insertions(+), 85 deletions(-) rename {ripc-core/src/main/java/io/ripc/internal => ripc-test/src/main/java/io/ripc/test}/Publishers.java (57%) create mode 100644 ripc-test/src/main/java/io/ripc/test/internal/PublisherFactory.java create mode 100644 ripc-test/src/main/java/io/ripc/test/internal/SubscriberWithContext.java diff --git a/build.gradle b/build.gradle index d8cfa3b..9b4b080 100644 --- a/build.gradle +++ b/build.gradle @@ -116,6 +116,10 @@ project('ripc-transport-netty4-examples') { "com.fasterxml.jackson.core:jackson-databind:2.5.4", "io.reactivex:rxjava:$rxjava1Version", "io.reactivex:rxjava-reactive-streams:$rxjavaRsVersion" + + //common facilities for samples + compile project(":ripc-test") + runtime "ch.qos.logback:logback-classic:$logbackVersion" } } @@ -171,6 +175,10 @@ project('ripc-reactor-examples') { compile project(":ripc-reactor"), project(":ripc-transport-netty4"), "com.fasterxml.jackson.core:jackson-databind:2.5.4" + + //common facilities for samples + compile project(":ripc-test") + runtime "ch.qos.logback:logback-classic:$logbackVersion" } } diff --git a/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServerSample.java b/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServerSample.java index 8b6c17e..acd5c01 100644 --- a/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServerSample.java +++ b/ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServerSample.java @@ -26,37 +26,32 @@ public static void main(String... args) throws InterruptedException { */ private static void echo(TcpServer transport) { ReactorTcpServer.create(transport) - .startAndAwait(connection -> { - connection.flatMap(inByteBuf -> { - String text = "Hello " + inByteBuf.toString(Charset.defaultCharset()); - ByteBuf outByteBuf = Unpooled.buffer().writeBytes(text.getBytes()); - return connection.writeWith(Streams.just(outByteBuf)); - }).consume(); - return Streams.never(); - }); + .startAndAwait(connection -> { + connection.flatMap(inByteBuf -> { + String text = "Hello " + inByteBuf.toString(Charset.defaultCharset()); + ByteBuf outByteBuf = Unpooled.buffer().writeBytes(text.getBytes()); + return connection.writeWith(Streams.just(outByteBuf)); + }).consume(); + return Streams.never(); + }); } /** - * Keep echoing until the client sends "quite". + * Keep echoing until the client sends "quit". */ private static void echoWithQuitCommand(TcpServer transport) { ReactorTcpServer.create(transport) - .start(connection -> { - Promise promise = Promises.prepare(); - connection.flatMap(inByteBuf -> { - String input = inByteBuf.toString(Charset.defaultCharset()).trim(); - if ("quit".equalsIgnoreCase(input)) { - promise.onComplete(); - return promise; - } - else { - String text = "Hello " + inByteBuf.toString(Charset.defaultCharset()); - ByteBuf outByteBuf = Unpooled.buffer().writeBytes(text.getBytes()); - return connection.writeWith(Streams.just(outByteBuf)); - } - }).consume(); - return promise; - }); + .start(connection -> connection + .map(byteBuf -> byteBuf.toString(Charset.defaultCharset())) + .takeWhile(input -> !"quit".equalsIgnoreCase(input.trim())) + .filter(input -> !"quit".equalsIgnoreCase(input.trim())) + .map(input -> "Hello " + input) + .flatMap(text -> connection.writeWith( + Streams.just(Unpooled.buffer().writeBytes(text.getBytes())) + ) + ) + .after() + ); } } \ No newline at end of file diff --git a/ripc-core/src/main/java/io/ripc/internal/Publishers.java b/ripc-test/src/main/java/io/ripc/test/Publishers.java similarity index 57% rename from ripc-core/src/main/java/io/ripc/internal/Publishers.java rename to ripc-test/src/main/java/io/ripc/test/Publishers.java index 7f5718f..b8660c4 100644 --- a/ripc-core/src/main/java/io/ripc/internal/Publishers.java +++ b/ripc-test/src/main/java/io/ripc/test/Publishers.java @@ -1,33 +1,30 @@ -package io.ripc.internal; +package io.ripc.test; +import io.ripc.test.internal.PublisherFactory; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import java.util.Arrays; +import java.util.List; + /** * Temporary utility class for creating and transforming {@link Publisher}s. */ public class Publishers { public static Publisher just(final T... values) { - return new Publisher() { - @Override - public void subscribe(final Subscriber s) { - s.onSubscribe(new Subscription() { - @Override - public void request(long n) { - for (T value : values) { - s.onNext(value); - } - s.onComplete(); - } - - @Override - public void cancel() { - } - }); - } - }; + final List list = Arrays.asList(values); + return PublisherFactory.forEach( + sub -> { + if (sub.context().hasNext()) { + sub.onNext(sub.context().next()); + } else { + sub.onComplete(); + } + }, + sub -> list.iterator() + ); } public static Publisher error(final Throwable t) { diff --git a/ripc-test/src/main/java/io/ripc/test/internal/PublisherFactory.java b/ripc-test/src/main/java/io/ripc/test/internal/PublisherFactory.java new file mode 100644 index 0000000..28ead53 --- /dev/null +++ b/ripc-test/src/main/java/io/ripc/test/internal/PublisherFactory.java @@ -0,0 +1,347 @@ +/* + * Copyright (c) 2011-2015 Pivotal Software Inc, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.ripc.test.internal; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + + +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Original {@see reactor.core.reactivestreams.PublisherFactory} from {@see http://projectreactor.io}. + * Use JDK 8 constructs. + *

+ * A Reactive Streams {@link org.reactivestreams.Publisher} factory which callbacks on start, request and shutdown + *

+ * The Publisher will directly forward all the signals passed to the subscribers and complete when onComplete is called. + *

+ * Create such publisher with the provided factory, E.g.: + *

+ * {@code
+ * PublisherFactory.create((n, sub) -> {
+ *  for(int i = 0; i < n; i++){
+ *    sub.onNext(i);
+ *  }
+ * }
+ * }
+ * 
+ * + * @author Stephane Maldini + */ +public final class PublisherFactory { + + /** + * Create a {@link Publisher} reacting on requests with the passed {@link BiConsumer} + * + * @param requestConsumer A {@link BiConsumer} with left argument request and right argument target subscriber + * @param The type of the data sequence + * @return a fresh Reactive Streams publisher ready to be subscribed + */ + public static Publisher create(BiConsumer> requestConsumer) { + return create(requestConsumer, null, null); + } + + /** + * Create a {@link Publisher} reacting on requests with the passed {@link BiConsumer} + * The argument {@code contextFactory} is executed once by new subscriber to generate a context shared by every + * request calls. + * + * @param requestConsumer A {@link BiConsumer} with left argument request and right argument target subscriber + * @param contextFactory A {@link Function} called for every new subscriber returning an immutable context (IO + * connection...) + * @param The type of the data sequence + * @param The type of contextual information to be read by the requestConsumer + * @return a fresh Reactive Streams publisher ready to be subscribed + */ + public static Publisher create(BiConsumer> requestConsumer, + Function, C> contextFactory) { + return create(requestConsumer, contextFactory, null); + } + + + /** + * Create a {@link Publisher} reacting on requests with the passed {@link BiConsumer}. + * The argument {@code contextFactory} is executed once by new subscriber to generate a context shared by every + * request calls. + * The argument {@code shutdownConsumer} is executed once by subscriber termination event (cancel, onComplete, + * onError). + * + * @param requestConsumer A {@link BiConsumer} with left argument request and right argument target subscriber + * @param contextFactory A {@link Function} called once for every new subscriber returning an immutable context + * (IO connection...) + * @param shutdownConsumer A {@link Consumer} called once everytime a subscriber terminates: cancel, onComplete(), + * onError() + * @param The type of the data sequence + * @param The type of contextual information to be read by the requestConsumer + * @return a fresh Reactive Streams publisher ready to be subscribed + */ + public static Publisher create(BiConsumer> requestConsumer, + Function, C> contextFactory, + Consumer shutdownConsumer) { + + return new ReactorPublisher(requestConsumer, contextFactory, shutdownConsumer); + } + + + /** + * Create a {@link Publisher} reacting on each available {@link Subscriber} read derived with the passed {@link + * Consumer}. If a previous request is still running, avoid recursion and extend the previous request iterations. + * + * @param requestConsumer A {@link Consumer} invoked when available read with the target subscriber + * @param The type of the data sequence + * @return a fresh Reactive Streams publisher ready to be subscribed + */ + public static Publisher forEach(Consumer> requestConsumer) { + return forEach(requestConsumer, null, null); + } + + /** + * Create a {@link Publisher} reacting on each available {@link Subscriber} read derived with the passed {@link + * Consumer}. If a previous request is still running, avoid recursion and extend the previous request iterations. + * The argument {@code contextFactory} is executed once by new subscriber to generate a context shared by every + * request calls. + * + * @param requestConsumer A {@link Consumer} invoked when available read with the target subscriber + * @param contextFactory A {@link Function} called for every new subscriber returning an immutable context (IO + * connection...) + * @param The type of the data sequence + * @param The type of contextual information to be read by the requestConsumer + * @return a fresh Reactive Streams publisher ready to be subscribed + */ + public static Publisher forEach(Consumer> requestConsumer, + Function, C> contextFactory) { + return forEach(requestConsumer, contextFactory, null); + } + + + /** + * Create a {@link Publisher} reacting on each available {@link Subscriber} read derived with the passed {@link + * Consumer}. If a previous request is still running, avoid recursion and extend the previous request iterations. + * The argument {@code contextFactory} is executed once by new subscriber to generate a context shared by every + * request calls. + * The argument {@code shutdownConsumer} is executed once by subscriber termination event (cancel, onComplete, + * onError). + * + * @param requestConsumer A {@link Consumer} invoked when available read with the target subscriber + * @param contextFactory A {@link Function} called once for every new subscriber returning an immutable context + * (IO connection...) + * @param shutdownConsumer A {@link Consumer} called once everytime a subscriber terminates: cancel, onComplete(), + * onError() + * @param The type of the data sequence + * @param The type of contextual information to be read by the requestConsumer + * @return a fresh Reactive Streams publisher ready to be subscribed + */ + public static Publisher forEach(final Consumer> requestConsumer, + Function, C> contextFactory, + Consumer shutdownConsumer) { + return new ForEachPublisher(requestConsumer, contextFactory, shutdownConsumer); + } + + private static class ReactorPublisher implements Publisher { + + protected final Function, C> contextFactory; + protected final BiConsumer> requestConsumer; + protected final Consumer shutdownConsumer; + + protected ReactorPublisher(BiConsumer> requestConsumer, + Function, C> contextFactory, + Consumer shutdownConsumer) { + this.requestConsumer = requestConsumer; + this.contextFactory = contextFactory; + this.shutdownConsumer = shutdownConsumer; + } + + @Override + final public void subscribe(final Subscriber subscriber) { + try { + final C context = contextFactory != null ? contextFactory.apply(subscriber) : null; + subscriber.onSubscribe(createSubscription(subscriber, context)); + } catch (PrematureCompleteException pce) { + //IGNORE + } catch (Throwable throwable) { + subscriber.onError(throwable); + } + } + + protected Subscription createSubscription(Subscriber subscriber, C context) { + return new SubscriberProxy<>(subscriber, context, requestConsumer, shutdownConsumer); + } + } + + private static final class ForEachPublisher extends ReactorPublisher { + + final Consumer> forEachConsumer; + + + public ForEachPublisher(Consumer> forEachConsumer, Function, C> contextFactory, Consumer shutdownConsumer) { + super(null, contextFactory, shutdownConsumer); + this.forEachConsumer = forEachConsumer; + } + + @Override + protected Subscription createSubscription(Subscriber subscriber, C context) { + return new SubscriberProxy<>(subscriber, context, new ForEachBiConsumer<>(forEachConsumer), + shutdownConsumer); + } + } + + private final static class SubscriberProxy extends SubscriberWithContext implements Subscription { + + private final BiConsumer> requestConsumer; + private final Consumer shutdownConsumer; + + + public SubscriberProxy(Subscriber subscriber, + C context, + BiConsumer> requestConsumer, + Consumer shutdownConsumer + ) { + super(context, subscriber); + this.requestConsumer = requestConsumer; + this.shutdownConsumer = shutdownConsumer; + } + + @Override + public void request(long n) { + if (isCancelled()) { + return; + } + + if (n <= 0) { + onError(new IllegalArgumentException("Spec. Rule 3.9 - Cannot request a non strictly positive number:" + + " " + n)); + return; + } + + try { + requestConsumer.accept(n, this); + } catch (Throwable t) { + onError(t); + } + } + + @Override + public void cancel() { + if (TERMINAL_UPDATER.compareAndSet(this, 0, 1)) { + doShutdown(); + } + } + + @Override + public void onError(Throwable t) { + if (TERMINAL_UPDATER.compareAndSet(this, 0, 1)) { + doShutdown(); + subscriber.onError(t); + } + } + + @Override + public void onComplete() { + if (TERMINAL_UPDATER.compareAndSet(this, 0, 1)) { + doShutdown(); + try { + subscriber.onComplete(); + } catch (Throwable t) { + subscriber.onError(t); + } + } + } + + private void doShutdown() { + if (shutdownConsumer == null) return; + + try { + shutdownConsumer.accept(context); + } catch (Throwable t) { + subscriber.onError(t); + } + } + + @Override + public void onSubscribe(Subscription s) { + throw new UnsupportedOperationException(" the delegate subscriber is already subscribed"); + } + + @Override + public String toString() { + return context != null ? context.toString() : ("SubscriberProxy{" + + "requestConsumer=" + requestConsumer + + ", shutdownConsumer=" + shutdownConsumer + + '}'); + } + } + + private final static class ForEachBiConsumer implements BiConsumer> { + + private final Consumer> requestConsumer; + + private volatile long pending = 0L; + + private final static AtomicLongFieldUpdater PENDING_UPDATER = + AtomicLongFieldUpdater.newUpdater(ForEachBiConsumer.class, "pending"); + + public ForEachBiConsumer(Consumer> requestConsumer) { + this.requestConsumer = requestConsumer; + } + + @Override + public void accept(Long n, SubscriberWithContext sub) { + + if (pending == Long.MAX_VALUE) { + return; + } + + long demand = n; + long afterAdd; + if (!PENDING_UPDATER.compareAndSet(this, 0L, demand) + && (afterAdd = PENDING_UPDATER.addAndGet(this, demand)) != demand) { + if (afterAdd < 0L) { + if (!PENDING_UPDATER.compareAndSet(this, afterAdd, Long.MAX_VALUE)) { + return; + } + } else { + return; + } + } + + do { + long requestCursor = 0l; + while ((requestCursor++ < demand || demand == Long.MAX_VALUE) && !sub.isCancelled()) { + requestConsumer.accept(sub); + } + } while ((demand = PENDING_UPDATER.addAndGet(this, -demand)) > 0L && !sub.isCancelled()); + + } + } + + public static class PrematureCompleteException extends RuntimeException { + static public final PrematureCompleteException INSTANCE = new PrematureCompleteException(); + + private PrematureCompleteException() { + } + + @Override + public synchronized Throwable fillInStackTrace() { + return this; + } + } +} diff --git a/ripc-test/src/main/java/io/ripc/test/internal/SubscriberWithContext.java b/ripc-test/src/main/java/io/ripc/test/internal/SubscriberWithContext.java new file mode 100644 index 0000000..51f54af --- /dev/null +++ b/ripc-test/src/main/java/io/ripc/test/internal/SubscriberWithContext.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2011-2015 Pivotal Software Inc, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.ripc.test.internal; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +/** + * Original {@see reactor.core.reactivestreams.PublisherFactory} from {@see http://projectreactor.io}. + *

+ * A {@link Subscriber} with a typed stateful context. Some error isolation is also provided + * (onSubscribe, onNext and onComplete error is forwarded to onError). + * + * @author Stephane Maldini + */ +public class SubscriberWithContext implements Subscriber { + + private volatile int terminated = 0; + protected static final AtomicIntegerFieldUpdater TERMINAL_UPDATER = + AtomicIntegerFieldUpdater + .newUpdater(SubscriberWithContext.class, "terminated"); + + + protected final C context; + protected final Subscriber subscriber; + + /** + * Attach a given arbitrary context (stateful information) to a {@link Subscriber}, all Subscriber methods + * will delegate properly. + * + * @param subscriber the delegate subscriber to invoke on signal + * @param context the contextual state of any type to bind for later use + * @param Type of data sequence + * @param Type of attached stateful context + * @return a new Susbscriber with context information + */ + public static SubscriberWithContext create(Subscriber subscriber, C context) { + return new SubscriberWithContext<>(context, subscriber); + } + + protected SubscriberWithContext(C context, Subscriber subscriber) { + this.context = context; + this.subscriber = subscriber; + } + + /** + * The stateful context C + * + * @return the bound context + */ + public C context() { + return context; + } + + @Override + public void onSubscribe(Subscription s) { + try { + subscriber.onSubscribe(s); + } catch (Throwable throwable) { + subscriber.onError(throwable); + } + } + + @Override + public void onNext(T t) { + try { + subscriber.onNext(t); + } catch (Throwable throwable) { + subscriber.onError(throwable); + } + } + + @Override + public void onError(Throwable t) { + if (TERMINAL_UPDATER.compareAndSet(this, 0, 1)) { + subscriber.onError(t); + } + } + + @Override + public void onComplete() { + try { + if (TERMINAL_UPDATER.compareAndSet(this, 0, 1)) { + subscriber.onComplete(); + } + } catch (Throwable throwable) { + subscriber.onError(throwable); + } + } + + public boolean isCancelled() { + return terminated == 1; + } +} diff --git a/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/CodecSample.java b/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/CodecSample.java index be24c62..a2de07c 100644 --- a/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/CodecSample.java +++ b/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/CodecSample.java @@ -15,16 +15,14 @@ import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; -import io.ripc.internal.Publishers; +import io.ripc.test.Publishers; import io.ripc.protocol.tcp.TcpServer; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.charset.Charset; -import java.util.concurrent.CompletableFuture; public class CodecSample { diff --git a/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/TcpServerSample.java b/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/TcpServerSample.java index c7eddfd..016e451 100644 --- a/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/TcpServerSample.java +++ b/ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/TcpServerSample.java @@ -3,7 +3,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.ripc.internal.Publishers; +import io.ripc.test.Publishers; import static java.nio.charset.Charset.*; import static rx.RxReactiveStreams.*; diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelToConnectionBridge.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelToConnectionBridge.java index 7a385a6..44e134d 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelToConnectionBridge.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelToConnectionBridge.java @@ -15,16 +15,18 @@ import org.slf4j.LoggerFactory; /** - * A bridge between netty's {@link Channel} and {@link io.ripc.protocol.tcp.TcpConnection}. It has the following responsibilities: - * + * A bridge between netty's {@link Channel} and {@link io.ripc.protocol.tcp.TcpConnection}. It has the following + * responsibilities: + *

*

    -
  • Create a new {@link io.ripc.protocol.tcp.TcpConnection} instance when the channel is active and forwards it to the configured - {@link TcpHandler}.
  • -
  • Reads any data from the channel and forwards it to the {@link Subscriber} attached via the event - {@link ChannelToConnectionBridge.ConnectionInputSubscriberEvent}
  • -
  • Accepts writes of {@link Publisher} on the channel and translates the items emitted from that publisher to the - channel.
  • -
+ *
  • Create a new {@link io.ripc.protocol.tcp.TcpConnection} instance when the channel is active and forwards it to + * the configured + * {@link TcpHandler}.
  • + *
  • Reads any data from the channel and forwards it to the {@link Subscriber} attached via the event + * {@link ChannelToConnectionBridge.ConnectionInputSubscriberEvent}
  • + *
  • Accepts writes of {@link Publisher} on the channel and translates the items emitted from that publisher to the + * channel.
  • + * * * @param The type of objects read from the underneath channel. * @param The type of objects read written to the underneath channel. @@ -33,9 +35,9 @@ public class ChannelToConnectionBridge extends ChannelDuplexHandler { private static final Logger logger = LoggerFactory.getLogger(ChannelToConnectionBridge.class); - private final TcpHandler handler; - private TcpConnectionImpl conn; - private Subscriber inputSubscriber; /*Populated via event ConnectionInputSubscriberEvent*/ + private final TcpHandler handler; + private TcpConnectionImpl conn; + private Subscriber inputSubscriber; /*Populated via event ConnectionInputSubscriberEvent*/ public ChannelToConnectionBridge(TcpHandler handler) { this.handler = handler; @@ -46,28 +48,28 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); conn = new TcpConnectionImpl<>(ctx.channel()); handler.handle(conn) - .subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - // Void, no op - } - - @Override - public void onNext(Void aVoid) { - // Void, no op - } - - @Override - public void onError(Throwable t) { - logger.error("Error processing connection. Closing the channel.", t); - ctx.channel().close(); - } - - @Override - public void onComplete() { - ctx.channel().close(); - } - }); + .subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); //no op + } + + @Override + public void onNext(Void aVoid) { + // Void, no op + } + + @Override + public void onError(Throwable t) { + logger.error("Error processing connection. Closing the channel.", t); + ctx.channel().close(); + } + + @Override + public void onComplete() { + ctx.channel().close(); + } + }); } @SuppressWarnings("unchecked") @@ -102,6 +104,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc ConnectionInputSubscriberEvent subscriberEvent = (ConnectionInputSubscriberEvent) evt; if (null == inputSubscriber) { inputSubscriber = subscriberEvent.getInputSubscriber(); + subscriberEvent.init(ctx); } else { inputSubscriber.onError(new IllegalStateException("Only one connection input subscriber allowed.")); } @@ -161,11 +164,12 @@ public void operationComplete(ChannelFuture future) throws Exception { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - logger.error(cause.getMessage(),cause); + logger.error(cause.getMessage(), cause); } /** - * An event to attach a {@link Subscriber} to the {@link io.ripc.protocol.tcp.TcpConnection} created by {@link ChannelToConnectionBridge} + * An event to attach a {@link Subscriber} to the {@link io.ripc.protocol.tcp.TcpConnection} created by {@link + * ChannelToConnectionBridge} * * @param */ @@ -183,5 +187,27 @@ public ConnectionInputSubscriberEvent(Subscriber inputSubscriber) { public Subscriber getInputSubscriber() { return inputSubscriber; } + + void init(ChannelHandlerContext ctx) { + try { + inputSubscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + /*if(n == Long.MAX_VALUE){ + ctx.channel().config().setAutoRead(true); + }*/ + //ctx.read(); implements backpressure + ctx.channel().config().setAutoRead(true); + } + + @Override + public void cancel() { + //implements close on cancel (must be after any pending onComplete) + } + }); + } catch (Throwable error) { + inputSubscriber.onError(error); + } + } } } diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java index 389bc29..9e91174 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java @@ -43,6 +43,7 @@ protected void initChannel(Channel ch) throws Exception { if (initializer != null) { ch.pipeline().addLast(initializer); } + ch.config().setAutoRead(false); ch.pipeline().addLast("server_handler", new ChannelToConnectionBridge<>(handler)); } }); diff --git a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/TcpConnectionImpl.java b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/TcpConnectionImpl.java index 9768cb2..fe48ec2 100644 --- a/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/TcpConnectionImpl.java +++ b/ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/TcpConnectionImpl.java @@ -7,6 +7,7 @@ import io.ripc.transport.netty4.tcp.ChannelToConnectionBridge.ConnectionInputSubscriberEvent; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; public class TcpConnectionImpl implements TcpConnection { @@ -31,6 +32,17 @@ public void subscribe(Subscriber s) { nettyChannel.pipeline().fireUserEventTriggered(new ConnectionInputSubscriberEvent<>(s)); } + private static final Subscription IGNORE_SUBSCRIPTION = new Subscription() { + @Override + public void request(long n) { + //IGNORE + } + + @Override + public void cancel() { + //IGNORE + } + }; private static class FutureToSubscriberBridge implements ChannelFutureListener { @@ -38,6 +50,7 @@ private static class FutureToSubscriberBridge implements ChannelFutureListener { public FutureToSubscriberBridge(Subscriber subscriber) { this.subscriber = subscriber; + subscriber.onSubscribe(IGNORE_SUBSCRIPTION); } @Override diff --git a/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/TcpServerTests.java b/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/TcpServerTests.java index ce1e645..ffcb222 100644 --- a/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/TcpServerTests.java +++ b/ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/TcpServerTests.java @@ -20,7 +20,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.ripc.internal.Publishers; +import io.ripc.test.Publishers; import io.ripc.protocol.tcp.TcpServer; import io.ripc.test.SocketTestUtils; import io.ripc.transport.netty4.tcp.Netty4TcpServer;