Skip to content

Commit d3f5ab3

Browse files
committed
Add support for Bolt Protocol Handshake Manifest v1
1 parent b70dc41 commit d3f5ab3

File tree

17 files changed

+515
-77
lines changed

17 files changed

+515
-77
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [https://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection;
18+
19+
import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion;
20+
21+
record BoltProtocolMinorVersionRange(int majorVersion, int minorVersion, int minorVersionNum) {
22+
public boolean contains(BoltProtocolVersion version) {
23+
if (majorVersion != version.getMajorVersion()) {
24+
return false;
25+
}
26+
27+
return version.getMinorVersion() <= minorVersion && version.getMinorVersion() >= minorVersion - minorVersionNum;
28+
}
29+
}

bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/async/connection/BoltProtocolUtil.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,25 @@
2121
import static java.lang.Integer.toHexString;
2222

2323
import io.netty.buffer.ByteBuf;
24+
import java.util.Collections;
25+
import java.util.Comparator;
26+
import java.util.Map;
27+
import java.util.SortedMap;
28+
import java.util.TreeMap;
2429
import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion;
30+
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.BoltProtocol;
2531
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v3.BoltProtocolV3;
26-
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v41.BoltProtocolV41;
2732
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v42.BoltProtocolV42;
33+
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v43.BoltProtocolV43;
2834
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v44.BoltProtocolV44;
2935
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v5.BoltProtocolV5;
36+
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v51.BoltProtocolV51;
37+
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v52.BoltProtocolV52;
38+
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v53.BoltProtocolV53;
39+
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v54.BoltProtocolV54;
40+
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v55.BoltProtocolV55;
41+
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v56.BoltProtocolV56;
42+
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v57.BoltProtocolV57;
3043
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.v58.BoltProtocolV58;
3144

3245
public final class BoltProtocolUtil {
@@ -37,16 +50,37 @@ public final class BoltProtocolUtil {
3750

3851
public static final int DEFAULT_MAX_OUTBOUND_CHUNK_SIZE_BYTES = Short.MAX_VALUE / 2;
3952

53+
public static final SortedMap<BoltProtocolVersion, BoltProtocol> versionToProtocol;
54+
4055
private static final ByteBuf HANDSHAKE_BUF = unreleasableBuffer(copyInt(
4156
BOLT_MAGIC_PREAMBLE,
57+
0x000001FF,
4258
BoltProtocolV58.VERSION.toIntRange(BoltProtocolV5.VERSION),
4359
BoltProtocolV44.VERSION.toIntRange(BoltProtocolV42.VERSION),
44-
BoltProtocolV41.VERSION.toInt(),
4560
BoltProtocolV3.VERSION.toInt()))
4661
.asReadOnly();
4762

4863
private static final String HANDSHAKE_STRING = createHandshakeString();
4964

65+
static {
66+
var map = new TreeMap<BoltProtocolVersion, BoltProtocol>(Comparator.reverseOrder());
67+
map.putAll(Map.ofEntries(
68+
Map.entry(BoltProtocolV58.VERSION, BoltProtocolV58.INSTANCE),
69+
Map.entry(BoltProtocolV57.VERSION, BoltProtocolV57.INSTANCE),
70+
Map.entry(BoltProtocolV56.VERSION, BoltProtocolV56.INSTANCE),
71+
Map.entry(BoltProtocolV55.VERSION, BoltProtocolV55.INSTANCE),
72+
Map.entry(BoltProtocolV54.VERSION, BoltProtocolV54.INSTANCE),
73+
Map.entry(BoltProtocolV53.VERSION, BoltProtocolV53.INSTANCE),
74+
Map.entry(BoltProtocolV52.VERSION, BoltProtocolV52.INSTANCE),
75+
Map.entry(BoltProtocolV51.VERSION, BoltProtocolV51.INSTANCE),
76+
Map.entry(BoltProtocolV5.VERSION, BoltProtocolV5.INSTANCE),
77+
Map.entry(BoltProtocolV44.VERSION, BoltProtocolV44.INSTANCE),
78+
Map.entry(BoltProtocolV43.VERSION, BoltProtocolV43.INSTANCE),
79+
Map.entry(BoltProtocolV42.VERSION, BoltProtocolV42.INSTANCE),
80+
Map.entry(BoltProtocolV3.VERSION, BoltProtocolV3.INSTANCE)));
81+
versionToProtocol = Collections.unmodifiableSortedMap(map);
82+
}
83+
5084
private BoltProtocolUtil() {}
5185

5286
public static ByteBuf handshakeBuf() {

bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/async/connection/HandshakeHandler.java

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class HandshakeHandler extends ReplayingDecoder<Void> {
4646
private boolean failed;
4747
private ChannelActivityLogger log;
4848
private ChannelErrorLogger errorLog;
49+
private ManifestHandler manifestHandler;
4950

5051
public HandshakeHandler(
5152
ChannelPipelineBuilder pipelineBuilder,
@@ -100,18 +101,47 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable error) {
100101

101102
@Override
102103
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
103-
var serverSuggestedVersion = BoltProtocolVersion.fromRawBytes(in.readInt());
104-
log.log(System.Logger.Level.DEBUG, "S: [Bolt Handshake] %s", serverSuggestedVersion);
105-
106-
// this is a one-time handler, remove it when protocol version has been read
107-
ctx.pipeline().remove(this);
108-
109-
var protocol = protocolForVersion(serverSuggestedVersion);
110-
if (protocol != null) {
111-
protocolSelected(serverSuggestedVersion, protocol.createMessageFormat(), ctx);
104+
if (manifestHandler != null) {
105+
try {
106+
manifestHandler.decode(in);
107+
} catch (Throwable e) {
108+
fail(ctx, e);
109+
}
112110
} else {
113-
handleUnknownSuggestedProtocolVersion(serverSuggestedVersion, ctx);
111+
var serverSuggestedVersion = BoltProtocolVersion.fromRawBytes(in.readInt());
112+
113+
if (new BoltProtocolVersion(255, 1).equals(serverSuggestedVersion)) {
114+
log.log(System.Logger.Level.DEBUG, "S: [Bolt Handshake Manifest] v1", serverSuggestedVersion);
115+
manifestHandler = new ManifestHandlerV1(ctx.channel(), logging);
116+
} else {
117+
log.log(System.Logger.Level.DEBUG, "S: [Bolt Handshake] %s", serverSuggestedVersion);
118+
119+
// this is a one-time handler, remove it when protocol version has been read
120+
ctx.pipeline().remove(this);
121+
122+
var protocol = protocolForVersion(serverSuggestedVersion);
123+
if (protocol != null) {
124+
protocolSelected(serverSuggestedVersion, protocol.createMessageFormat(), ctx);
125+
} else {
126+
handleUnknownSuggestedProtocolVersion(serverSuggestedVersion, ctx);
127+
}
128+
}
129+
}
130+
}
131+
132+
@Override
133+
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
134+
if (manifestHandler != null) {
135+
// this is a one-time handler, remove it when protocol version has been read
136+
ctx.pipeline().remove(this);
137+
try {
138+
var protocol = manifestHandler.complete();
139+
protocolSelected(protocol.version(), protocol.createMessageFormat(), ctx);
140+
} catch (Throwable e) {
141+
fail(ctx, e);
142+
}
114143
}
144+
super.channelReadComplete(ctx);
115145
}
116146

117147
private BoltProtocol protocolForVersion(BoltProtocolVersion version) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [https://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.BoltProtocol;
21+
22+
interface ManifestHandler {
23+
void decode(ByteBuf in);
24+
25+
BoltProtocol complete();
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [https://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection;
18+
19+
import static io.netty.buffer.Unpooled.unreleasableBuffer;
20+
import static java.lang.Integer.toHexString;
21+
22+
import io.netty.buffer.ByteBuf;
23+
import io.netty.buffer.Unpooled;
24+
import io.netty.channel.Channel;
25+
import java.util.HashSet;
26+
import java.util.Objects;
27+
import java.util.Set;
28+
import org.neo4j.driver.internal.bolt.api.LoggingProvider;
29+
import org.neo4j.driver.internal.bolt.api.exception.BoltClientException;
30+
import org.neo4j.driver.internal.bolt.basicimpl.impl.logging.ChannelActivityLogger;
31+
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.BoltProtocol;
32+
33+
final class ManifestHandlerV1 implements ManifestHandler {
34+
private final ChannelActivityLogger log;
35+
private final Channel channel;
36+
private final VarLongBuilder expectedVersionRangesBuilder = new VarLongBuilder();
37+
38+
private long expectedVersionRanges = -1L;
39+
private Set<BoltProtocolMinorVersionRange> serverSupportedVersionRanges;
40+
41+
public ManifestHandlerV1(Channel channel, LoggingProvider logging) {
42+
this.channel = Objects.requireNonNull(channel);
43+
log = new ChannelActivityLogger(channel, logging, getClass());
44+
}
45+
46+
@Override
47+
public void decode(ByteBuf byteBuf) {
48+
if (expectedVersionRanges < 0) {
49+
decodeExpectedVersionsSegment(byteBuf);
50+
} else if (expectedVersionRanges > 0) {
51+
decodeServerSupportedBoltVersionRange(byteBuf);
52+
} else {
53+
byteBuf.readByte();
54+
}
55+
}
56+
57+
@Override
58+
public BoltProtocol complete() {
59+
return findSupportedBoltProtocol();
60+
}
61+
62+
private void decodeExpectedVersionsSegment(ByteBuf byteBuf) {
63+
var segment = byteBuf.readByte();
64+
var value = (byte) (0b01111111 & segment);
65+
66+
try {
67+
expectedVersionRangesBuilder.add(value);
68+
} catch (IllegalStateException e) {
69+
throw new BoltClientException(
70+
"The driver does not support the number of Bolt Protocol version ranges that the server wants to send",
71+
e);
72+
}
73+
74+
var finished = (segment >> 7) == 0;
75+
if (finished) {
76+
expectedVersionRanges = expectedVersionRangesBuilder.build();
77+
var size = (int) expectedVersionRanges;
78+
if (expectedVersionRanges != size) {
79+
throw new BoltClientException(
80+
"The driver does not support the number of Bolt Protocol version ranges that the server wants to send");
81+
} else {
82+
log.log(
83+
System.Logger.Level.DEBUG,
84+
"S: [Bolt Handshake Manifest] [expected version ranges %d]",
85+
expectedVersionRanges);
86+
serverSupportedVersionRanges = new HashSet<>(size);
87+
}
88+
}
89+
}
90+
91+
private void decodeServerSupportedBoltVersionRange(ByteBuf byteBuf) {
92+
var value = byteBuf.readInt();
93+
var major = value & 0x000000FF;
94+
var minor = (value >> 8) & 0x000000FF;
95+
var minorNum = (value >> 16) & 0x000000FF;
96+
serverSupportedVersionRanges.add(new BoltProtocolMinorVersionRange(major, minor, minorNum));
97+
expectedVersionRanges--;
98+
99+
if (expectedVersionRanges == 0) {
100+
log.log(
101+
System.Logger.Level.DEBUG,
102+
"S: [Bolt Handshake Manifest] [server supported version ranges %s]",
103+
serverSupportedVersionRanges);
104+
}
105+
}
106+
107+
private BoltProtocol findSupportedBoltProtocol() {
108+
for (var entry : BoltProtocolUtil.versionToProtocol.entrySet()) {
109+
var version = entry.getKey();
110+
for (var range : serverSupportedVersionRanges) {
111+
if (range.contains(version)) {
112+
var protocol = entry.getValue();
113+
write(protocol.version().toInt());
114+
write((byte) 0);
115+
return protocol;
116+
}
117+
}
118+
}
119+
write(0);
120+
write((byte) 0);
121+
channel.flush();
122+
throw new BoltClientException("No supported Bolt Protocol version was found");
123+
}
124+
125+
private void write(int value) {
126+
log.log(
127+
System.Logger.Level.DEBUG,
128+
"C: [Bolt Handshake Manifest] %s",
129+
String.format("[%s]", toHexString(value)));
130+
channel.write(Unpooled.copyInt(value).asReadOnly());
131+
}
132+
133+
@SuppressWarnings("SameParameterValue")
134+
private void write(byte value) {
135+
log.log(
136+
System.Logger.Level.DEBUG,
137+
"C: [Bolt Handshake Manifest] %s",
138+
String.format("[%s]", toHexString(value)));
139+
channel.write(
140+
unreleasableBuffer(Unpooled.copiedBuffer(new byte[] {value})).asReadOnly());
141+
}
142+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [https://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.driver.internal.bolt.basicimpl.impl.async.connection;
18+
19+
final class VarLongBuilder {
20+
private long value;
21+
private byte position;
22+
23+
public void add(long segment) {
24+
if (position > 8) {
25+
throw new IllegalStateException("Segment overflow");
26+
}
27+
segment = segment << (position * 7);
28+
value |= segment;
29+
position++;
30+
}
31+
32+
public long build() {
33+
return value;
34+
}
35+
}

0 commit comments

Comments
 (0)