Skip to content

Commit ece7303

Browse files
committed
Update javadoc to document what components do and what their intended role is.
1 parent 225218d commit ece7303

File tree

15 files changed

+204
-88
lines changed

15 files changed

+204
-88
lines changed
Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,19 @@
11
package io.ripc.core;
22

33
/**
4-
* Created by jbrisbin on 3/26/15.
4+
* A {@code DemandCalculator} implementation is responsible for calculating what the demand value should be to send to
5+
* {@link org.reactivestreams.Subscription#request(long)}.
56
*/
67
public interface DemandCalculator {
78

9+
/**
10+
* Calculate demand based on current pending backlog. A value {@code < 1} means "don't make any new requests" since
11+
* values less than {@code 1} are illegal according to the Reactive Streams spec. A value of {@code >= 1} means "use
12+
* this value as the demand".
13+
*
14+
* @param pending outstanding backlog of previous demand accumulations
15+
* @return &lt 1 to indicate no requests should be performed, &gt= 1 to indicate positive demand
16+
*/
817
long calculateDemand(long pending);
918

1019
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.ripc.core;
2+
3+
/**
4+
* An {@code EventListener} is a generic component that has no default behavior. Components are meant to use specific
5+
* subclasses of this interface to intercept events and perform appropriate behavior.
6+
*/
7+
public interface EventListener {
8+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.ripc.core;
2+
3+
/**
4+
* A {@code Handler} takes an arbitrary object as input and "handles" it.
5+
*/
6+
public interface Handler<T> {
7+
/**
8+
* Handle the given object (do something useful with it).
9+
* @param obj
10+
*/
11+
void handle(T obj);
12+
}

ripc-core/src/main/java/io/ripc/core/SingletonPublisher.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import java.util.concurrent.atomic.AtomicBoolean;
88

99
/**
10-
* Created by jbrisbin on 3/26/15.
10+
* A {@code SingletonPublisher} provides a single value only once and then calls {@code onComplete}. If the value is
11+
* {@code null}, then the {@link org.reactivestreams.Subscriber#onNext(Object)} is not called but {@link
12+
* org.reactivestreams.Subscriber#onComplete()} is.
1113
*/
1214
public class SingletonPublisher<T> implements Publisher<T>, DemandCalculator {
1315

ripc-core/src/main/java/io/ripc/core/Specification.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import org.reactivestreams.Subscriber;
44

55
/**
6-
* Created by jbrisbin on 3/10/15.
6+
* Helper class to encapsulate various checks required by the Reactive Streams Specification.
77
*/
88
public abstract class Specification {
99

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,37 @@
11
package io.ripc.protocol.tcp.connection;
22

3+
import io.ripc.core.EventListener;
34
import org.reactivestreams.Publisher;
45

56
/**
67
* A {@code Connection} provides a reader for inbound data and a writer for outbound.
78
*/
89
public interface TcpConnection {
910

11+
/**
12+
* Receive the {@link org.reactivestreams.Publisher} which will be producing inbound data. This is intended to be
13+
* composed into a more sophisticated processing pipeline by a higher layer of composition helpers.
14+
*
15+
* @return {@code this}
16+
*/
1017
Publisher<?> reader();
1118

19+
/**
20+
* Set the write {@link org.reactivestreams.Publisher} that will be used to send outbound data to the peer. Can only
21+
* be invoked once.
22+
*
23+
* @param writer the {@link org.reactivestreams.Publisher} which will produce outbound data
24+
* @return {@code this}
25+
*/
1226
TcpConnection writer(Publisher<?> writer);
1327

14-
TcpConnection addListener(TcpConnectionEventListener listener);
28+
/**
29+
* Add a type of {@link io.ripc.core.EventListener} to the connection which will respond to the events for which that
30+
* specific listener type is responsible. What's available varies by protocol and server implementation.
31+
*
32+
* @param listener
33+
* @return {@code this}
34+
*/
35+
TcpConnection addListener(EventListener listener);
1536

1637
}

ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnectionEventListener.java

Lines changed: 0 additions & 7 deletions
This file was deleted.

ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/connection/TcpConnectionHandler.java

Lines changed: 0 additions & 12 deletions
This file was deleted.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.ripc.protocol.tcp.connection.listener;
2+
3+
import io.ripc.core.EventListener;
4+
import io.ripc.protocol.tcp.connection.TcpConnection;
5+
6+
/**
7+
* An {@code EventListener} implementation that is used to receive notifications that an IO channel has completed the
8+
* read of all available data.
9+
*/
10+
public interface ReadCompleteListener extends EventListener {
11+
12+
/**
13+
* Invoked when all data has been read from the given {@link io.ripc.protocol.tcp.connection.TcpConnection}.
14+
* Returning
15+
* {@code true} here means "I'm finished with the connection you may close it now" and returning {@code false} means
16+
* "leave the connection open".
17+
*
18+
* @param connection the connection on which reads are complete
19+
* @return {@code true} to close the connection, {@code false} to leave it open
20+
*/
21+
boolean readComplete(TcpConnection connection);
22+
23+
}
Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,23 @@
11
package io.ripc.protocol.tcp.connection.listener;
22

3+
import io.ripc.core.EventListener;
34
import io.ripc.protocol.tcp.connection.TcpConnection;
4-
import io.ripc.protocol.tcp.connection.TcpConnectionEventListener;
55

66
/**
7-
* Created by jbrisbin on 3/26/15.
7+
* An {@code EventListener} implementation that is used to receive notifications that writes have been successfully
8+
* completed on the underlying IO channel.
89
*/
9-
public interface WriteCompleteListener extends TcpConnectionEventListener {
10+
public interface WriteCompleteListener extends EventListener {
1011

12+
/**
13+
* Invoked every time a write to the IO channel is complete. Returning {@code false} here means "don't do any
14+
* flushing" and returning {@code true} means "flush the underlying IO channel".
15+
*
16+
* @param connection the connection on which the write was made
17+
* @param count the number of items that have been written since the last flush
18+
* @param msg the last value written
19+
* @return {@code true} to perform a flush of the channel, {@code false} otherwise
20+
*/
1121
boolean writeComplete(TcpConnection connection, long count, Object msg);
1222

1323
}

ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/ChannelInboundHandlerSubscription.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.ripc.transport.netty4.tcp;
22

3-
import io.netty.buffer.ByteBuf;
43
import io.netty.channel.Channel;
54
import io.netty.channel.ChannelHandlerContext;
65
import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -11,7 +10,8 @@
1110
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
1211

1312
/**
14-
* Created by jbrisbin on 3/10/15.
13+
* A {@code ChannelInboundHandlerAdapter} that is responsible for propagating data from the IO channel to the read
14+
* {@link org.reactivestreams.Subscriber}.
1515
*/
1616
public class ChannelInboundHandlerSubscription extends ChannelInboundHandlerAdapter implements Subscription {
1717

@@ -60,11 +60,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
6060
return;
6161
}
6262

63-
ByteBuf buf = (ByteBuf) msg;
6463
try {
65-
subscriber.onNext(buf);
64+
subscriber.onNext(msg);
6665
PEND_UPD.decrementAndGet(this);
67-
//channel.read();
6866
} catch (Throwable t) {
6967
subscriber.onError(t);
7068
}
Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,39 @@
66
import io.netty.channel.ChannelHandlerContext;
77
import io.netty.channel.ChannelPromise;
88
import io.ripc.protocol.tcp.connection.TcpConnection;
9+
import io.ripc.protocol.tcp.connection.listener.ReadCompleteListener;
910
import io.ripc.protocol.tcp.connection.listener.WriteCompleteListener;
1011

1112
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
1213

1314
/**
14-
* Created by jbrisbin on 3/26/15.
15+
* Listens for events on the Netty IO channel and invokes the appropriate {@link io.ripc.core.EventListener} for that
16+
* type of event.
1517
*/
16-
public class TcpConnectionEventListenerChannelHandler extends ChannelDuplexHandler {
18+
public class EventListenerChannelHandler extends ChannelDuplexHandler {
1719

18-
private static final AtomicLongFieldUpdater<TcpConnectionEventListenerChannelHandler> WRITTEN_UPD
19-
= AtomicLongFieldUpdater.newUpdater(TcpConnectionEventListenerChannelHandler.class, "writtenSinceLastFlush");
20+
private static final AtomicLongFieldUpdater<EventListenerChannelHandler> WRITTEN_UPD
21+
= AtomicLongFieldUpdater.newUpdater(EventListenerChannelHandler.class, "writtenSinceLastFlush");
2022

2123
private final TcpConnection connection;
2224

2325
private volatile long writtenSinceLastFlush = 0L;
2426

27+
private ReadCompleteListener readCompleteListener;
2528
private WriteCompleteListener writeCompleteListener;
2629

27-
public TcpConnectionEventListenerChannelHandler(TcpConnection connection) {
30+
public EventListenerChannelHandler(TcpConnection connection) {
2831
this.connection = connection;
2932
}
3033

34+
public ReadCompleteListener getReadCompleteListener() {
35+
return readCompleteListener;
36+
}
37+
38+
public void setReadCompleteListener(ReadCompleteListener readCompleteListener) {
39+
this.readCompleteListener = readCompleteListener;
40+
}
41+
3142
public WriteCompleteListener getWriteCompleteListener() {
3243
return writeCompleteListener;
3344
}
@@ -58,4 +69,12 @@ public void operationComplete(ChannelFuture future) throws Exception {
5869
WRITTEN_UPD.incrementAndGet(this);
5970
}
6071

72+
@Override
73+
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
74+
super.channelReadComplete(ctx);
75+
if (null != readCompleteListener && readCompleteListener.readComplete(connection)) {
76+
ctx.close();
77+
}
78+
}
79+
6180
}

ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
import io.netty.channel.socket.SocketChannel;
99
import io.netty.channel.socket.nio.NioServerSocketChannel;
1010
import io.netty.handler.logging.LoggingHandler;
11-
import io.ripc.protocol.tcp.connection.TcpConnectionHandler;
11+
import io.ripc.core.Handler;
12+
import io.ripc.protocol.tcp.connection.TcpConnection;
1213
import io.ripc.transport.netty4.NamedDaemonThreadFactory;
1314

1415
/**
@@ -22,7 +23,7 @@ public NettyTcpServer(ServerBootstrap bootstrap) {
2223
this.bootstrap = bootstrap;
2324
}
2425

25-
public static NettyTcpServer listen(int port, TcpConnectionHandler handler) {
26+
public static NettyTcpServer listen(int port, Handler<TcpConnection> handler) {
2627
ServerBootstrap b = new ServerBootstrap();
2728

2829
int threads = Runtime.getRuntime().availableProcessors();

ripc-transport-netty4/src/main/java/io/ripc/transport/netty4/tcp/server/NettyTcpServerConnection.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,20 @@
22

33
import io.netty.channel.Channel;
44
import io.ripc.core.DemandCalculator;
5+
import io.ripc.core.EventListener;
56
import io.ripc.protocol.tcp.connection.TcpConnection;
6-
import io.ripc.protocol.tcp.connection.TcpConnectionEventListener;
7+
import io.ripc.protocol.tcp.connection.listener.ReadCompleteListener;
78
import io.ripc.protocol.tcp.connection.listener.WriteCompleteListener;
89
import io.ripc.transport.netty4.tcp.ChannelInboundHandlerSubscription;
9-
import io.ripc.transport.netty4.tcp.TcpConnectionEventListenerChannelHandler;
10+
import io.ripc.transport.netty4.tcp.EventListenerChannelHandler;
1011
import org.reactivestreams.Publisher;
1112
import org.reactivestreams.Subscriber;
1213
import org.reactivestreams.Subscription;
1314

1415
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
1516

1617
/**
17-
* Represents a Netty Channel.
18+
* Represents a Reactive IPC {@code TcpConnection}.
1819
*/
1920
public class NettyTcpServerConnection implements TcpConnection {
2021

@@ -24,24 +25,34 @@ public class NettyTcpServerConnection implements TcpConnection {
2425
private final Channel channel;
2526
private final ReadPublisher readPublisher;
2627

28+
private WriteSubscriber writeSubscriber;
29+
2730
public NettyTcpServerConnection(Channel channel) {
2831
this.channel = channel;
2932
this.readPublisher = new ReadPublisher(channel);
3033
}
3134

3235
@Override
33-
public TcpConnection addListener(TcpConnectionEventListener listener) {
34-
TcpConnectionEventListenerChannelHandler handler =
35-
channel.pipeline().get(TcpConnectionEventListenerChannelHandler.class);
36+
public TcpConnection addListener(EventListener listener) {
37+
EventListenerChannelHandler handler =
38+
channel.pipeline().get(EventListenerChannelHandler.class);
3639
if (null == handler) {
37-
handler = new TcpConnectionEventListenerChannelHandler(this);
40+
handler = new EventListenerChannelHandler(this);
3841
channel.pipeline().addLast(handler);
3942
}
4043

41-
if (WriteCompleteListener.class.isAssignableFrom(listener.getClass())) {
44+
Class<?> listenerType = listener.getClass();
45+
46+
// Assign WriteCompleteListener that will be notified of successful writes.
47+
if (WriteCompleteListener.class.isAssignableFrom(listenerType)) {
4248
handler.setWriteCompleteListener((WriteCompleteListener) listener);
4349
}
4450

51+
// Assign a ReadCompleteListener that will be notified when all data has been read.
52+
if (ReadCompleteListener.class.isAssignableFrom(listenerType)) {
53+
handler.setReadCompleteListener((ReadCompleteListener) listener);
54+
}
55+
4556
return this;
4657
}
4758

@@ -52,10 +63,15 @@ public Publisher<?> reader() {
5263

5364
@Override
5465
public TcpConnection writer(Publisher<?> writer) {
66+
if (null != writeSubscriber) {
67+
throw new IllegalStateException("A writer has already been set on this connection");
68+
}
69+
5570
DemandCalculator demandCalculator = DemandCalculator.class.isAssignableFrom(writer.getClass())
5671
? (DemandCalculator) writer
5772
: null;
58-
writer.subscribe(new WriteSubscriber(demandCalculator));
73+
this.writeSubscriber = new WriteSubscriber(demandCalculator);
74+
writer.subscribe(writeSubscriber);
5975
return this;
6076
}
6177

@@ -99,7 +115,7 @@ public void run() {
99115
toRequest = 1L;
100116
}
101117

102-
if (toRequest == Long.MAX_VALUE) {
118+
if (toRequest == Long.MAX_VALUE && pending != Long.MAX_VALUE) {
103119
PENDING_UPD.set(WriteSubscriber.this, Long.MAX_VALUE);
104120
subscription.request(Long.MAX_VALUE);
105121
} else if (toRequest > 0) {
@@ -118,13 +134,17 @@ public void onSubscribe(Subscription subscription) {
118134
}
119135

120136
this.subscription = subscription;
137+
// Request for any writes right away.
121138
subscriptionRequest.run();
122139
}
123140

124141
@Override
125142
public void onNext(Object msg) {
143+
// Write to the channel for every onNext signal.
126144
channel.write(msg);
145+
// Decrement pending counter to show we've handled this write.
127146
PENDING_UPD.decrementAndGet(this);
147+
// We'll get a StackOverflowError if we don't schedule this request.
128148
channel.eventLoop().execute(subscriptionRequest);
129149
}
130150

@@ -135,7 +155,9 @@ public void onError(Throwable t) {
135155

136156
@Override
137157
public void onComplete() {
158+
// Write completes always result in a flush.
138159
channel.flush();
160+
// This is a terminal state, so close the underlying channel.
139161
channel.close();
140162
}
141163
}

0 commit comments

Comments
 (0)