Skip to content

Commit 2124cac

Browse files
committed
Treat request errors as fatal
Fatal errors result in network channel being closed. Previously only some errors that originate in the driver were treated as fatal. However some errors received from the database mean communication is broken. This commit makes protocol errors that start with `Neo.ClientError.Request` be treated as fatal. No ACK_FAILURE will be send for such errors and netty channel will be closed.
1 parent 9c0f8f5 commit 2124cac

21 files changed

+637
-70
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectedListener.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,27 +21,29 @@
2121
import io.netty.channel.Channel;
2222
import io.netty.channel.ChannelFuture;
2323
import io.netty.channel.ChannelFutureListener;
24+
import io.netty.channel.ChannelPipeline;
2425
import io.netty.channel.ChannelPromise;
2526

2627
import org.neo4j.driver.v1.Logging;
2728
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
2829

2930
import static java.lang.String.format;
30-
import static java.util.Objects.requireNonNull;
3131
import static org.neo4j.driver.internal.async.ProtocolUtil.handshake;
3232

3333
public class ChannelConnectedListener implements ChannelFutureListener
3434
{
3535
private final BoltServerAddress address;
36+
private final ChannelPipelineBuilder pipelineBuilder;
3637
private final ChannelPromise handshakeCompletedPromise;
3738
private final Logging logging;
3839

39-
public ChannelConnectedListener( BoltServerAddress address, ChannelPromise handshakeCompletedPromise,
40-
Logging logging )
40+
public ChannelConnectedListener( BoltServerAddress address, ChannelPipelineBuilder pipelineBuilder,
41+
ChannelPromise handshakeCompletedPromise, Logging logging )
4142
{
42-
this.address = requireNonNull( address );
43-
this.handshakeCompletedPromise = requireNonNull( handshakeCompletedPromise );
44-
this.logging = requireNonNull( logging );
43+
this.address = address;
44+
this.pipelineBuilder = pipelineBuilder;
45+
this.handshakeCompletedPromise = handshakeCompletedPromise;
46+
this.logging = logging;
4547
}
4648

4749
@Override
@@ -51,7 +53,8 @@ public void operationComplete( ChannelFuture future )
5153

5254
if ( future.isSuccess() )
5355
{
54-
channel.pipeline().addLast( new HandshakeResponseHandler( handshakeCompletedPromise, logging ) );
56+
ChannelPipeline pipeline = channel.pipeline();
57+
pipeline.addLast( new HandshakeResponseHandler( pipelineBuilder, handshakeCompletedPromise, logging ) );
5558
ChannelFuture handshakeFuture = channel.writeAndFlush( handshake() );
5659

5760
handshakeFuture.addListener( channelFuture ->

driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,25 @@ public class ChannelConnectorImpl implements ChannelConnector
4343
private final String userAgent;
4444
private final Map<String,Value> authToken;
4545
private final SecurityPlan securityPlan;
46+
private final ChannelPipelineBuilder pipelineBuilder;
4647
private final int connectTimeoutMillis;
4748
private final Logging logging;
4849
private final Clock clock;
4950

5051
public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan, Logging logging,
5152
Clock clock )
53+
{
54+
this( connectionSettings, securityPlan, new ChannelPipelineBuilderImpl(), logging, clock );
55+
}
56+
57+
public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan,
58+
ChannelPipelineBuilder pipelineBuilder, Logging logging, Clock clock )
5259
{
5360
this.userAgent = connectionSettings.userAgent();
5461
this.authToken = tokenAsMap( connectionSettings.authToken() );
5562
this.connectTimeoutMillis = connectionSettings.connectTimeoutMillis();
5663
this.securityPlan = requireNonNull( securityPlan );
64+
this.pipelineBuilder = pipelineBuilder;
5765
this.logging = requireNonNull( logging );
5866
this.clock = requireNonNull( clock );
5967
}
@@ -70,8 +78,10 @@ public ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap )
7078
ChannelPromise handshakeCompleted = channel.newPromise();
7179
ChannelPromise connectionInitialized = channel.newPromise();
7280

73-
channelConnected.addListener( new ChannelConnectedListener( address, handshakeCompleted, logging ) );
74-
handshakeCompleted.addListener( new HandshakeCompletedListener( userAgent, authToken, connectionInitialized ) );
81+
channelConnected.addListener(
82+
new ChannelConnectedListener( address, pipelineBuilder, handshakeCompleted, logging ) );
83+
handshakeCompleted.addListener(
84+
new HandshakeCompletedListener( userAgent, authToken, connectionInitialized ) );
7585

7686
return connectionInitialized;
7787
}

driver/src/main/java/org/neo4j/driver/internal/async/ChannelErrorHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ private void fail( ChannelHandlerContext ctx, Throwable error )
9393
{
9494
Throwable cause = transformError( error );
9595
messageDispatcher.handleFatalError( cause );
96-
log.debug( "Closing channel: %s", ctx.channel() );
96+
log.debug( "Closing channel because of an error: %s", ctx.channel() );
9797
ctx.close();
9898
}
9999

driver/src/main/java/org/neo4j/driver/internal/async/ChannelPipelineBuilder.java

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,35 +18,12 @@
1818
*/
1919
package org.neo4j.driver.internal.async;
2020

21-
import io.netty.channel.Channel;
2221
import io.netty.channel.ChannelPipeline;
2322

24-
import org.neo4j.driver.internal.async.inbound.ChunkDecoder;
25-
import org.neo4j.driver.internal.async.inbound.InboundMessageHandler;
26-
import org.neo4j.driver.internal.async.inbound.MessageDecoder;
27-
import org.neo4j.driver.internal.async.outbound.OutboundMessageHandler;
2823
import org.neo4j.driver.internal.messaging.MessageFormat;
2924
import org.neo4j.driver.v1.Logging;
3025

31-
public final class ChannelPipelineBuilder
26+
public interface ChannelPipelineBuilder
3227
{
33-
private ChannelPipelineBuilder()
34-
{
35-
}
36-
37-
public static void buildPipeline( Channel channel, MessageFormat messageFormat, Logging logging )
38-
{
39-
ChannelPipeline pipeline = channel.pipeline();
40-
41-
// inbound handlers
42-
pipeline.addLast( new ChunkDecoder() );
43-
pipeline.addLast( new MessageDecoder() );
44-
pipeline.addLast( new InboundMessageHandler( messageFormat, logging ) );
45-
46-
// outbound handlers
47-
pipeline.addLast( OutboundMessageHandler.NAME, new OutboundMessageHandler( messageFormat, logging ) );
48-
49-
// last one - error handler
50-
pipeline.addLast( new ChannelErrorHandler( logging ) );
51-
}
28+
void build( MessageFormat messageFormat, ChannelPipeline pipeline, Logging logging );
5229
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async;
20+
21+
import io.netty.channel.ChannelPipeline;
22+
23+
import org.neo4j.driver.internal.async.inbound.ChunkDecoder;
24+
import org.neo4j.driver.internal.async.inbound.InboundMessageHandler;
25+
import org.neo4j.driver.internal.async.inbound.MessageDecoder;
26+
import org.neo4j.driver.internal.async.outbound.OutboundMessageHandler;
27+
import org.neo4j.driver.internal.messaging.MessageFormat;
28+
import org.neo4j.driver.v1.Logging;
29+
30+
public class ChannelPipelineBuilderImpl implements ChannelPipelineBuilder
31+
{
32+
@Override
33+
public void build( MessageFormat messageFormat, ChannelPipeline pipeline, Logging logging )
34+
{
35+
// inbound handlers
36+
pipeline.addLast( new ChunkDecoder() );
37+
pipeline.addLast( new MessageDecoder() );
38+
pipeline.addLast( new InboundMessageHandler( messageFormat, logging ) );
39+
40+
// outbound handlers
41+
pipeline.addLast( OutboundMessageHandler.NAME, new OutboundMessageHandler( messageFormat, logging ) );
42+
43+
// last one - error handler
44+
pipeline.addLast( new ChannelErrorHandler( logging ) );
45+
}
46+
}

driver/src/main/java/org/neo4j/driver/internal/async/HandshakeResponseHandler.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,15 @@
4141

4242
public class HandshakeResponseHandler extends ReplayingDecoder<Void>
4343
{
44+
private final ChannelPipelineBuilder pipelineBuilder;
4445
private final ChannelPromise handshakeCompletedPromise;
4546
private final Logging logging;
4647
private final Logger log;
4748

48-
public HandshakeResponseHandler( ChannelPromise handshakeCompletedPromise, Logging logging )
49+
public HandshakeResponseHandler( ChannelPipelineBuilder pipelineBuilder, ChannelPromise handshakeCompletedPromise,
50+
Logging logging )
4951
{
52+
this.pipelineBuilder = pipelineBuilder;
5053
this.handshakeCompletedPromise = handshakeCompletedPromise;
5154
this.logging = logging;
5255
this.log = logging.getLog( getClass().getSimpleName() );
@@ -80,10 +83,9 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
8083
switch ( serverSuggestedVersion )
8184
{
8285
case PROTOCOL_VERSION_1:
83-
MessageFormat format = new PackStreamMessageFormatV1();
84-
ChannelPipelineBuilder.buildPipeline( ctx.channel(), format, logging );
86+
MessageFormat messageFormat = new PackStreamMessageFormatV1();
87+
pipelineBuilder.build( messageFormat, pipeline, logging );
8588
handshakeCompletedPromise.setSuccess();
86-
8789
break;
8890
case NO_PROTOCOL_VERSION:
8991
fail( ctx, protocolNoSupportedByServerError() );

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,14 @@ public void handleFailureMessage( String code, String message )
134134
log.debug( "Received FAILURE message with code '%s' and message '%s'", code, message );
135135
currentError = ErrorUtil.newNeo4jError( code, message );
136136

137+
if ( ErrorUtil.isFatal( currentError ) )
138+
{
139+
// we should not continue using channel after a fatal error
140+
// fire error event back to the pipeline and avoid sending ACK_FAILURE
141+
channel.pipeline().fireExceptionCaught( currentError );
142+
return;
143+
}
144+
137145
// try to write ACK_FAILURE before notifying the next response handler
138146
ackFailureIfNeeded();
139147

@@ -180,6 +188,11 @@ public Throwable currentError()
180188
return currentError;
181189
}
182190

191+
public boolean fatalErrorOccurred()
192+
{
193+
return fatalErrorOccurred;
194+
}
195+
183196
/**
184197
* Makes this message dispatcher not send ACK_FAILURE in response to FAILURE until it's un-muted using
185198
* {@link #unMuteAckFailure()}. Muting ACK_FAILURE is needed <b>only</b> when sending RESET message. RESET "jumps"

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageHandler.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,16 @@ public void handlerRemoved( ChannelHandlerContext ctx )
6161
@Override
6262
protected void channelRead0( ChannelHandlerContext ctx, ByteBuf msg )
6363
{
64+
if ( messageDispatcher.fatalErrorOccurred() )
65+
{
66+
log.warn( "Message ignored because of the previous fatal error. Channel will be closed. Message:\n%s\n",
67+
prettyHexDump( msg ) );
68+
return;
69+
}
70+
6471
if ( log.isTraceEnabled() )
6572
{
66-
log.trace( "Inbound message received: \n%s\n", prettyHexDump( msg ) );
73+
log.trace( "Inbound message received:\n%s\n", prettyHexDump( msg ) );
6774
}
6875

6976
input.start( msg );

driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,7 @@ protected void encode( ChannelHandlerContext ctx, Message msg, List<Object> out
6868
}
6969
catch ( Throwable error )
7070
{
71-
EncoderException exception = new EncoderException( "Failed to write outbound message: " + msg, error );
72-
// tell ChannelErrorHandler which is the last handler in the pipeline about this error
73-
ctx.fireExceptionCaught( exception );
74-
// rethrow, encoder contract requires handler to either fail or populate out list
75-
throw exception;
71+
throw new EncoderException( "Failed to write outbound message: " + msg, error );
7672
}
7773
finally
7874
{

driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,22 +51,21 @@ public static Neo4jException newNeo4jError( String code, String message )
5151
}
5252
}
5353

54-
// todo: use this method and close channel after unrecoverable error
55-
public static boolean isRecoverable( Throwable error )
54+
public static boolean isFatal( Throwable error )
5655
{
5756
if ( error instanceof Neo4jException )
5857
{
5958
if ( isProtocolViolationError( ((Neo4jException) error) ) )
6059
{
61-
return false;
60+
return true;
6261
}
6362

6463
if ( isClientOrTransientError( ((Neo4jException) error) ) )
6564
{
66-
return true;
65+
return false;
6766
}
6867
}
69-
return false;
68+
return true;
7069
}
7170

7271
private static boolean isProtocolViolationError( Neo4jException error )
@@ -84,6 +83,10 @@ private static boolean isClientOrTransientError( Neo4jException error )
8483
private static String extractClassification( String code )
8584
{
8685
String[] parts = code.split( "\\." );
86+
if ( parts.length < 2 )
87+
{
88+
return "";
89+
}
8790
return parts[1];
8891
}
8992
}

driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectedListenerTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,17 @@
3333
import static org.junit.Assert.assertThat;
3434
import static org.junit.Assert.assertTrue;
3535
import static org.junit.Assert.fail;
36+
import static org.neo4j.driver.internal.async.BoltServerAddress.LOCAL_DEFAULT;
3637
import static org.neo4j.driver.internal.async.ProtocolUtil.handshake;
3738
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
38-
import static org.neo4j.driver.internal.async.BoltServerAddress.LOCAL_DEFAULT;
3939
import static org.neo4j.driver.v1.util.TestUtil.await;
4040

4141
public class ChannelConnectedListenerTest
4242
{
4343
private final EmbeddedChannel channel = new EmbeddedChannel();
4444

4545
@After
46-
public void tearDown() throws Exception
46+
public void tearDown()
4747
{
4848
channel.close();
4949
}
@@ -90,6 +90,7 @@ public void shouldWriteHandshakeWhenChannelConnected()
9090

9191
private static ChannelConnectedListener newListener( ChannelPromise handshakeCompletedPromise )
9292
{
93-
return new ChannelConnectedListener( LOCAL_DEFAULT, handshakeCompletedPromise, DEV_NULL_LOGGING );
93+
return new ChannelConnectedListener( LOCAL_DEFAULT, new ChannelPipelineBuilderImpl(),
94+
handshakeCompletedPromise, DEV_NULL_LOGGING );
9495
}
9596
}

driver/src/test/java/org/neo4j/driver/internal/async/ChannelPipelineBuilderTest.java renamed to driver/src/test/java/org/neo4j/driver/internal/async/ChannelPipelineBuilderImplTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,15 @@
3737
import static org.junit.Assert.assertThat;
3838
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
3939

40-
public class ChannelPipelineBuilderTest
40+
public class ChannelPipelineBuilderImplTest
4141
{
4242
@Test
4343
public void shouldBuildPipeline()
4444
{
4545
EmbeddedChannel channel = new EmbeddedChannel();
4646
ChannelAttributes.setMessageDispatcher( channel, new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ) );
4747

48-
ChannelPipelineBuilder.buildPipeline( channel, new PackStreamMessageFormatV1(), DEV_NULL_LOGGING );
48+
new ChannelPipelineBuilderImpl().build( new PackStreamMessageFormatV1(), channel.pipeline(), DEV_NULL_LOGGING );
4949

5050
Iterator<Map.Entry<String,ChannelHandler>> iterator = channel.pipeline().iterator();
5151
assertThat( iterator.next().getValue(), instanceOf( ChunkDecoder.class ) );

driver/src/test/java/org/neo4j/driver/internal/async/HandshakeResponseHandlerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ private void testFailure( int serverSuggestedVersion, String expectedMessagePref
158158

159159
private static HandshakeResponseHandler newHandler( ChannelPromise handshakeCompletedPromise )
160160
{
161-
return new HandshakeResponseHandler( handshakeCompletedPromise, DEV_NULL_LOGGING );
161+
return new HandshakeResponseHandler( new ChannelPipelineBuilderImpl(), handshakeCompletedPromise,
162+
DEV_NULL_LOGGING );
162163
}
163164
}

driver/src/test/java/org/neo4j/driver/internal/messaging/MessageFormatTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import org.neo4j.driver.internal.InternalNode;
3333
import org.neo4j.driver.internal.InternalPath;
3434
import org.neo4j.driver.internal.InternalRelationship;
35-
import org.neo4j.driver.internal.async.ChannelPipelineBuilder;
35+
import org.neo4j.driver.internal.async.ChannelPipelineBuilderImpl;
3636
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
3737
import org.neo4j.driver.internal.async.outbound.ChunkAwareByteBufOutput;
3838
import org.neo4j.driver.internal.packstream.PackStream;
@@ -151,7 +151,7 @@ private EmbeddedChannel newEmbeddedChannel()
151151
{
152152
EmbeddedChannel channel = new EmbeddedChannel();
153153
setMessageDispatcher( channel, new MemorizingInboundMessageDispatcher( channel, DEV_NULL_LOGGING ) );
154-
ChannelPipelineBuilder.buildPipeline( channel, format, DEV_NULL_LOGGING );
154+
new ChannelPipelineBuilderImpl().build( format, channel.pipeline(), DEV_NULL_LOGGING );
155155
return channel;
156156
}
157157

0 commit comments

Comments
 (0)