Skip to content

Commit edf7897

Browse files
committed
Add unit tests
Fixes reactive-ipc#34
1 parent 455676d commit edf7897

File tree

13 files changed

+290
-16
lines changed

13 files changed

+290
-16
lines changed

build.gradle

+13
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ project('ripc-transport-netty4') {
130130
// ripc-tcp
131131
compile project(":ripc-protocol-tcp"),
132132
"io.netty:netty-all:$nettyVersion"
133+
testCompile project(":ripc-test")
133134
}
134135
}
135136

@@ -140,6 +141,8 @@ project('ripc-rxjava1') {
140141
compile project(":ripc-protocol-tcp"),
141142
"io.reactivex:rxjava:$rxjava1Version",
142143
"io.reactivex:rxjava-reactive-streams:$rxjavaRsVersion"
144+
testCompile project(":ripc-transport-netty4"),
145+
project(":ripc-test")
143146
}
144147
}
145148

@@ -149,6 +152,8 @@ project('ripc-reactor') {
149152
// ripc-tcp
150153
compile project(":ripc-protocol-tcp"),
151154
"io.projectreactor:reactor-stream:$reactorVersion"
155+
testCompile project(":ripc-transport-netty4"),
156+
project(":ripc-test")
152157
}
153158
}
154159

@@ -175,6 +180,14 @@ project('ripc-reactor-examples') {
175180
}
176181
}
177182

183+
project('ripc-test') {
184+
description = 'Reactive IPC Test Components'
185+
dependencies {
186+
// Reactive Streams
187+
compile "org.reactivestreams:reactive-streams:$reactiveStreamsVersion"
188+
}
189+
}
190+
178191
configure(rootProject) {
179192
description = "Reactive IPC"
180193

ripc-core/src/main/java/io/ripc/internal/Publishers.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,16 @@
99
*/
1010
public class Publishers {
1111

12-
public static <T> Publisher<T> just(final T value) {
12+
public static <T> Publisher<T> just(final T... values) {
1313
return new Publisher<T>() {
1414
@Override
1515
public void subscribe(final Subscriber<? super T> s) {
1616
s.onSubscribe(new Subscription() {
1717
@Override
1818
public void request(long n) {
19-
s.onNext(value);
19+
for (T value : values) {
20+
s.onNext(value);
21+
}
2022
s.onComplete();
2123
}
2224

ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpServer.java

+2
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,6 @@ public final boolean shutdown() {
3636

3737
protected abstract TcpServer<R, W> doStart(TcpHandler<R, W> handler);
3838

39+
public abstract int getPort();
40+
3941
}

ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/CodecSample.java

+6-9
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,11 @@ protected void initChannel(Channel channel) throws Exception {
5151
}
5252
});
5353

54-
ReactorTcpServer.create(transport).start(connection -> {
55-
connection.log("input")
56-
.observeComplete(v -> LOG.info("Connection input complete"))
57-
.capacity(1)
58-
.consume(line -> {
59-
String response = "Hello " + line + "\n";
60-
Streams.wrap(connection.writeWith(Streams.just(response))).consume();
61-
});
54+
ReactorTcpServer.create(transport).startAndAwait(connection -> {
55+
connection.log("input").observeComplete(v -> LOG.info("Connection input complete")).capacity(1).consume(line -> {
56+
String response = "Hello " + line + "\n";
57+
Streams.wrap(connection.writeWith(Streams.just(response))).consume();
58+
});
6259
return Streams.never();
6360
});
6461
}
@@ -77,7 +74,7 @@ protected void initChannel(Channel channel) throws Exception {
7774
});
7875

7976
ReactorTcpServer.create(transport)
80-
.start(connection -> {
77+
.startAndAwait(connection -> {
8178
connection.log("input")
8279
.observeComplete(v -> LOG.info("Connection input complete"))
8380
.capacity(1)

ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServerSample.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public static void main(String... args) throws InterruptedException {
2626
*/
2727
private static void echo(TcpServer<ByteBuf, ByteBuf> transport) {
2828
ReactorTcpServer.create(transport)
29-
.start(connection -> {
29+
.startAndAwait(connection -> {
3030
connection.flatMap(inByteBuf -> {
3131
String text = "Hello " + inByteBuf.toString(Charset.defaultCharset());
3232
ByteBuf outByteBuf = Unpooled.buffer().writeBytes(text.getBytes());
@@ -41,7 +41,7 @@ private static void echo(TcpServer<ByteBuf, ByteBuf> transport) {
4141
*/
4242
private static void echoWithQuitCommand(TcpServer<ByteBuf, ByteBuf> transport) {
4343
ReactorTcpServer.create(transport)
44-
.start(connection -> {
44+
.startAndAwait(connection -> {
4545
Promise<Void> promise = Promises.prepare();
4646
connection.flatMap(inByteBuf -> {
4747
String input = inByteBuf.toString(Charset.defaultCharset()).trim();

ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServer.java

+15
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,17 @@ public class ReactorTcpServer<R, W> {
2424

2525
public ReactorTcpServer<R, W> start(final ReactorTcpHandler<R, W> handler) {
2626

27+
transport.start(new TcpHandler<R, W>() {
28+
@Override
29+
public Publisher<Void> handle(TcpConnection<R, W> connection) {
30+
return handler.apply(new ReactorTcpConnection<>(connection));
31+
}
32+
});
33+
return this;
34+
}
35+
36+
public ReactorTcpServer<R, W> startAndAwait(final ReactorTcpHandler<R, W> handler) {
37+
2738
transport.startAndAwait(new TcpHandler<R, W>() {
2839
@Override
2940
public Publisher<Void> handle(TcpConnection<R, W> connection) {
@@ -39,6 +50,10 @@ public boolean shutdown() {
3950
return b;
4051
}
4152

53+
public int getPort() {
54+
return transport.getPort();
55+
}
56+
4257
public static <R, W> ReactorTcpServer<R, W> create(TcpServer<R, W> transport) {
4358
return new ReactorTcpServer<>(transport);
4459
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2002-2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.ripc.reactor;
18+
19+
import java.io.IOException;
20+
21+
import io.netty.buffer.ByteBuf;
22+
import io.netty.buffer.Unpooled;
23+
import io.ripc.reactor.protocol.tcp.ReactorTcpServer;
24+
import io.ripc.test.SocketTestUtils;
25+
import io.ripc.transport.netty4.tcp.Netty4TcpServer;
26+
import org.junit.After;
27+
import static org.junit.Assert.assertEquals;
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
import reactor.rx.Promise;
31+
import reactor.rx.Promises;
32+
import reactor.rx.Streams;
33+
34+
public class ReactorTcpServerTests {
35+
36+
private ReactorTcpServer<ByteBuf, ByteBuf> reactorServer;
37+
38+
@Before
39+
public void setup() {
40+
reactorServer = ReactorTcpServer.create(Netty4TcpServer.<ByteBuf, ByteBuf>create(0));
41+
}
42+
43+
@After
44+
public void tearDown() {
45+
reactorServer.shutdown();
46+
}
47+
48+
@Test
49+
public void writeSingleValue() throws IOException {
50+
reactorServer.start(connection -> connection.writeWith(Streams.just(Unpooled.buffer().writeBytes("test".getBytes()))));
51+
assertEquals("test", SocketTestUtils.read("localhost", reactorServer.getPort()));
52+
}
53+
54+
@Test
55+
public void writeMultipleValues() throws IOException {
56+
Promise<ByteBuf> chunk1 = Promises.success(Unpooled.buffer().writeBytes("This is".getBytes()));
57+
Promise<ByteBuf> chunk2 = Promises.success(Unpooled.buffer().writeBytes(" a test!".getBytes()));
58+
reactorServer.start(connection -> connection.writeWith(Streams.concat(chunk1, chunk2)));
59+
assertEquals("This is a test!", SocketTestUtils.read("localhost", reactorServer.getPort()));
60+
}
61+
62+
}

ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpServer.java

+4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ public void awaitShutdown() {
3939
transport.awaitShutdown();
4040
}
4141

42+
public int getPort() {
43+
return transport.getPort();
44+
}
45+
4246
public static <R, W> RxTcpServer<R, W> create(TcpServer<R, W> transport) {
4347
return new RxTcpServer<>(transport);
4448
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package io.ripc.rx;/*
2+
* Copyright 2002-2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import java.io.IOException;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import io.netty.buffer.Unpooled;
21+
import io.ripc.rx.protocol.tcp.RxTcpServer;
22+
import io.ripc.test.SocketTestUtils;
23+
import io.ripc.transport.netty4.tcp.Netty4TcpServer;
24+
import org.junit.After;
25+
import static org.junit.Assert.assertEquals;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
import rx.Observable;
29+
30+
public class RxTcpServerTests {
31+
32+
private RxTcpServer<ByteBuf, ByteBuf> rxServer;
33+
34+
@Before
35+
public void setup() {
36+
rxServer = RxTcpServer.create(Netty4TcpServer.<ByteBuf, ByteBuf>create(0));
37+
}
38+
39+
@After
40+
public void tearDown() {
41+
rxServer.shutdown();
42+
}
43+
44+
@Test
45+
public void writeSingleValue() throws IOException {
46+
rxServer.start(connection -> connection.write(Observable.just(Unpooled.buffer().writeBytes("test".getBytes()))));
47+
assertEquals("test", SocketTestUtils.read("localhost", rxServer.getPort()));
48+
}
49+
50+
@Test
51+
public void writeMultipleValues() throws IOException {
52+
Observable<ByteBuf> chunk1 = Observable.just(Unpooled.buffer().writeBytes("This is".getBytes()));
53+
Observable<ByteBuf> chunk2 = Observable.just(Unpooled.buffer().writeBytes(" a test!".getBytes()));
54+
rxServer.start(connection -> connection.write(Observable.merge(chunk1, chunk2)));
55+
assertEquals("This is a test!", SocketTestUtils.read("localhost", rxServer.getPort()));
56+
}
57+
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2002-2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.ripc.test;
18+
19+
import java.io.DataOutputStream;
20+
import java.io.IOException;
21+
import java.io.InputStreamReader;
22+
import java.net.Socket;
23+
24+
public class SocketTestUtils {
25+
26+
public static String read(String host, int port) {
27+
return read(host, port, null);
28+
}
29+
30+
public static String read(String host, int port, String dataToSend) {
31+
try {
32+
Socket socket = new Socket(host, port);
33+
InputStreamReader reader = new InputStreamReader(socket.getInputStream());
34+
if (dataToSend != null) {
35+
DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream());
36+
outputStream.writeBytes(dataToSend);
37+
}
38+
StringBuilder content = new StringBuilder();
39+
int c = reader.read();
40+
while (c != -1) {
41+
content.append((char)c);
42+
c = reader.read();
43+
}
44+
reader.close();
45+
return content.toString();
46+
}
47+
catch (IOException e) {
48+
throw new IllegalStateException(e);
49+
}
50+
}
51+
52+
}

ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public class Netty4TcpServer<R, W> extends TcpServer<R, W> {
1818

1919
private static final Logger logger = LoggerFactory.getLogger(Netty4TcpServer.class);
2020

21-
private final int port;
21+
private int port;
2222
private final ChannelInitializer<Channel> initializer;
2323
private ServerBootstrap bootstrap;
2424
private ChannelFuture bindFuture;
@@ -54,7 +54,8 @@ protected void initChannel(Channel ch) throws Exception {
5454
}
5555
SocketAddress localAddress = bindFuture.channel().localAddress();
5656
if (localAddress instanceof InetSocketAddress) {
57-
logger.info("Started server at port: " + ((InetSocketAddress) localAddress).getPort());
57+
port = ((InetSocketAddress) localAddress).getPort();
58+
logger.info("Started server at port: " + port);
5859
}
5960

6061
} catch (InterruptedException e) {
@@ -85,6 +86,11 @@ public boolean doShutdown() {
8586
}
8687
}
8788

89+
@Override
90+
public int getPort() {
91+
return port;
92+
}
93+
8894
public static <R, W> TcpServer<R, W> create(int port) {
8995
return new Netty4TcpServer<>(port);
9096
}

0 commit comments

Comments
 (0)