Skip to content

Commit bce6191

Browse files
committed
Attempt to make it more functional by not forcing the ConnectionHandler to be a singleton (it **can** be, it just doesn't have to be). Introduced some functional interfaces to leverage like Consumer and Supplier but extended those into specific subinterfaces that have strongly-typed parameters.
1 parent 2be35ef commit bce6191

File tree

7 files changed

+62
-32
lines changed

7 files changed

+62
-32
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.ripc.core;
2+
3+
/**
4+
* Simple functional interface for accepting objects via callback.
5+
*/
6+
@FunctionalInterface
7+
public interface Consumer<T> {
8+
9+
void accept(T obj);
10+
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.ripc.core;
2+
3+
/**
4+
* Simple functional interface for applying transformations to objects.
5+
*/
6+
@FunctionalInterface
7+
public interface Function<T, V> {
8+
V apply(T obj);
9+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.ripc.core;
2+
3+
/**
4+
* A simple functional interface to provide instances of a given object.
5+
*/
6+
@FunctionalInterface
7+
public interface Supplier<T> {
8+
T get();
9+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.ripc.core.io;
2+
3+
import io.ripc.core.Supplier;
4+
5+
/**
6+
* Created by jbrisbin on 3/18/15.
7+
*/
8+
public interface BufferSupplier<B> extends Supplier<Buffer<B>> {
9+
}
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
11
package io.ripc.protocol.tcp;
22

3+
import io.ripc.core.Consumer;
4+
import io.ripc.core.Supplier;
5+
36
/**
4-
* A {@code ConnectionHandler} is responsible for composing a Reactive Stream pipeline(s) when a new connection is
5-
* received by the server.
7+
* A {@code ConnectionHandler} is responsible for composing a Reactive Streams pipeline(s) when a new connection is
8+
* received by the server. Implementations will compose an appropriate pipeline based on capabilities and server
9+
* configuration.
610
*/
7-
public interface ConnectionHandler<B> {
8-
9-
/**
10-
* Implementations will compose an appropriate pipeline based on capabilities and server configuration.
11-
*
12-
* @param connection connection around which to compose the pipeline
13-
*/
14-
void handle(Connection<B> connection);
15-
11+
public interface ConnectionHandler<B> extends Supplier<Consumer<Connection<B>>> {
1612
}

ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java

+16-19
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,12 @@
1515
/**
1616
* Created by jbrisbin on 3/10/15.
1717
*/
18-
public class NettyTcpServer extends ChannelInitializer<SocketChannel> {
18+
public class NettyTcpServer {
1919

20-
private final ServerBootstrap bootstrap;
21-
private final ConnectionHandler<ByteBuf> handler;
20+
private final ServerBootstrap bootstrap;
2221

23-
public NettyTcpServer(ServerBootstrap bootstrap,
24-
ConnectionHandler<ByteBuf> handler) {
22+
public NettyTcpServer(ServerBootstrap bootstrap) {
2523
this.bootstrap = bootstrap;
26-
this.handler = handler;
2724
}
2825

2926
public static NettyTcpServer listen(int port, ConnectionHandler<ByteBuf> handler) {
@@ -36,8 +33,19 @@ public static NettyTcpServer listen(int port, ConnectionHandler<ByteBuf> handler
3633

3734
b.channel(NioServerSocketChannel.class);
3835

39-
NettyTcpServer server = new NettyTcpServer(b, handler);
40-
b.childHandler(server);
36+
NettyTcpServer server = new NettyTcpServer(b);
37+
b.childHandler(new ChannelInitializer<SocketChannel>() {
38+
@Override
39+
protected void initChannel(SocketChannel ch) throws Exception {
40+
ch.config().setAutoRead(false);
41+
ch.config().setAllocator(PooledByteBufAllocator.DEFAULT);
42+
43+
ch.pipeline().addLast(new LoggingHandler());
44+
45+
NettyTcpServerConnection conn = new NettyTcpServerConnection(ch);
46+
handler.get().accept(conn);
47+
}
48+
});
4149

4250
b.bind(port);
4351

@@ -48,15 +56,4 @@ public void shutdown() {
4856
bootstrap.group().shutdownGracefully();
4957
}
5058

51-
@Override
52-
protected void initChannel(SocketChannel ch) throws Exception {
53-
ch.config().setAutoRead(false);
54-
ch.config().setAllocator(PooledByteBufAllocator.DEFAULT);
55-
56-
ch.pipeline().addLast(new LoggingHandler());
57-
58-
NettyTcpServerConnection conn = new NettyTcpServerConnection(ch);
59-
handler.handle(conn);
60-
}
61-
6259
}

ripc-transport-netty4/src/test/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerIntegrationTests.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public class NettyTcpServerIntegrationTests {
2525
public void canStartNettyTcpServer() throws InterruptedException {
2626
CountDownLatch latch = new CountDownLatch(1);
2727

28-
NettyTcpServer server = NettyTcpServer.listen(3000, conn -> {
28+
NettyTcpServer server = NettyTcpServer.listen(3000, () -> conn -> {
2929
conn.subscribe(new Subscriber<Buffer<ByteBuf>>() {
3030
private Subscription subscription;
3131

@@ -78,7 +78,6 @@ public void cancel() {
7878
});
7979
}
8080
});
81-
8281
});
8382

8483
while (!latch.await(1, TimeUnit.SECONDS)) {

0 commit comments

Comments
 (0)