Skip to content

Commit a1bef7e

Browse files
committed
Merge branch 'NotificationTermination'
[#227][#212]
2 parents a78a07c + 68ebc2a commit a1bef7e

File tree

7 files changed

+176
-16
lines changed

7 files changed

+176
-16
lines changed

src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java

+29-8
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424
import io.r2dbc.postgresql.client.SimpleQueryMessageFlow;
2525
import io.r2dbc.postgresql.client.TransactionStatus;
2626
import io.r2dbc.postgresql.codec.Codecs;
27+
import io.r2dbc.postgresql.message.backend.NotificationResponse;
2728
import io.r2dbc.postgresql.util.Assert;
2829
import io.r2dbc.postgresql.util.Operators;
2930
import io.r2dbc.spi.Connection;
3031
import io.r2dbc.spi.IsolationLevel;
3132
import io.r2dbc.spi.ValidationDepth;
3233
import org.reactivestreams.Publisher;
34+
import org.reactivestreams.Subscriber;
3335
import org.reactivestreams.Subscription;
3436
import reactor.core.CoreSubscriber;
3537
import reactor.core.Disposable;
@@ -246,17 +248,17 @@ public Mono<Void> setAutoCommit(boolean autoCommit) {
246248

247249
return useTransactionStatus(transactionStatus -> {
248250

249-
logger.debug(String.format("Setting auto-commit mode to [%s]", autoCommit));
251+
this.logger.debug(String.format("Setting auto-commit mode to [%s]", autoCommit));
250252

251253
if (isAutoCommit()) {
252254
if (!autoCommit) {
253-
logger.debug("Beginning transaction");
255+
this.logger.debug("Beginning transaction");
254256
return beginTransaction();
255257
}
256258
} else {
257259

258260
if (autoCommit) {
259-
logger.debug("Committing pending transactions");
261+
this.logger.debug("Committing pending transactions");
260262
return commitTransaction();
261263
}
262264
}
@@ -314,7 +316,7 @@ public void onNext(Integer integer) {
314316

315317
@Override
316318
public void onError(Throwable t) {
317-
logger.debug("Validation failed", t);
319+
PostgresqlConnection.this.logger.debug("Validation failed", t);
318320
sink.success(false);
319321
}
320322

@@ -360,7 +362,7 @@ static class NotificationAdapter {
360362

361363
private final DirectProcessor<Notification> processor = DirectProcessor.create();
362364

363-
private final FluxSink<Notification> sink = processor.sink();
365+
private final FluxSink<Notification> sink = this.processor.sink();
364366

365367
@Nullable
366368
private volatile Disposable subscription = null;
@@ -374,13 +376,32 @@ void dispose() {
374376

375377
void register(Client client) {
376378

377-
this.subscription = client.addNotificationListener(notificationResponse -> {
378-
sink.next(new NotificationResponseWrapper(notificationResponse));
379+
this.subscription = client.addNotificationListener(new Subscriber<NotificationResponse>() {
380+
381+
@Override
382+
public void onSubscribe(Subscription subscription) {
383+
subscription.request(Long.MAX_VALUE);
384+
}
385+
386+
@Override
387+
public void onNext(NotificationResponse notificationResponse) {
388+
NotificationAdapter.this.sink.next(new NotificationResponseWrapper(notificationResponse));
389+
}
390+
391+
@Override
392+
public void onError(Throwable throwable) {
393+
NotificationAdapter.this.sink.error(throwable);
394+
}
395+
396+
@Override
397+
public void onComplete() {
398+
NotificationAdapter.this.sink.complete();
399+
}
379400
});
380401
}
381402

382403
Flux<Notification> getEvents() {
383-
return processor;
404+
return this.processor;
384405
}
385406
}
386407

src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import io.r2dbc.spi.Connection;
2020
import io.r2dbc.spi.IsolationLevel;
21+
import io.r2dbc.spi.R2dbcNonTransientResourceException;
2122
import io.r2dbc.spi.ValidationDepth;
23+
import org.reactivestreams.Subscriber;
2224
import reactor.core.publisher.Flux;
2325
import reactor.core.publisher.Mono;
2426

@@ -64,8 +66,9 @@ public interface PostgresqlConnection extends Connection {
6466
PostgresqlStatement createStatement(String sql);
6567

6668
/**
67-
* Return a {@link Flux} of {@link Notification} received from {@code LISTEN} registrations.
68-
* The stream is a hot stream producing messages as they are received.
69+
* Return a {@link Flux} of {@link Notification} received from {@code LISTEN} registrations. The stream is a hot stream producing messages as they are received. Notifications received by this
70+
* connection are published as they are received. When the client gets {@link #close() closed}, the subscription {@link Subscriber#onComplete() completes normally}. Otherwise (transport
71+
* connection disconnected unintentionally) with an {@link R2dbcNonTransientResourceException error}.
6972
*
7073
* @return a hot {@link Flux} of {@link Notification Notifications}.
7174
*/

src/main/java/io/r2dbc/postgresql/client/Client.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import io.r2dbc.postgresql.message.backend.NotificationResponse;
2222
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
2323
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
24+
import io.r2dbc.spi.R2dbcNonTransientResourceException;
2425
import org.reactivestreams.Publisher;
26+
import org.reactivestreams.Subscriber;
2527
import reactor.core.Disposable;
2628
import reactor.core.publisher.Flux;
2729
import reactor.core.publisher.Mono;
@@ -36,14 +38,26 @@
3638
public interface Client {
3739

3840
/**
39-
* Add a consumer of notification messages.
41+
* Add a consumer of notification messages. Notifications received by this connection are sent to the {@link Consumer notification consumer}. Note that connection errors and events such as
42+
* disconnects are not visible to the {@link Consumer notification consumer}.
4043
*
4144
* @param consumer the consumer of notification messages
4245
* @return a new {@link Disposable} that can be used to cancel the underlying subscription.
4346
* @throws IllegalArgumentException if {@code consumer} is {@code null}
4447
*/
4548
Disposable addNotificationListener(Consumer<NotificationResponse> consumer);
4649

50+
/**
51+
* Add a consumer of notification messages. Notifications received by this connection are sent to the {@link Subscriber notification listener}. When the client gets {@link #close() closed}, the
52+
* subscription {@link Subscriber#onComplete() completes normally}. Otherwise (transport connection disconnected unintentionally) with an {@link R2dbcNonTransientResourceException error}.
53+
*
54+
* @param consumer the consumer of notification messages
55+
* @return a new {@link Disposable} that can be used to cancel the underlying subscription.
56+
* @throws IllegalArgumentException if {@code consumer} is {@code null}
57+
* @since 0.8.1
58+
*/
59+
Disposable addNotificationListener(Subscriber<NotificationResponse> consumer);
60+
4761
/**
4862
* Release any resources held by the {@link Client}.
4963
*

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

+20-5
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import io.r2dbc.postgresql.util.Assert;
4747
import io.r2dbc.spi.R2dbcNonTransientResourceException;
4848
import org.reactivestreams.Publisher;
49+
import org.reactivestreams.Subscriber;
4950
import reactor.core.Disposable;
5051
import reactor.core.publisher.DirectProcessor;
5152
import reactor.core.publisher.EmitterProcessor;
@@ -178,16 +179,16 @@ private ReactorNettyClient(Connection connection) {
178179
.subscribe();
179180
}
180181

181-
@Override
182-
public Disposable addNotificationListener(Consumer<NotificationResponse> consumer) {
183-
return this.notificationProcessor.subscribe(consumer);
184-
}
185-
186182
@Override
187183
public Mono<Void> close() {
188184
return Mono.defer(() -> {
189185

186+
if (!this.notificationProcessor.isTerminated()) {
187+
this.notificationProcessor.onComplete();
188+
}
189+
190190
drainError(EXPECTED);
191+
191192
if (this.isClosed.compareAndSet(false, true)) {
192193

193194
if (!isConnected() || this.processId == null) {
@@ -390,6 +391,16 @@ private static Mono<? extends Void> registerSslHandler(SSLConfig sslConfig, Conn
390391
return Mono.empty();
391392
}
392393

394+
@Override
395+
public Disposable addNotificationListener(Consumer<NotificationResponse> consumer) {
396+
return this.notificationProcessor.subscribe(consumer);
397+
}
398+
399+
@Override
400+
public Disposable addNotificationListener(Subscriber<NotificationResponse> consumer) {
401+
return this.notificationProcessor.subscribe(consumer::onNext, consumer::onError, consumer::onComplete, consumer::onSubscribe);
402+
}
403+
393404
@Override
394405
public ByteBufAllocator getByteBufAllocator() {
395406
return this.byteBufAllocator;
@@ -457,6 +468,10 @@ private void drainError(Supplier<? extends Throwable> supplier) {
457468
while ((receiver = this.conversations.poll()) != null) {
458469
receiver.sink.error(supplier.get());
459470
}
471+
472+
if (!this.notificationProcessor.isTerminated()) {
473+
this.notificationProcessor.onError(supplier.get());
474+
}
460475
}
461476

462477
/**

src/test/java/io/r2dbc/postgresql/PostgresNotificationIntegrationTests.java

+28
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616

1717
package io.r2dbc.postgresql;
1818

19+
import io.netty.channel.Channel;
1920
import io.r2dbc.postgresql.api.Notification;
21+
import io.r2dbc.postgresql.api.PostgresqlConnection;
22+
import io.r2dbc.postgresql.util.ConnectionIntrospector;
23+
import io.r2dbc.spi.R2dbcNonTransientResourceException;
2024
import org.junit.jupiter.api.Test;
2125
import reactor.core.Disposable;
2226
import reactor.test.StepVerifier;
@@ -55,4 +59,28 @@ void shouldReceivePubSubNotifications() throws Exception {
5559

5660
listener.dispose();
5761
}
62+
63+
@Test
64+
void listenShouldCompleteOnConnectionClose() {
65+
66+
PostgresqlConnection connection = this.connectionFactory.create().block();
67+
68+
connection.getNotifications().as(StepVerifier::create).expectSubscription()
69+
.then(() -> connection.close().subscribe())
70+
.verifyComplete();
71+
}
72+
73+
@Test
74+
void listenShouldFailOnConnectionDisconnected() {
75+
76+
PostgresqlConnection connection = this.connectionFactory.create().block();
77+
78+
connection.getNotifications().as(StepVerifier::create).expectSubscription()
79+
.then(() -> {
80+
81+
Channel channel = ConnectionIntrospector.of(connection).getChannel();
82+
channel.close();
83+
})
84+
.verifyError(R2dbcNonTransientResourceException.class);
85+
}
5886
}

src/test/java/io/r2dbc/postgresql/client/TestClient.java

+6
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.r2dbc.postgresql.util.Assert;
2424
import io.r2dbc.postgresql.util.TestByteBufAllocator;
2525
import org.reactivestreams.Publisher;
26+
import org.reactivestreams.Subscriber;
2627
import reactor.core.Disposable;
2728
import reactor.core.publisher.EmitterProcessor;
2829
import reactor.core.publisher.Flux;
@@ -163,6 +164,11 @@ public Disposable addNotificationListener(Consumer<NotificationResponse> consume
163164
return this.notificationProcessor.subscribe(consumer);
164165
}
165166

167+
@Override
168+
public Disposable addNotificationListener(Subscriber<NotificationResponse> consumer) {
169+
return this.notificationProcessor.subscribe(consumer::onNext, consumer::onError, consumer::onComplete, consumer::onSubscribe);
170+
}
171+
166172
public void notify(NotificationResponse notification) {
167173
this.notificationProcessor.onNext(notification);
168174
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql.util;
18+
19+
import io.netty.channel.Channel;
20+
import io.r2dbc.postgresql.api.PostgresqlConnection;
21+
import io.r2dbc.postgresql.client.ReactorNettyClient;
22+
import org.springframework.beans.DirectFieldAccessor;
23+
import reactor.netty.Connection;
24+
25+
/**
26+
* Utility to introspect a {@link PostgresqlConnection}.
27+
*/
28+
public final class ConnectionIntrospector {
29+
30+
private final PostgresqlConnection connection;
31+
32+
private ConnectionIntrospector(PostgresqlConnection connection) {
33+
this.connection = connection;
34+
}
35+
36+
/**
37+
* Create a new instance.
38+
*
39+
* @param connection
40+
* @return
41+
*/
42+
public static ConnectionIntrospector of(PostgresqlConnection connection) {
43+
return new ConnectionIntrospector(connection);
44+
}
45+
46+
/**
47+
* Return the transport {@link Channel}.
48+
*
49+
* @return the transport {@link Channel}.
50+
*/
51+
public Channel getChannel() {
52+
53+
DirectFieldAccessor accessor = new DirectFieldAccessor(getClient());
54+
Connection connection = (Connection) accessor.getPropertyValue("connection");
55+
56+
return connection.channel();
57+
}
58+
59+
/**
60+
* Return the underlying {@link ReactorNettyClient}.
61+
*
62+
* @return the underlying {@link ReactorNettyClient}.
63+
*/
64+
public ReactorNettyClient getClient() {
65+
66+
DirectFieldAccessor accessor = new DirectFieldAccessor(this.connection);
67+
Object value = accessor.getPropertyValue("client");
68+
69+
Assert.requireType(value, ReactorNettyClient.class, "Client must be of type ReactorNettyClient. Was: " + value.getClass().getName());
70+
71+
return (ReactorNettyClient) value;
72+
}
73+
}

0 commit comments

Comments
 (0)