Skip to content

Commit 001ac02

Browse files
committed
Remove doOnNext(…) hook to close the sink if the connection is closed while emitting requests.
Once the conversation is accepted, we no longer need to check on a new backend message whether the connection is closed as a channelInactive()/connection.close() signal terminates conversations anyway. [#492] Signed-off-by: Mark Paluch <[email protected]>
1 parent f150a0f commit 001ac02

File tree

1 file changed

+17
-9
lines changed

1 file changed

+17
-9
lines changed

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

+17-9
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import reactor.core.publisher.Sinks;
5555
import reactor.core.scheduler.Schedulers;
5656
import reactor.netty.Connection;
57+
import reactor.netty.channel.AbortedException;
5758
import reactor.netty.tcp.TcpClient;
5859
import reactor.util.Logger;
5960
import reactor.util.Loggers;
@@ -496,6 +497,11 @@ private void handleClose() {
496497
}
497498

498499
private void handleConnectionError(Throwable error) {
500+
501+
if (AbortedException.isConnectionReset(error) && !isConnected()) {
502+
drainError(() -> this.messageSubscriber.createClientClosedException(error));
503+
}
504+
499505
drainError(() -> new PostgresConnectionException(error));
500506
}
501507

@@ -535,6 +541,10 @@ public PostgresConnectionClosedException(String reason) {
535541
super(reason);
536542
}
537543

544+
public PostgresConnectionClosedException(String reason, @Nullable Throwable cause) {
545+
super(reason, cause);
546+
}
547+
538548
}
539549

540550
static class PostgresConnectionException extends R2dbcNonTransientResourceException {
@@ -670,7 +680,7 @@ private class BackendMessageSubscriber implements CoreSubscriber<BackendMessage>
670680

671681
private Subscription upstream;
672682

673-
public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests, Consumer<Flux<FrontendMessage>> sender,
683+
public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests, Consumer<Publisher<FrontendMessage>> sender,
674684
Supplier<Boolean> isConnected) {
675685

676686
return Flux.create(sink -> {
@@ -688,13 +698,7 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
688698
return;
689699
}
690700

691-
Flux<FrontendMessage> requestMessages = Flux.from(requests).doOnNext(m -> {
692-
if (!isConnected.get()) {
693-
sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
694-
}
695-
});
696-
697-
sender.accept(requestMessages);
701+
sender.accept(requests);
698702
} else {
699703
sink.error(new RequestQueueException("Cannot exchange messages because the request queue limit is exceeded"));
700704
}
@@ -703,7 +707,11 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
703707
}
704708

705709
PostgresConnectionClosedException createClientClosedException() {
706-
return new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed");
710+
return createClientClosedException(null);
711+
}
712+
713+
PostgresConnectionClosedException createClientClosedException(@Nullable Throwable cause) {
714+
return new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed", cause);
707715
}
708716

709717
/**

0 commit comments

Comments
 (0)