Skip to content

Commit d56f7dd

Browse files
committed
extract bolt handshake to it own package
1 parent b1d0349 commit d56f7dd

File tree

3 files changed

+153
-42
lines changed

3 files changed

+153
-42
lines changed

src/internal/bolt/handshake.js

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/**
2+
* Copyright (c) 2002-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
import {
20+
BOLT_PROTOCOL_V3,
21+
BOLT_PROTOCOL_V4_0,
22+
BOLT_PROTOCOL_V4_1,
23+
BOLT_PROTOCOL_V4_2,
24+
BOLT_PROTOCOL_V4_3
25+
} from '../constants'
26+
27+
import { alloc } from '../node'
28+
import { newError } from '../../error'
29+
import { memoize } from '../memoize'
30+
31+
const BOLT_MAGIC_PREAMBLE = 0x6060b017
32+
33+
function versionify (version) {
34+
const major = Math.floor(version)
35+
const minor = version * 10 - major * 10
36+
return {
37+
minor,
38+
major
39+
}
40+
}
41+
42+
function createHandshakeMessage (versions) {
43+
if (versions.length > 4) {
44+
throw newError('It should not have more than 4 versions of the protocol')
45+
}
46+
const handshakeBuffer = alloc(5 * 4)
47+
48+
handshakeBuffer.writeInt32(BOLT_MAGIC_PREAMBLE)
49+
50+
versions.forEach(version => {
51+
if (version instanceof Array) {
52+
const { major, minor } = versionify(version[0])
53+
const { minor: minMinor } = versionify(version[1])
54+
const range = minor - minMinor
55+
handshakeBuffer.writeInt32((range << 16) | (minor << 8) | major)
56+
} else {
57+
const { major, minor } = versionify(version)
58+
handshakeBuffer.writeInt32((minor << 8) | major)
59+
}
60+
})
61+
62+
handshakeBuffer.reset()
63+
64+
return handshakeBuffer
65+
}
66+
67+
function parseNegotiatedResponse (buffer) {
68+
const h = [
69+
buffer.readUInt8(),
70+
buffer.readUInt8(),
71+
buffer.readUInt8(),
72+
buffer.readUInt8()
73+
]
74+
if (h[0] === 0x48 && h[1] === 0x54 && h[2] === 0x54 && h[3] === 0x50) {
75+
throw newError(
76+
'Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' +
77+
'(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)'
78+
)
79+
}
80+
return Number(h[3] + '.' + h[2])
81+
}
82+
83+
const createHandshakeMessageMemoized = memoize(createHandshakeMessage)
84+
85+
/**
86+
* @return {BaseBuffer}
87+
* @private
88+
*/
89+
function newHandshakeBuffer () {
90+
return createHandshakeMessageMemoized([
91+
[BOLT_PROTOCOL_V4_3, BOLT_PROTOCOL_V4_2],
92+
BOLT_PROTOCOL_V4_1,
93+
BOLT_PROTOCOL_V4_0,
94+
BOLT_PROTOCOL_V3
95+
])
96+
}
97+
98+
/**
99+
* This callback is displayed as a global member.
100+
* @callback BufferConsumerCallback
101+
* @param {buffer} buffer the remaining buffer
102+
*/
103+
/**
104+
* @typedef HandshakeResult
105+
* @property {number} protocolVersion The protocol version negotiated in the handshake
106+
* @property {function(BufferConsumerCallback)} consumeRemainingBuffer A function to consume the remaining buffer if it exists
107+
*/
108+
/**
109+
* Shake hands using the channel and return the protocol version
110+
*
111+
* @param {Channel} channel the channel use to shake hands
112+
* @returns {Promise<HandshakeResult>} Promise of protocol version and consumeRemainingBuffer
113+
*/
114+
export default function handshake (channel) {
115+
return new Promise((resolve, reject) => {
116+
const handshakeErrorHandler = error => {
117+
reject(error)
118+
}
119+
120+
channel.onerror = handshakeErrorHandler.bind(this)
121+
if (channel._error) {
122+
handshakeErrorHandler(channel._error)
123+
}
124+
125+
channel.onmessage = buffer => {
126+
try {
127+
// read the response buffer and initialize the protocol
128+
const protocolVersion = parseNegotiatedResponse(buffer)
129+
130+
resolve({
131+
protocolVersion,
132+
consumeRemainingBuffer: consumer => {
133+
if (buffer.hasRemaining()) {
134+
consumer(buffer.readSlice(buffer.remaining()))
135+
}
136+
}
137+
})
138+
} catch (e) {
139+
this._handleFatalError(e)
140+
reject(e)
141+
}
142+
}
143+
144+
channel.write(newHandshakeBuffer())
145+
})
146+
}

src/internal/bolt/index.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import handshake from './handshake'
2+
3+
export default {
4+
handshake
5+
}

src/internal/connection-channel.js

Lines changed: 2 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import Connection from './connection'
2929
import BoltProtocol from './bolt-protocol-v1'
3030
import { ResultStreamObserver } from './stream-observers'
3131
import buffer from 'vinyl-buffer'
32+
import Bolt from './bolt'
3233

3334
// Signature bytes for each response message type
3435
const SUCCESS = 0x70 // 0111 0000 // SUCCESS <metadata>
@@ -46,47 +47,6 @@ const NO_OP_OBSERVER = {
4647

4748
let idGenerator = 0
4849

49-
/**
50-
* Shake hands using the channel and return the protocol version
51-
*
52-
* @param {Channel} channel the channel use to shake hands
53-
* @returns {Promise<{}>} Promise of protocol version and consumeRemainingBuffer
54-
*/
55-
function shakeHands (channel) {
56-
return new Promise((resolve, reject) => {
57-
const handshakeErrorHandler = error => {
58-
reject(error)
59-
}
60-
61-
channel.onerror = handshakeErrorHandler.bind(this)
62-
if (channel._error) {
63-
// channel is already broken
64-
handshakeErrorHandler(channel._error)
65-
}
66-
67-
channel.onmessage = buffer => {
68-
try {
69-
// read the response buffer and initialize the protocol
70-
const protocolVersion = parseNegotiatedResponse(buffer)
71-
72-
resolve({
73-
protocolVersion,
74-
consumeRemainingBuffer: consumer => {
75-
if (buffer.hasRemaining()) {
76-
consumer(buffer.readSlice(buffer.remaining()))
77-
}
78-
}
79-
})
80-
} catch (e) {
81-
this._handleFatalError(e)
82-
reject(e)
83-
}
84-
}
85-
86-
channel.write(newHandshakeBuffer())
87-
})
88-
}
89-
9050
/**
9151
* Crete new connection to the provided address. Returned connection is not connected.
9252
* @param {ServerAddress} address - the Bolt endpoint to connect to.
@@ -110,7 +70,7 @@ export function createChannelConnection (
11070

11171
const channel = new Channel(channelConfig)
11272

113-
return shakeHands(channel)
73+
return Bolt.handshake(channel)
11474
.then(({ protocolVersion, consumeRemainingBuffer }) => {
11575
const connection = new ChannelConnection(
11676
channel,

0 commit comments

Comments
 (0)