diff --git a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java index 3a18c8b7..ef753b95 100644 --- a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java +++ b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java @@ -24,12 +24,14 @@ import io.r2dbc.postgresql.client.SimpleQueryMessageFlow; import io.r2dbc.postgresql.client.TransactionStatus; import io.r2dbc.postgresql.codec.Codecs; +import io.r2dbc.postgresql.message.backend.NotificationResponse; import io.r2dbc.postgresql.util.Assert; import io.r2dbc.postgresql.util.Operators; import io.r2dbc.spi.Connection; import io.r2dbc.spi.IsolationLevel; import io.r2dbc.spi.ValidationDepth; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Disposable; @@ -246,17 +248,17 @@ public Mono setAutoCommit(boolean autoCommit) { return useTransactionStatus(transactionStatus -> { - logger.debug(String.format("Setting auto-commit mode to [%s]", autoCommit)); + this.logger.debug(String.format("Setting auto-commit mode to [%s]", autoCommit)); if (isAutoCommit()) { if (!autoCommit) { - logger.debug("Beginning transaction"); + this.logger.debug("Beginning transaction"); return beginTransaction(); } } else { if (autoCommit) { - logger.debug("Committing pending transactions"); + this.logger.debug("Committing pending transactions"); return commitTransaction(); } } @@ -314,7 +316,7 @@ public void onNext(Integer integer) { @Override public void onError(Throwable t) { - logger.debug("Validation failed", t); + PostgresqlConnection.this.logger.debug("Validation failed", t); sink.success(false); } @@ -360,7 +362,7 @@ static class NotificationAdapter { private final DirectProcessor processor = DirectProcessor.create(); - private final FluxSink sink = processor.sink(); + private final FluxSink sink = this.processor.sink(); @Nullable private volatile Disposable subscription = null; @@ -374,13 +376,32 @@ void dispose() { void register(Client client) { - this.subscription = client.addNotificationListener(notificationResponse -> { - sink.next(new NotificationResponseWrapper(notificationResponse)); + this.subscription = client.addNotificationListener(new Subscriber() { + + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(NotificationResponse notificationResponse) { + NotificationAdapter.this.sink.next(new NotificationResponseWrapper(notificationResponse)); + } + + @Override + public void onError(Throwable throwable) { + NotificationAdapter.this.sink.error(throwable); + } + + @Override + public void onComplete() { + NotificationAdapter.this.sink.complete(); + } }); } Flux getEvents() { - return processor; + return this.processor; } } diff --git a/src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java b/src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java index 731b7108..663357db 100644 --- a/src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java +++ b/src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java @@ -18,7 +18,9 @@ import io.r2dbc.spi.Connection; import io.r2dbc.spi.IsolationLevel; +import io.r2dbc.spi.R2dbcNonTransientResourceException; import io.r2dbc.spi.ValidationDepth; +import org.reactivestreams.Subscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -64,8 +66,9 @@ public interface PostgresqlConnection extends Connection { PostgresqlStatement createStatement(String sql); /** - * Return a {@link Flux} of {@link Notification} received from {@code LISTEN} registrations. - * The stream is a hot stream producing messages as they are received. + * Return a {@link Flux} of {@link Notification} received from {@code LISTEN} registrations. The stream is a hot stream producing messages as they are received. Notifications received by this + * connection are published as they are received. When the client gets {@link #close() closed}, the subscription {@link Subscriber#onComplete() completes normally}. Otherwise (transport + * connection disconnected unintentionally) with an {@link R2dbcNonTransientResourceException error}. * * @return a hot {@link Flux} of {@link Notification Notifications}. */ diff --git a/src/main/java/io/r2dbc/postgresql/client/Client.java b/src/main/java/io/r2dbc/postgresql/client/Client.java index 76d8fcc8..a2be5cf5 100644 --- a/src/main/java/io/r2dbc/postgresql/client/Client.java +++ b/src/main/java/io/r2dbc/postgresql/client/Client.java @@ -21,7 +21,9 @@ import io.r2dbc.postgresql.message.backend.NotificationResponse; import io.r2dbc.postgresql.message.backend.ReadyForQuery; import io.r2dbc.postgresql.message.frontend.FrontendMessage; +import io.r2dbc.spi.R2dbcNonTransientResourceException; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -36,7 +38,8 @@ public interface Client { /** - * Add a consumer of notification messages. + * Add a consumer of notification messages. Notifications received by this connection are sent to the {@link Consumer notification consumer}. Note that connection errors and events such as + * disconnects are not visible to the {@link Consumer notification consumer}. * * @param consumer the consumer of notification messages * @return a new {@link Disposable} that can be used to cancel the underlying subscription. @@ -44,6 +47,17 @@ public interface Client { */ Disposable addNotificationListener(Consumer consumer); + /** + * Add a consumer of notification messages. Notifications received by this connection are sent to the {@link Subscriber notification listener}. When the client gets {@link #close() closed}, the + * subscription {@link Subscriber#onComplete() completes normally}. Otherwise (transport connection disconnected unintentionally) with an {@link R2dbcNonTransientResourceException error}. + * + * @param consumer the consumer of notification messages + * @return a new {@link Disposable} that can be used to cancel the underlying subscription. + * @throws IllegalArgumentException if {@code consumer} is {@code null} + * @since 0.8.1 + */ + Disposable addNotificationListener(Subscriber consumer); + /** * Release any resources held by the {@link Client}. * diff --git a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java index 27b85c94..be22c04f 100644 --- a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java +++ b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java @@ -46,6 +46,7 @@ import io.r2dbc.postgresql.util.Assert; import io.r2dbc.spi.R2dbcNonTransientResourceException; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; import reactor.core.Disposable; import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.EmitterProcessor; @@ -178,16 +179,16 @@ private ReactorNettyClient(Connection connection) { .subscribe(); } - @Override - public Disposable addNotificationListener(Consumer consumer) { - return this.notificationProcessor.subscribe(consumer); - } - @Override public Mono close() { return Mono.defer(() -> { + if (!this.notificationProcessor.isTerminated()) { + this.notificationProcessor.onComplete(); + } + drainError(EXPECTED); + if (this.isClosed.compareAndSet(false, true)) { if (!isConnected() || this.processId == null) { @@ -390,6 +391,16 @@ private static Mono registerSslHandler(SSLConfig sslConfig, Conn return Mono.empty(); } + @Override + public Disposable addNotificationListener(Consumer consumer) { + return this.notificationProcessor.subscribe(consumer); + } + + @Override + public Disposable addNotificationListener(Subscriber consumer) { + return this.notificationProcessor.subscribe(consumer::onNext, consumer::onError, consumer::onComplete, consumer::onSubscribe); + } + @Override public ByteBufAllocator getByteBufAllocator() { return this.byteBufAllocator; @@ -457,6 +468,10 @@ private void drainError(Supplier supplier) { while ((receiver = this.conversations.poll()) != null) { receiver.sink.error(supplier.get()); } + + if (!this.notificationProcessor.isTerminated()) { + this.notificationProcessor.onError(supplier.get()); + } } /** diff --git a/src/test/java/io/r2dbc/postgresql/PostgresNotificationIntegrationTests.java b/src/test/java/io/r2dbc/postgresql/PostgresNotificationIntegrationTests.java index 6e87cd4b..eda9040d 100644 --- a/src/test/java/io/r2dbc/postgresql/PostgresNotificationIntegrationTests.java +++ b/src/test/java/io/r2dbc/postgresql/PostgresNotificationIntegrationTests.java @@ -16,7 +16,11 @@ package io.r2dbc.postgresql; +import io.netty.channel.Channel; import io.r2dbc.postgresql.api.Notification; +import io.r2dbc.postgresql.api.PostgresqlConnection; +import io.r2dbc.postgresql.util.ConnectionIntrospector; +import io.r2dbc.spi.R2dbcNonTransientResourceException; import org.junit.jupiter.api.Test; import reactor.core.Disposable; import reactor.test.StepVerifier; @@ -55,4 +59,28 @@ void shouldReceivePubSubNotifications() throws Exception { listener.dispose(); } + + @Test + void listenShouldCompleteOnConnectionClose() { + + PostgresqlConnection connection = this.connectionFactory.create().block(); + + connection.getNotifications().as(StepVerifier::create).expectSubscription() + .then(() -> connection.close().subscribe()) + .verifyComplete(); + } + + @Test + void listenShouldFailOnConnectionDisconnected() { + + PostgresqlConnection connection = this.connectionFactory.create().block(); + + connection.getNotifications().as(StepVerifier::create).expectSubscription() + .then(() -> { + + Channel channel = ConnectionIntrospector.of(connection).getChannel(); + channel.close(); + }) + .verifyError(R2dbcNonTransientResourceException.class); + } } diff --git a/src/test/java/io/r2dbc/postgresql/client/TestClient.java b/src/test/java/io/r2dbc/postgresql/client/TestClient.java index 748ee71b..50f161c2 100644 --- a/src/test/java/io/r2dbc/postgresql/client/TestClient.java +++ b/src/test/java/io/r2dbc/postgresql/client/TestClient.java @@ -23,6 +23,7 @@ import io.r2dbc.postgresql.util.Assert; import io.r2dbc.postgresql.util.TestByteBufAllocator; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; import reactor.core.Disposable; import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; @@ -163,6 +164,11 @@ public Disposable addNotificationListener(Consumer consume return this.notificationProcessor.subscribe(consumer); } + @Override + public Disposable addNotificationListener(Subscriber consumer) { + return this.notificationProcessor.subscribe(consumer::onNext, consumer::onError, consumer::onComplete, consumer::onSubscribe); + } + public void notify(NotificationResponse notification) { this.notificationProcessor.onNext(notification); } diff --git a/src/test/java/io/r2dbc/postgresql/util/ConnectionIntrospector.java b/src/test/java/io/r2dbc/postgresql/util/ConnectionIntrospector.java new file mode 100644 index 00000000..a4d661d7 --- /dev/null +++ b/src/test/java/io/r2dbc/postgresql/util/ConnectionIntrospector.java @@ -0,0 +1,73 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.r2dbc.postgresql.util; + +import io.netty.channel.Channel; +import io.r2dbc.postgresql.api.PostgresqlConnection; +import io.r2dbc.postgresql.client.ReactorNettyClient; +import org.springframework.beans.DirectFieldAccessor; +import reactor.netty.Connection; + +/** + * Utility to introspect a {@link PostgresqlConnection}. + */ +public final class ConnectionIntrospector { + + private final PostgresqlConnection connection; + + private ConnectionIntrospector(PostgresqlConnection connection) { + this.connection = connection; + } + + /** + * Create a new instance. + * + * @param connection + * @return + */ + public static ConnectionIntrospector of(PostgresqlConnection connection) { + return new ConnectionIntrospector(connection); + } + + /** + * Return the transport {@link Channel}. + * + * @return the transport {@link Channel}. + */ + public Channel getChannel() { + + DirectFieldAccessor accessor = new DirectFieldAccessor(getClient()); + Connection connection = (Connection) accessor.getPropertyValue("connection"); + + return connection.channel(); + } + + /** + * Return the underlying {@link ReactorNettyClient}. + * + * @return the underlying {@link ReactorNettyClient}. + */ + public ReactorNettyClient getClient() { + + DirectFieldAccessor accessor = new DirectFieldAccessor(this.connection); + Object value = accessor.getPropertyValue("client"); + + Assert.requireType(value, ReactorNettyClient.class, "Client must be of type ReactorNettyClient. Was: " + value.getClass().getName()); + + return (ReactorNettyClient) value; + } +}