From 4015790b284f4333ce26c6943b761b0800154774 Mon Sep 17 00:00:00 2001 From: Diogo Resende Date: Fri, 4 Nov 2016 21:23:32 +0000 Subject: [PATCH 1/6] tests: adds mysql.format() coverage --- test/integration/connection/test-format.js | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 test/integration/connection/test-format.js diff --git a/test/integration/connection/test-format.js b/test/integration/connection/test-format.js new file mode 100644 index 000000000..27ae171a7 --- /dev/null +++ b/test/integration/connection/test-format.js @@ -0,0 +1,14 @@ +var path = require('path'); +var assert = require('assert'); +var common = require('../../common'); +var lib = require(path.resolve(common.lib, '../index')); + +assert.equal( + lib.format('SELECT * FROM ?? WHERE ?? = ?', [ 'table', 'property', 123 ]), + 'SELECT * FROM `table` WHERE `property` = 123' +); + +assert.equal( + lib.format('INSERT INTO ?? SET ?', [ 'table', { property: 123 } ]), + 'INSERT INTO `table` SET `property` = 123' +); From dce49bdd92d3195e9da4c2369da60b562ac50c97 Mon Sep 17 00:00:00 2001 From: Diogo Resende Date: Fri, 4 Nov 2016 21:31:52 +0000 Subject: [PATCH 2/6] tests: adds mysql.Types coverage --- test/integration/connection/test-types.js | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 test/integration/connection/test-types.js diff --git a/test/integration/connection/test-types.js b/test/integration/connection/test-types.js new file mode 100644 index 000000000..a9d97a7ab --- /dev/null +++ b/test/integration/connection/test-types.js @@ -0,0 +1,11 @@ +var path = require('path'); +var assert = require('assert'); +var common = require('../../common'); +var lib = require(path.resolve(common.lib, '../index')); +var types = require(path.resolve(common.lib, 'protocol/constants/types')); + +assert.equal(typeof lib.Types, "object"); + +for (var k in types) { + assert.equal(lib.Types[k], types[k]); +} From 77a23c1ed6c387d6a1ab68ca75854757c567876e Mon Sep 17 00:00:00 2001 From: Diogo Resende Date: Fri, 4 Nov 2016 22:10:36 +0000 Subject: [PATCH 3/6] tests: adds change user charset coverage --- .../connection/test-change-user-charset.js | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 test/integration/connection/test-change-user-charset.js diff --git a/test/integration/connection/test-change-user-charset.js b/test/integration/connection/test-change-user-charset.js new file mode 100644 index 000000000..9e3e0219f --- /dev/null +++ b/test/integration/connection/test-change-user-charset.js @@ -0,0 +1,18 @@ +var assert = require('assert'); +var common = require('../../common'); + +common.getTestConnection(function (err, connection) { + assert.ifError(err); + + // should change charset + connection.changeUser({charset:'KOI8R_GENERAL_CI'}, function (err) { + assert.ifError(err); + + connection.query('SHOW VARIABLES LIKE \'character_set_client\'', function (err, result) { + assert.ifError(err); + assert.strictEqual(result[0]['Value'], 'koi8r'); + + connection.destroy(); + }); + }); +}); From c97737ae478aec1f0cfcaf3b486a1d26805f3c23 Mon Sep 17 00:00:00 2001 From: Diogo Resende Date: Fri, 4 Nov 2016 22:11:03 +0000 Subject: [PATCH 4/6] tests: adds another way of making a simple query with object params --- test/integration/connection/test-query.js | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/test/integration/connection/test-query.js b/test/integration/connection/test-query.js index 42ddba4d7..2d5aa4cd7 100644 --- a/test/integration/connection/test-query.js +++ b/test/integration/connection/test-query.js @@ -8,6 +8,13 @@ common.getTestConnection(function (err, connection) { assert.ifError(err); assert.deepEqual(rows, [{1: 1}]); assert.equal(fields[0].name, '1'); - connection.end(assert.ifError); + + // this is a coverage test, it shuold perform exactly as the previous one + connection.query({ sql: 'SELECT ?' }, [ 1 ], function (err, rows, fields) { + assert.ifError(err); + assert.deepEqual(rows, [{1: 1}]); + assert.equal(fields[0].name, '1'); + connection.end(assert.ifError); + }); }); }); From 7aa6da6f5928ebedb5a75c9c6c2ddd4ded2d00cd Mon Sep 17 00:00:00 2001 From: Diogo Resende Date: Fri, 4 Nov 2016 22:11:34 +0000 Subject: [PATCH 5/6] tests: updates change user test to cover calling without specifing new user --- test/unit/connection/test-change-user.js | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/test/unit/connection/test-change-user.js b/test/unit/connection/test-change-user.js index fb38fe8bc..dfe59c78a 100644 --- a/test/unit/connection/test-change-user.js +++ b/test/unit/connection/test-change-user.js @@ -21,8 +21,18 @@ server.listen(common.fakeServerPort, function(err) { assert.ifError(err); assert.strictEqual(result[0]['CURRENT_USER()'], 'user_2@localhost'); - connection.destroy(); - server.destroy(); + // should keep current user + connection.changeUser(function (err) { + assert.ifError(err); + + connection.query('SELECT CURRENT_USER()', function (err, result) { + assert.ifError(err); + assert.strictEqual(result[0]['CURRENT_USER()'], 'user_2@localhost'); + + connection.destroy(); + server.destroy(); + }); + }); }); }); }); From 342cf6029444866dedf35526cecdc32af829835f Mon Sep 17 00:00:00 2001 From: ifsnow Date: Thu, 1 Dec 2016 17:17:18 +0900 Subject: [PATCH 6/6] New Pool options and refactoring. --- Readme.md | 26 +- lib/ConnectionConfig.js | 20 +- lib/Pool.js | 259 ++----- lib/PoolConfig.js | 45 +- lib/PoolConnection.js | 37 +- lib/PoolConnectionManager.js | 727 ++++++++++++++++++ lib/protocol/sequences/ChangeUser.js | 10 +- .../unit/pool/test-connection-min-max-free.js | 62 ++ test/unit/pool/test-connection-pool-data.js | 26 + test/unit/pool/test-connection-start-wait.js | 37 + test/unit/pool/test-connection-start.js | 32 + .../unit/pool/test-connection-without-ping.js | 37 + test/unit/pool/test-destroy-connection.js | 4 +- test/unit/pool/test-queue-timeout.js | 33 + test/unit/pool/test-status.js | 35 + 15 files changed, 1125 insertions(+), 265 deletions(-) create mode 100644 lib/PoolConnectionManager.js create mode 100644 test/unit/pool/test-connection-min-max-free.js create mode 100644 test/unit/pool/test-connection-pool-data.js create mode 100644 test/unit/pool/test-connection-start-wait.js create mode 100644 test/unit/pool/test-connection-start.js create mode 100644 test/unit/pool/test-connection-without-ping.js create mode 100644 test/unit/pool/test-queue-timeout.js create mode 100644 test/unit/pool/test-status.js diff --git a/Readme.md b/Readme.md index 5a78c2816..05445c85c 100644 --- a/Readme.md +++ b/Readme.md @@ -389,12 +389,27 @@ constructor. In addition to those options pools accept a few extras: * `queueLimit`: The maximum number of connection requests the pool will queue before returning an error from `getConnection`. If set to `0`, there is no limit to the number of queued connection requests. (Default: `0`) +* `queueWaitTimeout`: The maximum number of milliseconds that the pool will wait + for a connection when there are no available spare connections. If set to `0`, + there is no limit. (Default: `0`) +* `pingCheckInterval`: The number of milliseconds to indicate how often to check + the validity of the connection. If set to `0`, checks whether or not the connection + to the server is working every time. Setting this value in high-traffic can + improve performance. (Default: `0`) +* `startConnections`: The initial number of connections that are created when the + pool is started. (Default: `0`) +* `minSpareConnections`: The minimum number of spare connections that should be + kept in the pool at all times. If set to `0`, it doen't work. (Default: `0`) +* `maxSpareConnections`: The maximum number of spare connections that should be + kept in the pool at all times. If set to `0`, it doen't work. (Default: `0`) +* `spareCheckInterval`: The number of milliseconds to indicate how often to check + the status of spare connections. If set to `0`, it doen't work. (Default: `0`) ## Pool events ### connection -The pool will emit a `connection` event when a new connection is made within the pool. +The pool will emit a `connection` event when a new connection is made within the pool. If you need to set session variables on the connection before it gets used, you can listen to the `connection` event. @@ -415,6 +430,11 @@ pool.on('enqueue', function () { }); ``` +### prepared + +The pool will emit an `prepared` event when all new connections are created in. +It's meaningful when `startConnections` is set. + ## Closing all the connections in a pool When you are done using the pool, you have to end all the connections or the @@ -463,7 +483,7 @@ poolCluster.getConnection('MASTER', function (err, connection) {}); // Target Group : SLAVE1-2, Selector : order // If can't connect to SLAVE1, return SLAVE2. (remove SLAVE1 in the cluster) poolCluster.on('remove', function (nodeId) { - console.log('REMOVED NODE : ' + nodeId); // nodeId = SLAVE1 + console.log('REMOVED NODE : ' + nodeId); // nodeId = SLAVE1 }); poolCluster.getConnection('SLAVE*', 'ORDER', function (err, connection) {}); @@ -485,7 +505,7 @@ poolCluster.end(function (err) { ### PoolCluster options * `canRetry`: If `true`, `PoolCluster` will attempt to reconnect when connection fails. (Default: `true`) -* `removeNodeErrorCount`: If connection fails, node's `errorCount` increases. +* `removeNodeErrorCount`: If connection fails, node's `errorCount` increases. When `errorCount` is greater than `removeNodeErrorCount`, remove a node in the `PoolCluster`. (Default: `5`) * `restoreNodeTimeout`: If connection fails, specifies the number of milliseconds before another connection attempt will be made. If set to `0`, then node will be diff --git a/lib/ConnectionConfig.js b/lib/ConnectionConfig.js index c7543f816..baf846586 100644 --- a/lib/ConnectionConfig.js +++ b/lib/ConnectionConfig.js @@ -56,10 +56,22 @@ function ConnectionConfig(options) { : options.charsetNumber||Charsets.UTF8_GENERAL_CI; // Set the client flags - var defaultFlags = ConnectionConfig.getDefaultFlags(options); - this.clientFlags = ConnectionConfig.mergeFlags(defaultFlags, options.flags); + this.clientFlags = ConnectionConfig._getClientFlags(this.multipleStatements, this.flags); } +var _clientFlagsCache = {}; + +ConnectionConfig._getClientFlags = function(multipleStatements, flags) { + var key = flags + multipleStatements; + + if (_clientFlagsCache[key] === undefined) { + var defaultFlags = ConnectionConfig.getDefaultFlags(multipleStatements); + _clientFlagsCache[key] = ConnectionConfig.mergeFlags(defaultFlags, flags); + } + + return _clientFlagsCache[key]; +}; + ConnectionConfig.mergeFlags = function mergeFlags(defaultFlags, userFlags) { var allFlags = ConnectionConfig.parseFlagList(defaultFlags); var newFlags = ConnectionConfig.parseFlagList(userFlags); @@ -93,7 +105,7 @@ ConnectionConfig.getCharsetNumber = function getCharsetNumber(charset) { return num; }; -ConnectionConfig.getDefaultFlags = function getDefaultFlags(options) { +ConnectionConfig.getDefaultFlags = function getDefaultFlags(multipleStatements) { var defaultFlags = [ '-COMPRESS', // Compression protocol *NOT* supported '-CONNECT_ATTRS', // Does *NOT* send connection attributes in Protocol::HandshakeResponse41 @@ -114,7 +126,7 @@ ConnectionConfig.getDefaultFlags = function getDefaultFlags(options) { '+TRANSACTIONS' // Expects status flags ]; - if (options && options.multipleStatements) { + if (multipleStatements) { // May send multiple statements per COM_QUERY and COM_STMT_PREPARE defaultFlags.push('+MULTI_STATEMENTS'); } diff --git a/lib/Pool.js b/lib/Pool.js index ee8266936..ef4fc6105 100644 --- a/lib/Pool.js +++ b/lib/Pool.js @@ -2,187 +2,58 @@ var mysql = require('../'); var Connection = require('./Connection'); var EventEmitter = require('events').EventEmitter; var Util = require('util'); -var PoolConnection = require('./PoolConnection'); +var PoolConnectionManager = require('./PoolConnectionManager'); module.exports = Pool; Util.inherits(Pool, EventEmitter); function Pool(options) { EventEmitter.call(this); - this.config = options.config; - this.config.connectionConfig.pool = this; - this._acquiringConnections = []; - this._allConnections = []; - this._freeConnections = []; - this._connectionQueue = []; - this._closed = false; + this.config = options.config; + this._connectionManager = new PoolConnectionManager(this, options.config); + this._closed = false; } -Pool.prototype.getConnection = function (cb) { - - if (this._closed) { - var err = new Error('Pool is closed.'); - err.code = 'POOL_CLOSED'; - process.nextTick(function () { - cb(err); - }); - return; - } - - var connection; - var pool = this; - - if (this._freeConnections.length > 0) { - connection = this._freeConnections.shift(); - this.acquireConnection(connection, cb); - return; - } - - if (this.config.connectionLimit === 0 || this._allConnections.length < this.config.connectionLimit) { - connection = new PoolConnection(this, { config: this.config.newConnectionConfig() }); - - this._acquiringConnections.push(connection); - this._allConnections.push(connection); - - connection.connect({timeout: this.config.acquireTimeout}, function onConnect(err) { - spliceConnection(pool._acquiringConnections, connection); - - if (pool._closed) { - err = new Error('Pool is closed.'); - err.code = 'POOL_CLOSED'; - } - - if (err) { - pool._purgeConnection(connection); - cb(err); - return; - } - - pool.emit('connection', connection); - cb(null, connection); - }); +Pool.prototype.getConnection = function (callback) { + if (this._isClosed(callback)) { return; } - if (!this.config.waitForConnections) { - process.nextTick(function(){ - var err = new Error('No connections available.'); - err.code = 'POOL_CONNLIMIT'; - cb(err); - }); - return; - } + var connectionManager = this._connectionManager; - this._enqueueCallback(cb); -}; - -Pool.prototype.acquireConnection = function acquireConnection(connection, cb) { - if (connection._pool !== this) { - throw new Error('Connection acquired from wrong pool.'); - } - - var changeUser = this._needsChangeUser(connection); - var pool = this; - - this._acquiringConnections.push(connection); - - function onOperationComplete(err) { - spliceConnection(pool._acquiringConnections, connection); - - if (pool._closed) { - err = new Error('Pool is closed.'); - err.code = 'POOL_CLOSED'; - } - - if (err) { - pool._connectionQueue.unshift(cb); - pool._purgeConnection(connection); + if (connectionManager.isPrepared()) { + var connection = connectionManager.getSpareConnection(); + if (connection) { + connectionManager.passConnectionToCallback(connection, callback); return; } - if (changeUser) { - pool.emit('connection', connection); + if (connectionManager.canCreateNewConnection()) { + connectionManager.createNewConnection(callback); + return; } - - cb(null, connection); } - if (changeUser) { - // restore user back to pool configuration - connection.config = this.config.newConnectionConfig(); - connection.changeUser({timeout: this.config.acquireTimeout}, onOperationComplete); - } else { - // ping connection - connection.ping({timeout: this.config.acquireTimeout}, onOperationComplete); - } + connectionManager.addCallbackToWaitingList(callback); }; -Pool.prototype.releaseConnection = function releaseConnection(connection) { - var pool = this; - - if (this._acquiringConnections.indexOf(connection) !== -1) { - // connection is being acquired - return; - } - - if (connection._pool) { - if (connection._pool !== this) { - throw new Error('Connection released to wrong pool'); - } - - if (this._freeConnections.indexOf(connection) !== -1) { - // connection already in free connection pool - // this won't catch all double-release cases - throw new Error('Connection already released'); - } else { - // add connection to end of free queue - this._freeConnections.push(connection); - } - } - - if (this._closed) { - // empty the connection queue - this._connectionQueue.splice(0).forEach(function (cb) { - var err = new Error('Pool is closed.'); - err.code = 'POOL_CLOSED'; - process.nextTick(function () { - cb(err); - }); - }); - } else if (this._connectionQueue.length) { - // get connection with next waiting callback - this.getConnection(this._connectionQueue.shift()); +Pool.prototype.releaseConnection = function (connection) { + if (!this._closed) { + this._connectionManager.releaseConnection(connection); } }; -Pool.prototype.end = function (cb) { +Pool.prototype.end = function (callback) { this._closed = true; - if (typeof cb !== 'function') { - cb = function (err) { + if (typeof callback !== 'function') { + callback = function (err) { if (err) throw err; }; } - var calledBack = false; - var waitingClose = 0; - - function onEnd(err) { - if (!calledBack && (err || --waitingClose <= 0)) { - calledBack = true; - cb(err); - } - } - - while (this._allConnections.length !== 0) { - waitingClose++; - this._purgeConnection(this._allConnections[0], onEnd); - } - - if (waitingClose === 0) { - process.nextTick(onEnd); - } + this._connectionManager.destroy(callback); }; Pool.prototype.query = function (sql, values, cb) { @@ -215,78 +86,34 @@ Pool.prototype.query = function (sql, values, cb) { return query; }; -Pool.prototype._enqueueCallback = function _enqueueCallback(callback) { - - if (this.config.queueLimit && this._connectionQueue.length >= this.config.queueLimit) { - process.nextTick(function () { - var err = new Error('Queue limit reached.'); - err.code = 'POOL_ENQUEUELIMIT'; - callback(err); - }); - return; - } - - // Bind to domain, as dequeue will likely occur in a different domain - var cb = process.domain - ? process.domain.bind(callback) - : callback; - - this._connectionQueue.push(cb); - this.emit('enqueue'); -}; - -Pool.prototype._needsChangeUser = function _needsChangeUser(connection) { - var connConfig = connection.config; - var poolConfig = this.config.connectionConfig; - - // check if changeUser values are different - return connConfig.user !== poolConfig.user - || connConfig.database !== poolConfig.database - || connConfig.password !== poolConfig.password - || connConfig.charsetNumber !== poolConfig.charsetNumber; +Pool.prototype.escape = function(value) { + return mysql.escape(value, this.config.connectionConfig.stringifyObjects, this.config.connectionConfig.timezone); }; -Pool.prototype._purgeConnection = function _purgeConnection(connection, callback) { - var cb = callback || function () {}; - - if (connection.state === 'disconnected') { - connection.destroy(); - } - - this._removeConnection(connection); - - if (connection.state !== 'disconnected' && !connection._protocol._quitSequence) { - connection._realEnd(cb); - return; - } - - process.nextTick(cb); +Pool.prototype.escapeId = function escapeId(value) { + return mysql.escapeId(value, false); }; -Pool.prototype._removeConnection = function(connection) { - connection._pool = null; - - // Remove connection from all connections - spliceConnection(this._allConnections, connection); - - // Remove connection from free connections - spliceConnection(this._freeConnections, connection); - - this.releaseConnection(connection); +Pool.prototype.getStatus = function () { + return this._connectionManager.getStatus(); }; -Pool.prototype.escape = function(value) { - return mysql.escape(value, this.config.connectionConfig.stringifyObjects, this.config.connectionConfig.timezone); +Pool.prototype._purgeConnection = function (connection) { + this._connectionManager.purgeConnection(connection); }; -Pool.prototype.escapeId = function escapeId(value) { - return mysql.escapeId(value, false); -}; +Pool.prototype._isClosed = function (callback) { + if (this._closed) { + if (callback) { + var err = new Error('Pool is closed.'); + err.code = 'POOL_CLOSED'; + process.nextTick(function () { + callback(err); + }); + } -function spliceConnection(array, connection) { - var index; - if ((index = array.indexOf(connection)) !== -1) { - // Remove connection from all connections - array.splice(index, 1); + return true; } -} + + return false; +}; diff --git a/lib/PoolConfig.js b/lib/PoolConfig.js index 8c5017a27..472e82aaa 100644 --- a/lib/PoolConfig.js +++ b/lib/PoolConfig.js @@ -7,26 +7,37 @@ function PoolConfig(options) { options = ConnectionConfig.parseUrl(options); } - this.acquireTimeout = (options.acquireTimeout === undefined) - ? 10 * 1000 - : Number(options.acquireTimeout); - this.connectionConfig = new ConnectionConfig(options); - this.waitForConnections = (options.waitForConnections === undefined) - ? true - : Boolean(options.waitForConnections); - this.connectionLimit = (options.connectionLimit === undefined) - ? 10 - : Number(options.connectionLimit); - this.queueLimit = (options.queueLimit === undefined) - ? 0 - : Number(options.queueLimit); + this.connectionConfig = new ConnectionConfig(options); + + this.acquireTimeout = this._getPropertyNumber(options.acquireTimeout, 10 * 1000); + this.waitForConnections = this._getPropertyBoolean(options.waitForConnections, true); + this.connectionLimit = this._getPropertyNumber(options.connectionLimit, 10); + this.queueLimit = this._getPropertyNumber(options.queueLimit, 0); + this.queueWaitTimeout = this._getPropertyNumber(options.queueWaitTimeout, 0); + this.pingCheckInterval = this._getPropertyNumber(options.pingCheckInterval, 0); + this.startConnections = this._getPropertyNumber(options.startConnections, 0); + this.minSpareConnections = this._getPropertyNumber(options.minSpareConnections, 0); + this.maxSpareConnections = this._getPropertyNumber(options.maxSpareConnections, 0); + this.spareCheckInterval = this._getPropertyNumber(options.spareCheckInterval, 0); } +PoolConfig.prototype._getPropertyNumber = function _getPropertyNumber(value, defaultValue) { + return value === undefined ? defaultValue : Number(value); +}; + +PoolConfig.prototype._getPropertyBoolean = function _getPropertyBoolean(value, defaultValue) { + return value === undefined ? defaultValue : Boolean(value); +}; + PoolConfig.prototype.newConnectionConfig = function newConnectionConfig() { - var connectionConfig = new ConnectionConfig(this.connectionConfig); + var newConfig = {}; + var connectionConfig = this.connectionConfig; - connectionConfig.clientFlags = this.connectionConfig.clientFlags; - connectionConfig.maxPacketSize = this.connectionConfig.maxPacketSize; + for (var key in connectionConfig) { + if (connectionConfig.hasOwnProperty(key)) { + newConfig[key] = connectionConfig[key]; + } + } - return connectionConfig; + return newConfig; }; diff --git a/lib/PoolConnection.js b/lib/PoolConnection.js index e4083f7b1..47959e4c0 100644 --- a/lib/PoolConnection.js +++ b/lib/PoolConnection.js @@ -5,9 +5,18 @@ var Events = require('events'); module.exports = PoolConnection; inherits(PoolConnection, Connection); +var poolConnectionId = 1; + function PoolConnection(pool, options) { Connection.call(this, options); - this._pool = pool; + + this._pool = pool; + this._poolData = { + id : poolConnectionId++, + used : true, + removed : false, + lastUsedTime : 0 + }; // Bind connection to pool domain if (Events.usingDomains) { @@ -31,15 +40,10 @@ function PoolConnection(pool, options) { }); } -PoolConnection.prototype.release = function release() { - var pool = this._pool; - var connection = this; - - if (!pool || pool._closed) { - return undefined; +PoolConnection.prototype.release = function () { + if (this._pool) { + this._pool.releaseConnection(this); } - - return pool.releaseConnection(this); }; // TODO: Remove this when we are removing PoolConnection#end @@ -55,17 +59,12 @@ PoolConnection.prototype.end = function () { }; PoolConnection.prototype.destroy = function () { - Connection.prototype.destroy.apply(this, arguments); - this._removeFromPool(this); + Connection.prototype.destroy.call(this); + this._removeFromPool(); }; -PoolConnection.prototype._removeFromPool = function _removeFromPool() { - if (!this._pool || this._pool._closed) { - return; +PoolConnection.prototype._removeFromPool = function () { + if (this._pool) { + this._pool._purgeConnection(this); } - - var pool = this._pool; - this._pool = null; - - pool._purgeConnection(this); }; diff --git a/lib/PoolConnectionManager.js b/lib/PoolConnectionManager.js new file mode 100644 index 000000000..e78bff44b --- /dev/null +++ b/lib/PoolConnectionManager.js @@ -0,0 +1,727 @@ +var PoolConnection = require('./PoolConnection'); + +module.exports = PoolConnectionManager; + +/** + * PoolConnectionManager + * + * @constructor + * @param {Pool} pool + * @param {PoolConfig} config + * @api public + */ + +function PoolConnectionManager(pool, config) { + this._pool = pool; + this._config = config; + + // frequently used variables for performance + this._neverWaitForConnections = !config.waitForConnections; + this._connectionLimit = config.connectionLimit; + this._queueLimit = config.queueLimit; + this._pingCheckInterval = config.pingCheckInterval; + this._minSpareConnections = config.minSpareConnections; + this._maxSpareConnections = config.maxSpareConnections; + this._timeoutConfig = { + timeout: config.acquireTimeout + }; + this._handleWaitingCallbacksFn = this._handleWaitingCallbacks.bind(this); + + // base storage for connection and callback + this._allConnections = this._createAllConnections(); + this._spareConnections = this._createSpareConnections(); + this._waitingList = this._createWaitingList(config.queueWaitTimeout); + + // create initial starting connections + if (config.startConnections > 0) { + this._prepared = false; + this._prepareStartConnections(config.startConnections); + } else { + this._prepared = true; + this._onPreparedStartConnections(0); + } +} + +/** + * Check if `Pool` is prepared. + * + * @return {boolean} + * @api public + */ + +PoolConnectionManager.prototype.isPrepared = function () { + return this._prepared; +}; + +/** + * Return a spare connection. + * If there's no spare connection, return null. + * + * @returns {Object|null} + * @api public + */ + +PoolConnectionManager.prototype.getSpareConnection = function () { + if (this._spareConnections.isEmpty()) { + return null; + } + + var connectionId = this._spareConnections.pick(); + var connection = this._allConnections.getById(connectionId); + + if (connection) { + connection._poolData.used = true; + } + + return connection; +}; + +/** + * Pass the acquired connection to the callback. + * + * @param {PoolConnection} connection + * @return {Function} callback + * @api public + */ + +PoolConnectionManager.prototype.passConnectionToCallback = function (connection, callback) { + var pool = this._pool; + var self = this; + + if (connection._pool !== pool) { + this._raiseError(callback, 'POOL_ERROR', 'Connection acquired from wrong pool.'); + return; + } + + // if user is changed + var changedUser = this._isChangedUser(connection); + + if (changedUser) { + // restore user back to pool configuration + connection.config = this._config.newConnectionConfig(); + connection.changeUser(this._timeoutConfig, onOperationComplete); + return; + } + + // Reuse of recently used connections + if (this._pingCheckInterval && (Date.now() - connection._poolData.lastUsedTime) < this._pingCheckInterval) { + process.nextTick(function() { + if (connection._poolData.removed) { + connection.ping(self._timeoutConfig, onOperationComplete); + } else { + callback(null, connection); + } + }); + return; + } + + connection.ping(this._timeoutConfig, onOperationComplete); + + function onOperationComplete(err) { + if (pool._isClosed(callback)) { + return; + } + + if (err) { + self._waitingList.rollback(callback); + self.purgeConnection(connection); + return; + } + + if (changedUser) { + connection.config.changedUser = false; + pool.emit('connection', connection); + } + + self._updateConnectionLastUsedTime(connection); + + callback(null, connection); + } +}; + +/** + * Check if `Pool` can create a new connection. + * + * @return {Boolean} + * @api public + */ + +PoolConnectionManager.prototype.canCreateNewConnection = function () { + return (this._connectionLimit === 0 || this._allConnections.size() < this._connectionLimit); +}; + +/** + * Create a new connection. + * + * @param {Function} callback + * @api public + */ + +PoolConnectionManager.prototype.createNewConnection = function (callback) { + var pool = this._pool; + var self = this; + + var connection = new PoolConnection(pool, { config: this._config.newConnectionConfig() }); + this._updateConnectionLastUsedTime(connection); + this._allConnections.add(connection); + + connection.connect(this._timeoutConfig, function onConnect(err) { + if (pool._isClosed(callback)) { + return; + } + + if (err) { + self.purgeConnection(connection); + callback(err); + return; + } + + pool.emit('connection', connection); + + callback(null, connection); + }); +}; + +/** + * Add a callback to the waiting list. + * The callback is handled when possible. + * + * @param {Function} callback + * @api public + */ + +PoolConnectionManager.prototype.addCallbackToWaitingList = function (callback) { + if (this.isPrepared()) { + if (this._neverWaitForConnections) { + this._raiseError(callback, 'POOL_CONNLIMIT', 'No connections available.'); + return; + } + + if (this._queueLimit && this._waitingList.size() >= this._queueLimit) { + this._raiseError(callback, 'POOL_ENQUEUELIMIT', 'Queue limit reached.'); + return; + } + } + + // Bind to domain, as dequeue will likely occur in a different domain + if (process.domain) { + this._waitingList.add(process.domain.bind(callback)); + } else { + this._waitingList.add(callback); + } + + this._pool.emit('enqueue'); +}; + +/** + * Release an unnecessary connection. + * + * @param {PoolConnection} connection + * @api public + */ + +PoolConnectionManager.prototype.releaseConnection = function (connection) { + if (!connection._poolData.used) { + throw new Error('Connection already released'); + } + + if (this._waitingList.isEmpty()) { + this._updateConnectionLastUsedTime(connection, false); + this._spareConnections.add(connection); + } else { + var callback = this._waitingList.pick(); + this.passConnectionToCallback(connection, callback); + } +}; + +/** + * Destroy all. + * + * @param {Function} callback + * @api public + */ + +PoolConnectionManager.prototype.destroy = function (callback) { + clearInterval(this._spareCheckHandle); + + // throws error to waiting connections + var self = this; + + this._waitingList.destroy(function (waitingCallback) { + self._raiseError(waitingCallback, 'POOL_CLOSED', 'Pool is closed.'); + }); + + var waitingClose = this._allConnections.size(); + if (waitingClose === 0) { + callback(); + return; + } + + // purges all connections + var calledBack = false; + function onEndPurge(err) { + if (!calledBack && (err || --waitingClose <= 0)) { + calledBack = true; + callback(err); + } + } + + this._allConnections.destroy(function (connection) { + self.purgeConnection(connection, onEndPurge); + }); +}; + +/** + * Purge an connection from `Pool`. + * + * @param {PoolConnection} connection + * @param {Function} callback + * @api public + */ + +PoolConnectionManager.prototype.purgeConnection = function (connection, callback) { + if (connection._poolData.removed) { + return; + } + + connection._poolData.removed = true; + + if (typeof callback === 'undefined') { + callback = function() {}; + } + + var isDisconnected = connection.state === 'disconnected'; + if (isDisconnected) { + connection.destroy(); + } + + this._removeConnection(connection); + + if (!isDisconnected && !connection._protocol._quitSequence) { + connection._realEnd(callback); + return; + } + + process.nextTick(callback); + process.nextTick(this._handleWaitingCallbacksFn); +}; + +/** + * Return the connection status. + * + * @return {Object} + * @api public + */ + +PoolConnectionManager.prototype.getStatus = function () { + return { + all : this._allConnections.size(), + use : this._allConnections.size() - this._spareConnections.size(), + spare : this._spareConnections.size(), + waiting : this._waitingList.size() + }; +}; + +/** + * Remove a connection from internal storages. + * + * @param {PoolConnection} connection + * @api private + */ + +PoolConnectionManager.prototype._removeConnection = function (connection) { + if (connection._pool) { + connection._pool = null; + this._allConnections.remove(connection); + this._spareConnections.remove(connection); + } +}; + +/** + * Update lastUsedTime, used of a connection. + * + * @param {PoolConnection} connection + * @param {Boolean} used + * @api private + */ + +PoolConnectionManager.prototype._updateConnectionLastUsedTime = function (connection, used) { + if (this._pingCheckInterval > 0) { + connection._poolData.lastUsedTime = Date.now(); + } + + if (used !== undefined) { + connection._poolData.used = used; + } +}; + +/** + * Handle one of the callbacks that are waiting. + * + * @api private + */ + +PoolConnectionManager.prototype._handleWaitingCallbacks = function () { + if (this._waitingList.isEmpty()) { + return; + } + + var connection = this.getSpareConnection(); + if (connection) { + this.passConnectionToCallback(connection, this._waitingList.pick()); + } else if (this.canCreateNewConnection()) { + this.createNewConnection(this._waitingList.pick()); + } +}; + +/** + * Check if user is changed. + * + * @param {PoolConnection} connection + * @return {Boolean} + * @api private + */ + +PoolConnectionManager.prototype._isChangedUser = function (connection) { + var connConfig = connection.config; + var poolConfig = this._config.connectionConfig; + + return connConfig.changedUser && + (connConfig.user !== poolConfig.user || + connConfig.database !== poolConfig.database || + connConfig.password !== poolConfig.password || + connConfig.charsetNumber !== poolConfig.charsetNumber); +}; + +/** + * Prepare an initial starting connection. + * + * @param {number} startConnections + * @api private + */ + +PoolConnectionManager.prototype._prepareStartConnections = function (startConnections) { + var self = this; + + var createdConnectionCount = 0; + var step = 0; + + for (var i=1; i <= startConnections; i++) { + this._createNewSpareConnectionForSystem(function(success) { + if (success) { + createdConnectionCount++; + } + + if (++step === startConnections) { + self._onPreparedStartConnections(createdConnectionCount); + } + }); + } +}; + +/** + * Create a new spare connection + * + * @param {Function} callback + * @api private + */ + +PoolConnectionManager.prototype._createNewSpareConnectionForSystem = function (callback) { + if (callback === undefined) { + callback = function() {}; + } + + var pool = this._pool; + var self = this; + + process.nextTick(function() { + var connection = new PoolConnection(pool, { config: self._config.newConnectionConfig() }); + + connection.connect(self._timeoutConfig, function onConnect(err) { + if (pool._isClosed() || err) { + callback(false); + return; + } + + self._updateConnectionLastUsedTime(connection, false); + self._allConnections.add(connection); + self._spareConnections.add(connection); + pool.emit('connection', connection); + callback(true); + }); + }); +}; + +/** + * Called when `Pool` is ready. + * + * @param {number} createdConnectionCount + * @api private + */ + +PoolConnectionManager.prototype._onPreparedStartConnections = function (createdConnectionCount) { + this._maxSpareConnections = Math.max(this._maxSpareConnections, this._minSpareConnections); + + if ((this._minSpareConnections > 0 || this._maxSpareConnections > 0) && this._config.spareCheckInterval) { + var self = this; + this._spareCheckHandle = setInterval(function() { + self._handleSpareConnections(); + }, this._config.spareCheckInterval); + } + + this._prepared = true; + this._pool.emit('prepared', createdConnectionCount); + + if (createdConnectionCount > 0 && !this._waitingList.isEmpty()) { + for(var i = 0, len = createdConnectionCount; i < len; i++) { + process.nextTick(this._handleWaitingCallbacksFn); + } + } +}; + +/** + * Handle spare connections. + * + * @api private + */ + +PoolConnectionManager.prototype._handleSpareConnections = function() { + // check minimum spare connections + if (this._minSpareConnections > 0) { + var createConnectionCount = this._minSpareConnections - this._spareConnections.size(); + if (createConnectionCount > 0) { + while(createConnectionCount--) { + this._createNewSpareConnectionForSystem(); + } + return; + } + } + + // check maximum spare connections + if (this._maxSpareConnections > 0) { + var removeConnectionCount = this._spareConnections.size() - this._maxSpareConnections; + if (removeConnectionCount > 0) { + while(removeConnectionCount--) { + var connection = this.getSpareConnection(); + if (connection) { + this._removeConnection(connection); + process.nextTick(function() { + connection.destroy(); + }); + } + } + } + } +}; + +/** + * Raise error to callback. + * + * @param {Function} callback + * @param {string} code + * @param {string} message + * @api private + */ + +PoolConnectionManager.prototype._raiseError = function (callback, code, message) { + if (callback) { + var err = new Error(message); + err.code = code; + process.nextTick(function () { + callback(err); + }); + } +}; + +/** + * Create a object for handling all connections. + * + * @api private + */ + +PoolConnectionManager.prototype._createAllConnections = function () { + return { + _map: {}, + _size: 0, + isEmpty: function() { + return this._size === 0; + }, + add: function(connection) { + this._map[connection._poolData.id] = connection; + this._size++; + }, + set: function(connection) { + this._map[connection._poolData.id] = connection; + }, + getById: function(id) { + return this._map[id]; + }, + remove: function(connection) { + if (this._map[connection._poolData.id] !== undefined) { + delete this._map[connection._poolData.id]; + this._size--; + } + }, + size: function() { + return this._size; + }, + destroy: function(destroyCallback) { + if (this._size > 0) { + var map = this._map; + for (var id in map) { + destroyCallback(map[id]); + } + + this._map = {}; + this._size = 0; + } + } + }; +}; + +/** + * Create a object for handling spare connections. + * + * @api private + */ + +PoolConnectionManager.prototype._createSpareConnections = function () { + return { + _stack: [], + _top: -1, + isEmpty: function() { + return this._top < 0; + }, + add: function(connection) { + this._stack[++this._top] = connection._poolData.id; + }, + pick: function() { + if (this._top < 0) { + return null; + } + + var id = this._stack[this._top]; + if (id === undefined) { + return null; + } + + this._stack[this._top--] = undefined; + return id; + }, + remove: function(connection) { + var newStack = []; + var top = this._top; + var found = false; + var targetId = connection._poolData.id; + + while(top--) { + var checkId = this._stack[top]; + + if (checkId === undefined) { + break; + } + + if (checkId === targetId) { + found = true; + } else { + newStack.push(this._stack[top]); + } + } + + if (found) { + this._stack = newStack; + this._top--; + } + }, + size: function() { + return this._top + 1; + } + }; +}; + +/** + * Create a object for handling waiting list (callback). + * + * @param {number} timeout + * @api private + */ + +PoolConnectionManager.prototype._createWaitingList = function (timeout) { + return { + _queue: [], + _id : 1, + _timerHandles : {}, + _timeout : timeout, + isEmpty: function() { + return this._queue.length === 0; + }, + add: function(callback) { + this._queue.push(this._makeCallback(callback)); + }, + pick: function() { + var item = this._queue.shift(); + + if (item) { + this._clearTimer(item.id); + return item.callback; + } else { + return item; + } + }, + rollback: function(callback) { + this._queue.unshift(this._makeCallback(callback)); + }, + destroy: function(destroyCallback) { + var queue = this._queue; + + for (var i = 0, len = queue.length; i < len; i++) { + this._clearTimer(queue[i].id); + destroyCallback(queue[i].callback); + } + + this._queue = []; + }, + size: function() { + return this._queue.length; + }, + _makeCallback: function(callback) { + var queueItem = { + id: this._id++, + callback: callback + }; + + if (this._timeout > 0) { + var self = this; + var itemId = queueItem.id; + this._timerHandles[itemId] = setTimeout(function() { + self._onTimeout(itemId); + }, this._timeout); + } + + return queueItem; + }, + _onTimeout: function(id) { + this._clearTimer(id); + + var queue = this._queue; + + for (var i = 0, len = queue.length; i < len; i++) { + if (queue[i].id === id) { + var callback = queue[i].callback; + queue.splice(i, 1); + + var timeoutError = new Error('Queue timeout occurred.'); + timeoutError.code = 'POOL_QUEUETIMEOUT'; + callback(timeoutError); + break; + } + } + }, + _clearTimer: function(id) { + if (this._timerHandles[id] !== undefined) { + clearTimeout(this._timerHandles[id]); + delete this._timerHandles[id]; + } + } + }; +}; diff --git a/lib/protocol/sequences/ChangeUser.js b/lib/protocol/sequences/ChangeUser.js index 26be6dbbd..645737d85 100644 --- a/lib/protocol/sequences/ChangeUser.js +++ b/lib/protocol/sequences/ChangeUser.js @@ -26,10 +26,12 @@ ChangeUser.prototype.start = function(handshakeInitializationPacket) { charsetNumber : this._charsetNumber }); - this._currentConfig.user = this._user; - this._currentConfig.password = this._password; - this._currentConfig.database = this._database; - this._currentConfig.charsetNumber = this._charsetNumber; + var currentConfig = this._currentConfig; + currentConfig.user = this._user; + currentConfig.password = this._password; + currentConfig.database = this._database; + currentConfig.charsetNumber = this._charsetNumber; + currentConfig.changedUser = true; this.emit('packet', packet); }; diff --git a/test/unit/pool/test-connection-min-max-free.js b/test/unit/pool/test-connection-min-max-free.js new file mode 100644 index 000000000..16ec8fbd6 --- /dev/null +++ b/test/unit/pool/test-connection-min-max-free.js @@ -0,0 +1,62 @@ +var assert = require('assert'); +var common = require('../../common'); +var pool = common.createPool({ + connectionLimit : 1, + port : common.fakeServerPort, + minSpareConnections : 2, + maxSpareConnections : 3, + spareCheckInterval : 200 +}); + +var server = common.createFakeServer(); + +server.listen(common.fakeServerPort, function(err) { + assert.ifError(err); + + setTimeout(function() { + // check for minSpareConnections + assert.deepEqual(pool.getStatus(), { + all : 2, + use : 0, + spare : 2, + waiting : 0 + }); + + pool.getConnection(function (err, connection) { + assert.ifError(err); + setTimeout(function() { + connection.release(); + }, 500); + }); + + pool.getConnection(function (err, connection) { + assert.ifError(err); + setTimeout(function() { + connection.release(); + + // check for minSpareConnections + assert.deepEqual(pool.getStatus(), { + all : 4, + use : 0, + spare : 4, + waiting : 0 + }); + + // check for maxSpareConnections + setTimeout(function() { + assert.deepEqual(pool.getStatus(), { + all : 3, + use : 0, + spare : 3, + waiting : 0 + }); + + pool.end(function(err) { + assert.ifError(err); + server.destroy(); + }); + }, 500); + }, 500); + }); + }, 500); +}); diff --git a/test/unit/pool/test-connection-pool-data.js b/test/unit/pool/test-connection-pool-data.js new file mode 100644 index 000000000..4670ba915 --- /dev/null +++ b/test/unit/pool/test-connection-pool-data.js @@ -0,0 +1,26 @@ +var assert = require('assert'); +var common = require('../../common'); +var pool = common.createPool({ + connectionLimit : 1, + port : common.fakeServerPort, + pingCheckInterval : 3000 +}); + +var server = common.createFakeServer(); + +server.listen(common.fakeServerPort, function(err) { + assert.ifError(err); + + pool.getConnection(function(err, conn) { + assert.ifError(err); + + assert.equal(conn._poolData.used, true); + assert.equal(conn._poolData.removed, false); + assert.ok(conn._poolData.lastUsedTime > 0); + + conn.release(); + + assert.equal(conn._poolData.used, false); + server.destroy(); + }); +}); diff --git a/test/unit/pool/test-connection-start-wait.js b/test/unit/pool/test-connection-start-wait.js new file mode 100644 index 000000000..fa56aad69 --- /dev/null +++ b/test/unit/pool/test-connection-start-wait.js @@ -0,0 +1,37 @@ +var assert = require('assert'); +var common = require('../../common'); +var pool = common.createPool({ + connectionLimit : 1, + port : common.fakeServerPort, + startConnections : 2 +}); + +var server = common.createFakeServer(); +var connection1time = 0, connection2time = 0; + +server.listen(common.fakeServerPort, function(err){ + assert.ifError(err); + + pool.getConnection(function(err, conn){ + connection1time = Date.now(); + assert.ifError(err); + + setTimeout(function() { + conn.release(); + }, 500); + }); + + pool.getConnection(function(err, conn) { + assert.ifError(err); + connection2time = Date.now(); + assert.ok(connection2time - connection1time < 250, 'waiting callbacks must be run at the same time'); + conn.release(); + server.destroy(); + }); +}); + +server.on('connection', function(incomingConnection) { + setTimeout(function() { + incomingConnection.handshake(); + }, 300); +}); diff --git a/test/unit/pool/test-connection-start.js b/test/unit/pool/test-connection-start.js new file mode 100644 index 000000000..8a33f4ace --- /dev/null +++ b/test/unit/pool/test-connection-start.js @@ -0,0 +1,32 @@ +var assert = require('assert'); +var common = require('../../common'); +var pool = common.createPool({ + connectionLimit : 1, + port : common.fakeServerPort, + startConnections : 2 +}); + +var server = common.createFakeServer(); +var prepared = false; + +server.listen(common.fakeServerPort, function(err){ + assert.ifError(err); + + pool.on('prepared', function(preparedConnectionCount) { + prepared = true; + assert.equal(preparedConnectionCount, 2); + }); + + pool.getConnection(function(err, conn){ + assert.ifError(err); + conn.release(); + }); + + pool.getConnection(function(err, conn){ + assert.ifError(err); + assert.equal(prepared, true); + + conn.release(); + server.destroy(); + }); +}); diff --git a/test/unit/pool/test-connection-without-ping.js b/test/unit/pool/test-connection-without-ping.js new file mode 100644 index 000000000..08f4d67e7 --- /dev/null +++ b/test/unit/pool/test-connection-without-ping.js @@ -0,0 +1,37 @@ +var assert = require('assert'); +var common = require('../../common'); +var pool = common.createPool({ + connectionLimit : 1, + port : common.fakeServerPort, + pingCheckInterval : 30000 +}); + +var server = common.createFakeServer(); +var ping = false; + +server.listen(common.fakeServerPort, function(err){ + assert.ifError(err); + + pool.getConnection(function(err, conn){ + assert.ifError(err); + conn.release(); + }); + + pool.getConnection(function(err, conn){ + assert.ifError(err); + assert.equal(ping, false); + + conn.release(); + server.destroy(); + }); +}); + +server.on('connection', function(incomingConnection) { + incomingConnection.handshake(); + + incomingConnection.on('ping', function() { + ping = true; + this._sendPacket(new common.Packets.OkPacket()); + this._parser.resetPacketNumber(); + }); +}); diff --git a/test/unit/pool/test-destroy-connection.js b/test/unit/pool/test-destroy-connection.js index eaafe9d07..b237f9795 100644 --- a/test/unit/pool/test-destroy-connection.js +++ b/test/unit/pool/test-destroy-connection.js @@ -11,10 +11,10 @@ server.listen(common.fakeServerPort, function (err) { pool.getConnection(function (err, connection) { assert.ifError(err); - assert.strictEqual(connection, pool._allConnections[0]); + assert.strictEqual(connection, pool._connectionManager._allConnections._map[1]); connection.destroy(); - assert.strictEqual(pool._allConnections.length, 0); + assert.strictEqual(pool._connectionManager._allConnections.size(), 0); assert.ok(!connection._pool); assert.doesNotThrow(function () { connection.release(); }); diff --git a/test/unit/pool/test-queue-timeout.js b/test/unit/pool/test-queue-timeout.js new file mode 100644 index 000000000..e852e6e92 --- /dev/null +++ b/test/unit/pool/test-queue-timeout.js @@ -0,0 +1,33 @@ +var assert = require('assert'); +var common = require('../../common'); +var pool = common.createPool({ + connectionLimit : 1, + port : common.fakeServerPort, + queueWaitTimeout : 100, + acquireTimeout : 200 +}); + +var server = common.createFakeServer(); + +server.listen(common.fakeServerPort, function(err){ + assert.ifError(err); + + pool.getConnection(function(err, conn){ + assert.ifError(err); + + setTimeout(function() { + conn.release(); + }, 200); + }); + + pool.getConnection(function(err, conn) { + assert.ok(err, 'got error'); + assert.equal(err.code, 'POOL_QUEUETIMEOUT'); + }); + + pool.getConnection(function(err, conn) { + assert.ok(err, 'got error'); + assert.equal(err.code, 'POOL_QUEUETIMEOUT'); + server.destroy(); + }); +}); diff --git a/test/unit/pool/test-status.js b/test/unit/pool/test-status.js new file mode 100644 index 000000000..cdaf15fa3 --- /dev/null +++ b/test/unit/pool/test-status.js @@ -0,0 +1,35 @@ +var assert = require('assert'); +var common = require('../../common'); +var pool = common.createPool({ + connectionLimit : 1, + port : common.fakeServerPort, + startConnections : 2 +}); + +var server = common.createFakeServer(); + +server.listen(common.fakeServerPort, function (err) { + assert.ifError(err); + + pool.getConnection(function(err, conn) { + assert.ifError(err); + + assert.deepEqual(pool.getStatus(), { + all : 2, + use : 1, + spare : 1, + waiting : 0 + }); + + conn.release(); + + assert.deepEqual(pool.getStatus(), { + all : 2, + use : 0, + spare : 2, + waiting : 0 + }); + + server.destroy(); + }); +});