Skip to content

Commit e12769f

Browse files
committed
Terminate Notification stream on disconnect
We now terminate the notification stream (PostgresqlConnection.getNotifications()) when the connection gets disconnected. If the connection is closed normally, the stream terminates successfully (onComplete). Unintended disconnects result in an error (onError). [#212]
1 parent 08bc7ea commit e12769f

File tree

7 files changed

+170
-11
lines changed

7 files changed

+170
-11
lines changed

src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java

+29-8
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@
2424
import io.r2dbc.postgresql.client.SimpleQueryMessageFlow;
2525
import io.r2dbc.postgresql.client.TransactionStatus;
2626
import io.r2dbc.postgresql.codec.Codecs;
27+
import io.r2dbc.postgresql.message.backend.NotificationResponse;
2728
import io.r2dbc.postgresql.util.Assert;
2829
import io.r2dbc.spi.Connection;
2930
import io.r2dbc.spi.IsolationLevel;
3031
import io.r2dbc.spi.ValidationDepth;
3132
import org.reactivestreams.Publisher;
33+
import org.reactivestreams.Subscriber;
3234
import org.reactivestreams.Subscription;
3335
import reactor.core.CoreSubscriber;
3436
import reactor.core.Disposable;
@@ -242,17 +244,17 @@ public Mono<Void> setAutoCommit(boolean autoCommit) {
242244

243245
return useTransactionStatus(transactionStatus -> {
244246

245-
logger.debug(String.format("Setting auto-commit mode to [%s]", autoCommit));
247+
this.logger.debug(String.format("Setting auto-commit mode to [%s]", autoCommit));
246248

247249
if (isAutoCommit()) {
248250
if (!autoCommit) {
249-
logger.debug("Beginning transaction");
251+
this.logger.debug("Beginning transaction");
250252
return beginTransaction();
251253
}
252254
} else {
253255

254256
if (autoCommit) {
255-
logger.debug("Committing pending transactions");
257+
this.logger.debug("Committing pending transactions");
256258
return commitTransaction();
257259
}
258260
}
@@ -310,7 +312,7 @@ public void onNext(Integer integer) {
310312

311313
@Override
312314
public void onError(Throwable t) {
313-
logger.debug("Validation failed", t);
315+
PostgresqlConnection.this.logger.debug("Validation failed", t);
314316
sink.success(false);
315317
}
316318

@@ -354,7 +356,7 @@ static class NotificationAdapter {
354356

355357
private final DirectProcessor<Notification> processor = DirectProcessor.create();
356358

357-
private final FluxSink<Notification> sink = processor.sink();
359+
private final FluxSink<Notification> sink = this.processor.sink();
358360

359361
@Nullable
360362
private volatile Disposable subscription = null;
@@ -368,13 +370,32 @@ void dispose() {
368370

369371
void register(Client client) {
370372

371-
this.subscription = client.addNotificationListener(notificationResponse -> {
372-
sink.next(new NotificationResponseWrapper(notificationResponse));
373+
this.subscription = client.addNotificationListener(new Subscriber<NotificationResponse>() {
374+
375+
@Override
376+
public void onSubscribe(Subscription subscription) {
377+
subscription.request(Long.MAX_VALUE);
378+
}
379+
380+
@Override
381+
public void onNext(NotificationResponse notificationResponse) {
382+
NotificationAdapter.this.sink.next(new NotificationResponseWrapper(notificationResponse));
383+
}
384+
385+
@Override
386+
public void onError(Throwable throwable) {
387+
NotificationAdapter.this.sink.error(throwable);
388+
}
389+
390+
@Override
391+
public void onComplete() {
392+
NotificationAdapter.this.sink.complete();
393+
}
373394
});
374395
}
375396

376397
Flux<Notification> getEvents() {
377-
return processor;
398+
return this.processor;
378399
}
379400
}
380401

src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import io.r2dbc.spi.Connection;
2020
import io.r2dbc.spi.IsolationLevel;
21+
import io.r2dbc.spi.R2dbcNonTransientResourceException;
2122
import io.r2dbc.spi.ValidationDepth;
23+
import org.reactivestreams.Subscriber;
2224
import reactor.core.publisher.Flux;
2325
import reactor.core.publisher.Mono;
2426

@@ -64,8 +66,9 @@ public interface PostgresqlConnection extends Connection {
6466
PostgresqlStatement createStatement(String sql);
6567

6668
/**
67-
* Return a {@link Flux} of {@link Notification} received from {@code LISTEN} registrations.
68-
* The stream is a hot stream producing messages as they are received.
69+
* Return a {@link Flux} of {@link Notification} received from {@code LISTEN} registrations. The stream is a hot stream producing messages as they are received. Notifications received by this
70+
* connection are published as they are received. When the client gets {@link #close() closed}, the subscription {@link Subscriber#onComplete() completes normally}. Otherwise (transport
71+
* connection disconnected unintentionally) with an {@link R2dbcNonTransientResourceException error}.
6972
*
7073
* @return a hot {@link Flux} of {@link Notification Notifications}.
7174
*/

src/main/java/io/r2dbc/postgresql/client/Client.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import io.r2dbc.postgresql.message.backend.NotificationResponse;
2222
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
2323
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
24+
import io.r2dbc.spi.R2dbcNonTransientResourceException;
2425
import org.reactivestreams.Publisher;
26+
import org.reactivestreams.Subscriber;
2527
import reactor.core.Disposable;
2628
import reactor.core.publisher.Flux;
2729
import reactor.core.publisher.Mono;
@@ -35,14 +37,26 @@
3537
public interface Client {
3638

3739
/**
38-
* Add a consumer of notification messages.
40+
* Add a consumer of notification messages. Notifications received by this connection are sent to the {@link Consumer notification consumer}. Note that connection errors and events such as
41+
* disconnects are not visible to the {@link Consumer notification consumer}.
3942
*
4043
* @param consumer the consumer of notification messages
4144
* @return a new {@link Disposable} that can be used to cancel the underlying subscription.
4245
* @throws IllegalArgumentException if {@code consumer} is {@code null}
4346
*/
4447
Disposable addNotificationListener(Consumer<NotificationResponse> consumer);
4548

49+
/**
50+
* Add a consumer of notification messages. Notifications received by this connection are sent to the {@link Subscriber notification listener}. When the client gets {@link #close() closed}, the
51+
* subscription {@link Subscriber#onComplete() completes normally}. Otherwise (transport connection disconnected unintentionally) with an {@link R2dbcNonTransientResourceException error}.
52+
*
53+
* @param consumer the consumer of notification messages
54+
* @return a new {@link Disposable} that can be used to cancel the underlying subscription.
55+
* @throws IllegalArgumentException if {@code consumer} is {@code null}
56+
* @since 0.8.1
57+
*/
58+
Disposable addNotificationListener(Subscriber<NotificationResponse> consumer);
59+
4660
/**
4761
* Release any resources held by the {@link Client}.
4862
*

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

+14
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import io.r2dbc.postgresql.util.Assert;
4747
import io.r2dbc.spi.R2dbcNonTransientResourceException;
4848
import org.reactivestreams.Publisher;
49+
import org.reactivestreams.Subscriber;
4950
import reactor.core.Disposable;
5051
import reactor.core.publisher.DirectProcessor;
5152
import reactor.core.publisher.EmitterProcessor;
@@ -337,6 +338,10 @@ private static Mono<? extends Void> registerSslHandler(SSLConfig sslConfig, Conn
337338
public Mono<Void> close() {
338339
return Mono.defer(() -> {
339340

341+
if (!this.notificationProcessor.isTerminated()) {
342+
this.notificationProcessor.onComplete();
343+
}
344+
340345
drainError(EXPECTED);
341346
if (this.isClosed.compareAndSet(false, true)) {
342347

@@ -439,6 +444,11 @@ public Disposable addNotificationListener(Consumer<NotificationResponse> consume
439444
return this.notificationProcessor.subscribe(consumer);
440445
}
441446

447+
@Override
448+
public Disposable addNotificationListener(Subscriber<NotificationResponse> consumer) {
449+
return this.notificationProcessor.subscribe(consumer::onNext, consumer::onError, consumer::onComplete, consumer::onSubscribe);
450+
}
451+
442452
private static String toString(List<Field> fields) {
443453

444454
StringJoiner joiner = new StringJoiner(", ");
@@ -467,6 +477,10 @@ private void drainError(Supplier<? extends Throwable> supplier) {
467477
while ((receiver = this.responseReceivers.poll()) != null) {
468478
receiver.error(supplier.get());
469479
}
480+
481+
if (!this.notificationProcessor.isTerminated()) {
482+
this.notificationProcessor.onError(supplier.get());
483+
}
470484
}
471485

472486
private final class EnsureSubscribersCompleteChannelHandler extends ChannelDuplexHandler {

src/test/java/io/r2dbc/postgresql/PostgresNotificationIntegrationTests.java

+28
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616

1717
package io.r2dbc.postgresql;
1818

19+
import io.netty.channel.Channel;
1920
import io.r2dbc.postgresql.api.Notification;
21+
import io.r2dbc.postgresql.api.PostgresqlConnection;
22+
import io.r2dbc.postgresql.util.ConnectionIntrospector;
23+
import io.r2dbc.spi.R2dbcNonTransientResourceException;
2024
import org.junit.jupiter.api.Test;
2125
import reactor.core.Disposable;
2226
import reactor.test.StepVerifier;
@@ -55,4 +59,28 @@ void shouldReceivePubSubNotifications() throws Exception {
5559

5660
listener.dispose();
5761
}
62+
63+
@Test
64+
void listenShouldCompleteOnConnectionClose() {
65+
66+
PostgresqlConnection connection = this.connectionFactory.create().block();
67+
68+
connection.getNotifications().as(StepVerifier::create).expectSubscription()
69+
.then(() -> connection.close().subscribe())
70+
.verifyComplete();
71+
}
72+
73+
@Test
74+
void listenShouldFailOnConnectionDisconnected() {
75+
76+
PostgresqlConnection connection = this.connectionFactory.create().block();
77+
78+
connection.getNotifications().as(StepVerifier::create).expectSubscription()
79+
.then(() -> {
80+
81+
Channel channel = ConnectionIntrospector.of(connection).getChannel();
82+
channel.close();
83+
})
84+
.verifyError(R2dbcNonTransientResourceException.class);
85+
}
5886
}

src/test/java/io/r2dbc/postgresql/client/TestClient.java

+6
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.r2dbc.postgresql.util.Assert;
2424
import io.r2dbc.postgresql.util.TestByteBufAllocator;
2525
import org.reactivestreams.Publisher;
26+
import org.reactivestreams.Subscriber;
2627
import reactor.core.Disposable;
2728
import reactor.core.publisher.EmitterProcessor;
2829
import reactor.core.publisher.Flux;
@@ -146,6 +147,11 @@ public Disposable addNotificationListener(Consumer<NotificationResponse> consume
146147
return this.notificationProcessor.subscribe(consumer);
147148
}
148149

150+
@Override
151+
public Disposable addNotificationListener(Subscriber<NotificationResponse> consumer) {
152+
return this.notificationProcessor.subscribe(consumer::onNext, consumer::onError, consumer::onComplete, consumer::onSubscribe);
153+
}
154+
149155
public void notify(NotificationResponse notification) {
150156
this.notificationProcessor.onNext(notification);
151157
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql.util;
18+
19+
import io.netty.channel.Channel;
20+
import io.r2dbc.postgresql.api.PostgresqlConnection;
21+
import io.r2dbc.postgresql.client.ReactorNettyClient;
22+
import org.springframework.beans.DirectFieldAccessor;
23+
import reactor.netty.Connection;
24+
25+
/**
26+
* Utility to introspect a {@link PostgresqlConnection}.
27+
*/
28+
public final class ConnectionIntrospector {
29+
30+
private final PostgresqlConnection connection;
31+
32+
private ConnectionIntrospector(PostgresqlConnection connection) {
33+
this.connection = connection;
34+
}
35+
36+
/**
37+
* Create a new instance.
38+
*
39+
* @param connection
40+
* @return
41+
*/
42+
public static ConnectionIntrospector of(PostgresqlConnection connection) {
43+
return new ConnectionIntrospector(connection);
44+
}
45+
46+
/**
47+
* Return the transport {@link Channel}.
48+
*
49+
* @return the transport {@link Channel}.
50+
*/
51+
public Channel getChannel() {
52+
53+
DirectFieldAccessor accessor = new DirectFieldAccessor(getClient());
54+
Connection connection = (Connection) accessor.getPropertyValue("connection");
55+
56+
return connection.channel();
57+
}
58+
59+
/**
60+
* Return the underlying {@link ReactorNettyClient}.
61+
*
62+
* @return the underlying {@link ReactorNettyClient}.
63+
*/
64+
public ReactorNettyClient getClient() {
65+
66+
DirectFieldAccessor accessor = new DirectFieldAccessor(this.connection);
67+
Object value = accessor.getPropertyValue("client");
68+
69+
Assert.requireType(value, ReactorNettyClient.class, "Client must be of type ReactorNettyClient. Was: " + value.getClass().getName());
70+
71+
return (ReactorNettyClient) value;
72+
}
73+
}

0 commit comments

Comments
 (0)