Skip to content

Commit e10f2b1

Browse files
committed
Write message boundary & chunk header directly to IO buffer
Each message consists of one or more chunks. Each chunk is prefixed with a fixed-size header which contains two bytes - chunk body size. Messages are separated by a "message boundary" which is a two byte message "00". Code used to keep message boundary and empty chunk header as shared `ByteBuf` constants. These buffers were duplicated (no content copying involved) for every operation. This resulted in creation of multiple lightweight but unnecessary wrapper objects. This commit makes code write both message boundary and empty temporary chunk header directly into IO buffers and removes the need to duplicate shared buffers. So no temporary objects are created.
1 parent cb57721 commit e10f2b1

File tree

6 files changed

+118
-27
lines changed

6 files changed

+118
-27
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/BoltProtocolV1Util.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.netty.buffer.ByteBuf;
2222

2323
import static io.netty.buffer.Unpooled.copyInt;
24-
import static io.netty.buffer.Unpooled.copyShort;
2524
import static io.netty.buffer.Unpooled.unreleasableBuffer;
2625

2726
public final class BoltProtocolV1Util
@@ -41,12 +40,7 @@ public final class BoltProtocolV1Util
4140
PROTOCOL_VERSION_1,
4241
NO_PROTOCOL_VERSION,
4342
NO_PROTOCOL_VERSION,
44-
NO_PROTOCOL_VERSION ) )
45-
.asReadOnly();
46-
47-
private static final ByteBuf MESSAGE_BOUNDARY_BUF = unreleasableBuffer( copyShort( 0 ) ).asReadOnly();
48-
49-
private static final ByteBuf CHUNK_HEADER_PLACEHOLDER_BUF = unreleasableBuffer( copyShort( 0 ) ).asReadOnly();
43+
NO_PROTOCOL_VERSION ) ).asReadOnly();
5044

5145
private BoltProtocolV1Util()
5246
{
@@ -62,13 +56,18 @@ public static String handshakeString()
6256
return "[0x6060B017, 1, 0, 0, 0]";
6357
}
6458

65-
public static ByteBuf messageBoundary()
59+
public static void writeMessageBoundary( ByteBuf buf )
60+
{
61+
buf.writeShort( 0 );
62+
}
63+
64+
public static void writeEmptyChunkHeader( ByteBuf buf )
6665
{
67-
return MESSAGE_BOUNDARY_BUF.duplicate();
66+
buf.writeShort( 0 );
6867
}
6968

70-
public static ByteBuf chunkHeaderPlaceholder()
69+
public static void writeChunkHeader( ByteBuf buf, int chunkStartIndex, int headerValue )
7170
{
72-
return CHUNK_HEADER_PLACEHOLDER_BUF.duplicate();
71+
buf.setShort( chunkStartIndex, headerValue );
7372
}
7473
}

driver/src/main/java/org/neo4j/driver/internal/async/outbound/ChunkAwareByteBufOutput.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020

2121
import io.netty.buffer.ByteBuf;
2222

23+
import org.neo4j.driver.internal.async.BoltProtocolV1Util;
2324
import org.neo4j.driver.internal.packstream.PackOutput;
2425

2526
import static java.util.Objects.requireNonNull;
2627
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.CHUNK_HEADER_SIZE_BYTES;
2728
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.DEFAULT_MAX_OUTBOUND_CHUNK_SIZE_BYTES;
28-
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.chunkHeaderPlaceholder;
2929

3030
public class ChunkAwareByteBufOutput implements PackOutput
3131
{
@@ -138,15 +138,15 @@ private void ensureCanFitInCurrentChunk( int numberOfBytes )
138138
private void startNewChunk( int index )
139139
{
140140
currentChunkStartIndex = index;
141-
buf.writeBytes( chunkHeaderPlaceholder() );
141+
BoltProtocolV1Util.writeEmptyChunkHeader( buf );
142142
currentChunkSize = CHUNK_HEADER_SIZE_BYTES;
143143
}
144144

145145
private void writeChunkSizeHeader()
146146
{
147-
// go to the beginning of the chunk and write 2 byte size header
147+
// go to the beginning of the chunk and write the size header
148148
int chunkBodySize = currentChunkSize - CHUNK_HEADER_SIZE_BYTES;
149-
buf.setShort( currentChunkStartIndex, chunkBodySize );
149+
BoltProtocolV1Util.writeChunkHeader( buf, currentChunkStartIndex, chunkBodySize );
150150
}
151151

152152
private int availableBytesInCurrentChunk()

driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@
2525

2626
import java.util.List;
2727

28+
import org.neo4j.driver.internal.async.BoltProtocolV1Util;
2829
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
2930
import org.neo4j.driver.internal.messaging.Message;
3031
import org.neo4j.driver.internal.messaging.MessageFormat;
3132
import org.neo4j.driver.v1.Logger;
3233
import org.neo4j.driver.v1.Logging;
3334

3435
import static io.netty.buffer.ByteBufUtil.hexDump;
35-
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.messageBoundary;
3636

3737
public class OutboundMessageHandler extends MessageToMessageEncoder<Message>
3838
{
@@ -95,8 +95,8 @@ protected void encode( ChannelHandlerContext ctx, Message msg, List<Object> out
9595
log.trace( "C: %s", hexDump( messageBuf ) );
9696
}
9797

98+
BoltProtocolV1Util.writeMessageBoundary( messageBuf );
9899
out.add( messageBuf );
99-
out.add( messageBoundary() );
100100
}
101101

102102
public OutboundMessageHandler withoutByteArraySupport()
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async;
20+
21+
import io.netty.buffer.ByteBuf;
22+
import io.netty.buffer.Unpooled;
23+
import org.junit.Test;
24+
25+
import static org.junit.Assert.assertEquals;
26+
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.BOLT_MAGIC_PREAMBLE;
27+
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.NO_PROTOCOL_VERSION;
28+
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.PROTOCOL_VERSION_1;
29+
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.handshakeBuf;
30+
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.handshakeString;
31+
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.writeChunkHeader;
32+
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.writeEmptyChunkHeader;
33+
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.writeMessageBoundary;
34+
import static org.neo4j.driver.v1.util.TestUtil.assertByteBufContains;
35+
36+
public class BoltProtocolV1UtilTest
37+
{
38+
@Test
39+
public void shouldReturnHandshakeBuf()
40+
{
41+
assertByteBufContains(
42+
handshakeBuf(),
43+
BOLT_MAGIC_PREAMBLE, PROTOCOL_VERSION_1, NO_PROTOCOL_VERSION, NO_PROTOCOL_VERSION, NO_PROTOCOL_VERSION
44+
);
45+
}
46+
47+
@Test
48+
public void shouldReturnHandshakeString()
49+
{
50+
assertEquals( "[0x6060B017, 1, 0, 0, 0]", handshakeString() );
51+
}
52+
53+
@Test
54+
public void shouldWriteMessageBoundary()
55+
{
56+
ByteBuf buf = Unpooled.buffer();
57+
58+
buf.writeInt( 1 );
59+
buf.writeInt( 2 );
60+
buf.writeInt( 3 );
61+
writeMessageBoundary( buf );
62+
63+
assertByteBufContains( buf, 1, 2, 3, (byte) 0, (byte) 0 );
64+
}
65+
66+
@Test
67+
public void shouldWriteEmptyChunkHeader()
68+
{
69+
ByteBuf buf = Unpooled.buffer();
70+
71+
writeEmptyChunkHeader( buf );
72+
buf.writeInt( 1 );
73+
buf.writeInt( 2 );
74+
buf.writeInt( 3 );
75+
76+
assertByteBufContains( buf, (byte) 0, (byte) 0, 1, 2, 3 );
77+
}
78+
79+
@Test
80+
public void shouldWriteChunkHeader()
81+
{
82+
ByteBuf buf = Unpooled.buffer();
83+
84+
writeEmptyChunkHeader( buf );
85+
buf.writeInt( 1 );
86+
buf.writeInt( 2 );
87+
buf.writeInt( 3 );
88+
writeChunkHeader( buf, 0, 42 );
89+
90+
assertByteBufContains( buf, (short) 42, 1, 2, 3 );
91+
}
92+
}

driver/src/test/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandlerTest.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,11 @@
5050
import static org.mockito.Mockito.doAnswer;
5151
import static org.mockito.Mockito.mock;
5252
import static org.mockito.Mockito.when;
53-
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.messageBoundary;
5453
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
5554
import static org.neo4j.driver.internal.messaging.MessageFormat.Writer;
5655
import static org.neo4j.driver.internal.messaging.PullAllMessage.PULL_ALL;
5756
import static org.neo4j.driver.v1.Values.value;
5857
import static org.neo4j.driver.v1.util.TestUtil.assertByteBufContains;
59-
import static org.neo4j.driver.v1.util.TestUtil.assertByteBufEquals;
6058

6159
public class OutboundMessageHandlerTest
6260
{
@@ -85,13 +83,14 @@ public void shouldOutputByteBufAsWrittenByWriterAndMessageBoundary()
8583
assertTrue( channel.writeOutbound( PULL_ALL ) );
8684
assertTrue( channel.finish() );
8785

88-
assertEquals( 2, channel.outboundMessages().size() );
86+
assertEquals( 1, channel.outboundMessages().size() );
8987

90-
ByteBuf buf1 = channel.readOutbound();
91-
assertByteBufContains( buf1, (short) 5, (byte) 1, (byte) 2, (byte) 3, (byte) 4, (byte) 5 );
92-
93-
ByteBuf buf2 = channel.readOutbound();
94-
assertByteBufEquals( messageBoundary(), buf2 );
88+
ByteBuf buf = channel.readOutbound();
89+
assertByteBufContains(
90+
buf,
91+
(short) 5, (byte) 1, (byte) 2, (byte) 3, (byte) 4, (byte) 5, // message body
92+
(byte) 0, (byte) 0 // message boundary
93+
);
9594
}
9695

9796
@Test

driver/src/test/java/org/neo4j/driver/internal/messaging/MessageFormatTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.neo4j.driver.internal.InternalNode;
3333
import org.neo4j.driver.internal.InternalPath;
3434
import org.neo4j.driver.internal.InternalRelationship;
35+
import org.neo4j.driver.internal.async.BoltProtocolV1Util;
3536
import org.neo4j.driver.internal.async.ChannelPipelineBuilderImpl;
3637
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
3738
import org.neo4j.driver.internal.async.outbound.ChunkAwareByteBufOutput;
@@ -45,7 +46,6 @@
4546
import static org.junit.Assert.assertTrue;
4647
import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher;
4748
import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher;
48-
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.messageBoundary;
4949
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
5050
import static org.neo4j.driver.v1.Values.EmptyMap;
5151
import static org.neo4j.driver.v1.Values.ofValue;
@@ -121,6 +121,7 @@ public void shouldGiveHelpfulErrorOnMalformedNodeStruct() throws Throwable
121121
packer.packStructHeader( 0, PackStreamMessageFormatV1.NODE );
122122

123123
output.stop();
124+
BoltProtocolV1Util.writeMessageBoundary( buf );
124125

125126
// Expect
126127
exception.expect( ClientException.class );
@@ -129,7 +130,7 @@ public void shouldGiveHelpfulErrorOnMalformedNodeStruct() throws Throwable
129130
"received NODE structure has 0 fields." ) );
130131

131132
// When
132-
unpack( Unpooled.wrappedBuffer( buf, messageBoundary() ), newEmbeddedChannel() );
133+
unpack( buf, newEmbeddedChannel() );
133134
}
134135

135136
private void assertSerializesValue( Value value ) throws Throwable

0 commit comments

Comments
 (0)