Skip to content

Commit ce038a9

Browse files
committed
moving the handshake to before creating connection
1 parent e067d50 commit ce038a9

File tree

5 files changed

+169
-50
lines changed

5 files changed

+169
-50
lines changed

src/internal/connection-channel.js

Lines changed: 125 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,14 @@ import { Channel } from './node'
2121
import { Chunker, Dechunker } from './chunking'
2222
import { newError, PROTOCOL_ERROR } from '../error'
2323
import ChannelConfig from './channel-config'
24-
import ProtocolHandshaker from './protocol-handshaker'
24+
import ProtocolHandshaker, {
25+
newHandshakeBuffer,
26+
parseNegotiatedResponse
27+
} from './protocol-handshaker'
2528
import Connection from './connection'
2629
import BoltProtocol from './bolt-protocol-v1'
2730
import { ResultStreamObserver } from './stream-observers'
31+
import buffer from 'vinyl-buffer'
2832

2933
// Signature bytes for each response message type
3034
const SUCCESS = 0x70 // 0111 0000 // SUCCESS <metadata>
@@ -42,6 +46,118 @@ const NO_OP_OBSERVER = {
4246

4347
let idGenerator = 0
4448

49+
/**
50+
* Shake hands using the channel and return the protocol version
51+
*
52+
* @param {Channel} channel the channel use to shake hands
53+
* @returns {Promise<{}>} Promise of protocol version and consumeRemainingBuffer
54+
*/
55+
function shakeHands (channel) {
56+
return new Promise((resolve, reject) => {
57+
const handshakeErrorHandler = error => {
58+
reject(error)
59+
}
60+
61+
channel.onerror = handshakeErrorHandler.bind(this)
62+
if (channel._error) {
63+
// channel is already broken
64+
handshakeErrorHandler(channel._error)
65+
}
66+
67+
channel.onmessage = buffer => {
68+
try {
69+
// read the response buffer and initialize the protocol
70+
const protocolVersion = parseNegotiatedResponse(buffer)
71+
72+
resolve({
73+
protocolVersion,
74+
consumeRemainingBuffer: consumer => {
75+
if (buffer.hasRemaining()) {
76+
consumer(buffer.readSlice(buffer.remaining()))
77+
}
78+
}
79+
})
80+
} catch (e) {
81+
this._handleFatalError(e)
82+
reject(e)
83+
}
84+
}
85+
86+
channel.write(newHandshakeBuffer())
87+
})
88+
}
89+
90+
/**
91+
* Crete new connection to the provided address. Returned connection is not connected.
92+
* @param {ServerAddress} address - the Bolt endpoint to connect to.
93+
* @param {Object} config - the driver configuration.
94+
* @param {ConnectionErrorHandler} errorHandler - the error handler for connection errors.
95+
* @param {Logger} log - configured logger.
96+
* @return {Connection} - new connection.
97+
*/
98+
export function createChannelConnection (
99+
address,
100+
config,
101+
errorHandler,
102+
log,
103+
serversideRouting = null
104+
) {
105+
const channelConfig = new ChannelConfig(
106+
address,
107+
config,
108+
errorHandler.errorCode()
109+
)
110+
111+
const channel = new Channel(channelConfig)
112+
113+
return shakeHands(channel)
114+
.then(({ protocolVersion, consumeRemainingBuffer }) => {
115+
const connection = new ChannelConnection(
116+
channel,
117+
errorHandler,
118+
address,
119+
log,
120+
config.disableLosslessIntegers,
121+
serversideRouting
122+
)
123+
124+
// Temporary, this object will not be used in the feature
125+
const protocolHandshaker = new ProtocolHandshaker(
126+
connection,
127+
channel,
128+
connection._chunker,
129+
connection._disableLosslessIntegers,
130+
connection._log,
131+
connection._serversideRouting
132+
)
133+
134+
connection._protocol = protocolHandshaker._createProtocolWithVersion(
135+
protocolVersion
136+
)
137+
138+
// reset the error handler to just handle errors and forget about the handshake promise
139+
channel.onerror = connection._handleFatalError.bind(connection)
140+
141+
// Ok, protocol running. Simply forward all messages to the dechunker
142+
channel.onmessage = buf => connection._dechunker.write(buf)
143+
144+
// setup dechunker to dechunk messages and forward them to the message handler
145+
connection._dechunker.onmessage = buf => {
146+
connection._handleMessage(connection._protocol.unpacker().unpack(buf))
147+
}
148+
149+
// forward all pending bytes to the dechunker
150+
consumeRemainingBuffer(buffer => connection._dechunker.write(buffer))
151+
152+
return connection
153+
})
154+
.catch(reason =>
155+
channel.close().then(() => {
156+
throw reason
157+
})
158+
)
159+
}
160+
45161
export default class ChannelConnection extends Connection {
46162
/**
47163
* @constructor
@@ -148,6 +264,10 @@ export default class ChannelConnection extends Connection {
148264
* @return {Promise<Connection>} promise resolved with the current connection if handshake is successful. Rejected promise otherwise.
149265
*/
150266
_negotiateProtocol () {
267+
if (this._protocol) {
268+
return Promise.resolve(this)
269+
}
270+
151271
const protocolHandshaker = new ProtocolHandshaker(
152272
this,
153273
this._ch,
@@ -262,12 +382,10 @@ export default class ChannelConnection extends Connection {
262382
this._log.debug(`${this} C: ${message}`)
263383
}
264384

265-
this._protocol
266-
.packer()
267-
.packStruct(
268-
message.signature,
269-
message.fields.map(field => this._packable(field))
270-
)
385+
this._protocol.packer().packStruct(
386+
message.signature,
387+
message.fields.map(field => this._packable(field))
388+
)
271389

272390
this._chunker.messageBoundary()
273391

src/internal/connection-provider-direct.js

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import PooledConnectionProvider from './connection-provider-pooled'
2121
import DelegateConnection from './connection-delegate'
22-
import ChannelConnection from './connection-channel'
22+
import { createChannelConnection } from './connection-channel'
2323
import { BOLT_PROTOCOL_V4_0, BOLT_PROTOCOL_V3 } from './constants'
2424

2525
export default class DirectConnectionProvider extends PooledConnectionProvider {
@@ -40,25 +40,24 @@ export default class DirectConnectionProvider extends PooledConnectionProvider {
4040
}
4141

4242
async _hasProtocolVersion (versionPredicate) {
43-
const connection = ChannelConnection.create(
43+
const connection = await createChannelConnection(
4444
this._address,
4545
this._config,
4646
this._createConnectionErrorHandler(),
4747
this._log
4848
)
4949

50-
try {
51-
await connection._negotiateProtocol()
50+
const protocolVersion = connection.protocol()
51+
? connection.protocol().version
52+
: null
5253

53-
const protocol = connection.protocol()
54-
if (protocol) {
55-
return versionPredicate(protocol.version)
56-
}
54+
await connection.close()
5755

58-
return false
59-
} finally {
60-
await connection.close()
56+
if (protocolVersion) {
57+
return versionPredicate(protocolVersion)
6158
}
59+
60+
return false
6261
}
6362

6463
async supportsMultiDb () {

src/internal/connection-provider-pooled.js

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* limitations under the License.
1818
*/
1919

20-
import ChannelConnection from './connection-channel'
20+
import { createChannelConnection } from './connection-channel'
2121
import Pool from './pool'
2222
import PoolConfig from './pool-config'
2323
import ConnectionErrorHandler from './connection-error-handler'
@@ -39,7 +39,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
3939
this._createChannelConnection =
4040
createChannelConnectionHook ||
4141
(address => {
42-
return ChannelConnection.create(
42+
return createChannelConnection(
4343
address,
4444
this._config,
4545
this._createConnectionErrorHandler(),
@@ -72,15 +72,17 @@ export default class PooledConnectionProvider extends ConnectionProvider {
7272
* @access private
7373
*/
7474
_createConnection (address, release) {
75-
const connection = this._createChannelConnection(address)
76-
connection._release = () => release(address, connection)
77-
this._openConnections[connection.id] = connection
78-
79-
return connection.connect(this._userAgent, this._authToken).catch(error => {
80-
// let's destroy this connection
81-
this._destroyConnection(connection)
82-
// propagate the error because connection failed to connect / initialize
83-
throw error
75+
return this._createChannelConnection(address).then(connection => {
76+
connection._release = () => release(address, connection)
77+
this._openConnections[connection.id] = connection
78+
return connection
79+
.connect(this._userAgent, this._authToken)
80+
.catch(error => {
81+
// let's destroy this connection
82+
this._destroyConnection(connection)
83+
// propagate the error because connection failed to connect / initialize
84+
throw error
85+
})
8486
})
8587
}
8688

src/internal/connection-provider-routing.js

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import ConnectionErrorHandler from './connection-error-handler'
2929
import DelegateConnection from './connection-delegate'
3030
import LeastConnectedLoadBalancingStrategy from './least-connected-load-balancing-strategy'
3131
import Bookmark from './bookmark'
32-
import ChannelConnection from './connection-channel'
32+
import { createChannelConnection } from './connection-channel'
3333
import { int } from '../integer'
3434
import { BOLT_PROTOCOL_V3, BOLT_PROTOCOL_V4_0 } from './constants'
3535

@@ -53,7 +53,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
5353
routingTablePurgeDelay
5454
}) {
5555
super({ id, config, log, userAgent, authToken }, address => {
56-
return ChannelConnection.create(
56+
return createChannelConnection(
5757
address,
5858
this._config,
5959
this._createConnectionErrorHandler(),
@@ -164,27 +164,30 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
164164

165165
let lastError
166166
for (let i = 0; i < addresses.length; i++) {
167-
const connection = ChannelConnection.create(
168-
addresses[i],
169-
this._config,
170-
this._createConnectionErrorHandler(),
171-
this._log
172-
)
173-
174167
try {
175-
await connection._negotiateProtocol()
168+
const connection = await createChannelConnection(
169+
this._address,
170+
this._config,
171+
this._createConnectionErrorHandler(),
172+
this._log
173+
)
176174

177-
const protocol = connection.protocol()
178-
if (protocol) {
179-
return versionPredicate(protocol.version)
175+
const protocolVersion = connection.protocol()
176+
? connection.protocol().version
177+
: null
178+
179+
await connection.close()
180+
181+
if (protocolVersion) {
182+
return versionPredicate(protocolVersion)
180183
}
181184

182185
return false
183186
} catch (error) {
184187
lastError = error
185-
} finally {
186-
await connection.close()
187188
}
189+
190+
return false
188191
}
189192

190193
if (lastError) {

src/internal/protocol-handshaker.js

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ function createHandshakeMessage (versions) {
7171
return handshakeBuffer
7272
}
7373

74-
function parseNegotiatedResponse (buffer) {
74+
export function parseNegotiatedResponse (buffer) {
7575
const h = [
7676
buffer.readUInt8(),
7777
buffer.readUInt8(),
@@ -118,10 +118,7 @@ export default class ProtocolHandshaker {
118118
* Write a Bolt handshake into the underlying network channel.
119119
*/
120120
writeHandshakeRequest () {
121-
this._chunker.write(newHandshakeBuffer())
122-
this._chunker.messageBoundary()
123-
this._chunker.flush()
124-
// this._channel.write(newHandshakeBuffer())
121+
this._channel.write(newHandshakeBuffer())
125122
}
126123

127124
/**
@@ -201,7 +198,7 @@ export default class ProtocolHandshaker {
201198
* @return {BaseBuffer}
202199
* @private
203200
*/
204-
function newHandshakeBuffer () {
201+
export function newHandshakeBuffer () {
205202
return createHandshakeMessageMemoized([
206203
[BOLT_PROTOCOL_V4_3, BOLT_PROTOCOL_V4_2],
207204
BOLT_PROTOCOL_V4_1,

0 commit comments

Comments
 (0)