From f0605b98977753cfbfd3e3608365a4e9b922c8fd Mon Sep 17 00:00:00 2001 From: lutovich Date: Fri, 10 Mar 2017 00:03:29 +0100 Subject: [PATCH 1/3] Upgrade neokit --- neokit | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neokit b/neokit index 8ee8fa80b..868498aa8 160000 --- a/neokit +++ b/neokit @@ -1 +1 @@ -Subproject commit 8ee8fa80b2e560339a18f1872d314d3ba64f0427 +Subproject commit 868498aa8cd589975b3315577c024f879bea0c5b From b01d369ce9bfe1322edd5e431398cdee8f9991b8 Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 7 Mar 2017 18:07:32 +0100 Subject: [PATCH 2/3] Make session aware of connection provider So it could later use it to obtain connections on demand rather than hold a single connection permanently. --- src/v1/driver.js | 29 +++++++++-------- src/v1/internal/connection-providers.js | 37 +++++++++++++++++++--- src/v1/routing-driver.js | 22 +++++-------- src/v1/session.js | 8 +++-- test/internal/connection-providers.test.js | 13 ++++++-- test/v1/session.test.js | 19 +++++++++-- 6 files changed, 86 insertions(+), 42 deletions(-) diff --git a/src/v1/driver.js b/src/v1/driver.js index d61c8265a..457c1f6b0 100644 --- a/src/v1/driver.js +++ b/src/v1/driver.js @@ -58,7 +58,7 @@ class Driver { Driver._validateConnection.bind(this), config.connectionPoolSize ); - this._connectionProvider = this._createConnectionProvider(url, this._pool); + this._connectionProvider = this._createConnectionProvider(url, this._pool, this._driverOnErrorCallback.bind(this)); } /** @@ -113,15 +113,7 @@ class Driver { */ session(mode) { const sessionMode = Driver._validateSessionMode(mode); - const connectionPromise = this._connectionProvider.acquireConnection(sessionMode); - connectionPromise.catch((err) => { - if (this.onError && err.code === SERVICE_UNAVAILABLE) { - this.onError(err); - } else { - //we don't need to tell the driver about this error - } - }); - return this._createSession(connectionPromise); + return this._createSession(sessionMode, this._connectionProvider); } static _validateSessionMode(rawMode) { @@ -133,13 +125,22 @@ class Driver { } //Extension point - _createConnectionProvider(address, connectionPool) { - return new DirectConnectionProvider(address, connectionPool); + _createConnectionProvider(address, connectionPool, driverOnErrorCallback) { + return new DirectConnectionProvider(address, connectionPool, driverOnErrorCallback); } //Extension point - _createSession(connectionPromise) { - return new Session(connectionPromise); + _createSession(mode, connectionProvider) { + return new Session(mode, connectionProvider); + } + + _driverOnErrorCallback(error) { + const userDefinedOnErrorCallback = this.onError; + if (userDefinedOnErrorCallback && error.code === SERVICE_UNAVAILABLE) { + userDefinedOnErrorCallback(error); + } else { + // we don't need to tell the driver about this error + } } /** diff --git a/src/v1/internal/connection-providers.js b/src/v1/internal/connection-providers.js index 8b8db5b43..6928fb7d1 100644 --- a/src/v1/internal/connection-providers.js +++ b/src/v1/internal/connection-providers.js @@ -29,32 +29,43 @@ class ConnectionProvider { acquireConnection(mode) { throw new Error('Abstract method'); } + + _withAdditionalOnErrorCallback(connectionPromise, driverOnErrorCallback) { + // install error handler from the driver on the connection promise; this callback is installed separately + // so that it does not handle errors, instead it is just an additional error reporting facility. + connectionPromise.catch(error => driverOnErrorCallback(error)); + // return the original connection promise + return connectionPromise; + } } export class DirectConnectionProvider extends ConnectionProvider { - constructor(address, connectionPool) { + constructor(address, connectionPool, driverOnErrorCallback) { super(); this._address = address; this._connectionPool = connectionPool; + this._driverOnErrorCallback = driverOnErrorCallback; } acquireConnection(mode) { - return Promise.resolve(this._connectionPool.acquire(this._address)); + const connectionPromise = Promise.resolve(this._connectionPool.acquire(this._address)); + return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback); } } export class LoadBalancer extends ConnectionProvider { - constructor(address, connectionPool) { + constructor(address, connectionPool, driverOnErrorCallback) { super(); this._routingTable = new RoutingTable(new RoundRobinArray([address])); this._rediscovery = new Rediscovery(); this._connectionPool = connectionPool; + this._driverOnErrorCallback = driverOnErrorCallback; } acquireConnection(mode) { - return this._freshRoutingTable().then(routingTable => { + const connectionPromise = this._freshRoutingTable().then(routingTable => { if (mode === READ) { return this._acquireConnectionToServer(routingTable.readers, 'read'); } else if (mode === WRITE) { @@ -63,6 +74,7 @@ export class LoadBalancer extends ConnectionProvider { throw newError('Illegal mode ' + mode); } }); + return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback); } forget(address) { @@ -132,7 +144,8 @@ export class LoadBalancer extends ConnectionProvider { _createSessionForRediscovery(routerAddress) { const connection = this._connectionPool.acquire(routerAddress); const connectionPromise = Promise.resolve(connection); - return new Session(connectionPromise); + const connectionProvider = new SingleConnectionProvider(connectionPromise); + return new Session(READ, connectionProvider); } _updateRoutingTable(newRoutingTable) { @@ -153,3 +166,17 @@ export class LoadBalancer extends ConnectionProvider { } } } + +export class SingleConnectionProvider extends ConnectionProvider { + + constructor(connectionPromise) { + super(); + this._connectionPromise = connectionPromise; + } + + acquireConnection(mode) { + const connectionPromise = this._connectionPromise; + this._connectionPromise = null; + return connectionPromise; + } +} diff --git a/src/v1/routing-driver.js b/src/v1/routing-driver.js index 1f2091558..8edb01646 100644 --- a/src/v1/routing-driver.js +++ b/src/v1/routing-driver.js @@ -31,30 +31,24 @@ class RoutingDriver extends Driver { super(url, userAgent, token, RoutingDriver._validateConfig(config)); } - _createConnectionProvider(address, connectionPool) { - return new LoadBalancer(address, connectionPool); + _createConnectionProvider(address, connectionPool, driverOnErrorCallback) { + return new LoadBalancer(address, connectionPool, driverOnErrorCallback); } - _createSession(connectionPromise) { - return new RoutingSession(connectionPromise, (error, conn) => { + _createSession(mode, connectionProvider) { + return new RoutingSession(mode, connectionProvider, (error, conn) => { if (error.code === SERVICE_UNAVAILABLE || error.code === SESSION_EXPIRED) { + // connection is undefined if error happened before connection was acquired if (conn) { this._connectionProvider.forget(conn.url); - } else { - connectionPromise.then((conn) => { - this._connectionProvider.forget(conn.url); - }).catch(() => {/*ignore*/}); } return error; } else if (error.code === 'Neo.ClientError.Cluster.NotALeader') { let url = 'UNKNOWN'; + // connection is undefined if error happened before connection was acquired if (conn) { url = conn.url; this._connectionProvider.forgetWriter(conn.url); - } else { - connectionPromise.then((conn) => { - this._connectionProvider.forgetWriter(conn.url); - }).catch(() => {/*ignore*/}); } return newError('No longer possible to write to server at ' + url, SESSION_EXPIRED); } else { @@ -72,8 +66,8 @@ class RoutingDriver extends Driver { } class RoutingSession extends Session { - constructor(connectionPromise, onFailedConnection) { - super(connectionPromise); + constructor(mode, connectionProvider, onFailedConnection) { + super(mode, connectionProvider); this._onFailedConnection = onFailedConnection; } diff --git a/src/v1/session.js b/src/v1/session.js index d308781b6..9532b16e7 100644 --- a/src/v1/session.js +++ b/src/v1/session.js @@ -31,10 +31,12 @@ import {assertString} from './internal/util'; class Session { /** * @constructor - * @param {Promise.} connectionPromise - Promise of a connection to use + * todo: doc params */ - constructor(connectionPromise) { - this._connectionPromise = connectionPromise; + constructor(mode, connectionProvider) { + this._mode = mode; + this._connectionProvider = connectionProvider; + this._connectionPromise = this._connectionProvider.acquireConnection(this._mode); this._open = true; this._hasTx = false; } diff --git a/test/internal/connection-providers.test.js b/test/internal/connection-providers.test.js index 662506041..04329d6a1 100644 --- a/test/internal/connection-providers.test.js +++ b/test/internal/connection-providers.test.js @@ -25,11 +25,14 @@ import RoundRobinArray from '../../src/v1/internal/round-robin-array'; import {DirectConnectionProvider, LoadBalancer} from '../../src/v1/internal/connection-providers'; import Pool from '../../src/v1/internal/pool'; +const NO_OP_DRIVER_CALLBACK = () => { +}; + describe('DirectConnectionProvider', () => { it('acquires connection from the pool', done => { const pool = newPool(); - const connectionProvider = new DirectConnectionProvider('localhost:123', pool); + const connectionProvider = newDirectConnectionProvider('localhost:123', pool); connectionProvider.acquireConnection(READ).then(connection => { expect(connection).toBeDefined(); @@ -132,7 +135,7 @@ describe('LoadBalancer', () => { }); it('initializes routing table with the given router', () => { - const loadBalancer = new LoadBalancer('server-ABC', newPool()); + const loadBalancer = new LoadBalancer('server-ABC', newPool(), NO_OP_DRIVER_CALLBACK); expectRoutingTable(loadBalancer, ['server-ABC'], @@ -581,8 +584,12 @@ describe('LoadBalancer', () => { }); +function newDirectConnectionProvider(address, pool) { + return new DirectConnectionProvider(address, pool, NO_OP_DRIVER_CALLBACK); +} + function newLoadBalancer(routers, readers, writers, pool = null, expirationTime = Integer.MAX_VALUE, routerToRoutingTable = {}) { - const loadBalancer = new LoadBalancer(null, pool || newPool()); + const loadBalancer = new LoadBalancer(null, pool || newPool(), NO_OP_DRIVER_CALLBACK); loadBalancer._routingTable = new RoutingTable( new RoundRobinArray(routers), new RoundRobinArray(readers), diff --git a/test/v1/session.test.js b/test/v1/session.test.js index 30fbcbf67..d9459232a 100644 --- a/test/v1/session.test.js +++ b/test/v1/session.test.js @@ -20,6 +20,8 @@ import neo4j from '../../src/v1'; import {statementType} from '../../src/v1/result-summary'; import Session from '../../src/v1/session'; +import {READ} from '../../src/v1/driver'; +import {SingleConnectionProvider} from '../../src/v1/internal/connection-providers'; describe('session', () => { @@ -47,14 +49,14 @@ describe('session', () => { it('close should invoke callback ', done => { const connection = new FakeConnection(); - const session = new Session(Promise.resolve(connection)); + const session = newSessionWithConnection(connection); session.close(done); }); it('close should invoke callback even when already closed ', done => { const connection = new FakeConnection(); - const session = new Session(Promise.resolve(connection)); + const session = newSessionWithConnection(connection); session.close(() => { session.close(() => { @@ -67,7 +69,7 @@ describe('session', () => { it('close should be idempotent ', done => { const connection = new FakeConnection(); - const session = new Session(Promise.resolve(connection)); + const session = newSessionWithConnection(connection); session.close(() => { expect(connection.closedOnce()).toBeTruthy(); @@ -441,6 +443,11 @@ describe('session', () => { expect(() => driver.session('ILLEGAL_MODE')).toThrow(); }); + function newSessionWithConnection(connection) { + const connectionProvider = new SingleConnectionProvider(Promise.resolve(connection)); + return new Session(READ, connectionProvider); + } + class FakeConnection { constructor() { @@ -449,6 +456,12 @@ describe('session', () => { this.releaseInvoked = 0; } + run() { + } + + discardAll() { + } + reset() { this.resetInvoked++; } From 48e0649b554b07df71a3200594010dc8d6fc34f0 Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 7 Mar 2017 19:04:02 +0100 Subject: [PATCH 3/3] Lazy connection initialization and eager release This commit effectively decouples session from connection. Session now holds two `ConnectionHolder`s that lazily pick connection from the pool when it is needed and try to release connection back to the pool when it does not have any users. Session keeps one connection holder for read connection and one for write connection. Connection is acquired for `Session#run()` and released when the returned result is either fully consumed or error happens. Also connection is acquired for `Session#beginTransaction()` and released when transaction commits or rolls back. --- src/v1/internal/connection-holder.js | 137 ++++++++++++++++ src/v1/internal/connection-providers.js | 7 +- src/v1/internal/connector.js | 15 +- src/v1/internal/get-servers-util.js | 6 +- src/v1/result.js | 39 +++-- src/v1/session.js | 68 ++++---- src/v1/transaction.js | 130 ++++++++++------ test/internal/connection-holder.test.js | 197 ++++++++++++++++++++++++ test/internal/fake-connection.js | 83 ++++++++++ test/internal/tls.test.js | 4 +- test/v1/driver.test.js | 23 ++- test/v1/routing.driver.boltkit.it.js | 3 +- test/v1/session.test.js | 185 +++++++++++++++++----- test/v1/tck/steps/erroreportingsteps.js | 2 +- 14 files changed, 751 insertions(+), 148 deletions(-) create mode 100644 src/v1/internal/connection-holder.js create mode 100644 test/internal/connection-holder.test.js create mode 100644 test/internal/fake-connection.js diff --git a/src/v1/internal/connection-holder.js b/src/v1/internal/connection-holder.js new file mode 100644 index 000000000..bae63cacc --- /dev/null +++ b/src/v1/internal/connection-holder.js @@ -0,0 +1,137 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {newError} from '../error'; + +/** + * Utility to lazily initialize connections and return them back to the pool when unused. + */ +export default class ConnectionHolder { + + /** + * @constructor + * @param {string} mode - the access mode for new connection holder. + * @param {ConnectionProvider} connectionProvider - the connection provider to acquire connections from. + */ + constructor(mode, connectionProvider) { + this._mode = mode; + this._connectionProvider = connectionProvider; + this._referenceCount = 0; + this._connectionPromise = Promise.resolve(null); + } + + /** + * Make this holder initialize new connection if none exists already. + * @return {undefined} + */ + initializeConnection() { + if (this._referenceCount === 0) { + this._connectionPromise = this._connectionProvider.acquireConnection(this._mode); + } + this._referenceCount++; + } + + /** + * Get the current connection promise. + * @return {Promise} promise resolved with the current connection. + */ + getConnection() { + return this._connectionPromise; + } + + /** + * Notify this holder that single party does not require current connection any more. + * @return {Promise} promise resolved with the current connection. + */ + releaseConnection() { + if (this._referenceCount === 0) { + return this._connectionPromise; + } + + this._referenceCount--; + if (this._referenceCount === 0) { + // release a connection without muting ACK_FAILURE, this is the last action on this connection + return this._releaseConnection(true); + } + return this._connectionPromise; + } + + /** + * Closes this holder and releases current connection (if any) despite any existing users. + * @return {Promise} promise resolved when current connection is released to the pool. + */ + close() { + if (this._referenceCount === 0) { + return this._connectionPromise; + } + this._referenceCount = 0; + // release a connection and mute ACK_FAILURE, this might be called concurrently with other + // operations and thus should ignore failure handling + return this._releaseConnection(false); + } + + /** + * Return the current pooled connection instance to the connection pool. + * We don't pool Session instances, to avoid users using the Session after they've called close. + * The `Session` object is just a thin wrapper around Connection anyway, so it makes little difference. + * @return {Promise} - promise resolved then connection is returned to the pool. + * @private + */ + _releaseConnection(sync) { + this._connectionPromise = this._connectionPromise.then(connection => { + if (connection) { + if(sync) { + connection.reset(); + } else { + connection.resetAsync(); + } + connection.sync(); + connection._release(); + } + }).catch(ignoredError => { + }); + + return this._connectionPromise; + } +} + +class EmptyConnectionHolder extends ConnectionHolder { + + initializeConnection() { + // nothing to initialize + } + + getConnection() { + return Promise.reject(newError('This connection holder does not serve connections')); + } + + releaseConnection() { + return Promise.resolve(); + } + + close() { + return Promise.resolve(); + } +} + +/** + * Connection holder that does not manage any connections. + * @type {ConnectionHolder} + */ +export const EMPTY_CONNECTION_HOLDER = new EmptyConnectionHolder(); diff --git a/src/v1/internal/connection-providers.js b/src/v1/internal/connection-providers.js index 6928fb7d1..c7dcf31b2 100644 --- a/src/v1/internal/connection-providers.js +++ b/src/v1/internal/connection-providers.js @@ -33,7 +33,9 @@ class ConnectionProvider { _withAdditionalOnErrorCallback(connectionPromise, driverOnErrorCallback) { // install error handler from the driver on the connection promise; this callback is installed separately // so that it does not handle errors, instead it is just an additional error reporting facility. - connectionPromise.catch(error => driverOnErrorCallback(error)); + connectionPromise.catch(error => { + driverOnErrorCallback(error) + }); // return the original connection promise return connectionPromise; } @@ -49,7 +51,8 @@ export class DirectConnectionProvider extends ConnectionProvider { } acquireConnection(mode) { - const connectionPromise = Promise.resolve(this._connectionPool.acquire(this._address)); + const connection = this._connectionPool.acquire(this._address); + const connectionPromise = Promise.resolve(connection); return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback); } } diff --git a/src/v1/internal/connector.js b/src/v1/internal/connector.js index 901abeefc..7d0ed8dc2 100644 --- a/src/v1/internal/connector.js +++ b/src/v1/internal/connector.js @@ -96,7 +96,6 @@ function log(actor, msg) { } } - function NO_OP(){} let NO_OP_OBSERVER = { @@ -384,9 +383,9 @@ class Connection { this._chunker.messageBoundary(); } - /** Queue a RESET-message to be sent to the database */ - reset( observer ) { - log("C", "RESET"); + /** Queue a RESET-message to be sent to the database. Mutes failure handling. */ + resetAsync( observer ) { + log("C", "RESET_ASYNC"); this._isHandlingFailure = true; let self = this; let wrappedObs = { @@ -404,6 +403,14 @@ class Connection { this._chunker.messageBoundary(); } + /** Queue a RESET-message to be sent to the database */ + reset(observer) { + log('C', 'RESET'); + this._queueObserver(observer); + this._packer.packStruct(RESET, [], (err) => this._handleFatalError(err)); + this._chunker.messageBoundary(); + } + /** Queue a ACK_FAILURE-message to be sent to the database */ _ackFailure( observer ) { log("C", "ACK_FAILURE"); diff --git a/src/v1/internal/get-servers-util.js b/src/v1/internal/get-servers-util.js index 0591f8b29..d94da81df 100644 --- a/src/v1/internal/get-servers-util.js +++ b/src/v1/internal/get-servers-util.js @@ -17,9 +17,9 @@ * limitations under the License. */ -import RoundRobinArray from "./round-robin-array"; -import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from "../error"; -import Integer, {int} from "../integer"; +import RoundRobinArray from './round-robin-array'; +import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from '../error'; +import Integer, {int} from '../integer'; const PROCEDURE_CALL = 'CALL dbms.cluster.routing.getServers'; const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound'; diff --git a/src/v1/result.js b/src/v1/result.js index 6606b58db..98afe77fd 100644 --- a/src/v1/result.js +++ b/src/v1/result.js @@ -18,6 +18,7 @@ */ import ResultSummary from './result-summary'; +import {EMPTY_CONNECTION_HOLDER} from './internal/connection-holder'; /** * A stream of {@link Record} representing the result of a statement. @@ -32,13 +33,15 @@ class Result { * @param {mixed} statement - Cypher statement to execute * @param {Object} parameters - Map with parameters to use in statement * @param metaSupplier function, when called provides metadata + * @param {ConnectionHolder} connectionHolder - to be notified when result is either fully consumed or error happened. */ - constructor(streamObserver, statement, parameters, metaSupplier) { + constructor(streamObserver, statement, parameters, metaSupplier, connectionHolder) { this._streamObserver = streamObserver; this._p = null; this._statement = statement; this._parameters = parameters || {}; this._metaSupplier = metaSupplier || function(){return {};}; + this._connectionHolder = connectionHolder || EMPTY_CONNECTION_HOLDER; } /** @@ -99,23 +102,39 @@ class Result { * @return */ subscribe(observer) { - let onCompletedOriginal = observer.onCompleted; - let self = this; - let onCompletedWrapper = (metadata) => { + const onCompletedOriginal = observer.onCompleted; + const self = this; + const onCompletedWrapper = (metadata) => { - let additionalMeta = self._metaSupplier(); - for(var key in additionalMeta) { + const additionalMeta = self._metaSupplier(); + for(let key in additionalMeta) { if (additionalMeta.hasOwnProperty(key)) { metadata[key] = additionalMeta[key]; } } - let sum = new ResultSummary(this._statement, this._parameters, metadata); - onCompletedOriginal.call(observer, sum); + const sum = new ResultSummary(this._statement, this._parameters, metadata); + + // notify connection holder that the used connection is not needed any more because result has + // been fully consumed; call the original onCompleted callback after that + self._connectionHolder.releaseConnection().then(() => { + onCompletedOriginal.call(observer, sum); + }); }; observer.onCompleted = onCompletedWrapper; - observer.onError = observer.onError || ((err) => { - console.log("Uncaught error when processing result: " + err); + + const onErrorOriginal = observer.onError || (error => { + console.log("Uncaught error when processing result: " + error); }); + + const onErrorWrapper = error => { + // notify connection holder that the used connection is not needed any more because error happened + // and result can't bee consumed any further; call the original onError callback after that + self._connectionHolder.releaseConnection().then(() => { + onErrorOriginal.call(observer, error); + }); + }; + observer.onError = onErrorWrapper; + this._streamObserver.subscribe(observer); } } diff --git a/src/v1/session.js b/src/v1/session.js index 9532b16e7..e7e6723c7 100644 --- a/src/v1/session.js +++ b/src/v1/session.js @@ -21,6 +21,8 @@ import Result from './result'; import Transaction from './transaction'; import {newError} from './error'; import {assertString} from './internal/util'; +import ConnectionHolder from './internal/connection-holder'; +import {READ, WRITE} from './driver'; /** * A Session instance is used for handling the connection and @@ -29,14 +31,16 @@ import {assertString} from './internal/util'; */ class Session { + /** * @constructor - * todo: doc params + * @param {string} mode - the default access mode for this session. + * @param connectionProvider - the connection provider to acquire connections from. */ constructor(mode, connectionProvider) { this._mode = mode; - this._connectionProvider = connectionProvider; - this._connectionPromise = this._connectionProvider.acquireConnection(this._mode); + this._readConnectionHolder = new ConnectionHolder(READ, connectionProvider); + this._writeConnectionHolder = new ConnectionHolder(WRITE, connectionProvider); this._open = true; this._hasTx = false; } @@ -57,19 +61,21 @@ class Session { assertString(statement, "Cypher statement"); const streamObserver = new _RunObserver(this._onRunFailure()); + const connectionHolder = this._connectionHolderWithMode(this._mode); if (!this._hasTx) { - this._connectionPromise.then((conn) => { - streamObserver.resolveConnection(conn); - conn.run(statement, parameters, streamObserver); - conn.pullAll(streamObserver); - conn.sync(); - }).catch((err) => streamObserver.onError(err)); + connectionHolder.initializeConnection(); + connectionHolder.getConnection().then(connection => { + streamObserver.resolveConnection(connection); + connection.run(statement, parameters, streamObserver); + connection.pullAll(streamObserver); + connection.sync(); + }).catch(error => streamObserver.onError(error)); } else { streamObserver.onError(newError("Statements cannot be run directly on a " + "session with an open transaction; either run from within the " + "transaction or use a different session.")); } - return new Result( streamObserver, statement, parameters, () => streamObserver.meta() ); + return new Result( streamObserver, statement, parameters, () => streamObserver.meta(), connectionHolder ); } /** @@ -92,8 +98,15 @@ class Session { } this._hasTx = true; - return new Transaction(this._connectionPromise, () => { - this._hasTx = false}, + + // todo: add mode parameter + const mode = this._mode; + const connectionHolder = this._connectionHolderWithMode(mode); + connectionHolder.initializeConnection(); + + return new Transaction(connectionHolder, () => { + this._hasTx = false; + }, this._onRunFailure(), bookmark, (bookmark) => {this._lastBookmark = bookmark}); } @@ -109,7 +122,11 @@ class Session { close(callback = (() => null)) { if (this._open) { this._open = false; - this._releaseCurrentConnection().then(callback); + this._readConnectionHolder.close().then(() => { + this._writeConnectionHolder.close().then(() => { + callback(); + }); + }); } else { callback(); } @@ -120,23 +137,14 @@ class Session { return (err) => {return err}; } - /** - * Return the current pooled connection instance to the connection pool. - * We don't pool Session instances, to avoid users using the Session after they've called close. - * The `Session` object is just a thin wrapper around Connection anyway, so it makes little difference. - * @return {Promise} - promise resolved then connection is returned to the pool. - * @private - */ - _releaseCurrentConnection() { - return this._connectionPromise.then(conn => { - // Queue up a 'reset', to ensure the next user gets a clean session to work with. - conn.reset(); - conn.sync(); - - // Return connection to the pool - conn._release(); - }).catch(ignoredError => { - }); + _connectionHolderWithMode(mode) { + if (mode === READ) { + return this._readConnectionHolder; + } else if (mode === WRITE) { + return this._writeConnectionHolder; + } else { + throw newError('Unknown access mode: ' + mode); + } } } diff --git a/src/v1/transaction.js b/src/v1/transaction.js index b29e534fe..cb3cd6892 100644 --- a/src/v1/transaction.js +++ b/src/v1/transaction.js @@ -19,6 +19,7 @@ import StreamObserver from './internal/stream-observer'; import Result from './result'; import {assertString} from './internal/util'; +import {EMPTY_CONNECTION_HOLDER} from './internal/connection-holder'; /** * Represents a transaction in the Neo4j database. @@ -28,24 +29,25 @@ import {assertString} from './internal/util'; class Transaction { /** * @constructor - * @param {Promise} connectionPromise - A connection to use + * @param {ConnectionHolder} connectionHolder - the connection holder to get connection from. * @param {function()} onClose - Function to be called when transaction is committed or rolled back. * @param errorTransformer callback use to transform error * @param bookmark optional bookmark * @param onBookmark callback invoked when new bookmark is produced */ - constructor(connectionPromise, onClose, errorTransformer, bookmark, onBookmark) { - this._connectionPromise = connectionPromise; + constructor(connectionHolder, onClose, errorTransformer, bookmark, onBookmark) { + this._connectionHolder = connectionHolder; let streamObserver = new _TransactionStreamObserver(this); let params = {}; if (bookmark) { params = {bookmark: bookmark}; } - this._connectionPromise.then((conn) => { + + this._connectionHolder.getConnection().then(conn => { streamObserver.resolveConnection(conn); - conn.run("BEGIN", params, streamObserver); - conn.discardAll(streamObserver); - }).catch(streamObserver.onError); + conn.run('BEGIN', params, streamObserver); + conn.pullAll(streamObserver); + }).catch(error => streamObserver.onError(error)); this._state = _states.ACTIVE; this._onClose = onClose; @@ -68,7 +70,7 @@ class Transaction { } assertString(statement, "Cypher statement"); - return this._state.run(this._connectionPromise, new _TransactionStreamObserver(this), statement, parameters); + return this._state.run(this._connectionHolder, new _TransactionStreamObserver(this), statement, parameters); } /** @@ -79,12 +81,11 @@ class Transaction { * @returns {Result} - New Result */ commit() { - let committed = this._state.commit(this._connectionPromise, new _TransactionStreamObserver(this)); + let committed = this._state.commit(this._connectionHolder, new _TransactionStreamObserver(this)); this._state = committed.state; //clean up this._onClose(); return committed.result; - } /** @@ -95,7 +96,7 @@ class Transaction { * @returns {Result} - New Result */ rollback() { - let committed = this._state.rollback(this._connectionPromise, new _TransactionStreamObserver(this)); + let committed = this._state.rollback(this._connectionHolder, new _TransactionStreamObserver(this)); this._state = committed.state; //clean up this._onClose(); @@ -147,101 +148,136 @@ class _TransactionStreamObserver extends StreamObserver { let _states = { //The transaction is running with no explicit success or failure marked ACTIVE: { - commit: (connectionPromise, observer) => { - return {result: _runDiscardAll("COMMIT", connectionPromise, observer), + commit: (connectionHolder, observer) => { + return {result: _runDiscardAll("COMMIT", connectionHolder, observer), state: _states.SUCCEEDED} }, - rollback: (connectionPromise, observer) => { - return {result: _runDiscardAll("ROLLBACK", connectionPromise, observer), state: _states.ROLLED_BACK}; + rollback: (connectionHolder, observer) => { + return {result: _runDiscardAll("ROLLBACK", connectionHolder, observer), state: _states.ROLLED_BACK}; }, - run: (connectionPromise, observer, statement, parameters) => { - connectionPromise.then((conn) => { + run: (connectionHolder, observer, statement, parameters) => { + connectionHolder.getConnection().then(conn => { observer.resolveConnection(conn); - conn.run( statement, parameters || {}, observer ); - conn.pullAll( observer ); + conn.run(statement, parameters || {}, observer); + conn.pullAll(observer); conn.sync(); - }).catch(observer.onError); + }).catch(error => observer.onError(error)); - return new Result( observer, statement, parameters, () => observer.serverMeta() ); + return newRunResult(observer, statement, parameters, () => observer.serverMeta()); } }, //An error has occurred, transaction can no longer be used and no more messages will // be sent for this transaction. FAILED: { - commit: (conn, observer) => { + commit: (connectionHolder, observer) => { observer.onError({ error: "Cannot commit statements in this transaction, because previous statements in the " + "transaction has failed and the transaction has been rolled back. Please start a new" + " transaction to run another statement." }); - return {result: new Result(observer, "COMMIT", {}), state: _states.FAILED}; + return {result: newDummyResult(observer, "COMMIT", {}), state: _states.FAILED}; }, - rollback: (conn, observer) => { + rollback: (connectionHolder, observer) => { observer.onError({error: "Cannot rollback transaction, because previous statements in the " + "transaction has failed and the transaction has already been rolled back."}); - return {result: new Result(observer, "ROLLBACK", {}), state: _states.FAILED}; + return {result: newDummyResult(observer, "ROLLBACK", {}), state: _states.FAILED}; }, - run: (conn, observer, statement, parameters) => { + run: (connectionHolder, observer, statement, parameters) => { observer.onError({error: "Cannot run statement, because previous statements in the " + "transaction has failed and the transaction has already been rolled back."}); - return new Result(observer, statement, parameters); + return newDummyResult(observer, statement, parameters); } }, //This transaction has successfully committed SUCCEEDED: { - commit: (conn, observer) => { + commit: (connectionHolder, observer) => { observer.onError({ error: "Cannot commit statements in this transaction, because commit has already been successfully called on the transaction and transaction has been closed. Please start a new" + " transaction to run another statement." }); - return {result: new Result(observer, "COMMIT", {}), state: _states.SUCCEEDED}; + return {result: newDummyResult(observer, "COMMIT", {}), state: _states.SUCCEEDED}; }, - rollback: (conn, observer) => { + rollback: (connectionHolder, observer) => { observer.onError({error: "Cannot rollback transaction, because transaction has already been successfully closed."}); - return {result: new Result(observer, "ROLLBACK", {}), state: _states.SUCCEEDED}; + return {result: newDummyResult(observer, "ROLLBACK", {}), state: _states.SUCCEEDED}; }, - run: (conn, observer, statement, parameters) => { + run: (connectionHolder, observer, statement, parameters) => { observer.onError({error: "Cannot run statement, because transaction has already been successfully closed."}); - return new Result(observer, statement, parameters); + return newDummyResult(observer, statement, parameters); } }, //This transaction has been rolled back ROLLED_BACK: { - commit: (conn, observer) => { + commit: (connectionHolder, observer) => { observer.onError({ error: "Cannot commit this transaction, because it has already been rolled back." }); - return {result: new Result(observer, "COMMIT", {}), state: _states.ROLLED_BACK}; + return {result: newDummyResult(observer, "COMMIT", {}), state: _states.ROLLED_BACK}; }, - rollback: (conn, observer) => { + rollback: (connectionHolder, observer) => { observer.onError({error: "Cannot rollback transaction, because transaction has already been rolled back."}); - return {result: new Result(observer, "ROLLBACK", {}), state: _states.ROLLED_BACK}; + return {result: newDummyResult(observer, "ROLLBACK", {}), state: _states.ROLLED_BACK}; }, - run: (conn, observer, statement, parameters) => { + run: (connectionHolder, observer, statement, parameters) => { observer.onError({error: "Cannot run statement, because transaction has already been rolled back."}); - return new Result(observer, statement, parameters); + return newDummyResult(observer, statement, parameters); } } }; -function _runDiscardAll(msg, connectionPromise, observer) { - connectionPromise.then((conn) => { - observer.resolveConnection(conn); - conn.run(msg, {}, observer); - conn.discardAll(observer); - conn.sync(); - }).catch(observer.onError); +function _runDiscardAll(msg, connectionHolder, observer) { + connectionHolder.getConnection().then( + conn => { + observer.resolveConnection(conn); + conn.run(msg, {}, observer); + conn.pullAll(observer); + conn.sync(); + }).catch(error => observer.onError(error)); + + // for commit & rollback we need result that uses real connection holder and notifies it when + // connection is not needed and can be safely released to the pool + return new Result(observer, msg, {}, emptyMetadataSupplier, connectionHolder); +} + +/** + * Creates a {@link Result} with empty connection holder. + * Should be used as a result for running cypher statements. They can result in metadata but should not + * influence real connection holder to release connections because single transaction can have + * {@link Transaction#run} called multiple times. + * @param {StreamObserver} observer - an observer for the created result. + * @param {string} statement - the cypher statement that produced the result. + * @param {object} parameters - the parameters for cypher statement that produced the result. + * @param {function} metadataSupplier - the function that returns a metadata object. + * @return {Result} new result. + */ +function newRunResult(observer, statement, parameters, metadataSupplier) { + return new Result(observer, statement, parameters, metadataSupplier, EMPTY_CONNECTION_HOLDER); +} + +/** + * Creates a {@link Result} without metadata supplier and with empty connection holder. + * For cases when result represents an intermediate or failed action, does not require any metadata and does not + * need to influence real connection holder to release connections. + * @param {StreamObserver} observer - an observer for the created result. + * @param {string} statement - the cypher statement that produced the result. + * @param {object} parameters - the parameters for cypher statement that produced the result. + * @return {Result} new result. + */ +function newDummyResult(observer, statement, parameters) { + return new Result(observer, statement, parameters, emptyMetadataSupplier, EMPTY_CONNECTION_HOLDER); +} - return new Result(observer, msg, {}); +function emptyMetadataSupplier() { + return {}; } export default Transaction; diff --git a/test/internal/connection-holder.test.js b/test/internal/connection-holder.test.js new file mode 100644 index 000000000..8c5d1a854 --- /dev/null +++ b/test/internal/connection-holder.test.js @@ -0,0 +1,197 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import ConnectionHolder, {EMPTY_CONNECTION_HOLDER} from '../../src/v1/internal/connection-holder'; +import {SingleConnectionProvider} from '../../src/v1/internal/connection-providers'; +import {READ} from '../../src/v1/driver'; +import FakeConnection from './fake-connection'; + +describe('EmptyConnectionHolder', () => { + + it('should return rejected promise instead of connection', done => { + EMPTY_CONNECTION_HOLDER.getConnection().catch(() => { + done(); + }); + }); + + it('should return resolved promise on release', done => { + EMPTY_CONNECTION_HOLDER.releaseConnection().then(() => { + done(); + }); + }); + + it('should return resolved promise on close', done => { + EMPTY_CONNECTION_HOLDER.close().then(() => { + done(); + }); + }); + +}); + +describe('ConnectionHolder', () => { + + it('should acquire new connection during initialization', () => { + const connectionProvider = new RecordingConnectionProvider([new FakeConnection()]); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + + expect(connectionProvider.acquireConnectionInvoked).toBe(1); + }); + + it('should return acquired during initialization connection', done => { + const connection = new FakeConnection(); + const connectionProvider = newSingleConnectionProvider(connection); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + + connectionHolder.getConnection().then(connection => { + expect(connection).toBe(connection); + done(); + }); + }); + + it('should release connection with single user', done => { + const connection = new FakeConnection(); + const connectionProvider = newSingleConnectionProvider(connection); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + + connectionHolder.releaseConnection().then(() => { + expect(connection.isReleasedOnce()).toBeTruthy(); + done(); + }); + }); + + it('should not release connection with multiple users', done => { + const connection = new FakeConnection(); + const connectionProvider = newSingleConnectionProvider(connection); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + connectionHolder.initializeConnection(); + connectionHolder.initializeConnection(); + + connectionHolder.releaseConnection().then(() => { + expect(connection.isNeverReleased()).toBeTruthy(); + done(); + }); + }); + + it('should release connection with multiple users when all users release', done => { + const connection = new FakeConnection(); + const connectionProvider = newSingleConnectionProvider(connection); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + connectionHolder.initializeConnection(); + connectionHolder.initializeConnection(); + + connectionHolder.releaseConnection().then(() => { + connectionHolder.releaseConnection().then(() => { + connectionHolder.releaseConnection().then(() => { + expect(connection.isReleasedOnce()).toBeTruthy(); + done(); + }); + }); + }); + }); + + it('should do nothing when closed and not initialized', done => { + const connection = new FakeConnection(); + const connectionProvider = newSingleConnectionProvider(connection); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.close().then(() => { + expect(connection.isNeverReleased()).toBeTruthy(); + done(); + }); + }); + + it('should close even when users exist', done => { + const connection = new FakeConnection(); + const connectionProvider = newSingleConnectionProvider(connection); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + connectionHolder.initializeConnection(); + + connectionHolder.close().then(() => { + expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); + done(); + }); + }); + + it('should initialize new connection after releasing current one', done => { + const connection1 = new FakeConnection(); + const connection2 = new FakeConnection(); + const connectionProvider = new RecordingConnectionProvider([connection1, connection2]); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + + connectionHolder.releaseConnection().then(() => { + expect(connection1.isReleasedOnce()).toBeTruthy(); + + connectionHolder.initializeConnection(); + connectionHolder.releaseConnection().then(() => { + expect(connection2.isReleasedOnce()).toBeTruthy(); + done(); + }); + }); + }); + + it('should initialize new connection after being closed', done => { + const connection1 = new FakeConnection(); + const connection2 = new FakeConnection(); + const connectionProvider = new RecordingConnectionProvider([connection1, connection2]); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + + connectionHolder.close().then(() => { + expect(connection1.isReleasedOnceOnSessionClose()).toBeTruthy(); + + connectionHolder.initializeConnection(); + connectionHolder.close().then(() => { + expect(connection2.isReleasedOnceOnSessionClose()).toBeTruthy(); + done(); + }); + }); + }); +}); + +class RecordingConnectionProvider extends SingleConnectionProvider { + + constructor(connections) { + super(Promise.resolve()); + this.connectionPromises = connections.map(conn => Promise.resolve(conn)); + this.acquireConnectionInvoked = 0; + } + + acquireConnection(mode) { + return this.connectionPromises[this.acquireConnectionInvoked++]; + } +} + +function newSingleConnectionProvider(connection) { + return new SingleConnectionProvider(Promise.resolve(connection)); +} diff --git a/test/internal/fake-connection.js b/test/internal/fake-connection.js new file mode 100644 index 000000000..60dd3c11e --- /dev/null +++ b/test/internal/fake-connection.js @@ -0,0 +1,83 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This class is like a mock of {@link Connection} that tracks invocations count. + * It tries to maintain same "interface" as {@link Connection}. + * It could be replaced with a proper mock by a library like testdouble. + * At the time of writing such libraries require {@link Proxy} support but browser tests execute in + * PhantomJS which does not support proxies. + */ +export default class FakeConnection { + + constructor() { + this.resetInvoked = 0; + this.resetAsyncInvoked = 0; + this.syncInvoked = 0; + this.releaseInvoked = 0; + } + + run() { + } + + discardAll() { + } + + reset() { + this.resetInvoked++; + } + + resetAsync() { + this.resetAsyncInvoked++; + } + + sync() { + this.syncInvoked++; + } + + _release() { + this.releaseInvoked++; + } + + isReleasedOnceOnSessionClose() { + return this.isReleasedOnSessionCloseTimes(1); + } + + isReleasedOnSessionCloseTimes(times) { + return this.resetAsyncInvoked === times && + this.resetInvoked === 0 && + this.syncInvoked === times && + this.releaseInvoked === times; + } + + isNeverReleased() { + return this.isReleasedTimes(0); + } + + isReleasedOnce() { + return this.isReleasedTimes(1); + } + + isReleasedTimes(times) { + return this.resetAsyncInvoked === 0 && + this.resetInvoked === times && + this.syncInvoked === times && + this.releaseInvoked === times; + } +}; diff --git a/test/internal/tls.test.js b/test/internal/tls.test.js index 62ce70b77..dc58f4b09 100644 --- a/test/internal/tls.test.js +++ b/test/internal/tls.test.js @@ -269,7 +269,9 @@ describe('trust-on-first-use', function() { knownHosts: knownHostsPath }); - driver.session(); // write into the knownHost file + // create session and transaction to force creation of new connection and writing into the knownHost file + const session = driver.session(); + expect(session.beginTransaction()).toBeDefined(); // duplicate the same serverId twice setTimeout(function() { diff --git a/test/v1/driver.test.js b/test/v1/driver.test.js index 906750da5..c483bf4fa 100644 --- a/test/v1/driver.test.js +++ b/test/v1/driver.test.js @@ -53,7 +53,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should handle wrong scheme', () => { @@ -84,7 +84,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should indicate success early on correct credentials', function(done) { @@ -97,7 +97,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should be possible to pass a realm with basic auth tokens', function(done) { @@ -110,7 +110,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should be possible to create custom auth tokens', function(done) { @@ -123,7 +123,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should be possible to create custom auth tokens with additional parameters', function(done) { @@ -136,7 +136,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should fail nicely when connecting with routing to standalone server', done => { @@ -151,7 +151,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should have correct user agent', () => { @@ -192,4 +192,13 @@ describe('driver', function() { }); }); + /** + * Starts new transaction to force new network connection. + * @param {Driver} driver - the driver to use. + */ + function startNewTransaction(driver) { + const session = driver.session(); + expect(session.beginTransaction()).toBeDefined(); + } + }); diff --git a/test/v1/routing.driver.boltkit.it.js b/test/v1/routing.driver.boltkit.it.js index 457f48d5a..f4aafa70a 100644 --- a/test/v1/routing.driver.boltkit.it.js +++ b/test/v1/routing.driver.boltkit.it.js @@ -939,11 +939,10 @@ describe('routing driver', () => { for (let i = 0; i < acquiredConnections.length; i++) { expect(acquiredConnections[i]).toBe(releasedConnections[i]); } - done(); }); }); - }); + }).catch(console.log); }); }); diff --git a/test/v1/session.test.js b/test/v1/session.test.js index d9459232a..6f03c01f1 100644 --- a/test/v1/session.test.js +++ b/test/v1/session.test.js @@ -22,6 +22,7 @@ import {statementType} from '../../src/v1/result-summary'; import Session from '../../src/v1/session'; import {READ} from '../../src/v1/driver'; import {SingleConnectionProvider} from '../../src/v1/internal/connection-providers'; +import FakeConnection from '../internal/fake-connection'; describe('session', () => { @@ -72,13 +73,13 @@ describe('session', () => { const session = newSessionWithConnection(connection); session.close(() => { - expect(connection.closedOnce()).toBeTruthy(); + expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); session.close(() => { - expect(connection.closedOnce()).toBeTruthy(); + expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); session.close(() => { - expect(connection.closedOnce()).toBeTruthy(); + expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); done(); }); }); @@ -372,20 +373,25 @@ describe('session', () => { it('should be able to close a long running query ', done => { //given a long running query - session.run("unwind range(1,1000000) as x create (n {prop:x}) delete n"); + session.run('unwind range(1,1000000) as x create (n {prop:x}) delete n').catch(error => { + // long running query should fail + expect(error).toBeDefined(); - //wait some time than close the session and run - //a new query + // and it should be possible to start another session and run a query + const anotherSession = driver.session(); + anotherSession.run('RETURN 1.0 as a').then(result => { + expect(result.records.length).toBe(1); + expect(result.records[0].get('a')).toBe(1); + done(); + }).catch(error => { + console.log('Query failed after a long running query was terminated', error); + }); + }); + + // wait some time than close the session with a long running query setTimeout(() => { session.close(); - const anotherSession = driver.session(); - setTimeout(() => { - anotherSession.run("RETURN 1.0 as a") - .then(ignore => { - done(); - }); - }, 500); - }, 500); + }, 1000); }); it('should fail nicely on unpackable values ', done => { @@ -443,41 +449,138 @@ describe('session', () => { expect(() => driver.session('ILLEGAL_MODE')).toThrow(); }); - function newSessionWithConnection(connection) { - const connectionProvider = new SingleConnectionProvider(Promise.resolve(connection)); - return new Session(READ, connectionProvider); - } + it('should release connection to the pool after run', done => { + withQueryInTmpSession(driver, () => { + const idleConnectionsBefore = idleConnectionCount(driver); - class FakeConnection { + session.run('RETURN 1').then(() => { + const idleConnectionsAfter = idleConnectionCount(driver); + expect(idleConnectionsBefore).toEqual(idleConnectionsAfter); + done(); + }); + }); + }); - constructor() { - this.resetInvoked = 0; - this.syncInvoked = 0; - this.releaseInvoked = 0; - } + it('should release connection to the pool after run failure', done => { + withQueryInTmpSession(driver, () => { + const idleConnectionsBefore = idleConnectionCount(driver); - run() { - } + session.run('RETURN 10 / 0').catch(() => { + const idleConnectionsAfter = idleConnectionCount(driver); + expect(idleConnectionsBefore).toEqual(idleConnectionsAfter); + done(); + }); + }); + }); - discardAll() { - } + it('should release connection to the pool when result is consumed', done => { + withQueryInTmpSession(driver, () => { + const idleConnectionsBefore = idleConnectionCount(driver); + + session.run('UNWIND range(0, 10) AS x RETURN x + 1').subscribe({ + onNext: () => { + // one less idle connection, one connection is used for the current query + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore - 1); + }, + onError: error => { + console.log(error); + }, + onCompleted: () => { + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore); + done(); + } + }); + }); + }); - reset() { - this.resetInvoked++; - } + it('should release connection to the pool when result fails', done => { + withQueryInTmpSession(driver, () => { + const idleConnectionsBefore = idleConnectionCount(driver); + + session.run('UNWIND range(10, 0, -1) AS x RETURN 10 / x').subscribe({ + onNext: () => { + // one less idle connection, one connection is used for the current query + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore - 1); + }, + onError: ignoredError => { + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore); + done(); + }, + onCompleted: () => { + } + }); + }); + }); - sync() { - this.syncInvoked++; - } + it('should release connection to the pool when transaction commits', done => { + withQueryInTmpSession(driver, () => { + const idleConnectionsBefore = idleConnectionCount(driver); + + const tx = session.beginTransaction(); + tx.run('UNWIND range(0, 10) AS x RETURN x + 1').subscribe({ + onNext: () => { + // one less idle connection, one connection is used for the current transaction + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore - 1); + }, + onError: error => { + console.log(error); + }, + onCompleted: () => { + // one less idle connection, one connection is used for the current transaction + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore - 1); + tx.commit().then(() => { + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore); + done(); + }); + } + }); + }); + }); - _release() { - this.releaseInvoked++; - } + it('should release connection to the pool when transaction rolls back', done => { + withQueryInTmpSession(driver, () => { + const idleConnectionsBefore = idleConnectionCount(driver); + + const tx = session.beginTransaction(); + tx.run('UNWIND range(0, 10) AS x RETURN x + 1').subscribe({ + onNext: () => { + // one less idle connection, one connection is used for the current transaction + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore - 1); + }, + onError: error => { + console.log(error); + }, + onCompleted: () => { + // one less idle connection, one connection is used for the current transaction + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore - 1); + tx.rollback().then(() => { + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore); + done(); + }); + } + }); + }); + }); - closedOnce() { - return this.resetInvoked === 1 && this.syncInvoked === 1 && this.releaseInvoked === 1; - } + function withQueryInTmpSession(driver, callback) { + const tmpSession = driver.session(); + return tmpSession.run('RETURN 1').then(() => { + tmpSession.close(callback); + }); } -}); + function newSessionWithConnection(connection) { + const connectionProvider = new SingleConnectionProvider(Promise.resolve(connection)); + const session = new Session(READ, connectionProvider); + session.beginTransaction(); // force session to acquire new connection + return session; + } + function idleConnectionCount(driver) { + const connectionProvider = driver._connectionProvider; + const address = connectionProvider._address; + const connectionPool = connectionProvider._connectionPool; + const idleConnections = connectionPool._pools[address]; + return idleConnections.length; + } +}); diff --git a/test/v1/tck/steps/erroreportingsteps.js b/test/v1/tck/steps/erroreportingsteps.js index 2c75dfe9d..54d2306fa 100644 --- a/test/v1/tck/steps/erroreportingsteps.js +++ b/test/v1/tck/steps/erroreportingsteps.js @@ -65,7 +65,7 @@ module.exports = function () { var self = this; var driver = neo4j.driver("bolt://localhost:7777", neo4j.auth.basic("neo4j", "neo4j")); driver.onError = function (error) { self.error = error; callback()}; - driver.session(); + driver.session().beginTransaction(); setTimeout(callback, 1000); });