Skip to content

Commit 97e0e2f

Browse files
committed
More tests for thread interruption handling
Added couple more integrations tests to verify that `Thread#interrupt()` terminates active connection and "unblocks" the blocking thread: * when driver is created towards unresponsive server * when transaction blocks to read back the result * when transaction blocks to commit Also `NettyConnection` will now fail provided handlers except throwing exception directly. Increased test coverage for future helpers.
1 parent e05eef7 commit 97e0e2f

File tree

11 files changed

+588
-79
lines changed

11 files changed

+588
-79
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
8484
eventExecutorGroup, securityPlan, retryLogic );
8585

8686
// block to verify connectivity, close connection pool if thread gets interrupted
87-
Futures.blockingGet( driver.verifyConnectivity(), connectionPool::close );
87+
Futures.blockingGet( driver.verifyConnectivity(),
88+
() -> closeConnectionPoolOnThreadInterrupt( connectionPool, config.logging() ) );
8889
return driver;
8990
}
9091
catch ( Throwable driverError )
@@ -315,4 +316,11 @@ private static void assertNoRoutingContext( URI uri, RoutingSettings routingSett
315316
"Routing parameters are not supported with scheme 'bolt'. Given URI: '" + uri + "'" );
316317
}
317318
}
319+
320+
private static void closeConnectionPoolOnThreadInterrupt( ConnectionPool pool, Logging logging )
321+
{
322+
Logger log = logging.getLog( Driver.class.getSimpleName() );
323+
log.warn( "Driver creation interrupted while verifying connectivity. Connection pool will be closed" );
324+
pool.close();
325+
}
318326
}

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,17 @@ private BiFunction<Void,Throwable,Void> handleCommitOrRollback( Throwable cursor
377377
{
378378
return ( ignore, commitOrRollbackError ) ->
379379
{
380-
if ( cursorFailure != null )
380+
if ( cursorFailure != null && commitOrRollbackError != null )
381+
{
382+
Throwable cause1 = completionErrorCause( cursorFailure );
383+
Throwable cause2 = completionErrorCause( commitOrRollbackError );
384+
if ( cause1 != cause2 )
385+
{
386+
cause1.addSuppressed( cause2 );
387+
}
388+
throw new CompletionException( cause1 );
389+
}
390+
else if ( cursorFailure != null )
381391
{
382392
throw new CompletionException( completionErrorCause( cursorFailure ) );
383393
}

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,7 @@ private CompletionStage<Void> releaseConnection()
539539

540540
private void terminateConnectionOnThreadInterrupt( String reason )
541541
{
542-
// try to get current connection in a blocking fashion
542+
// try to get current connection if it has been acquired
543543
Connection connection = null;
544544
try
545545
{

driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import java.util.Map;
2525
import java.util.concurrent.CompletableFuture;
2626
import java.util.concurrent.CompletionStage;
27-
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.concurrent.atomic.AtomicReference;
2828

2929
import org.neo4j.driver.internal.BoltServerAddress;
3030
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
@@ -51,7 +51,7 @@ public class NettyConnection implements Connection
5151
private final CompletableFuture<Void> releaseFuture;
5252
private final Clock clock;
5353

54-
private final AtomicBoolean open = new AtomicBoolean( true );
54+
private final AtomicReference<Status> status = new AtomicReference<>( Status.OPEN );
5555

5656
public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
5757
{
@@ -67,7 +67,7 @@ public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
6767
@Override
6868
public boolean isOpen()
6969
{
70-
return open.get();
70+
return status.get() == Status.OPEN;
7171
}
7272

7373
@Override
@@ -92,22 +92,26 @@ public void disableAutoRead()
9292
public void run( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
9393
ResponseHandler pullAllHandler )
9494
{
95-
assertOpen();
96-
run( statement, parameters, runHandler, pullAllHandler, false );
95+
if ( verifyOpen( runHandler, pullAllHandler ) )
96+
{
97+
run( statement, parameters, runHandler, pullAllHandler, false );
98+
}
9799
}
98100

99101
@Override
100102
public void runAndFlush( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
101103
ResponseHandler pullAllHandler )
102104
{
103-
assertOpen();
104-
run( statement, parameters, runHandler, pullAllHandler, true );
105+
if ( verifyOpen( runHandler, pullAllHandler ) )
106+
{
107+
run( statement, parameters, runHandler, pullAllHandler, true );
108+
}
105109
}
106110

107111
@Override
108112
public CompletionStage<Void> release()
109113
{
110-
if ( open.compareAndSet( true, false ) )
114+
if ( status.compareAndSet( Status.OPEN, Status.RELEASED ) )
111115
{
112116
// auto-read could've been disabled, re-enable it to automatically receive response for RESET
113117
setAutoRead( true );
@@ -120,7 +124,7 @@ public CompletionStage<Void> release()
120124
@Override
121125
public void terminateAndRelease( String reason )
122126
{
123-
if ( open.compareAndSet( true, false ) )
127+
if ( status.compareAndSet( Status.OPEN, Status.TERMINATED ) )
124128
{
125129
setTerminationReason( channel, reason );
126130
channel.close();
@@ -192,11 +196,32 @@ private void setAutoRead( boolean value )
192196
channel.config().setAutoRead( value );
193197
}
194198

195-
private void assertOpen()
199+
private boolean verifyOpen( ResponseHandler runHandler, ResponseHandler pullAllHandler )
196200
{
197-
if ( !isOpen() )
201+
Status connectionStatus = this.status.get();
202+
switch ( connectionStatus )
198203
{
199-
throw new IllegalStateException( "Connection has been released to the pool and can't be reused" );
204+
case OPEN:
205+
return true;
206+
case RELEASED:
207+
Exception error = new IllegalStateException( "Connection has been released to the pool and can't be used" );
208+
runHandler.onFailure( error );
209+
pullAllHandler.onFailure( error );
210+
return false;
211+
case TERMINATED:
212+
Exception terminatedError = new IllegalStateException( "Connection has been terminated and can't be used" );
213+
runHandler.onFailure( terminatedError );
214+
pullAllHandler.onFailure( terminatedError );
215+
return false;
216+
default:
217+
throw new IllegalStateException( "Unknown status: " + connectionStatus );
200218
}
201219
}
220+
221+
private enum Status
222+
{
223+
OPEN,
224+
RELEASED,
225+
TERMINATED
226+
}
202227
}

driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java

Lines changed: 48 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.netty.util.internal.ConcurrentSet;
2727
import org.junit.After;
2828
import org.junit.Test;
29+
import org.mockito.ArgumentCaptor;
2930

3031
import java.util.Set;
3132
import java.util.concurrent.CompletionStage;
@@ -47,7 +48,6 @@
4748
import static org.junit.Assert.assertFalse;
4849
import static org.junit.Assert.assertThat;
4950
import static org.junit.Assert.assertTrue;
50-
import static org.junit.Assert.fail;
5151
import static org.mockito.Matchers.any;
5252
import static org.mockito.Mockito.mock;
5353
import static org.mockito.Mockito.never;
@@ -153,38 +153,61 @@ public void shouldNotDisableAutoReadWhenReleased()
153153
@Test
154154
public void shouldNotRunWhenReleased()
155155
{
156+
ResponseHandler runHandler = mock( ResponseHandler.class );
157+
ResponseHandler pullAllHandler = mock( ResponseHandler.class );
156158
NettyConnection connection = newConnection( new EmbeddedChannel() );
157159

158160
connection.release();
161+
connection.run( "RETURN 1", emptyMap(), runHandler, pullAllHandler );
159162

160-
try
161-
{
162-
connection.run( "RETURN 1", emptyMap(), mock( ResponseHandler.class ), mock( ResponseHandler.class ) );
163-
fail( "Exception expected" );
164-
}
165-
catch ( IllegalStateException e )
166-
{
167-
assertConnectionReleasedError( e );
168-
}
163+
ArgumentCaptor<IllegalStateException> failureCaptor = ArgumentCaptor.forClass( IllegalStateException.class );
164+
verify( runHandler ).onFailure( failureCaptor.capture() );
165+
assertConnectionReleasedError( failureCaptor.getValue() );
169166
}
170167

171168
@Test
172169
public void shouldNotRunAndFlushWhenReleased()
173170
{
171+
ResponseHandler runHandler = mock( ResponseHandler.class );
172+
ResponseHandler pullAllHandler = mock( ResponseHandler.class );
174173
NettyConnection connection = newConnection( new EmbeddedChannel() );
175174

176175
connection.release();
176+
connection.runAndFlush( "RETURN 1", emptyMap(), runHandler, pullAllHandler );
177177

178-
try
179-
{
180-
connection.runAndFlush( "RETURN 1", emptyMap(), mock( ResponseHandler.class ),
181-
mock( ResponseHandler.class ) );
182-
fail( "Exception expected" );
183-
}
184-
catch ( IllegalStateException e )
185-
{
186-
assertConnectionReleasedError( e );
187-
}
178+
ArgumentCaptor<IllegalStateException> failureCaptor = ArgumentCaptor.forClass( IllegalStateException.class );
179+
verify( runHandler ).onFailure( failureCaptor.capture() );
180+
assertConnectionReleasedError( failureCaptor.getValue() );
181+
}
182+
183+
@Test
184+
public void shouldNotRunWhenTerminated()
185+
{
186+
ResponseHandler runHandler = mock( ResponseHandler.class );
187+
ResponseHandler pullAllHandler = mock( ResponseHandler.class );
188+
NettyConnection connection = newConnection( new EmbeddedChannel() );
189+
190+
connection.terminateAndRelease( "42" );
191+
connection.run( "RETURN 1", emptyMap(), runHandler, pullAllHandler );
192+
193+
ArgumentCaptor<IllegalStateException> failureCaptor = ArgumentCaptor.forClass( IllegalStateException.class );
194+
verify( runHandler ).onFailure( failureCaptor.capture() );
195+
assertConnectionTerminatedError( failureCaptor.getValue() );
196+
}
197+
198+
@Test
199+
public void shouldNotRunAndFlushWhenTerminated()
200+
{
201+
ResponseHandler runHandler = mock( ResponseHandler.class );
202+
ResponseHandler pullAllHandler = mock( ResponseHandler.class );
203+
NettyConnection connection = newConnection( new EmbeddedChannel() );
204+
205+
connection.terminateAndRelease( "42" );
206+
connection.runAndFlush( "RETURN 1", emptyMap(), runHandler, pullAllHandler );
207+
208+
ArgumentCaptor<IllegalStateException> failureCaptor = ArgumentCaptor.forClass( IllegalStateException.class );
209+
verify( runHandler ).onFailure( failureCaptor.capture() );
210+
assertConnectionTerminatedError( failureCaptor.getValue() );
188211
}
189212

190213
@Test
@@ -382,6 +405,11 @@ private static void assertConnectionReleasedError( IllegalStateException e )
382405
assertThat( e.getMessage(), startsWith( "Connection has been released" ) );
383406
}
384407

408+
private static void assertConnectionTerminatedError( IllegalStateException e )
409+
{
410+
assertThat( e.getMessage(), startsWith( "Connection has been terminated" ) );
411+
}
412+
385413
private static class ThreadTrackingInboundMessageDispatcher extends InboundMessageDispatcher
386414
{
387415

0 commit comments

Comments
 (0)