Skip to content

Terminate Notification stream on disconnect #227

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import io.r2dbc.postgresql.client.SimpleQueryMessageFlow;
import io.r2dbc.postgresql.client.TransactionStatus;
import io.r2dbc.postgresql.codec.Codecs;
import io.r2dbc.postgresql.message.backend.NotificationResponse;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.postgresql.util.Operators;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.ValidationDepth;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
Expand Down Expand Up @@ -246,17 +248,17 @@ public Mono<Void> setAutoCommit(boolean autoCommit) {

return useTransactionStatus(transactionStatus -> {

logger.debug(String.format("Setting auto-commit mode to [%s]", autoCommit));
this.logger.debug(String.format("Setting auto-commit mode to [%s]", autoCommit));

if (isAutoCommit()) {
if (!autoCommit) {
logger.debug("Beginning transaction");
this.logger.debug("Beginning transaction");
return beginTransaction();
}
} else {

if (autoCommit) {
logger.debug("Committing pending transactions");
this.logger.debug("Committing pending transactions");
return commitTransaction();
}
}
Expand Down Expand Up @@ -314,7 +316,7 @@ public void onNext(Integer integer) {

@Override
public void onError(Throwable t) {
logger.debug("Validation failed", t);
PostgresqlConnection.this.logger.debug("Validation failed", t);
sink.success(false);
}

Expand Down Expand Up @@ -360,7 +362,7 @@ static class NotificationAdapter {

private final DirectProcessor<Notification> processor = DirectProcessor.create();

private final FluxSink<Notification> sink = processor.sink();
private final FluxSink<Notification> sink = this.processor.sink();

@Nullable
private volatile Disposable subscription = null;
Expand All @@ -374,13 +376,32 @@ void dispose() {

void register(Client client) {

this.subscription = client.addNotificationListener(notificationResponse -> {
sink.next(new NotificationResponseWrapper(notificationResponse));
this.subscription = client.addNotificationListener(new Subscriber<NotificationResponse>() {

@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}

@Override
public void onNext(NotificationResponse notificationResponse) {
NotificationAdapter.this.sink.next(new NotificationResponseWrapper(notificationResponse));
}

@Override
public void onError(Throwable throwable) {
NotificationAdapter.this.sink.error(throwable);
}

@Override
public void onComplete() {
NotificationAdapter.this.sink.complete();
}
});
}

Flux<Notification> getEvents() {
return processor;
return this.processor;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import io.r2dbc.spi.ValidationDepth;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -64,8 +66,9 @@ public interface PostgresqlConnection extends Connection {
PostgresqlStatement createStatement(String sql);

/**
* Return a {@link Flux} of {@link Notification} received from {@code LISTEN} registrations.
* The stream is a hot stream producing messages as they are received.
* Return a {@link Flux} of {@link Notification} received from {@code LISTEN} registrations. The stream is a hot stream producing messages as they are received. Notifications received by this
* connection are published as they are received. When the client gets {@link #close() closed}, the subscription {@link Subscriber#onComplete() completes normally}. Otherwise (transport
* connection disconnected unintentionally) with an {@link R2dbcNonTransientResourceException error}.
*
* @return a hot {@link Flux} of {@link Notification Notifications}.
*/
Expand Down
16 changes: 15 additions & 1 deletion src/main/java/io/r2dbc/postgresql/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import io.r2dbc.postgresql.message.backend.NotificationResponse;
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -36,14 +38,26 @@
public interface Client {

/**
* Add a consumer of notification messages.
* Add a consumer of notification messages. Notifications received by this connection are sent to the {@link Consumer notification consumer}. Note that connection errors and events such as
* disconnects are not visible to the {@link Consumer notification consumer}.
*
* @param consumer the consumer of notification messages
* @return a new {@link Disposable} that can be used to cancel the underlying subscription.
* @throws IllegalArgumentException if {@code consumer} is {@code null}
*/
Disposable addNotificationListener(Consumer<NotificationResponse> consumer);

/**
* Add a consumer of notification messages. Notifications received by this connection are sent to the {@link Subscriber notification listener}. When the client gets {@link #close() closed}, the
* subscription {@link Subscriber#onComplete() completes normally}. Otherwise (transport connection disconnected unintentionally) with an {@link R2dbcNonTransientResourceException error}.
*
* @param consumer the consumer of notification messages
* @return a new {@link Disposable} that can be used to cancel the underlying subscription.
* @throws IllegalArgumentException if {@code consumer} is {@code null}
* @since 0.8.1
*/
Disposable addNotificationListener(Subscriber<NotificationResponse> consumer);

/**
* Release any resources held by the {@link Client}.
*
Expand Down
25 changes: 20 additions & 5 deletions src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.EmitterProcessor;
Expand Down Expand Up @@ -178,16 +179,16 @@ private ReactorNettyClient(Connection connection) {
.subscribe();
}

@Override
public Disposable addNotificationListener(Consumer<NotificationResponse> consumer) {
return this.notificationProcessor.subscribe(consumer);
}

@Override
public Mono<Void> close() {
return Mono.defer(() -> {

if (!this.notificationProcessor.isTerminated()) {
this.notificationProcessor.onComplete();
}

drainError(EXPECTED);

if (this.isClosed.compareAndSet(false, true)) {

if (!isConnected() || this.processId == null) {
Expand Down Expand Up @@ -390,6 +391,16 @@ private static Mono<? extends Void> registerSslHandler(SSLConfig sslConfig, Conn
return Mono.empty();
}

@Override
public Disposable addNotificationListener(Consumer<NotificationResponse> consumer) {
return this.notificationProcessor.subscribe(consumer);
}

@Override
public Disposable addNotificationListener(Subscriber<NotificationResponse> consumer) {
return this.notificationProcessor.subscribe(consumer::onNext, consumer::onError, consumer::onComplete, consumer::onSubscribe);
}

@Override
public ByteBufAllocator getByteBufAllocator() {
return this.byteBufAllocator;
Expand Down Expand Up @@ -457,6 +468,10 @@ private void drainError(Supplier<? extends Throwable> supplier) {
while ((receiver = this.conversations.poll()) != null) {
receiver.sink.error(supplier.get());
}

if (!this.notificationProcessor.isTerminated()) {
this.notificationProcessor.onError(supplier.get());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@

package io.r2dbc.postgresql;

import io.netty.channel.Channel;
import io.r2dbc.postgresql.api.Notification;
import io.r2dbc.postgresql.api.PostgresqlConnection;
import io.r2dbc.postgresql.util.ConnectionIntrospector;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.test.StepVerifier;
Expand Down Expand Up @@ -55,4 +59,28 @@ void shouldReceivePubSubNotifications() throws Exception {

listener.dispose();
}

@Test
void listenShouldCompleteOnConnectionClose() {

PostgresqlConnection connection = this.connectionFactory.create().block();

connection.getNotifications().as(StepVerifier::create).expectSubscription()
.then(() -> connection.close().subscribe())
.verifyComplete();
}

@Test
void listenShouldFailOnConnectionDisconnected() {

PostgresqlConnection connection = this.connectionFactory.create().block();

connection.getNotifications().as(StepVerifier::create).expectSubscription()
.then(() -> {

Channel channel = ConnectionIntrospector.of(connection).getChannel();
channel.close();
})
.verifyError(R2dbcNonTransientResourceException.class);
}
}
6 changes: 6 additions & 0 deletions src/test/java/io/r2dbc/postgresql/client/TestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.postgresql.util.TestByteBufAllocator;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -163,6 +164,11 @@ public Disposable addNotificationListener(Consumer<NotificationResponse> consume
return this.notificationProcessor.subscribe(consumer);
}

@Override
public Disposable addNotificationListener(Subscriber<NotificationResponse> consumer) {
return this.notificationProcessor.subscribe(consumer::onNext, consumer::onError, consumer::onComplete, consumer::onSubscribe);
}

public void notify(NotificationResponse notification) {
this.notificationProcessor.onNext(notification);
}
Expand Down
73 changes: 73 additions & 0 deletions src/test/java/io/r2dbc/postgresql/util/ConnectionIntrospector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.r2dbc.postgresql.util;

import io.netty.channel.Channel;
import io.r2dbc.postgresql.api.PostgresqlConnection;
import io.r2dbc.postgresql.client.ReactorNettyClient;
import org.springframework.beans.DirectFieldAccessor;
import reactor.netty.Connection;

/**
* Utility to introspect a {@link PostgresqlConnection}.
*/
public final class ConnectionIntrospector {

private final PostgresqlConnection connection;

private ConnectionIntrospector(PostgresqlConnection connection) {
this.connection = connection;
}

/**
* Create a new instance.
*
* @param connection
* @return
*/
public static ConnectionIntrospector of(PostgresqlConnection connection) {
return new ConnectionIntrospector(connection);
}

/**
* Return the transport {@link Channel}.
*
* @return the transport {@link Channel}.
*/
public Channel getChannel() {

DirectFieldAccessor accessor = new DirectFieldAccessor(getClient());
Connection connection = (Connection) accessor.getPropertyValue("connection");

return connection.channel();
}

/**
* Return the underlying {@link ReactorNettyClient}.
*
* @return the underlying {@link ReactorNettyClient}.
*/
public ReactorNettyClient getClient() {

DirectFieldAccessor accessor = new DirectFieldAccessor(this.connection);
Object value = accessor.getPropertyValue("client");

Assert.requireType(value, ReactorNettyClient.class, "Client must be of type ReactorNettyClient. Was: " + value.getClass().getName());

return (ReactorNettyClient) value;
}
}