Skip to content

Add read timeout support to BoltConnection #1612

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,11 @@ public CompletionStage<Void> close() {
return close.exceptionally(ignored -> null);
}

@Override
public CompletionStage<Void> setReadTimeout(Duration duration) {
return executeInEventLoop(() -> connection.setReadTimeout(duration));
}

@Override
public BoltConnectionState state() {
var state = stateRef.get();
Expand Down Expand Up @@ -528,6 +533,11 @@ public boolean serverSideRoutingEnabled() {
return serverSideRouting;
}

@Override
public Optional<Duration> defaultReadTimeout() {
return connection.defaultReadTimeoutMillis();
}

private CompletionStage<Void> executeInEventLoop(Runnable runnable) {
var executeFuture = new CompletableFuture<Void>();
Runnable stageCompletingRunnable = () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoop;
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -56,7 +58,8 @@ public class NetworkConnection implements Connection {
private final boolean ssrEnabled;
private final BoltProtocol protocol;

private final Long connectionReadTimeout;
private final Duration defaultReadTimeout;
private Duration readTimeout;

private ChannelHandler connectionReadTimeoutHandler;

Expand All @@ -70,8 +73,10 @@ public NetworkConnection(Channel channel, LoggingProvider logging) {
this.telemetryEnabled = ChannelAttributes.telemetryEnabled(channel);
this.ssrEnabled = ChannelAttributes.ssrEnabled(channel);
this.protocol = BoltProtocol.forChannel(channel);
this.connectionReadTimeout =
ChannelAttributes.connectionReadTimeout(channel).orElse(null);
this.defaultReadTimeout = ChannelAttributes.connectionReadTimeout(channel)
.map(Duration::ofSeconds)
.orElse(null);
this.readTimeout = defaultReadTimeout;
}

@Override
Expand Down Expand Up @@ -179,6 +184,25 @@ public EventLoop eventLoop() {
return channel.eventLoop();
}

@Override
public Optional<Duration> defaultReadTimeoutMillis() {
return Optional.ofNullable(defaultReadTimeout);
}

@Override
public void setReadTimeout(Duration duration) {
if (!channel.eventLoop().inEventLoop()) {
throw new IllegalStateException("This method may only be called in the EventLoop");
}

if (duration != null && duration.toMillis() > 0) {
// only values greater than zero milliseconds are supported
this.readTimeout = duration;
} else {
this.readTimeout = this.defaultReadTimeout;
}
}

private CompletionStage<Void> writeMessageInEventLoop(Message message, ResponseHandler handler) {
var future = new CompletableFuture<Void>();
Runnable runnable = () -> {
Expand Down Expand Up @@ -215,8 +239,9 @@ private void registerConnectionReadTimeout(Channel channel) {
throw new IllegalStateException("This method may only be called in the EventLoop");
}

if (connectionReadTimeout != null && connectionReadTimeoutHandler == null) {
connectionReadTimeoutHandler = new ConnectionReadTimeoutHandler(connectionReadTimeout, TimeUnit.SECONDS);
if (this.readTimeout != null && connectionReadTimeoutHandler == null) {
connectionReadTimeoutHandler =
new ConnectionReadTimeoutHandler(readTimeout.toMillis(), TimeUnit.MILLISECONDS);
channel.pipeline().addFirst(connectionReadTimeoutHandler);
log.log(System.Logger.Level.DEBUG, "Added ConnectionReadTimeoutHandler");
messageDispatcher.setBeforeLastHandlerHook(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.neo4j.driver.internal.bolt.basicimpl.impl.spi;

import io.netty.channel.EventLoop;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.internal.bolt.api.BoltServerAddress;
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.BoltProtocol;
Expand Down Expand Up @@ -48,4 +50,8 @@ public interface Connection {
CompletionStage<Void> close();

EventLoop eventLoop();

Optional<Duration> defaultReadTimeoutMillis();

void setReadTimeout(Duration duration);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -316,6 +317,11 @@ public CompletionStage<Void> close() {
return closeFuture;
}

@Override
public CompletionStage<Void> setReadTimeout(Duration duration) {
return delegate.setReadTimeout(duration);
}

@Override
public BoltConnectionState state() {
return delegate.state();
Expand Down Expand Up @@ -351,6 +357,11 @@ public boolean serverSideRoutingEnabled() {
return delegate.serverSideRoutingEnabled();
}

@Override
public Optional<Duration> defaultReadTimeout() {
return delegate.defaultReadTimeout();
}

// internal use only
public BoltConnection delegate() {
return delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.internal.bolt.api.AccessMode;
Expand Down Expand Up @@ -273,6 +274,11 @@ public CompletionStage<Void> close() {
return delegate.close();
}

@Override
public CompletionStage<Void> setReadTimeout(Duration duration) {
return delegate.setReadTimeout(duration);
}

@Override
public BoltConnectionState state() {
return delegate.state();
Expand Down Expand Up @@ -308,6 +314,11 @@ public boolean serverSideRoutingEnabled() {
return delegate.serverSideRoutingEnabled();
}

@Override
public Optional<Duration> defaultReadTimeout() {
return delegate.defaultReadTimeout();
}

private Throwable handledError(Throwable receivedError, boolean notifyHandler) {
var error = FutureUtil.completionExceptionCause(receivedError);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.internal.bolt.api.values.Value;
Expand Down Expand Up @@ -75,6 +76,10 @@ CompletionStage<BoltConnection> runInAutoCommitTransaction(

CompletionStage<Void> close();

// ----- STATE UPDATES -----

CompletionStage<Void> setReadTimeout(Duration duration);

// ----- MUTABLE DATA -----

BoltConnectionState state();
Expand All @@ -92,4 +97,6 @@ CompletionStage<BoltConnection> runInAutoCommitTransaction(
boolean telemetrySupported();

boolean serverSideRoutingEnabled();

Optional<Duration> defaultReadTimeout();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.internal.bolt.api.AccessMode;
Expand Down Expand Up @@ -168,6 +169,11 @@ public CompletionStage<Void> close() {
return delegate.close().whenComplete((ignored, throwable) -> boltConnectionListener.onClose(this));
}

@Override
public CompletionStage<Void> setReadTimeout(Duration duration) {
return delegate.setReadTimeout(duration);
}

@Override
public BoltConnectionState state() {
return delegate.state();
Expand Down Expand Up @@ -202,4 +208,9 @@ public boolean telemetrySupported() {
public boolean serverSideRoutingEnabled() {
return delegate.serverSideRoutingEnabled();
}

@Override
public Optional<Duration> defaultReadTimeout() {
return delegate.defaultReadTimeout();
}
}