Skip to content

Commit ee9746c

Browse files
bigmontz and injectives authored
Fix liveness checking for unresponsive connections (#1514)
* Fix liveness checking for unresponsive connections The driver performs liveness checking for connections when removing them from the pool. When these connections are not responsive, the driver hangs waiting for the result of the `SUCCESS` message to come back. This problem occurs because the driver does not take into consideration the connection hint `connection.recv_timeout_seconds` in the liveness check routine. The problem is solved by adding the ConnectionReadTimeoutHandler to the pipeline also in the case of a liveness check ping. * Remove unused imports * Apply code style * Update driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java Co-authored-by: Dmitriy Tverdiakov <[email protected]> --------- Co-authored-by: Dmitriy Tverdiakov <[email protected]>
1 parent 7ec7c4f commit ee9746c

File tree

3 files changed

+89
-1
lines changed

3 files changed

+89
-1
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,10 @@ public void handleIgnoredMessage() {
164164
handler.onFailure(error);
165165
}
166166

167+
/**
 * Returns the value currently held in the {@code beforeLastHandlerHook} field.
 *
 * @return the stored hook; presumably {@code null} when no hook is installed
 *         (the field's initial value is not visible here - confirm)
 */
public HandlerHook getBeforeLastHandlerHook() {
168+
return this.beforeLastHandlerHook;
169+
}
170+
167171
private Optional<ResetResponseHandler> getPendingResetHandler() {
168172
return handlers.stream()
169173
.filter(h -> h instanceof ResetResponseHandler)

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,14 @@
2828
import io.netty.util.concurrent.Promise;
2929
import io.netty.util.concurrent.PromiseNotifier;
3030
import java.time.Clock;
31+
import java.util.concurrent.TimeUnit;
3132
import java.util.concurrent.atomic.AtomicLong;
3233
import org.neo4j.driver.Logger;
3334
import org.neo4j.driver.Logging;
3435
import org.neo4j.driver.internal.async.connection.AuthorizationStateListener;
36+
import org.neo4j.driver.internal.async.connection.ChannelAttributes;
37+
import org.neo4j.driver.internal.async.inbound.ConnectionReadTimeoutHandler;
38+
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
3539
import org.neo4j.driver.internal.handlers.PingResponseHandler;
3640
import org.neo4j.driver.internal.messaging.request.ResetMessage;
3741
import org.neo4j.driver.internal.messaging.v51.BoltProtocolV51;
@@ -164,8 +168,24 @@ private boolean hasBeenIdleForTooLong(Channel channel) {
164168

165169
/**
 * Issues a liveness ping on the given channel by writing a {@code RESET} message.
 *
 * <p>A promise is created on the channel's event loop and handed to a
 * {@link PingResponseHandler} that is enqueued on the channel's message dispatcher.
 * Before the message is written, {@code attachConnectionReadTimeoutHandler} is
 * called for the same channel and dispatcher. The write uses the channel's void
 * promise.
 *
 * @param channel the channel to ping
 * @return the promise passed to the {@link PingResponseHandler}; presumably that
 *         handler completes it with the ping outcome (its code is not shown here)
 */
private Future<Boolean> ping(Channel channel) {
166170
Promise<Boolean> result = channel.eventLoop().newPromise();
167-
messageDispatcher(channel).enqueue(new PingResponseHandler(result, channel, logging));
171+
var messageDispatcher = messageDispatcher(channel);
172+
messageDispatcher.enqueue(new PingResponseHandler(result, channel, logging));
173+
attachConnectionReadTimeoutHandler(channel, messageDispatcher);
168174
channel.writeAndFlush(ResetMessage.RESET, channel.voidPromise());
169175
return result;
170176
}
177+
178+
/**
 * Installs a {@link ConnectionReadTimeoutHandler} on the channel when a connection
 * read timeout attribute is present, and registers a dispatcher hook that undoes it.
 *
 * <p>If {@code ChannelAttributes.connectionReadTimeout(channel)} is empty, nothing
 * happens. Otherwise a handler is created from the attribute value with
 * {@link TimeUnit#SECONDS} and added as the first element of the channel pipeline.
 * The hook set on the dispatcher removes that same handler from the pipeline and
 * then clears itself by setting the hook back to {@code null}.
 *
 * <p>NOTE(review): when the dispatcher invokes the hook is not visible here; the
 * name suggests it runs before the last queued response handler is processed -
 * confirm against {@code InboundMessageDispatcher}.
 *
 * @param channel the channel whose pipeline receives the timeout handler
 * @param messageDispatcher the dispatcher on which the cleanup hook is set
 */
private void attachConnectionReadTimeoutHandler(Channel channel, InboundMessageDispatcher messageDispatcher) {
179+
ChannelAttributes.connectionReadTimeout(channel).ifPresent(connectionReadTimeout -> {
180+
var connectionReadTimeoutHandler =
181+
new ConnectionReadTimeoutHandler(connectionReadTimeout, TimeUnit.SECONDS);
182+
channel.pipeline().addFirst(connectionReadTimeoutHandler);
183+
log.debug("Added ConnectionReadTimeoutHandler");
184+
messageDispatcher.setBeforeLastHandlerHook((messageType) -> {
185+
channel.pipeline().remove(connectionReadTimeoutHandler);
186+
messageDispatcher.setBeforeLastHandlerHook(null);
187+
log.debug("Removed ConnectionReadTimeoutHandler");
188+
});
189+
});
190+
}
171191
}

driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
import static org.hamcrest.Matchers.is;
2222
import static org.junit.jupiter.api.Assertions.assertEquals;
2323
import static org.junit.jupiter.api.Assertions.assertFalse;
24+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
25+
import static org.junit.jupiter.api.Assertions.assertNotNull;
26+
import static org.junit.jupiter.api.Assertions.assertNull;
2427
import static org.junit.jupiter.api.Assertions.assertTrue;
2528
import static org.mockito.BDDMockito.given;
2629
import static org.mockito.BDDMockito.then;
@@ -29,6 +32,7 @@
2932
import static org.mockito.Mockito.times;
3033
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.authContext;
3134
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setAuthContext;
35+
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionReadTimeout;
3236
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setCreationTimestamp;
3337
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setLastUsedTimestamp;
3438
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setMessageDispatcher;
@@ -54,6 +58,7 @@
5458
import org.junit.jupiter.params.provider.MethodSource;
5559
import org.neo4j.driver.AuthTokenManager;
5660
import org.neo4j.driver.AuthTokens;
61+
import org.neo4j.driver.internal.async.inbound.ConnectionReadTimeoutHandler;
5762
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
5863
import org.neo4j.driver.internal.messaging.BoltProtocolVersion;
5964
import org.neo4j.driver.internal.messaging.request.ResetMessage;
@@ -248,6 +253,65 @@ void shouldKeepIdleConnectionWhenPingSucceeds() {
248253
testPing(true);
249254
}
250255

256+
/**
 * Checks the ping path when the channel carries a connection read timeout.
 *
 * <p>The channel is given a read timeout of 60 and a last-used timestamp twice the
 * idle threshold in the past. After the health check starts, the test expects a
 * {@link ConnectionReadTimeoutHandler} first in the pipeline with a reader idle
 * time of 60000 ms, a non-null before-last-handler hook, a single outbound
 * {@code RESET} message and an unfinished result. After a success message is
 * dispatched, it expects the result to be {@code true}, the pipeline to have no
 * first handler and the hook to be {@code null} again.
 */
@Test
257+
void shouldHandlePingWithConnectionReceiveTimeout() {
258+
var idleTimeBeforeConnectionTest = 1000;
259+
var connectionReadTimeout = 60L;
260+
var settings = new PoolSettings(
261+
DEFAULT_MAX_CONNECTION_POOL_SIZE,
262+
DEFAULT_CONNECTION_ACQUISITION_TIMEOUT,
263+
NOT_CONFIGURED,
264+
idleTimeBeforeConnectionTest);
265+
var clock = Clock.systemUTC();
266+
var healthChecker = newHealthChecker(settings, clock);
267+
268+
setCreationTimestamp(channel, clock.millis());
269+
setConnectionReadTimeout(channel, connectionReadTimeout);
270+
setLastUsedTimestamp(channel, clock.millis() - idleTimeBeforeConnectionTest * 2);
271+
272+
var healthy = healthChecker.isHealthy(channel);
273+
channel.runPendingTasks();
274+
275+
var firstElementOnPipeline = channel.pipeline().first();
276+
assertInstanceOf(ConnectionReadTimeoutHandler.class, firstElementOnPipeline);
277+
assertNotNull(dispatcher.getBeforeLastHandlerHook());
278+
var readTimeoutHandler = (ConnectionReadTimeoutHandler) firstElementOnPipeline;
279+
assertEquals(connectionReadTimeout * 1000L, readTimeoutHandler.getReaderIdleTimeInMillis());
280+
assertEquals(ResetMessage.RESET, single(channel.outboundMessages()));
281+
assertFalse(healthy.isDone());
282+
283+
dispatcher.handleSuccessMessage(Collections.emptyMap());
284+
assertThat(await(healthy), is(true));
285+
assertNull(channel.pipeline().first());
286+
assertNull(dispatcher.getBeforeLastHandlerHook());
287+
}
288+
289+
/**
 * Checks the ping path when no connection read timeout is set on the channel.
 *
 * <p>The channel gets only a creation timestamp and a last-used timestamp twice the
 * idle threshold in the past. After the health check starts, the test expects the
 * pipeline to have no first handler, a single outbound {@code RESET} message and an
 * unfinished result. After a success message is dispatched, it expects the result
 * to be {@code true} and the pipeline to still have no first handler.
 */
@Test
290+
void shouldHandlePingWithoutConnectionReceiveTimeout() {
291+
var idleTimeBeforeConnectionTest = 1000;
292+
var settings = new PoolSettings(
293+
DEFAULT_MAX_CONNECTION_POOL_SIZE,
294+
DEFAULT_CONNECTION_ACQUISITION_TIMEOUT,
295+
NOT_CONFIGURED,
296+
idleTimeBeforeConnectionTest);
297+
var clock = Clock.systemUTC();
298+
var healthChecker = newHealthChecker(settings, clock);
299+
300+
setCreationTimestamp(channel, clock.millis());
301+
setLastUsedTimestamp(channel, clock.millis() - idleTimeBeforeConnectionTest * 2);
302+
303+
var healthy = healthChecker.isHealthy(channel);
304+
channel.runPendingTasks();
305+
306+
assertNull(channel.pipeline().first());
307+
assertEquals(ResetMessage.RESET, single(channel.outboundMessages()));
308+
assertFalse(healthy.isDone());
309+
310+
dispatcher.handleSuccessMessage(Collections.emptyMap());
311+
assertThat(await(healthy), is(true));
312+
assertNull(channel.pipeline().first());
313+
}
314+
251315
@Test
252316
void shouldDropIdleConnectionWhenPingFails() {
253317
testPing(false);

0 commit comments

Comments
 (0)