29
29
import io .netty .handler .codec .LengthFieldBasedFrameDecoder ;
30
30
import io .netty .handler .logging .LogLevel ;
31
31
import io .netty .handler .logging .LoggingHandler ;
32
+ import io .netty .util .ReferenceCountUtil ;
32
33
import io .netty .util .internal .logging .InternalLogger ;
33
34
import io .netty .util .internal .logging .InternalLoggerFactory ;
34
35
import io .r2dbc .postgresql .message .backend .BackendKeyData ;
45
46
import io .r2dbc .postgresql .util .Assert ;
46
47
import io .r2dbc .spi .R2dbcNonTransientResourceException ;
47
48
import org .reactivestreams .Publisher ;
49
+ import org .reactivestreams .Subscription ;
50
+ import reactor .core .CoreSubscriber ;
48
51
import reactor .core .Disposable ;
49
52
import reactor .core .publisher .DirectProcessor ;
50
53
import reactor .core .publisher .EmitterProcessor ;
61
64
import reactor .util .Loggers ;
62
65
import reactor .util .annotation .Nullable ;
63
66
import reactor .util .concurrent .Queues ;
67
+ import reactor .util .context .Context ;
64
68
65
69
import javax .net .ssl .SSLException ;
66
70
import java .net .InetSocketAddress ;
@@ -108,6 +112,8 @@ public final class ReactorNettyClient implements Client {
108
112
109
113
private final AtomicBoolean isClosed = new AtomicBoolean (false );
110
114
115
+ private final BackendMessageSubscriber messageSubscriber = new BackendMessageSubscriber ();
116
+
111
117
private volatile Integer processId ;
112
118
113
119
private volatile Integer secretKey ;
@@ -131,26 +137,15 @@ private ReactorNettyClient(Connection connection) {
131
137
this .byteBufAllocator = connection .outbound ().alloc ();
132
138
133
139
AtomicReference <Throwable > receiveError = new AtomicReference <>();
134
- Mono <Void > receive = connection .inbound ().receive ()
140
+
141
+ connection .inbound ().receive ()
135
142
.map (BackendMessageDecoder ::decode )
136
143
.handle (this ::handleResponse )
137
144
.doOnError (throwable -> {
138
145
receiveError .set (throwable );
139
146
handleConnectionError (throwable );
140
147
})
141
- .handle ((message , sink ) -> {
142
- Conversation receiver = this .conversations .peek ();
143
- if (receiver != null ) {
144
- if (receiver .takeUntil .test (message )) {
145
- receiver .sink .complete ();
146
- this .conversations .poll ();
147
- } else {
148
- receiver .sink .next (message );
149
- }
150
- }
151
- })
152
- .doOnComplete (this ::handleClose )
153
- .then ();
148
+ .subscribe (this .messageSubscriber );
154
149
155
150
Mono <Void > request = this .requestProcessor
156
151
.concatMap (Function .identity ())
@@ -162,9 +157,6 @@ private ReactorNettyClient(Connection connection) {
162
157
}, 1 )
163
158
.then ();
164
159
165
- receive
166
- .onErrorResume (this ::resumeError )
167
- .subscribe ();
168
160
169
161
request
170
162
.onErrorResume (this ::resumeError )
@@ -213,6 +205,7 @@ public Flux<BackendMessage> exchange(Predicate<BackendMessage> takeUntil, Publis
213
205
return ;
214
206
}
215
207
synchronized (this ) {
208
+ sink .onRequest (this .messageSubscriber ::onRequest );
216
209
this .conversations .add (new Conversation (sink , takeUntil ));
217
210
this .requests .next (Flux .from (requests ).doOnNext (m -> {
218
211
if (!isConnected ()) {
@@ -610,4 +603,65 @@ public Mono<Void> disposeLater() {
610
603
}
611
604
}
612
605
606
+
607
+ private class BackendMessageSubscriber implements CoreSubscriber <BackendMessage > {
608
+
609
+ private Subscription subscription ;
610
+
611
+ public void onRequest (long n ) {
612
+ this .subscription .request (n );
613
+ }
614
+
615
+ @ Override
616
+ public void onSubscribe (Subscription s ) {
617
+ this .subscription = s ;
618
+ s .request (Queues .SMALL_BUFFER_SIZE );
619
+ }
620
+
621
+ @ Override
622
+ public void onNext (BackendMessage message ) {
623
+ Conversation receiver = ReactorNettyClient .this .conversations .peek ();
624
+ if (receiver != null ) {
625
+ if (receiver .takeUntil .test (message )) {
626
+ receiver .sink .complete ();
627
+ ReactorNettyClient .this .conversations .poll ();
628
+ } else {
629
+ if (!receiver .sink .isCancelled ()) {
630
+ receiver .sink .next (message );
631
+ } else {
632
+ ReferenceCountUtil .release (message );
633
+ }
634
+ }
635
+ }
636
+ }
637
+
638
+ @ Override
639
+ public void onError (Throwable throwable ) {
640
+ handleConnectionError (throwable );
641
+ ReactorNettyClient .this .requestProcessor .onComplete ();
642
+
643
+ if (isSslException (throwable )) {
644
+ logger .debug ("Connection Error" , throwable );
645
+ } else {
646
+ logger .error ("Connection Error" , throwable );
647
+ }
648
+
649
+ ReactorNettyClient .this .close ().subscribe ();
650
+ }
651
+
652
+ @ Override
653
+ public void onComplete () {
654
+ ReactorNettyClient .this .handleClose ();
655
+ }
656
+
657
+ @ Override
658
+ public Context currentContext () {
659
+ Conversation receiver = ReactorNettyClient .this .conversations .peek ();
660
+ if (receiver != null ) {
661
+ return receiver .sink .currentContext ();
662
+ } else {
663
+ return Context .empty ();
664
+ }
665
+ }
666
+ }
613
667
}
0 commit comments