Skip to content

Commit 83f1cbb

Browse files
committed
Add WritePublisher
1 parent a7ba2a5 commit 83f1cbb

File tree

3 files changed

+81
-14
lines changed

3 files changed

+81
-14
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package io.ripc.core;
2+
3+
import org.reactivestreams.Publisher;
4+
import org.reactivestreams.Subscriber;
5+
import org.reactivestreams.Subscription;
6+
7+
/**
8+
* A {@code Publisher} that adapts another for write I/O, for example applying
9+
* requests to flush based on some strategy.
10+
*/
11+
public class WritePublisher<T> implements Publisher<T> {
12+
13+
private final Publisher<T> adaptee;
14+
15+
private int flushCount;
16+
17+
18+
private WritePublisher(Publisher<T> adaptee) {
19+
this.adaptee = adaptee;
20+
}
21+
22+
/**
23+
* Create a WritePublisher by wrapping the actual Publisher.
24+
*/
25+
public static <S> WritePublisher<S> adapt(Publisher<S> publisher) {
26+
return new WritePublisher<>(publisher);
27+
}
28+
29+
/**
30+
* Configure how many items should be written before requesting flush.
31+
*/
32+
public WritePublisher<T> withFlushCount(int count) {
33+
this.flushCount = count;
34+
return this;
35+
}
36+
37+
@Override
38+
public void subscribe(Subscriber<? super T> subscriber) {
39+
40+
if (this.flushCount > 0 && subscriber instanceof Flushable) {
41+
Flushable flushable = (Flushable) subscriber;
42+
43+
this.adaptee.subscribe(new Subscriber<T>() {
44+
45+
private volatile int count;
46+
47+
@Override
48+
public void onSubscribe(Subscription s) {
49+
subscriber.onSubscribe(s);
50+
}
51+
52+
@Override
53+
public void onNext(T t) {
54+
subscriber.onNext(t);
55+
if (++this.count == WritePublisher.this.flushCount) {
56+
flushable.flush();
57+
WritePublisher.this.flushCount = 0;
58+
}
59+
}
60+
61+
@Override
62+
public void onError(Throwable t) {
63+
subscriber.onError(t);
64+
}
65+
66+
@Override
67+
public void onComplete() {
68+
subscriber.onComplete();
69+
}
70+
});
71+
}
72+
else {
73+
this.adaptee.subscribe(subscriber);
74+
}
75+
}
76+
77+
}

ripc-protocol-tcp/src/main/java/io/ripc/protocol/tcp/TcpConnection.java

+2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ public interface TcpConnection<I, O> {
88
Publisher<I> reader();
99

1010
/**
11+
*
1112
* @see io.ripc.core.Flushable
13+
* @see io.ripc.core.WritePublisher
1214
*/
1315
Publisher<Void> writer(Publisher<O> publisher);
1416

ripc-transport-netty4-examples/src/main/java/io/ripc/transport/netty4/tcp/TcpServerSample.java

+2-14
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import io.netty.buffer.ByteBuf;
1111
import io.netty.buffer.Unpooled;
12-
import io.ripc.core.Flushable;
12+
import io.ripc.core.WritePublisher;
1313
import io.ripc.protocol.tcp.SimpleTcpInterceptor;
1414
import io.ripc.protocol.tcp.TcpConnection;
1515
import org.reactivestreams.Processor;
@@ -34,7 +34,7 @@ public static void main(String[] args) {
3434
CompletionPublisher completionPublisher = new CompletionPublisher();
3535
EchoProcessor echoProcessor = new EchoProcessor(completionPublisher);
3636
connection.reader().subscribe(echoProcessor);
37-
connection.writer(echoProcessor);
37+
connection.writer(WritePublisher.adapt(echoProcessor).withFlushCount(1));
3838
return completionPublisher;
3939
});
4040

@@ -83,8 +83,6 @@ private static class EchoProcessor implements Processor<ByteBuf, ByteBuf> {
8383

8484
private Subscriber<? super ByteBuf> writeSubscriber;
8585

86-
private Flushable flushable;
87-
8886

8987
private EchoProcessor(CompletionPublisher publisher) {
9088
this.completionPublisher = publisher;
@@ -106,7 +104,6 @@ public void onNext(ByteBuf byteBuf) {
106104
else {
107105
logger.debug("Echoing message: " + message);
108106
this.writeSubscriber.onNext(Unpooled.buffer().writeBytes(("Hello " + message).getBytes(charset)));
109-
this.flushable.flush();
110107
}
111108
}
112109

@@ -121,7 +118,6 @@ public void onComplete() {
121118
@Override
122119
public void subscribe(Subscriber<? super ByteBuf> subscriber) {
123120
this.writeSubscriber = subscriber;
124-
this.flushable = (subscriber instanceof Flushable ? (Flushable) subscriber : new NoOpFlushable());
125121
this.writeSubscriber.onSubscribe(new Subscription() {
126122
@Override
127123
public void request(long n) {
@@ -137,14 +133,6 @@ public void cancel() {
137133
}
138134
}
139135

140-
private static class NoOpFlushable implements Flushable {
141-
142-
@Override
143-
public void flush() {
144-
// no-op
145-
}
146-
}
147-
148136
/**
149137
* Naive implementation
150138
*/

0 commit comments

Comments
 (0)