Skip to content

Commit 200857b

Browse files
committed
More logging improvements in channel pipeline
* `C:` and `S:` message prefixes as in other driver verisons * removed logging of chunks without headers * added trace logging of full chunks and message boundary * log raw routing procedure response on debug level
1 parent 18740a8 commit 200857b

File tree

9 files changed

+142
-67
lines changed

9 files changed

+142
-67
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/ChannelPipelineBuilderImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ public class ChannelPipelineBuilderImpl implements ChannelPipelineBuilder
3434
public void build( MessageFormat messageFormat, ChannelPipeline pipeline, Logging logging )
3535
{
3636
// inbound handlers
37-
pipeline.addLast( new ChunkDecoder() );
38-
pipeline.addLast( new MessageDecoder( logging ) );
37+
pipeline.addLast( new ChunkDecoder( logging ) );
38+
pipeline.addLast( new MessageDecoder() );
3939
pipeline.addLast( new InboundMessageHandler( messageFormat, logging ) );
4040

4141
// outbound handlers

driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChunkDecoder.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,16 @@
1818
*/
1919
package org.neo4j.driver.internal.async.inbound;
2020

21+
import io.netty.buffer.ByteBuf;
22+
import io.netty.channel.ChannelHandlerContext;
2123
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
2224

25+
import org.neo4j.driver.internal.logging.DelegatingLogger;
26+
import org.neo4j.driver.v1.Logger;
27+
import org.neo4j.driver.v1.Logging;
28+
29+
import static io.netty.buffer.ByteBufUtil.prettyHexDump;
30+
2331
public class ChunkDecoder extends LengthFieldBasedFrameDecoder
2432
{
2533
private static final int MAX_FRAME_LENGTH = Short.MAX_VALUE;
@@ -28,8 +36,38 @@ public class ChunkDecoder extends LengthFieldBasedFrameDecoder
2836
private static final int LENGTH_ADJUSTMENT = 0;
2937
private static final int INITIAL_BYTES_TO_STRIP = LENGTH_FIELD_LENGTH;
3038

31-
public ChunkDecoder()
39+
private final Logging logging;
40+
private Logger log;
41+
42+
public ChunkDecoder( Logging logging )
3243
{
3344
super( MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP );
45+
this.logging = logging;
46+
}
47+
48+
@Override
49+
public void handlerAdded( ChannelHandlerContext ctx )
50+
{
51+
log = new DelegatingLogger( ctx.channel().toString(), logging, getClass() );
52+
}
53+
54+
@Override
55+
protected void handlerRemoved0( ChannelHandlerContext ctx )
56+
{
57+
log = null;
58+
}
59+
60+
@Override
61+
protected ByteBuf extractFrame( ChannelHandlerContext ctx, ByteBuf buffer, int index, int length )
62+
{
63+
if ( log.isTraceEnabled() )
64+
{
65+
int originalReaderIndex = buffer.readerIndex();
66+
int readerIndexWithChunkHeader = originalReaderIndex - INITIAL_BYTES_TO_STRIP;
67+
int lengthWithChunkHeader = INITIAL_BYTES_TO_STRIP + buffer.readableBytes();
68+
String hexDump = prettyHexDump( buffer, readerIndexWithChunkHeader, lengthWithChunkHeader );
69+
log.trace( "S:\n%s", hexDump );
70+
}
71+
return super.extractFrame( ctx, buffer, index, length );
3472
}
3573
}

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -113,27 +113,26 @@ public void handleAckFailureMessage()
113113
@Override
114114
public void handleSuccessMessage( Map<String,Value> meta )
115115
{
116+
log.debug( "S: SUCCESS %s", meta );
116117
ResponseHandler handler = handlers.remove();
117-
log.debug( "Received SUCCESS message with metadata %s for handler %s", meta, handler );
118118
handler.onSuccess( meta );
119119
}
120120

121121
@Override
122122
public void handleRecordMessage( Value[] fields )
123123
{
124-
ResponseHandler handler = handlers.peek();
125124
if ( log.isDebugEnabled() )
126125
{
127-
log.debug( "Received RECORD message with metadata %s for handler %s", Arrays.toString( fields ), handler );
126+
log.debug( "S: RECORD %s", Arrays.toString( fields ) );
128127
}
128+
ResponseHandler handler = handlers.peek();
129129
handler.onRecord( fields );
130130
}
131131

132132
@Override
133133
public void handleFailureMessage( String code, String message )
134134
{
135-
ResponseHandler handler = handlers.remove();
136-
log.debug( "Received FAILURE message with code '%s' and message '%s' for handler %s", code, message, handler );
135+
log.debug( "S: FAILURE %s \"%s\"", code, message );
137136

138137
currentError = ErrorUtil.newNeo4jError( code, message );
139138

@@ -148,15 +147,16 @@ public void handleFailureMessage( String code, String message )
148147
// try to write ACK_FAILURE before notifying the next response handler
149148
ackFailureIfNeeded();
150149

150+
ResponseHandler handler = handlers.remove();
151151
handler.onFailure( currentError );
152152
}
153153

154154
@Override
155155
public void handleIgnoredMessage()
156156
{
157-
ResponseHandler handler = handlers.remove();
158-
log.debug( "Received IGNORED message for handler %s", handler );
157+
log.debug( "S: IGNORED" );
159158

159+
ResponseHandler handler = handlers.remove();
160160
if ( currentError != null )
161161
{
162162
handler.onFailure( currentError );

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,14 @@ protected void channelRead0( ChannelHandlerContext ctx, ByteBuf msg )
6767
{
6868
if ( messageDispatcher.fatalErrorOccurred() )
6969
{
70-
log.warn( "Message ignored because of the previous fatal error. Channel will be closed. Message:\n%s\n",
70+
log.warn( "Message ignored because of the previous fatal error. Channel will be closed. Message:\n%s",
7171
prettyHexDump( msg ) );
7272
return;
7373
}
7474

7575
if ( log.isTraceEnabled() )
7676
{
77-
log.trace( "Message received:\n%s\n", prettyHexDump( msg ) );
77+
log.trace( "S:\n%s", prettyHexDump( msg ) );
7878
}
7979

8080
input.start( msg );

driver/src/main/java/org/neo4j/driver/internal/async/inbound/MessageDecoder.java

Lines changed: 1 addition & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -24,59 +24,18 @@
2424

2525
import java.util.List;
2626

27-
import org.neo4j.driver.internal.logging.DelegatingLogger;
28-
import org.neo4j.driver.v1.Logger;
29-
import org.neo4j.driver.v1.Logging;
30-
31-
import static io.netty.buffer.ByteBufUtil.prettyHexDump;
32-
3327
public class MessageDecoder extends ByteToMessageDecoder
3428
{
35-
private final Logging logging;
36-
3729
private boolean readMessageBoundary;
38-
private Logger log;
39-
40-
public MessageDecoder( Logging logging )
41-
{
42-
this.logging = logging;
43-
}
44-
45-
@Override
46-
public void handlerAdded( ChannelHandlerContext ctx )
47-
{
48-
log = new DelegatingLogger( ctx.channel().toString(), logging, getClass() );
49-
}
50-
51-
@Override
52-
protected void handlerRemoved0( ChannelHandlerContext ctx ) throws Exception
53-
{
54-
readMessageBoundary = false;
55-
log = null;
56-
}
5730

5831
@Override
5932
public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exception
6033
{
6134
if ( msg instanceof ByteBuf )
6235
{
63-
ByteBuf chunkBuf = (ByteBuf) msg;
64-
6536
// on every read check if input buffer is empty or not
6637
// if it is empty then it's a message boundary and full message is in the buffer
67-
readMessageBoundary = chunkBuf.readableBytes() == 0;
68-
69-
if ( log.isTraceEnabled() )
70-
{
71-
if ( readMessageBoundary )
72-
{
73-
log.trace( "Received message boundary" );
74-
}
75-
else
76-
{
77-
log.trace( "Received message chunk:\n%s\n", prettyHexDump( chunkBuf ) );
78-
}
79-
}
38+
readMessageBoundary = ((ByteBuf) msg).readableBytes() == 0;
8039
}
8140
super.channelRead( ctx, msg );
8241
}

driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public void handlerRemoved( ChannelHandlerContext ctx )
7373
@Override
7474
protected void encode( ChannelHandlerContext ctx, Message msg, List<Object> out )
7575
{
76-
log.debug( "Sending message %s", msg );
76+
log.debug( "C: %s", msg );
7777

7878
ByteBuf messageBuf = ctx.alloc().ioBuffer();
7979
output.start( messageBuf );
@@ -92,7 +92,7 @@ protected void encode( ChannelHandlerContext ctx, Message msg, List<Object> out
9292

9393
if ( log.isTraceEnabled() )
9494
{
95-
log.trace( "Message %s encoded as\n%s\n", msg, prettyHexDump( messageBuf ) );
95+
log.trace( "C: %s encoded as\n%s", msg, prettyHexDump( messageBuf ) );
9696
}
9797

9898
out.add( messageBuf );

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProvider.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,11 @@ private ClusterCompositionResponse processRoutingResponse( RoutingProcedureRespo
7373

7474
List<Record> records = response.records();
7575

76-
log.info( "Got getServers response: %s", records );
76+
if ( log.isDebugEnabled() )
77+
{
78+
log.debug( "Received response from %s procedure: %s", invokedProcedureString( response ), records );
79+
}
80+
7781
long now = clock.millis();
7882

7983
// the record size is wrong

driver/src/test/java/org/neo4j/driver/internal/async/inbound/ChunkDecoderTest.java

Lines changed: 83 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,30 +22,46 @@
2222
import io.netty.channel.embedded.EmbeddedChannel;
2323
import org.junit.After;
2424
import org.junit.Test;
25+
import org.mockito.ArgumentCaptor;
2526

27+
import org.neo4j.driver.v1.Logger;
28+
import org.neo4j.driver.v1.Logging;
29+
30+
import static io.netty.buffer.ByteBufUtil.prettyHexDump;
2631
import static io.netty.buffer.Unpooled.buffer;
2732
import static io.netty.buffer.Unpooled.copyShort;
2833
import static io.netty.buffer.Unpooled.wrappedBuffer;
2934
import static org.junit.Assert.assertEquals;
3035
import static org.junit.Assert.assertFalse;
3136
import static org.junit.Assert.assertTrue;
37+
import static org.mockito.Matchers.anyString;
38+
import static org.mockito.Mockito.mock;
39+
import static org.mockito.Mockito.verify;
40+
import static org.mockito.Mockito.when;
41+
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
3242
import static org.neo4j.driver.v1.util.TestUtil.assertByteBufEquals;
3343

3444
public class ChunkDecoderTest
3545
{
36-
private final EmbeddedChannel channel = new EmbeddedChannel( new ChunkDecoder() );
46+
private ByteBuf buffer;
47+
private EmbeddedChannel channel = new EmbeddedChannel( newChunkDecoder() );
3748

3849
@After
3950
public void tearDown()
4051
{
41-
channel.finishAndReleaseAll();
52+
if ( buffer != null )
53+
{
54+
buffer.release( buffer.refCnt() );
55+
}
56+
if ( channel != null )
57+
{
58+
channel.finishAndReleaseAll();
59+
}
4260
}
4361

4462
@Test
4563
public void shouldDecodeFullChunk()
4664
{
47-
EmbeddedChannel channel = new EmbeddedChannel( new ChunkDecoder() );
48-
4965
// whole chunk with header and body arrives at once
5066
ByteBuf input = buffer();
5167
input.writeShort( 7 );
@@ -70,8 +86,6 @@ public void shouldDecodeFullChunk()
7086
@Test
7187
public void shouldDecodeSplitChunk()
7288
{
73-
EmbeddedChannel channel = new EmbeddedChannel( new ChunkDecoder() );
74-
7589
// first part of the chunk contains size header and some bytes
7690
ByteBuf input1 = buffer();
7791
input1.writeShort( 9 );
@@ -113,8 +127,6 @@ public void shouldDecodeSplitChunk()
113127
@Test
114128
public void shouldDecodeEmptyChunk()
115129
{
116-
EmbeddedChannel channel = new EmbeddedChannel( new ChunkDecoder() );
117-
118130
// chunk contains just the size header which is zero
119131
ByteBuf input = copyShort( 0 );
120132
assertTrue( channel.writeInbound( input ) );
@@ -125,4 +137,67 @@ public void shouldDecodeEmptyChunk()
125137
// it should have no size header and empty body
126138
assertByteBufEquals( wrappedBuffer( new byte[0] ), channel.readInbound() );
127139
}
140+
141+
@Test
142+
public void shouldLogEmptyChunkOnTraceLevel()
143+
{
144+
Logger logger = newTraceLogger();
145+
channel = new EmbeddedChannel( new ChunkDecoder( newLogging( logger ) ) );
146+
147+
buffer = copyShort( 0 );
148+
assertTrue( channel.writeInbound( buffer.copy() ) ); // copy buffer so we can verify against it later
149+
assertTrue( channel.finish() );
150+
151+
ArgumentCaptor<String> messageCaptor = ArgumentCaptor.forClass( String.class );
152+
verify( logger ).trace( anyString(), messageCaptor.capture() );
153+
154+
// pretty hex dump should be logged
155+
assertEquals( prettyHexDump( buffer ), messageCaptor.getValue() );
156+
// single empty chunk should be available for reading
157+
assertEquals( 1, channel.inboundMessages().size() );
158+
assertByteBufEquals( wrappedBuffer( new byte[0] ), channel.readInbound() );
159+
}
160+
161+
@Test
162+
public void shouldLogNonEmptyChunkOnTraceLevel()
163+
{
164+
Logger logger = newTraceLogger();
165+
channel = new EmbeddedChannel( new ChunkDecoder( newLogging( logger ) ) );
166+
167+
byte[] bytes = "Hello".getBytes();
168+
buffer = buffer();
169+
buffer.writeShort( bytes.length );
170+
buffer.writeBytes( bytes );
171+
172+
assertTrue( channel.writeInbound( buffer.copy() ) ); // copy buffer so we can verify against it later
173+
assertTrue( channel.finish() );
174+
175+
ArgumentCaptor<String> messageCaptor = ArgumentCaptor.forClass( String.class );
176+
verify( logger ).trace( anyString(), messageCaptor.capture() );
177+
178+
// pretty hex dump should be logged
179+
assertEquals( prettyHexDump( buffer ), messageCaptor.getValue() );
180+
// single chunk should be available for reading
181+
assertEquals( 1, channel.inboundMessages().size() );
182+
assertByteBufEquals( wrappedBuffer( bytes ), channel.readInbound() );
183+
}
184+
185+
private static ChunkDecoder newChunkDecoder()
186+
{
187+
return new ChunkDecoder( DEV_NULL_LOGGING );
188+
}
189+
190+
private static Logger newTraceLogger()
191+
{
192+
Logger logger = mock( Logger.class );
193+
when( logger.isTraceEnabled() ).thenReturn( true );
194+
return logger;
195+
}
196+
197+
private static Logging newLogging( Logger logger )
198+
{
199+
Logging logging = mock( Logging.class );
200+
when( logging.getLog( anyString() ) ).thenReturn( logger );
201+
return logging;
202+
}
128203
}

driver/src/test/java/org/neo4j/driver/internal/async/inbound/MessageDecoderTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,11 @@
2626
import static org.junit.Assert.assertEquals;
2727
import static org.junit.Assert.assertFalse;
2828
import static org.junit.Assert.assertTrue;
29-
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
3029
import static org.neo4j.driver.v1.util.TestUtil.assertByteBufEquals;
3130

3231
public class MessageDecoderTest
3332
{
34-
private final EmbeddedChannel channel = new EmbeddedChannel( new MessageDecoder( DEV_NULL_LOGGING ) );
33+
private final EmbeddedChannel channel = new EmbeddedChannel( new MessageDecoder() );
3534

3635
@After
3736
public void tearDown()

0 commit comments

Comments
 (0)