1
1
package io .ripc .transport .netty4 .tcp ;
2
2
3
3
4
+ import static rx .RxReactiveStreams .*;
5
+
6
+ import java .util .concurrent .atomic .AtomicLong ;
7
+
4
8
import io .netty .buffer .ByteBuf ;
5
9
import io .netty .buffer .Unpooled ;
6
10
import io .ripc .internal .Publishers ;
11
+ import io .ripc .protocol .tcp .Connection ;
7
12
import io .ripc .protocol .tcp .TcpInterceptor ;
13
+ import org .reactivestreams .Publisher ;
14
+ import org .reactivestreams .Subscriber ;
15
+ import org .reactivestreams .Subscription ;
8
16
import org .slf4j .Logger ;
9
17
import org .slf4j .LoggerFactory ;
10
18
11
- import java .util .concurrent .atomic .AtomicLong ;
12
-
13
- import static java .nio .charset .Charset .defaultCharset ;
14
- import static rx .RxReactiveStreams .*;
15
19
16
20
public class TcpServerSample {
17
21
@@ -37,16 +41,121 @@ public static TcpInterceptor<ByteBuf, ByteBuf, ByteBuf, ByteBuf> shortCircuitAlt
37
41
};
38
42
}
39
43
44
+ public static TcpInterceptor <ByteBuf , ByteBuf , String , String > stringConnectionAdapter () {
45
+ return handler ->
46
+ stringConnection ->
47
+ handler .handle (new ByteBufToStringConnection (stringConnection ));
48
+ }
49
+
40
50
public static void main (String [] args ) {
41
51
Netty4TcpServer .<ByteBuf , ByteBuf >create (0 )
42
- .intercept (log ())
43
- .intercept (shortCircuitAltConnection ())
44
- .start (connection ->
45
- toPublisher (toObservable (connection )
46
- .flatMap (byteBuf -> {
47
- String msg = "Hello " + byteBuf .toString (defaultCharset ());
48
- ByteBuf toWrite = Unpooled .buffer ().writeBytes (msg .getBytes ());
49
- return toObservable (connection .write (Publishers .just (toWrite )));
50
- })));
52
+ .intercept (log ())
53
+ .intercept (stringConnectionAdapter ())
54
+ .start (connection ->
55
+ toPublisher (toObservable (connection )
56
+ .flatMap (data -> toObservable (connection .write (Publishers .just ("Hello " + data )))))
57
+ );
58
+ }
59
+
60
+
61
+ private static class ByteBufToStringConnection implements Connection <ByteBuf , ByteBuf > {
62
+
63
+ private final Connection <String , String > delegate ;
64
+
65
+
66
+ public ByteBufToStringConnection (Connection <String , String > stringConnection ) {
67
+ this .delegate = stringConnection ;
68
+ }
69
+
70
+ @ Override
71
+ public Publisher <Void > write (Publisher <ByteBuf > data ) {
72
+ return delegate .write (new StringToByteBufPublisher (data ));
73
+ }
74
+
75
+ @ Override
76
+ public Publisher <Void > write (Publisher <ByteBuf > data , FlushSelector <ByteBuf > flushSelector ) {
77
+ return null ;
78
+ }
79
+
80
+ @ Override
81
+ public void subscribe (Subscriber <? super ByteBuf > subscriber ) {
82
+ delegate .subscribe (new StringToByteBufSubscriber (subscriber ));
83
+ }
51
84
}
85
+
86
+ private static class StringToByteBufPublisher implements Publisher <String > {
87
+
88
+ private final Publisher <ByteBuf > delegate ;
89
+
90
+
91
+ public StringToByteBufPublisher (Publisher <ByteBuf > byteBufPublisher ) {
92
+ this .delegate = byteBufPublisher ;
93
+ }
94
+
95
+ @ Override
96
+ public void subscribe (Subscriber <? super String > subscriber ) {
97
+ this .delegate .subscribe (new ByteBufToStringSubscriber (subscriber ));
98
+ }
99
+ }
100
+
101
+ private static class ByteBufToStringSubscriber implements Subscriber <ByteBuf > {
102
+
103
+ private final Subscriber <? super String > delegate ;
104
+
105
+
106
+ public ByteBufToStringSubscriber (Subscriber <? super String > subscriber ) {
107
+ this .delegate = subscriber ;
108
+ }
109
+
110
+ @ Override
111
+ public void onSubscribe (Subscription s ) {
112
+ this .delegate .onSubscribe (s );
113
+ }
114
+
115
+ @ Override
116
+ public void onNext (ByteBuf byteBuf ) {
117
+ this .delegate .onNext (byteBuf .toString ());
118
+ }
119
+
120
+ @ Override
121
+ public void onError (Throwable t ) {
122
+ this .delegate .onError (t );
123
+ }
124
+
125
+ @ Override
126
+ public void onComplete () {
127
+ this .delegate .onComplete ();
128
+ }
129
+ }
130
+
131
+ private static class StringToByteBufSubscriber implements Subscriber <String > {
132
+
133
+ private final Subscriber <? super ByteBuf > delegate ;
134
+
135
+
136
+ public StringToByteBufSubscriber (Subscriber <? super ByteBuf > subscriber ) {
137
+ this .delegate = subscriber ;
138
+ }
139
+
140
+ @ Override
141
+ public void onSubscribe (Subscription subscription ) {
142
+ this .delegate .onSubscribe (subscription );
143
+ }
144
+
145
+ @ Override
146
+ public void onNext (String s ) {
147
+ this .delegate .onNext (Unpooled .wrappedBuffer (s .getBytes ()));
148
+ }
149
+
150
+ @ Override
151
+ public void onError (Throwable t ) {
152
+ this .delegate .onError (t );
153
+ }
154
+
155
+ @ Override
156
+ public void onComplete () {
157
+ this .delegate .onComplete ();
158
+ }
159
+ }
160
+
52
161
}
0 commit comments