Skip to content

Commit 80533dc

Browse files
committed
Rename modules and provide a simple implementation of Reactive Streams with Netty 4.
1 parent 1036183 commit 80533dc

File tree

22 files changed

+897
-13
lines changed

22 files changed

+897
-13
lines changed

build.gradle

+8-8
Original file line numberDiff line numberDiff line change
@@ -74,46 +74,46 @@ subprojects { subproject ->
7474
}
7575
}
7676

77-
project('reactive-ipc-core') {
77+
project('ripc-core') {
7878
description = 'Reactive IPC Core Components'
7979
dependencies {
8080
// Reactive Streams
8181
compile "org.reactivestreams:reactive-streams:$reactiveStreamsVersion"
8282
}
8383
}
8484

85-
project('reactive-ipc-tcp') {
85+
project('ripc-protocol-tcp') {
8686
description = 'Reactive IPC TCP Components'
8787
dependencies {
8888
// ripc-core
89-
compile project(":reactive-ipc-core")
89+
compile project(":ripc-core")
9090
}
9191
}
9292

93-
project('reactive-ipc-netty4') {
93+
project('ripc-transport-netty4') {
9494
description = 'Reactive IPC Netty 4.x Transport Implementation'
9595
dependencies {
9696
// ripc-tcp
97-
compile project(":reactive-ipc-tcp")
97+
compile project(":ripc-protocol-tcp")
9898

9999
// Netty
100100
compile "io.netty:netty-all:$nettyVersion"
101101
}
102102
}
103103

104-
project('reactive-ipc-rxjava1') {
104+
project('ripc-composition-rxjava1') {
105105
description = 'Reactive IPC Composition Layer Implementation'
106106
dependencies {
107107
// ripc-tcp
108-
compile project(":reactive-ipc-netty4")
108+
compile project(":ripc-transport-netty4")
109109

110110
// RxJava 1.0
111111
compile "io.reactivex:rxjava:$rxjava1Version"
112112
}
113113
}
114114

115115
configure(rootProject) {
116-
description = "Reactive Streams IPC"
116+
description = "Reactive IPC"
117117

118118
task api(type: Javadoc) {
119119
group = "Documentation"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package io.ripc.core;
2+
3+
import java.util.concurrent.ThreadFactory;
4+
import java.util.concurrent.atomic.AtomicLong;
5+
6+
/**
7+
* Created by jbrisbin on 3/10/15.
8+
*/
9+
public class NamedDaemonThreadFactory implements ThreadFactory {
10+
11+
private final AtomicLong counter = new AtomicLong(1);
12+
private final String prefix;
13+
14+
public NamedDaemonThreadFactory(String prefix) {
15+
this.prefix = prefix;
16+
}
17+
18+
@Override
19+
public Thread newThread(Runnable r) {
20+
String name = prefix + "-" + counter.getAndIncrement();
21+
Thread t = new Thread(r, name);
22+
t.setDaemon(true);
23+
return t;
24+
}
25+
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.ripc.core;
2+
3+
import org.reactivestreams.Subscriber;
4+
5+
/**
6+
* Created by jbrisbin on 3/10/15.
7+
*/
8+
public abstract class Specification {
9+
10+
protected Specification() {
11+
}
12+
13+
public static <T> boolean spec_3_9_verifyPositiveDemand(long demand, Subscriber<T> subscriber) {
14+
if (demand > 0) {
15+
return true;
16+
}
17+
subscriber.onError(new IllegalArgumentException("Spec 3.9: Request signals must be a positive number."));
18+
return false;
19+
}
20+
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package io.ripc.core.io;
2+
3+
import java.nio.ByteBuffer;
4+
import java.nio.charset.CharsetDecoder;
5+
import java.util.List;
6+
7+
/**
8+
* Common abstraction to provide additional functionality beyond a traditional {@link java.nio.ByteBuffer} while not
9+
* restricting the dedicated functionality provided by concrete implementations from various transport libraries that
10+
* might offer features like zero-copy.
11+
* <p>
12+
* A {@code Buffer} can be anything. It is not limited to byte buffers. A {@code Buffer} could represent realized
13+
* objects descended from raw data.
14+
* </p>
15+
*/
16+
public interface Buffer<B> extends Cloneable,
17+
AutoCloseable,
18+
Comparable<Buffer<B>>,
19+
Iterable<Buffer<B>> {
20+
21+
int position();
22+
23+
Buffer<B> position(int pos);
24+
25+
int limit();
26+
27+
Buffer<B> limit(int limit);
28+
29+
int capacity();
30+
31+
Buffer<B> capacity(int capacity);
32+
33+
int remaining();
34+
35+
Buffer<B> skip(int len);
36+
37+
Buffer<B> clear();
38+
39+
Buffer<B> compact();
40+
41+
Buffer<B> flip();
42+
43+
Buffer<B> rewind();
44+
45+
Buffer<B> rewind(int len);
46+
47+
Buffer<B> clone();
48+
49+
Buffer<B> copy();
50+
51+
Buffer<B> slice(int start, int len);
52+
53+
Iterable<Buffer<B>> split(byte delimiter);
54+
55+
Iterable<Buffer<B>> split(byte delimiter, boolean stripDelimiter);
56+
57+
Iterable<Buffer<B>> split(byte delimiter, boolean stripDelimiter, List<Buffer<B>> preallocatedList);
58+
59+
Iterable<Buffer<B>> split(Buffer<B> delimiter);
60+
61+
Iterable<Buffer<B>> split(Buffer<B> delimiter, boolean stripDelimiter);
62+
63+
Iterable<Buffer<B>> split(Buffer<B> delimiter, boolean stripDelimiter, List<Buffer<B>> preallocatedList);
64+
65+
Buffer<B> prepend(B data);
66+
67+
Buffer<B> prepend(Buffer<B> buffer);
68+
69+
Buffer<B> prepend(ByteBuffer buffer);
70+
71+
Buffer<B> prepend(CharSequence chars);
72+
73+
Buffer<B> prepend(byte[] bytes);
74+
75+
Buffer<B> prepend(byte b);
76+
77+
Buffer<B> prepend(char c);
78+
79+
Buffer<B> prepend(short s);
80+
81+
Buffer<B> prepend(int i);
82+
83+
Buffer<B> prepend(long l);
84+
85+
Buffer<B> append(B data);
86+
87+
Buffer<B> append(Buffer<B> buffer);
88+
89+
Buffer<B> append(ByteBuffer buffer);
90+
91+
Buffer<B> append(CharSequence chars);
92+
93+
Buffer<B> append(byte[] bytes);
94+
95+
Buffer<B> append(byte b);
96+
97+
Buffer<B> append(char c);
98+
99+
Buffer<B> append(short s);
100+
101+
Buffer<B> append(int i);
102+
103+
Buffer<B> append(long l);
104+
105+
byte readByte();
106+
107+
void readBytes(byte[] bytes);
108+
109+
short readShort();
110+
111+
int readInt();
112+
113+
float readFloat();
114+
115+
double readDouble();
116+
117+
long readLong();
118+
119+
char readChar();
120+
121+
void readChars(char[] chars);
122+
123+
String readString();
124+
125+
String readString(CharsetDecoder decoder);
126+
127+
B get();
128+
129+
}

reactive-ipc-core/src/test/resources/logback.xml renamed to ripc-core/src/test/resources/logback.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
<logger name="io.ripc" level="debug"/>
1212

13-
<root level="info">
13+
<root level="debug">
1414
<appender-ref ref="stdout"/>
1515
</root>
1616

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package io.ripc.protocol.tcp;
2+
3+
import io.ripc.core.io.Buffer;
4+
import org.reactivestreams.Publisher;
5+
6+
/**
7+
* Created by jbrisbin on 3/10/15.
8+
*/
9+
public interface Connection<B> extends Publisher<Buffer<B>> {
10+
11+
void write(Publisher<Buffer<B>> data);
12+
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.ripc.protocol.tcp;
2+
3+
import org.reactivestreams.Publisher;
4+
5+
/**
6+
* Created by jbrisbin on 3/10/15.
7+
*/
8+
public interface ConnectionPublisher<B> extends Publisher<Connection<B>> {
9+
}

0 commit comments

Comments
 (0)