Skip to content

Commit 64c2c88

Browse files
committed
Add unit tests
Fixes reactive-ipc#34
1 parent 0793f8f commit 64c2c88

File tree

12 files changed

+284
-7
lines changed

12 files changed

+284
-7
lines changed

build.gradle

+13
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ project('ripc-transport-netty4') {
125125
// ripc-tcp
126126
compile project(":ripc-protocol-tcp"),
127127
"io.netty:netty-all:$nettyVersion"
128+
testCompile project(":ripc-test")
128129
}
129130
}
130131

@@ -135,6 +136,8 @@ project('ripc-rxjava1') {
135136
compile project(":ripc-protocol-tcp"),
136137
"io.reactivex:rxjava:$rxjava1Version",
137138
"io.reactivex:rxjava-reactive-streams:$rxjavaRsVersion"
139+
testCompile project(":ripc-transport-netty4"),
140+
project(":ripc-test")
138141
}
139142
}
140143

@@ -144,6 +147,8 @@ project('ripc-reactor') {
144147
// ripc-tcp
145148
compile project(":ripc-protocol-tcp"),
146149
"io.projectreactor:reactor-stream:$reactorVersion"
150+
testCompile project(":ripc-transport-netty4"),
151+
project(":ripc-test")
147152
}
148153
}
149154

@@ -169,6 +174,14 @@ project('ripc-reactor-examples') {
169174
}
170175
}
171176

177+
project('ripc-test') {
178+
description = 'Reactive IPC Test Components'
179+
dependencies {
180+
// Reactive Streams
181+
compile "org.reactivestreams:reactive-streams:$reactiveStreamsVersion"
182+
}
183+
}
184+
172185
configure(rootProject) {
173186
description = "Reactive IPC"
174187

ripc-core/src/main/java/io/ripc/internal/Publishers.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,16 @@
99
*/
1010
public class Publishers {
1111

12-
public static <T> Publisher<T> just(final T value) {
12+
public static <T> Publisher<T> just(final T... values) {
1313
return new Publisher<T>() {
1414
@Override
1515
public void subscribe(final Subscriber<? super T> s) {
1616
s.onSubscribe(new Subscription() {
1717
@Override
1818
public void request(long n) {
19-
s.onNext(value);
19+
for(T value : values) {
20+
s.onNext(value);
21+
}
2022
s.onComplete();
2123
}
2224

ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpServer.java

+2
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,6 @@ public final boolean shutdown() {
3636

3737
protected abstract TcpServer<R, W> doStart(TcpHandler<R, W> handler);
3838

39+
public abstract int getPort();
40+
3941
}

ripc-reactor-examples/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServerSample.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ public static void main(String... args) throws InterruptedException {
1717
TcpServer<ByteBuf, ByteBuf> transport = Netty4TcpServer.<ByteBuf, ByteBuf>create(0);
1818

1919
ReactorTcpServer.create(transport)
20-
.start(connection -> connection.flatMap(bb -> {
20+
.startAndAwait(connection -> connection.flatMap(bb -> {
2121
String msgStr = "Hello " + bb.toString(Charset.defaultCharset()) + "!";
2222
ByteBuf msg = Unpooled.buffer().writeBytes(msgStr.getBytes());
2323
return connection.writeWith(Streams.just(msg));
2424
}));
2525
}
2626

27-
}
27+
}

ripc-reactor/src/main/java/io/ripc/reactor/protocol/tcp/ReactorTcpServer.java

+15
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,17 @@ public class ReactorTcpServer<R, W> {
2424

2525
public ReactorTcpServer<R, W> start(final ReactorTcpHandler<R, W> handler) {
2626

27+
transport.start(new TcpHandler<R, W>() {
28+
@Override
29+
public Publisher<Void> handle(TcpConnection<R, W> connection) {
30+
return handler.apply(new ReactorTcpConnection<>(connection));
31+
}
32+
});
33+
return this;
34+
}
35+
36+
public ReactorTcpServer<R, W> startAndAwait(final ReactorTcpHandler<R, W> handler) {
37+
2738
transport.startAndAwait(new TcpHandler<R, W>() {
2839
@Override
2940
public Publisher<Void> handle(TcpConnection<R, W> connection) {
@@ -39,6 +50,10 @@ public boolean shutdown() {
3950
return b;
4051
}
4152

53+
public int getPort() {
54+
return transport.getPort();
55+
}
56+
4257
public static <R, W> ReactorTcpServer<R, W> create(TcpServer<R, W> transport) {
4358
return new ReactorTcpServer<>(transport);
4459
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2002-2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.ripc.reactor;
18+
19+
import java.io.IOException;
20+
21+
import io.netty.buffer.ByteBuf;
22+
import io.netty.buffer.Unpooled;
23+
import io.ripc.reactor.protocol.tcp.ReactorTcpServer;
24+
import io.ripc.test.SocketTestUtils;
25+
import io.ripc.transport.netty4.tcp.Netty4TcpServer;
26+
import org.junit.After;
27+
import static org.junit.Assert.assertEquals;
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
import reactor.rx.Promise;
31+
import reactor.rx.Promises;
32+
import reactor.rx.Streams;
33+
34+
public class ReactorTcpServerTests {
35+
36+
private ReactorTcpServer<ByteBuf, ByteBuf> reactorServer;
37+
38+
@Before
39+
public void setup() {
40+
reactorServer = ReactorTcpServer.create(Netty4TcpServer.<ByteBuf, ByteBuf>create(0));
41+
}
42+
43+
@After
44+
public void tearDown() {
45+
reactorServer.shutdown();
46+
}
47+
48+
@Test
49+
public void writeSingleValue() throws IOException {
50+
reactorServer.start(connection -> connection.writeWith(Streams.just(Unpooled.buffer().writeBytes("test".getBytes()))));
51+
assertEquals("test", SocketTestUtils.read("localhost", reactorServer.getPort()));
52+
}
53+
54+
@Test
55+
public void writeMultipleValues() throws IOException {
56+
Promise<ByteBuf> chunk1 = Promises.success(Unpooled.buffer().writeBytes("This is".getBytes()));
57+
Promise<ByteBuf> chunk2 = Promises.success(Unpooled.buffer().writeBytes(" a test!".getBytes()));
58+
reactorServer.start(connection -> connection.writeWith(Streams.concat(chunk1, chunk2)));
59+
assertEquals("This is a test!", SocketTestUtils.read("localhost", reactorServer.getPort()));
60+
}
61+
62+
}

ripc-rxjava1/src/main/java/io/ripc/rx/protocol/tcp/RxTcpServer.java

+4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ public void awaitShutdown() {
3939
transport.awaitShutdown();
4040
}
4141

42+
public int getPort() {
43+
return transport.getPort();
44+
}
45+
4246
public static <R, W> RxTcpServer<R, W> create(TcpServer<R, W> transport) {
4347
return new RxTcpServer<>(transport);
4448
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package io.ripc.rx;/*
2+
* Copyright 2002-2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import java.io.IOException;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import io.netty.buffer.Unpooled;
21+
import io.ripc.rx.protocol.tcp.RxTcpServer;
22+
import io.ripc.test.SocketTestUtils;
23+
import io.ripc.transport.netty4.tcp.Netty4TcpServer;
24+
import org.junit.After;
25+
import static org.junit.Assert.assertEquals;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
import rx.Observable;
29+
30+
public class RxTcpServerTests {
31+
32+
private RxTcpServer<ByteBuf, ByteBuf> rxServer;
33+
34+
@Before
35+
public void setup() {
36+
rxServer = RxTcpServer.create(Netty4TcpServer.<ByteBuf, ByteBuf>create(0));
37+
}
38+
39+
@After
40+
public void tearDown() {
41+
rxServer.shutdown();
42+
}
43+
44+
@Test
45+
public void writeSingleValue() throws IOException {
46+
rxServer.start(connection -> connection.write(Observable.just(Unpooled.buffer().writeBytes("test".getBytes()))));
47+
assertEquals("test", SocketTestUtils.read("localhost", rxServer.getPort()));
48+
}
49+
50+
@Test
51+
public void writeMultipleValues() throws IOException {
52+
Observable<ByteBuf> chunk1 = Observable.just(Unpooled.buffer().writeBytes("This is".getBytes()));
53+
Observable<ByteBuf> chunk2 = Observable.just(Unpooled.buffer().writeBytes(" a test!".getBytes()));
54+
rxServer.start(connection -> connection.write(Observable.merge(chunk1, chunk2)));
55+
assertEquals("This is a test!", SocketTestUtils.read("localhost", rxServer.getPort()));
56+
}
57+
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2002-2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.ripc.test;
18+
19+
import java.io.DataOutputStream;
20+
import java.io.IOException;
21+
import java.io.InputStreamReader;
22+
import java.net.Socket;
23+
24+
public class SocketTestUtils {
25+
26+
public static String read(String host, int port) {
27+
return read(host, port, null);
28+
}
29+
30+
public static String read(String host, int port, String dataToSend) {
31+
try {
32+
Socket socket = new Socket(host, port);
33+
InputStreamReader reader = new InputStreamReader(socket.getInputStream());
34+
if (dataToSend != null) {
35+
DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream());
36+
outputStream.writeBytes(dataToSend);
37+
}
38+
StringBuilder content = new StringBuilder();
39+
int c = reader.read();
40+
while (c != -1) {
41+
content.append((char)c);
42+
c = reader.read();
43+
}
44+
reader.close();
45+
return content.toString();
46+
}
47+
catch (IOException e) {
48+
throw new IllegalStateException(e);
49+
}
50+
}
51+
52+
}

ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/Netty4TcpServer.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public class Netty4TcpServer<R, W> extends TcpServer<R, W> {
1818

1919
private static final Logger logger = LoggerFactory.getLogger(Netty4TcpServer.class);
2020

21-
private final int port;
21+
private int port;
2222
private ServerBootstrap bootstrap;
2323
private ChannelFuture bindFuture;
2424

@@ -45,7 +45,8 @@ protected void initChannel(Channel ch) throws Exception {
4545
}
4646
SocketAddress localAddress = bindFuture.channel().localAddress();
4747
if (localAddress instanceof InetSocketAddress) {
48-
logger.info("Started server at port: " + ((InetSocketAddress) localAddress).getPort());
48+
port = ((InetSocketAddress) localAddress).getPort();
49+
logger.info("Started server at port: " + port);
4950
}
5051

5152
} catch (InterruptedException e) {
@@ -76,6 +77,11 @@ public boolean doShutdown() {
7677
}
7778
}
7879

80+
@Override
81+
public int getPort() {
82+
return port;
83+
}
84+
7985
public static <R, W> TcpServer<R, W> create(int port) {
8086
return new Netty4TcpServer<>(port);
8187
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2002-2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.ripc.transport.netty4;
18+
19+
import java.io.IOException;
20+
21+
import io.netty.buffer.ByteBuf;
22+
import io.netty.buffer.Unpooled;
23+
import io.ripc.internal.Publishers;
24+
import io.ripc.protocol.tcp.TcpServer;
25+
import io.ripc.test.SocketTestUtils;
26+
import io.ripc.transport.netty4.tcp.Netty4TcpServer;
27+
import org.junit.After;
28+
import static org.junit.Assert.assertEquals;
29+
import org.junit.Before;
30+
import org.junit.Test;
31+
32+
public class TcpServerTests {
33+
34+
private TcpServer<ByteBuf, ByteBuf> server;
35+
36+
@Before
37+
public void setup() {
38+
server = Netty4TcpServer.<ByteBuf, ByteBuf>create(0);
39+
}
40+
41+
@After
42+
public void tearDown() {
43+
server.shutdown();
44+
}
45+
46+
@Test
47+
public void writeSingleValue() throws IOException {
48+
server.start(connection -> connection.write(Publishers.just(Unpooled.buffer().writeBytes("test".getBytes()))));
49+
assertEquals("test", SocketTestUtils.read("localhost", server.getPort()));
50+
}
51+
52+
@Test
53+
public void writeMultipleValues() throws IOException {
54+
server.start(connection -> {
55+
ByteBuf chunk1 = Unpooled.buffer().writeBytes("This is".getBytes());
56+
ByteBuf chunk2 = Unpooled.buffer().writeBytes(" a test!".getBytes());
57+
return connection.write(Publishers.just(chunk1, chunk2));
58+
});
59+
assertEquals("This is a test!", SocketTestUtils.read("localhost", server.getPort()));
60+
}
61+
62+
}

settings.gradle

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ include 'ripc-core',
77
'ripc-reactor',
88
'ripc-rxjava1',
99
'ripc-reactor-examples',
10-
'ripc-rxjava1-examples'
10+
'ripc-rxjava1-examples',
11+
'ripc-test'

0 commit comments

Comments
 (0)