47
47
import io .r2dbc .spi .R2dbcNonTransientResourceException ;
48
48
import org .reactivestreams .Publisher ;
49
49
import org .reactivestreams .Subscriber ;
50
+ import org .reactivestreams .Subscription ;
51
+ import reactor .core .CoreSubscriber ;
50
52
import reactor .core .Disposable ;
51
53
import reactor .core .publisher .DirectProcessor ;
52
54
import reactor .core .publisher .EmitterProcessor ;
53
55
import reactor .core .publisher .Flux ;
54
56
import reactor .core .publisher .FluxSink ;
55
57
import reactor .core .publisher .Mono ;
56
- import reactor .core .publisher .SynchronousSink ;
57
58
import reactor .netty .Connection ;
58
59
import reactor .netty .resources .ConnectionProvider ;
59
60
import reactor .netty .resources .LoopResources ;
63
64
import reactor .util .Loggers ;
64
65
import reactor .util .annotation .Nullable ;
65
66
import reactor .util .concurrent .Queues ;
67
+ import reactor .util .context .Context ;
66
68
67
69
import javax .net .ssl .SSLException ;
68
70
import java .net .InetSocketAddress ;
73
75
import java .util .Queue ;
74
76
import java .util .StringJoiner ;
75
77
import java .util .concurrent .atomic .AtomicBoolean ;
78
+ import java .util .concurrent .atomic .AtomicLong ;
76
79
import java .util .concurrent .atomic .AtomicReference ;
77
80
import java .util .function .Consumer ;
78
81
import java .util .function .Function ;
@@ -104,12 +107,12 @@ public final class ReactorNettyClient implements Client {
104
107
105
108
private final FluxSink <Publisher <FrontendMessage >> requests = this .requestProcessor .sink ();
106
109
107
- private final Queue <Conversation > conversations = Queues .<Conversation >unbounded ().get ();
108
-
109
110
private final DirectProcessor <NotificationResponse > notificationProcessor = DirectProcessor .create ();
110
111
111
112
private final AtomicBoolean isClosed = new AtomicBoolean (false );
112
113
114
+ private final BackendMessageSubscriber messageSubscriber = new BackendMessageSubscriber ();
115
+
113
116
private volatile Integer processId ;
114
117
115
118
private volatile Integer secretKey ;
@@ -133,31 +136,14 @@ private ReactorNettyClient(Connection connection) {
133
136
this .byteBufAllocator = connection .outbound ().alloc ();
134
137
135
138
AtomicReference <Throwable > receiveError = new AtomicReference <>();
136
- Mono <Void > receive = connection .inbound ().receive ()
139
+
140
+ connection .inbound ().receive ()
137
141
.map (BackendMessageDecoder ::decode )
138
- .handle (this ::handleResponse )
139
142
.doOnError (throwable -> {
140
143
receiveError .set (throwable );
141
144
handleConnectionError (throwable );
142
145
})
143
- .handle ((message , sink ) -> {
144
- Conversation receiver = this .conversations .peek ();
145
- if (receiver != null ) {
146
- if (receiver .takeUntil .test (message )) {
147
- receiver .sink .complete ();
148
- this .conversations .poll ();
149
- } else {
150
-
151
- if (receiver .sink .isCancelled ()) {
152
- ReferenceCountUtil .release (message );
153
- } else {
154
- receiver .sink .next (message );
155
- }
156
- }
157
- }
158
- })
159
- .doOnComplete (this ::handleClose )
160
- .then ();
146
+ .subscribe (this .messageSubscriber );
161
147
162
148
Mono <Void > request = this .requestProcessor
163
149
.concatMap (Function .identity ())
@@ -169,9 +155,6 @@ private ReactorNettyClient(Connection connection) {
169
155
}, 1 )
170
156
.then ();
171
157
172
- receive
173
- .onErrorResume (this ::resumeError )
174
- .subscribe ();
175
158
176
159
request
177
160
.onErrorResume (this ::resumeError )
@@ -219,14 +202,7 @@ public Flux<BackendMessage> exchange(Predicate<BackendMessage> takeUntil, Publis
219
202
sink .error (new PostgresConnectionClosedException ("Cannot exchange messages because the connection is closed" ));
220
203
return ;
221
204
}
222
- synchronized (this ) {
223
- this .conversations .add (new Conversation (sink , takeUntil ));
224
- this .requests .next (Flux .from (requests ).doOnNext (m -> {
225
- if (!isConnected ()) {
226
- sink .error (new PostgresConnectionClosedException ("Cannot exchange messages because the connection is closed" ));
227
- }
228
- }));
229
- }
205
+ this .messageSubscriber .addConversation (requests , sink , takeUntil );
230
206
});
231
207
}
232
208
@@ -255,15 +231,15 @@ private static boolean isSslException(Throwable throwable) {
255
231
return throwable instanceof SSLException || throwable .getCause () instanceof SSLException ;
256
232
}
257
233
258
- private void handleResponse (BackendMessage message , SynchronousSink < BackendMessage > sink ) {
234
+ private boolean handleResponse (BackendMessage message ) {
259
235
260
236
if (DEBUG_ENABLED ) {
261
237
logger .debug ("Response: {}" , message );
262
238
}
263
239
264
240
if (message .getClass () == NoticeResponse .class ) {
265
241
logger .warn ("Notice: {}" , toString (((NoticeResponse ) message ).getFields ()));
266
- return ;
242
+ return false ;
267
243
}
268
244
269
245
if (message .getClass () == BackendKeyData .class ) {
@@ -272,7 +248,7 @@ private void handleResponse(BackendMessage message, SynchronousSink<BackendMessa
272
248
273
249
this .processId = backendKeyData .getProcessId ();
274
250
this .secretKey = backendKeyData .getSecretKey ();
275
- return ;
251
+ return false ;
276
252
}
277
253
278
254
if (message .getClass () == ErrorResponse .class ) {
@@ -289,10 +265,10 @@ private void handleResponse(BackendMessage message, SynchronousSink<BackendMessa
289
265
290
266
if (message .getClass () == NotificationResponse .class ) {
291
267
this .notificationProcessor .onNext ((NotificationResponse ) message );
292
- return ;
268
+ return false ;
293
269
}
294
270
295
- sink . next ( message ) ;
271
+ return true ;
296
272
}
297
273
298
274
private void handleParameterStatus (ParameterStatus message ) {
@@ -465,7 +441,7 @@ private void handleConnectionError(Throwable error) {
465
441
private void drainError (Supplier <? extends Throwable > supplier ) {
466
442
Conversation receiver ;
467
443
468
- while ((receiver = this .conversations .poll ()) != null ) {
444
+ while ((receiver = this .messageSubscriber . conversations .poll ()) != null ) {
469
445
receiver .sink .error (supplier .get ());
470
446
}
471
447
@@ -474,23 +450,6 @@ private void drainError(Supplier<? extends Throwable> supplier) {
474
450
}
475
451
}
476
452
477
- /**
478
- * Value object representing a single conversation. The driver permits a single conversation at a time to ensure that request messages get routed to the proper response receiver and do not leak
479
- * into other conversations. A conversation must be finished in the sense that the {@link Publisher} of {@link FrontendMessage} has completed before the next conversation is started.
480
- * <p>
481
- * A single conversation can make use of pipelining.
482
- */
483
- private static class Conversation {
484
-
485
- private final FluxSink <BackendMessage > sink ;
486
-
487
- private final Predicate <BackendMessage > takeUntil ;
488
-
489
- private Conversation (FluxSink <BackendMessage > sink , Predicate <BackendMessage > takeUntil ) {
490
- this .sink = sink ;
491
- this .takeUntil = takeUntil ;
492
- }
493
- }
494
453
495
454
private final class EnsureSubscribersCompleteChannelHandler extends ChannelDuplexHandler {
496
455
@@ -631,4 +590,133 @@ public Mono<Void> disposeLater() {
631
590
}
632
591
}
633
592
593
+
594
+ /**
595
+ * Value object representing a single conversation. The driver permits a single conversation at a time to ensure that request messages get routed to the proper response receiver and do not leak
596
+ * into other conversations. A conversation must be finished in the sense that the {@link Publisher} of {@link FrontendMessage} has completed before the next conversation is started.
597
+ * <p>
598
+ * A single conversation can make use of pipelining.
599
+ */
600
+ private static class Conversation {
601
+
602
+ private final FluxSink <BackendMessage > sink ;
603
+
604
+ private final Predicate <BackendMessage > takeUntil ;
605
+
606
+ private final AtomicLong receiverCounter = new AtomicLong (0 );
607
+
608
+ private final AtomicLong requested = new AtomicLong ();
609
+
610
+ private Conversation (FluxSink <BackendMessage > sink , Predicate <BackendMessage > takeUntil ) {
611
+ this .sink = sink ;
612
+ this .takeUntil = takeUntil ;
613
+ }
614
+ }
615
+
616
+ private class BackendMessageSubscriber implements CoreSubscriber <BackendMessage > {
617
+
618
+ private final Queue <Conversation > conversations = Queues .<Conversation >unbounded ().get ();
619
+
620
+ private final AtomicLong demand = new AtomicLong (0 );
621
+
622
+ private Subscription subscription ;
623
+
624
+ public synchronized void addConversation (Publisher <FrontendMessage > requests , FluxSink <BackendMessage > sink , Predicate <BackendMessage > takeUntil ) {
625
+ Conversation conversation = new Conversation (sink , takeUntil );
626
+ this .conversations .offer (conversation );
627
+ sink .onRequest (n -> this .onRequest (conversation , n ));
628
+ ReactorNettyClient .this .requests .next (Flux .from (requests ).doOnNext (m -> {
629
+ if (!isConnected ()) {
630
+ sink .error (new PostgresConnectionClosedException ("Cannot exchange messages because the connection is closed" ));
631
+ }
632
+ }));
633
+ }
634
+
635
+ public void onRequest (Conversation conversation , long n ) {
636
+ long newRequested = conversation .requested .addAndGet (n );
637
+ if (newRequested < 0 ) {
638
+ conversation .requested .set (Long .MAX_VALUE );
639
+ }
640
+ this .demandMore ();
641
+ }
642
+
643
+ private void demandMore () {
644
+ if (this .demand .compareAndSet (0 , 256 )) {
645
+ this .subscription .request (256 );
646
+ }
647
+ }
648
+
649
+ @ Override
650
+ public void onSubscribe (Subscription s ) {
651
+ this .subscription = s ;
652
+ this .demandMore ();
653
+ }
654
+
655
+ @ Override
656
+ public void onNext (BackendMessage message ) {
657
+ long currentDemand = this .demand .decrementAndGet ();
658
+ if (!ReactorNettyClient .this .handleResponse (message )) {
659
+ if (currentDemand == 0 ) {
660
+ this .demandMore ();
661
+ }
662
+ return ;
663
+ }
664
+ Conversation conversation = this .conversations .peek ();
665
+ if (conversation == null ) {
666
+ // never gonna happen
667
+ return ;
668
+ }
669
+
670
+ long currentCounter = conversation .receiverCounter .incrementAndGet ();
671
+ if (conversation .takeUntil .test (message )) {
672
+ conversation .sink .complete ();
673
+ this .conversations .poll ();
674
+ } else {
675
+ if (!conversation .sink .isCancelled ()) {
676
+ conversation .sink .next (message );
677
+ } else {
678
+ ReferenceCountUtil .release (message );
679
+ }
680
+ }
681
+ if (currentDemand == 0 ) {
682
+ if (conversation .requested .get () == Long .MAX_VALUE || conversation .sink .isCancelled ()) {
683
+ this .demandMore ();
684
+ } else {
685
+ long more = conversation .requested .get () - currentCounter ;
686
+ if (more > 0 ) {
687
+ this .demandMore ();
688
+ }
689
+ }
690
+ }
691
+ }
692
+
693
+ @ Override
694
+ public void onError (Throwable throwable ) {
695
+ handleConnectionError (throwable );
696
+ ReactorNettyClient .this .requestProcessor .onComplete ();
697
+
698
+ if (isSslException (throwable )) {
699
+ logger .debug ("Connection Error" , throwable );
700
+ } else {
701
+ logger .error ("Connection Error" , throwable );
702
+ }
703
+
704
+ ReactorNettyClient .this .close ().subscribe ();
705
+ }
706
+
707
+ @ Override
708
+ public void onComplete () {
709
+ ReactorNettyClient .this .handleClose ();
710
+ }
711
+
712
+ @ Override
713
+ public Context currentContext () {
714
+ Conversation receiver = this .conversations .peek ();
715
+ if (receiver != null ) {
716
+ return receiver .sink .currentContext ();
717
+ } else {
718
+ return Context .empty ();
719
+ }
720
+ }
721
+ }
634
722
}
0 commit comments