diff --git a/packages/bolt-connection/src/bolt/bolt-protocol-v5x6.js b/packages/bolt-connection/src/bolt/bolt-protocol-v5x6.js index d1177d28b..82f3cc5b8 100644 --- a/packages/bolt-connection/src/bolt/bolt-protocol-v5x6.js +++ b/packages/bolt-connection/src/bolt/bolt-protocol-v5x6.js @@ -16,7 +16,7 @@ */ import BoltProtocolV5x5 from './bolt-protocol-v5x5' -import transformersFactories from './bolt-protocol-v5x5.transformer' +import transformersFactories from './bolt-protocol-v5x6.transformer' import Transformer from './transformer' import { internal } from 'neo4j-driver-core' diff --git a/packages/bolt-connection/src/bolt/bolt-protocol-v5x7.js b/packages/bolt-connection/src/bolt/bolt-protocol-v5x7.js index 719e3b1bc..6636d1d75 100644 --- a/packages/bolt-connection/src/bolt/bolt-protocol-v5x7.js +++ b/packages/bolt-connection/src/bolt/bolt-protocol-v5x7.js @@ -16,7 +16,7 @@ */ import BoltProtocolV5x6 from './bolt-protocol-v5x6' -import transformersFactories from './bolt-protocol-v5x5.transformer' +import transformersFactories from './bolt-protocol-v5x7.transformer' import Transformer from './transformer' import { internal } from 'neo4j-driver-core' diff --git a/packages/bolt-connection/src/bolt/bolt-protocol-v5x8.js b/packages/bolt-connection/src/bolt/bolt-protocol-v5x8.js new file mode 100644 index 000000000..ce7b86269 --- /dev/null +++ b/packages/bolt-connection/src/bolt/bolt-protocol-v5x8.js @@ -0,0 +1,104 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import BoltProtocolV5x7 from './bolt-protocol-v5x7' + +import transformersFactories from './bolt-protocol-v5x8.transformer' +import Transformer from './transformer' +import RequestMessage from './request-message' +import { ResultStreamObserver } from './stream-observers' + +import { internal } from 'neo4j-driver-core' + +const { + constants: { BOLT_PROTOCOL_V5_8, FETCH_ALL } +} = internal + +export default class BoltProtocol extends BoltProtocolV5x7 { + get version () { + return BOLT_PROTOCOL_V5_8 + } + + get transformer () { + if (this._transformer === undefined) { + this._transformer = new Transformer(Object.values(transformersFactories).map(create => create(this._config, this._log))) + } + return this._transformer + } + + run ( + query, + parameters, + { + bookmarks, + txConfig, + database, + mode, + impersonatedUser, + notificationFilter, + beforeKeys, + afterKeys, + beforeError, + afterError, + beforeComplete, + afterComplete, + flush = true, + reactive = false, + fetchSize = FETCH_ALL, + highRecordWatermark = Number.MAX_VALUE, + lowRecordWatermark = Number.MAX_VALUE, + onDb + } = {} + ) { + const observer = new ResultStreamObserver({ + server: this._server, + reactive, + fetchSize, + moreFunction: this._requestMore.bind(this), + discardFunction: this._requestDiscard.bind(this), + beforeKeys, + afterKeys, + beforeError, + afterError, + beforeComplete, + afterComplete, + highRecordWatermark, + lowRecordWatermark, + enrichMetadata: this._enrichMetadata, + onDb + }) + + const flushRun = reactive + this.write( + RequestMessage.runWithMetadata5x5(query, parameters, { + bookmarks, + txConfig, + database, + mode, + impersonatedUser, + notificationFilter + }), + observer, + flushRun && flush + ) + + if (!reactive) { + this.write(RequestMessage.pull({ n: fetchSize }), observer, flush) + } + + return observer + } +} diff --git a/packages/bolt-connection/src/bolt/bolt-protocol-v5x8.transformer.js b/packages/bolt-connection/src/bolt/bolt-protocol-v5x8.transformer.js new file mode 100644 index 000000000..e0f0c13bd --- /dev/null +++ b/packages/bolt-connection/src/bolt/bolt-protocol-v5x8.transformer.js @@ -0,0 +1,22 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import v5x7 from './bolt-protocol-v5x7.transformer' + +export default { + ...v5x7 +} diff --git a/packages/bolt-connection/src/bolt/create.js b/packages/bolt-connection/src/bolt/create.js index fe204b612..1b8f792a5 100644 --- a/packages/bolt-connection/src/bolt/create.js +++ b/packages/bolt-connection/src/bolt/create.js @@ -32,6 +32,7 @@ import BoltProtocolV5x4 from './bolt-protocol-v5x4' import BoltProtocolV5x5 from './bolt-protocol-v5x5' import BoltProtocolV5x6 from './bolt-protocol-v5x6' import BoltProtocolV5x7 from './bolt-protocol-v5x7' +import BoltProtocolV5x8 from './bolt-protocol-v5x8' // eslint-disable-next-line no-unused-vars import { Chunker, Dechunker } from '../channel' import ResponseHandler from './response-handler' @@ -257,6 +258,14 @@ function createProtocol ( log, onProtocolError, serversideRouting) + case 5.8: + return new BoltProtocolV5x8(server, + chunker, + packingConfig, + createResponseHandler, + log, + onProtocolError, + serversideRouting) default: throw newError('Unknown Bolt protocol version: ' + version) } diff --git a/packages/bolt-connection/src/bolt/handshake.js b/packages/bolt-connection/src/bolt/handshake.js index abf318025..58250e2af 100644 --- a/packages/bolt-connection/src/bolt/handshake.js +++ b/packages/bolt-connection/src/bolt/handshake.js @@ -76,7 +76,7 @@ function parseNegotiatedResponse (buffer, log) { */ function newHandshakeBuffer () { return createHandshakeMessage([ - [version(5, 7), version(5, 0)], + [version(5, 8), version(5, 0)], [version(4, 4), version(4, 2)], version(4, 1), version(3, 0) diff --git a/packages/bolt-connection/src/bolt/stream-observers.js b/packages/bolt-connection/src/bolt/stream-observers.js index 336cbb30a..8bda32aea 100644 --- a/packages/bolt-connection/src/bolt/stream-observers.js +++ b/packages/bolt-connection/src/bolt/stream-observers.js @@ -79,7 +79,8 @@ class ResultStreamObserver extends StreamObserver { server, highRecordWatermark = Number.MAX_VALUE, lowRecordWatermark = Number.MAX_VALUE, - enrichMetadata + enrichMetadata, + onDb } = {}) { super() @@ -113,6 +114,7 @@ class ResultStreamObserver extends StreamObserver { this._paused = false this._pulled = !reactive this._haveRecordStreamed = false + this._onDb = onDb } /** @@ -319,6 +321,10 @@ class ResultStreamObserver extends StreamObserver { } } + if (meta.db !== null && this._onDb !== undefined) { + this._onDb(meta.db) + } + if (meta.fields != null) { // remove fields key from metadata object delete meta.fields diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js index 8653b64ca..38243b868 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js @@ -78,7 +78,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._createConnectionErrorHandler(), this._log, await this._clientCertificateHolder.getClientCertificate(), - this._routingContext + this._routingContext, + this._channelSsrCallback.bind(this) ) }) @@ -99,6 +100,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider ) this._refreshRoutingTable = functional.reuseOngoingRequest(this._refreshRoutingTable, this) + this._withSSR = 0 + this._withoutSSR = 0 } _createConnectionErrorHandler () { @@ -139,19 +142,30 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider * See {@link ConnectionProvider} for more information about this method and * its arguments. */ - async acquireConnection ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved, auth } = {}) { - let name - let address + async acquireConnection ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved, auth, homeDb } = {}) { const context = { database: database || DEFAULT_DB_NAME } const databaseSpecificErrorHandler = new ConnectionErrorHandler( SESSION_EXPIRED, (error, address) => this._handleUnavailability(error, address, context.database), - (error, address) => this._handleWriteFailure(error, address, context.database), - (error, address, conn) => - this._handleSecurityError(error, address, conn, context.database) + (error, address) => this._handleWriteFailure(error, address, homeDb ?? context.database), + (error, address, conn) => this._handleSecurityError(error, address, conn, context.database) ) + let conn + if (this.SSREnabled() && homeDb !== undefined && database === '') { + const currentRoutingTable = this._routingTableRegistry.get( + homeDb, + () => new RoutingTable({ database: homeDb }) + ) + if (currentRoutingTable && !currentRoutingTable.isStaleFor(accessMode)) { + conn = await this.getConnectionFromRoutingTable(currentRoutingTable, auth, accessMode, databaseSpecificErrorHandler) + if (this.SSREnabled()) { + return conn + } + conn.release() + } + } const routingTable = await this._freshRoutingTable({ accessMode, database: context.database, @@ -165,7 +179,12 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider } } }) + return this.getConnectionFromRoutingTable(routingTable, auth, accessMode, databaseSpecificErrorHandler) + } + async getConnectionFromRoutingTable (routingTable, auth, accessMode, databaseSpecificErrorHandler) { + let name + let address // select a target server based on specified access mode if (accessMode === READ) { address = this._loadBalancingStrategy.selectReader(routingTable.readers) @@ -663,6 +682,28 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider routingTable.forgetRouter(address) } } + + _channelSsrCallback (isEnabled, action) { + if (action === 'OPEN') { + if (isEnabled === true) { + this._withSSR = this._withSSR + 1 + } else { + this._withoutSSR = this._withoutSSR + 1 + } + } else if (action === 'CLOSE') { + if (isEnabled === true) { + this._withSSR = this._withSSR - 1 + } else { + this._withoutSSR = this._withoutSSR - 1 + } + } else { + throw newError("Channel SSR Callback invoked with action other than 'OPEN' or 'CLOSE'") + } + } + + SSREnabled () { + return this._withSSR > 0 && this._withoutSSR === 0 + } } /** diff --git a/packages/bolt-connection/src/connection/connection-channel.js b/packages/bolt-connection/src/connection/connection-channel.js index bf4e65ca7..b57542462 100644 --- a/packages/bolt-connection/src/connection/connection-channel.js +++ b/packages/bolt-connection/src/connection/connection-channel.js @@ -34,6 +34,8 @@ let idGenerator = 0 * @param {ConnectionErrorHandler} errorHandler - the error handler for connection errors. * @param {Logger} log - configured logger. * @param {clientCertificate} clientCertificate - configured client certificate + * @param ssrCallback - callback function used to update the counts of ssr enabled and disabled connections + * @param createChannel - function taking a channelConfig object and creating a channel with it * @return {Connection} - new connection. */ export function createChannelConnection ( @@ -43,6 +45,7 @@ export function createChannelConnection ( log, clientCertificate, serversideRouting = null, + ssrCallback, createChannel = channelConfig => new Channel(channelConfig) ) { const channelConfig = new ChannelConfig( @@ -89,7 +92,8 @@ export function createChannelConnection ( chunker, config.notificationFilter, createProtocol, - config.telemetryDisabled + config.telemetryDisabled, + ssrCallback ) // forward all pending bytes to the dechunker @@ -110,9 +114,11 @@ export default class ChannelConnection extends Connection { * @param {ConnectionErrorHandler} errorHandler the error handler. * @param {ServerAddress} address - the server address to connect to. * @param {Logger} log - the configured logger. - * @param {boolean} disableLosslessIntegers if this connection should convert all received integers to native JS numbers. - * @param {Chunker} chunker the chunker - * @param protocolSupplier Bolt protocol supplier + * @param {boolean} disableLosslessIntegers - if this connection should convert all received integers to native JS numbers. + * @param {Chunker} chunker - the chunker + * @param protocolSupplier - Bolt protocol supplier + * @param {boolean} telemetryDisabled - wether telemetry has been disabled in driver config. + * @param ssrCallback - callback function used to update the counts of ssr enabled and disabled connections. */ constructor ( channel, @@ -124,7 +130,8 @@ export default class ChannelConnection extends Connection { chunker, // to be removed, notificationFilter, protocolSupplier, - telemetryDisabled + telemetryDisabled, + ssrCallback = (_) => {} ) { super(errorHandler) this._authToken = null @@ -143,6 +150,7 @@ export default class ChannelConnection extends Connection { this._notificationFilter = notificationFilter this._telemetryDisabledDriverConfig = telemetryDisabled === true this._telemetryDisabledConnection = true + this._ssrCallback = ssrCallback // connection from the database, returned in response for HELLO message and might not be available this._dbConnectionId = null @@ -331,7 +339,9 @@ export default class ChannelConnection extends Connection { if (telemetryEnabledHint === true) { this._telemetryDisabledConnection = false } + this.SSREnabledHint = metadata.hints['ssr.enabled'] } + this._ssrCallback(this.SSREnabledHint ?? false, 'OPEN') } resolve(self) } @@ -538,6 +548,7 @@ export default class ChannelConnection extends Connection { * @returns {Promise} - A promise that will be resolved when the underlying channel is closed. */ async close () { + this._ssrCallback(this.SSREnabledHint ?? false, 'CLOSE') if (this._log.isDebugEnabled()) { this._log.debug('closing') } diff --git a/packages/bolt-connection/test/bolt/__snapshots__/bolt-protocol-v5x8.test.js.snap b/packages/bolt-connection/test/bolt/__snapshots__/bolt-protocol-v5x8.test.js.snap new file mode 100644 index 000000000..38356e707 --- /dev/null +++ b/packages/bolt-connection/test/bolt/__snapshots__/bolt-protocol-v5x8.test.js.snap @@ -0,0 +1,61 @@ +// Jest Snapshot v1, https://goo.gl/fbAQLP + +exports[`#unit BoltProtocolV5x8 .packable() should resultant function not pack graph types (Node) 1`] = `"It is not allowed to pass nodes in query parameters, given: (c:a {a:"b"})"`; + +exports[`#unit BoltProtocolV5x8 .packable() should resultant function not pack graph types (Path) 1`] = `"It is not allowed to pass paths in query parameters, given: [object Object]"`; + +exports[`#unit BoltProtocolV5x8 .packable() should resultant function not pack graph types (Relationship) 1`] = `"It is not allowed to pass relationships in query parameters, given: (e)-[:a {b:"c"}]->(f)"`; + +exports[`#unit BoltProtocolV5x8 .packable() should resultant function not pack graph types (UnboundRelationship) 1`] = `"It is not allowed to pass unbound relationships in query parameters, given: -[:a {b:"c"}]->"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (Date with less fields) 1`] = `"Wrong struct size for Date, expected 1 but was 0"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (Date with more fields) 1`] = `"Wrong struct size for Date, expected 1 but was 2"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (DateTimeWithZoneId with less fields) 1`] = `"Wrong struct size for DateTimeWithZoneId, expected 3 but was 2"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (DateTimeWithZoneId with more fields) 1`] = `"Wrong struct size for DateTimeWithZoneId, expected 3 but was 4"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (DateTimeWithZoneOffset with less fields) 1`] = `"Wrong struct size for DateTimeWithZoneOffset, expected 3 but was 2"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (DateTimeWithZoneOffset with more fields) 1`] = `"Wrong struct size for DateTimeWithZoneOffset, expected 3 but was 4"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (Duration with less fields) 1`] = `"Wrong struct size for Duration, expected 4 but was 3"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (Duration with more fields) 1`] = `"Wrong struct size for Duration, expected 4 but was 5"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (LocalDateTime with less fields) 1`] = `"Wrong struct size for LocalDateTime, expected 2 but was 1"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (LocalDateTime with more fields) 1`] = `"Wrong struct size for LocalDateTime, expected 2 but was 3"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (LocalTime with less fields) 1`] = `"Wrong struct size for LocalTime, expected 1 but was 0"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (LocalTime with more fields) 1`] = `"Wrong struct size for LocalTime, expected 1 but was 2"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (Node with less fields) 1`] = `"Wrong struct size for Node, expected 4 but was 3"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (Node with more fields) 1`] = `"Wrong struct size for Node, expected 4 but was 5"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (Path with less fields) 1`] = `"Wrong struct size for Path, expected 3 but was 2"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (Path with more fields) 1`] = `"Wrong struct size for Path, expected 3 but was 4"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (Point with less fields) 1`] = `"Wrong struct size for Point2D, expected 3 but was 2"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (Point with more fields) 1`] = `"Wrong struct size for Point2D, expected 3 but was 4"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (Point3D with less fields) 1`] = `"Wrong struct size for Point3D, expected 4 but was 3"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (Point3D with more fields) 1`] = `"Wrong struct size for Point3D, expected 4 but was 5"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (Relationship with less fields) 1`] = `"Wrong struct size for Relationship, expected 8 but was 5"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (Relationship with more fields) 1`] = `"Wrong struct size for Relationship, expected 8 but was 9"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (Time with less fields) 1`] = `"Wrong struct size for Time, expected 2 but was 1"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (Time with more fileds) 1`] = `"Wrong struct size for Time, expected 2 but was 3"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (UnboundRelationship with less fields) 1`] = `"Wrong struct size for UnboundRelationship, expected 4 but was 3"`; + +exports[`#unit BoltProtocolV5x8 .unpack() should not unpack with wrong size (UnboundRelationship with more fields) 1`] = `"Wrong struct size for UnboundRelationship, expected 4 but was 5"`; diff --git a/packages/bolt-connection/test/bolt/bolt-protocol-v5x8.test.js b/packages/bolt-connection/test/bolt/bolt-protocol-v5x8.test.js new file mode 100644 index 000000000..0ba26e7a3 --- /dev/null +++ b/packages/bolt-connection/test/bolt/bolt-protocol-v5x8.test.js @@ -0,0 +1,1579 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import BoltProtocolV5x8 from '../../src/bolt/bolt-protocol-v5x8' +import RequestMessage from '../../src/bolt/request-message' +import { v2, structure } from '../../src/packstream' +import utils from '../test-utils' +import { LoginObserver, RouteObserver } from '../../src/bolt/stream-observers' +import fc from 'fast-check' +import { + Date, + DateTime, + Duration, + LocalDateTime, + LocalTime, + Path, + PathSegment, + Point, + Relationship, + Time, + UnboundRelationship, + Node, + internal +} from 'neo4j-driver-core' + +import { alloc } from '../../src/channel' +import { notificationFilterBehaviour, telemetryBehaviour } from './behaviour' + +const WRITE = 'WRITE' + +const { + txConfig: { TxConfig }, + bookmarks: { Bookmarks }, + logger: { Logger }, + temporalUtil +} = internal + +describe('#unit BoltProtocolV5x8', () => { + beforeEach(() => { + expect.extend(utils.matchers) + }) + + telemetryBehaviour.protocolSupportsTelemetry(newProtocol) + + it('should enrich error metadata', () => { + const protocol = new BoltProtocolV5x8() + const enrichedData = protocol.enrichErrorMetadata({ neo4j_code: 'hello', diagnostic_record: {} }) + expect(enrichedData.code).toBe('hello') + expect(enrichedData.diagnostic_record.OPERATION).toBe('') + expect(enrichedData.diagnostic_record.OPERATION_CODE).toBe('0') + expect(enrichedData.diagnostic_record.CURRENT_SCHEMA).toBe('/') + }) + + it('should request routing information', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV5x8(recorder, null, false) + utils.spyProtocolWrite(protocol) + const routingContext = { someContextParam: 'value' } + const databaseName = 'name' + + const observer = protocol.requestRoutingInformation({ + routingContext, + databaseName + }) + + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( + RequestMessage.routeV4x4(routingContext, [], { databaseName, impersonatedUser: null }) + ) + expect(protocol.observers).toEqual([observer]) + expect(observer).toEqual(expect.any(RouteObserver)) + expect(protocol.flushes).toEqual([true]) + }) + + it('should request routing information sending bookmarks', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV5x8(recorder, null, false) + utils.spyProtocolWrite(protocol) + const routingContext = { someContextParam: 'value' } + const listOfBookmarks = ['a', 'b', 'c'] + const bookmarks = new Bookmarks(listOfBookmarks) + const databaseName = 'name' + + const observer = protocol.requestRoutingInformation({ + routingContext, + databaseName, + sessionContext: { bookmarks } + }) + + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( + RequestMessage.routeV4x4(routingContext, listOfBookmarks, { databaseName, impersonatedUser: null }) + ) + expect(protocol.observers).toEqual([observer]) + expect(observer).toEqual(expect.any(RouteObserver)) + expect(protocol.flushes).toEqual([true]) + }) + + it('should run a query', () => { + const database = 'testdb' + const bookmarks = new Bookmarks([ + 'neo4j:bookmark:v1:tx1', + 'neo4j:bookmark:v1:tx2' + ]) + const txConfig = new TxConfig({ + timeout: 5000, + metadata: { x: 1, y: 'something' } + }) + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV5x8(recorder, null, false) + utils.spyProtocolWrite(protocol) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + + const observer = protocol.run(query, parameters, { + bookmarks, + txConfig, + database, + mode: WRITE + }) + + protocol.verifyMessageCount(2) + + expect(protocol.messages[0]).toBeMessage( + RequestMessage.runWithMetadata(query, parameters, { + bookmarks, + txConfig, + database, + mode: WRITE + }) + ) + expect(protocol.messages[1]).toBeMessage(RequestMessage.pull()) + expect(protocol.observers).toEqual([observer, observer]) + expect(protocol.flushes).toEqual([false, true]) + }) + + it('should run a with impersonated user', () => { + const database = 'testdb' + const impersonatedUser = 'the impostor' + const bookmarks = new Bookmarks([ + 'neo4j:bookmark:v1:tx1', + 'neo4j:bookmark:v1:tx2' + ]) + const txConfig = new TxConfig({ + timeout: 5000, + metadata: { x: 1, y: 'something' } + }) + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV5x8(recorder, null, false) + utils.spyProtocolWrite(protocol) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + + const observer = protocol.run(query, parameters, { + bookmarks, + txConfig, + database, + mode: WRITE, + impersonatedUser + }) + + protocol.verifyMessageCount(2) + + expect(protocol.messages[0]).toBeMessage( + RequestMessage.runWithMetadata(query, parameters, { + bookmarks, + txConfig, + database, + mode: WRITE, + impersonatedUser + }) + ) + expect(protocol.messages[1]).toBeMessage(RequestMessage.pull()) + expect(protocol.observers).toEqual([observer, observer]) + expect(protocol.flushes).toEqual([false, true]) + }) + + it('should begin a transaction', () => { + const database = 'testdb' + const bookmarks = new Bookmarks([ + 'neo4j:bookmark:v1:tx1', + 'neo4j:bookmark:v1:tx2' + ]) + const txConfig = new TxConfig({ + timeout: 5000, + metadata: { x: 1, y: 'something' } + }) + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV5x8(recorder, null, false) + utils.spyProtocolWrite(protocol) + + const observer = protocol.beginTransaction({ + bookmarks, + txConfig, + database, + mode: WRITE + }) + + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( + RequestMessage.begin({ bookmarks, txConfig, database, mode: WRITE }) + ) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) + }) + + it('should begin a transaction with impersonated user', () => { + const database = 'testdb' + const impersonatedUser = 'the impostor' + const bookmarks = new Bookmarks([ + 'neo4j:bookmark:v1:tx1', + 'neo4j:bookmark:v1:tx2' + ]) + const txConfig = new TxConfig({ + timeout: 5000, + metadata: { x: 1, y: 'something' } + }) + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV5x8(recorder, null, false) + utils.spyProtocolWrite(protocol) + + const observer = protocol.beginTransaction({ + bookmarks, + txConfig, + database, + mode: WRITE, + impersonatedUser + }) + + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( + RequestMessage.begin({ bookmarks, txConfig, database, mode: WRITE, impersonatedUser }) + ) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) + }) + + it('should return correct bolt version number', () => { + const protocol = new BoltProtocolV5x8(null, null, false) + + expect(protocol.version).toBe(5.8) + }) + + it('should update metadata', () => { + const metadata = { t_first: 1, t_last: 2, db_hits: 3, some_other_key: 4 } + const protocol = new BoltProtocolV5x8(null, null, false) + + const transformedMetadata = protocol.transformMetadata(metadata) + + expect(transformedMetadata).toEqual({ + result_available_after: 1, + result_consumed_after: 2, + db_hits: 3, + some_other_key: 4 + }) + }) + + it('should initialize connection', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV5x8(recorder, null, false) + utils.spyProtocolWrite(protocol) + + const clientName = 'js-driver/1.2.3' + const boltAgent = { + product: 'neo4j-javascript/5.28', + platform: 'netbsd 1.1.1; Some arch', + languageDetails: 'Node/16.0.1 (v8 1.7.0)' + } + const authToken = { username: 'neo4j', password: 'secret' } + + const observer = protocol.initialize({ userAgent: clientName, boltAgent, authToken }) + + protocol.verifyMessageCount(2) + expect(protocol.messages[0]).toBeMessage( + RequestMessage.hello5x3(clientName, boltAgent) + ) + expect(protocol.messages[1]).toBeMessage( + RequestMessage.logon(authToken) + ) + + expect(protocol.observers.length).toBe(2) + + // hello observer + const helloObserver = protocol.observers[0] + expect(helloObserver).toBeInstanceOf(LoginObserver) + expect(helloObserver).not.toBe(observer) + + // login observer + const loginObserver = protocol.observers[1] + expect(loginObserver).toBeInstanceOf(LoginObserver) + expect(loginObserver).toBe(observer) + + expect(protocol.flushes).toEqual([false, true]) + }) + + it.each([ + 'javascript-driver/5.8.0', + '', + undefined, + null + ])('should always use the user agent set by the user', (userAgent) => { + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV5x8(recorder, null, false) + utils.spyProtocolWrite(protocol) + + const boltAgent = { + product: 'neo4j-javascript/5.28', + platform: 'netbsd 1.1.1; Some arch', + languageDetails: 'Node/16.0.1 (v8 1.7.0)' + } + const authToken = { username: 'neo4j', password: 'secret' } + + const observer = protocol.initialize({ userAgent, boltAgent, authToken }) + + protocol.verifyMessageCount(2) + expect(protocol.messages[0]).toBeMessage( + RequestMessage.hello5x3(userAgent, boltAgent) + ) + expect(protocol.messages[1]).toBeMessage( + RequestMessage.logon(authToken) + ) + + expect(protocol.observers.length).toBe(2) + + // hello observer + const helloObserver = protocol.observers[0] + expect(helloObserver).toBeInstanceOf(LoginObserver) + expect(helloObserver).not.toBe(observer) + + // login observer + const loginObserver = protocol.observers[1] + expect(loginObserver).toBeInstanceOf(LoginObserver) + expect(loginObserver).toBe(observer) + + expect(protocol.flushes).toEqual([false, true]) + }) + + it.each( + [true, false] + )('should logon to the server [flush=%s]', (flush) => { + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV5x8(recorder, null, false) + utils.spyProtocolWrite(protocol) + + const authToken = { username: 'neo4j', password: 'secret' } + + const observer = protocol.logon({ authToken, flush }) + + protocol.verifyMessageCount(1) + + expect(protocol.messages[0]).toBeMessage( + RequestMessage.logon(authToken) + ) + + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([flush]) + }) + + it.each( + [true, false] + )('should logoff from the server [flush=%s]', (flush) => { + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV5x8(recorder, null, false) + utils.spyProtocolWrite(protocol) + + const observer = protocol.logoff({ flush }) + + protocol.verifyMessageCount(1) + + expect(protocol.messages[0]).toBeMessage( + RequestMessage.logoff() + ) + + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([flush]) + }) + + it('should begin a transaction', () => { + const bookmarks = new Bookmarks([ + 'neo4j:bookmark:v1:tx1', + 'neo4j:bookmark:v1:tx2' + ]) + const txConfig = new TxConfig({ + timeout: 5000, + metadata: { x: 1, y: 'something' } + }) + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV5x8(recorder, null, false) + utils.spyProtocolWrite(protocol) + + const observer = protocol.beginTransaction({ + bookmarks, + txConfig, + mode: WRITE + }) + + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( + RequestMessage.begin({ bookmarks, txConfig, mode: WRITE }) + ) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) + }) + + it('should commit', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV5x8(recorder, null, false) + utils.spyProtocolWrite(protocol) + + const observer = protocol.commitTransaction() + + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage(RequestMessage.commit()) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) + }) + + it('should rollback', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV5x8(recorder, null, false) + utils.spyProtocolWrite(protocol) + + const observer = protocol.rollbackTransaction() + + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage(RequestMessage.rollback()) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) + }) + + it('should support logoff', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV5x8(recorder, null, false) + + expect(protocol.supportsReAuth).toBe(true) + }) + + describe('unpacker configuration', () => { + test.each([ + [false, false], + [false, true], + [true, false], + [true, true] + ])( + 'should create unpacker with disableLosslessIntegers=%p and useBigInt=%p', + (disableLosslessIntegers, useBigInt) => { + const protocol = new BoltProtocolV5x8(null, null, { + disableLosslessIntegers, + useBigInt + }) + expect(protocol._unpacker._disableLosslessIntegers).toBe( + disableLosslessIntegers + ) + expect(protocol._unpacker._useBigInt).toBe(useBigInt) + } + ) + }) + + describe('notificationFilter', () => { + notificationFilterBehaviour.shouldSupportGqlNotificationFilterOnInitialize(newProtocol) + notificationFilterBehaviour.shouldSupportGqlNotificationFilterOnBeginTransaction(newProtocol) + notificationFilterBehaviour.shouldSupportGqlNotificationFilterOnRun(newProtocol) + }) + + describe('watermarks', () => { + it('.run() should configure watermarks', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = utils.spyProtocolWrite( + new BoltProtocolV5x8(recorder, null, false) + ) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + const observer = protocol.run(query, parameters, { + bookmarks: Bookmarks.empty(), + txConfig: TxConfig.empty(), + lowRecordWatermark: 100, + highRecordWatermark: 200 + }) + + expect(observer._lowRecordWatermark).toEqual(100) + expect(observer._highRecordWatermark).toEqual(200) + }) + }) + + describe('packstream', () => { + it('should configure v2 packer', () => { + const protocol = new BoltProtocolV5x8(null, null, false) + expect(protocol.packer()).toBeInstanceOf(v2.Packer) + }) + + it('should configure v2 unpacker', () => { + const protocol = new BoltProtocolV5x8(null, null, false) + expect(protocol.unpacker()).toBeInstanceOf(v2.Unpacker) + }) + }) + + describe('.packable()', () => { + it.each([ + ['Node', new Node(1, ['a'], { a: 'b' }, 'c')], + ['Relationship', new Relationship(1, 2, 3, 'a', { b: 'c' }, 'd', 'e', 'f')], + ['UnboundRelationship', new UnboundRelationship(1, 'a', { b: 'c' }, '1')], + ['Path', new Path(new Node(1, [], {}), new Node(2, [], {}), [])] + ])('should resultant function not pack graph types (%s)', (_, graphType) => { + const protocol = new BoltProtocolV5x8( + new utils.MessageRecordingConnection(), + null, + false + ) + + const packable = protocol.packable(graphType) + + expect(packable).toThrowErrorMatchingSnapshot() + }) + + it.each([ + ['Duration', new Duration(1, 1, 1, 1)], + ['LocalTime', new LocalTime(1, 1, 1, 1)], + ['Time', new Time(1, 1, 1, 1, 1)], + ['Date', new Date(1, 1, 1)], + ['LocalDateTime', new LocalDateTime(1, 1, 1, 1, 1, 1, 1)], + [ + 'DateTimeWithZoneOffset', + new DateTime(2022, 6, 14, 15, 21, 18, 183_000_000, 120 * 60) + ], + [ + 'DateTimeWithZoneOffset / 1978', + new DateTime(1978, 12, 16, 10, 5, 59, 128000987, -150 * 60) + ], + [ + 'DateTimeWithZoneId / Berlin 2:30 CET', + new DateTime(2022, 10, 30, 2, 30, 0, 183_000_000, 2 * 60 * 60, 'Europe/Berlin') + ], + [ + 'DateTimeWithZoneId / Berlin 2:30 CEST', + new DateTime(2022, 10, 30, 2, 30, 0, 183_000_000, 1 * 60 * 60, 'Europe/Berlin') + ], + ['Point2D', new Point(1, 1, 1)], + ['Point3D', new Point(1, 1, 1, 1)] + ])('should pack spatial types and temporal types (%s)', (_, object) => { + const buffer = alloc(256) + const protocol = new BoltProtocolV5x8( + new utils.MessageRecordingConnection(), + buffer, + { + disableLosslessIntegers: true + } + ) + + const packable = protocol.packable(object) + + expect(packable).not.toThrow() + + buffer.reset() + + const unpacked = protocol.unpack(buffer) + + expect(unpacked).toEqual(object) + }) + + it.each([ + [ + 'DateTimeWithZoneId / Australia', + new DateTime(2022, 6, 15, 15, 21, 18, 183_000_000, undefined, 'Australia/Eucla') + ], + [ + 'DateTimeWithZoneId', + new DateTime(2022, 6, 22, 15, 21, 18, 183_000_000, undefined, 'Europe/Berlin') + ], + [ + 'DateTimeWithZoneId / Europe just before turn CEST', + new DateTime(2022, 3, 27, 1, 59, 59, 183_000_000, undefined, 'Europe/Berlin') + ], + [ + 'DateTimeWithZoneId / Europe just 1 before turn CEST', + new DateTime(2022, 3, 27, 0, 59, 59, 183_000_000, undefined, 'Europe/Berlin') + ], + [ + 'DateTimeWithZoneId / Europe just after turn CEST', + new DateTime(2022, 3, 27, 3, 0, 0, 183_000_000, undefined, 'Europe/Berlin') + ], + [ + 'DateTimeWithZoneId / Europe just 1 after turn CEST', + new DateTime(2022, 3, 27, 4, 0, 0, 183_000_000, undefined, 'Europe/Berlin') + ], + [ + 'DateTimeWithZoneId / Europe just before turn CET', + new DateTime(2022, 10, 30, 2, 59, 59, 183_000_000, undefined, 'Europe/Berlin') + ], + [ + 'DateTimeWithZoneId / Europe just 1 before turn CET', + new DateTime(2022, 10, 30, 1, 59, 59, 183_000_000, undefined, 'Europe/Berlin') + ], + [ + 'DateTimeWithZoneId / Europe just after turn CET', + new DateTime(2022, 10, 30, 3, 0, 0, 183_000_000, undefined, 'Europe/Berlin') + ], + [ + 'DateTimeWithZoneId / Europe just 1 after turn CET', + new DateTime(2022, 10, 30, 4, 0, 0, 183_000_000, undefined, 'Europe/Berlin') + ], + [ + 'DateTimeWithZoneId / Sao Paulo just before turn summer time', + new DateTime(2018, 11, 4, 11, 59, 59, 183_000_000, undefined, 'America/Sao_Paulo') + ], + [ + 'DateTimeWithZoneId / Sao Paulo just 1 before turn summer time', + new DateTime(2018, 11, 4, 10, 59, 59, 183_000_000, undefined, 'America/Sao_Paulo') + ], + [ + 'DateTimeWithZoneId / Sao Paulo just after turn summer time', + new DateTime(2018, 11, 5, 1, 0, 0, 183_000_000, undefined, 'America/Sao_Paulo') + ], + [ + 'DateTimeWithZoneId / Sao Paulo just 1 after turn summer time', + new DateTime(2018, 11, 5, 2, 0, 0, 183_000_000, undefined, 'America/Sao_Paulo') + ], + [ + 'DateTimeWithZoneId / Sao Paulo just before turn winter time', + new DateTime(2019, 2, 17, 11, 59, 59, 183_000_000, undefined, 'America/Sao_Paulo') + ], + [ + 'DateTimeWithZoneId / Sao Paulo just 1 before turn winter time', + new DateTime(2019, 2, 17, 10, 59, 59, 183_000_000, undefined, 'America/Sao_Paulo') + ], + [ + 'DateTimeWithZoneId / Sao Paulo just after turn winter time', + new DateTime(2019, 2, 18, 0, 0, 0, 183_000_000, undefined, 'America/Sao_Paulo') + ], + [ + 'DateTimeWithZoneId / Sao Paulo just 1 after turn winter time', + new DateTime(2019, 2, 18, 1, 0, 0, 183_000_000, undefined, 'America/Sao_Paulo') + ], + [ + 'DateTimeWithZoneId / Istanbul', + new DateTime(1978, 12, 16, 12, 35, 59, 128000987, undefined, 'Europe/Istanbul') + ], + [ + 'DateTimeWithZoneId / Istanbul', + new DateTime(2020, 6, 15, 4, 30, 0, 183_000_000, undefined, 'Pacific/Honolulu') + ], + [ + 'DateWithWithZoneId / Berlin before common era', + new DateTime(-2020, 6, 15, 4, 30, 0, 183_000_000, undefined, 'Europe/Berlin') + ], + [ + 'DateWithWithZoneId / Max Date', + new DateTime(99_999, 12, 31, 23, 59, 59, 999_999_999, undefined, 'Pacific/Kiritimati') + ], + [ + 'DateWithWithZoneId / Min Date', + new DateTime(-99_999, 12, 31, 23, 59, 59, 999_999_999, undefined, 'Pacific/Samoa') + ], + [ + 'DateWithWithZoneId / Ambiguous date between 00 and 99', + new DateTime(50, 12, 31, 23, 59, 59, 999_999_999, undefined, 'Pacific/Samoa') + ] + ])('should pack and unpack DateTimeWithZoneId and without offset (%s)', (_, object) => { + const buffer = alloc(256) + const loggerFunction = jest.fn() + const protocol = new BoltProtocolV5x8( + new utils.MessageRecordingConnection(), + buffer, + { + disableLosslessIntegers: true + }, + undefined, + new Logger('debug', loggerFunction) + ) + + const packable = protocol.packable(object) + + expect(packable).not.toThrow() + expect(loggerFunction) + .toBeCalledWith('warn', + 'DateTime objects without "timeZoneOffsetSeconds" property ' + + 'are prune to bugs related to ambiguous times. For instance, ' + + '2022-10-30T2:30:00[Europe/Berlin] could be GMT+1 or GMT+2.') + + buffer.reset() + + const unpacked = protocol.unpack(buffer) + + expect(unpacked.timeZoneOffsetSeconds).toBeDefined() + + const unpackedDateTimeWithoutOffset = new DateTime( + unpacked.year, + unpacked.month, + unpacked.day, + unpacked.hour, + unpacked.minute, + unpacked.second, + unpacked.nanosecond, + undefined, + unpacked.timeZoneId + ) + + expect(unpackedDateTimeWithoutOffset).toEqual(object) + }) + + it('should pack and unpack DateTimeWithOffset', () => { + fc.assert( + fc.property( + fc.date({ + min: temporalUtil.newDate(utils.MIN_UTC_IN_MS + utils.ONE_DAY_IN_MS), + max: temporalUtil.newDate(utils.MAX_UTC_IN_MS - utils.ONE_DAY_IN_MS) + }), + fc.integer({ min: 0, max: 999_999 }), + utils.arbitraryTimeZoneId(), + (date, nanoseconds, timeZoneId) => { + const object = new DateTime( + date.getUTCFullYear(), + date.getUTCMonth() + 1, + date.getUTCDate(), + date.getUTCHours(), + date.getUTCMinutes(), + date.getUTCSeconds(), + date.getUTCMilliseconds() * 1_000_000 + nanoseconds, + undefined, + timeZoneId + ) + const buffer = alloc(256) + const loggerFunction = jest.fn() + const protocol = new BoltProtocolV5x8( + new utils.MessageRecordingConnection(), + buffer, + { + disableLosslessIntegers: true + }, + undefined, + new Logger('debug', loggerFunction) + ) + + const packable = protocol.packable(object) + + expect(packable).not.toThrow() + expect(loggerFunction) + .toBeCalledWith('warn', + 'DateTime objects without "timeZoneOffsetSeconds" property ' + + 'are prune to bugs related to ambiguous times. For instance, ' + + '2022-10-30T2:30:00[Europe/Berlin] could be GMT+1 or GMT+2.') + + buffer.reset() + + const unpacked = protocol.unpack(buffer) + + expect(unpacked.timeZoneOffsetSeconds).toBeDefined() + + const unpackedDateTimeWithoutOffset = new DateTime( + unpacked.year, + unpacked.month, + unpacked.day, + unpacked.hour, + unpacked.minute, + unpacked.second, + unpacked.nanosecond, + undefined, + unpacked.timeZoneId + ) + + expect(unpackedDateTimeWithoutOffset).toEqual(object) + }) + ) + }) + + it('should pack and unpack DateTimeWithZoneIdAndNoOffset', () => { + fc.assert( + fc.property(fc.date(), date => { + const object = DateTime.fromStandardDate(date) + const buffer = alloc(256) + const loggerFunction = jest.fn() + const protocol = new BoltProtocolV5x8( + new utils.MessageRecordingConnection(), + buffer, + { + disableLosslessIntegers: true + }, + undefined, + new Logger('debug', loggerFunction) + ) + + const packable = protocol.packable(object) + + expect(packable).not.toThrow() + + buffer.reset() + + const unpacked = protocol.unpack(buffer) + + expect(unpacked.timeZoneOffsetSeconds).toBeDefined() + + expect(unpacked).toEqual(object) + }) + ) + }) + }) + + describe('.unpack()', () => { + it.each([ + [ + 'Node', + new structure.Structure(0x4e, [1, ['a'], { c: 'd' }, 'elementId']), + new Node(1, ['a'], { c: 'd' }, 'elementId') + ], + [ + 'Relationship', + new structure.Structure(0x52, [1, 2, 3, '4', { 5: 6 }, 'elementId', 'node1', 'node2']), + new Relationship(1, 2, 3, '4', { 5: 6 }, 'elementId', 'node1', 'node2') + ], + [ + 'UnboundRelationship', + new structure.Structure(0x72, [1, '2', { 3: 4 }, 'elementId']), + new UnboundRelationship(1, '2', { 3: 4 }, 'elementId') + ], + [ + 'Path', + new structure.Structure( + 0x50, + [ + [ + new structure.Structure(0x4e, [1, ['2'], { 3: '4' }, 'node1']), + new structure.Structure(0x4e, [4, ['5'], { 6: 7 }, 'node2']), + new structure.Structure(0x4e, [2, ['3'], { 4: '5' }, 'node3']) + ], + [ + new structure.Structure(0x52, [3, 1, 4, 'reltype1', { 4: '5' }, 'rel1', 'node1', 'node2']), + new structure.Structure(0x52, [5, 4, 2, 'reltype2', { 6: 7 }, 'rel2', 'node2', 'node3']) + ], + [1, 1, 2, 2] + ] + ), + new Path( + new Node(1, ['2'], { 3: '4' }, 'node1'), + new Node(2, ['3'], { 4: '5' }, 'node3'), + [ + new PathSegment( + new Node(1, ['2'], { 3: '4' }, 'node1'), + new Relationship(3, 1, 4, 'reltype1', { 4: '5' }, 'rel1', 'node1', 'node2'), + new Node(4, ['5'], { 6: 7 }, 'node2') + ), + new PathSegment( + new Node(4, ['5'], { 6: 7 }, 'node2'), + new Relationship(5, 4, 2, 'reltype2', { 6: 7 }, 'rel2', 'node2', 'node3'), + new Node(2, ['3'], { 4: '5' }, 'node3') + ) + ] + ) + ] + ])('should unpack graph types (%s)', (_, struct, graphObject) => { + const buffer = alloc(256) + const protocol = new BoltProtocolV5x8( + new utils.MessageRecordingConnection(), + buffer, + false + ) + + const packable = protocol.packable(struct) + + expect(packable).not.toThrow() + + buffer.reset() + + const unpacked = protocol.unpack(buffer) + expect(unpacked).toEqual(graphObject) + }) + + it.each([ + [ + 'Node with less fields', + new structure.Structure(0x4e, [1, ['a'], { c: 'd' }]) + ], + [ + 'Node with more fields', + new structure.Structure(0x4e, [1, ['a'], { c: 'd' }, '1', 'b']) + ], + [ + 'Relationship with less fields', + new structure.Structure(0x52, [1, 2, 3, '4', { 5: 6 }]) + ], + [ + 'Relationship with more fields', + new structure.Structure(0x52, [1, 2, 3, '4', { 5: 6 }, '1', '2', '3', '4']) + ], + [ + 'UnboundRelationship with less fields', + new structure.Structure(0x72, [1, '2', { 3: 4 }]) + ], + [ + 'UnboundRelationship with more fields', + new structure.Structure(0x72, [1, '2', { 3: 4 }, '1', '2']) + ], + [ + 'Path with less fields', + new structure.Structure( + 0x50, + [ + [ + new structure.Structure(0x4e, [1, ['2'], { 3: '4' }]), + new structure.Structure(0x4e, [4, ['5'], { 6: 7 }]), + new structure.Structure(0x4e, [2, ['3'], { 4: '5' }]) + ], + [ + new structure.Structure(0x52, [3, 1, 4, 'rel1', { 4: '5' }]), + new structure.Structure(0x52, [5, 4, 2, 'rel2', { 6: 7 }]) + ] + ] + ) + ], + [ + 'Path with more fields', + new structure.Structure( + 0x50, + [ + [ + new structure.Structure(0x4e, [1, ['2'], { 3: '4' }]), + new structure.Structure(0x4e, [4, ['5'], { 6: 7 }]), + new structure.Structure(0x4e, [2, ['3'], { 4: '5' }]) + ], + [ + new structure.Structure(0x52, [3, 1, 4, 'rel1', { 4: '5' }]), + new structure.Structure(0x52, [5, 4, 2, 'rel2', { 6: 7 }]) + ], + [1, 1, 2, 2], + 'a' + ] + ) + ], + [ + 'Point with less fields', + new structure.Structure(0x58, [1, 2]) + ], + [ + 'Point with more fields', + new structure.Structure(0x58, [1, 2, 3, 4]) + ], + [ + 'Point3D with less fields', + new structure.Structure(0x59, [1, 2, 3]) + ], + + [ + 'Point3D with more fields', + new structure.Structure(0x59, [1, 2, 3, 4, 6]) + ], + [ + 'Duration with less fields', + new structure.Structure(0x45, [1, 2, 3]) + ], + [ + 'Duration with more fields', + new structure.Structure(0x45, [1, 2, 3, 4, 5]) + ], + [ + 'LocalTime with less fields', + new structure.Structure(0x74, []) + ], + [ + 'LocalTime with more fields', + new structure.Structure(0x74, [1, 2]) + ], + [ + 'Time with less fields', + new structure.Structure(0x54, [1]) + ], + [ + 'Time with more fileds', + new structure.Structure(0x54, [1, 2, 3]) + ], + [ + 'Date with less fields', + new structure.Structure(0x44, []) + ], + [ + 'Date with more fields', + new structure.Structure(0x44, [1, 2]) + ], + [ + 'LocalDateTime with less fields', + new structure.Structure(0x64, [1]) + ], + [ + 'LocalDateTime with more fields', + new structure.Structure(0x64, [1, 2, 3]) + ], + [ + 'DateTimeWithZoneOffset with less fields', + new structure.Structure(0x49, [1, 2]) + ], + [ + 'DateTimeWithZoneOffset with more fields', + new structure.Structure(0x49, [1, 2, 3, 4]) + ], + [ + 'DateTimeWithZoneId with less fields', + new structure.Structure(0x69, [1, 2]) + ], + [ + 'DateTimeWithZoneId with more fields', + new structure.Structure(0x69, [1, 2, 'America/Sao Paulo', 'Brasil']) + ] + ])('should not unpack with wrong size (%s)', (_, struct) => { + const buffer = alloc(256) + const protocol = new BoltProtocolV5x8( + new utils.MessageRecordingConnection(), + buffer, + false + ) + + const packable = protocol.packable(struct) + + expect(packable).not.toThrow() + + buffer.reset() + + const unpacked = protocol.unpack(buffer) + expect(() => unpacked instanceof structure.Structure).toThrowErrorMatchingSnapshot() + }) + + it.each([ + [ + 'Point', + new structure.Structure(0x58, [1, 2, 3]), + new Point(1, 2, 3) + ], + [ + 'Point3D', + new structure.Structure(0x59, [1, 2, 3, 4]), + new Point(1, 2, 3, 4) + ], + [ + 'Duration', + new structure.Structure(0x45, [1, 2, 3, 4]), + new Duration(1, 2, 3, 4) + ], + [ + 'LocalTime', + new structure.Structure(0x74, [1]), + new LocalTime(0, 0, 0, 1) + ], + [ + 'Time', + new structure.Structure(0x54, [1, 2]), + new Time(0, 0, 0, 1, 2) + ], + [ + 'Date', + new structure.Structure(0x44, [1]), + new Date(1970, 1, 2) + ], + [ + 'LocalDateTime', + new structure.Structure(0x64, [1, 2]), + new LocalDateTime(1970, 1, 1, 0, 0, 1, 2) + ], + [ + 'DateTimeWithZoneOffset', + new structure.Structure(0x49, [ + 1655212878, 183_000_000, 120 * 60 + ]), + new DateTime(2022, 6, 14, 15, 21, 18, 183_000_000, 120 * 60) + ], + [ + 'DateTimeWithZoneOffset / 1978', + new structure.Structure(0x49, [ + 282659759, 128000987, -150 * 60 + ]), + new DateTime(1978, 12, 16, 10, 5, 59, 128000987, -150 * 60) + ], + [ + 'DateTimeWithZoneId', + new structure.Structure(0x69, [ + 1655212878, 183_000_000, 'Europe/Berlin' + ]), + new DateTime(2022, 6, 14, 15, 21, 18, 183_000_000, 2 * 60 * 60, 'Europe/Berlin') + ], + [ + 'DateTimeWithZoneId / Australia', + new structure.Structure(0x69, [ + 1655212878, 183_000_000, 'Australia/Eucla' + ]), + new DateTime(2022, 6, 14, 22, 6, 18, 183_000_000, 8 * 60 * 60 + 45 * 60, 'Australia/Eucla') + ], + [ + 'DateTimeWithZoneId / Honolulu', + new structure.Structure(0x69, [ + 1592231400, 183_000_000, 'Pacific/Honolulu' + ]), + new DateTime(2020, 6, 15, 4, 30, 0, 183_000_000, -10 * 60 * 60, 'Pacific/Honolulu') + ], + [ + 'DateTimeWithZoneId / Midnight', + new structure.Structure(0x69, [ + 1685397950, 183_000_000, 'Europe/Berlin' + ]), + new DateTime(2023, 5, 30, 0, 5, 50, 183_000_000, 2 * 60 * 60, 'Europe/Berlin') + ] + ])('should unpack spatial types and temporal types (%s)', (_, struct, object) => { + const buffer = alloc(256) + const protocol = new BoltProtocolV5x8( + new utils.MessageRecordingConnection(), + buffer, + { + disableLosslessIntegers: true + } + ) + + const packable = protocol.packable(struct) + + expect(packable).not.toThrow() + + buffer.reset() + + const unpacked = protocol.unpack(buffer) + expect(unpacked).toEqual(object) + }) + + it.each([ + [ + 'DateTimeWithZoneOffset/0x46', + new structure.Structure(0x46, [1, 2, 3]) + ], + [ + 'DateTimeWithZoneId/0x66', + new structure.Structure(0x66, [1, 2, 'America/Sao_Paulo']) + ] + ])('should unpack deprecated temporal types as unknown structs (%s)', (_, struct) => { + const buffer = alloc(256) + const protocol = new BoltProtocolV5x8( + new utils.MessageRecordingConnection(), + buffer, + { + disableLosslessIntegers: true + } + ) + + const packable = protocol.packable(struct) + + expect(packable).not.toThrow() + + buffer.reset() + + const unpacked = protocol.unpack(buffer) + expect(unpacked).toEqual(struct) + }) + }) + + describe('result metadata enrichment', () => { + it('run should configure BoltProtocolV5x8._enrichMetadata as enrichMetadata', () => { + const database = 'testdb' + const bookmarks = new Bookmarks([ + 'neo4j:bookmark:v1:tx1', + 'neo4j:bookmark:v1:tx2' + ]) + const txConfig = new TxConfig({ + timeout: 5000, + metadata: { x: 1, y: 'something' } + }) + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV5x8(recorder, null, false) + utils.spyProtocolWrite(protocol) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + + const observer = protocol.run(query, parameters, { + bookmarks, + txConfig, + database, + mode: WRITE + }) + + expect(observer._enrichMetadata).toBe(protocol._enrichMetadata) + }) + + describe('BoltProtocolV5x8._enrichMetadata', () => { + const protocol = newProtocol() + + it('should handle empty metadata', () => { + const metadata = protocol._enrichMetadata({}) + + expect(metadata).toEqual({}) + }) + + it('should handle metadata with random objects', () => { + const metadata = protocol._enrichMetadata({ + a: 1133, + b: 345 + }) + + expect(metadata).toEqual({ + a: 1133, + b: 345 + }) + }) + + it('should handle metadata not change notifications ', () => { + const metadata = protocol._enrichMetadata({ + a: 1133, + b: 345, + notifications: [ + { + severity: 'WARNING', + category: 'HINT' + } + ] + }) + + expect(metadata).toEqual({ + a: 1133, + b: 345, + notifications: [ + { + severity: 'WARNING', + category: 'HINT' + } + ] + }) + }) + + it.each([ + [null, null], + [undefined, undefined], + [[], []], + [statusesWithDiagnosticRecord(null, null), statusesWithDiagnosticRecord(null, null)], + [statusesWithDiagnosticRecord(undefined, undefined), statusesWithDiagnosticRecord({ + OPERATION: '', + OPERATION_CODE: '0', + CURRENT_SCHEMA: '/' + }, + { + OPERATION: '', + OPERATION_CODE: '0', + CURRENT_SCHEMA: '/' + })], + [ + statusesWithDiagnosticRecord({ + OPERATION: 'A' + }), + statusesWithDiagnosticRecord({ + OPERATION: 'A', + OPERATION_CODE: '0', + CURRENT_SCHEMA: '/' + }) + ], + [ + statusesWithDiagnosticRecord({ + OPERATION: 'A', + OPERATION_CODE: 'B' + }), + statusesWithDiagnosticRecord({ + OPERATION: 'A', + OPERATION_CODE: 'B', + CURRENT_SCHEMA: '/' + }) + ], + [ + statusesWithDiagnosticRecord({ + OPERATION: 'A', + OPERATION_CODE: 'B', + CURRENT_SCHEMA: '/C' + }), + statusesWithDiagnosticRecord({ + OPERATION: 'A', + OPERATION_CODE: 'B', + CURRENT_SCHEMA: '/C' + }) + ], + [ + statusesWithDiagnosticRecord({ + OPERATION: 'A', + OPERATION_CODE: 'B', + CURRENT_SCHEMA: '/C', + _status_parameters: { d: 'E' } + }), + statusesWithDiagnosticRecord({ + OPERATION: 'A', + OPERATION_CODE: 'B', + CURRENT_SCHEMA: '/C', + _status_parameters: { d: 'E' } + }) + ], + [ + statusesWithDiagnosticRecord({ + OPERATION: 'A', + OPERATION_CODE: 'B', + CURRENT_SCHEMA: '/C', + _status_parameters: { d: 'E' }, + _severity: 'F' + }), + statusesWithDiagnosticRecord({ + OPERATION: 'A', + OPERATION_CODE: 'B', + CURRENT_SCHEMA: '/C', + _status_parameters: { d: 'E' }, + _severity: 'F' + }) + ], + [ + statusesWithDiagnosticRecord({ + OPERATION: 'A', + OPERATION_CODE: 'B', + CURRENT_SCHEMA: '/C', + _status_parameters: { d: 'E' }, + _severity: 'F', + _classification: 'G' + }), + statusesWithDiagnosticRecord({ + OPERATION: 'A', + OPERATION_CODE: 'B', + CURRENT_SCHEMA: '/C', + _status_parameters: { d: 'E' }, + _severity: 'F', + _classification: 'G' + }) + ], + [ + statusesWithDiagnosticRecord({ + OPERATION: 'A', + OPERATION_CODE: 'B', + CURRENT_SCHEMA: '/C', + _status_parameters: { d: 'E' }, + _severity: 'F', + _classification: 'G', + _position: { + offset: 1, + line: 2, + column: 3 + } + }), + statusesWithDiagnosticRecord({ + OPERATION: 'A', + OPERATION_CODE: 'B', + CURRENT_SCHEMA: '/C', + _status_parameters: { d: 'E' }, + _severity: 'F', + _classification: 'G', + _position: { + offset: 1, + line: 2, + column: 3 + } + }) + ], + [ + statusesWithDiagnosticRecord({ + OPERATION: null + }), + statusesWithDiagnosticRecord({ + OPERATION: null, + OPERATION_CODE: '0', + CURRENT_SCHEMA: '/' + }) + ], + [ + statusesWithDiagnosticRecord({ + OPERATION: null, + OPERATION_CODE: null + }), + statusesWithDiagnosticRecord({ + OPERATION: null, + OPERATION_CODE: null, + CURRENT_SCHEMA: '/' + }) + ], + [ + statusesWithDiagnosticRecord({ + OPERATION: null, + OPERATION_CODE: null, + CURRENT_SCHEMA: null + }), + statusesWithDiagnosticRecord({ + OPERATION: null, + OPERATION_CODE: null, + CURRENT_SCHEMA: null + }) + ], + [ + statusesWithDiagnosticRecord({ + OPERATION: null, + OPERATION_CODE: null, + CURRENT_SCHEMA: null, + _status_parameters: null + }), + statusesWithDiagnosticRecord({ + OPERATION: null, + OPERATION_CODE: null, + CURRENT_SCHEMA: null, + _status_parameters: null + }) + ], + [ + statusesWithDiagnosticRecord({ + OPERATION: null, + OPERATION_CODE: null, + CURRENT_SCHEMA: null, + _status_parameters: null, + _severity: null + }), + statusesWithDiagnosticRecord({ + OPERATION: null, + OPERATION_CODE: null, + CURRENT_SCHEMA: null, + _status_parameters: null, + _severity: null + }) + ], + [ + statusesWithDiagnosticRecord({ + OPERATION: null, + OPERATION_CODE: null, + CURRENT_SCHEMA: null, + _status_parameters: null, + _severity: null, + _classification: null + }), + statusesWithDiagnosticRecord({ + OPERATION: null, + OPERATION_CODE: null, + CURRENT_SCHEMA: null, + _status_parameters: null, + _severity: null, + _classification: null + }) + ], + [ + statusesWithDiagnosticRecord({ + OPERATION: null, + OPERATION_CODE: null, + CURRENT_SCHEMA: null, + _status_parameters: null, + _severity: null, + _classification: null, + _position: null + }), + statusesWithDiagnosticRecord({ + OPERATION: null, + OPERATION_CODE: null, + CURRENT_SCHEMA: null, + _status_parameters: null, + _severity: null, + _classification: null, + _position: null + }) + ], + [ + statusesWithDiagnosticRecord({ + OPERATION: undefined, + OPERATION_CODE: undefined, + CURRENT_SCHEMA: undefined, + _status_parameters: undefined, + _severity: undefined, + _classification: undefined, + _position: undefined + }), + statusesWithDiagnosticRecord({ + OPERATION: undefined, + OPERATION_CODE: undefined, + CURRENT_SCHEMA: undefined, + _status_parameters: undefined, + _severity: undefined, + _classification: undefined, + _position: undefined + }) + ], + [ + [{ + gql_status: '03N33', + status_description: 'info: description', + neo4j_code: 'Neo.Info.My.Code', + title: 'Mitt title', + diagnostic_record: { + _classification: 'SOME', + _severity: 'INFORMATION' + } + }], + [{ + gql_status: '03N33', + status_description: 'info: description', + neo4j_code: 'Neo.Info.My.Code', + title: 'Mitt title', + diagnostic_record: { + OPERATION: '', + OPERATION_CODE: '0', + CURRENT_SCHEMA: '/', + _classification: 'SOME', + _severity: 'INFORMATION' + } + }] + ], + [ + [{ + gql_status: '03N33', + status_description: 'info: description', + neo4j_code: 'Neo.Info.My.Code', + description: 'description', + title: 'Mitt title', + diagnostic_record: { + _classification: 'SOME', + _severity: 'INFORMATION' + } + }], + [{ + gql_status: '03N33', + status_description: 'info: description', + neo4j_code: 'Neo.Info.My.Code', + title: 'Mitt title', + description: 'description', + diagnostic_record: { + OPERATION: '', + OPERATION_CODE: '0', + CURRENT_SCHEMA: '/', + _classification: 'SOME', + _severity: 'INFORMATION' + } + }] + ], + [ + [{ + gql_status: '03N33', + status_description: 'info: description', + description: 'description' + }], + [{ + gql_status: '03N33', + status_description: 'info: description', + description: 'description', + diagnostic_record: { + OPERATION: '', + OPERATION_CODE: '0', + CURRENT_SCHEMA: '/' + } + }] + ] + ])('should handle statuses (%o) ', (statuses, expectedStatuses) => { + const metadata = protocol._enrichMetadata({ + a: 1133, + b: 345, + statuses + }) + + expect(metadata).toEqual({ + a: 1133, + b: 345, + statuses: expectedStatuses + }) + }) + }) + + function statusesWithDiagnosticRecord (...diagnosticRecords) { + return diagnosticRecords.map(diagnosticRecord => { + return { + gql_status: '00000', + status_description: 'note: successful completion', + diagnostic_record: diagnosticRecord + } + }) + } + }) + + function newProtocol (recorder) { + return new BoltProtocolV5x8(recorder, null, false, undefined, undefined, () => {}) + } +}) diff --git a/packages/bolt-connection/test/bolt/index.test.js b/packages/bolt-connection/test/bolt/index.test.js index 1a9117556..7e3c1303b 100644 --- a/packages/bolt-connection/test/bolt/index.test.js +++ b/packages/bolt-connection/test/bolt/index.test.js @@ -48,7 +48,7 @@ describe('#unit Bolt', () => { const writtenBuffer = channel.written[0] const boltMagicPreamble = '60 60 b0 17' - const protocolVersion5x7to5x0 = '00 07 07 05' + const protocolVersion5x7to5x0 = '00 08 08 05' const protocolVersion4x4to4x2 = '00 02 04 04' const protocolVersion4x1 = '00 00 01 04' const protocolVersion3 = '00 00 00 03' diff --git a/packages/core/src/connection-provider.ts b/packages/core/src/connection-provider.ts index 0cfacc1d4..693c0354e 100644 --- a/packages/core/src/connection-provider.ts +++ b/packages/core/src/connection-provider.ts @@ -56,7 +56,9 @@ class ConnectionProvider { * @property {string} param.database - the target database for the to-be-acquired connection * @property {Bookmarks} param.bookmarks - the bookmarks to send to routing discovery * @property {string} param.impersonatedUser - the impersonated user - * @property {function (databaseName:string?)} param.onDatabaseNameResolved - Callback called when the database name get resolved + * @property {function (databaseName:string)} param.onDatabaseNameResolved - Callback called when the database name get resolved + * @property {AuthToken} param.auth - auth token used to authorize for connection acquisition + * @property {string} param.homeDb - the driver's best guess at the current home database for the user * @returns {Promise} */ acquireConnection (param?: { @@ -64,8 +66,9 @@ class ConnectionProvider { database?: string bookmarks: bookmarks.Bookmarks impersonatedUser?: string - onDatabaseNameResolved?: (databaseName?: string) => void + onDatabaseNameResolved?: (database: string) => void auth?: AuthToken + homeDb?: string }): Promise { throw Error('Not implemented') } @@ -110,6 +113,10 @@ class ConnectionProvider { throw Error('Not implemented') } + SSREnabled (): boolean { + return false + } + /** * This method verifies the connectivity of the database by trying to acquire a connection * for each server available in the cluster. diff --git a/packages/core/src/connection.ts b/packages/core/src/connection.ts index ed2e72cfa..f24d035dd 100644 --- a/packages/core/src/connection.ts +++ b/packages/core/src/connection.ts @@ -58,6 +58,7 @@ interface RunQueryConfig extends BeginTransactionConfig { highRecordWatermark: number lowRecordWatermark: number reactive: boolean + onDb?: (database: string) => void } /** diff --git a/packages/core/src/driver.ts b/packages/core/src/driver.ts index 9a7192425..257cc9c4b 100644 --- a/packages/core/src/driver.ts +++ b/packages/core/src/driver.ts @@ -46,6 +46,8 @@ import resultTransformers, { ResultTransformer } from './result-transformers' import QueryExecutor from './internal/query-executor' import { newError } from './error' import NotificationFilter from './notification-filter' +import HomeDatabaseCache from './internal/homedb-cache' +import { cacheKey } from './internal/auth-util' const DEFAULT_MAX_CONNECTION_LIFETIME: number = 60 * 60 * 1000 // 1 hour @@ -55,6 +57,11 @@ const DEFAULT_MAX_CONNECTION_LIFETIME: number = 60 * 60 * 1000 // 1 hour */ const DEFAULT_FETCH_SIZE: number = 1000 +/** + * The maximum number of entries allowed in the home database cache before pruning. + */ +const HOMEDB_CACHE_MAX_SIZE: number = 10000 + /** * Constant that represents read session access mode. * Should be used like this: `driver.session({ defaultAccessMode: neo4j.session.READ })`. @@ -97,6 +104,7 @@ type CreateSession = (args: { notificationFilter?: NotificationFilter auth?: AuthToken log: Logger + homeDatabaseCallback?: (user: string, database: any) => void }) => Session type CreateQueryExecutor = (createSession: (config: { database?: string, bookmarkManager?: BookmarkManager }) => Session) => QueryExecutor @@ -470,6 +478,7 @@ class Driver { private readonly _createSession: CreateSession private readonly _defaultExecuteQueryBookmarkManager: BookmarkManager private readonly _queryExecutor: QueryExecutor + private readonly homeDatabaseCache: HomeDatabaseCache /** * You should not be calling this directly, instead use {@link driver}. @@ -509,6 +518,11 @@ class Driver { */ this._connectionProvider = null + /** + * @private + */ + this.homeDatabaseCache = new HomeDatabaseCache(HOMEDB_CACHE_MAX_SIZE) + this._afterConstruction() } @@ -837,6 +851,10 @@ class Driver { ) } + _homeDatabaseCallback (cacheKey: string, database: any): void { + this.homeDatabaseCache.set(cacheKey, database) + } + /** * @private */ @@ -863,6 +881,9 @@ class Driver { }): Session { const sessionMode = Session._validateSessionMode(defaultAccessMode) const connectionProvider = this._getOrCreateConnectionProvider() + // eslint-disable-next-line + const cachedHomeDatabase = this.homeDatabaseCache.get(cacheKey(auth, impersonatedUser)) + const homeDatabaseCallback = this._homeDatabaseCallback.bind(this) const bookmarks = bookmarkOrBookmarks != null ? new Bookmarks(bookmarkOrBookmarks) : Bookmarks.empty() @@ -872,14 +893,19 @@ class Driver { database: database ?? '', connectionProvider, bookmarks, - config: this._config, + config: { + cachedHomeDatabase, + routingDriver: this._supportsRouting(), + ...this._config + }, reactive, impersonatedUser, fetchSize, bookmarkManager, notificationFilter, auth, - log: this._log + log: this._log, + homeDatabaseCallback }) } diff --git a/packages/core/src/internal/auth-util.ts b/packages/core/src/internal/auth-util.ts new file mode 100644 index 000000000..3d5e4ed4e --- /dev/null +++ b/packages/core/src/internal/auth-util.ts @@ -0,0 +1,41 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { stringify } from '../json' +import { AuthToken } from '../types' + +export function cacheKey (auth?: AuthToken, impersonatedUser?: string): string { + if (impersonatedUser != null) { + return 'basic:' + impersonatedUser + } + if (auth === undefined) { + return 'DEFAULT' + } + if (auth.scheme === 'basic') { + return 'basic:' + (auth.principal ?? '') + } + if (auth.scheme === 'kerberos') { + return 'kerberos:' + auth.credentials + } + if (auth.scheme === 'bearer') { + return 'bearer:' + auth.credentials + } + if (auth.scheme === 'none') { + return 'none' + } + return stringify(auth, { sortedElements: true }) +} diff --git a/packages/core/src/internal/connection-holder.ts b/packages/core/src/internal/connection-holder.ts index 2087b875f..1fa58ffe8 100644 --- a/packages/core/src/internal/connection-holder.ts +++ b/packages/core/src/internal/connection-holder.ts @@ -84,7 +84,7 @@ class ConnectionHolder implements ConnectionHolderInterface { private _connectionPromise: Promise private readonly _impersonatedUser?: string private readonly _getConnectionAcquistionBookmarks: () => Promise - private readonly _onDatabaseNameResolved?: (databaseName?: string) => void + private readonly _onDatabaseNameResolved?: (databaseName: string) => void private readonly _auth?: AuthToken private readonly _log: Logger private _closed: boolean @@ -117,7 +117,7 @@ class ConnectionHolder implements ConnectionHolderInterface { bookmarks?: Bookmarks connectionProvider?: ConnectionProvider impersonatedUser?: string - onDatabaseNameResolved?: (databaseName?: string) => void + onDatabaseNameResolved?: (database: string) => void getConnectionAcquistionBookmarks?: () => Promise auth?: AuthToken log: Logger @@ -161,9 +161,9 @@ class ConnectionHolder implements ConnectionHolderInterface { return this._referenceCount } - initializeConnection (): boolean { + initializeConnection (homeDatabase?: string): boolean { if (this._referenceCount === 0 && (this._connectionProvider != null)) { - this._connectionPromise = this._createConnectionPromise(this._connectionProvider) + this._connectionPromise = this._createConnectionPromise(this._connectionProvider, homeDatabase) } else { this._referenceCount++ return false @@ -172,14 +172,15 @@ class ConnectionHolder implements ConnectionHolderInterface { return true } - private async _createConnectionPromise (connectionProvider: ConnectionProvider): Promise { + private async _createConnectionPromise (connectionProvider: ConnectionProvider, homeDatabase?: string): Promise { return await connectionProvider.acquireConnection({ accessMode: this._mode, database: this._database, bookmarks: await this._getBookmarks(), impersonatedUser: this._impersonatedUser, onDatabaseNameResolved: this._onDatabaseNameResolved, - auth: this._auth + auth: this._auth, + homeDb: homeDatabase }) } diff --git a/packages/core/src/internal/constants.ts b/packages/core/src/internal/constants.ts index ed32dcef5..b45034da9 100644 --- a/packages/core/src/internal/constants.ts +++ b/packages/core/src/internal/constants.ts @@ -39,6 +39,7 @@ const BOLT_PROTOCOL_V5_4: number = 5.4 const BOLT_PROTOCOL_V5_5: number = 5.5 const BOLT_PROTOCOL_V5_6: number = 5.6 const BOLT_PROTOCOL_V5_7: number = 5.7 +const BOLT_PROTOCOL_V5_8: number = 5.8 const TELEMETRY_APIS = { MANAGED_TRANSACTION: 0, @@ -74,5 +75,6 @@ export { BOLT_PROTOCOL_V5_5, BOLT_PROTOCOL_V5_6, BOLT_PROTOCOL_V5_7, + BOLT_PROTOCOL_V5_8, TELEMETRY_APIS } diff --git a/packages/core/src/internal/homedb-cache.ts b/packages/core/src/internal/homedb-cache.ts new file mode 100644 index 000000000..7025b1032 --- /dev/null +++ b/packages/core/src/internal/homedb-cache.ts @@ -0,0 +1,87 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Cache which maps users to their last known home database, along with the last time the entry was accessed. + * + * @private + */ +export default class HomeDatabaseCache { + maxSize: number + map: Map + pruneCount: number + + constructor (maxSize: number) { + this.maxSize = maxSize + this.pruneCount = Math.max(Math.round(0.01 * maxSize * Math.log(maxSize)), 1) + this.map = new Map() + } + + /** + * Updates or adds an entry to the cache, and prunes the cache if above the maximum allowed size + * + * @param {string} user cache key for the user to set + * @param {string} database new home database to set for the user + */ + set (user: string, database: string): void { + this.map.set(user, { database, lastUsed: Date.now() }) + this._pruneCache() + } + + /** + * retrieves the last known home database for a user + * + * @param {string} user cache key for the user to get + */ + get (user: string): string | undefined { + const value = this.map.get(user) + if (value !== undefined) { + value.lastUsed = Date.now() + return value.database + } + return undefined + } + + /** + * removes the entry for a given user in the cache + * + * @param {string} user cache key for the user to remove + */ + delete (user: string): void { + this.map.delete(user) + } + + /** + * Removes a number of the oldest entries in the cache if the number of entries has exceeded the maximum size. + */ + private _pruneCache (): void { + if (this.map.size > this.maxSize) { + const sortedArray = Array.from(this.map.entries()).sort((a, b) => a[1].lastUsed - b[1].lastUsed) + for (let i = 0; i < this.pruneCount; i++) { + this.map.delete(sortedArray[i][0]) + } + } + } +} + +/** + * Interface for an entry in the cache. + */ +interface HomeDatabaseEntry { + database: string + lastUsed: number +} diff --git a/packages/core/src/json.ts b/packages/core/src/json.ts index cba26b0ba..b2e7c3131 100644 --- a/packages/core/src/json.ts +++ b/packages/core/src/json.ts @@ -19,6 +19,7 @@ import { isBrokenObject, getBrokenObjectReason } from './internal/object-util' interface StringifyOpts { useCustomToString?: boolean + sortedElements?: boolean } /** @@ -40,6 +41,19 @@ export function stringify (val: any, opts?: StringifyOpts): string { return `${value}n` } + if (opts?.sortedElements === true && + typeof value === 'object' && + !Array.isArray(value)) { + return Object.keys(value).sort().reduce( + (obj, key) => { + // @ts-expect-error: no way to avoid implicit 'any' + obj[key] = value[key] + return obj + }, + {} + ) + } + if (opts?.useCustomToString === true && typeof value === 'object' && !Array.isArray(value) && @@ -47,6 +61,7 @@ export function stringify (val: any, opts?: StringifyOpts): string { value.toString !== Object.prototype.toString) { return value?.toString() } + return value }) } diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index f08318569..bd3e01082 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -37,6 +37,7 @@ import BookmarkManager from './bookmark-manager' import { RecordShape } from './record' import NotificationFilter from './notification-filter' import { Logger } from './internal/logger' +import { cacheKey } from './internal/auth-util' type ConnectionConsumer = (connection: Connection) => Promise | T type TransactionWork = (tx: Transaction) => Promise | T @@ -74,6 +75,10 @@ class Session { private readonly _bookmarkManager?: BookmarkManager private readonly _notificationFilter?: NotificationFilter private readonly _log: Logger + private readonly _homeDatabaseCallback: Function | undefined + private readonly _auth: AuthToken | undefined + private _databaseGuess: string | undefined + private readonly _isRoutingSession: boolean /** * @constructor * @protected @@ -86,8 +91,11 @@ class Session { * @param {boolean} args.reactive - Whether this session should create reactive streams * @param {number} args.fetchSize - Defines how many records is pulled in each pulling batch * @param {string} args.impersonatedUser - The username which the user wants to impersonate for the duration of the session. - * @param {AuthToken} args.auth - the target auth for the to-be-acquired connection + * @param {BookmarkManager} args.bookmarkManager - The bookmark manager used for this session. * @param {NotificationFilter} args.notificationFilter - The notification filter used for this session. + * @param {AuthToken} args.auth - the target auth for the to-be-acquired connection + * @param {Logger} args.log - the logger used for logs in this session. + * @param {(user:string, database:string) => void} args.homeDatabaseCallback - callback used to update the home database cache */ constructor ({ mode, @@ -101,7 +109,8 @@ class Session { bookmarkManager, notificationFilter, auth, - log + log, + homeDatabaseCallback }: { mode: SessionMode connectionProvider: ConnectionProvider @@ -115,12 +124,14 @@ class Session { notificationFilter?: NotificationFilter auth?: AuthToken log: Logger + homeDatabaseCallback?: (user: string, database: string) => void }) { this._mode = mode this._database = database this._reactive = reactive this._fetchSize = fetchSize - this._onDatabaseNameResolved = this._onDatabaseNameResolved.bind(this) + this._homeDatabaseCallback = homeDatabaseCallback + this._auth = auth this._getConnectionAcquistionBookmarks = this._getConnectionAcquistionBookmarks.bind(this) this._readConnectionHolder = new ConnectionHolder({ mode: ACCESS_MODE_READ, @@ -129,7 +140,7 @@ class Session { bookmarks, connectionProvider, impersonatedUser, - onDatabaseNameResolved: this._onDatabaseNameResolved, + onDatabaseNameResolved: this._onDatabaseNameResolved.bind(this), getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks, log }) @@ -140,7 +151,7 @@ class Session { bookmarks, connectionProvider, impersonatedUser, - onDatabaseNameResolved: this._onDatabaseNameResolved, + onDatabaseNameResolved: this._onDatabaseNameResolved.bind(this), getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks, log }) @@ -158,6 +169,8 @@ class Session { this._bookmarkManager = bookmarkManager this._notificationFilter = notificationFilter this._log = log + this._databaseGuess = config?.cachedHomeDatabase + this._isRoutingSession = config?.routingDriver ?? false } /** @@ -201,7 +214,8 @@ class Session { fetchSize: this._fetchSize, lowRecordWatermark: this._lowRecordWatermark, highRecordWatermark: this._highRecordWatermark, - notificationFilter: this._notificationFilter + notificationFilter: this._notificationFilter, + onDb: this._onDatabaseNameResolved.bind(this) }) }) this._results.push(result) @@ -254,7 +268,7 @@ class Session { resultPromise = Promise.reject( newError('Cannot run query in a closed session.') ) - } else if (!this._hasTx && connectionHolder.initializeConnection()) { + } else if (!this._hasTx && connectionHolder.initializeConnection(this._databaseGuess)) { resultPromise = connectionHolder .getConnection() // Connection won't be null at this point since the initialize method @@ -310,7 +324,7 @@ class Session { const mode = Session._validateSessionMode(accessMode) const connectionHolder = this._connectionHolderWithMode(mode) - connectionHolder.initializeConnection() + connectionHolder.initializeConnection(this._databaseGuess) this._hasTx = true const tx = new TransactionPromise({ @@ -324,7 +338,8 @@ class Session { lowRecordWatermark: this._lowRecordWatermark, highRecordWatermark: this._highRecordWatermark, notificationFilter: this._notificationFilter, - apiTelemetryConfig + apiTelemetryConfig, + onDbCallback: this._onDatabaseNameResolved.bind(this) }) tx._begin(() => this._bookmarks(), txConfig) return tx @@ -508,12 +523,18 @@ class Session { * @returns {void} */ _onDatabaseNameResolved (database?: string): void { - if (!this._databaseNameResolved) { - const normalizedDatabase = database ?? '' - this._database = normalizedDatabase - this._readConnectionHolder.setDatabase(normalizedDatabase) - this._writeConnectionHolder.setDatabase(normalizedDatabase) - this._databaseNameResolved = true + if (this._isRoutingSession) { + this._databaseGuess = database + if (!this._databaseNameResolved) { + const normalizedDatabase = database ?? '' + this._database = normalizedDatabase + this._readConnectionHolder.setDatabase(normalizedDatabase) + this._writeConnectionHolder.setDatabase(normalizedDatabase) + this._databaseNameResolved = true + if (this._homeDatabaseCallback != null) { + this._homeDatabaseCallback(cacheKey(this._auth, this._impersonatedUser), database) + } + } } } diff --git a/packages/core/src/transaction-promise.ts b/packages/core/src/transaction-promise.ts index af184fa70..6583163fc 100644 --- a/packages/core/src/transaction-promise.ts +++ b/packages/core/src/transaction-promise.ts @@ -43,6 +43,7 @@ class TransactionPromise extends Transaction implements Promise { private _beginPromise?: Promise private _reject?: (error: Error) => void private _resolve?: (value: Transaction | PromiseLike) => void + private readonly _onDbCallback: (database: string) => void /** * @constructor @@ -69,7 +70,8 @@ class TransactionPromise extends Transaction implements Promise { highRecordWatermark, lowRecordWatermark, notificationFilter, - apiTelemetryConfig + apiTelemetryConfig, + onDbCallback }: { connectionHolder: ConnectionHolder onClose: () => void @@ -82,6 +84,7 @@ class TransactionPromise extends Transaction implements Promise { lowRecordWatermark: number notificationFilter?: NotificationFilter apiTelemetryConfig?: NonAutoCommitApiTelemetryConfig + onDbCallback: (database: string) => void }) { super({ connectionHolder, @@ -96,6 +99,7 @@ class TransactionPromise extends Transaction implements Promise { notificationFilter, apiTelemetryConfig }) + this._onDbCallback = onDbCallback } /** @@ -174,7 +178,8 @@ class TransactionPromise extends Transaction implements Promise { _begin (bookmarks: () => Promise, txConfig: TxConfig): void { return super._begin(bookmarks, txConfig, { onError: this._onBeginError.bind(this), - onComplete: this._onBeginMetadata.bind(this) + onComplete: this._onBeginMetadata.bind(this), + onDB: this._onDbCallback }) } diff --git a/packages/core/src/transaction.ts b/packages/core/src/transaction.ts index 1f3dee59d..85f1331d9 100644 --- a/packages/core/src/transaction.ts +++ b/packages/core/src/transaction.ts @@ -140,6 +140,7 @@ class Transaction { _begin (getBookmarks: () => Promise, txConfig: TxConfig, events?: { onError: (error: Error) => void onComplete: (metadata: any) => void + onDB: (database: string) => void }): void { this._connectionHolder .getConnection() @@ -165,6 +166,9 @@ class Transaction { if (events != null) { events.onComplete(metadata) } + if (metadata.db !== undefined && ((events?.onDB) != null)) { + events.onDB(metadata.db) + } this._onComplete(metadata) } }) diff --git a/packages/core/test/auth.test.ts b/packages/core/test/auth.test.ts index 710935709..abd3ae142 100644 --- a/packages/core/test/auth.test.ts +++ b/packages/core/test/auth.test.ts @@ -15,6 +15,7 @@ * limitations under the License. */ import auth from '../src/auth' +import { cacheKey } from '../src/internal/auth-util' describe('auth', () => { test('.bearer()', () => { @@ -56,4 +57,73 @@ describe('auth', () => { ])('.custom()', (args, output) => { expect(auth.custom.apply(auth, args)).toEqual(output) }) + + test.each([ + [ + ['user', 'pass', 'realm', 'scheme', { param: 'param' }], + ['user', 'pass', 'realm', 'scheme', { param: 'param' }], + true + ], + [ + ['user', 'pass', 'realm', 'scheme', { param2: 'param2', param: 'param' }], + ['user', 'pass', 'realm', 'scheme', { param: 'param', param2: 'param2' }], + true + ], + [ + ['user', 'pass', 'realm', 'scheme', { a: { param2: 'param2', param: 'param' } }], + ['user', 'pass', 'realm', 'scheme', { a: { param: 'param', param2: 'param2' } }], + true + ], + [ + ['user', 'pass', 'realm', 'scheme', { param: [1, 2, 3] }], + ['user', 'pass', 'realm', 'scheme', { param: [1, 2, 3] }], + true + ], + [ + ['user', 'pass', 'realm', 'scheme', { param: [1, 2, 3] }], + ['user', 'pass', 'realm', 'scheme', { param: [1, 3, 2] }], + false + ] + + ])('custom token cacheKey', (args1, args2, shouldMatch) => { + if (shouldMatch) { + expect(cacheKey(auth.custom.apply(auth, args1))).toEqual(cacheKey(auth.custom.apply(auth, args2))) + } else { + expect(cacheKey(auth.custom.apply(auth, args1))).not.toEqual(cacheKey(auth.custom.apply(auth, args2))) + } + }) + + test.each([ + [ + { + scheme: 'basic', + principal: 'user', + credentials: 'password' + }, + 'basic:user' + ], + [ + { + scheme: 'bearer', + credentials: 'Base64EncodedString' + }, + 'bearer:Base64EncodedString' + ], + [ + { + scheme: 'kerberos', + credentials: 'Base64EncodedString' + }, + 'kerberos:Base64EncodedString' + ], + [ + { + scheme: 'none', + credentials: '' + }, + 'none' + ] + ])('token cacheKey', (token, expected) => { + expect(cacheKey(token)).toEqual(expected) + }) }) diff --git a/packages/core/test/driver.test.ts b/packages/core/test/driver.test.ts index be1c6ce75..de2c8ef0b 100644 --- a/packages/core/test/driver.test.ts +++ b/packages/core/test/driver.test.ts @@ -699,7 +699,8 @@ describe('Driver', () => { fetchSize: 1000, maxConnectionLifetime: 3600000, maxConnectionPoolSize: 100, - connectionTimeout: 30000 + connectionTimeout: 30000, + routingDriver: false }, connectionProvider, database: '', @@ -709,6 +710,7 @@ describe('Driver', () => { impersonatedUser: undefined, // @ts-expect-error log: driver?._log, + homeDatabaseCallback: expect.any(Function), ...extra } } diff --git a/packages/core/test/internal/auth-util.test.ts b/packages/core/test/internal/auth-util.test.ts new file mode 100644 index 000000000..dfa806cee --- /dev/null +++ b/packages/core/test/internal/auth-util.test.ts @@ -0,0 +1,31 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import auth from '../../src/auth' +import { cacheKey } from '../../src/internal/auth-util' + +describe('#unit cacheKey()', () => { + it.each([ + ['basic', auth.basic('hello', 'basic'), 'basic:hello'], + ['kerberos', auth.kerberos('kerberosString'), 'kerberos:kerberosString'], + ['bearer', auth.bearer('bearerToken'), 'bearer:bearerToken'], + ['custom without parameters', auth.custom('hello', 'custom', 'realm', 'scheme'), '{"credentials":"custom","principal":"hello","realm":"realm","scheme":"scheme"}'], + ['custom with parameters', auth.custom('hello', 'custom', 'realm', 'scheme', { array: [1, 2, 3] }), '{"credentials":"custom","parameters":{"array":[1,2,3]},"principal":"hello","realm":"realm","scheme":"scheme"}'] + ])('should create correct cacheKey for % auth token', (_, token, expectedKey) => { + expect(cacheKey(token)).toEqual(expectedKey) + }) +}) diff --git a/packages/core/test/internal/homedb-cache.test.ts b/packages/core/test/internal/homedb-cache.test.ts new file mode 100644 index 000000000..0b18ce635 --- /dev/null +++ b/packages/core/test/internal/homedb-cache.test.ts @@ -0,0 +1,47 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import HomeDatabaseCache from '../../src/internal/homedb-cache' + +describe('#unit HomeDatabaseCache', () => { + it('should build homedb cache', () => { + const cache = new HomeDatabaseCache(10000) + cache.set('DEFAULT', 'neo4j') + expect(cache.get('DEFAULT')).toBe('neo4j') + cache.set('basic:hi', 'neo4j') + expect(cache.get('basic:hi')).toBe('neo4j') + }) + + it('should cap homeDb size by removing least recently used', async () => { + const cache = new HomeDatabaseCache(1000) + for (let i = 0; i < 999; i++) { + cache.set(i.toString(), 'neo4j') + } + await new Promise(resolve => setTimeout(resolve, 100)) + cache.set('5', 'neo4j') + cache.get('55') + for (let i = 999; i < 1050; i++) { + cache.set(i.toString(), 'neo4j') + } + expect(cache.get('1')).toEqual(undefined) + expect(cache.get('61')).toEqual(undefined) + expect(cache.get('5')).toEqual('neo4j') + expect(cache.get('55')).toEqual('neo4j') + expect(cache.get('101')).toEqual('neo4j') + expect(cache.get('1001')).toEqual('neo4j') + }) +}) diff --git a/packages/core/test/transaction.test.ts b/packages/core/test/transaction.test.ts index 0558e4105..e95617594 100644 --- a/packages/core/test/transaction.test.ts +++ b/packages/core/test/transaction.test.ts @@ -566,7 +566,8 @@ function newTransactionPromise ({ impersonatedUser: '', highRecordWatermark, lowRecordWatermark, - notificationFilter + notificationFilter, + onDbCallback: (_: string) => { } }) return transaction diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/bolt/bolt-protocol-v5x6.js b/packages/neo4j-driver-deno/lib/bolt-connection/bolt/bolt-protocol-v5x6.js index ea5bd840b..b80cfbb15 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/bolt/bolt-protocol-v5x6.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/bolt/bolt-protocol-v5x6.js @@ -16,7 +16,7 @@ */ import BoltProtocolV5x5 from './bolt-protocol-v5x5.js' -import transformersFactories from './bolt-protocol-v5x5.transformer.js' +import transformersFactories from './bolt-protocol-v5x6.transformer.js' import Transformer from './transformer.js' import { internal } from '../../core/index.ts' diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/bolt/bolt-protocol-v5x7.js b/packages/neo4j-driver-deno/lib/bolt-connection/bolt/bolt-protocol-v5x7.js index 9f417ab50..71f658f0f 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/bolt/bolt-protocol-v5x7.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/bolt/bolt-protocol-v5x7.js @@ -16,7 +16,7 @@ */ import BoltProtocolV5x6 from './bolt-protocol-v5x6.js' -import transformersFactories from './bolt-protocol-v5x5.transformer.js' +import transformersFactories from './bolt-protocol-v5x7.transformer.js' import Transformer from './transformer.js' import { internal } from '../../core/index.ts' diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/bolt/bolt-protocol-v5x8.js b/packages/neo4j-driver-deno/lib/bolt-connection/bolt/bolt-protocol-v5x8.js new file mode 100644 index 000000000..c2d832982 --- /dev/null +++ b/packages/neo4j-driver-deno/lib/bolt-connection/bolt/bolt-protocol-v5x8.js @@ -0,0 +1,104 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import BoltProtocolV5x7 from './bolt-protocol-v5x7.js' + +import transformersFactories from './bolt-protocol-v5x8.transformer.js' +import Transformer from './transformer.js' +import RequestMessage from './request-message.js' +import { ResultStreamObserver } from './stream-observers.js' + +import { internal } from '../../core/index.ts' + +const { + constants: { BOLT_PROTOCOL_V5_8, FETCH_ALL } +} = internal + +export default class BoltProtocol extends BoltProtocolV5x7 { + get version () { + return BOLT_PROTOCOL_V5_8 + } + + get transformer () { + if (this._transformer === undefined) { + this._transformer = new Transformer(Object.values(transformersFactories).map(create => create(this._config, this._log))) + } + return this._transformer + } + + run ( + query, + parameters, + { + bookmarks, + txConfig, + database, + mode, + impersonatedUser, + notificationFilter, + beforeKeys, + afterKeys, + beforeError, + afterError, + beforeComplete, + afterComplete, + flush = true, + reactive = false, + fetchSize = FETCH_ALL, + highRecordWatermark = Number.MAX_VALUE, + lowRecordWatermark = Number.MAX_VALUE, + onDb + } = {} + ) { + const observer = new ResultStreamObserver({ + server: this._server, + reactive, + fetchSize, + moreFunction: this._requestMore.bind(this), + discardFunction: this._requestDiscard.bind(this), + beforeKeys, + afterKeys, + beforeError, + afterError, + beforeComplete, + afterComplete, + highRecordWatermark, + lowRecordWatermark, + enrichMetadata: this._enrichMetadata, + onDb + }) + + const flushRun = reactive + this.write( + RequestMessage.runWithMetadata5x5(query, parameters, { + bookmarks, + txConfig, + database, + mode, + impersonatedUser, + notificationFilter + }), + observer, + flushRun && flush + ) + + if (!reactive) { + this.write(RequestMessage.pull({ n: fetchSize }), observer, flush) + } + + return observer + } +} diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/bolt/bolt-protocol-v5x8.transformer.js b/packages/neo4j-driver-deno/lib/bolt-connection/bolt/bolt-protocol-v5x8.transformer.js new file mode 100644 index 000000000..89304f7a9 --- /dev/null +++ b/packages/neo4j-driver-deno/lib/bolt-connection/bolt/bolt-protocol-v5x8.transformer.js @@ -0,0 +1,22 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import v5x7 from './bolt-protocol-v5x7.transformer.js' + +export default { + ...v5x7 +} diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/bolt/create.js b/packages/neo4j-driver-deno/lib/bolt-connection/bolt/create.js index 7a0fe2165..188e8c6bd 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/bolt/create.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/bolt/create.js @@ -32,6 +32,7 @@ import BoltProtocolV5x4 from './bolt-protocol-v5x4.js' import BoltProtocolV5x5 from './bolt-protocol-v5x5.js' import BoltProtocolV5x6 from './bolt-protocol-v5x6.js' import BoltProtocolV5x7 from './bolt-protocol-v5x7.js' +import BoltProtocolV5x8 from './bolt-protocol-v5x8.js' // eslint-disable-next-line no-unused-vars import { Chunker, Dechunker } from '../channel/index.js' import ResponseHandler from './response-handler.js' @@ -257,6 +258,14 @@ function createProtocol ( log, onProtocolError, serversideRouting) + case 5.8: + return new BoltProtocolV5x8(server, + chunker, + packingConfig, + createResponseHandler, + log, + onProtocolError, + serversideRouting) default: throw newError('Unknown Bolt protocol version: ' + version) } diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/bolt/handshake.js b/packages/neo4j-driver-deno/lib/bolt-connection/bolt/handshake.js index c91e0e18f..caff6d596 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/bolt/handshake.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/bolt/handshake.js @@ -76,7 +76,7 @@ function parseNegotiatedResponse (buffer, log) { */ function newHandshakeBuffer () { return createHandshakeMessage([ - [version(5, 7), version(5, 0)], + [version(5, 8), version(5, 0)], [version(4, 4), version(4, 2)], version(4, 1), version(3, 0) diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/bolt/stream-observers.js b/packages/neo4j-driver-deno/lib/bolt-connection/bolt/stream-observers.js index fb75d83ab..e409974a5 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/bolt/stream-observers.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/bolt/stream-observers.js @@ -79,7 +79,8 @@ class ResultStreamObserver extends StreamObserver { server, highRecordWatermark = Number.MAX_VALUE, lowRecordWatermark = Number.MAX_VALUE, - enrichMetadata + enrichMetadata, + onDb } = {}) { super() @@ -113,6 +114,7 @@ class ResultStreamObserver extends StreamObserver { this._paused = false this._pulled = !reactive this._haveRecordStreamed = false + this._onDb = onDb } /** @@ -319,6 +321,10 @@ class ResultStreamObserver extends StreamObserver { } } + if (meta.db !== null && this._onDb !== undefined) { + this._onDb(meta.db) + } + if (meta.fields != null) { // remove fields key from metadata object delete meta.fields diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-routing.js b/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-routing.js index fbd1a8b4d..46014726f 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-routing.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-routing.js @@ -78,7 +78,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._createConnectionErrorHandler(), this._log, await this._clientCertificateHolder.getClientCertificate(), - this._routingContext + this._routingContext, + this._channelSsrCallback.bind(this) ) }) @@ -99,6 +100,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider ) this._refreshRoutingTable = functional.reuseOngoingRequest(this._refreshRoutingTable, this) + this._withSSR = 0 + this._withoutSSR = 0 } _createConnectionErrorHandler () { @@ -139,19 +142,30 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider * See {@link ConnectionProvider} for more information about this method and * its arguments. */ - async acquireConnection ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved, auth } = {}) { - let name - let address + async acquireConnection ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved, auth, homeDb } = {}) { const context = { database: database || DEFAULT_DB_NAME } const databaseSpecificErrorHandler = new ConnectionErrorHandler( SESSION_EXPIRED, (error, address) => this._handleUnavailability(error, address, context.database), - (error, address) => this._handleWriteFailure(error, address, context.database), - (error, address, conn) => - this._handleSecurityError(error, address, conn, context.database) + (error, address) => this._handleWriteFailure(error, address, homeDb ?? context.database), + (error, address, conn) => this._handleSecurityError(error, address, conn, context.database) ) + let conn + if (this.SSREnabled() && homeDb !== undefined && database === '') { + const currentRoutingTable = this._routingTableRegistry.get( + homeDb, + () => new RoutingTable({ database: homeDb }) + ) + if (currentRoutingTable && !currentRoutingTable.isStaleFor(accessMode)) { + conn = await this.getConnectionFromRoutingTable(currentRoutingTable, auth, accessMode, databaseSpecificErrorHandler) + if (this.SSREnabled()) { + return conn + } + conn.release() + } + } const routingTable = await this._freshRoutingTable({ accessMode, database: context.database, @@ -165,7 +179,12 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider } } }) + return this.getConnectionFromRoutingTable(routingTable, auth, accessMode, databaseSpecificErrorHandler) + } + async getConnectionFromRoutingTable (routingTable, auth, accessMode, databaseSpecificErrorHandler) { + let name + let address // select a target server based on specified access mode if (accessMode === READ) { address = this._loadBalancingStrategy.selectReader(routingTable.readers) @@ -663,6 +682,28 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider routingTable.forgetRouter(address) } } + + _channelSsrCallback (isEnabled, action) { + if (action === 'OPEN') { + if (isEnabled === true) { + this._withSSR = this._withSSR + 1 + } else { + this._withoutSSR = this._withoutSSR + 1 + } + } else if (action === 'CLOSE') { + if (isEnabled === true) { + this._withSSR = this._withSSR - 1 + } else { + this._withoutSSR = this._withoutSSR - 1 + } + } else { + throw newError("Channel SSR Callback invoked with action other than 'OPEN' or 'CLOSE'") + } + } + + SSREnabled () { + return this._withSSR > 0 && this._withoutSSR === 0 + } } /** diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-channel.js b/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-channel.js index 0da4592db..7fcf4689c 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-channel.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-channel.js @@ -34,6 +34,8 @@ let idGenerator = 0 * @param {ConnectionErrorHandler} errorHandler - the error handler for connection errors. * @param {Logger} log - configured logger. * @param {clientCertificate} clientCertificate - configured client certificate + * @param ssrCallback - callback function used to update the counts of ssr enabled and disabled connections + * @param createChannel - function taking a channelConfig object and creating a channel with it * @return {Connection} - new connection. */ export function createChannelConnection ( @@ -43,6 +45,7 @@ export function createChannelConnection ( log, clientCertificate, serversideRouting = null, + ssrCallback, createChannel = channelConfig => new Channel(channelConfig) ) { const channelConfig = new ChannelConfig( @@ -89,7 +92,8 @@ export function createChannelConnection ( chunker, config.notificationFilter, createProtocol, - config.telemetryDisabled + config.telemetryDisabled, + ssrCallback ) // forward all pending bytes to the dechunker @@ -110,9 +114,11 @@ export default class ChannelConnection extends Connection { * @param {ConnectionErrorHandler} errorHandler the error handler. * @param {ServerAddress} address - the server address to connect to. * @param {Logger} log - the configured logger. - * @param {boolean} disableLosslessIntegers if this connection should convert all received integers to native JS numbers. - * @param {Chunker} chunker the chunker - * @param protocolSupplier Bolt protocol supplier + * @param {boolean} disableLosslessIntegers - if this connection should convert all received integers to native JS numbers. + * @param {Chunker} chunker - the chunker + * @param protocolSupplier - Bolt protocol supplier + * @param {boolean} telemetryDisabled - wether telemetry has been disabled in driver config. + * @param ssrCallback - callback function used to update the counts of ssr enabled and disabled connections. */ constructor ( channel, @@ -124,7 +130,8 @@ export default class ChannelConnection extends Connection { chunker, // to be removed, notificationFilter, protocolSupplier, - telemetryDisabled + telemetryDisabled, + ssrCallback = (_) => {} ) { super(errorHandler) this._authToken = null @@ -143,6 +150,7 @@ export default class ChannelConnection extends Connection { this._notificationFilter = notificationFilter this._telemetryDisabledDriverConfig = telemetryDisabled === true this._telemetryDisabledConnection = true + this._ssrCallback = ssrCallback // connection from the database, returned in response for HELLO message and might not be available this._dbConnectionId = null @@ -331,7 +339,9 @@ export default class ChannelConnection extends Connection { if (telemetryEnabledHint === true) { this._telemetryDisabledConnection = false } + this.SSREnabledHint = metadata.hints['ssr.enabled'] } + this._ssrCallback(this.SSREnabledHint ?? false, 'OPEN') } resolve(self) } @@ -538,6 +548,7 @@ export default class ChannelConnection extends Connection { * @returns {Promise} - A promise that will be resolved when the underlying channel is closed. */ async close () { + this._ssrCallback(this.SSREnabledHint ?? false, 'CLOSE') if (this._log.isDebugEnabled()) { this._log.debug('closing') } diff --git a/packages/neo4j-driver-deno/lib/core/connection-provider.ts b/packages/neo4j-driver-deno/lib/core/connection-provider.ts index 977aeeada..59aff6d33 100644 --- a/packages/neo4j-driver-deno/lib/core/connection-provider.ts +++ b/packages/neo4j-driver-deno/lib/core/connection-provider.ts @@ -56,7 +56,9 @@ class ConnectionProvider { * @property {string} param.database - the target database for the to-be-acquired connection * @property {Bookmarks} param.bookmarks - the bookmarks to send to routing discovery * @property {string} param.impersonatedUser - the impersonated user - * @property {function (databaseName:string?)} param.onDatabaseNameResolved - Callback called when the database name get resolved + * @property {function (databaseName:string)} param.onDatabaseNameResolved - Callback called when the database name get resolved + * @property {AuthToken} param.auth - auth token used to authorize for connection acquisition + * @property {string} param.homeDb - the driver's best guess at the current home database for the user * @returns {Promise} */ acquireConnection (param?: { @@ -64,8 +66,9 @@ class ConnectionProvider { database?: string bookmarks: bookmarks.Bookmarks impersonatedUser?: string - onDatabaseNameResolved?: (databaseName?: string) => void + onDatabaseNameResolved?: (database: string) => void auth?: AuthToken + homeDb?: string }): Promise { throw Error('Not implemented') } @@ -110,6 +113,10 @@ class ConnectionProvider { throw Error('Not implemented') } + SSREnabled (): boolean { + return false + } + /** * This method verifies the connectivity of the database by trying to acquire a connection * for each server available in the cluster. diff --git a/packages/neo4j-driver-deno/lib/core/connection.ts b/packages/neo4j-driver-deno/lib/core/connection.ts index 53cdbdb9d..b45e746ca 100644 --- a/packages/neo4j-driver-deno/lib/core/connection.ts +++ b/packages/neo4j-driver-deno/lib/core/connection.ts @@ -58,6 +58,7 @@ interface RunQueryConfig extends BeginTransactionConfig { highRecordWatermark: number lowRecordWatermark: number reactive: boolean + onDb?: (database: string) => void } /** diff --git a/packages/neo4j-driver-deno/lib/core/driver.ts b/packages/neo4j-driver-deno/lib/core/driver.ts index 0e1f33f42..86b43a088 100644 --- a/packages/neo4j-driver-deno/lib/core/driver.ts +++ b/packages/neo4j-driver-deno/lib/core/driver.ts @@ -46,6 +46,8 @@ import resultTransformers, { ResultTransformer } from './result-transformers.ts' import QueryExecutor from './internal/query-executor.ts' import { newError } from './error.ts' import NotificationFilter from './notification-filter.ts' +import HomeDatabaseCache from './internal/homedb-cache.ts' +import { cacheKey } from './internal/auth-util.ts' const DEFAULT_MAX_CONNECTION_LIFETIME: number = 60 * 60 * 1000 // 1 hour @@ -55,6 +57,11 @@ const DEFAULT_MAX_CONNECTION_LIFETIME: number = 60 * 60 * 1000 // 1 hour */ const DEFAULT_FETCH_SIZE: number = 1000 +/** + * The maximum number of entries allowed in the home database cache before pruning. + */ +const HOMEDB_CACHE_MAX_SIZE: number = 10000 + /** * Constant that represents read session access mode. * Should be used like this: `driver.session({ defaultAccessMode: neo4j.session.READ })`. @@ -97,6 +104,7 @@ type CreateSession = (args: { notificationFilter?: NotificationFilter auth?: AuthToken log: Logger + homeDatabaseCallback?: (user: string, database: any) => void }) => Session type CreateQueryExecutor = (createSession: (config: { database?: string, bookmarkManager?: BookmarkManager }) => Session) => QueryExecutor @@ -470,6 +478,7 @@ class Driver { private readonly _createSession: CreateSession private readonly _defaultExecuteQueryBookmarkManager: BookmarkManager private readonly _queryExecutor: QueryExecutor + private readonly homeDatabaseCache: HomeDatabaseCache /** * You should not be calling this directly, instead use {@link driver}. @@ -509,6 +518,11 @@ class Driver { */ this._connectionProvider = null + /** + * @private + */ + this.homeDatabaseCache = new HomeDatabaseCache(HOMEDB_CACHE_MAX_SIZE) + this._afterConstruction() } @@ -837,6 +851,10 @@ class Driver { ) } + _homeDatabaseCallback (cacheKey: string, database: any): void { + this.homeDatabaseCache.set(cacheKey, database) + } + /** * @private */ @@ -863,6 +881,9 @@ class Driver { }): Session { const sessionMode = Session._validateSessionMode(defaultAccessMode) const connectionProvider = this._getOrCreateConnectionProvider() + // eslint-disable-next-line + const cachedHomeDatabase = this.homeDatabaseCache.get(cacheKey(auth, impersonatedUser)) + const homeDatabaseCallback = this._homeDatabaseCallback.bind(this) const bookmarks = bookmarkOrBookmarks != null ? new Bookmarks(bookmarkOrBookmarks) : Bookmarks.empty() @@ -872,14 +893,19 @@ class Driver { database: database ?? '', connectionProvider, bookmarks, - config: this._config, + config: { + cachedHomeDatabase, + routingDriver: this._supportsRouting(), + ...this._config + }, reactive, impersonatedUser, fetchSize, bookmarkManager, notificationFilter, auth, - log: this._log + log: this._log, + homeDatabaseCallback }) } diff --git a/packages/neo4j-driver-deno/lib/core/internal/auth-util.ts b/packages/neo4j-driver-deno/lib/core/internal/auth-util.ts new file mode 100644 index 000000000..bb9c700f5 --- /dev/null +++ b/packages/neo4j-driver-deno/lib/core/internal/auth-util.ts @@ -0,0 +1,41 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { stringify } from '../json.ts' +import { AuthToken } from '../types.ts' + +export function cacheKey (auth?: AuthToken, impersonatedUser?: string): string { + if (impersonatedUser != null) { + return 'basic:' + impersonatedUser + } + if (auth === undefined) { + return 'DEFAULT' + } + if (auth.scheme === 'basic') { + return 'basic:' + (auth.principal ?? '') + } + if (auth.scheme === 'kerberos') { + return 'kerberos:' + auth.credentials + } + if (auth.scheme === 'bearer') { + return 'bearer:' + auth.credentials + } + if (auth.scheme === 'none') { + return 'none' + } + return stringify(auth, { sortedElements: true }) +} diff --git a/packages/neo4j-driver-deno/lib/core/internal/connection-holder.ts b/packages/neo4j-driver-deno/lib/core/internal/connection-holder.ts index 1f304c313..f8e1027cb 100644 --- a/packages/neo4j-driver-deno/lib/core/internal/connection-holder.ts +++ b/packages/neo4j-driver-deno/lib/core/internal/connection-holder.ts @@ -84,7 +84,7 @@ class ConnectionHolder implements ConnectionHolderInterface { private _connectionPromise: Promise private readonly _impersonatedUser?: string private readonly _getConnectionAcquistionBookmarks: () => Promise - private readonly _onDatabaseNameResolved?: (databaseName?: string) => void + private readonly _onDatabaseNameResolved?: (databaseName: string) => void private readonly _auth?: AuthToken private readonly _log: Logger private _closed: boolean @@ -117,7 +117,7 @@ class ConnectionHolder implements ConnectionHolderInterface { bookmarks?: Bookmarks connectionProvider?: ConnectionProvider impersonatedUser?: string - onDatabaseNameResolved?: (databaseName?: string) => void + onDatabaseNameResolved?: (database: string) => void getConnectionAcquistionBookmarks?: () => Promise auth?: AuthToken log: Logger @@ -161,9 +161,9 @@ class ConnectionHolder implements ConnectionHolderInterface { return this._referenceCount } - initializeConnection (): boolean { + initializeConnection (homeDatabase?: string): boolean { if (this._referenceCount === 0 && (this._connectionProvider != null)) { - this._connectionPromise = this._createConnectionPromise(this._connectionProvider) + this._connectionPromise = this._createConnectionPromise(this._connectionProvider, homeDatabase) } else { this._referenceCount++ return false @@ -172,14 +172,15 @@ class ConnectionHolder implements ConnectionHolderInterface { return true } - private async _createConnectionPromise (connectionProvider: ConnectionProvider): Promise { + private async _createConnectionPromise (connectionProvider: ConnectionProvider, homeDatabase?: string): Promise { return await connectionProvider.acquireConnection({ accessMode: this._mode, database: this._database, bookmarks: await this._getBookmarks(), impersonatedUser: this._impersonatedUser, onDatabaseNameResolved: this._onDatabaseNameResolved, - auth: this._auth + auth: this._auth, + homeDb: homeDatabase }) } diff --git a/packages/neo4j-driver-deno/lib/core/internal/constants.ts b/packages/neo4j-driver-deno/lib/core/internal/constants.ts index ed32dcef5..b45034da9 100644 --- a/packages/neo4j-driver-deno/lib/core/internal/constants.ts +++ b/packages/neo4j-driver-deno/lib/core/internal/constants.ts @@ -39,6 +39,7 @@ const BOLT_PROTOCOL_V5_4: number = 5.4 const BOLT_PROTOCOL_V5_5: number = 5.5 const BOLT_PROTOCOL_V5_6: number = 5.6 const BOLT_PROTOCOL_V5_7: number = 5.7 +const BOLT_PROTOCOL_V5_8: number = 5.8 const TELEMETRY_APIS = { MANAGED_TRANSACTION: 0, @@ -74,5 +75,6 @@ export { BOLT_PROTOCOL_V5_5, BOLT_PROTOCOL_V5_6, BOLT_PROTOCOL_V5_7, + BOLT_PROTOCOL_V5_8, TELEMETRY_APIS } diff --git a/packages/neo4j-driver-deno/lib/core/internal/homedb-cache.ts b/packages/neo4j-driver-deno/lib/core/internal/homedb-cache.ts new file mode 100644 index 000000000..7025b1032 --- /dev/null +++ b/packages/neo4j-driver-deno/lib/core/internal/homedb-cache.ts @@ -0,0 +1,87 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Cache which maps users to their last known home database, along with the last time the entry was accessed. + * + * @private + */ +export default class HomeDatabaseCache { + maxSize: number + map: Map + pruneCount: number + + constructor (maxSize: number) { + this.maxSize = maxSize + this.pruneCount = Math.max(Math.round(0.01 * maxSize * Math.log(maxSize)), 1) + this.map = new Map() + } + + /** + * Updates or adds an entry to the cache, and prunes the cache if above the maximum allowed size + * + * @param {string} user cache key for the user to set + * @param {string} database new home database to set for the user + */ + set (user: string, database: string): void { + this.map.set(user, { database, lastUsed: Date.now() }) + this._pruneCache() + } + + /** + * retrieves the last known home database for a user + * + * @param {string} user cache key for the user to get + */ + get (user: string): string | undefined { + const value = this.map.get(user) + if (value !== undefined) { + value.lastUsed = Date.now() + return value.database + } + return undefined + } + + /** + * removes the entry for a given user in the cache + * + * @param {string} user cache key for the user to remove + */ + delete (user: string): void { + this.map.delete(user) + } + + /** + * Removes a number of the oldest entries in the cache if the number of entries has exceeded the maximum size. + */ + private _pruneCache (): void { + if (this.map.size > this.maxSize) { + const sortedArray = Array.from(this.map.entries()).sort((a, b) => a[1].lastUsed - b[1].lastUsed) + for (let i = 0; i < this.pruneCount; i++) { + this.map.delete(sortedArray[i][0]) + } + } + } +} + +/** + * Interface for an entry in the cache. + */ +interface HomeDatabaseEntry { + database: string + lastUsed: number +} diff --git a/packages/neo4j-driver-deno/lib/core/json.ts b/packages/neo4j-driver-deno/lib/core/json.ts index 25142ec12..003286a55 100644 --- a/packages/neo4j-driver-deno/lib/core/json.ts +++ b/packages/neo4j-driver-deno/lib/core/json.ts @@ -19,6 +19,7 @@ import { isBrokenObject, getBrokenObjectReason } from './internal/object-util.ts interface StringifyOpts { useCustomToString?: boolean + sortedElements?: boolean } /** @@ -40,6 +41,19 @@ export function stringify (val: any, opts?: StringifyOpts): string { return `${value}n` } + if (opts?.sortedElements === true && + typeof value === 'object' && + !Array.isArray(value)) { + return Object.keys(value).sort().reduce( + (obj, key) => { + // @ts-expect-error: no way to avoid implicit 'any' + obj[key] = value[key] + return obj + }, + {} + ) + } + if (opts?.useCustomToString === true && typeof value === 'object' && !Array.isArray(value) && @@ -47,6 +61,7 @@ export function stringify (val: any, opts?: StringifyOpts): string { value.toString !== Object.prototype.toString) { return value?.toString() } + return value }) } diff --git a/packages/neo4j-driver-deno/lib/core/session.ts b/packages/neo4j-driver-deno/lib/core/session.ts index 8dce7b804..a2055a954 100644 --- a/packages/neo4j-driver-deno/lib/core/session.ts +++ b/packages/neo4j-driver-deno/lib/core/session.ts @@ -37,6 +37,7 @@ import BookmarkManager from './bookmark-manager.ts' import { RecordShape } from './record.ts' import NotificationFilter from './notification-filter.ts' import { Logger } from './internal/logger.ts' +import { cacheKey } from './internal/auth-util.ts' type ConnectionConsumer = (connection: Connection) => Promise | T type TransactionWork = (tx: Transaction) => Promise | T @@ -74,6 +75,10 @@ class Session { private readonly _bookmarkManager?: BookmarkManager private readonly _notificationFilter?: NotificationFilter private readonly _log: Logger + private readonly _homeDatabaseCallback: Function | undefined + private readonly _auth: AuthToken | undefined + private _databaseGuess: string | undefined + private readonly _isRoutingSession: boolean /** * @constructor * @protected @@ -86,8 +91,11 @@ class Session { * @param {boolean} args.reactive - Whether this session should create reactive streams * @param {number} args.fetchSize - Defines how many records is pulled in each pulling batch * @param {string} args.impersonatedUser - The username which the user wants to impersonate for the duration of the session. - * @param {AuthToken} args.auth - the target auth for the to-be-acquired connection + * @param {BookmarkManager} args.bookmarkManager - The bookmark manager used for this session. * @param {NotificationFilter} args.notificationFilter - The notification filter used for this session. + * @param {AuthToken} args.auth - the target auth for the to-be-acquired connection + * @param {Logger} args.log - the logger used for logs in this session. + * @param {(user:string, database:string) => void} args.homeDatabaseCallback - callback used to update the home database cache */ constructor ({ mode, @@ -101,7 +109,8 @@ class Session { bookmarkManager, notificationFilter, auth, - log + log, + homeDatabaseCallback }: { mode: SessionMode connectionProvider: ConnectionProvider @@ -115,12 +124,14 @@ class Session { notificationFilter?: NotificationFilter auth?: AuthToken log: Logger + homeDatabaseCallback?: (user: string, database: string) => void }) { this._mode = mode this._database = database this._reactive = reactive this._fetchSize = fetchSize - this._onDatabaseNameResolved = this._onDatabaseNameResolved.bind(this) + this._homeDatabaseCallback = homeDatabaseCallback + this._auth = auth this._getConnectionAcquistionBookmarks = this._getConnectionAcquistionBookmarks.bind(this) this._readConnectionHolder = new ConnectionHolder({ mode: ACCESS_MODE_READ, @@ -129,7 +140,7 @@ class Session { bookmarks, connectionProvider, impersonatedUser, - onDatabaseNameResolved: this._onDatabaseNameResolved, + onDatabaseNameResolved: this._onDatabaseNameResolved.bind(this), getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks, log }) @@ -140,7 +151,7 @@ class Session { bookmarks, connectionProvider, impersonatedUser, - onDatabaseNameResolved: this._onDatabaseNameResolved, + onDatabaseNameResolved: this._onDatabaseNameResolved.bind(this), getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks, log }) @@ -158,6 +169,8 @@ class Session { this._bookmarkManager = bookmarkManager this._notificationFilter = notificationFilter this._log = log + this._databaseGuess = config?.cachedHomeDatabase + this._isRoutingSession = config?.routingDriver ?? false } /** @@ -201,7 +214,8 @@ class Session { fetchSize: this._fetchSize, lowRecordWatermark: this._lowRecordWatermark, highRecordWatermark: this._highRecordWatermark, - notificationFilter: this._notificationFilter + notificationFilter: this._notificationFilter, + onDb: this._onDatabaseNameResolved.bind(this) }) }) this._results.push(result) @@ -254,7 +268,7 @@ class Session { resultPromise = Promise.reject( newError('Cannot run query in a closed session.') ) - } else if (!this._hasTx && connectionHolder.initializeConnection()) { + } else if (!this._hasTx && connectionHolder.initializeConnection(this._databaseGuess)) { resultPromise = connectionHolder .getConnection() // Connection won't be null at this point since the initialize method @@ -310,7 +324,7 @@ class Session { const mode = Session._validateSessionMode(accessMode) const connectionHolder = this._connectionHolderWithMode(mode) - connectionHolder.initializeConnection() + connectionHolder.initializeConnection(this._databaseGuess) this._hasTx = true const tx = new TransactionPromise({ @@ -324,7 +338,8 @@ class Session { lowRecordWatermark: this._lowRecordWatermark, highRecordWatermark: this._highRecordWatermark, notificationFilter: this._notificationFilter, - apiTelemetryConfig + apiTelemetryConfig, + onDbCallback: this._onDatabaseNameResolved.bind(this) }) tx._begin(() => this._bookmarks(), txConfig) return tx @@ -508,12 +523,18 @@ class Session { * @returns {void} */ _onDatabaseNameResolved (database?: string): void { - if (!this._databaseNameResolved) { - const normalizedDatabase = database ?? '' - this._database = normalizedDatabase - this._readConnectionHolder.setDatabase(normalizedDatabase) - this._writeConnectionHolder.setDatabase(normalizedDatabase) - this._databaseNameResolved = true + if (this._isRoutingSession) { + this._databaseGuess = database + if (!this._databaseNameResolved) { + const normalizedDatabase = database ?? '' + this._database = normalizedDatabase + this._readConnectionHolder.setDatabase(normalizedDatabase) + this._writeConnectionHolder.setDatabase(normalizedDatabase) + this._databaseNameResolved = true + if (this._homeDatabaseCallback != null) { + this._homeDatabaseCallback(cacheKey(this._auth, this._impersonatedUser), database) + } + } } } diff --git a/packages/neo4j-driver-deno/lib/core/transaction-promise.ts b/packages/neo4j-driver-deno/lib/core/transaction-promise.ts index 66eb5b79c..b12f21c58 100644 --- a/packages/neo4j-driver-deno/lib/core/transaction-promise.ts +++ b/packages/neo4j-driver-deno/lib/core/transaction-promise.ts @@ -43,6 +43,7 @@ class TransactionPromise extends Transaction implements Promise { private _beginPromise?: Promise private _reject?: (error: Error) => void private _resolve?: (value: Transaction | PromiseLike) => void + private readonly _onDbCallback: (database: string) => void /** * @constructor @@ -69,7 +70,8 @@ class TransactionPromise extends Transaction implements Promise { highRecordWatermark, lowRecordWatermark, notificationFilter, - apiTelemetryConfig + apiTelemetryConfig, + onDbCallback }: { connectionHolder: ConnectionHolder onClose: () => void @@ -82,6 +84,7 @@ class TransactionPromise extends Transaction implements Promise { lowRecordWatermark: number notificationFilter?: NotificationFilter apiTelemetryConfig?: NonAutoCommitApiTelemetryConfig + onDbCallback: (database: string) => void }) { super({ connectionHolder, @@ -96,6 +99,7 @@ class TransactionPromise extends Transaction implements Promise { notificationFilter, apiTelemetryConfig }) + this._onDbCallback = onDbCallback } /** @@ -174,7 +178,8 @@ class TransactionPromise extends Transaction implements Promise { _begin (bookmarks: () => Promise, txConfig: TxConfig): void { return super._begin(bookmarks, txConfig, { onError: this._onBeginError.bind(this), - onComplete: this._onBeginMetadata.bind(this) + onComplete: this._onBeginMetadata.bind(this), + onDB: this._onDbCallback }) } diff --git a/packages/neo4j-driver-deno/lib/core/transaction.ts b/packages/neo4j-driver-deno/lib/core/transaction.ts index 6291bd961..aec0062e8 100644 --- a/packages/neo4j-driver-deno/lib/core/transaction.ts +++ b/packages/neo4j-driver-deno/lib/core/transaction.ts @@ -140,6 +140,7 @@ class Transaction { _begin (getBookmarks: () => Promise, txConfig: TxConfig, events?: { onError: (error: Error) => void onComplete: (metadata: any) => void + onDB: (database: string) => void }): void { this._connectionHolder .getConnection() @@ -165,6 +166,9 @@ class Transaction { if (events != null) { events.onComplete(metadata) } + if (metadata.db !== undefined && ((events?.onDB) != null)) { + events.onDB(metadata.db) + } this._onComplete(metadata) } }) diff --git a/packages/neo4j-driver-lite/test/unit/index.test.ts b/packages/neo4j-driver-lite/test/unit/index.test.ts index 2ff63249d..98971e38a 100644 --- a/packages/neo4j-driver-lite/test/unit/index.test.ts +++ b/packages/neo4j-driver-lite/test/unit/index.test.ts @@ -263,7 +263,8 @@ describe('index', () => { verifyConnectivityAndGetServerInfo: async () => new ServerInfo({}), getNegotiatedProtocolVersion: async () => 5.0, verifyAuthentication: async () => true, - supportsSessionAuth: async () => true + supportsSessionAuth: async () => true, + SSREnabled: () => false } }) expect(session).toBeDefined() diff --git a/packages/neo4j-driver/test/driver.test.js b/packages/neo4j-driver/test/driver.test.js index 224086a56..d77f54c2f 100644 --- a/packages/neo4j-driver/test/driver.test.js +++ b/packages/neo4j-driver/test/driver.test.js @@ -182,6 +182,52 @@ describe('#unit driver', () => { expect(session._session._log).toBe(driver._log) }) }) + + describe('homeDatabaseCache', () => { + it('should build homedb cache from callback functions', () => { + driver = neo4j.driver( + `neo4j+ssc://${sharedNeo4j.hostnameWithBoltPort}`, + sharedNeo4j.authToken + ) + driver._homeDatabaseCallback('DEFAULT', 'neo4j') + expect(driver.homeDatabaseCache.get('DEFAULT')).toBe('neo4j') + driver._homeDatabaseCallback('basic:hi', 'neo4j') + expect(driver.homeDatabaseCache.get('basic:hi')).toBe('neo4j') + }) + + it('should change homedb entries with new info', () => { + driver = neo4j.driver( + `neo4j+ssc://${sharedNeo4j.hostnameWithBoltPort}`, + sharedNeo4j.authToken + ) + driver._homeDatabaseCallback('DEFAULT', 'neo4j') + expect(driver.homeDatabaseCache.get('DEFAULT')).toBe('neo4j') + driver._homeDatabaseCallback('DEFAULT', 'neo5j') + expect(driver.homeDatabaseCache.get('DEFAULT')).toBe('neo5j') + }) + + it('should cap homeDb size by removing least recently used', async () => { + driver = neo4j.driver( + `neo4j+ssc://${sharedNeo4j.hostnameWithBoltPort}`, + sharedNeo4j.authToken + ) + for (let i = 0; i < 9999; i++) { + driver._homeDatabaseCallback(i.toString(), 'neo4j') + } + await new Promise(resolve => setTimeout(resolve, 100)) + driver._homeDatabaseCallback('5', 'neo4j') + driver.homeDatabaseCache.get('55') + for (let i = 9999; i < 10050; i++) { + driver._homeDatabaseCallback(i.toString(), 'neo4j') + } + expect(driver.homeDatabaseCache.get('1')).toEqual(undefined) + expect(driver.homeDatabaseCache.get('901')).toEqual(undefined) + expect(driver.homeDatabaseCache.get('5')).toEqual('neo4j') + expect(driver.homeDatabaseCache.get('55')).toEqual('neo4j') + expect(driver.homeDatabaseCache.get('1001')).toEqual('neo4j') + expect(driver.homeDatabaseCache.get('10001')).toEqual('neo4j') + }) + }) }) describe('#integration driver', () => { @@ -536,6 +582,33 @@ describe('#integration driver', () => { expect(connections1[0]).not.toEqual(connections2[0]) }) + describe('HomeDatabaseCache', () => { + [['with driver auth', {}, 'DEFAULT'], + ['with session auth', { auth: sharedNeo4j.authToken }, 'basic:neo4j'], + ['with impersonated user', { impersonatedUser: 'neo4j' }, 'basic:neo4j']].forEach(([string, auth, key]) => { + it('should build home database cache ' + string, async () => { + driver = neo4j.driver( + `neo4j://${sharedNeo4j.hostnameWithBoltPort}`, + sharedNeo4j.authToken + ) + if (protocolVersion >= 5.8) { + try { + const session1 = driver.session(auth) + await session1.run('CREATE () RETURN 42') + expect(driver.homeDatabaseCache.get(key)).toBe('neo4j') // should have set the homedb in cache + expect(session1._database).toBe('neo4j') // should have pinned database to the session + const session2 = driver.session(auth) + expect(session2._databaseGuess).toBe('neo4j') // second session should use the homedb as a guess... + expect(session2._database).toBe('') // ...but should not pin this to the session. + await session2.run('CREATE () RETURN 43') + } catch (e) { + expect(e.message.includes('Impersonation is not supported in community edition.')).toBe(true) + } + } + }) + }) + }) + it('should discard old connections', async () => { const maxLifetime = 100000 driver = neo4j.driver( diff --git a/packages/neo4j-driver/test/internal/connection-channel.test.js b/packages/neo4j-driver/test/internal/connection-channel.test.js index 34e94663d..7e42a8c6a 100644 --- a/packages/neo4j-driver/test/internal/connection-channel.test.js +++ b/packages/neo4j-driver/test/internal/connection-channel.test.js @@ -158,6 +158,7 @@ describe('#integration ChannelConnection', () => { Logger.noOp(), null, null, + undefined, () => channel ) .then(c => { diff --git a/packages/testkit-backend/src/feature/common.js b/packages/testkit-backend/src/feature/common.js index 5dec96283..19253b8e5 100644 --- a/packages/testkit-backend/src/feature/common.js +++ b/packages/testkit-backend/src/feature/common.js @@ -27,10 +27,12 @@ const features = [ 'Feature:Bolt:5.5', 'Feature:Bolt:5.6', 'Feature:Bolt:5.7', + 'Feature:Bolt:5.8', 'Feature:Bolt:Patch:UTC', 'Feature:API:ConnectionAcquisitionTimeout', 'Feature:API:Driver.ExecuteQuery', 'Feature:API:Driver.ExecuteQuery:WithAuth', + 'Feature:API:Driver:MaxConnectionLifetime', 'Feature:API:Driver:NotificationsConfig', 'Feature:API:Driver:GetServerInfo', 'Feature:API:Driver.SupportsSessionAuth', @@ -46,6 +48,8 @@ const features = [ 'Optimization:MinimalBookmarksSet', 'Optimization:MinimalResets', 'Optimization:AuthPipelining', + 'Optimization:HomeDatabaseCache', + 'Optimization:HomeDbCacheBasicPrincipalIsImpersonatedUser', 'Detail:NumberIsNumber' ] diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index 4c3a32423..304b1b7dd 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -67,6 +67,9 @@ export function NewDriver ({ neo4j }, context, data, wire) { if ('connectionTimeoutMs' in data) { config.connectionTimeout = data.connectionTimeoutMs } + if ('maxConnectionLifetimeMs' in data) { + config.maxConnectionLifetime = data.maxConnectionLifetimeMs + } if ('fetchSize' in data) { config.fetchSize = data.fetchSize } diff --git a/packages/testkit-backend/src/skipped-tests/common.js b/packages/testkit-backend/src/skipped-tests/common.js index 2f1e6f6ee..0f0882de7 100644 --- a/packages/testkit-backend/src/skipped-tests/common.js +++ b/packages/testkit-backend/src/skipped-tests/common.js @@ -197,6 +197,10 @@ const skippedTests = [ skip( 'Needs trying all DNS resolved addresses for hosts in the routing table', ifEndsWith('.test_ipv6_read').and(startsWith('stub.routing.test_routing_')) + ), + skip( + 'Driver has separate timeouts for every connection it attempts to open. This will change in 6.0', + ifEquals('stub.homedb.test_homedb.TestHomeDbMixedCluster.test_connection_acquisition_timeout_during_fallback') ) ]