Skip to content

Commit dcf2e23

Browse files
committed
Remove doOnNext(…) hook to close the sink if the connection is closed while emitting requests.
Once the conversation is accepted, we no longer need to check on a new backend message whether the connection is closed as a channelInactive()/connection.close() signal terminates conversations anyway. [#492] Signed-off-by: Mark Paluch <[email protected]>
1 parent 6c98f43 commit dcf2e23

File tree

1 file changed

+17
-9
lines changed

1 file changed

+17
-9
lines changed

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

+17-9
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import reactor.core.publisher.Operators;
5656
import reactor.core.scheduler.Schedulers;
5757
import reactor.netty.Connection;
58+
import reactor.netty.channel.AbortedException;
5859
import reactor.netty.resources.ConnectionProvider;
5960
import reactor.netty.resources.LoopResources;
6061
import reactor.netty.tcp.TcpClient;
@@ -513,6 +514,11 @@ private void handleClose() {
513514
}
514515

515516
private void handleConnectionError(Throwable error) {
517+
518+
if (AbortedException.isConnectionReset(error) && !isConnected()) {
519+
drainError(() -> this.messageSubscriber.createClientClosedException(error));
520+
}
521+
516522
drainError(() -> new PostgresConnectionException(error));
517523
}
518524

@@ -554,6 +560,10 @@ public PostgresConnectionClosedException(String reason) {
554560
super(reason);
555561
}
556562

563+
public PostgresConnectionClosedException(String reason, @Nullable Throwable cause) {
564+
super(reason, cause);
565+
}
566+
557567
}
558568

559569
static class PostgresConnectionException extends R2dbcNonTransientResourceException {
@@ -689,7 +699,7 @@ private class BackendMessageSubscriber implements CoreSubscriber<BackendMessage>
689699

690700
private Subscription upstream;
691701

692-
public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests, Consumer<Flux<FrontendMessage>> sender,
702+
public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests, Consumer<Publisher<FrontendMessage>> sender,
693703
Supplier<Boolean> isConnected) {
694704

695705
return Flux.create(sink -> {
@@ -707,13 +717,7 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
707717
return;
708718
}
709719

710-
Flux<FrontendMessage> requestMessages = Flux.from(requests).doOnNext(m -> {
711-
if (!isConnected.get()) {
712-
sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
713-
}
714-
});
715-
716-
sender.accept(requestMessages);
720+
sender.accept(requests);
717721
} else {
718722
sink.error(new RequestQueueException("Cannot exchange messages because the request queue limit is exceeded"));
719723
}
@@ -722,7 +726,11 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
722726
}
723727

724728
PostgresConnectionClosedException createClientClosedException() {
725-
return new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed");
729+
return createClientClosedException(null);
730+
}
731+
732+
PostgresConnectionClosedException createClientClosedException(@Nullable Throwable cause) {
733+
return new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed", cause);
726734
}
727735

728736
/**

0 commit comments

Comments
 (0)