diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java index ba84b7853e..f4d6879ffb 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java @@ -147,6 +147,10 @@ public void handleIgnoredMessage() { handler.onFailure(error); } + public HandlerHook getBeforeLastHandlerHook() { + return this.beforeLastHandlerHook; + } + public void handleChannelInactive(Throwable cause) { // report issue if the connection has not been terminated as a result of a graceful shutdown request from its // parent pool diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java index b5e4c5ef34..ea80227ee0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java @@ -27,11 +27,15 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; import org.neo4j.driver.exceptions.AuthorizationExpiredException; import org.neo4j.driver.internal.async.connection.AuthorizationStateListener; +import org.neo4j.driver.internal.async.connection.ChannelAttributes; +import org.neo4j.driver.internal.async.inbound.ConnectionReadTimeoutHandler; +import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; import org.neo4j.driver.internal.handlers.PingResponseHandler; import org.neo4j.driver.internal.messaging.request.ResetMessage; import org.neo4j.driver.internal.util.Clock; @@ -117,8 +121,24 @@ private boolean hasBeenIdleForTooLong(Channel channel) { private Future ping(Channel channel) { Promise result = channel.eventLoop().newPromise(); - messageDispatcher(channel).enqueue(new PingResponseHandler(result, channel, logging)); + InboundMessageDispatcher messageDispatcher = messageDispatcher(channel); + messageDispatcher.enqueue(new PingResponseHandler(result, channel, logging)); + attachConnectionReadTimeoutHandler(channel, messageDispatcher); channel.writeAndFlush(ResetMessage.RESET, channel.voidPromise()); return result; } + + private void attachConnectionReadTimeoutHandler(Channel channel, InboundMessageDispatcher messageDispatcher) { + ChannelAttributes.connectionReadTimeout(channel).ifPresent(connectionReadTimeout -> { + ConnectionReadTimeoutHandler connectionReadTimeoutHandler = + new ConnectionReadTimeoutHandler(connectionReadTimeout, TimeUnit.SECONDS); + channel.pipeline().addFirst(connectionReadTimeoutHandler); + log.debug("Added ConnectionReadTimeoutHandler"); + messageDispatcher.setBeforeLastHandlerHook((messageType) -> { + channel.pipeline().remove(connectionReadTimeoutHandler); + messageDispatcher.setBeforeLastHandlerHook(null); + log.debug("Removed ConnectionReadTimeoutHandler"); + }); + }); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java index 4c1855c20b..a23692a818 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java @@ -22,6 +22,10 @@ import static org.hamcrest.junit.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionReadTimeout; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setCreationTimestamp; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setLastUsedTimestamp; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setMessageDispatcher; @@ -34,6 +38,7 @@ import static org.neo4j.driver.util.TestUtil.await; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.util.concurrent.Future; import java.util.Collections; @@ -46,6 +51,7 @@ import org.junit.jupiter.api.Test; import org.neo4j.driver.Value; import org.neo4j.driver.exceptions.AuthorizationExpiredException; +import org.neo4j.driver.internal.async.inbound.ConnectionReadTimeoutHandler; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; import org.neo4j.driver.internal.messaging.request.ResetMessage; import org.neo4j.driver.internal.util.Clock; @@ -155,6 +161,65 @@ void shouldKeepIdleConnectionWhenPingSucceeds() { testPing(true); } + @Test + void shouldHandlePingWithConnectionReceiveTimeout() { + int idleTimeBeforeConnectionTest = 1000; + long connectionReadTimeout = 60L; + PoolSettings settings = new PoolSettings( + DEFAULT_MAX_CONNECTION_POOL_SIZE, + DEFAULT_CONNECTION_ACQUISITION_TIMEOUT, + NOT_CONFIGURED, + idleTimeBeforeConnectionTest); + Clock clock = Clock.SYSTEM; + NettyChannelHealthChecker healthChecker = newHealthChecker(settings, clock); + + setCreationTimestamp(channel, clock.millis()); + setConnectionReadTimeout(channel, connectionReadTimeout); + setLastUsedTimestamp(channel, clock.millis() - idleTimeBeforeConnectionTest * 2); + + Future healthy = healthChecker.isHealthy(channel); + channel.runPendingTasks(); + + ChannelHandler firstElementOnPipeline = channel.pipeline().first(); + assertInstanceOf(ConnectionReadTimeoutHandler.class, firstElementOnPipeline); + assertNotNull(dispatcher.getBeforeLastHandlerHook()); + ConnectionReadTimeoutHandler readTimeoutHandler = (ConnectionReadTimeoutHandler) firstElementOnPipeline; + assertEquals(connectionReadTimeout * 1000L, readTimeoutHandler.getReaderIdleTimeInMillis()); + assertEquals(ResetMessage.RESET, single(channel.outboundMessages())); + assertFalse(healthy.isDone()); + + dispatcher.handleSuccessMessage(Collections.emptyMap()); + assertThat(await(healthy), is(true)); + assertNull(channel.pipeline().first()); + assertNull(dispatcher.getBeforeLastHandlerHook()); + } + + @Test + void shouldHandlePingWithoutConnectionReceiveTimeout() { + int idleTimeBeforeConnectionTest = 1000; + PoolSettings settings = new PoolSettings( + DEFAULT_MAX_CONNECTION_POOL_SIZE, + DEFAULT_CONNECTION_ACQUISITION_TIMEOUT, + NOT_CONFIGURED, + idleTimeBeforeConnectionTest); + Clock clock = Clock.SYSTEM; + NettyChannelHealthChecker healthChecker = newHealthChecker(settings, clock); + + setCreationTimestamp(channel, clock.millis()); + setLastUsedTimestamp(channel, clock.millis() - idleTimeBeforeConnectionTest * 2); + + Future healthy = healthChecker.isHealthy(channel); + channel.runPendingTasks(); + + assertNull(channel.pipeline().first()); + assertEquals(ResetMessage.RESET, single(channel.outboundMessages())); + assertFalse(healthy.isDone()); + + dispatcher.handleSuccessMessage(Collections.emptyMap()); + assertThat(await(healthy), is(true)); + assertNull(channel.pipeline().first()); + } + @Test void shouldDropIdleConnectionWhenPingFails() { testPing(false);