Skip to content

Commit 5e9d030

Browse files
committed
Polishing.
Reuse connection-closed exception factory method. [#492] Signed-off-by: Mark Paluch <[email protected]>
1 parent aacf383 commit 5e9d030

File tree

1 file changed

+15
-4
lines changed

1 file changed

+15
-4
lines changed

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

+15-4
Original file line numberDiff line numberDiff line change
@@ -210,14 +210,22 @@ public Flux<BackendMessage> exchange(Predicate<BackendMessage> takeUntil, Publis
210210
Assert.requireNonNull(takeUntil, "takeUntil must not be null");
211211
Assert.requireNonNull(requests, "requests must not be null");
212212

213-
return this.messageSubscriber.addConversation(takeUntil, requests, it -> this.requestSink.emitNext(it, Sinks.EmitFailureHandler.FAIL_FAST), this::isConnected);
213+
if (!isConnected()) {
214+
return Flux.error(this.messageSubscriber.createClientClosedException());
215+
}
216+
217+
return this.messageSubscriber.addConversation(takeUntil, requests, this::doSendRequest, this::isConnected);
214218
}
215219

216220
@Override
217221
public void send(FrontendMessage message) {
218222
Assert.requireNonNull(message, "requests must not be null");
219223

220-
this.requestSink.emitNext(Mono.just(message), Sinks.EmitFailureHandler.FAIL_FAST);
224+
doSendRequest(Mono.just(message));
225+
}
226+
227+
private void doSendRequest(Publisher<FrontendMessage> it) {
228+
this.requestSink.emitNext(it, Sinks.EmitFailureHandler.FAIL_FAST);
221229
}
222230

223231
private Mono<Void> resumeError(Throwable throwable) {
@@ -676,7 +684,7 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
676684
sink.onRequest(value -> onRequest(conversation, value));
677685

678686
if (!isConnected.get()) {
679-
sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
687+
sink.error(createClientClosedException());
680688
return;
681689
}
682690

@@ -689,12 +697,15 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
689697
sender.accept(requestMessages);
690698
} else {
691699
sink.error(new RequestQueueException("Cannot exchange messages because the request queue limit is exceeded"));
692-
693700
}
694701
}
695702
});
696703
}
697704

705+
PostgresConnectionClosedException createClientClosedException() {
706+
return new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed");
707+
}
708+
698709
/**
699710
* {@link Subscription#request(long)} callback. Request more for a {@link Conversation}. Potentially, demands also more upstream elements.
700711
*

0 commit comments

Comments
 (0)