Skip to content

Commit cfbff4a

Browse files
authored
Merge pull request #441 from lutovich/1.5-thread-interrupt
Handle thread interruption by closing the channel
2 parents 2f3648c + c788f45 commit cfbff4a

24 files changed

+1024
-96
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 72 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.neo4j.driver.v1.Logger;
5454
import org.neo4j.driver.v1.Logging;
5555
import org.neo4j.driver.v1.exceptions.ClientException;
56+
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
5657

5758
import static java.lang.String.format;
5859
import static org.neo4j.driver.internal.security.SecurityPlan.insecure;
@@ -78,26 +79,12 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
7879

7980
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, config );
8081

81-
try
82-
{
83-
InternalDriver driver = createDriver( uri, address, connectionPool, config, newRoutingSettings,
84-
eventExecutorGroup, securityPlan, retryLogic );
85-
Futures.blockingGet( driver.verifyConnectivity() );
86-
return driver;
87-
}
88-
catch ( Throwable driverError )
89-
{
90-
// we need to close the connection pool if driver creation threw exception
91-
try
92-
{
93-
Futures.blockingGet( connectionPool.close() );
94-
}
95-
catch ( Throwable closeError )
96-
{
97-
driverError.addSuppressed( closeError );
98-
}
99-
throw driverError;
100-
}
82+
InternalDriver driver = createDriver( uri, address, connectionPool, config, newRoutingSettings,
83+
eventExecutorGroup, securityPlan, retryLogic );
84+
85+
verifyConnectivity( driver, connectionPool, config );
86+
87+
return driver;
10188
}
10289

10390
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
@@ -123,17 +110,26 @@ private InternalDriver createDriver( URI uri, BoltServerAddress address,
123110
ConnectionPool connectionPool, Config config, RoutingSettings routingSettings,
124111
EventExecutorGroup eventExecutorGroup, SecurityPlan securityPlan, RetryLogic retryLogic )
125112
{
126-
String scheme = uri.getScheme().toLowerCase();
127-
switch ( scheme )
113+
try
128114
{
129-
case BOLT_URI_SCHEME:
130-
assertNoRoutingContext( uri, routingSettings );
131-
return createDirectDriver( address, config, securityPlan, retryLogic, connectionPool );
132-
case BOLT_ROUTING_URI_SCHEME:
133-
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic,
134-
eventExecutorGroup );
135-
default:
136-
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
115+
String scheme = uri.getScheme().toLowerCase();
116+
switch ( scheme )
117+
{
118+
case BOLT_URI_SCHEME:
119+
assertNoRoutingContext( uri, routingSettings );
120+
return createDirectDriver( address, config, securityPlan, retryLogic, connectionPool );
121+
case BOLT_ROUTING_URI_SCHEME:
122+
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic,
123+
eventExecutorGroup );
124+
default:
125+
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
126+
}
127+
}
128+
catch ( Throwable driverError )
129+
{
130+
// we need to close the connection pool if driver creation threw exception
131+
closeConnectionPoolAndSuppressError( connectionPool, driverError );
132+
throw driverError;
137133
}
138134
}
139135

@@ -313,4 +309,50 @@ private static void assertNoRoutingContext( URI uri, RoutingSettings routingSett
313309
"Routing parameters are not supported with scheme 'bolt'. Given URI: '" + uri + "'" );
314310
}
315311
}
312+
313+
private static void verifyConnectivity( InternalDriver driver, ConnectionPool connectionPool, Config config )
314+
{
315+
try
316+
{
317+
// block to verify connectivity, close connection pool if thread gets interrupted
318+
Futures.blockingGet( driver.verifyConnectivity(),
319+
() -> closeConnectionPoolOnThreadInterrupt( connectionPool, config.logging() ) );
320+
}
321+
catch ( Throwable connectionError )
322+
{
323+
if ( Thread.currentThread().isInterrupted() )
324+
{
325+
// current thread has been interrupted while verifying connectivity
326+
// connection pool should've been closed
327+
throw new ServiceUnavailableException( "Unable to create driver. Thread has been interrupted.",
328+
connectionError );
329+
}
330+
331+
// we need to close the connection pool if driver creation threw exception
332+
closeConnectionPoolAndSuppressError( connectionPool, connectionError );
333+
throw connectionError;
334+
}
335+
}
336+
337+
private static void closeConnectionPoolAndSuppressError( ConnectionPool connectionPool, Throwable mainError )
338+
{
339+
try
340+
{
341+
Futures.blockingGet( connectionPool.close() );
342+
}
343+
catch ( Throwable closeError )
344+
{
345+
if ( mainError != closeError )
346+
{
347+
mainError.addSuppressed( closeError );
348+
}
349+
}
350+
}
351+
352+
private static void closeConnectionPoolOnThreadInterrupt( ConnectionPool pool, Logging logging )
353+
{
354+
Logger log = logging.getLog( Driver.class.getSimpleName() );
355+
log.warn( "Driver creation interrupted while verifying connectivity. Connection pool will be closed" );
356+
pool.close();
357+
}
316358
}

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747

4848
import static java.util.Collections.emptyMap;
4949
import static java.util.concurrent.CompletableFuture.completedFuture;
50-
import static org.neo4j.driver.internal.util.Futures.blockingGet;
5150
import static org.neo4j.driver.internal.util.Futures.failedFuture;
5251
import static org.neo4j.driver.v1.Values.value;
5352

@@ -148,7 +147,8 @@ public void failure()
148147
@Override
149148
public void close()
150149
{
151-
blockingGet( closeAsync() );
150+
Futures.blockingGet( closeAsync(),
151+
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while closing the transaction" ) );
152152
}
153153

154154
CompletionStage<Void> closeAsync()
@@ -272,8 +272,9 @@ public CompletionStage<StatementResultCursor> runAsync( String statementTemplate
272272
@Override
273273
public StatementResult run( Statement statement )
274274
{
275-
StatementResultCursor cursor = blockingGet( run( statement, false ) );
276-
return new InternalStatementResult( cursor );
275+
StatementResultCursor cursor = Futures.blockingGet( run( statement, false ),
276+
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in transaction" ) );
277+
return new InternalStatementResult( connection, cursor );
277278
}
278279

279280
@Override
@@ -368,7 +369,17 @@ private BiFunction<Void,Throwable,Void> handleCommitOrRollback( Throwable cursor
368369
{
369370
return ( ignore, commitOrRollbackError ) ->
370371
{
371-
if ( cursorFailure != null )
372+
if ( cursorFailure != null && commitOrRollbackError != null )
373+
{
374+
Throwable cause1 = Futures.completionExceptionCause( cursorFailure );
375+
Throwable cause2 = Futures.completionExceptionCause( commitOrRollbackError );
376+
if ( cause1 != cause2 )
377+
{
378+
cause1.addSuppressed( cause2 );
379+
}
380+
throw Futures.asCompletionException( cause1 );
381+
}
382+
else if ( cursorFailure != null )
372383
{
373384
throw Futures.asCompletionException( cursorFailure );
374385
}
@@ -392,4 +403,9 @@ private BiConsumer<Object,Throwable> transactionClosed( State newState )
392403
session.setBookmark( bookmark );
393404
};
394405
}
406+
407+
private void terminateConnectionOnThreadInterrupt( String reason )
408+
{
409+
connection.terminateAndRelease( reason );
410+
}
395411
}

driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@
2222
import java.util.concurrent.atomic.AtomicBoolean;
2323

2424
import org.neo4j.driver.internal.security.SecurityPlan;
25+
import org.neo4j.driver.internal.util.Futures;
2526
import org.neo4j.driver.v1.AccessMode;
2627
import org.neo4j.driver.v1.Driver;
2728
import org.neo4j.driver.v1.Logger;
2829
import org.neo4j.driver.v1.Logging;
2930
import org.neo4j.driver.v1.Session;
3031

3132
import static java.util.concurrent.CompletableFuture.completedFuture;
32-
import static org.neo4j.driver.internal.util.Futures.blockingGet;
3333

3434
public class InternalDriver implements Driver
3535
{
@@ -105,7 +105,7 @@ private Session newSession( AccessMode mode, Bookmark bookmark )
105105
@Override
106106
public void close()
107107
{
108-
blockingGet( closeAsync() );
108+
Futures.blockingGet( closeAsync() );
109109
}
110110

111111
@Override

driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
package org.neo4j.driver.internal;
2020

2121
import java.util.List;
22+
import java.util.concurrent.CompletionStage;
2223

24+
import org.neo4j.driver.internal.spi.Connection;
25+
import org.neo4j.driver.internal.util.Futures;
2326
import org.neo4j.driver.v1.Record;
2427
import org.neo4j.driver.v1.StatementResult;
2528
import org.neo4j.driver.v1.StatementResultCursor;
@@ -28,15 +31,15 @@
2831
import org.neo4j.driver.v1.summary.ResultSummary;
2932
import org.neo4j.driver.v1.util.Function;
3033

31-
import static org.neo4j.driver.internal.util.Futures.blockingGet;
32-
3334
public class InternalStatementResult implements StatementResult
3435
{
36+
private final Connection connection;
3537
private final StatementResultCursor cursor;
3638
private List<String> keys;
3739

38-
public InternalStatementResult( StatementResultCursor cursor )
40+
public InternalStatementResult( Connection connection, StatementResultCursor cursor )
3941
{
42+
this.connection = connection;
4043
this.cursor = cursor;
4144
}
4245

@@ -114,4 +117,14 @@ public void remove()
114117
{
115118
throw new ClientException( "Removing records from a result is not supported." );
116119
}
120+
121+
private <T> T blockingGet( CompletionStage<T> stage )
122+
{
123+
return Futures.blockingGet( stage, this::terminateConnectionOnThreadInterrupt );
124+
}
125+
126+
private void terminateConnectionOnThreadInterrupt()
127+
{
128+
connection.terminateAndRelease( "Thread interrupted while waiting for result to arrive" );
129+
}
117130
}

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.neo4j.driver.v1.types.TypeSystem;
4848

4949
import static java.util.concurrent.CompletableFuture.completedFuture;
50-
import static org.neo4j.driver.internal.util.Futures.blockingGet;
5150
import static org.neo4j.driver.internal.util.Futures.failedFuture;
5251
import static org.neo4j.driver.v1.Values.value;
5352

@@ -132,8 +131,12 @@ public CompletionStage<StatementResultCursor> runAsync( String statementText, Va
132131
@Override
133132
public StatementResult run( Statement statement )
134133
{
135-
StatementResultCursor cursor = blockingGet( run( statement, false ) );
136-
return new InternalStatementResult( cursor );
134+
StatementResultCursor cursor = Futures.blockingGet( run( statement, false ),
135+
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in session" ) );
136+
137+
// query executed, it is safe to obtain a connection in a blocking way
138+
Connection connection = Futures.getNow( connectionStage );
139+
return new InternalStatementResult( connection, cursor );
137140
}
138141

139142
@Override
@@ -152,7 +155,8 @@ public boolean isOpen()
152155
@Override
153156
public void close()
154157
{
155-
blockingGet( closeAsync() );
158+
Futures.blockingGet( closeAsync(),
159+
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while closing the session" ) );
156160
}
157161

158162
@Override
@@ -188,7 +192,7 @@ public CompletionStage<Void> closeAsync()
188192
@Override
189193
public Transaction beginTransaction()
190194
{
191-
return blockingGet( beginTransactionAsync( mode ) );
195+
return beginTransaction( mode );
192196
}
193197

194198
@Deprecated
@@ -247,7 +251,8 @@ public String lastBookmark()
247251
@Override
248252
public void reset()
249253
{
250-
blockingGet( resetAsync() );
254+
Futures.blockingGet( resetAsync(),
255+
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while resetting the session" ) );
251256
}
252257

253258
private CompletionStage<Void> resetAsync()
@@ -287,7 +292,7 @@ private <T> T transaction( AccessMode mode, TransactionWork<T> work )
287292
// event loop thread will bock and wait for itself to read some data
288293
return retryLogic.retry( () ->
289294
{
290-
try ( Transaction tx = blockingGet( beginTransactionAsync( mode ) ) )
295+
try ( Transaction tx = beginTransaction( mode ) )
291296
{
292297
try
293298
{
@@ -422,6 +427,12 @@ private CompletionStage<InternalStatementResultCursor> run( Statement statement,
422427
return newResultCursorStage;
423428
}
424429

430+
private Transaction beginTransaction( AccessMode mode )
431+
{
432+
return Futures.blockingGet( beginTransactionAsync( mode ),
433+
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while starting a transaction" ) );
434+
}
435+
425436
private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode mode )
426437
{
427438
ensureSessionIsOpen();
@@ -515,6 +526,25 @@ private CompletionStage<Void> releaseConnection()
515526
} );
516527
}
517528

529+
private void terminateConnectionOnThreadInterrupt( String reason )
530+
{
531+
// try to get current connection if it has been acquired
532+
Connection connection = null;
533+
try
534+
{
535+
connection = Futures.getNow( connectionStage );
536+
}
537+
catch ( Throwable ignore )
538+
{
539+
// ignore errors because handing interruptions is best effort
540+
}
541+
542+
if ( connection != null )
543+
{
544+
connection.terminateAndRelease( reason );
545+
}
546+
}
547+
518548
private CompletionStage<Void> ensureNoOpenTxBeforeRunningQuery()
519549
{
520550
return ensureNoOpenTx( "Statements cannot be run directly on a session with an open transaction; " +

driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public final class ChannelAttributes
3434
private static final AttributeKey<Long> CREATION_TIMESTAMP = newInstance( "creationTimestamp" );
3535
private static final AttributeKey<Long> LAST_USED_TIMESTAMP = newInstance( "lastUsedTimestamp" );
3636
private static final AttributeKey<InboundMessageDispatcher> MESSAGE_DISPATCHER = newInstance( "messageDispatcher" );
37+
private static final AttributeKey<String> TERMINATION_REASON = newInstance( "terminationReason" );
3738

3839
private ChannelAttributes()
3940
{
@@ -89,6 +90,16 @@ public static void setMessageDispatcher( Channel channel, InboundMessageDispatch
8990
setOnce( channel, MESSAGE_DISPATCHER, messageDispatcher );
9091
}
9192

93+
public static String terminationReason( Channel channel )
94+
{
95+
return get( channel, TERMINATION_REASON );
96+
}
97+
98+
public static void setTerminationReason( Channel channel, String reason )
99+
{
100+
setOnce( channel, TERMINATION_REASON, reason );
101+
}
102+
92103
private static <T> T get( Channel channel, AttributeKey<T> key )
93104
{
94105
return channel.attr( key ).get();

0 commit comments

Comments
 (0)