Skip to content

Commit 385b291

Browse files
committed
Improve logging in channel pipeline
Made all handlers in the channel pipeline prefix their log messages with channel info. Added tests and fixed issue where code would hang if channel gets disconnected during bolt handshake. Improved layout of java packages.
1 parent bf86eaa commit 385b291

File tree

99 files changed

+447
-642
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

99 files changed

+447
-642
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/BoltServerAddress.java renamed to driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
package org.neo4j.driver.internal.async;
19+
package org.neo4j.driver.internal;
2020

2121
import java.net.InetAddress;
2222
import java.net.InetSocketAddress;

driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.util.concurrent.CompletionStage;
2222

23-
import org.neo4j.driver.internal.async.BoltServerAddress;
2423
import org.neo4j.driver.internal.spi.Connection;
2524
import org.neo4j.driver.internal.spi.ConnectionPool;
2625
import org.neo4j.driver.internal.spi.ConnectionProvider;

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.net.URI;
2626
import java.security.GeneralSecurityException;
2727

28-
import org.neo4j.driver.internal.async.BoltServerAddress;
2928
import org.neo4j.driver.internal.async.BootstrapFactory;
3029
import org.neo4j.driver.internal.async.ChannelConnector;
3130
import org.neo4j.driver.internal.async.ChannelConnectorImpl;
@@ -143,11 +142,9 @@ private InternalDriver createDriver( URI uri, BoltServerAddress address,
143142
protected InternalDriver createDirectDriver( BoltServerAddress address, Config config,
144143
SecurityPlan securityPlan, RetryLogic retryLogic, ConnectionPool connectionPool )
145144
{
146-
ConnectionProvider connectionProvider =
147-
new DirectConnectionProvider( address, connectionPool );
148-
SessionFactory sessionFactory =
149-
createSessionFactory( connectionProvider, retryLogic, config );
150-
return createDriver( config, securityPlan, sessionFactory );
145+
ConnectionProvider connectionProvider = new DirectConnectionProvider( address, connectionPool );
146+
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
147+
return createDriver( sessionFactory, securityPlan, config );
151148
}
152149

153150
/**
@@ -166,17 +163,17 @@ protected InternalDriver createRoutingDriver( BoltServerAddress address, Connect
166163
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, eventExecutorGroup,
167164
config, routingSettings );
168165
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
169-
return createDriver( config, securityPlan, sessionFactory );
166+
return createDriver( sessionFactory, securityPlan, config );
170167
}
171168

172169
/**
173170
* Creates new {@link Driver}.
174171
* <p>
175172
* <b>This method is protected only for testing</b>
176173
*/
177-
protected InternalDriver createDriver( Config config, SecurityPlan securityPlan, SessionFactory sessionFactory )
174+
protected InternalDriver createDriver( SessionFactory sessionFactory, SecurityPlan securityPlan, Config config )
178175
{
179-
return new InternalDriver( securityPlan, sessionFactory );
176+
return new InternalDriver( securityPlan, sessionFactory, config.logging() );
180177
}
181178

182179
/**

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.function.BiConsumer;
2626
import java.util.function.BiFunction;
2727

28-
import org.neo4j.driver.internal.async.InternalStatementResultCursor;
2928
import org.neo4j.driver.internal.async.QueryRunner;
3029
import org.neo4j.driver.internal.async.ResultCursorsHolder;
3130
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;

driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.neo4j.driver.internal.security.SecurityPlan;
2525
import org.neo4j.driver.v1.AccessMode;
2626
import org.neo4j.driver.v1.Driver;
27+
import org.neo4j.driver.v1.Logger;
28+
import org.neo4j.driver.v1.Logging;
2729
import org.neo4j.driver.v1.Session;
2830

2931
import static java.util.concurrent.CompletableFuture.completedFuture;
@@ -33,13 +35,15 @@ public class InternalDriver implements Driver
3335
{
3436
private final SecurityPlan securityPlan;
3537
private final SessionFactory sessionFactory;
38+
private final Logger log;
3639

3740
private AtomicBoolean closed = new AtomicBoolean( false );
3841

39-
InternalDriver( SecurityPlan securityPlan, SessionFactory sessionFactory )
42+
InternalDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Logging logging )
4043
{
4144
this.securityPlan = securityPlan;
4245
this.sessionFactory = sessionFactory;
46+
this.log = logging.getLog( Driver.class.getSimpleName() );
4347
}
4448

4549
@Override
@@ -108,6 +112,7 @@ public CompletionStage<Void> closeAsync()
108112
{
109113
if ( closed.compareAndSet( false, true ) )
110114
{
115+
log.info( "Driver instance is closing" );
111116
return sessionFactory.close();
112117
}
113118
return completedFuture( null );

driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java renamed to driver/src/main/java/org/neo4j/driver/internal/InternalStatementResultCursor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
package org.neo4j.driver.internal.async;
19+
package org.neo4j.driver.internal;
2020

2121
import java.util.ArrayList;
2222
import java.util.List;

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.util.concurrent.CompletionStage;
2525
import java.util.concurrent.atomic.AtomicBoolean;
2626

27-
import org.neo4j.driver.internal.async.InternalStatementResultCursor;
2827
import org.neo4j.driver.internal.async.QueryRunner;
2928
import org.neo4j.driver.internal.logging.DelegatingLogger;
3029
import org.neo4j.driver.internal.retry.RetryLogic;
@@ -74,7 +73,7 @@ public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, R
7473
this.connectionProvider = connectionProvider;
7574
this.mode = mode;
7675
this.retryLogic = retryLogic;
77-
this.logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) );
76+
this.logger = new DelegatingLogger( "[" + hashCode() + "]", logging.getLog( LOG_NAME ) );
7877
}
7978

8079
@Override

driver/src/main/java/org/neo4j/driver/internal/RoutingErrorHandler.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21-
import org.neo4j.driver.internal.async.BoltServerAddress;
22-
2321
/**
2422
* Interface used for tracking errors when connected to a cluster.
2523
*/

driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.netty.channel.Channel;
2222
import io.netty.util.AttributeKey;
2323

24+
import org.neo4j.driver.internal.BoltServerAddress;
2425
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
2526
import org.neo4j.driver.internal.util.ServerVersion;
2627

driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectedListener.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import io.netty.channel.ChannelPipeline;
2525
import io.netty.channel.ChannelPromise;
2626

27+
import org.neo4j.driver.internal.BoltServerAddress;
28+
import org.neo4j.driver.v1.Logger;
2729
import org.neo4j.driver.v1.Logging;
2830
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
2931

@@ -36,6 +38,7 @@ public class ChannelConnectedListener implements ChannelFutureListener
3638
private final ChannelPipelineBuilder pipelineBuilder;
3739
private final ChannelPromise handshakeCompletedPromise;
3840
private final Logging logging;
41+
private final Logger log;
3942

4043
public ChannelConnectedListener( BoltServerAddress address, ChannelPipelineBuilder pipelineBuilder,
4144
ChannelPromise handshakeCompletedPromise, Logging logging )
@@ -44,6 +47,7 @@ public ChannelConnectedListener( BoltServerAddress address, ChannelPipelineBuild
4447
this.pipelineBuilder = pipelineBuilder;
4548
this.handshakeCompletedPromise = handshakeCompletedPromise;
4649
this.logging = logging;
50+
this.log = logging.getLog( getClass().getSimpleName() );
4751
}
4852

4953
@Override
@@ -53,8 +57,10 @@ public void operationComplete( ChannelFuture future )
5357

5458
if ( future.isSuccess() )
5559
{
60+
log.trace( "Channel %s connected, running bolt handshake", channel );
61+
5662
ChannelPipeline pipeline = channel.pipeline();
57-
pipeline.addLast( new HandshakeResponseHandler( pipelineBuilder, handshakeCompletedPromise, logging ) );
63+
pipeline.addLast( new HandshakeHandler( pipelineBuilder, handshakeCompletedPromise, logging ) );
5864
ChannelFuture handshakeFuture = channel.writeAndFlush( handshake() );
5965

6066
handshakeFuture.addListener( channelFuture ->

driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnector.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import io.netty.bootstrap.Bootstrap;
2222
import io.netty.channel.ChannelFuture;
2323

24+
import org.neo4j.driver.internal.BoltServerAddress;
25+
2426
public interface ChannelConnector
2527
{
2628
ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap );

driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import java.util.Map;
2828

29+
import org.neo4j.driver.internal.BoltServerAddress;
2930
import org.neo4j.driver.internal.ConnectionSettings;
3031
import org.neo4j.driver.internal.security.InternalAuthToken;
3132
import org.neo4j.driver.internal.security.SecurityPlan;

driver/src/main/java/org/neo4j/driver/internal/async/ChannelPipelineBuilderImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import io.netty.channel.ChannelPipeline;
2222

23+
import org.neo4j.driver.internal.async.inbound.ChannelErrorHandler;
2324
import org.neo4j.driver.internal.async.inbound.ChunkDecoder;
2425
import org.neo4j.driver.internal.async.inbound.InboundMessageHandler;
2526
import org.neo4j.driver.internal.async.inbound.MessageDecoder;
@@ -34,7 +35,7 @@ public void build( MessageFormat messageFormat, ChannelPipeline pipeline, Loggin
3435
{
3536
// inbound handlers
3637
pipeline.addLast( new ChunkDecoder() );
37-
pipeline.addLast( new MessageDecoder() );
38+
pipeline.addLast( new MessageDecoder( logging ) );
3839
pipeline.addLast( new InboundMessageHandler( messageFormat, logging ) );
3940

4041
// outbound handlers

driver/src/main/java/org/neo4j/driver/internal/async/HandshakeResponseHandler.java renamed to driver/src/main/java/org/neo4j/driver/internal/async/HandshakeHandler.java

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,53 +28,92 @@
2828
import java.util.List;
2929
import javax.net.ssl.SSLHandshakeException;
3030

31+
import org.neo4j.driver.internal.logging.DelegatingLogger;
3132
import org.neo4j.driver.internal.messaging.MessageFormat;
3233
import org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1;
34+
import org.neo4j.driver.internal.util.ErrorUtil;
3335
import org.neo4j.driver.v1.Logger;
3436
import org.neo4j.driver.v1.Logging;
3537
import org.neo4j.driver.v1.exceptions.ClientException;
3638
import org.neo4j.driver.v1.exceptions.SecurityException;
39+
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
3740

3841
import static org.neo4j.driver.internal.async.ProtocolUtil.HTTP;
3942
import static org.neo4j.driver.internal.async.ProtocolUtil.NO_PROTOCOL_VERSION;
4043
import static org.neo4j.driver.internal.async.ProtocolUtil.PROTOCOL_VERSION_1;
4144

42-
public class HandshakeResponseHandler extends ReplayingDecoder<Void>
45+
public class HandshakeHandler extends ReplayingDecoder<Void>
4346
{
4447
private final ChannelPipelineBuilder pipelineBuilder;
4548
private final ChannelPromise handshakeCompletedPromise;
4649
private final Logging logging;
47-
private final Logger log;
4850

49-
public HandshakeResponseHandler( ChannelPipelineBuilder pipelineBuilder, ChannelPromise handshakeCompletedPromise,
51+
private boolean failed;
52+
private Logger log;
53+
54+
public HandshakeHandler( ChannelPipelineBuilder pipelineBuilder, ChannelPromise handshakeCompletedPromise,
5055
Logging logging )
5156
{
5257
this.pipelineBuilder = pipelineBuilder;
5358
this.handshakeCompletedPromise = handshakeCompletedPromise;
5459
this.logging = logging;
55-
this.log = logging.getLog( getClass().getSimpleName() );
60+
}
61+
62+
@Override
63+
public void handlerAdded( ChannelHandlerContext ctx )
64+
{
65+
log = new DelegatingLogger( ctx.channel().toString(), logging, getClass() );
66+
}
67+
68+
@Override
69+
protected void handlerRemoved0( ChannelHandlerContext ctx )
70+
{
71+
failed = false;
72+
log = null;
73+
}
74+
75+
@Override
76+
public void channelInactive( ChannelHandlerContext ctx )
77+
{
78+
log.debug( "Channel is inactive" );
79+
80+
if ( !failed )
81+
{
82+
// channel became inactive while doing bolt handshake, not because of some previous error
83+
ServiceUnavailableException error = ErrorUtil.newConnectionTerminatedError();
84+
fail( ctx, error );
85+
}
5686
}
5787

5888
@Override
5989
public void exceptionCaught( ChannelHandlerContext ctx, Throwable error )
6090
{
61-
// todo: test this unwrapping and SSLHandshakeException propagation
62-
Throwable cause = error instanceof DecoderException ? error.getCause() : error;
63-
if ( cause instanceof SSLHandshakeException )
91+
if ( failed )
6492
{
65-
fail( ctx, new SecurityException( "Failed to establish secured connection with the server", cause ) );
93+
log.warn( "Another fatal error occurred in the pipeline", error );
6694
}
6795
else
6896
{
69-
fail( ctx, cause );
97+
failed = true;
98+
99+
// todo: test this unwrapping and SSLHandshakeException propagation
100+
Throwable cause = error instanceof DecoderException ? error.getCause() : error;
101+
if ( cause instanceof SSLHandshakeException )
102+
{
103+
fail( ctx, new SecurityException( "Failed to establish secured connection with the server", cause ) );
104+
}
105+
else
106+
{
107+
fail( ctx, cause );
108+
}
70109
}
71110
}
72111

73112
@Override
74113
protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
75114
{
76115
int serverSuggestedVersion = in.readInt();
77-
log.debug( "Server suggested protocol version: %s", serverSuggestedVersion );
116+
log.debug( "Server suggested protocol version %s during handshake", serverSuggestedVersion );
78117

79118
ChannelPipeline pipeline = ctx.pipeline();
80119
// this is a one-time handler, remove it when protocol version has been read

0 commit comments

Comments
 (0)