Skip to content

Commit e05eef7

Browse files
committed
Handle thread interruption by closing the channel
Blocking API like `Session#run()` is now implemented on top of async API, i.e. `Session#runAsync()`. Blocking calls simply invoke async ones and uninterruptibly wait for them to complete. Previously, with blocking IO, thread interruptions resulted in `ClosedByInterruptException` which terminated query/transaction in a harsh way. Right now interrupts will do nothing. Handling them is not strictly required because `Thread#interrupt()` is not a nice thing to do. However, interrupts are still used in some environments as a last resort thing. This commit makes driver react on thread interrupts by closing the underlying Netty channel. It's only done in places where channel is actually available. Closing of channel results in exception and the waiting thread fails. It will also make database reset state for this connection, terminate transaction and query. Such aproach seems very similar to the previous one except it will result in `ServiceUnavailableException` instead of `ClosedByInterruptException`.
1 parent 026e684 commit e05eef7

19 files changed

+394
-22
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,9 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
8282
{
8383
InternalDriver driver = createDriver( uri, address, connectionPool, config, newRoutingSettings,
8484
eventExecutorGroup, securityPlan, retryLogic );
85-
Futures.blockingGet( driver.verifyConnectivity() );
85+
86+
// block to verify connectivity, close connection pool if thread gets interrupted
87+
Futures.blockingGet( driver.verifyConnectivity(), connectionPool::close );
8688
return driver;
8789
}
8890
catch ( Throwable driverError )

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848

4949
import static java.util.Collections.emptyMap;
5050
import static java.util.concurrent.CompletableFuture.completedFuture;
51-
import static org.neo4j.driver.internal.util.Futures.blockingGet;
5251
import static org.neo4j.driver.internal.util.Futures.completionErrorCause;
5352
import static org.neo4j.driver.internal.util.Futures.failedFuture;
5453
import static org.neo4j.driver.v1.Values.value;
@@ -150,7 +149,8 @@ public void failure()
150149
@Override
151150
public void close()
152151
{
153-
blockingGet( closeAsync() );
152+
Futures.blockingGet( closeAsync(),
153+
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while closing the transaction" ) );
154154
}
155155

156156
CompletionStage<Void> closeAsync()
@@ -274,8 +274,9 @@ public CompletionStage<StatementResultCursor> runAsync( String statementTemplate
274274
@Override
275275
public StatementResult run( Statement statement )
276276
{
277-
StatementResultCursor cursor = blockingGet( run( statement, false ) );
278-
return new InternalStatementResult( cursor );
277+
StatementResultCursor cursor = Futures.blockingGet( run( statement, false ),
278+
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in transaction" ) );
279+
return new InternalStatementResult( connection, cursor );
279280
}
280281

281282
@Override
@@ -400,4 +401,9 @@ private BiConsumer<Object,Throwable> transactionClosed( State newState )
400401
session.setBookmark( bookmark );
401402
};
402403
}
404+
405+
private void terminateConnectionOnThreadInterrupt( String reason )
406+
{
407+
connection.terminateAndRelease( reason );
408+
}
403409
}

driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@
2222
import java.util.concurrent.atomic.AtomicBoolean;
2323

2424
import org.neo4j.driver.internal.security.SecurityPlan;
25+
import org.neo4j.driver.internal.util.Futures;
2526
import org.neo4j.driver.v1.AccessMode;
2627
import org.neo4j.driver.v1.Driver;
2728
import org.neo4j.driver.v1.Logger;
2829
import org.neo4j.driver.v1.Logging;
2930
import org.neo4j.driver.v1.Session;
3031

3132
import static java.util.concurrent.CompletableFuture.completedFuture;
32-
import static org.neo4j.driver.internal.util.Futures.blockingGet;
3333

3434
public class InternalDriver implements Driver
3535
{
@@ -105,7 +105,7 @@ private Session newSession( AccessMode mode, Bookmark bookmark )
105105
@Override
106106
public void close()
107107
{
108-
blockingGet( closeAsync() );
108+
Futures.blockingGet( closeAsync() );
109109
}
110110

111111
@Override

driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
package org.neo4j.driver.internal;
2020

2121
import java.util.List;
22+
import java.util.concurrent.CompletionStage;
2223

24+
import org.neo4j.driver.internal.spi.Connection;
25+
import org.neo4j.driver.internal.util.Futures;
2326
import org.neo4j.driver.v1.Record;
2427
import org.neo4j.driver.v1.StatementResult;
2528
import org.neo4j.driver.v1.StatementResultCursor;
@@ -28,15 +31,15 @@
2831
import org.neo4j.driver.v1.summary.ResultSummary;
2932
import org.neo4j.driver.v1.util.Function;
3033

31-
import static org.neo4j.driver.internal.util.Futures.blockingGet;
32-
3334
public class InternalStatementResult implements StatementResult
3435
{
36+
private final Connection connection;
3537
private final StatementResultCursor cursor;
3638
private List<String> keys;
3739

38-
public InternalStatementResult( StatementResultCursor cursor )
40+
public InternalStatementResult( Connection connection, StatementResultCursor cursor )
3941
{
42+
this.connection = connection;
4043
this.cursor = cursor;
4144
}
4245

@@ -114,4 +117,14 @@ public void remove()
114117
{
115118
throw new ClientException( "Removing records from a result is not supported." );
116119
}
120+
121+
private <T> T blockingGet( CompletionStage<T> stage )
122+
{
123+
return Futures.blockingGet( stage, this::terminateConnectionOnThreadInterrupt );
124+
}
125+
126+
private void terminateConnectionOnThreadInterrupt()
127+
{
128+
connection.terminateAndRelease( "Thread interrupted while waiting for result to arrive" );
129+
}
117130
}

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.neo4j.driver.v1.types.TypeSystem;
4848

4949
import static java.util.concurrent.CompletableFuture.completedFuture;
50-
import static org.neo4j.driver.internal.util.Futures.blockingGet;
5150
import static org.neo4j.driver.internal.util.Futures.failedFuture;
5251
import static org.neo4j.driver.v1.Values.value;
5352

@@ -132,8 +131,12 @@ public CompletionStage<StatementResultCursor> runAsync( String statementText, Va
132131
@Override
133132
public StatementResult run( Statement statement )
134133
{
135-
StatementResultCursor cursor = blockingGet( runAsync( statement, false ) );
136-
return new InternalStatementResult( cursor );
134+
StatementResultCursor cursor = Futures.blockingGet( runAsync( statement, false ),
135+
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in session" ) );
136+
137+
// query executed, it is safe to obtain a connection in a blocking way
138+
Connection connection = Futures.getNow( connectionStage );
139+
return new InternalStatementResult( connection, cursor );
137140
}
138141

139142
@Override
@@ -152,7 +155,8 @@ public boolean isOpen()
152155
@Override
153156
public void close()
154157
{
155-
blockingGet( closeAsync() );
158+
Futures.blockingGet( closeAsync(),
159+
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while closing the session" ) );
156160
}
157161

158162
@Override
@@ -189,7 +193,7 @@ public CompletionStage<Void> closeAsync()
189193
@Override
190194
public Transaction beginTransaction()
191195
{
192-
return blockingGet( beginTransactionAsync( mode ) );
196+
return beginTransaction( mode );
193197
}
194198

195199
@Deprecated
@@ -248,7 +252,8 @@ public String lastBookmark()
248252
@Override
249253
public void reset()
250254
{
251-
blockingGet( resetAsync() );
255+
Futures.blockingGet( resetAsync(),
256+
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while resetting the session" ) );
252257
}
253258

254259
private CompletionStage<Void> resetAsync()
@@ -288,7 +293,7 @@ private <T> T transaction( AccessMode mode, TransactionWork<T> work )
288293
// event loop thread will bock and wait for itself to read some data
289294
return retryLogic.retry( () ->
290295
{
291-
try ( Transaction tx = blockingGet( beginTransactionAsync( mode ) ) )
296+
try ( Transaction tx = beginTransaction( mode ) )
292297
{
293298
try
294299
{
@@ -433,6 +438,12 @@ private CompletionStage<InternalStatementResultCursor> runAsync( Statement state
433438
return newResultCursorStage;
434439
}
435440

441+
private Transaction beginTransaction( AccessMode mode )
442+
{
443+
return Futures.blockingGet( beginTransactionAsync( mode ),
444+
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while starting a transaction" ) );
445+
}
446+
436447
private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode mode )
437448
{
438449
ensureSessionIsOpen();
@@ -526,6 +537,25 @@ private CompletionStage<Void> releaseConnection()
526537
} );
527538
}
528539

540+
private void terminateConnectionOnThreadInterrupt( String reason )
541+
{
542+
// try to get current connection in a blocking fashion
543+
Connection connection = null;
544+
try
545+
{
546+
connection = Futures.getNow( connectionStage );
547+
}
548+
catch ( Throwable ignore )
549+
{
550+
// ignore errors because handing interruptions is best effort
551+
}
552+
553+
if ( connection != null )
554+
{
555+
connection.terminateAndRelease( reason );
556+
}
557+
}
558+
529559
private CompletionStage<Void> ensureNoOpenTxBeforeRunningQuery()
530560
{
531561
return ensureNoOpenTx( "Statements cannot be run directly on a session with an open transaction; " +

driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public final class ChannelAttributes
3434
private static final AttributeKey<Long> CREATION_TIMESTAMP = newInstance( "creationTimestamp" );
3535
private static final AttributeKey<Long> LAST_USED_TIMESTAMP = newInstance( "lastUsedTimestamp" );
3636
private static final AttributeKey<InboundMessageDispatcher> MESSAGE_DISPATCHER = newInstance( "messageDispatcher" );
37+
private static final AttributeKey<String> TERMINATION_REASON = newInstance( "terminationReason" );
3738

3839
private ChannelAttributes()
3940
{
@@ -89,6 +90,16 @@ public static void setMessageDispatcher( Channel channel, InboundMessageDispatch
8990
setOnce( channel, MESSAGE_DISPATCHER, messageDispatcher );
9091
}
9192

93+
public static String terminationReason( Channel channel )
94+
{
95+
return get( channel, TERMINATION_REASON );
96+
}
97+
98+
public static void setTerminationReason( Channel channel, String reason )
99+
{
100+
setOnce( channel, TERMINATION_REASON, reason );
101+
}
102+
92103
private static <T> T get( Channel channel, AttributeKey<T> key )
93104
{
94105
return channel.attr( key ).get();

driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import org.neo4j.driver.internal.util.ServerVersion;
4040
import org.neo4j.driver.v1.Value;
4141

42+
import static org.neo4j.driver.internal.async.ChannelAttributes.setTerminationReason;
43+
4244
public class NettyConnection implements Connection
4345
{
4446
private final Channel channel;
@@ -115,6 +117,18 @@ public CompletionStage<Void> release()
115117
return releaseFuture;
116118
}
117119

120+
@Override
121+
public void terminateAndRelease( String reason )
122+
{
123+
if ( open.compareAndSet( true, false ) )
124+
{
125+
setTerminationReason( channel, reason );
126+
channel.close();
127+
channelPool.release( channel );
128+
releaseFuture.complete( null );
129+
}
130+
}
131+
118132
@Override
119133
public BoltServerAddress serverAddress()
120134
{

driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@ public CompletionStage<Void> release()
8383
return delegate.release();
8484
}
8585

86+
@Override
87+
public void terminateAndRelease( String reason )
88+
{
89+
delegate.terminateAndRelease( reason );
90+
}
91+
8692
@Override
8793
public BoltServerAddress serverAddress()
8894
{

driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import static java.util.Objects.requireNonNull;
3434
import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher;
35+
import static org.neo4j.driver.internal.async.ChannelAttributes.terminationReason;
3536

3637
public class ChannelErrorHandler extends ChannelInboundHandlerAdapter
3738
{
@@ -69,8 +70,10 @@ public void channelInactive( ChannelHandlerContext ctx )
6970
if ( !failed )
7071
{
7172
// channel became inactive not because of a fatal exception that came from exceptionCaught
72-
// it is most likely inactive because actual network connection broke
73-
ServiceUnavailableException error = ErrorUtil.newConnectionTerminatedError();
73+
// it is most likely inactive because actual network connection broke or was explicitly closed by the driver
74+
75+
String terminationReason = terminationReason( ctx.channel() );
76+
ServiceUnavailableException error = ErrorUtil.newConnectionTerminatedError( terminationReason );
7477
fail( ctx, error );
7578
}
7679
}

driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ void runAndFlush( String statement, Map<String,Value> parameters, ResponseHandle
4141

4242
CompletionStage<Void> release();
4343

44+
void terminateAndRelease( String reason );
45+
4446
BoltServerAddress serverAddress();
4547

4648
ServerVersion serverVersion();

driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,15 @@ private ErrorUtil()
3131
{
3232
}
3333

34+
public static ServiceUnavailableException newConnectionTerminatedError( String reason )
35+
{
36+
if ( reason == null )
37+
{
38+
return newConnectionTerminatedError();
39+
}
40+
return new ServiceUnavailableException( "Connection to the database terminated. " + reason );
41+
}
42+
3443
public static ServiceUnavailableException newConnectionTerminatedError()
3544
{
3645
return new ServiceUnavailableException( "Connection to the database terminated. " +

driver/src/main/java/org/neo4j/driver/internal/util/Futures.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ public static <T> CompletableFuture<T> failedFuture( Throwable error )
7878
}
7979

8080
public static <V> V blockingGet( CompletionStage<V> stage )
81+
{
82+
return blockingGet( stage, Futures::noOpInterruptHandler );
83+
}
84+
85+
public static <V> V blockingGet( CompletionStage<V> stage, Runnable interruptHandler )
8186
{
8287
EventLoopGroupFactory.assertNotInEventLoopThread();
8388

@@ -93,7 +98,14 @@ public static <V> V blockingGet( CompletionStage<V> stage )
9398
}
9499
catch ( InterruptedException e )
95100
{
101+
// this thread was interrupted while waiting
102+
// computation denoted by the future might still be running
103+
96104
interrupted = true;
105+
106+
// run the interrupt handler and ignore if it throws
107+
// need to wait for IO thread to actually finish, can't simply re-rethrow
108+
safeRun( interruptHandler );
97109
}
98110
catch ( ExecutionException e )
99111
{
@@ -110,6 +122,11 @@ public static <V> V blockingGet( CompletionStage<V> stage )
110122
}
111123
}
112124

125+
public static <T> T getNow( CompletionStage<T> stage )
126+
{
127+
return stage.toCompletableFuture().getNow( null );
128+
}
129+
113130
// todo: test all call sites
114131
public static Throwable completionErrorCause( Throwable error )
115132
{
@@ -119,4 +136,19 @@ public static Throwable completionErrorCause( Throwable error )
119136
}
120137
return error;
121138
}
139+
140+
private static void noOpInterruptHandler()
141+
{
142+
}
143+
144+
private static void safeRun( Runnable runnable )
145+
{
146+
try
147+
{
148+
runnable.run();
149+
}
150+
catch ( Throwable ignore )
151+
{
152+
}
153+
}
122154
}

0 commit comments

Comments
 (0)