diff --git a/neokit b/neokit index 8ee8fa80b..868498aa8 160000 --- a/neokit +++ b/neokit @@ -1 +1 @@ -Subproject commit 8ee8fa80b2e560339a18f1872d314d3ba64f0427 +Subproject commit 868498aa8cd589975b3315577c024f879bea0c5b diff --git a/src/v1/driver.js b/src/v1/driver.js index d61c8265a..457c1f6b0 100644 --- a/src/v1/driver.js +++ b/src/v1/driver.js @@ -58,7 +58,7 @@ class Driver { Driver._validateConnection.bind(this), config.connectionPoolSize ); - this._connectionProvider = this._createConnectionProvider(url, this._pool); + this._connectionProvider = this._createConnectionProvider(url, this._pool, this._driverOnErrorCallback.bind(this)); } /** @@ -113,15 +113,7 @@ class Driver { */ session(mode) { const sessionMode = Driver._validateSessionMode(mode); - const connectionPromise = this._connectionProvider.acquireConnection(sessionMode); - connectionPromise.catch((err) => { - if (this.onError && err.code === SERVICE_UNAVAILABLE) { - this.onError(err); - } else { - //we don't need to tell the driver about this error - } - }); - return this._createSession(connectionPromise); + return this._createSession(sessionMode, this._connectionProvider); } static _validateSessionMode(rawMode) { @@ -133,13 +125,22 @@ class Driver { } //Extension point - _createConnectionProvider(address, connectionPool) { - return new DirectConnectionProvider(address, connectionPool); + _createConnectionProvider(address, connectionPool, driverOnErrorCallback) { + return new DirectConnectionProvider(address, connectionPool, driverOnErrorCallback); } //Extension point - _createSession(connectionPromise) { - return new Session(connectionPromise); + _createSession(mode, connectionProvider) { + return new Session(mode, connectionProvider); + } + + _driverOnErrorCallback(error) { + const userDefinedOnErrorCallback = this.onError; + if (userDefinedOnErrorCallback && error.code === SERVICE_UNAVAILABLE) { + userDefinedOnErrorCallback(error); + } else { + // we don't need to tell the driver about this error + } } /** diff --git a/src/v1/internal/connection-holder.js b/src/v1/internal/connection-holder.js new file mode 100644 index 000000000..bae63cacc --- /dev/null +++ b/src/v1/internal/connection-holder.js @@ -0,0 +1,137 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {newError} from '../error'; + +/** + * Utility to lazily initialize connections and return them back to the pool when unused. + */ +export default class ConnectionHolder { + + /** + * @constructor + * @param {string} mode - the access mode for new connection holder. + * @param {ConnectionProvider} connectionProvider - the connection provider to acquire connections from. + */ + constructor(mode, connectionProvider) { + this._mode = mode; + this._connectionProvider = connectionProvider; + this._referenceCount = 0; + this._connectionPromise = Promise.resolve(null); + } + + /** + * Make this holder initialize new connection if none exists already. + * @return {undefined} + */ + initializeConnection() { + if (this._referenceCount === 0) { + this._connectionPromise = this._connectionProvider.acquireConnection(this._mode); + } + this._referenceCount++; + } + + /** + * Get the current connection promise. + * @return {Promise} promise resolved with the current connection. + */ + getConnection() { + return this._connectionPromise; + } + + /** + * Notify this holder that single party does not require current connection any more. + * @return {Promise} promise resolved with the current connection. + */ + releaseConnection() { + if (this._referenceCount === 0) { + return this._connectionPromise; + } + + this._referenceCount--; + if (this._referenceCount === 0) { + // release a connection without muting ACK_FAILURE, this is the last action on this connection + return this._releaseConnection(true); + } + return this._connectionPromise; + } + + /** + * Closes this holder and releases current connection (if any) despite any existing users. + * @return {Promise} promise resolved when current connection is released to the pool. + */ + close() { + if (this._referenceCount === 0) { + return this._connectionPromise; + } + this._referenceCount = 0; + // release a connection and mute ACK_FAILURE, this might be called concurrently with other + // operations and thus should ignore failure handling + return this._releaseConnection(false); + } + + /** + * Return the current pooled connection instance to the connection pool. + * We don't pool Session instances, to avoid users using the Session after they've called close. + * The `Session` object is just a thin wrapper around Connection anyway, so it makes little difference. + * @return {Promise} - promise resolved then connection is returned to the pool. + * @private + */ + _releaseConnection(sync) { + this._connectionPromise = this._connectionPromise.then(connection => { + if (connection) { + if(sync) { + connection.reset(); + } else { + connection.resetAsync(); + } + connection.sync(); + connection._release(); + } + }).catch(ignoredError => { + }); + + return this._connectionPromise; + } +} + +class EmptyConnectionHolder extends ConnectionHolder { + + initializeConnection() { + // nothing to initialize + } + + getConnection() { + return Promise.reject(newError('This connection holder does not serve connections')); + } + + releaseConnection() { + return Promise.resolve(); + } + + close() { + return Promise.resolve(); + } +} + +/** + * Connection holder that does not manage any connections. + * @type {ConnectionHolder} + */ +export const EMPTY_CONNECTION_HOLDER = new EmptyConnectionHolder(); diff --git a/src/v1/internal/connection-providers.js b/src/v1/internal/connection-providers.js index 8b8db5b43..c7dcf31b2 100644 --- a/src/v1/internal/connection-providers.js +++ b/src/v1/internal/connection-providers.js @@ -29,32 +29,46 @@ class ConnectionProvider { acquireConnection(mode) { throw new Error('Abstract method'); } + + _withAdditionalOnErrorCallback(connectionPromise, driverOnErrorCallback) { + // install error handler from the driver on the connection promise; this callback is installed separately + // so that it does not handle errors, instead it is just an additional error reporting facility. + connectionPromise.catch(error => { + driverOnErrorCallback(error) + }); + // return the original connection promise + return connectionPromise; + } } export class DirectConnectionProvider extends ConnectionProvider { - constructor(address, connectionPool) { + constructor(address, connectionPool, driverOnErrorCallback) { super(); this._address = address; this._connectionPool = connectionPool; + this._driverOnErrorCallback = driverOnErrorCallback; } acquireConnection(mode) { - return Promise.resolve(this._connectionPool.acquire(this._address)); + const connection = this._connectionPool.acquire(this._address); + const connectionPromise = Promise.resolve(connection); + return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback); } } export class LoadBalancer extends ConnectionProvider { - constructor(address, connectionPool) { + constructor(address, connectionPool, driverOnErrorCallback) { super(); this._routingTable = new RoutingTable(new RoundRobinArray([address])); this._rediscovery = new Rediscovery(); this._connectionPool = connectionPool; + this._driverOnErrorCallback = driverOnErrorCallback; } acquireConnection(mode) { - return this._freshRoutingTable().then(routingTable => { + const connectionPromise = this._freshRoutingTable().then(routingTable => { if (mode === READ) { return this._acquireConnectionToServer(routingTable.readers, 'read'); } else if (mode === WRITE) { @@ -63,6 +77,7 @@ export class LoadBalancer extends ConnectionProvider { throw newError('Illegal mode ' + mode); } }); + return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback); } forget(address) { @@ -132,7 +147,8 @@ export class LoadBalancer extends ConnectionProvider { _createSessionForRediscovery(routerAddress) { const connection = this._connectionPool.acquire(routerAddress); const connectionPromise = Promise.resolve(connection); - return new Session(connectionPromise); + const connectionProvider = new SingleConnectionProvider(connectionPromise); + return new Session(READ, connectionProvider); } _updateRoutingTable(newRoutingTable) { @@ -153,3 +169,17 @@ export class LoadBalancer extends ConnectionProvider { } } } + +export class SingleConnectionProvider extends ConnectionProvider { + + constructor(connectionPromise) { + super(); + this._connectionPromise = connectionPromise; + } + + acquireConnection(mode) { + const connectionPromise = this._connectionPromise; + this._connectionPromise = null; + return connectionPromise; + } +} diff --git a/src/v1/internal/connector.js b/src/v1/internal/connector.js index 901abeefc..7d0ed8dc2 100644 --- a/src/v1/internal/connector.js +++ b/src/v1/internal/connector.js @@ -96,7 +96,6 @@ function log(actor, msg) { } } - function NO_OP(){} let NO_OP_OBSERVER = { @@ -384,9 +383,9 @@ class Connection { this._chunker.messageBoundary(); } - /** Queue a RESET-message to be sent to the database */ - reset( observer ) { - log("C", "RESET"); + /** Queue a RESET-message to be sent to the database. Mutes failure handling. */ + resetAsync( observer ) { + log("C", "RESET_ASYNC"); this._isHandlingFailure = true; let self = this; let wrappedObs = { @@ -404,6 +403,14 @@ class Connection { this._chunker.messageBoundary(); } + /** Queue a RESET-message to be sent to the database */ + reset(observer) { + log('C', 'RESET'); + this._queueObserver(observer); + this._packer.packStruct(RESET, [], (err) => this._handleFatalError(err)); + this._chunker.messageBoundary(); + } + /** Queue a ACK_FAILURE-message to be sent to the database */ _ackFailure( observer ) { log("C", "ACK_FAILURE"); diff --git a/src/v1/internal/get-servers-util.js b/src/v1/internal/get-servers-util.js index 0591f8b29..d94da81df 100644 --- a/src/v1/internal/get-servers-util.js +++ b/src/v1/internal/get-servers-util.js @@ -17,9 +17,9 @@ * limitations under the License. */ -import RoundRobinArray from "./round-robin-array"; -import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from "../error"; -import Integer, {int} from "../integer"; +import RoundRobinArray from './round-robin-array'; +import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from '../error'; +import Integer, {int} from '../integer'; const PROCEDURE_CALL = 'CALL dbms.cluster.routing.getServers'; const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound'; diff --git a/src/v1/result.js b/src/v1/result.js index 6606b58db..98afe77fd 100644 --- a/src/v1/result.js +++ b/src/v1/result.js @@ -18,6 +18,7 @@ */ import ResultSummary from './result-summary'; +import {EMPTY_CONNECTION_HOLDER} from './internal/connection-holder'; /** * A stream of {@link Record} representing the result of a statement. @@ -32,13 +33,15 @@ class Result { * @param {mixed} statement - Cypher statement to execute * @param {Object} parameters - Map with parameters to use in statement * @param metaSupplier function, when called provides metadata + * @param {ConnectionHolder} connectionHolder - to be notified when result is either fully consumed or error happened. */ - constructor(streamObserver, statement, parameters, metaSupplier) { + constructor(streamObserver, statement, parameters, metaSupplier, connectionHolder) { this._streamObserver = streamObserver; this._p = null; this._statement = statement; this._parameters = parameters || {}; this._metaSupplier = metaSupplier || function(){return {};}; + this._connectionHolder = connectionHolder || EMPTY_CONNECTION_HOLDER; } /** @@ -99,23 +102,39 @@ class Result { * @return */ subscribe(observer) { - let onCompletedOriginal = observer.onCompleted; - let self = this; - let onCompletedWrapper = (metadata) => { + const onCompletedOriginal = observer.onCompleted; + const self = this; + const onCompletedWrapper = (metadata) => { - let additionalMeta = self._metaSupplier(); - for(var key in additionalMeta) { + const additionalMeta = self._metaSupplier(); + for(let key in additionalMeta) { if (additionalMeta.hasOwnProperty(key)) { metadata[key] = additionalMeta[key]; } } - let sum = new ResultSummary(this._statement, this._parameters, metadata); - onCompletedOriginal.call(observer, sum); + const sum = new ResultSummary(this._statement, this._parameters, metadata); + + // notify connection holder that the used connection is not needed any more because result has + // been fully consumed; call the original onCompleted callback after that + self._connectionHolder.releaseConnection().then(() => { + onCompletedOriginal.call(observer, sum); + }); }; observer.onCompleted = onCompletedWrapper; - observer.onError = observer.onError || ((err) => { - console.log("Uncaught error when processing result: " + err); + + const onErrorOriginal = observer.onError || (error => { + console.log("Uncaught error when processing result: " + error); }); + + const onErrorWrapper = error => { + // notify connection holder that the used connection is not needed any more because error happened + // and result can't bee consumed any further; call the original onError callback after that + self._connectionHolder.releaseConnection().then(() => { + onErrorOriginal.call(observer, error); + }); + }; + observer.onError = onErrorWrapper; + this._streamObserver.subscribe(observer); } } diff --git a/src/v1/routing-driver.js b/src/v1/routing-driver.js index 1f2091558..8edb01646 100644 --- a/src/v1/routing-driver.js +++ b/src/v1/routing-driver.js @@ -31,30 +31,24 @@ class RoutingDriver extends Driver { super(url, userAgent, token, RoutingDriver._validateConfig(config)); } - _createConnectionProvider(address, connectionPool) { - return new LoadBalancer(address, connectionPool); + _createConnectionProvider(address, connectionPool, driverOnErrorCallback) { + return new LoadBalancer(address, connectionPool, driverOnErrorCallback); } - _createSession(connectionPromise) { - return new RoutingSession(connectionPromise, (error, conn) => { + _createSession(mode, connectionProvider) { + return new RoutingSession(mode, connectionProvider, (error, conn) => { if (error.code === SERVICE_UNAVAILABLE || error.code === SESSION_EXPIRED) { + // connection is undefined if error happened before connection was acquired if (conn) { this._connectionProvider.forget(conn.url); - } else { - connectionPromise.then((conn) => { - this._connectionProvider.forget(conn.url); - }).catch(() => {/*ignore*/}); } return error; } else if (error.code === 'Neo.ClientError.Cluster.NotALeader') { let url = 'UNKNOWN'; + // connection is undefined if error happened before connection was acquired if (conn) { url = conn.url; this._connectionProvider.forgetWriter(conn.url); - } else { - connectionPromise.then((conn) => { - this._connectionProvider.forgetWriter(conn.url); - }).catch(() => {/*ignore*/}); } return newError('No longer possible to write to server at ' + url, SESSION_EXPIRED); } else { @@ -72,8 +66,8 @@ class RoutingDriver extends Driver { } class RoutingSession extends Session { - constructor(connectionPromise, onFailedConnection) { - super(connectionPromise); + constructor(mode, connectionProvider, onFailedConnection) { + super(mode, connectionProvider); this._onFailedConnection = onFailedConnection; } diff --git a/src/v1/session.js b/src/v1/session.js index d308781b6..e7e6723c7 100644 --- a/src/v1/session.js +++ b/src/v1/session.js @@ -21,6 +21,8 @@ import Result from './result'; import Transaction from './transaction'; import {newError} from './error'; import {assertString} from './internal/util'; +import ConnectionHolder from './internal/connection-holder'; +import {READ, WRITE} from './driver'; /** * A Session instance is used for handling the connection and @@ -29,12 +31,16 @@ import {assertString} from './internal/util'; */ class Session { + /** * @constructor - * @param {Promise.} connectionPromise - Promise of a connection to use + * @param {string} mode - the default access mode for this session. + * @param connectionProvider - the connection provider to acquire connections from. */ - constructor(connectionPromise) { - this._connectionPromise = connectionPromise; + constructor(mode, connectionProvider) { + this._mode = mode; + this._readConnectionHolder = new ConnectionHolder(READ, connectionProvider); + this._writeConnectionHolder = new ConnectionHolder(WRITE, connectionProvider); this._open = true; this._hasTx = false; } @@ -55,19 +61,21 @@ class Session { assertString(statement, "Cypher statement"); const streamObserver = new _RunObserver(this._onRunFailure()); + const connectionHolder = this._connectionHolderWithMode(this._mode); if (!this._hasTx) { - this._connectionPromise.then((conn) => { - streamObserver.resolveConnection(conn); - conn.run(statement, parameters, streamObserver); - conn.pullAll(streamObserver); - conn.sync(); - }).catch((err) => streamObserver.onError(err)); + connectionHolder.initializeConnection(); + connectionHolder.getConnection().then(connection => { + streamObserver.resolveConnection(connection); + connection.run(statement, parameters, streamObserver); + connection.pullAll(streamObserver); + connection.sync(); + }).catch(error => streamObserver.onError(error)); } else { streamObserver.onError(newError("Statements cannot be run directly on a " + "session with an open transaction; either run from within the " + "transaction or use a different session.")); } - return new Result( streamObserver, statement, parameters, () => streamObserver.meta() ); + return new Result( streamObserver, statement, parameters, () => streamObserver.meta(), connectionHolder ); } /** @@ -90,8 +98,15 @@ class Session { } this._hasTx = true; - return new Transaction(this._connectionPromise, () => { - this._hasTx = false}, + + // todo: add mode parameter + const mode = this._mode; + const connectionHolder = this._connectionHolderWithMode(mode); + connectionHolder.initializeConnection(); + + return new Transaction(connectionHolder, () => { + this._hasTx = false; + }, this._onRunFailure(), bookmark, (bookmark) => {this._lastBookmark = bookmark}); } @@ -107,7 +122,11 @@ class Session { close(callback = (() => null)) { if (this._open) { this._open = false; - this._releaseCurrentConnection().then(callback); + this._readConnectionHolder.close().then(() => { + this._writeConnectionHolder.close().then(() => { + callback(); + }); + }); } else { callback(); } @@ -118,23 +137,14 @@ class Session { return (err) => {return err}; } - /** - * Return the current pooled connection instance to the connection pool. - * We don't pool Session instances, to avoid users using the Session after they've called close. - * The `Session` object is just a thin wrapper around Connection anyway, so it makes little difference. - * @return {Promise} - promise resolved then connection is returned to the pool. - * @private - */ - _releaseCurrentConnection() { - return this._connectionPromise.then(conn => { - // Queue up a 'reset', to ensure the next user gets a clean session to work with. - conn.reset(); - conn.sync(); - - // Return connection to the pool - conn._release(); - }).catch(ignoredError => { - }); + _connectionHolderWithMode(mode) { + if (mode === READ) { + return this._readConnectionHolder; + } else if (mode === WRITE) { + return this._writeConnectionHolder; + } else { + throw newError('Unknown access mode: ' + mode); + } } } diff --git a/src/v1/transaction.js b/src/v1/transaction.js index b29e534fe..cb3cd6892 100644 --- a/src/v1/transaction.js +++ b/src/v1/transaction.js @@ -19,6 +19,7 @@ import StreamObserver from './internal/stream-observer'; import Result from './result'; import {assertString} from './internal/util'; +import {EMPTY_CONNECTION_HOLDER} from './internal/connection-holder'; /** * Represents a transaction in the Neo4j database. @@ -28,24 +29,25 @@ import {assertString} from './internal/util'; class Transaction { /** * @constructor - * @param {Promise} connectionPromise - A connection to use + * @param {ConnectionHolder} connectionHolder - the connection holder to get connection from. * @param {function()} onClose - Function to be called when transaction is committed or rolled back. * @param errorTransformer callback use to transform error * @param bookmark optional bookmark * @param onBookmark callback invoked when new bookmark is produced */ - constructor(connectionPromise, onClose, errorTransformer, bookmark, onBookmark) { - this._connectionPromise = connectionPromise; + constructor(connectionHolder, onClose, errorTransformer, bookmark, onBookmark) { + this._connectionHolder = connectionHolder; let streamObserver = new _TransactionStreamObserver(this); let params = {}; if (bookmark) { params = {bookmark: bookmark}; } - this._connectionPromise.then((conn) => { + + this._connectionHolder.getConnection().then(conn => { streamObserver.resolveConnection(conn); - conn.run("BEGIN", params, streamObserver); - conn.discardAll(streamObserver); - }).catch(streamObserver.onError); + conn.run('BEGIN', params, streamObserver); + conn.pullAll(streamObserver); + }).catch(error => streamObserver.onError(error)); this._state = _states.ACTIVE; this._onClose = onClose; @@ -68,7 +70,7 @@ class Transaction { } assertString(statement, "Cypher statement"); - return this._state.run(this._connectionPromise, new _TransactionStreamObserver(this), statement, parameters); + return this._state.run(this._connectionHolder, new _TransactionStreamObserver(this), statement, parameters); } /** @@ -79,12 +81,11 @@ class Transaction { * @returns {Result} - New Result */ commit() { - let committed = this._state.commit(this._connectionPromise, new _TransactionStreamObserver(this)); + let committed = this._state.commit(this._connectionHolder, new _TransactionStreamObserver(this)); this._state = committed.state; //clean up this._onClose(); return committed.result; - } /** @@ -95,7 +96,7 @@ class Transaction { * @returns {Result} - New Result */ rollback() { - let committed = this._state.rollback(this._connectionPromise, new _TransactionStreamObserver(this)); + let committed = this._state.rollback(this._connectionHolder, new _TransactionStreamObserver(this)); this._state = committed.state; //clean up this._onClose(); @@ -147,101 +148,136 @@ class _TransactionStreamObserver extends StreamObserver { let _states = { //The transaction is running with no explicit success or failure marked ACTIVE: { - commit: (connectionPromise, observer) => { - return {result: _runDiscardAll("COMMIT", connectionPromise, observer), + commit: (connectionHolder, observer) => { + return {result: _runDiscardAll("COMMIT", connectionHolder, observer), state: _states.SUCCEEDED} }, - rollback: (connectionPromise, observer) => { - return {result: _runDiscardAll("ROLLBACK", connectionPromise, observer), state: _states.ROLLED_BACK}; + rollback: (connectionHolder, observer) => { + return {result: _runDiscardAll("ROLLBACK", connectionHolder, observer), state: _states.ROLLED_BACK}; }, - run: (connectionPromise, observer, statement, parameters) => { - connectionPromise.then((conn) => { + run: (connectionHolder, observer, statement, parameters) => { + connectionHolder.getConnection().then(conn => { observer.resolveConnection(conn); - conn.run( statement, parameters || {}, observer ); - conn.pullAll( observer ); + conn.run(statement, parameters || {}, observer); + conn.pullAll(observer); conn.sync(); - }).catch(observer.onError); + }).catch(error => observer.onError(error)); - return new Result( observer, statement, parameters, () => observer.serverMeta() ); + return newRunResult(observer, statement, parameters, () => observer.serverMeta()); } }, //An error has occurred, transaction can no longer be used and no more messages will // be sent for this transaction. FAILED: { - commit: (conn, observer) => { + commit: (connectionHolder, observer) => { observer.onError({ error: "Cannot commit statements in this transaction, because previous statements in the " + "transaction has failed and the transaction has been rolled back. Please start a new" + " transaction to run another statement." }); - return {result: new Result(observer, "COMMIT", {}), state: _states.FAILED}; + return {result: newDummyResult(observer, "COMMIT", {}), state: _states.FAILED}; }, - rollback: (conn, observer) => { + rollback: (connectionHolder, observer) => { observer.onError({error: "Cannot rollback transaction, because previous statements in the " + "transaction has failed and the transaction has already been rolled back."}); - return {result: new Result(observer, "ROLLBACK", {}), state: _states.FAILED}; + return {result: newDummyResult(observer, "ROLLBACK", {}), state: _states.FAILED}; }, - run: (conn, observer, statement, parameters) => { + run: (connectionHolder, observer, statement, parameters) => { observer.onError({error: "Cannot run statement, because previous statements in the " + "transaction has failed and the transaction has already been rolled back."}); - return new Result(observer, statement, parameters); + return newDummyResult(observer, statement, parameters); } }, //This transaction has successfully committed SUCCEEDED: { - commit: (conn, observer) => { + commit: (connectionHolder, observer) => { observer.onError({ error: "Cannot commit statements in this transaction, because commit has already been successfully called on the transaction and transaction has been closed. Please start a new" + " transaction to run another statement." }); - return {result: new Result(observer, "COMMIT", {}), state: _states.SUCCEEDED}; + return {result: newDummyResult(observer, "COMMIT", {}), state: _states.SUCCEEDED}; }, - rollback: (conn, observer) => { + rollback: (connectionHolder, observer) => { observer.onError({error: "Cannot rollback transaction, because transaction has already been successfully closed."}); - return {result: new Result(observer, "ROLLBACK", {}), state: _states.SUCCEEDED}; + return {result: newDummyResult(observer, "ROLLBACK", {}), state: _states.SUCCEEDED}; }, - run: (conn, observer, statement, parameters) => { + run: (connectionHolder, observer, statement, parameters) => { observer.onError({error: "Cannot run statement, because transaction has already been successfully closed."}); - return new Result(observer, statement, parameters); + return newDummyResult(observer, statement, parameters); } }, //This transaction has been rolled back ROLLED_BACK: { - commit: (conn, observer) => { + commit: (connectionHolder, observer) => { observer.onError({ error: "Cannot commit this transaction, because it has already been rolled back." }); - return {result: new Result(observer, "COMMIT", {}), state: _states.ROLLED_BACK}; + return {result: newDummyResult(observer, "COMMIT", {}), state: _states.ROLLED_BACK}; }, - rollback: (conn, observer) => { + rollback: (connectionHolder, observer) => { observer.onError({error: "Cannot rollback transaction, because transaction has already been rolled back."}); - return {result: new Result(observer, "ROLLBACK", {}), state: _states.ROLLED_BACK}; + return {result: newDummyResult(observer, "ROLLBACK", {}), state: _states.ROLLED_BACK}; }, - run: (conn, observer, statement, parameters) => { + run: (connectionHolder, observer, statement, parameters) => { observer.onError({error: "Cannot run statement, because transaction has already been rolled back."}); - return new Result(observer, statement, parameters); + return newDummyResult(observer, statement, parameters); } } }; -function _runDiscardAll(msg, connectionPromise, observer) { - connectionPromise.then((conn) => { - observer.resolveConnection(conn); - conn.run(msg, {}, observer); - conn.discardAll(observer); - conn.sync(); - }).catch(observer.onError); +function _runDiscardAll(msg, connectionHolder, observer) { + connectionHolder.getConnection().then( + conn => { + observer.resolveConnection(conn); + conn.run(msg, {}, observer); + conn.pullAll(observer); + conn.sync(); + }).catch(error => observer.onError(error)); + + // for commit & rollback we need result that uses real connection holder and notifies it when + // connection is not needed and can be safely released to the pool + return new Result(observer, msg, {}, emptyMetadataSupplier, connectionHolder); +} + +/** + * Creates a {@link Result} with empty connection holder. + * Should be used as a result for running cypher statements. They can result in metadata but should not + * influence real connection holder to release connections because single transaction can have + * {@link Transaction#run} called multiple times. + * @param {StreamObserver} observer - an observer for the created result. + * @param {string} statement - the cypher statement that produced the result. + * @param {object} parameters - the parameters for cypher statement that produced the result. + * @param {function} metadataSupplier - the function that returns a metadata object. + * @return {Result} new result. + */ +function newRunResult(observer, statement, parameters, metadataSupplier) { + return new Result(observer, statement, parameters, metadataSupplier, EMPTY_CONNECTION_HOLDER); +} + +/** + * Creates a {@link Result} without metadata supplier and with empty connection holder. + * For cases when result represents an intermediate or failed action, does not require any metadata and does not + * need to influence real connection holder to release connections. + * @param {StreamObserver} observer - an observer for the created result. + * @param {string} statement - the cypher statement that produced the result. + * @param {object} parameters - the parameters for cypher statement that produced the result. + * @return {Result} new result. + */ +function newDummyResult(observer, statement, parameters) { + return new Result(observer, statement, parameters, emptyMetadataSupplier, EMPTY_CONNECTION_HOLDER); +} - return new Result(observer, msg, {}); +function emptyMetadataSupplier() { + return {}; } export default Transaction; diff --git a/test/internal/connection-holder.test.js b/test/internal/connection-holder.test.js new file mode 100644 index 000000000..8c5d1a854 --- /dev/null +++ b/test/internal/connection-holder.test.js @@ -0,0 +1,197 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import ConnectionHolder, {EMPTY_CONNECTION_HOLDER} from '../../src/v1/internal/connection-holder'; +import {SingleConnectionProvider} from '../../src/v1/internal/connection-providers'; +import {READ} from '../../src/v1/driver'; +import FakeConnection from './fake-connection'; + +describe('EmptyConnectionHolder', () => { + + it('should return rejected promise instead of connection', done => { + EMPTY_CONNECTION_HOLDER.getConnection().catch(() => { + done(); + }); + }); + + it('should return resolved promise on release', done => { + EMPTY_CONNECTION_HOLDER.releaseConnection().then(() => { + done(); + }); + }); + + it('should return resolved promise on close', done => { + EMPTY_CONNECTION_HOLDER.close().then(() => { + done(); + }); + }); + +}); + +describe('ConnectionHolder', () => { + + it('should acquire new connection during initialization', () => { + const connectionProvider = new RecordingConnectionProvider([new FakeConnection()]); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + + expect(connectionProvider.acquireConnectionInvoked).toBe(1); + }); + + it('should return acquired during initialization connection', done => { + const connection = new FakeConnection(); + const connectionProvider = newSingleConnectionProvider(connection); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + + connectionHolder.getConnection().then(connection => { + expect(connection).toBe(connection); + done(); + }); + }); + + it('should release connection with single user', done => { + const connection = new FakeConnection(); + const connectionProvider = newSingleConnectionProvider(connection); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + + connectionHolder.releaseConnection().then(() => { + expect(connection.isReleasedOnce()).toBeTruthy(); + done(); + }); + }); + + it('should not release connection with multiple users', done => { + const connection = new FakeConnection(); + const connectionProvider = newSingleConnectionProvider(connection); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + connectionHolder.initializeConnection(); + connectionHolder.initializeConnection(); + + connectionHolder.releaseConnection().then(() => { + expect(connection.isNeverReleased()).toBeTruthy(); + done(); + }); + }); + + it('should release connection with multiple users when all users release', done => { + const connection = new FakeConnection(); + const connectionProvider = newSingleConnectionProvider(connection); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + connectionHolder.initializeConnection(); + connectionHolder.initializeConnection(); + + connectionHolder.releaseConnection().then(() => { + connectionHolder.releaseConnection().then(() => { + connectionHolder.releaseConnection().then(() => { + expect(connection.isReleasedOnce()).toBeTruthy(); + done(); + }); + }); + }); + }); + + it('should do nothing when closed and not initialized', done => { + const connection = new FakeConnection(); + const connectionProvider = newSingleConnectionProvider(connection); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.close().then(() => { + expect(connection.isNeverReleased()).toBeTruthy(); + done(); + }); + }); + + it('should close even when users exist', done => { + const connection = new FakeConnection(); + const connectionProvider = newSingleConnectionProvider(connection); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + connectionHolder.initializeConnection(); + + connectionHolder.close().then(() => { + expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); + done(); + }); + }); + + it('should initialize new connection after releasing current one', done => { + const connection1 = new FakeConnection(); + const connection2 = new FakeConnection(); + const connectionProvider = new RecordingConnectionProvider([connection1, connection2]); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + + connectionHolder.releaseConnection().then(() => { + expect(connection1.isReleasedOnce()).toBeTruthy(); + + connectionHolder.initializeConnection(); + connectionHolder.releaseConnection().then(() => { + expect(connection2.isReleasedOnce()).toBeTruthy(); + done(); + }); + }); + }); + + it('should initialize new connection after being closed', done => { + const connection1 = new FakeConnection(); + const connection2 = new FakeConnection(); + const connectionProvider = new RecordingConnectionProvider([connection1, connection2]); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + + connectionHolder.close().then(() => { + expect(connection1.isReleasedOnceOnSessionClose()).toBeTruthy(); + + connectionHolder.initializeConnection(); + connectionHolder.close().then(() => { + expect(connection2.isReleasedOnceOnSessionClose()).toBeTruthy(); + done(); + }); + }); + }); +}); + +class RecordingConnectionProvider extends SingleConnectionProvider { + + constructor(connections) { + super(Promise.resolve()); + this.connectionPromises = connections.map(conn => Promise.resolve(conn)); + this.acquireConnectionInvoked = 0; + } + + acquireConnection(mode) { + return this.connectionPromises[this.acquireConnectionInvoked++]; + } +} + +function newSingleConnectionProvider(connection) { + return new SingleConnectionProvider(Promise.resolve(connection)); +} diff --git a/test/internal/connection-providers.test.js b/test/internal/connection-providers.test.js index 662506041..04329d6a1 100644 --- a/test/internal/connection-providers.test.js +++ b/test/internal/connection-providers.test.js @@ -25,11 +25,14 @@ import RoundRobinArray from '../../src/v1/internal/round-robin-array'; import {DirectConnectionProvider, LoadBalancer} from '../../src/v1/internal/connection-providers'; import Pool from '../../src/v1/internal/pool'; +const NO_OP_DRIVER_CALLBACK = () => { +}; + describe('DirectConnectionProvider', () => { it('acquires connection from the pool', done => { const pool = newPool(); - const connectionProvider = new DirectConnectionProvider('localhost:123', pool); + const connectionProvider = newDirectConnectionProvider('localhost:123', pool); connectionProvider.acquireConnection(READ).then(connection => { expect(connection).toBeDefined(); @@ -132,7 +135,7 @@ describe('LoadBalancer', () => { }); it('initializes routing table with the given router', () => { - const loadBalancer = new LoadBalancer('server-ABC', newPool()); + const loadBalancer = new LoadBalancer('server-ABC', newPool(), NO_OP_DRIVER_CALLBACK); expectRoutingTable(loadBalancer, ['server-ABC'], @@ -581,8 +584,12 @@ describe('LoadBalancer', () => { }); +function newDirectConnectionProvider(address, pool) { + return new DirectConnectionProvider(address, pool, NO_OP_DRIVER_CALLBACK); +} + function newLoadBalancer(routers, readers, writers, pool = null, expirationTime = Integer.MAX_VALUE, routerToRoutingTable = {}) { - const loadBalancer = new LoadBalancer(null, pool || newPool()); + const loadBalancer = new LoadBalancer(null, pool || newPool(), NO_OP_DRIVER_CALLBACK); loadBalancer._routingTable = new RoutingTable( new RoundRobinArray(routers), new RoundRobinArray(readers), diff --git a/test/internal/fake-connection.js b/test/internal/fake-connection.js new file mode 100644 index 000000000..60dd3c11e --- /dev/null +++ b/test/internal/fake-connection.js @@ -0,0 +1,83 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This class is like a mock of {@link Connection} that tracks invocations count. + * It tries to maintain same "interface" as {@link Connection}. + * It could be replaced with a proper mock by a library like testdouble. + * At the time of writing such libraries require {@link Proxy} support but browser tests execute in + * PhantomJS which does not support proxies. + */ +export default class FakeConnection { + + constructor() { + this.resetInvoked = 0; + this.resetAsyncInvoked = 0; + this.syncInvoked = 0; + this.releaseInvoked = 0; + } + + run() { + } + + discardAll() { + } + + reset() { + this.resetInvoked++; + } + + resetAsync() { + this.resetAsyncInvoked++; + } + + sync() { + this.syncInvoked++; + } + + _release() { + this.releaseInvoked++; + } + + isReleasedOnceOnSessionClose() { + return this.isReleasedOnSessionCloseTimes(1); + } + + isReleasedOnSessionCloseTimes(times) { + return this.resetAsyncInvoked === times && + this.resetInvoked === 0 && + this.syncInvoked === times && + this.releaseInvoked === times; + } + + isNeverReleased() { + return this.isReleasedTimes(0); + } + + isReleasedOnce() { + return this.isReleasedTimes(1); + } + + isReleasedTimes(times) { + return this.resetAsyncInvoked === 0 && + this.resetInvoked === times && + this.syncInvoked === times && + this.releaseInvoked === times; + } +}; diff --git a/test/internal/tls.test.js b/test/internal/tls.test.js index 62ce70b77..dc58f4b09 100644 --- a/test/internal/tls.test.js +++ b/test/internal/tls.test.js @@ -269,7 +269,9 @@ describe('trust-on-first-use', function() { knownHosts: knownHostsPath }); - driver.session(); // write into the knownHost file + // create session and transaction to force creation of new connection and writing into the knownHost file + const session = driver.session(); + expect(session.beginTransaction()).toBeDefined(); // duplicate the same serverId twice setTimeout(function() { diff --git a/test/v1/driver.test.js b/test/v1/driver.test.js index 906750da5..c483bf4fa 100644 --- a/test/v1/driver.test.js +++ b/test/v1/driver.test.js @@ -53,7 +53,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should handle wrong scheme', () => { @@ -84,7 +84,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should indicate success early on correct credentials', function(done) { @@ -97,7 +97,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should be possible to pass a realm with basic auth tokens', function(done) { @@ -110,7 +110,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should be possible to create custom auth tokens', function(done) { @@ -123,7 +123,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should be possible to create custom auth tokens with additional parameters', function(done) { @@ -136,7 +136,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should fail nicely when connecting with routing to standalone server', done => { @@ -151,7 +151,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should have correct user agent', () => { @@ -192,4 +192,13 @@ describe('driver', function() { }); }); + /** + * Starts new transaction to force new network connection. + * @param {Driver} driver - the driver to use. + */ + function startNewTransaction(driver) { + const session = driver.session(); + expect(session.beginTransaction()).toBeDefined(); + } + }); diff --git a/test/v1/routing.driver.boltkit.it.js b/test/v1/routing.driver.boltkit.it.js index 457f48d5a..f4aafa70a 100644 --- a/test/v1/routing.driver.boltkit.it.js +++ b/test/v1/routing.driver.boltkit.it.js @@ -939,11 +939,10 @@ describe('routing driver', () => { for (let i = 0; i < acquiredConnections.length; i++) { expect(acquiredConnections[i]).toBe(releasedConnections[i]); } - done(); }); }); - }); + }).catch(console.log); }); }); diff --git a/test/v1/session.test.js b/test/v1/session.test.js index 30fbcbf67..6f03c01f1 100644 --- a/test/v1/session.test.js +++ b/test/v1/session.test.js @@ -20,6 +20,9 @@ import neo4j from '../../src/v1'; import {statementType} from '../../src/v1/result-summary'; import Session from '../../src/v1/session'; +import {READ} from '../../src/v1/driver'; +import {SingleConnectionProvider} from '../../src/v1/internal/connection-providers'; +import FakeConnection from '../internal/fake-connection'; describe('session', () => { @@ -47,14 +50,14 @@ describe('session', () => { it('close should invoke callback ', done => { const connection = new FakeConnection(); - const session = new Session(Promise.resolve(connection)); + const session = newSessionWithConnection(connection); session.close(done); }); it('close should invoke callback even when already closed ', done => { const connection = new FakeConnection(); - const session = new Session(Promise.resolve(connection)); + const session = newSessionWithConnection(connection); session.close(() => { session.close(() => { @@ -67,16 +70,16 @@ describe('session', () => { it('close should be idempotent ', done => { const connection = new FakeConnection(); - const session = new Session(Promise.resolve(connection)); + const session = newSessionWithConnection(connection); session.close(() => { - expect(connection.closedOnce()).toBeTruthy(); + expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); session.close(() => { - expect(connection.closedOnce()).toBeTruthy(); + expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); session.close(() => { - expect(connection.closedOnce()).toBeTruthy(); + expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); done(); }); }); @@ -370,20 +373,25 @@ describe('session', () => { it('should be able to close a long running query ', done => { //given a long running query - session.run("unwind range(1,1000000) as x create (n {prop:x}) delete n"); + session.run('unwind range(1,1000000) as x create (n {prop:x}) delete n').catch(error => { + // long running query should fail + expect(error).toBeDefined(); - //wait some time than close the session and run - //a new query + // and it should be possible to start another session and run a query + const anotherSession = driver.session(); + anotherSession.run('RETURN 1.0 as a').then(result => { + expect(result.records.length).toBe(1); + expect(result.records[0].get('a')).toBe(1); + done(); + }).catch(error => { + console.log('Query failed after a long running query was terminated', error); + }); + }); + + // wait some time than close the session with a long running query setTimeout(() => { session.close(); - const anotherSession = driver.session(); - setTimeout(() => { - anotherSession.run("RETURN 1.0 as a") - .then(ignore => { - done(); - }); - }, 500); - }, 500); + }, 1000); }); it('should fail nicely on unpackable values ', done => { @@ -441,30 +449,138 @@ describe('session', () => { expect(() => driver.session('ILLEGAL_MODE')).toThrow(); }); - class FakeConnection { + it('should release connection to the pool after run', done => { + withQueryInTmpSession(driver, () => { + const idleConnectionsBefore = idleConnectionCount(driver); - constructor() { - this.resetInvoked = 0; - this.syncInvoked = 0; - this.releaseInvoked = 0; - } + session.run('RETURN 1').then(() => { + const idleConnectionsAfter = idleConnectionCount(driver); + expect(idleConnectionsBefore).toEqual(idleConnectionsAfter); + done(); + }); + }); + }); - reset() { - this.resetInvoked++; - } + it('should release connection to the pool after run failure', done => { + withQueryInTmpSession(driver, () => { + const idleConnectionsBefore = idleConnectionCount(driver); - sync() { - this.syncInvoked++; - } + session.run('RETURN 10 / 0').catch(() => { + const idleConnectionsAfter = idleConnectionCount(driver); + expect(idleConnectionsBefore).toEqual(idleConnectionsAfter); + done(); + }); + }); + }); - _release() { - this.releaseInvoked++; - } + it('should release connection to the pool when result is consumed', done => { + withQueryInTmpSession(driver, () => { + const idleConnectionsBefore = idleConnectionCount(driver); + + session.run('UNWIND range(0, 10) AS x RETURN x + 1').subscribe({ + onNext: () => { + // one less idle connection, one connection is used for the current query + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore - 1); + }, + onError: error => { + console.log(error); + }, + onCompleted: () => { + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore); + done(); + } + }); + }); + }); - closedOnce() { - return this.resetInvoked === 1 && this.syncInvoked === 1 && this.releaseInvoked === 1; - } + it('should release connection to the pool when result fails', done => { + withQueryInTmpSession(driver, () => { + const idleConnectionsBefore = idleConnectionCount(driver); + + session.run('UNWIND range(10, 0, -1) AS x RETURN 10 / x').subscribe({ + onNext: () => { + // one less idle connection, one connection is used for the current query + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore - 1); + }, + onError: ignoredError => { + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore); + done(); + }, + onCompleted: () => { + } + }); + }); + }); + + it('should release connection to the pool when transaction commits', done => { + withQueryInTmpSession(driver, () => { + const idleConnectionsBefore = idleConnectionCount(driver); + + const tx = session.beginTransaction(); + tx.run('UNWIND range(0, 10) AS x RETURN x + 1').subscribe({ + onNext: () => { + // one less idle connection, one connection is used for the current transaction + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore - 1); + }, + onError: error => { + console.log(error); + }, + onCompleted: () => { + // one less idle connection, one connection is used for the current transaction + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore - 1); + tx.commit().then(() => { + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore); + done(); + }); + } + }); + }); + }); + + it('should release connection to the pool when transaction rolls back', done => { + withQueryInTmpSession(driver, () => { + const idleConnectionsBefore = idleConnectionCount(driver); + + const tx = session.beginTransaction(); + tx.run('UNWIND range(0, 10) AS x RETURN x + 1').subscribe({ + onNext: () => { + // one less idle connection, one connection is used for the current transaction + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore - 1); + }, + onError: error => { + console.log(error); + }, + onCompleted: () => { + // one less idle connection, one connection is used for the current transaction + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore - 1); + tx.rollback().then(() => { + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore); + done(); + }); + } + }); + }); + }); + + function withQueryInTmpSession(driver, callback) { + const tmpSession = driver.session(); + return tmpSession.run('RETURN 1').then(() => { + tmpSession.close(callback); + }); } -}); + function newSessionWithConnection(connection) { + const connectionProvider = new SingleConnectionProvider(Promise.resolve(connection)); + const session = new Session(READ, connectionProvider); + session.beginTransaction(); // force session to acquire new connection + return session; + } + function idleConnectionCount(driver) { + const connectionProvider = driver._connectionProvider; + const address = connectionProvider._address; + const connectionPool = connectionProvider._connectionPool; + const idleConnections = connectionPool._pools[address]; + return idleConnections.length; + } +}); diff --git a/test/v1/tck/steps/erroreportingsteps.js b/test/v1/tck/steps/erroreportingsteps.js index 2c75dfe9d..54d2306fa 100644 --- a/test/v1/tck/steps/erroreportingsteps.js +++ b/test/v1/tck/steps/erroreportingsteps.js @@ -65,7 +65,7 @@ module.exports = function () { var self = this; var driver = neo4j.driver("bolt://localhost:7777", neo4j.auth.basic("neo4j", "neo4j")); driver.onError = function (error) { self.error = error; callback()}; - driver.session(); + driver.session().beginTransaction(); setTimeout(callback, 1000); });