@@ -172,6 +172,35 @@ private ReactorNettyClient(Connection connection) {
172
172
.subscribe ();
173
173
}
174
174
175
+ @ Override
176
+ public Disposable addNotificationListener (Consumer <NotificationResponse > consumer ) {
177
+ return this .notificationProcessor .subscribe (consumer );
178
+ }
179
+
180
+ @ Override
181
+ public Mono <Void > close () {
182
+ return Mono .defer (() -> {
183
+
184
+ drainError (EXPECTED );
185
+ if (this .isClosed .compareAndSet (false , true )) {
186
+
187
+ if (!isConnected () || this .processId == null ) {
188
+ this .connection .dispose ();
189
+ return this .connection .onDispose ();
190
+ }
191
+
192
+ return Flux .just (Terminate .INSTANCE )
193
+ .doOnNext (message -> logger .debug ("Request: {}" , message ))
194
+ .concatMap (message -> this .connection .outbound ().send (message .encode (this .connection .outbound ().alloc ())))
195
+ .then ()
196
+ .doOnSuccess (v -> this .connection .dispose ())
197
+ .then (this .connection .onDispose ());
198
+ }
199
+
200
+ return Mono .empty ();
201
+ });
202
+ }
203
+
175
204
@ Override
176
205
public Flux <BackendMessage > exchange (Predicate <BackendMessage > takeUntil , Publisher <FrontendMessage > requests ) {
177
206
Assert .requireNonNull (takeUntil , "takeUntil must not be null" );
@@ -355,38 +384,6 @@ private static Mono<? extends Void> registerSslHandler(SSLConfig sslConfig, Conn
355
384
return Mono .empty ();
356
385
}
357
386
358
- @ Override
359
- public Mono <Void > close () {
360
- return Mono .defer (() -> {
361
-
362
- drainError (EXPECTED );
363
- if (this .isClosed .compareAndSet (false , true )) {
364
-
365
- if (!isConnected () || this .processId == null ) {
366
- this .connection .dispose ();
367
- return this .connection .onDispose ();
368
- }
369
-
370
- return Flux .just (Terminate .INSTANCE )
371
- .doOnNext (message -> logger .debug ("Request: {}" , message ))
372
- .concatMap (message -> this .connection .outbound ().send (message .encode (this .connection .outbound ().alloc ())))
373
- .then ()
374
- .doOnSuccess (v -> this .connection .dispose ())
375
- .then (this .connection .onDispose ());
376
- }
377
-
378
- return Mono .empty ();
379
- });
380
- }
381
-
382
- private void drainError (Supplier <? extends Throwable > supplier ) {
383
- Conversation receiver ;
384
-
385
- while ((receiver = this .conversations .poll ()) != null ) {
386
- receiver .sink .error (supplier .get ());
387
- }
388
- }
389
-
390
387
@ Override
391
388
public ByteBufAllocator getByteBufAllocator () {
392
389
return this .byteBufAllocator ;
@@ -426,11 +423,6 @@ public boolean isConnected() {
426
423
return channel .isOpen ();
427
424
}
428
425
429
- @ Override
430
- public Disposable addNotificationListener (Consumer <NotificationResponse > consumer ) {
431
- return this .notificationProcessor .subscribe (consumer );
432
- }
433
-
434
426
private static String toString (List <Field > fields ) {
435
427
436
428
StringJoiner joiner = new StringJoiner (", " );
@@ -453,6 +445,14 @@ private void handleConnectionError(Throwable error) {
453
445
drainError (() -> new PostgresConnectionException (error ));
454
446
}
455
447
448
+ private void drainError (Supplier <? extends Throwable > supplier ) {
449
+ Conversation receiver ;
450
+
451
+ while ((receiver = this .conversations .poll ()) != null ) {
452
+ receiver .sink .error (supplier .get ());
453
+ }
454
+ }
455
+
456
456
/**
457
457
* Value object representing a single conversation. The driver permits a single conversation at a time to ensure that request messages get routed to the proper response receiver and do not leak
458
458
* into other conversations. A conversation must be finished in the sense that the {@link Publisher} of {@link FrontendMessage} has completed before the next conversation is started.
0 commit comments