Skip to content

Commit 89bc5f1

Browse files
author
Nitesh Kant
committed
Proposal for TCP server at all layers (core, transport & rxjava1)
Based on various discussions around different issues, I am proposing a fresh design for a TCP server in this PR. I have implemented a TCP server at all layers in reactive-ipc, i.e. core, transport(netty) and rxjava1. I have also removed all code which isn't related to the implementation. We can discuss those utilities in isolation as and when required to be added. Although, not used, I have kept the Buffer and related classes as that aspect is not yet discussed to conclusion. There are two samples each for netty transport and rxjava1 layer available to see how will a typical interaction look like in modules ripc-transport-netty4-examples and ripc-rxjava1-examples Backpressure has not been implemented yet.
1 parent 80533dc commit 89bc5f1

File tree

30 files changed

+826
-764
lines changed

30 files changed

+826
-764
lines changed

build.gradle

+40-10
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,6 @@ allprojects {
4848
"-Xlint:-rawtypes" // TODO enable and fix warnings
4949
]
5050

51-
compileJava {
52-
sourceCompatibility = 1.7
53-
targetCompatibility = 1.7
54-
}
55-
5651
compileTestJava {
5752
sourceCompatibility = 1.8
5853
targetCompatibility = 1.8
@@ -87,9 +82,29 @@ project('ripc-protocol-tcp') {
8782
dependencies {
8883
// ripc-core
8984
compile project(":ripc-core")
85+
compile "org.slf4j:slf4j-api:1.7.6"
86+
testCompile 'io.reactivex:rxjava:1.0.8'
87+
testCompile 'io.reactivex:rxjava-reactive-streams:0.3.0'
9088
}
9189
}
9290

91+
project('ripc-transport-netty4-examples') {
92+
description = 'Reactive IPC TCP Component examples'
93+
94+
compileTestJava {
95+
sourceCompatibility = 1.8
96+
targetCompatibility = 1.8
97+
}
98+
99+
dependencies {
100+
// ripc-core
101+
compile project(":ripc-transport-netty4")
102+
compile 'io.reactivex:rxjava:1.0.8'
103+
compile 'io.reactivex:rxjava-reactive-streams:0.3.0'
104+
compile 'org.slf4j:slf4j-simple:1.7.6'
105+
}
106+
}
107+
93108
project('ripc-transport-netty4') {
94109
description = 'Reactive IPC Netty 4.x Transport Implementation'
95110
dependencies {
@@ -98,17 +113,32 @@ project('ripc-transport-netty4') {
98113

99114
// Netty
100115
compile "io.netty:netty-all:$nettyVersion"
116+
compile "org.slf4j:slf4j-api:1.7.6"
101117
}
102118
}
103119

104-
project('ripc-composition-rxjava1') {
120+
project('ripc-rxjava1') {
105121
description = 'Reactive IPC Composition Layer Implementation'
106122
dependencies {
107123
// ripc-tcp
108-
compile project(":ripc-transport-netty4")
124+
compile project(":ripc-protocol-tcp")
125+
compile 'io.reactivex:rxjava:1.0.8'
126+
compile 'io.reactivex:rxjava-reactive-streams:0.3.0'
127+
}
128+
}
109129

110-
// RxJava 1.0
111-
compile "io.reactivex:rxjava:$rxjava1Version"
130+
project('ripc-rxjava1-examples') {
131+
description = 'Reactive IPC Composition Layer examples'
132+
133+
compileTestJava {
134+
sourceCompatibility = 1.8
135+
targetCompatibility = 1.8
136+
}
137+
138+
dependencies {
139+
// ripc-tcp
140+
compile project(":ripc-rxjava1")
141+
compile project(":ripc-transport-netty4-examples")
112142
}
113143
}
114144

@@ -125,7 +155,7 @@ configure(rootProject) {
125155
it.tasks.getByName("jar")
126156
}
127157
}
128-
options.memberLevel = org.gradle.external.javadoc.JavadocMemberLevel.PROTECTED
158+
options.memberLevel = JavadocMemberLevel.PROTECTED
129159
options.author = true
130160
options.header = rootProject.description
131161
//options.overview = "src/api/overview.html"

ripc-core/src/main/java/io/ripc/core/NamedDaemonThreadFactory.java

-26
This file was deleted.

ripc-core/src/main/java/io/ripc/core/Specification.java

-21
This file was deleted.

ripc-core/src/main/java/io/ripc/core/package-info.java

-3
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.ripc.internal;
2+
3+
import org.reactivestreams.Publisher;
4+
import org.reactivestreams.Subscriber;
5+
import org.reactivestreams.Subscription;
6+
7+
/**
8+
* Temporary utility class for creating and transforming {@link Publisher}s.
9+
*/
10+
public class Publishers {
11+
12+
public static <T> Publisher<T> just(final T value) {
13+
return new Publisher<T>() {
14+
@Override
15+
public void subscribe(final Subscriber<? super T> s) {
16+
s.onSubscribe(new Subscription() {
17+
@Override
18+
public void request(long n) {
19+
s.onNext(value);
20+
s.onComplete();
21+
}
22+
23+
@Override
24+
public void cancel() {
25+
}
26+
});
27+
}
28+
};
29+
}
30+
31+
public static <T> Publisher<T> error(final Throwable t) {
32+
return new Publisher<T>() {
33+
@Override
34+
public void subscribe(final Subscriber<? super T> s) {
35+
s.onSubscribe(new Subscription() {
36+
@Override
37+
public void request(long n) {
38+
s.onError(t);
39+
}
40+
41+
@Override
42+
public void cancel() {
43+
}
44+
});
45+
}
46+
};
47+
}
48+
}

ripc-core/src/main/java/io/ripc/core/io/Buffer.java renamed to ripc-core/src/main/java/io/ripc/io/Buffer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.ripc.core.io;
1+
package io.ripc.io;
22

33
import java.nio.ByteBuffer;
44
import java.nio.charset.CharsetDecoder;

ripc-core/src/test/java/io/ripc/core/package-info.java

-3
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,53 @@
11
package io.ripc.protocol.tcp;
22

3-
import io.ripc.core.io.Buffer;
43
import org.reactivestreams.Publisher;
54

65
/**
7-
* Created by jbrisbin on 3/10/15.
6+
* An abstraction for a TCP connection.
7+
*
8+
* @param <R> The type of objects read from this connection.
9+
* @param <W> The type of objects written to this connection.
810
*/
9-
public interface Connection<B> extends Publisher<Buffer<B>> {
11+
public interface Connection<R, W> extends Publisher<R> {
1012

11-
void write(Publisher<Buffer<B>> data);
13+
/**
14+
* Writes the passed stream of {@code data} and returns the result as a {@link Publisher}. All items emitted by
15+
* this stream are flushed on completion of the stream.
16+
*
17+
* @param data Data stream to write.
18+
*
19+
* @return Result of write.
20+
*/
21+
Publisher<Void> write(Publisher<W> data);
1222

23+
/**
24+
* Writes the passed stream of {@code data} and returns the result as a {@link Publisher}. All written items are
25+
* flushed whenever the passed {@code flushSelector} returns {@code true}
26+
*
27+
* @param data Data stream to write.
28+
* @param flushSelector Selector that is invoked after every emitted item is written. If this selector returns
29+
* {@code true} then all items written till now are flushed.
30+
*
31+
* @return Result of write.
32+
*/
33+
Publisher<Void> write(Publisher<W> data, FlushSelector<W> flushSelector);
34+
35+
/**
36+
* A function that is used for determining when a flush has to be invoked on the underlying channel.
37+
*
38+
* @param <W> Type of items emitted by the stream using this selector.
39+
*/
40+
interface FlushSelector<W> {
41+
42+
/**
43+
* Selects whether flush should be invoked on the channel.
44+
*
45+
* @param count The index of this item. Starts with 1.
46+
* @param lastWrittenItem Item which was last written before calling this selector.
47+
*
48+
* @return {@code true} if flush should be invoked.
49+
*/
50+
boolean select(long count, W lastWrittenItem);
51+
52+
}
1353
}

ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/ConnectionPublisher.java

-9
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.ripc.protocol.tcp;
2+
3+
import org.reactivestreams.Publisher;
4+
5+
public interface TcpHandler<R, W> {
6+
7+
Publisher<Void> handle(Connection<R, W> connection);
8+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.ripc.protocol.tcp;
2+
3+
public interface TcpInterceptor<I, O, II, OO> {
4+
5+
TcpHandler<II, OO> intercept(TcpHandler<I, O> handler);
6+
7+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package io.ripc.protocol.tcp;
2+
3+
import io.ripc.internal.Publishers;
4+
import org.reactivestreams.Publisher;
5+
6+
import java.util.concurrent.atomic.AtomicBoolean;
7+
8+
public abstract class TcpServer<R, W> {
9+
10+
@SuppressWarnings("rawtypes")
11+
private LazyTcpHandler lazyHandler;
12+
protected final TcpHandler<R, W> thisHandler;
13+
protected final AtomicBoolean started;
14+
15+
protected TcpServer() {
16+
lazyHandler = new LazyTcpHandler<>();
17+
thisHandler = null;
18+
this.started = new AtomicBoolean();
19+
}
20+
21+
protected TcpServer(TcpHandler<R, W> handler) {
22+
thisHandler = handler;
23+
this.started = new AtomicBoolean();
24+
}
25+
26+
public <RR, WW> TcpServer<RR, WW> intercept(TcpInterceptor<R, W, RR, WW> interceptor) {
27+
if (null == thisHandler) {
28+
@SuppressWarnings("unchecked")
29+
TcpServer<RR, WW> toReturn = newServer(interceptor.intercept(lazyHandler));
30+
toReturn.lazyHandler = lazyHandler;
31+
return toReturn;
32+
} else {
33+
TcpServer<RR, WW> toReturn = newServer(interceptor.intercept(thisHandler));
34+
toReturn.lazyHandler = lazyHandler;
35+
return toReturn;
36+
}
37+
}
38+
39+
public final TcpServer<R, W> start(TcpHandler<R, W> handler) {
40+
if (!started.compareAndSet(false, true)) {
41+
throw new IllegalStateException("Server already started");
42+
}
43+
44+
if (null == thisHandler) {
45+
doStart(handler);
46+
} else {
47+
lazyHandler.start(handler);
48+
doStart(thisHandler);
49+
}
50+
return this;
51+
}
52+
53+
public final void startAndAwait(TcpHandler<R, W> handler) {
54+
start(handler);
55+
awaitShutdown();
56+
}
57+
58+
public final boolean shutdown() {
59+
return !started.compareAndSet(true, false) || doShutdown();
60+
}
61+
62+
public abstract void awaitShutdown();
63+
64+
public abstract boolean doShutdown();
65+
66+
protected abstract <RR, WW> TcpServer<RR, WW> newServer(TcpHandler<RR, WW> handler);
67+
68+
protected abstract TcpServer<R, W> doStart(TcpHandler<R, W> handler);
69+
70+
private static class LazyTcpHandler<R, W> implements TcpHandler<R, W> {
71+
72+
private TcpHandler<R, W> delegate;
73+
74+
@Override
75+
public Publisher<Void> handle(Connection<R, W> connection) {
76+
if (null == delegate) {
77+
return Publishers.error(new IllegalStateException("Handler not initialized."));
78+
} else {
79+
return delegate.handle(connection);
80+
}
81+
}
82+
83+
@SuppressWarnings("unchecked")
84+
private void start(TcpHandler handler) {
85+
delegate = handler;
86+
}
87+
}
88+
}

0 commit comments

Comments
 (0)