1
1
/*
2
- * Copyright 2017-2019 the original author or authors.
2
+ * Copyright 2017-2020 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
29
29
import io .netty .handler .codec .LengthFieldBasedFrameDecoder ;
30
30
import io .netty .handler .logging .LogLevel ;
31
31
import io .netty .handler .logging .LoggingHandler ;
32
- import io .netty .util .ReferenceCountUtil ;
33
32
import io .netty .util .internal .logging .InternalLogger ;
34
33
import io .netty .util .internal .logging .InternalLoggerFactory ;
35
34
import io .r2dbc .postgresql .message .backend .BackendKeyData ;
72
71
import java .util .Queue ;
73
72
import java .util .StringJoiner ;
74
73
import java .util .concurrent .atomic .AtomicBoolean ;
75
- import java .util .concurrent .atomic .AtomicInteger ;
76
74
import java .util .concurrent .atomic .AtomicReference ;
77
75
import java .util .function .Consumer ;
76
+ import java .util .function .Function ;
78
77
import java .util .function .Predicate ;
79
78
import java .util .function .Supplier ;
80
79
@@ -99,11 +98,11 @@ public final class ReactorNettyClient implements Client {
99
98
100
99
private final Connection connection ;
101
100
102
- private final EmitterProcessor <FrontendMessage > requestProcessor = EmitterProcessor .create (false );
101
+ private final EmitterProcessor <Publisher < FrontendMessage > > requestProcessor = EmitterProcessor .create (false );
103
102
104
- private final FluxSink <FrontendMessage > requests = this .requestProcessor .sink ();
103
+ private final FluxSink <Publisher < FrontendMessage > > requests = this .requestProcessor .sink ();
105
104
106
- private final Queue <ResponseReceiver > responseReceivers = Queues .<ResponseReceiver >unbounded ().get ();
105
+ private final Queue <Conversation > conversations = Queues .<Conversation >unbounded ().get ();
107
106
108
107
private final DirectProcessor <NotificationResponse > notificationProcessor = DirectProcessor .create ();
109
108
@@ -140,11 +139,11 @@ private ReactorNettyClient(Connection connection) {
140
139
handleConnectionError (throwable );
141
140
})
142
141
.handle ((message , sink ) -> {
143
- ResponseReceiver receiver = this .responseReceivers .peek ();
142
+ Conversation receiver = this .conversations .peek ();
144
143
if (receiver != null ) {
145
144
if (receiver .takeUntil .test (message )) {
146
145
receiver .sink .complete ();
147
- this .responseReceivers .poll ();
146
+ this .conversations .poll ();
148
147
} else {
149
148
receiver .sink .next (message );
150
149
}
@@ -154,6 +153,7 @@ private ReactorNettyClient(Connection connection) {
154
153
.then ();
155
154
156
155
Mono <Void > request = this .requestProcessor
156
+ .concatMap (Function .identity ())
157
157
.flatMap (message -> {
158
158
if (DEBUG_ENABLED ) {
159
159
logger .debug ("Request: {}" , message );
@@ -173,47 +173,61 @@ private ReactorNettyClient(Connection connection) {
173
173
}
174
174
175
175
@ Override
176
- public Flux <BackendMessage > exchange (Predicate <BackendMessage > takeUntil , Publisher <FrontendMessage > requests ) {
177
- Assert .requireNonNull (requests , "requests must not be null" );
176
+ public Disposable addNotificationListener (Consumer <NotificationResponse > consumer ) {
177
+ return this .notificationProcessor .subscribe (consumer );
178
+ }
178
179
179
- return Flux
180
- .create (sink -> {
180
+ @ Override
181
+ public Mono <Void > close () {
182
+ return Mono .defer (() -> {
181
183
182
- final AtomicInteger once = new AtomicInteger ();
184
+ drainError (EXPECTED );
185
+ if (this .isClosed .compareAndSet (false , true )) {
183
186
184
- Flux .from (requests )
185
- .subscribe (message -> {
187
+ if (!isConnected () || this .processId == null ) {
188
+ this .connection .dispose ();
189
+ return this .connection .onDispose ();
190
+ }
186
191
187
- if (!isConnected ()) {
188
- ReferenceCountUtil .safeRelease (message );
189
- sink .error (new PostgresConnectionClosedException ("Cannot exchange messages because the connection is closed" ));
190
- return ;
191
- }
192
+ return Flux .just (Terminate .INSTANCE )
193
+ .doOnNext (message -> logger .debug ("Request: {}" , message ))
194
+ .concatMap (message -> this .connection .outbound ().send (message .encode (this .connection .outbound ().alloc ())))
195
+ .then ()
196
+ .doOnSuccess (v -> this .connection .dispose ())
197
+ .then (this .connection .onDispose ());
198
+ }
192
199
193
- if (once .get () == 0 && once .compareAndSet (0 , 1 )) {
194
- synchronized (this ) {
195
- this .responseReceivers .add (new ResponseReceiver (sink , takeUntil ));
196
- this .requests .next (message );
197
- }
198
- } else {
199
- this .requests .next (message );
200
- }
200
+ return Mono .empty ();
201
+ });
202
+ }
201
203
202
- }, this .requests ::error , () -> {
204
+ @ Override
205
+ public Flux <BackendMessage > exchange (Predicate <BackendMessage > takeUntil , Publisher <FrontendMessage > requests ) {
206
+ Assert .requireNonNull (takeUntil , "takeUntil must not be null" );
207
+ Assert .requireNonNull (requests , "requests must not be null" );
203
208
209
+ return Flux
210
+ .create (sink -> {
211
+ if (!isConnected ()) {
212
+ sink .error (new PostgresConnectionClosedException ("Cannot exchange messages because the connection is closed" ));
213
+ return ;
214
+ }
215
+ synchronized (this ) {
216
+ this .conversations .add (new Conversation (sink , takeUntil ));
217
+ this .requests .next (Flux .from (requests ).doOnNext (m -> {
204
218
if (!isConnected ()) {
205
219
sink .error (new PostgresConnectionClosedException ("Cannot exchange messages because the connection is closed" ));
206
220
}
207
- });
208
-
221
+ })) ;
222
+ }
209
223
});
210
224
}
211
225
212
226
@ Override
213
227
public void send (FrontendMessage message ) {
214
228
Assert .requireNonNull (message , "requests must not be null" );
215
229
216
- this .requests .next (message );
230
+ this .requests .next (Mono . just ( message ) );
217
231
}
218
232
219
233
private Mono <Void > resumeError (Throwable throwable ) {
@@ -370,38 +384,6 @@ private static Mono<? extends Void> registerSslHandler(SSLConfig sslConfig, Conn
370
384
return Mono .empty ();
371
385
}
372
386
373
- @ Override
374
- public Mono <Void > close () {
375
- return Mono .defer (() -> {
376
-
377
- drainError (EXPECTED );
378
- if (this .isClosed .compareAndSet (false , true )) {
379
-
380
- if (!isConnected () || this .processId == null ) {
381
- this .connection .dispose ();
382
- return this .connection .onDispose ();
383
- }
384
-
385
- return Flux .just (Terminate .INSTANCE )
386
- .doOnNext (message -> logger .debug ("Request: {}" , message ))
387
- .concatMap (message -> this .connection .outbound ().send (message .encode (this .connection .outbound ().alloc ())))
388
- .then ()
389
- .doOnSuccess (v -> this .connection .dispose ())
390
- .then (this .connection .onDispose ());
391
- }
392
-
393
- return Mono .empty ();
394
- });
395
- }
396
-
397
- private void drainError (Supplier <? extends Throwable > supplier ) {
398
- ResponseReceiver receiver ;
399
-
400
- while ((receiver = this .responseReceivers .poll ()) != null ) {
401
- receiver .sink .error (supplier .get ());
402
- }
403
- }
404
-
405
387
@ Override
406
388
public ByteBufAllocator getByteBufAllocator () {
407
389
return this .byteBufAllocator ;
@@ -441,11 +423,6 @@ public boolean isConnected() {
441
423
return channel .isOpen ();
442
424
}
443
425
444
- @ Override
445
- public Disposable addNotificationListener (Consumer <NotificationResponse > consumer ) {
446
- return this .notificationProcessor .subscribe (consumer );
447
- }
448
-
449
426
private static String toString (List <Field > fields ) {
450
427
451
428
StringJoiner joiner = new StringJoiner (", " );
@@ -468,23 +445,37 @@ private void handleConnectionError(Throwable error) {
468
445
drainError (() -> new PostgresConnectionException (error ));
469
446
}
470
447
471
- private static class ResponseReceiver {
448
+ private void drainError (Supplier <? extends Throwable > supplier ) {
449
+ Conversation receiver ;
450
+
451
+ while ((receiver = this .conversations .poll ()) != null ) {
452
+ receiver .sink .error (supplier .get ());
453
+ }
454
+ }
455
+
456
+ /**
457
+ * Value object representing a single conversation. The driver permits a single conversation at a time to ensure that request messages get routed to the proper response receiver and do not leak
458
+ * into other conversations. A conversation must be finished in the sense that the {@link Publisher} of {@link FrontendMessage} has completed before the next conversation is started.
459
+ * <p>
460
+ * A single conversation can make use of pipelining.
461
+ */
462
+ private static class Conversation {
472
463
473
464
private final FluxSink <BackendMessage > sink ;
474
465
475
466
private final Predicate <BackendMessage > takeUntil ;
476
467
477
- private ResponseReceiver (FluxSink <BackendMessage > sink , Predicate <BackendMessage > takeUntil ) {
468
+ private Conversation (FluxSink <BackendMessage > sink , Predicate <BackendMessage > takeUntil ) {
478
469
this .sink = sink ;
479
470
this .takeUntil = takeUntil ;
480
471
}
481
472
}
482
473
483
474
private final class EnsureSubscribersCompleteChannelHandler extends ChannelDuplexHandler {
484
475
485
- private final EmitterProcessor <FrontendMessage > requestProcessor ;
476
+ private final EmitterProcessor <Publisher < FrontendMessage > > requestProcessor ;
486
477
487
- private EnsureSubscribersCompleteChannelHandler (EmitterProcessor <FrontendMessage > requestProcessor ) {
478
+ private EnsureSubscribersCompleteChannelHandler (EmitterProcessor <Publisher < FrontendMessage > > requestProcessor ) {
488
479
this .requestProcessor = requestProcessor ;
489
480
}
490
481
0 commit comments