Skip to content

Commit 9747535

Browse files
committed
Revamp code to support dynamic backpressure and to reflect discussions around changing the role of the TcpConnection.
1 parent bce6191 commit 9747535

22 files changed

+409
-618
lines changed

build.gradle

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ ext {
66

77
// Logging
88
slf4jVersion = '1.7.10'
9-
logbackVersion = '1.1.2'
9+
logbackVersion = '1.1.3'
1010

1111
// Network Transports
1212
nettyVersion = '4.0.26.Final'
1313

1414
// Composition Libraries
15-
rxjava1Version = '1.0.7'
15+
rxjava1Version = '1.0.8'
1616

1717
// Testing
1818
mockitoVersion = '1.10.19'
@@ -69,6 +69,7 @@ subprojects { subproject ->
6969
// Testing
7070
testCompile "junit:junit:$junitVersion",
7171
"org.hamcrest:hamcrest-library:1.3",
72+
"org.mockito:mockito-core:$mockitoVersion",
7273
"org.slf4j:slf4j-api:$slf4jVersion"
7374
testRuntime "ch.qos.logback:logback-classic:$logbackVersion"
7475
}

ripc-core/src/main/java/io/ripc/core/Consumer.java

Lines changed: 0 additions & 11 deletions
This file was deleted.
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.ripc.core;
2+
3+
/**
4+
* Created by jbrisbin on 3/26/15.
5+
*/
6+
public interface DemandCalculator {
7+
8+
long calculateDemand(long pending);
9+
10+
}

ripc-core/src/main/java/io/ripc/core/Function.java

Lines changed: 0 additions & 9 deletions
This file was deleted.
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.ripc.core;
2+
3+
import org.reactivestreams.Publisher;
4+
5+
/**
6+
* Created by jbrisbin on 3/26/15.
7+
*/
8+
public final class Publishers {
9+
10+
private Publishers() {
11+
}
12+
13+
public static <T> Publisher<?> just(final T obj) {
14+
return new SingletonPublisher<>(obj);
15+
}
16+
17+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package io.ripc.core;
2+
3+
import org.reactivestreams.Publisher;
4+
import org.reactivestreams.Subscriber;
5+
import org.reactivestreams.Subscription;
6+
7+
import java.util.concurrent.atomic.AtomicBoolean;
8+
9+
/**
10+
* Created by jbrisbin on 3/26/15.
11+
*/
12+
public class SingletonPublisher<T> implements Publisher<T>, DemandCalculator {
13+
14+
private final AtomicBoolean requested = new AtomicBoolean(false);
15+
16+
private final T value;
17+
18+
public SingletonPublisher(T value) {
19+
this.value = value;
20+
}
21+
22+
@Override
23+
public long calculateDemand(long pending) {
24+
return (requested.get() ? -1 : 1);
25+
}
26+
27+
@Override
28+
public void subscribe(final Subscriber<? super T> subscriber) {
29+
subscriber.onSubscribe(new Subscription() {
30+
@Override
31+
public void request(long n) {
32+
if (!Specification.spec_3_9_verifyPositiveDemand(n, subscriber)) {
33+
return;
34+
}
35+
if (requested.compareAndSet(false, true)) {
36+
if (null != value) {
37+
subscriber.onNext(value);
38+
}
39+
subscriber.onComplete();
40+
}
41+
}
42+
43+
@Override
44+
public void cancel() {
45+
requested.set(true);
46+
}
47+
});
48+
}
49+
50+
}

ripc-core/src/main/java/io/ripc/core/Supplier.java

Lines changed: 0 additions & 9 deletions
This file was deleted.

ripc-core/src/main/java/io/ripc/core/io/Buffer.java

Lines changed: 0 additions & 129 deletions
This file was deleted.

ripc-core/src/main/java/io/ripc/core/io/BufferSupplier.java

Lines changed: 0 additions & 9 deletions
This file was deleted.
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.ripc.protocol.tcp;
2+
3+
/**
4+
* Created by jbrisbin on 3/26/15.
5+
*/
6+
public abstract class AbstractTcpConnectionEventHandler implements TcpConnectionEventHandler, CompleteEventHandler {
7+
8+
@Override
9+
public void onOpen(TcpConnection connection) {
10+
// NO-OP
11+
}
12+
13+
@Override
14+
public void onClose(TcpConnection connection) {
15+
// NO-OP
16+
}
17+
18+
@Override
19+
public void onAbort(TcpConnection connection) {
20+
// NO-OP
21+
}
22+
23+
@Override
24+
public void onError(TcpConnection connection, Throwable cause) {
25+
// NO-OP
26+
}
27+
28+
@Override
29+
public void onReadable(TcpConnection connection) {
30+
// NO-OP
31+
}
32+
33+
@Override
34+
public void onWritable(TcpConnection connection) {
35+
// NO-OP
36+
}
37+
38+
@Override
39+
public boolean onReadComplete(TcpConnection connection) {
40+
return true;
41+
}
42+
43+
@Override
44+
public boolean onWriteComplete(TcpConnection connection, Object msg) {
45+
return false;
46+
}
47+
48+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.ripc.protocol.tcp;
2+
3+
/**
4+
* Created by jbrisbin on 3/26/15.
5+
*/
6+
public interface CompleteEventHandler {
7+
8+
boolean onReadComplete(TcpConnection connection);
9+
10+
boolean onWriteComplete(TcpConnection connection, Object msg);
11+
12+
}

ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/Connection.java

Lines changed: 0 additions & 19 deletions
This file was deleted.
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.ripc.protocol.tcp;
2+
3+
import org.reactivestreams.Publisher;
4+
5+
/**
6+
* A {@code Connection} provides a reader for inbound data and a writer for outbound.
7+
*/
8+
public interface TcpConnection {
9+
10+
TcpConnection eventHandler(TcpConnectionEventHandler eventHandler);
11+
12+
Publisher<?> reader();
13+
14+
TcpConnection writer(Publisher<?> sink);
15+
16+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package io.ripc.protocol.tcp;
2+
3+
/**
4+
* Created by jbrisbin on 3/26/15.
5+
*/
6+
public interface TcpConnectionEventHandler {
7+
8+
void onOpen(TcpConnection connection);
9+
10+
void onClose(TcpConnection connection);
11+
12+
void onAbort(TcpConnection connection);
13+
14+
void onError(TcpConnection connection, Throwable cause);
15+
16+
void onReadable(TcpConnection connection);
17+
18+
void onWritable(TcpConnection connection);
19+
20+
}

0 commit comments

Comments
 (0)