Skip to content

Commit 68ebc2a

Browse files
committed
Terminate Notification stream on disconnect
We now terminate the notification stream (PostgresqlConnection.getNotifications()) when the connection gets disconnected. If the connection is closed normally, the stream terminates successfully (onComplete). Unintended disconnects result in an error (onError). [#212]
1 parent a78a07c commit 68ebc2a

File tree

7 files changed

+176
-16
lines changed

7 files changed

+176
-16
lines changed

src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java

+29-8
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424
import io.r2dbc.postgresql.client.SimpleQueryMessageFlow;
2525
import io.r2dbc.postgresql.client.TransactionStatus;
2626
import io.r2dbc.postgresql.codec.Codecs;
27+
import io.r2dbc.postgresql.message.backend.NotificationResponse;
2728
import io.r2dbc.postgresql.util.Assert;
2829
import io.r2dbc.postgresql.util.Operators;
2930
import io.r2dbc.spi.Connection;
3031
import io.r2dbc.spi.IsolationLevel;
3132
import io.r2dbc.spi.ValidationDepth;
3233
import org.reactivestreams.Publisher;
34+
import org.reactivestreams.Subscriber;
3335
import org.reactivestreams.Subscription;
3436
import reactor.core.CoreSubscriber;
3537
import reactor.core.Disposable;
@@ -246,17 +248,17 @@ public Mono<Void> setAutoCommit(boolean autoCommit) {
246248

247249
return useTransactionStatus(transactionStatus -> {
248250

249-
logger.debug(String.format("Setting auto-commit mode to [%s]", autoCommit));
251+
this.logger.debug(String.format("Setting auto-commit mode to [%s]", autoCommit));
250252

251253
if (isAutoCommit()) {
252254
if (!autoCommit) {
253-
logger.debug("Beginning transaction");
255+
this.logger.debug("Beginning transaction");
254256
return beginTransaction();
255257
}
256258
} else {
257259

258260
if (autoCommit) {
259-
logger.debug("Committing pending transactions");
261+
this.logger.debug("Committing pending transactions");
260262
return commitTransaction();
261263
}
262264
}
@@ -314,7 +316,7 @@ public void onNext(Integer integer) {
314316

315317
@Override
316318
public void onError(Throwable t) {
317-
logger.debug("Validation failed", t);
319+
PostgresqlConnection.this.logger.debug("Validation failed", t);
318320
sink.success(false);
319321
}
320322

@@ -360,7 +362,7 @@ static class NotificationAdapter {
360362

361363
private final DirectProcessor<Notification> processor = DirectProcessor.create();
362364

363-
private final FluxSink<Notification> sink = processor.sink();
365+
private final FluxSink<Notification> sink = this.processor.sink();
364366

365367
@Nullable
366368
private volatile Disposable subscription = null;
@@ -374,13 +376,32 @@ void dispose() {
374376

375377
void register(Client client) {
376378

377-
this.subscription = client.addNotificationListener(notificationResponse -> {
378-
sink.next(new NotificationResponseWrapper(notificationResponse));
379+
this.subscription = client.addNotificationListener(new Subscriber<NotificationResponse>() {
380+
381+
@Override
382+
public void onSubscribe(Subscription subscription) {
383+
subscription.request(Long.MAX_VALUE);
384+
}
385+
386+
@Override
387+
public void onNext(NotificationResponse notificationResponse) {
388+
NotificationAdapter.this.sink.next(new NotificationResponseWrapper(notificationResponse));
389+
}
390+
391+
@Override
392+
public void onError(Throwable throwable) {
393+
NotificationAdapter.this.sink.error(throwable);
394+
}
395+
396+
@Override
397+
public void onComplete() {
398+
NotificationAdapter.this.sink.complete();
399+
}
379400
});
380401
}
381402

382403
Flux<Notification> getEvents() {
383-
return processor;
404+
return this.processor;
384405
}
385406
}
386407

src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import io.r2dbc.spi.Connection;
2020
import io.r2dbc.spi.IsolationLevel;
21+
import io.r2dbc.spi.R2dbcNonTransientResourceException;
2122
import io.r2dbc.spi.ValidationDepth;
23+
import org.reactivestreams.Subscriber;
2224
import reactor.core.publisher.Flux;
2325
import reactor.core.publisher.Mono;
2426

@@ -64,8 +66,9 @@ public interface PostgresqlConnection extends Connection {
6466
PostgresqlStatement createStatement(String sql);
6567

6668
/**
67-
* Return a {@link Flux} of {@link Notification} received from {@code LISTEN} registrations.
68-
* The stream is a hot stream producing messages as they are received.
69+
* Return a {@link Flux} of {@link Notification} received from {@code LISTEN} registrations. The stream is a hot stream producing messages as they are received. Notifications received by this
70+
* connection are published as they are received. When the client gets {@link #close() closed}, the subscription {@link Subscriber#onComplete() completes normally}. Otherwise (transport
71+
* connection disconnected unintentionally) with an {@link R2dbcNonTransientResourceException error}.
6972
*
7073
* @return a hot {@link Flux} of {@link Notification Notifications}.
7174
*/

src/main/java/io/r2dbc/postgresql/client/Client.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import io.r2dbc.postgresql.message.backend.NotificationResponse;
2222
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
2323
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
24+
import io.r2dbc.spi.R2dbcNonTransientResourceException;
2425
import org.reactivestreams.Publisher;
26+
import org.reactivestreams.Subscriber;
2527
import reactor.core.Disposable;
2628
import reactor.core.publisher.Flux;
2729
import reactor.core.publisher.Mono;
@@ -36,14 +38,26 @@
3638
public interface Client {
3739

3840
/**
39-
* Add a consumer of notification messages.
41+
* Add a consumer of notification messages. Notifications received by this connection are sent to the {@link Consumer notification consumer}. Note that connection errors and events such as
42+
* disconnects are not visible to the {@link Consumer notification consumer}.
4043
*
4144
* @param consumer the consumer of notification messages
4245
* @return a new {@link Disposable} that can be used to cancel the underlying subscription.
4346
* @throws IllegalArgumentException if {@code consumer} is {@code null}
4447
*/
4548
Disposable addNotificationListener(Consumer<NotificationResponse> consumer);
4649

50+
/**
51+
* Add a consumer of notification messages. Notifications received by this connection are sent to the {@link Subscriber notification listener}. When the client gets {@link #close() closed}, the
52+
* subscription {@link Subscriber#onComplete() completes normally}. Otherwise (transport connection disconnected unintentionally) with an {@link R2dbcNonTransientResourceException error}.
53+
*
54+
* @param consumer the consumer of notification messages
55+
* @return a new {@link Disposable} that can be used to cancel the underlying subscription.
56+
* @throws IllegalArgumentException if {@code consumer} is {@code null}
57+
* @since 0.8.1
58+
*/
59+
Disposable addNotificationListener(Subscriber<NotificationResponse> consumer);
60+
4761
/**
4862
* Release any resources held by the {@link Client}.
4963
*

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

+20-5
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import io.r2dbc.postgresql.util.Assert;
4747
import io.r2dbc.spi.R2dbcNonTransientResourceException;
4848
import org.reactivestreams.Publisher;
49+
import org.reactivestreams.Subscriber;
4950
import reactor.core.Disposable;
5051
import reactor.core.publisher.DirectProcessor;
5152
import reactor.core.publisher.EmitterProcessor;
@@ -178,16 +179,16 @@ private ReactorNettyClient(Connection connection) {
178179
.subscribe();
179180
}
180181

181-
@Override
182-
public Disposable addNotificationListener(Consumer<NotificationResponse> consumer) {
183-
return this.notificationProcessor.subscribe(consumer);
184-
}
185-
186182
@Override
187183
public Mono<Void> close() {
188184
return Mono.defer(() -> {
189185

186+
if (!this.notificationProcessor.isTerminated()) {
187+
this.notificationProcessor.onComplete();
188+
}
189+
190190
drainError(EXPECTED);
191+
191192
if (this.isClosed.compareAndSet(false, true)) {
192193

193194
if (!isConnected() || this.processId == null) {
@@ -390,6 +391,16 @@ private static Mono<? extends Void> registerSslHandler(SSLConfig sslConfig, Conn
390391
return Mono.empty();
391392
}
392393

394+
@Override
395+
public Disposable addNotificationListener(Consumer<NotificationResponse> consumer) {
396+
return this.notificationProcessor.subscribe(consumer);
397+
}
398+
399+
@Override
400+
public Disposable addNotificationListener(Subscriber<NotificationResponse> consumer) {
401+
return this.notificationProcessor.subscribe(consumer::onNext, consumer::onError, consumer::onComplete, consumer::onSubscribe);
402+
}
403+
393404
@Override
394405
public ByteBufAllocator getByteBufAllocator() {
395406
return this.byteBufAllocator;
@@ -457,6 +468,10 @@ private void drainError(Supplier<? extends Throwable> supplier) {
457468
while ((receiver = this.conversations.poll()) != null) {
458469
receiver.sink.error(supplier.get());
459470
}
471+
472+
if (!this.notificationProcessor.isTerminated()) {
473+
this.notificationProcessor.onError(supplier.get());
474+
}
460475
}
461476

462477
/**

src/test/java/io/r2dbc/postgresql/PostgresNotificationIntegrationTests.java

+28
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616

1717
package io.r2dbc.postgresql;
1818

19+
import io.netty.channel.Channel;
1920
import io.r2dbc.postgresql.api.Notification;
21+
import io.r2dbc.postgresql.api.PostgresqlConnection;
22+
import io.r2dbc.postgresql.util.ConnectionIntrospector;
23+
import io.r2dbc.spi.R2dbcNonTransientResourceException;
2024
import org.junit.jupiter.api.Test;
2125
import reactor.core.Disposable;
2226
import reactor.test.StepVerifier;
@@ -55,4 +59,28 @@ void shouldReceivePubSubNotifications() throws Exception {
5559

5660
listener.dispose();
5761
}
62+
63+
@Test
64+
void listenShouldCompleteOnConnectionClose() {
65+
66+
PostgresqlConnection connection = this.connectionFactory.create().block();
67+
68+
connection.getNotifications().as(StepVerifier::create).expectSubscription()
69+
.then(() -> connection.close().subscribe())
70+
.verifyComplete();
71+
}
72+
73+
@Test
74+
void listenShouldFailOnConnectionDisconnected() {
75+
76+
PostgresqlConnection connection = this.connectionFactory.create().block();
77+
78+
connection.getNotifications().as(StepVerifier::create).expectSubscription()
79+
.then(() -> {
80+
81+
Channel channel = ConnectionIntrospector.of(connection).getChannel();
82+
channel.close();
83+
})
84+
.verifyError(R2dbcNonTransientResourceException.class);
85+
}
5886
}

src/test/java/io/r2dbc/postgresql/client/TestClient.java

+6
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.r2dbc.postgresql.util.Assert;
2424
import io.r2dbc.postgresql.util.TestByteBufAllocator;
2525
import org.reactivestreams.Publisher;
26+
import org.reactivestreams.Subscriber;
2627
import reactor.core.Disposable;
2728
import reactor.core.publisher.EmitterProcessor;
2829
import reactor.core.publisher.Flux;
@@ -163,6 +164,11 @@ public Disposable addNotificationListener(Consumer<NotificationResponse> consume
163164
return this.notificationProcessor.subscribe(consumer);
164165
}
165166

167+
@Override
168+
public Disposable addNotificationListener(Subscriber<NotificationResponse> consumer) {
169+
return this.notificationProcessor.subscribe(consumer::onNext, consumer::onError, consumer::onComplete, consumer::onSubscribe);
170+
}
171+
166172
public void notify(NotificationResponse notification) {
167173
this.notificationProcessor.onNext(notification);
168174
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql.util;
18+
19+
import io.netty.channel.Channel;
20+
import io.r2dbc.postgresql.api.PostgresqlConnection;
21+
import io.r2dbc.postgresql.client.ReactorNettyClient;
22+
import org.springframework.beans.DirectFieldAccessor;
23+
import reactor.netty.Connection;
24+
25+
/**
26+
* Utility to introspect a {@link PostgresqlConnection}.
27+
*/
28+
public final class ConnectionIntrospector {
29+
30+
private final PostgresqlConnection connection;
31+
32+
private ConnectionIntrospector(PostgresqlConnection connection) {
33+
this.connection = connection;
34+
}
35+
36+
/**
37+
* Create a new instance.
38+
*
39+
* @param connection
40+
* @return
41+
*/
42+
public static ConnectionIntrospector of(PostgresqlConnection connection) {
43+
return new ConnectionIntrospector(connection);
44+
}
45+
46+
/**
47+
* Return the transport {@link Channel}.
48+
*
49+
* @return the transport {@link Channel}.
50+
*/
51+
public Channel getChannel() {
52+
53+
DirectFieldAccessor accessor = new DirectFieldAccessor(getClient());
54+
Connection connection = (Connection) accessor.getPropertyValue("connection");
55+
56+
return connection.channel();
57+
}
58+
59+
/**
60+
* Return the underlying {@link ReactorNettyClient}.
61+
*
62+
* @return the underlying {@link ReactorNettyClient}.
63+
*/
64+
public ReactorNettyClient getClient() {
65+
66+
DirectFieldAccessor accessor = new DirectFieldAccessor(this.connection);
67+
Object value = accessor.getPropertyValue("client");
68+
69+
Assert.requireType(value, ReactorNettyClient.class, "Client must be of type ReactorNettyClient. Was: " + value.getClass().getName());
70+
71+
return (ReactorNettyClient) value;
72+
}
73+
}

0 commit comments

Comments
 (0)