Skip to content

Commit ef48d78

Browse files
authored
Merge pull request #484 from lutovich/1.5-tx-conn-error
Improve fatal error propagation in commit & rollback
2 parents 364fca4 + f2eec3f commit ef48d78

File tree

14 files changed

+314
-231
lines changed

14 files changed

+314
-231
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -173,11 +173,6 @@ else if ( state == State.ROLLED_BACK )
173173
{
174174
return failedFuture( new ClientException( "Can't commit, transaction has been rolled back" ) );
175175
}
176-
else if ( state == State.TERMINATED )
177-
{
178-
transactionClosed( State.ROLLED_BACK );
179-
return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) );
180-
}
181176
else
182177
{
183178
return resultCursors.retrieveNotConsumedError()
@@ -197,12 +192,6 @@ else if ( state == State.ROLLED_BACK )
197192
{
198193
return completedWithNull();
199194
}
200-
else if ( state == State.TERMINATED )
201-
{
202-
// no need for explicit rollback, transaction should've been rolled back by the database
203-
transactionClosed( State.ROLLED_BACK );
204-
return completedWithNull();
205-
}
206195
else
207196
{
208197
return resultCursors.retrieveNotConsumedError()
@@ -344,6 +333,11 @@ public void setBookmark( Bookmark bookmark )
344333

345334
private CompletionStage<Void> doCommitAsync()
346335
{
336+
if ( state == State.TERMINATED )
337+
{
338+
return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) );
339+
}
340+
347341
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
348342
ResponseHandler pullAllHandler = new CommitTxResponseHandler( commitFuture, this );
349343
connection.runAndFlush( COMMIT_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler );
@@ -352,6 +346,11 @@ private CompletionStage<Void> doCommitAsync()
352346

353347
private CompletionStage<Void> doRollbackAsync()
354348
{
349+
if ( state == State.TERMINATED )
350+
{
351+
return completedWithNull();
352+
}
353+
355354
CompletableFuture<Void> rollbackFuture = new CompletableFuture<>();
356355
ResponseHandler pullAllHandler = new RollbackTxResponseHandler( rollbackFuture );
357356
connection.runAndFlush( ROLLBACK_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler );

driver/src/main/java/org/neo4j/driver/internal/async/HandshakeHandler.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -95,16 +95,8 @@ public void exceptionCaught( ChannelHandlerContext ctx, Throwable error )
9595
else
9696
{
9797
failed = true;
98-
99-
Throwable cause = error instanceof DecoderException ? error.getCause() : error;
100-
if ( cause instanceof SSLHandshakeException )
101-
{
102-
fail( ctx, new SecurityException( "Failed to establish secured connection with the server", cause ) );
103-
}
104-
else
105-
{
106-
fail( ctx, cause );
107-
}
98+
Throwable cause = transformError( error );
99+
fail( ctx, cause );
108100
}
109101
}
110102

@@ -161,4 +153,21 @@ private static Throwable protocolNoSupportedByDriverError( int suggestedProtocol
161153
return new ClientException(
162154
"Protocol error, server suggested unexpected protocol version: " + suggestedProtocolVersion );
163155
}
156+
157+
private static Throwable transformError( Throwable error )
158+
{
159+
Throwable cause = error instanceof DecoderException ? error.getCause() : error;
160+
if ( cause instanceof ServiceUnavailableException )
161+
{
162+
return cause;
163+
}
164+
else if ( cause instanceof SSLHandshakeException )
165+
{
166+
return new SecurityException( "Failed to establish secured connection with the server", cause );
167+
}
168+
else
169+
{
170+
return new ServiceUnavailableException( "Failed to establish connection with the server", cause );
171+
}
172+
}
164173
}

driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ private void fail( ChannelHandlerContext ctx, Throwable error )
101101
ctx.close();
102102
}
103103

104-
private Throwable transformError( Throwable error )
104+
private static Throwable transformError( Throwable error )
105105
{
106106
if ( error instanceof CodecException )
107107
{

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingSettings.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21+
import static java.util.concurrent.TimeUnit.SECONDS;
22+
2123
public class RoutingSettings
2224
{
25+
public static final RoutingSettings DEFAULT = new RoutingSettings( 1, SECONDS.toMillis( 5 ) );
26+
2327
private final int maxRoutingFailures;
2428
private final long retryTimeoutDelay;
2529
private final RoutingContext routingContext;

driver/src/main/java/org/neo4j/driver/internal/handlers/BookmarkResponseHandler.java

Lines changed: 0 additions & 56 deletions
This file was deleted.

driver/src/main/java/org/neo4j/driver/v1/Config.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,8 +267,8 @@ public static class ConfigBuilder
267267
private boolean encrypted = true;
268268
private TrustStrategy trustStrategy = trustAllCertificates();
269269
private LoadBalancingStrategy loadBalancingStrategy = LoadBalancingStrategy.LEAST_CONNECTED;
270-
private int routingFailureLimit = 1;
271-
private long routingRetryDelayMillis = TimeUnit.SECONDS.toMillis( 5 );
270+
private int routingFailureLimit = RoutingSettings.DEFAULT.maxRoutingFailures();
271+
private long routingRetryDelayMillis = RoutingSettings.DEFAULT.retryTimeoutDelay();
272272
private int connectionTimeoutMillis = (int) TimeUnit.SECONDS.toMillis( 5 );
273273
private RetrySettings retrySettings = RetrySettings.DEFAULT;
274274

driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplIT.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import org.junit.Test;
3030

3131
import java.io.IOException;
32+
import java.io.UncheckedIOException;
3233
import java.net.ServerSocket;
34+
import java.net.Socket;
3335
import java.util.concurrent.ExecutionException;
3436
import java.util.concurrent.TimeUnit;
3537

@@ -44,6 +46,7 @@
4446
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
4547
import org.neo4j.driver.v1.util.TestNeo4j;
4648

49+
import static java.util.concurrent.CompletableFuture.runAsync;
4750
import static org.hamcrest.Matchers.instanceOf;
4851
import static org.hamcrest.Matchers.startsWith;
4952
import static org.junit.Assert.assertEquals;
@@ -186,6 +189,42 @@ public void shouldFailWhenTLSHandshakeTakesTooLong() throws Exception
186189
testReadTimeoutOnConnect( SecurityPlan.forAllCertificates() );
187190
}
188191

192+
@Test
193+
public void shouldThrowServiceUnavailableExceptionOnFailureDuringConnect() throws Exception
194+
{
195+
ServerSocket server = new ServerSocket( 0 );
196+
BoltServerAddress address = new BoltServerAddress( "localhost", server.getLocalPort() );
197+
198+
runAsync( () ->
199+
{
200+
try
201+
{
202+
// wait for a connection
203+
Socket socket = server.accept();
204+
// and terminate it immediately so that client gets a "reset by peer" IOException
205+
socket.close();
206+
server.close();
207+
}
208+
catch ( IOException e )
209+
{
210+
throw new UncheckedIOException( e );
211+
}
212+
} );
213+
214+
ChannelConnector connector = newConnector( neo4j.authToken() );
215+
ChannelFuture channelFuture = connector.connect( address, bootstrap );
216+
217+
// connect operation should fail with ServiceUnavailableException
218+
try
219+
{
220+
await( channelFuture );
221+
fail( "Exception expected" );
222+
}
223+
catch ( ServiceUnavailableException ignore )
224+
{
225+
}
226+
}
227+
189228
private void testReadTimeoutOnConnect( SecurityPlan securityPlan ) throws IOException
190229
{
191230
try ( ServerSocket server = new ServerSocket( 0 ) ) // server that accepts connections but does not reply

driver/src/test/java/org/neo4j/driver/internal/async/HandshakeHandlerTest.java

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@
4646
import static org.junit.Assert.assertNull;
4747
import static org.junit.Assert.assertThat;
4848
import static org.junit.Assert.fail;
49-
import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher;
5049
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.HTTP;
5150
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.NO_PROTOCOL_VERSION;
5251
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.PROTOCOL_VERSION_1;
52+
import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher;
5353
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
5454
import static org.neo4j.driver.v1.util.TestUtil.await;
5555

@@ -85,9 +85,34 @@ public void shouldFailGivenPromiseWhenExceptionCaught()
8585
await( handshakeCompletedPromise );
8686
fail( "Exception expected" );
8787
}
88-
catch ( Exception e )
88+
catch ( ServiceUnavailableException e )
89+
{
90+
assertEquals( cause, e.getCause() );
91+
}
92+
93+
// channel should be closed
94+
assertNull( await( channel.closeFuture() ) );
95+
}
96+
97+
@Test
98+
public void shouldFailGivenPromiseWhenServiceUnavailableExceptionCaught()
99+
{
100+
ChannelPromise handshakeCompletedPromise = channel.newPromise();
101+
HandshakeHandler handler = newHandler( handshakeCompletedPromise );
102+
channel.pipeline().addLast( handler );
103+
104+
ServiceUnavailableException error = new ServiceUnavailableException( "Bad error" );
105+
channel.pipeline().fireExceptionCaught( error );
106+
107+
try
89108
{
90-
assertEquals( cause, e );
109+
// promise should fail
110+
await( handshakeCompletedPromise );
111+
fail( "Exception expected" );
112+
}
113+
catch ( ServiceUnavailableException e )
114+
{
115+
assertEquals( error, e );
91116
}
92117

93118
// channel should be closed
@@ -112,9 +137,9 @@ public void shouldFailGivenPromiseWhenMultipleExceptionsCaught()
112137
await( handshakeCompletedPromise );
113138
fail( "Exception expected" );
114139
}
115-
catch ( RuntimeException e )
140+
catch ( ServiceUnavailableException e )
116141
{
117-
assertEquals( error1, e );
142+
assertEquals( error1, e.getCause() );
118143
}
119144

120145
// channel should be closed
@@ -147,9 +172,9 @@ public void shouldUnwrapDecoderException()
147172
await( handshakeCompletedPromise );
148173
fail( "Exception expected" );
149174
}
150-
catch ( Exception e )
175+
catch ( ServiceUnavailableException e )
151176
{
152-
assertEquals( cause, e );
177+
assertEquals( cause, e.getCause() );
153178
}
154179

155180
// channel should be closed

0 commit comments

Comments
 (0)