Skip to content

Commit 225218d

Browse files
committed
Change event handling to be "listener" based by recognizing specific listener types that are subclasses of a marker interface.
1 parent 9747535 commit 225218d

File tree

12 files changed

+119
-201
lines changed

12 files changed

+119
-201
lines changed

ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/AbstractTcpConnectionEventHandler.java

Lines changed: 0 additions & 48 deletions
This file was deleted.

ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/CompleteEventHandler.java

Lines changed: 0 additions & 12 deletions
This file was deleted.

ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnectionEventHandler.java

Lines changed: 0 additions & 20 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.ripc.protocol.tcp;
1+
package io.ripc.protocol.tcp.connection;
22

33
import org.reactivestreams.Publisher;
44

@@ -7,10 +7,10 @@
77
*/
88
public interface TcpConnection {
99

10-
TcpConnection eventHandler(TcpConnectionEventHandler eventHandler);
11-
1210
Publisher<?> reader();
1311

14-
TcpConnection writer(Publisher<?> sink);
12+
TcpConnection writer(Publisher<?> writer);
13+
14+
TcpConnection addListener(TcpConnectionEventListener listener);
1515

1616
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.ripc.protocol.tcp.connection;
2+
3+
/**
4+
* Created by jbrisbin on 3/26/15.
5+
*/
6+
public interface TcpConnectionEventListener {
7+
}

ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnectionHandler.java renamed to ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnectionHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
package io.ripc.protocol.tcp;
1+
package io.ripc.protocol.tcp.connection;
22

33
/**
44
* A {@code ConnectionHandler} is responsible for composing a Reactive Streams pipeline(s) when a new connection is
55
* received by the server. Implementations will compose an appropriate pipeline based on capabilities and server
66
* configuration.
77
*/
8-
@FunctionalInterface
98
public interface TcpConnectionHandler {
109

1110
void handle(TcpConnection connection);
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package io.ripc.protocol.tcp.connection.listener;
2+
3+
import io.ripc.protocol.tcp.connection.TcpConnection;
4+
import io.ripc.protocol.tcp.connection.TcpConnectionEventListener;
5+
6+
/**
7+
* Created by jbrisbin on 3/26/15.
8+
*/
9+
public interface WriteCompleteListener extends TcpConnectionEventListener {
10+
11+
boolean writeComplete(TcpConnection connection, long count, Object msg);
12+
13+
}

ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/NettyChannelTcpConnectionEventHandler.java

Lines changed: 0 additions & 85 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package io.ripc.transport.netty4.tcp;
2+
3+
import io.netty.channel.ChannelDuplexHandler;
4+
import io.netty.channel.ChannelFuture;
5+
import io.netty.channel.ChannelFutureListener;
6+
import io.netty.channel.ChannelHandlerContext;
7+
import io.netty.channel.ChannelPromise;
8+
import io.ripc.protocol.tcp.connection.TcpConnection;
9+
import io.ripc.protocol.tcp.connection.listener.WriteCompleteListener;
10+
11+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
12+
13+
/**
14+
* Created by jbrisbin on 3/26/15.
15+
*/
16+
public class TcpConnectionEventListenerChannelHandler extends ChannelDuplexHandler {
17+
18+
private static final AtomicLongFieldUpdater<TcpConnectionEventListenerChannelHandler> WRITTEN_UPD
19+
= AtomicLongFieldUpdater.newUpdater(TcpConnectionEventListenerChannelHandler.class, "writtenSinceLastFlush");
20+
21+
private final TcpConnection connection;
22+
23+
private volatile long writtenSinceLastFlush = 0L;
24+
25+
private WriteCompleteListener writeCompleteListener;
26+
27+
public TcpConnectionEventListenerChannelHandler(TcpConnection connection) {
28+
this.connection = connection;
29+
}
30+
31+
public WriteCompleteListener getWriteCompleteListener() {
32+
return writeCompleteListener;
33+
}
34+
35+
public void setWriteCompleteListener(WriteCompleteListener writeCompleteListener) {
36+
this.writeCompleteListener = writeCompleteListener;
37+
}
38+
39+
@Override
40+
public void flush(ChannelHandlerContext ctx) throws Exception {
41+
super.flush(ctx);
42+
WRITTEN_UPD.set(this, 0L);
43+
}
44+
45+
@Override
46+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
47+
if (null != writeCompleteListener) {
48+
promise.addListener(new ChannelFutureListener() {
49+
@Override
50+
public void operationComplete(ChannelFuture future) throws Exception {
51+
if (future.isSuccess() && writeCompleteListener.writeComplete(connection, writtenSinceLastFlush, msg)) {
52+
ctx.flush();
53+
}
54+
}
55+
});
56+
}
57+
super.write(ctx, msg, promise);
58+
WRITTEN_UPD.incrementAndGet(this);
59+
}
60+
61+
}

ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import io.netty.channel.socket.SocketChannel;
99
import io.netty.channel.socket.nio.NioServerSocketChannel;
1010
import io.netty.handler.logging.LoggingHandler;
11-
import io.ripc.protocol.tcp.TcpConnectionHandler;
11+
import io.ripc.protocol.tcp.connection.TcpConnectionHandler;
1212
import io.ripc.transport.netty4.NamedDaemonThreadFactory;
1313

1414
/**

ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22

33
import io.netty.channel.Channel;
44
import io.ripc.core.DemandCalculator;
5-
import io.ripc.protocol.tcp.TcpConnection;
6-
import io.ripc.protocol.tcp.TcpConnectionEventHandler;
5+
import io.ripc.protocol.tcp.connection.TcpConnection;
6+
import io.ripc.protocol.tcp.connection.TcpConnectionEventListener;
7+
import io.ripc.protocol.tcp.connection.listener.WriteCompleteListener;
78
import io.ripc.transport.netty4.tcp.ChannelInboundHandlerSubscription;
8-
import io.ripc.transport.netty4.tcp.NettyChannelTcpConnectionEventHandler;
9+
import io.ripc.transport.netty4.tcp.TcpConnectionEventListenerChannelHandler;
910
import org.reactivestreams.Publisher;
1011
import org.reactivestreams.Subscriber;
1112
import org.reactivestreams.Subscription;
@@ -23,21 +24,24 @@ public class NettyTcpServerConnection implements TcpConnection {
2324
private final Channel channel;
2425
private final ReadPublisher readPublisher;
2526

26-
private TcpConnectionEventHandler eventHandler;
27-
2827
public NettyTcpServerConnection(Channel channel) {
2928
this.channel = channel;
3029
this.readPublisher = new ReadPublisher(channel);
3130
}
3231

3332
@Override
34-
public TcpConnection eventHandler(TcpConnectionEventHandler eventHandler) {
35-
if (null != this.eventHandler) {
36-
throw new IllegalArgumentException(TcpConnectionEventHandler.class.getSimpleName()
37-
+ " already set on this connection");
33+
public TcpConnection addListener(TcpConnectionEventListener listener) {
34+
TcpConnectionEventListenerChannelHandler handler =
35+
channel.pipeline().get(TcpConnectionEventListenerChannelHandler.class);
36+
if (null == handler) {
37+
handler = new TcpConnectionEventListenerChannelHandler(this);
38+
channel.pipeline().addLast(handler);
39+
}
40+
41+
if (WriteCompleteListener.class.isAssignableFrom(listener.getClass())) {
42+
handler.setWriteCompleteListener((WriteCompleteListener) listener);
3843
}
39-
this.eventHandler = eventHandler;
40-
channel.pipeline().addLast(new NettyChannelTcpConnectionEventHandler(eventHandler, this));
44+
4145
return this;
4246
}
4347

@@ -47,11 +51,11 @@ public Publisher<?> reader() {
4751
}
4852

4953
@Override
50-
public TcpConnection writer(Publisher<?> sink) {
51-
DemandCalculator demandCalculator = DemandCalculator.class.isAssignableFrom(sink.getClass())
52-
? (DemandCalculator) sink
54+
public TcpConnection writer(Publisher<?> writer) {
55+
DemandCalculator demandCalculator = DemandCalculator.class.isAssignableFrom(writer.getClass())
56+
? (DemandCalculator) writer
5357
: null;
54-
sink.subscribe(new WriteSubscriber(demandCalculator));
58+
writer.subscribe(new WriteSubscriber(demandCalculator));
5559
return this;
5660
}
5761

@@ -126,9 +130,7 @@ public void onNext(Object msg) {
126130

127131
@Override
128132
public void onError(Throwable t) {
129-
if (null != eventHandler) {
130-
eventHandler.onError(NettyTcpServerConnection.this, t);
131-
}
133+
channel.close();
132134
}
133135

134136
@Override

0 commit comments

Comments
 (0)