Skip to content

Commit eb67a9c

Browse files
authored
Merge pull request #419 from lutovich/1.5-fatal-errors-from-db
Treat request errors as fatal
2 parents 9c0f8f5 + 2124cac commit eb67a9c

21 files changed

+637
-70
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectedListener.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,27 +21,29 @@
2121
import io.netty.channel.Channel;
2222
import io.netty.channel.ChannelFuture;
2323
import io.netty.channel.ChannelFutureListener;
24+
import io.netty.channel.ChannelPipeline;
2425
import io.netty.channel.ChannelPromise;
2526

2627
import org.neo4j.driver.v1.Logging;
2728
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
2829

2930
import static java.lang.String.format;
30-
import static java.util.Objects.requireNonNull;
3131
import static org.neo4j.driver.internal.async.ProtocolUtil.handshake;
3232

3333
public class ChannelConnectedListener implements ChannelFutureListener
3434
{
3535
private final BoltServerAddress address;
36+
private final ChannelPipelineBuilder pipelineBuilder;
3637
private final ChannelPromise handshakeCompletedPromise;
3738
private final Logging logging;
3839

39-
public ChannelConnectedListener( BoltServerAddress address, ChannelPromise handshakeCompletedPromise,
40-
Logging logging )
40+
public ChannelConnectedListener( BoltServerAddress address, ChannelPipelineBuilder pipelineBuilder,
41+
ChannelPromise handshakeCompletedPromise, Logging logging )
4142
{
42-
this.address = requireNonNull( address );
43-
this.handshakeCompletedPromise = requireNonNull( handshakeCompletedPromise );
44-
this.logging = requireNonNull( logging );
43+
this.address = address;
44+
this.pipelineBuilder = pipelineBuilder;
45+
this.handshakeCompletedPromise = handshakeCompletedPromise;
46+
this.logging = logging;
4547
}
4648

4749
@Override
@@ -51,7 +53,8 @@ public void operationComplete( ChannelFuture future )
5153

5254
if ( future.isSuccess() )
5355
{
54-
channel.pipeline().addLast( new HandshakeResponseHandler( handshakeCompletedPromise, logging ) );
56+
ChannelPipeline pipeline = channel.pipeline();
57+
pipeline.addLast( new HandshakeResponseHandler( pipelineBuilder, handshakeCompletedPromise, logging ) );
5558
ChannelFuture handshakeFuture = channel.writeAndFlush( handshake() );
5659

5760
handshakeFuture.addListener( channelFuture ->

driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,25 @@ public class ChannelConnectorImpl implements ChannelConnector
4343
private final String userAgent;
4444
private final Map<String,Value> authToken;
4545
private final SecurityPlan securityPlan;
46+
private final ChannelPipelineBuilder pipelineBuilder;
4647
private final int connectTimeoutMillis;
4748
private final Logging logging;
4849
private final Clock clock;
4950

5051
public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan, Logging logging,
5152
Clock clock )
53+
{
54+
this( connectionSettings, securityPlan, new ChannelPipelineBuilderImpl(), logging, clock );
55+
}
56+
57+
public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan,
58+
ChannelPipelineBuilder pipelineBuilder, Logging logging, Clock clock )
5259
{
5360
this.userAgent = connectionSettings.userAgent();
5461
this.authToken = tokenAsMap( connectionSettings.authToken() );
5562
this.connectTimeoutMillis = connectionSettings.connectTimeoutMillis();
5663
this.securityPlan = requireNonNull( securityPlan );
64+
this.pipelineBuilder = pipelineBuilder;
5765
this.logging = requireNonNull( logging );
5866
this.clock = requireNonNull( clock );
5967
}
@@ -70,8 +78,10 @@ public ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap )
7078
ChannelPromise handshakeCompleted = channel.newPromise();
7179
ChannelPromise connectionInitialized = channel.newPromise();
7280

73-
channelConnected.addListener( new ChannelConnectedListener( address, handshakeCompleted, logging ) );
74-
handshakeCompleted.addListener( new HandshakeCompletedListener( userAgent, authToken, connectionInitialized ) );
81+
channelConnected.addListener(
82+
new ChannelConnectedListener( address, pipelineBuilder, handshakeCompleted, logging ) );
83+
handshakeCompleted.addListener(
84+
new HandshakeCompletedListener( userAgent, authToken, connectionInitialized ) );
7585

7686
return connectionInitialized;
7787
}

driver/src/main/java/org/neo4j/driver/internal/async/ChannelErrorHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ private void fail( ChannelHandlerContext ctx, Throwable error )
9393
{
9494
Throwable cause = transformError( error );
9595
messageDispatcher.handleFatalError( cause );
96-
log.debug( "Closing channel: %s", ctx.channel() );
96+
log.debug( "Closing channel because of an error: %s", ctx.channel() );
9797
ctx.close();
9898
}
9999

driver/src/main/java/org/neo4j/driver/internal/async/ChannelPipelineBuilder.java

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,35 +18,12 @@
1818
*/
1919
package org.neo4j.driver.internal.async;
2020

21-
import io.netty.channel.Channel;
2221
import io.netty.channel.ChannelPipeline;
2322

24-
import org.neo4j.driver.internal.async.inbound.ChunkDecoder;
25-
import org.neo4j.driver.internal.async.inbound.InboundMessageHandler;
26-
import org.neo4j.driver.internal.async.inbound.MessageDecoder;
27-
import org.neo4j.driver.internal.async.outbound.OutboundMessageHandler;
2823
import org.neo4j.driver.internal.messaging.MessageFormat;
2924
import org.neo4j.driver.v1.Logging;
3025

31-
public final class ChannelPipelineBuilder
26+
public interface ChannelPipelineBuilder
3227
{
33-
private ChannelPipelineBuilder()
34-
{
35-
}
36-
37-
public static void buildPipeline( Channel channel, MessageFormat messageFormat, Logging logging )
38-
{
39-
ChannelPipeline pipeline = channel.pipeline();
40-
41-
// inbound handlers
42-
pipeline.addLast( new ChunkDecoder() );
43-
pipeline.addLast( new MessageDecoder() );
44-
pipeline.addLast( new InboundMessageHandler( messageFormat, logging ) );
45-
46-
// outbound handlers
47-
pipeline.addLast( OutboundMessageHandler.NAME, new OutboundMessageHandler( messageFormat, logging ) );
48-
49-
// last one - error handler
50-
pipeline.addLast( new ChannelErrorHandler( logging ) );
51-
}
28+
void build( MessageFormat messageFormat, ChannelPipeline pipeline, Logging logging );
5229
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async;
20+
21+
import io.netty.channel.ChannelPipeline;
22+
23+
import org.neo4j.driver.internal.async.inbound.ChunkDecoder;
24+
import org.neo4j.driver.internal.async.inbound.InboundMessageHandler;
25+
import org.neo4j.driver.internal.async.inbound.MessageDecoder;
26+
import org.neo4j.driver.internal.async.outbound.OutboundMessageHandler;
27+
import org.neo4j.driver.internal.messaging.MessageFormat;
28+
import org.neo4j.driver.v1.Logging;
29+
30+
public class ChannelPipelineBuilderImpl implements ChannelPipelineBuilder
31+
{
32+
@Override
33+
public void build( MessageFormat messageFormat, ChannelPipeline pipeline, Logging logging )
34+
{
35+
// inbound handlers
36+
pipeline.addLast( new ChunkDecoder() );
37+
pipeline.addLast( new MessageDecoder() );
38+
pipeline.addLast( new InboundMessageHandler( messageFormat, logging ) );
39+
40+
// outbound handlers
41+
pipeline.addLast( OutboundMessageHandler.NAME, new OutboundMessageHandler( messageFormat, logging ) );
42+
43+
// last one - error handler
44+
pipeline.addLast( new ChannelErrorHandler( logging ) );
45+
}
46+
}

driver/src/main/java/org/neo4j/driver/internal/async/HandshakeResponseHandler.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,15 @@
4141

4242
public class HandshakeResponseHandler extends ReplayingDecoder<Void>
4343
{
44+
private final ChannelPipelineBuilder pipelineBuilder;
4445
private final ChannelPromise handshakeCompletedPromise;
4546
private final Logging logging;
4647
private final Logger log;
4748

48-
public HandshakeResponseHandler( ChannelPromise handshakeCompletedPromise, Logging logging )
49+
public HandshakeResponseHandler( ChannelPipelineBuilder pipelineBuilder, ChannelPromise handshakeCompletedPromise,
50+
Logging logging )
4951
{
52+
this.pipelineBuilder = pipelineBuilder;
5053
this.handshakeCompletedPromise = handshakeCompletedPromise;
5154
this.logging = logging;
5255
this.log = logging.getLog( getClass().getSimpleName() );
@@ -80,10 +83,9 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
8083
switch ( serverSuggestedVersion )
8184
{
8285
case PROTOCOL_VERSION_1:
83-
MessageFormat format = new PackStreamMessageFormatV1();
84-
ChannelPipelineBuilder.buildPipeline( ctx.channel(), format, logging );
86+
MessageFormat messageFormat = new PackStreamMessageFormatV1();
87+
pipelineBuilder.build( messageFormat, pipeline, logging );
8588
handshakeCompletedPromise.setSuccess();
86-
8789
break;
8890
case NO_PROTOCOL_VERSION:
8991
fail( ctx, protocolNoSupportedByServerError() );

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,14 @@ public void handleFailureMessage( String code, String message )
134134
log.debug( "Received FAILURE message with code '%s' and message '%s'", code, message );
135135
currentError = ErrorUtil.newNeo4jError( code, message );
136136

137+
if ( ErrorUtil.isFatal( currentError ) )
138+
{
139+
// we should not continue using channel after a fatal error
140+
// fire error event back to the pipeline and avoid sending ACK_FAILURE
141+
channel.pipeline().fireExceptionCaught( currentError );
142+
return;
143+
}
144+
137145
// try to write ACK_FAILURE before notifying the next response handler
138146
ackFailureIfNeeded();
139147

@@ -180,6 +188,11 @@ public Throwable currentError()
180188
return currentError;
181189
}
182190

191+
public boolean fatalErrorOccurred()
192+
{
193+
return fatalErrorOccurred;
194+
}
195+
183196
/**
184197
* Makes this message dispatcher not send ACK_FAILURE in response to FAILURE until it's un-muted using
185198
* {@link #unMuteAckFailure()}. Muting ACK_FAILURE is needed <b>only</b> when sending RESET message. RESET "jumps"

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageHandler.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,16 @@ public void handlerRemoved( ChannelHandlerContext ctx )
6161
@Override
6262
protected void channelRead0( ChannelHandlerContext ctx, ByteBuf msg )
6363
{
64+
if ( messageDispatcher.fatalErrorOccurred() )
65+
{
66+
log.warn( "Message ignored because of the previous fatal error. Channel will be closed. Message:\n%s\n",
67+
prettyHexDump( msg ) );
68+
return;
69+
}
70+
6471
if ( log.isTraceEnabled() )
6572
{
66-
log.trace( "Inbound message received: \n%s\n", prettyHexDump( msg ) );
73+
log.trace( "Inbound message received:\n%s\n", prettyHexDump( msg ) );
6774
}
6875

6976
input.start( msg );

driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,7 @@ protected void encode( ChannelHandlerContext ctx, Message msg, List<Object> out
6868
}
6969
catch ( Throwable error )
7070
{
71-
EncoderException exception = new EncoderException( "Failed to write outbound message: " + msg, error );
72-
// tell ChannelErrorHandler which is the last handler in the pipeline about this error
73-
ctx.fireExceptionCaught( exception );
74-
// rethrow, encoder contract requires handler to either fail or populate out list
75-
throw exception;
71+
throw new EncoderException( "Failed to write outbound message: " + msg, error );
7672
}
7773
finally
7874
{

driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,22 +51,21 @@ public static Neo4jException newNeo4jError( String code, String message )
5151
}
5252
}
5353

54-
// todo: use this method and close channel after unrecoverable error
55-
public static boolean isRecoverable( Throwable error )
54+
public static boolean isFatal( Throwable error )
5655
{
5756
if ( error instanceof Neo4jException )
5857
{
5958
if ( isProtocolViolationError( ((Neo4jException) error) ) )
6059
{
61-
return false;
60+
return true;
6261
}
6362

6463
if ( isClientOrTransientError( ((Neo4jException) error) ) )
6564
{
66-
return true;
65+
return false;
6766
}
6867
}
69-
return false;
68+
return true;
7069
}
7170

7271
private static boolean isProtocolViolationError( Neo4jException error )
@@ -84,6 +83,10 @@ private static boolean isClientOrTransientError( Neo4jException error )
8483
private static String extractClassification( String code )
8584
{
8685
String[] parts = code.split( "\\." );
86+
if ( parts.length < 2 )
87+
{
88+
return "";
89+
}
8790
return parts[1];
8891
}
8992
}

driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectedListenerTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,17 @@
3333
import static org.junit.Assert.assertThat;
3434
import static org.junit.Assert.assertTrue;
3535
import static org.junit.Assert.fail;
36+
import static org.neo4j.driver.internal.async.BoltServerAddress.LOCAL_DEFAULT;
3637
import static org.neo4j.driver.internal.async.ProtocolUtil.handshake;
3738
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
38-
import static org.neo4j.driver.internal.async.BoltServerAddress.LOCAL_DEFAULT;
3939
import static org.neo4j.driver.v1.util.TestUtil.await;
4040

4141
public class ChannelConnectedListenerTest
4242
{
4343
private final EmbeddedChannel channel = new EmbeddedChannel();
4444

4545
@After
46-
public void tearDown() throws Exception
46+
public void tearDown()
4747
{
4848
channel.close();
4949
}
@@ -90,6 +90,7 @@ public void shouldWriteHandshakeWhenChannelConnected()
9090

9191
private static ChannelConnectedListener newListener( ChannelPromise handshakeCompletedPromise )
9292
{
93-
return new ChannelConnectedListener( LOCAL_DEFAULT, handshakeCompletedPromise, DEV_NULL_LOGGING );
93+
return new ChannelConnectedListener( LOCAL_DEFAULT, new ChannelPipelineBuilderImpl(),
94+
handshakeCompletedPromise, DEV_NULL_LOGGING );
9495
}
9596
}

driver/src/test/java/org/neo4j/driver/internal/async/ChannelPipelineBuilderTest.java renamed to driver/src/test/java/org/neo4j/driver/internal/async/ChannelPipelineBuilderImplTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,15 @@
3737
import static org.junit.Assert.assertThat;
3838
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
3939

40-
public class ChannelPipelineBuilderTest
40+
public class ChannelPipelineBuilderImplTest
4141
{
4242
@Test
4343
public void shouldBuildPipeline()
4444
{
4545
EmbeddedChannel channel = new EmbeddedChannel();
4646
ChannelAttributes.setMessageDispatcher( channel, new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ) );
4747

48-
ChannelPipelineBuilder.buildPipeline( channel, new PackStreamMessageFormatV1(), DEV_NULL_LOGGING );
48+
new ChannelPipelineBuilderImpl().build( new PackStreamMessageFormatV1(), channel.pipeline(), DEV_NULL_LOGGING );
4949

5050
Iterator<Map.Entry<String,ChannelHandler>> iterator = channel.pipeline().iterator();
5151
assertThat( iterator.next().getValue(), instanceOf( ChunkDecoder.class ) );

driver/src/test/java/org/neo4j/driver/internal/async/HandshakeResponseHandlerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ private void testFailure( int serverSuggestedVersion, String expectedMessagePref
158158

159159
private static HandshakeResponseHandler newHandler( ChannelPromise handshakeCompletedPromise )
160160
{
161-
return new HandshakeResponseHandler( handshakeCompletedPromise, DEV_NULL_LOGGING );
161+
return new HandshakeResponseHandler( new ChannelPipelineBuilderImpl(), handshakeCompletedPromise,
162+
DEV_NULL_LOGGING );
162163
}
163164
}

driver/src/test/java/org/neo4j/driver/internal/messaging/MessageFormatTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import org.neo4j.driver.internal.InternalNode;
3333
import org.neo4j.driver.internal.InternalPath;
3434
import org.neo4j.driver.internal.InternalRelationship;
35-
import org.neo4j.driver.internal.async.ChannelPipelineBuilder;
35+
import org.neo4j.driver.internal.async.ChannelPipelineBuilderImpl;
3636
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
3737
import org.neo4j.driver.internal.async.outbound.ChunkAwareByteBufOutput;
3838
import org.neo4j.driver.internal.packstream.PackStream;
@@ -151,7 +151,7 @@ private EmbeddedChannel newEmbeddedChannel()
151151
{
152152
EmbeddedChannel channel = new EmbeddedChannel();
153153
setMessageDispatcher( channel, new MemorizingInboundMessageDispatcher( channel, DEV_NULL_LOGGING ) );
154-
ChannelPipelineBuilder.buildPipeline( channel, format, DEV_NULL_LOGGING );
154+
new ChannelPipelineBuilderImpl().build( format, channel.pipeline(), DEV_NULL_LOGGING );
155155
return channel;
156156
}
157157

0 commit comments

Comments
 (0)