Skip to content

Commit ff21d99

Browse files
injectivesbigmontz
andauthored
Fix liveness checking for unresponsive connections (neo4j#1514) (neo4j#1515)
* Fix liveness checking for unresponsive connections The driver performs liveness checking for connections when remove them from the pool. When this connections are not responsive, the driver hangs waiting for the result of the `SUCCESS` message get back. This problem occurs becaue driver are not taking in consideration the connection hint `connection.recv_timeout_seconds` in the liveness check routinge. The problem is solved by add the ConnectionReadTimeoutHandler to the pipeline also in case of liveness check ping. * Remove unused imports * apply code style * Update driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java --------- Co-authored-by: Antonio Barcélos <[email protected]>
1 parent 8b49103 commit ff21d99

File tree

3 files changed

+90
-1
lines changed

3 files changed

+90
-1
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,10 @@ public void handleIgnoredMessage() {
147147
handler.onFailure(error);
148148
}
149149

150+
public HandlerHook getBeforeLastHandlerHook() {
151+
return this.beforeLastHandlerHook;
152+
}
153+
150154
public void handleChannelInactive(Throwable cause) {
151155
// report issue if the connection has not been terminated as a result of a graceful shutdown request from its
152156
// parent pool

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,15 @@
2727
import io.netty.util.concurrent.Future;
2828
import io.netty.util.concurrent.Promise;
2929
import java.util.Optional;
30+
import java.util.concurrent.TimeUnit;
3031
import java.util.concurrent.atomic.AtomicReference;
3132
import org.neo4j.driver.Logger;
3233
import org.neo4j.driver.Logging;
3334
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
3435
import org.neo4j.driver.internal.async.connection.AuthorizationStateListener;
36+
import org.neo4j.driver.internal.async.connection.ChannelAttributes;
37+
import org.neo4j.driver.internal.async.inbound.ConnectionReadTimeoutHandler;
38+
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
3539
import org.neo4j.driver.internal.handlers.PingResponseHandler;
3640
import org.neo4j.driver.internal.messaging.request.ResetMessage;
3741
import org.neo4j.driver.internal.util.Clock;
@@ -117,8 +121,24 @@ private boolean hasBeenIdleForTooLong(Channel channel) {
117121

118122
private Future<Boolean> ping(Channel channel) {
119123
Promise<Boolean> result = channel.eventLoop().newPromise();
120-
messageDispatcher(channel).enqueue(new PingResponseHandler(result, channel, logging));
124+
InboundMessageDispatcher messageDispatcher = messageDispatcher(channel);
125+
messageDispatcher.enqueue(new PingResponseHandler(result, channel, logging));
126+
attachConnectionReadTimeoutHandler(channel, messageDispatcher);
121127
channel.writeAndFlush(ResetMessage.RESET, channel.voidPromise());
122128
return result;
123129
}
130+
131+
private void attachConnectionReadTimeoutHandler(Channel channel, InboundMessageDispatcher messageDispatcher) {
132+
ChannelAttributes.connectionReadTimeout(channel).ifPresent(connectionReadTimeout -> {
133+
ConnectionReadTimeoutHandler connectionReadTimeoutHandler =
134+
new ConnectionReadTimeoutHandler(connectionReadTimeout, TimeUnit.SECONDS);
135+
channel.pipeline().addFirst(connectionReadTimeoutHandler);
136+
log.debug("Added ConnectionReadTimeoutHandler");
137+
messageDispatcher.setBeforeLastHandlerHook((messageType) -> {
138+
channel.pipeline().remove(connectionReadTimeoutHandler);
139+
messageDispatcher.setBeforeLastHandlerHook(null);
140+
log.debug("Removed ConnectionReadTimeoutHandler");
141+
});
142+
});
143+
}
124144
}

driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
import static org.hamcrest.junit.MatcherAssert.assertThat;
2323
import static org.junit.jupiter.api.Assertions.assertEquals;
2424
import static org.junit.jupiter.api.Assertions.assertFalse;
25+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
26+
import static org.junit.jupiter.api.Assertions.assertNotNull;
27+
import static org.junit.jupiter.api.Assertions.assertNull;
28+
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionReadTimeout;
2529
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setCreationTimestamp;
2630
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setLastUsedTimestamp;
2731
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setMessageDispatcher;
@@ -34,6 +38,7 @@
3438
import static org.neo4j.driver.util.TestUtil.await;
3539

3640
import io.netty.channel.Channel;
41+
import io.netty.channel.ChannelHandler;
3742
import io.netty.channel.embedded.EmbeddedChannel;
3843
import io.netty.util.concurrent.Future;
3944
import java.util.Collections;
@@ -46,6 +51,7 @@
4651
import org.junit.jupiter.api.Test;
4752
import org.neo4j.driver.Value;
4853
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
54+
import org.neo4j.driver.internal.async.inbound.ConnectionReadTimeoutHandler;
4955
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
5056
import org.neo4j.driver.internal.messaging.request.ResetMessage;
5157
import org.neo4j.driver.internal.util.Clock;
@@ -155,6 +161,65 @@ void shouldKeepIdleConnectionWhenPingSucceeds() {
155161
testPing(true);
156162
}
157163

164+
@Test
165+
void shouldHandlePingWithConnectionReceiveTimeout() {
166+
int idleTimeBeforeConnectionTest = 1000;
167+
long connectionReadTimeout = 60L;
168+
PoolSettings settings = new PoolSettings(
169+
DEFAULT_MAX_CONNECTION_POOL_SIZE,
170+
DEFAULT_CONNECTION_ACQUISITION_TIMEOUT,
171+
NOT_CONFIGURED,
172+
idleTimeBeforeConnectionTest);
173+
Clock clock = Clock.SYSTEM;
174+
NettyChannelHealthChecker healthChecker = newHealthChecker(settings, clock);
175+
176+
setCreationTimestamp(channel, clock.millis());
177+
setConnectionReadTimeout(channel, connectionReadTimeout);
178+
setLastUsedTimestamp(channel, clock.millis() - idleTimeBeforeConnectionTest * 2);
179+
180+
Future<Boolean> healthy = healthChecker.isHealthy(channel);
181+
channel.runPendingTasks();
182+
183+
ChannelHandler firstElementOnPipeline = channel.pipeline().first();
184+
assertInstanceOf(ConnectionReadTimeoutHandler.class, firstElementOnPipeline);
185+
assertNotNull(dispatcher.getBeforeLastHandlerHook());
186+
ConnectionReadTimeoutHandler readTimeoutHandler = (ConnectionReadTimeoutHandler) firstElementOnPipeline;
187+
assertEquals(connectionReadTimeout * 1000L, readTimeoutHandler.getReaderIdleTimeInMillis());
188+
assertEquals(ResetMessage.RESET, single(channel.outboundMessages()));
189+
assertFalse(healthy.isDone());
190+
191+
dispatcher.handleSuccessMessage(Collections.emptyMap());
192+
assertThat(await(healthy), is(true));
193+
assertNull(channel.pipeline().first());
194+
assertNull(dispatcher.getBeforeLastHandlerHook());
195+
}
196+
197+
@Test
198+
void shouldHandlePingWithoutConnectionReceiveTimeout() {
199+
int idleTimeBeforeConnectionTest = 1000;
200+
PoolSettings settings = new PoolSettings(
201+
DEFAULT_MAX_CONNECTION_POOL_SIZE,
202+
DEFAULT_CONNECTION_ACQUISITION_TIMEOUT,
203+
NOT_CONFIGURED,
204+
idleTimeBeforeConnectionTest);
205+
Clock clock = Clock.SYSTEM;
206+
NettyChannelHealthChecker healthChecker = newHealthChecker(settings, clock);
207+
208+
setCreationTimestamp(channel, clock.millis());
209+
setLastUsedTimestamp(channel, clock.millis() - idleTimeBeforeConnectionTest * 2);
210+
211+
Future<Boolean> healthy = healthChecker.isHealthy(channel);
212+
channel.runPendingTasks();
213+
214+
assertNull(channel.pipeline().first());
215+
assertEquals(ResetMessage.RESET, single(channel.outboundMessages()));
216+
assertFalse(healthy.isDone());
217+
218+
dispatcher.handleSuccessMessage(Collections.emptyMap());
219+
assertThat(await(healthy), is(true));
220+
assertNull(channel.pipeline().first());
221+
}
222+
158223
@Test
159224
void shouldDropIdleConnectionWhenPingFails() {
160225
testPing(false);

0 commit comments

Comments
 (0)