Skip to content

Commit 2be35ef

Browse files
committed
Based on a discussion today with @rstoyanchev and others, I updated the sketch here to remove the server as a Publisher of connections and provided a simple callback in its place. I also implemented the write functionality in a very naive way. It's a batch publisher, though, in that it won't flush the connection until the onComplete signal is received. This is another discussion (whether flush should be tied to onComplete or not) so this code should be looked at only as a point of discussion.
1 parent 80533dc commit 2be35ef

File tree

8 files changed

+149
-165
lines changed

8 files changed

+149
-165
lines changed

ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/Connection.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,16 @@
44
import org.reactivestreams.Publisher;
55

66
/**
7-
* Created by jbrisbin on 3/10/15.
7+
* A {@code Connection} is a Reactive Streams {@link org.reactivestreams.Publisher} that provides subscribers with
8+
* inbound data and exposes the {@link #write(org.reactivestreams.Publisher)} method for sending outbound data.
89
*/
910
public interface Connection<B> extends Publisher<Buffer<B>> {
1011

12+
/**
13+
* Send outbound data using the Reactive Streams {@code Publisher} contract.
14+
*
15+
* @param data publisher of outbound data
16+
*/
1117
void write(Publisher<Buffer<B>> data);
1218

1319
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.ripc.protocol.tcp;
2+
3+
/**
4+
* A {@code ConnectionHandler} is responsible for composing a Reactive Stream pipeline(s) when a new connection is
5+
* received by the server.
6+
*/
7+
public interface ConnectionHandler<B> {
8+
9+
/**
10+
* Implementations will compose an appropriate pipeline based on capabilities and server configuration.
11+
*
12+
* @param connection connection around which to compose the pipeline
13+
*/
14+
void handle(Connection<B> connection);
15+
16+
}

ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/ConnectionPublisher.java

-9
This file was deleted.

ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInboundHandlerSubscription.java

+14-12
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,16 @@
1010
import org.reactivestreams.Subscriber;
1111
import org.reactivestreams.Subscription;
1212

13+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
14+
1315
/**
1416
* Created by jbrisbin on 3/10/15.
1517
*/
1618
public class ChannelInboundHandlerSubscription extends ChannelInboundHandlerAdapter implements Subscription {
1719

20+
private static final AtomicLongFieldUpdater<ChannelInboundHandlerSubscription> PEND_UPD
21+
= AtomicLongFieldUpdater.newUpdater(ChannelInboundHandlerSubscription.class, "pending");
22+
1823
private final Channel channel;
1924
private final Subscriber<? super Buffer<ByteBuf>> subscriber;
2025

@@ -38,16 +43,16 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
3843
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
3944
if (pending < 1) {
4045
super.channelInactive(ctx);
41-
return;
4246
}
43-
subscriber.onComplete();
4447
}
4548

4649
@Override
4750
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
4851
if (pending < 1) {
4952
super.channelReadComplete(ctx);
53+
return;
5054
}
55+
subscriber.onComplete();
5156
}
5257

5358
@Override
@@ -60,10 +65,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
6065
ByteBuf buf = (ByteBuf) msg;
6166
try {
6267
subscriber.onNext(new ByteBufBuffer(buf, false));
63-
synchronized (this) {
64-
pending--;
65-
}
66-
channel.read();
68+
PEND_UPD.decrementAndGet(this);
69+
//channel.read();
6770
} catch (Throwable t) {
6871
subscriber.onError(t);
6972
}
@@ -75,13 +78,12 @@ public void request(long demand) {
7578
return;
7679
}
7780

78-
synchronized (this) {
79-
if (demand < Long.MAX_VALUE) {
80-
pending += demand;
81-
} else {
82-
pending = Long.MAX_VALUE;
83-
}
81+
if (demand < Long.MAX_VALUE) {
82+
PEND_UPD.addAndGet(this, demand);
83+
} else {
84+
PEND_UPD.set(this, Long.MAX_VALUE);
8485
}
86+
8587
channel.read();
8688
}
8789

ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInitializerSubscription.java

-59
This file was deleted.

ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java

+18-27
Original file line numberDiff line numberDiff line change
@@ -8,29 +8,25 @@
88
import io.netty.channel.nio.NioEventLoopGroup;
99
import io.netty.channel.socket.SocketChannel;
1010
import io.netty.channel.socket.nio.NioServerSocketChannel;
11+
import io.netty.handler.logging.LoggingHandler;
1112
import io.ripc.core.NamedDaemonThreadFactory;
12-
import io.ripc.protocol.tcp.Connection;
13-
import io.ripc.protocol.tcp.ConnectionPublisher;
14-
import io.ripc.transport.netty4.tcp.ChannelInitializerSubscription;
15-
import org.reactivestreams.Subscriber;
16-
17-
import java.util.ArrayList;
18-
import java.util.List;
13+
import io.ripc.protocol.tcp.ConnectionHandler;
1914

2015
/**
2116
* Created by jbrisbin on 3/10/15.
2217
*/
23-
public class NettyTcpServer extends ChannelInitializer<SocketChannel> implements ConnectionPublisher<ByteBuf> {
24-
25-
private final List<ChannelInitializerSubscription> subscriptions = new ArrayList<>();
18+
public class NettyTcpServer extends ChannelInitializer<SocketChannel> {
2619

27-
private final ServerBootstrap bootstrap;
20+
private final ServerBootstrap bootstrap;
21+
private final ConnectionHandler<ByteBuf> handler;
2822

29-
public NettyTcpServer(ServerBootstrap bootstrap) {
23+
public NettyTcpServer(ServerBootstrap bootstrap,
24+
ConnectionHandler<ByteBuf> handler) {
3025
this.bootstrap = bootstrap;
26+
this.handler = handler;
3127
}
3228

33-
public static ConnectionPublisher<ByteBuf> listen(int port) {
29+
public static NettyTcpServer listen(int port, ConnectionHandler<ByteBuf> handler) {
3430
ServerBootstrap b = new ServerBootstrap();
3531

3632
int threads = Runtime.getRuntime().availableProcessors();
@@ -40,32 +36,27 @@ public static ConnectionPublisher<ByteBuf> listen(int port) {
4036

4137
b.channel(NioServerSocketChannel.class);
4238

43-
NettyTcpServer server = new NettyTcpServer(b);
39+
NettyTcpServer server = new NettyTcpServer(b, handler);
4440
b.childHandler(server);
4541

4642
b.bind(port);
4743

4844
return server;
4945
}
5046

47+
public void shutdown() {
48+
bootstrap.group().shutdownGracefully();
49+
}
50+
5151
@Override
5252
protected void initChannel(SocketChannel ch) throws Exception {
5353
ch.config().setAutoRead(false);
5454
ch.config().setAllocator(PooledByteBufAllocator.DEFAULT);
5555

56-
synchronized (subscriptions) {
57-
for (ChannelInitializerSubscription sub : subscriptions) {
58-
ch.pipeline().addLast(sub);
59-
}
60-
}
61-
}
56+
ch.pipeline().addLast(new LoggingHandler());
6257

63-
@Override
64-
public void subscribe(Subscriber<? super Connection<ByteBuf>> s) {
65-
ChannelInitializerSubscription sub = new ChannelInitializerSubscription(s);
66-
synchronized (subscriptions) {
67-
subscriptions.add(sub);
68-
}
69-
s.onSubscribe(sub);
58+
NettyTcpServerConnection conn = new NettyTcpServerConnection(ch);
59+
handler.handle(conn);
7060
}
61+
7162
}

ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java

+38-15
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,18 @@
22

33
import io.netty.buffer.ByteBuf;
44
import io.netty.channel.Channel;
5-
import io.netty.handler.logging.LogLevel;
6-
import io.netty.handler.logging.LoggingHandler;
75
import io.ripc.core.io.Buffer;
86
import io.ripc.protocol.tcp.Connection;
97
import io.ripc.transport.netty4.tcp.ChannelInboundHandlerSubscription;
108
import org.reactivestreams.Publisher;
119
import org.reactivestreams.Subscriber;
12-
13-
import java.util.ArrayList;
14-
import java.util.List;
10+
import org.reactivestreams.Subscription;
1511

1612
/**
17-
* Created by jbrisbin on 3/10/15.
13+
* Represents a Netty Channel.
1814
*/
1915
public class NettyTcpServerConnection implements Connection<ByteBuf> {
2016

21-
private final List<Subscriber<? super Buffer<ByteBuf>>> subscribers = new ArrayList<>();
22-
2317
private final Channel channel;
2418

2519
public NettyTcpServerConnection(Channel channel) {
@@ -28,25 +22,54 @@ public NettyTcpServerConnection(Channel channel) {
2822

2923
@Override
3024
public void write(Publisher<Buffer<ByteBuf>> data) {
31-
25+
data.subscribe(new WriteSubscriber());
3226
}
3327

3428
@Override
3529
public void subscribe(Subscriber<? super Buffer<ByteBuf>> s) {
3630
ChannelInboundHandlerSubscription sub = new ChannelInboundHandlerSubscription(channel, s);
37-
synchronized (this) {
38-
subscribers.add(s);
39-
}
4031
s.onSubscribe(sub);
4132
channel.pipeline()
42-
.addLast(new LoggingHandler(LogLevel.DEBUG), sub);
33+
.addLast(sub);
4334
}
4435

4536
@Override
4637
public String toString() {
4738
return "NettyTcpServerConnection{" +
48-
"channel=" + channel +
49-
'}';
39+
"channel=" + channel +
40+
'}';
41+
}
42+
43+
private final class WriteSubscriber implements Subscriber<Buffer<ByteBuf>> {
44+
Subscription subscription;
45+
46+
@Override
47+
public void onSubscribe(Subscription s) {
48+
if (null != subscription) {
49+
s.cancel();
50+
return;
51+
}
52+
(subscription = s).request(1);
53+
}
54+
55+
@Override
56+
public void onNext(Buffer<ByteBuf> buffer) {
57+
channel.write(buffer.get());
58+
// This causes a StackOverflowError
59+
//subscription.request(1);
60+
// This doesn't
61+
channel.eventLoop().execute(() -> subscription.request(1));
62+
}
63+
64+
@Override
65+
public void onError(Throwable t) {
66+
67+
}
68+
69+
@Override
70+
public void onComplete() {
71+
channel.flush();
72+
}
5073
}
5174

5275
}

0 commit comments

Comments
 (0)