Skip to content

Commit a7ba2a5

Browse files
committed
TCP POC
- read/write back-pressure - interception as a chain of handlers - netty transport parameterized by ByteBuf (for now) - echo sample
1 parent 2b3f0dc commit a7ba2a5

File tree

22 files changed

+842
-35
lines changed

22 files changed

+842
-35
lines changed

build.gradle

+58-13
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ ext {
1111
// Network Transports
1212
nettyVersion = '4.0.26.Final'
1313

14+
// Composition Libraries
15+
rxjava1Version = '1.0.7'
16+
1417
// Testing
1518
mockitoVersion = '1.10.19'
1619
junitVersion = '4.12'
@@ -44,11 +47,6 @@ allprojects {
4447
"-Xlint:-fallthrough", // intentionally disabled
4548
"-Xlint:-rawtypes" // TODO enable and fix warnings
4649
]
47-
48-
compileJava {
49-
sourceCompatibility = 1.7
50-
targetCompatibility = 1.7
51-
}
5250

5351
compileTestJava {
5452
sourceCompatibility = 1.8
@@ -71,35 +69,82 @@ subprojects { subproject ->
7169
}
7270
}
7371

74-
project('reactive-ipc-core') {
72+
project('ripc-core') {
7573
description = 'Reactive IPC Core Components'
7674
dependencies {
7775
// Reactive Streams
7876
compile "org.reactivestreams:reactive-streams:$reactiveStreamsVersion"
7977
}
8078
}
8179

82-
project('reactive-ipc-tcp') {
80+
project('ripc-protocol-tcp') {
8381
description = 'Reactive IPC TCP Components'
8482
dependencies {
8583
// ripc-core
86-
compile project(":reactive-ipc-core")
84+
compile project(":ripc-core")
85+
compile "org.slf4j:slf4j-api:1.7.6"
86+
testCompile 'io.reactivex:rxjava:1.0.8'
87+
testCompile 'io.reactivex:rxjava-reactive-streams:0.3.0'
8788
}
8889
}
8990

90-
project('reactive-ipc-netty4') {
91+
project('ripc-transport-netty4') {
9192
description = 'Reactive IPC Netty 4.x Transport Implementation'
9293
dependencies {
9394
// ripc-tcp
94-
compile project(":reactive-ipc-tcp")
95+
compile project(":ripc-protocol-tcp")
9596

9697
// Netty
9798
compile "io.netty:netty-all:$nettyVersion"
99+
compile "org.slf4j:slf4j-api:1.7.6"
100+
}
101+
}
102+
103+
project('ripc-transport-netty4-examples') {
104+
description = 'Reactive IPC TCP Component examples'
105+
106+
compileTestJava {
107+
sourceCompatibility = 1.8
108+
targetCompatibility = 1.8
109+
}
110+
111+
dependencies {
112+
// ripc-core
113+
compile project(":ripc-transport-netty4")
114+
compile 'io.reactivex:rxjava:1.0.8'
115+
compile 'io.reactivex:rxjava-reactive-streams:0.3.0'
116+
compile "org.slf4j:slf4j-api:$slf4jVersion"
117+
compile "ch.qos.logback:logback-classic:$logbackVersion"
118+
}
119+
}
120+
121+
project('ripc-rxjava1') {
122+
description = 'Reactive IPC Composition Layer Implementation'
123+
dependencies {
124+
// ripc-tcp
125+
compile project(":ripc-protocol-tcp")
126+
compile 'io.reactivex:rxjava:1.0.8'
127+
compile 'io.reactivex:rxjava-reactive-streams:0.3.0'
128+
}
129+
}
130+
131+
project('ripc-rxjava1-examples') {
132+
description = 'Reactive IPC Composition Layer examples'
133+
134+
compileTestJava {
135+
sourceCompatibility = 1.8
136+
targetCompatibility = 1.8
137+
}
138+
139+
dependencies {
140+
// ripc-tcp
141+
compile project(":ripc-rxjava1")
142+
compile project(":ripc-transport-netty4-examples")
98143
}
99144
}
100145

101146
configure(rootProject) {
102-
description = "Reactive Streams IPC"
147+
description = "Reactive IPC"
103148

104149
task api(type: Javadoc) {
105150
group = "Documentation"
@@ -111,7 +156,7 @@ configure(rootProject) {
111156
it.tasks.getByName("jar")
112157
}
113158
}
114-
options.memberLevel = org.gradle.external.javadoc.JavadocMemberLevel.PROTECTED
159+
options.memberLevel = JavadocMemberLevel.PROTECTED
115160
options.author = true
116161
options.header = rootProject.description
117162
//options.overview = "src/api/overview.html"
@@ -129,4 +174,4 @@ configure(rootProject) {
129174
classpath = files(subprojects.collect { it.sourceSets.main.compileClasspath })
130175
}
131176
}
132-
}
177+
}

reactive-ipc-core/src/main/java/io/ripc/core/package-info.java

-3
This file was deleted.

reactive-ipc-core/src/test/java/io/ripc/core/package-info.java

-3
This file was deleted.

reactive-ipc-netty4/src/main/java/io/ripc/transport/netty4/package-info.java

-3
This file was deleted.

reactive-ipc-netty4/src/test/java/io/ripc/transport/netty4/package-info.java

-3
This file was deleted.

reactive-ipc-tcp/src/main/java/io/ripc/protocol/tcp/package-info.java

-3
This file was deleted.

reactive-ipc-tcp/src/test/java/io/ripc/protocol/tcp/package-info.java

-3
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.ripc.core;
2+
3+
4+
/**
5+
* A contract to request flush operations.
6+
*
7+
* <p>Intended to be implemented by a {@link org.reactivestreams.Subscriber}
8+
* performing write I/O and to be used within the context of the
9+
* {@link org.reactivestreams.Publisher} producing items to be written.
10+
* A {@link #flush()} may be invoked in between calls to
11+
* {@link org.reactivestreams.Subscriber#onNext(Object) onNext} to request
12+
* flushing all items that may have been buffered so far.
13+
*/
14+
public interface Flushable {
15+
16+
/**
17+
* Request a flush operation.
18+
*/
19+
public void flush();
20+
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.ripc.protocol.tcp;
2+
3+
/**
4+
* Base class for TcpInterceptor implementations.
5+
*
6+
* <p>Implementations that do not transform the connection input or output should
7+
* consider extending {@link io.ripc.protocol.tcp.SimpleTcpInterceptor}.
8+
*/
9+
public abstract class AbstractTcpInterceptor<I, O, II, OO> implements TcpInterceptor<I, O, II, OO> {
10+
11+
private TcpHandler<II, OO> nextHandler;
12+
13+
14+
public TcpHandler<II, OO> getNext() {
15+
return this.nextHandler;
16+
}
17+
18+
@Override
19+
public <III, OOO> TcpInterceptor<II, OO, III, OOO> next(TcpInterceptor<II, OO, III, OOO> handler) {
20+
this.nextHandler = handler;
21+
return handler;
22+
}
23+
24+
@Override
25+
public void last(TcpHandler<II, OO> handler) {
26+
this.nextHandler = handler;
27+
}
28+
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.ripc.protocol.tcp;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
6+
public abstract class AbstractTcpServer<I, O> implements TcpServer {
7+
8+
protected final Logger logger = LoggerFactory.getLogger(getClass());
9+
10+
private TcpHandler<I, O> handler;
11+
12+
13+
/**
14+
* Set up an interception chain of handlers vs a single {@link #handler}.
15+
*/
16+
public <II, OO> TcpInterceptor<I, O, II, OO> interceptor(TcpInterceptor<I, O, II, OO> handler) {
17+
this.handler = handler;
18+
return handler;
19+
}
20+
21+
/**
22+
* Set a single handler vs an interception chain of {@link #interceptor}.
23+
*/
24+
public void handler(TcpHandler<I, O> handler) {
25+
this.handler = handler;
26+
}
27+
28+
/**
29+
* Return the configured TcpHandler.
30+
*/
31+
protected TcpHandler<I, O> getHandler() {
32+
return this.handler;
33+
}
34+
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.ripc.protocol.tcp;
2+
3+
4+
/**
5+
* A {@link TcpInterceptor} that does not change the connection input and output.
6+
*/
7+
public abstract class SimpleTcpInterceptor<I, O> extends AbstractTcpInterceptor<I, O, I, O> {
8+
9+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.ripc.protocol.tcp;
2+
3+
import org.reactivestreams.Publisher;
4+
5+
6+
public interface TcpConnection<I, O> {
7+
8+
Publisher<I> reader();
9+
10+
/**
11+
* @see io.ripc.core.Flushable
12+
*/
13+
Publisher<Void> writer(Publisher<O> publisher);
14+
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.ripc.protocol.tcp;
2+
3+
4+
import org.reactivestreams.Publisher;
5+
6+
public interface TcpHandler<I, O> {
7+
8+
Publisher<Void> handle(TcpConnection<I, O> connection);
9+
10+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.ripc.protocol.tcp;
2+
3+
4+
/**
5+
* A {@code TcpHandler} that can be chained to another handler to form an
6+
* interception chain and may transform the connection input and output.
7+
*
8+
* @param <I> Input (this handler)
9+
* @param <O> Output (this handler)
10+
* @param <II> Input (next handler)
11+
* @param <OO> Output (next handler)
12+
*/
13+
public interface TcpInterceptor<I, O, II, OO> extends TcpHandler<I, O> {
14+
15+
/**
16+
* Configure the next interceptor.
17+
*/
18+
<III, OOO> TcpInterceptor<II, OO, III, OOO> next(TcpInterceptor<II, OO, III, OOO> handler);
19+
20+
/**
21+
* Configure the last handler.
22+
*/
23+
void last(TcpHandler<II, OO> handler);
24+
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.ripc.protocol.tcp;
2+
3+
4+
public interface TcpServer {
5+
6+
void start();
7+
8+
void shutdown();
9+
10+
}

0 commit comments

Comments
 (0)