@@ -214,14 +214,22 @@ public Flux<BackendMessage> exchange(Predicate<BackendMessage> takeUntil, Publis
214
214
Assert .requireNonNull (takeUntil , "takeUntil must not be null" );
215
215
Assert .requireNonNull (requests , "requests must not be null" );
216
216
217
- return this .messageSubscriber .addConversation (takeUntil , requests , this .requests ::next , this ::isConnected );
217
+ if (!isConnected ()) {
218
+ return Flux .error (this .messageSubscriber .createClientClosedException ());
219
+ }
220
+
221
+ return this .messageSubscriber .addConversation (takeUntil , requests , this ::doSendRequest , this ::isConnected );
218
222
}
219
223
220
224
@ Override
221
225
public void send (FrontendMessage message ) {
222
226
Assert .requireNonNull (message , "requests must not be null" );
223
227
224
- this .requests .next (Mono .just (message ));
228
+ doSendRequest (Mono .just (message ));
229
+ }
230
+
231
+ private void doSendRequest (Publisher <FrontendMessage > it ) {
232
+ this .requests .next (it );
225
233
}
226
234
227
235
private Mono <Void > resumeError (Throwable throwable ) {
@@ -695,7 +703,7 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
695
703
sink .onRequest (value -> onRequest (conversation , value ));
696
704
697
705
if (!isConnected .get ()) {
698
- sink .error (new PostgresConnectionClosedException ( "Cannot exchange messages because the connection is closed" ));
706
+ sink .error (createClientClosedException ( ));
699
707
return ;
700
708
}
701
709
@@ -708,12 +716,15 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
708
716
sender .accept (requestMessages );
709
717
} else {
710
718
sink .error (new RequestQueueException ("Cannot exchange messages because the request queue limit is exceeded" ));
711
-
712
719
}
713
720
}
714
721
});
715
722
}
716
723
724
+ PostgresConnectionClosedException createClientClosedException () {
725
+ return new PostgresConnectionClosedException ("Cannot exchange messages because the connection is closed" );
726
+ }
727
+
717
728
/**
718
729
* {@link Subscription#request(long)} callback. Request more for a {@link Conversation}. Potentially, demands also more upstream elements.
719
730
*
0 commit comments