From d7b8bdcbad267fafc578b3fb055ea72c855da1cb Mon Sep 17 00:00:00 2001 From: ifsnow Date: Fri, 21 Jun 2013 18:05:01 +0900 Subject: [PATCH] Add PoolCluster Feature. --- Readme.md | 58 ++++++- index.js | 5 + lib/PoolCluster.js | 241 ++++++++++++++++++++++++++ test/common.js | 12 +- test/integration/pool/test-cluster.js | 233 +++++++++++++++++++++++++ 5 files changed, 547 insertions(+), 2 deletions(-) create mode 100644 lib/PoolCluster.js create mode 100644 test/integration/pool/test-cluster.js diff --git a/Readme.md b/Readme.md index 7aac8e9c8..a99da2c03 100644 --- a/Readme.md +++ b/Readme.md @@ -270,6 +270,62 @@ addition to those options pools accept a few extras: before returning an error from `getConnection`. If set to `0`, there is no limit to the number of queued connection requests. (Default: `0`) +## PoolCluster + +PoolCluster provides multiple hosts connection. (group & retry & selector) + +```js +// create +var poolCluster = mysql.createPoolCluster(); + +poolCluster.add(config); // anonymous group +poolCluster.add('MASTER', masterConfig); +poolCluster.add('SLAVE1', slave1Config); +poolCluster.add('SLAVE2', slave2Config); + +// Target Group : ALL(anonymous, MASTER, SLAVE1-2), Selector : round-robin(default) +poolCluster.getConnection(function (err, connection) {}); + +// Target Group : MASTER, Selector : round-robin +poolCluster.getConnection('MASTER', function (err, connection) {}); + +// Target Group : SLAVE1-2, Selector : order +// If can't connect to SLAVE1, return SLAVE2. (remove SLAVE1 in the cluster) +poolCluster.on('remove', function (nodeId) { + console.log('REMOVED NODE : ' + nodeId); // nodeId = SLAVE1 +}); + +poolCluster.getConnection('SLAVE*', 'ORDER', function (err, connection) {}); + +// of namespace : of(pattern, selector) +poolCluster.of('*').getConnection(function (err, connection) {}); + +var pool = poolCluster.of('SLAVE*', 'RANDOM') +pool.getConnection(function (err, connection) {}); +pool.getConnection(function (err, connection) {}); + +// destroy +poolCluster.end(); +``` + +## PoolCluster Option +* `canRetry`: If `true`, `PoolCluster` will attempt to reconnect when connection fails. (Default: `true`) +* `removeNodeErrorCount`: If connection fails, node's `errorCount` increases. + When `errorCount` is greater than `removeNodeErrorCount`, remove a node in the `PoolCluster`. (Default: `5`) +* `defaultSelector`: The default selector. (Default: `RR`) + * `RR`: Select one alternately. (Round-Robin) + * `RANDOM`: Select the node by random function. + * `ORDER`: Select the first node available unconditionally. + +```js +var clusterConfig = { + removeNodeErrorCount: 1, // Remove the node immediately when connection fails. + defaultSelector: 'ORDER' +}; + +var poolCluster = mysql.createPoolCluster(clusterConfig); +``` + ## Switching users / altering connection state MySQL offers a changeUser command that allows you to alter the current user and @@ -886,7 +942,7 @@ For example, if you have an installation of mysql running on localhost:3306 and * Make sure the database (e.g. 'test') you want to use exists and the user you entered has the proper rights to use the test database. (E.g. do not forget to execute the SQL-command ```FLUSH PRIVILEGES``` after you have created the user.) * In a DOS-box (or CMD-shell) in the folder of your application run ```npm install mysql --dev``` or in the mysql folder (```node_modules\mysql```), run ```npm install --dev```. (This will install additional developer-dependencies for node-mysql.) * Run ```npm test mysql``` in your applications folder or ```npm test``` in the mysql subfolder. -* If you want to log the output into a file use ```npm test mysql > test.log``` or ```npm test > test.log```. +* If you want to log the output into a file use ```npm test mysql > test.log``` or ```npm test > test.log```. ## Todo diff --git a/index.js b/index.js index f589e7f45..8f41182ae 100644 --- a/index.js +++ b/index.js @@ -4,6 +4,7 @@ var Types = require('./lib/protocol/constants/types'); var SqlString = require('./lib/protocol/SqlString'); var Pool = require('./lib/Pool'); var PoolConfig = require('./lib/PoolConfig'); +var PoolCluster = require('./lib/PoolCluster'); exports.createConnection = function(config) { return new Connection({config: new ConnectionConfig(config)}); @@ -13,6 +14,10 @@ exports.createPool = function(config) { return new Pool({config: new PoolConfig(config)}); }; +exports.createPoolCluster = function(config) { + return new PoolCluster(config); +}; + exports.createQuery = Connection.createQuery; exports.Types = Types; diff --git a/lib/PoolCluster.js b/lib/PoolCluster.js new file mode 100644 index 000000000..f7a9e3085 --- /dev/null +++ b/lib/PoolCluster.js @@ -0,0 +1,241 @@ +var Pool = require('./Pool'); +var PoolConfig = require('./PoolConfig'); +var Util = require('util'); +var EventEmitter = require('events').EventEmitter; + +module.exports = PoolCluster; + +/** + * PoolCluster + */ +function PoolCluster(config) { + EventEmitter.call(this); + + config = config || {}; + this._canRetry = typeof config.canRetry === 'undefined' ? true : config.canRetry; + this._removeNodeErrorCount = config.removeNodeErrorCount || 5; + this._defaultSelector = config.defaultSelector || 'RR'; + + this._closed = false; + this._lastId = 0; + this._nodes = {}; + this._serviceableNodeIds = []; + this._namespaces = {}; + this._findCaches = {}; +} + +Util.inherits(PoolCluster, EventEmitter); + +PoolCluster.prototype.of = function(pattern, selector) { + pattern = pattern || '*'; + + selector = selector || this._defaultSelector; + selector = selector.toUpperCase(); + if (typeof Selector[selector] === 'undefined') { + selector = this._defaultSelector; + } + + var key = pattern + selector; + + if (typeof this._namespaces[key] === 'undefined') { + this._namespaces[key] = new PoolNamespace(this, pattern, selector); + } + + return this._namespaces[key]; +}; + +PoolCluster.prototype.add = function(id, config) { + if (typeof id === 'object') { + config = id; + id = 'CLUSTER::' + (++this._lastId); + } + + if (typeof this._nodes[id] === 'undefined') { + this._nodes[id] = { + id: id, + errorCount: 0, + pool: new Pool({config: new PoolConfig(config)}) + }; + + this._serviceableNodeIds.push(id); + + this._clearFindCaches(); + } +}; + +PoolCluster.prototype.getConnection = function(pattern, selector, cb) { + if (typeof pattern === 'function') { + cb = pattern; + namespace = this.of(); + } else { + if (typeof selector === 'function') { + cb = selector; + selector = this._defaultSelector; + } + + namespace = this.of(pattern, selector); + } + + namespace.getConnection(cb); +}; + +PoolCluster.prototype.end = function() { + if (this._closed) { + return; + } + + this._closed = true; + + for (var id in this._nodes) { + this._nodes[id].pool.end(); + } +}; + +PoolCluster.prototype._findNodeIds = function(pattern) { + if (typeof this._findCaches[pattern] !== 'undefined') { + return this._findCaches[pattern]; + } + + var foundNodeIds; + + if (pattern === '*') { // all + foundNodeIds = this._serviceableNodeIds; + } else if (typeof this._serviceableNodeIds[pattern] !== 'undefined') { // one + foundNodeIds = [pattern]; + } else { // wild matching + var keyword = pattern.substring(pattern.length - 1, 0); + + foundNodeIds = this._serviceableNodeIds.filter(function (id) { + return id.indexOf(keyword) === 0; + }); + } + + this._findCaches[pattern] = foundNodeIds; + + return foundNodeIds; +}; + +PoolCluster.prototype._getNode = function(id) { + return this._nodes[id] || null; +}; + +PoolCluster.prototype._increaseErrorCount = function(node) { + if (++node.errorCount >= this._removeNodeErrorCount) { + var index = this._serviceableNodeIds.indexOf(node.id); + if (index !== -1) { + this._serviceableNodeIds.splice(index, 1); + delete this._nodes[node.id]; + + this._clearFindCaches(); + + node.pool.end(); + + this.emit('remove', node.id); + } + } +}; + +PoolCluster.prototype._decreaseErrorCount = function(node) { + if (node.errorCount > 0) { + --node.errorCount; + } +}; + +PoolCluster.prototype._getConnection = function(node, cb) { + var self = this; + + node.pool.getConnection(function (err, connection) { + if (err) { + self._increaseErrorCount(node); + + if (self._canRetry) { + console.warn('[Error] PoolCluster : ' + err); + return cb(null, 'retry'); + } else { + return cb(err); + } + } else { + self._decreaseErrorCount(node); + } + + connection._clusterId = node.id; + + cb(null, connection); + }); +}; + +PoolCluster.prototype._clearFindCaches = function() { + this._findCaches = {}; +}; + +/** + * PoolNamespace + */ +function PoolNamespace(cluster, pattern, selector) { + this._cluster = cluster; + this._pattern = pattern; + this._selector = new Selector[selector](); +} + +PoolNamespace.prototype.getConnection = function(cb) { + var clusterNode = this._getClusterNode(); + + if (clusterNode === null) { + return cb(new Error('Pool does Not exists.')); + } + + this._cluster._getConnection(clusterNode, function(err, connection) { + if (err) { + return cb(err); + } + + if (connection === 'retry') { + return this.getConnection(cb); + } + + cb(null, connection); + }.bind(this)); +}; + +PoolNamespace.prototype._getClusterNode = function() { + var foundNodeIds = this._cluster._findNodeIds(this._pattern); + + if (foundNodeIds.length === 0) { + return null; + } + + var nodeId = (foundNodeIds.length === 1) ? foundNodeIds[0] : this._selector(foundNodeIds); + + return this._cluster._getNode(nodeId); +}; + +/** + * Selector + */ +var Selector = {}; + +Selector.RR = function () { + var index = 0; + + return function(clusterIds) { + if (index >= clusterIds.length) { + index = 0; + } + + var clusterId = clusterIds[index++]; + + return clusterId; + }; +}; + +Selector.RANDOM = function () { + return function(clusterIds) { + return clusterIds[Math.floor(Math.random() * clusterIds.length)]; + }; +}; + +Selector.ORDER = function () { + return function(clusterIds) { + return clusterIds[0]; + }; +}; diff --git a/test/common.js b/test/common.js index 4984228da..fda4dd091 100644 --- a/test/common.js +++ b/test/common.js @@ -35,6 +35,12 @@ common.createPool = function(config) { return Mysql.createPool(config); }; +common.createPoolCluster = function(config) { + config = mergeTestConfig(config); + config.createConnection = common.createConnection; + return Mysql.createPoolCluster(config); +}; + common.createFakeServer = function(options) { return new FakeServer(_.extend({}, options)); }; @@ -45,7 +51,11 @@ common.useTestDb = function(connection) { }); connection.query('USE ' + common.testDatabase); -} +}; + +common.getTestConfig = function(config) { + return mergeTestConfig(config); +}; function mergeTestConfig(config) { if (common.isTravis()) { diff --git a/test/integration/pool/test-cluster.js b/test/integration/pool/test-cluster.js new file mode 100644 index 000000000..6d1975a09 --- /dev/null +++ b/test/integration/pool/test-cluster.js @@ -0,0 +1,233 @@ +var common = require('../../common'); +var assert = require('assert'); + +function createPoolCluster(clusterConfig, poolConfig) { + var cluster = common.createPoolCluster(clusterConfig); + + if (typeof poolConfig === 'undefined') { + poolConfig = common.getTestConfig(); + } + + cluster.add(poolConfig); + cluster.add('MASTER', poolConfig); + cluster.add('SLAVE1', poolConfig); + cluster.add('SLAVE2', poolConfig); + + return cluster; +} + +// Test_base_function +(function () { + var cluster = createPoolCluster(); + + // added nodes + assert.deepEqual(cluster._serviceableNodeIds, ['CLUSTER::1', 'MASTER', 'SLAVE1', 'SLAVE2']); + + // _findNodeIds + assert.deepEqual(cluster._findNodeIds('MASTER'), ['MASTER']); + assert.deepEqual(cluster._findNodeIds('SLAVE*'), ['SLAVE1', 'SLAVE2']); + + // of singletone instance + var poolNamespace = cluster.of('*', 'RR'); + var poolNamespace2 = cluster.of('*'); + assert.strictEqual(poolNamespace, poolNamespace2); + + // empty pattern + var emptyPoolNamespace = cluster.of(); + assert.strictEqual(poolNamespace, emptyPoolNamespace); + + // wrong selector + var wrongPoolNamespace = cluster.of('*', 'RR2'); + assert.strictEqual(poolNamespace, wrongPoolNamespace); + + cluster.end(); +})(); + +// Test_getConnection_one +(function() { + var cluster = createPoolCluster(); + + cluster.getConnection('MASTER', function(err, connection) { + cluster.end(); + + if (!err) { + assert.strictEqual(connection._clusterId, 'MASTER'); + } + }.bind(this)); +})(); + +// Test_of_getConnection_one +(function() { + var cluster = createPoolCluster(); + + cluster.of('MASTER').getConnection(function(err, connection) { + cluster.end(); + + if (!err) { + assert.strictEqual(connection._clusterId, 'MASTER'); + } + }.bind(this)); +})(); + +// Test_getConnection_multi +(function() { + var cluster = createPoolCluster(); + + cluster.getConnection('SLAVE*', 'RR', function(err, connection) { + if (!err) { + assert.strictEqual(connection._clusterId, 'SLAVE1'); + } + + cluster.getConnection('SLAVE*', 'RR', function(err, connection) { + cluster.end(); + + if (!err) { + assert.strictEqual(connection._clusterId, 'SLAVE2'); + } + }); + }); +})(); + +// Test_of_getConnection_multi +(function() { + var cluster = createPoolCluster(); + var pool = cluster.of('SLAVE*', 'RR'); + + pool.getConnection(function(err, connection) { + if (!err) { + assert.strictEqual(connection._clusterId, 'SLAVE1'); + } + + pool.getConnection(function(err, connection) { + cluster.end(); + + if (!err) { + assert.strictEqual(connection._clusterId, 'SLAVE2'); + } + }); + }); +})(); + +// Test_of_getConnection_ORDER_selector +(function() { + var cluster = createPoolCluster(); + var pool = cluster.of('SLAVE*', 'ORDER'); + + pool.getConnection(function(err, connection) { + if (!err) { + assert.strictEqual(connection._clusterId, 'SLAVE1'); + } + + pool.getConnection(function(err, connection) { + cluster.end(); + + if (!err) { + assert.strictEqual(connection._clusterId, 'SLAVE1'); + } + }); + }); +})(); + + +// Test_of_getConnection_default_selector +(function() { + var cluster = createPoolCluster({ + defaultSelector: 'ORDER' + }); + + var pool = cluster.of('SLAVE*'); + + pool.getConnection(function(err, connection) { + if (!err) { + assert.strictEqual(connection._clusterId, 'SLAVE1'); + } + + pool.getConnection(function(err, connection) { + cluster.end(); + + if (!err) { + assert.strictEqual(connection._clusterId, 'SLAVE1'); + } + }); + }); +})(); + +// Test_retry_throw_error +(function() { + var cluster = common.createPoolCluster({ + canRetry: false + }); + + var poolConfig = common.getTestConfig(); + + var origPort = poolConfig.port; + poolConfig.port = 3300; + cluster.add('ERROR', poolConfig); + + poolConfig.port = origPort; + cluster.add('CORRECT', poolConfig); + + cluster.of('*').getConnection(function (err, connection) { + cluster.end(); + + assert.ok(err instanceof Error); + }); +})(); + +// Test_retry +(function() { + var cluster = common.createPoolCluster(); + + var poolConfig = common.getTestConfig(); + + var origPort = poolConfig.port; + poolConfig.port = 3300; + cluster.add('ERROR', poolConfig); + + poolConfig.port = origPort; + cluster.add('CORRECT', poolConfig); + + cluster.of('*', 'RR').getConnection(function (err, connection) { + cluster.end(); + + assert.ok(err === null); + assert.equal(connection._clusterId, 'CORRECT'); + + assert.equal(cluster._nodes.ERROR.errorCount, 1); + }); +})(); + +// Test_remove_node +(function() { + var cluster = common.createPoolCluster({ + removeNodeErrorCount: 1 + }); + + var poolConfig = common.getTestConfig(); + + var origPort = poolConfig.port; + poolConfig.port = 3300; + cluster.add('ERROR', poolConfig); + + poolConfig.port = origPort; + cluster.add('CORRECT', poolConfig); + + var removedNodeId = ''; + + cluster.on('remove', function(nodeId) { + removedNodeId = nodeId; + }); + + cluster.of('*', 'RR').getConnection(function (err, connection) { + cluster.end(); + + assert.ok(err === null); + assert.equal(connection._clusterId, 'CORRECT'); + + assert.equal(removedNodeId, 'ERROR'); + + assert.ok(typeof cluster._nodes.ERROR === 'undefined'); + assert.equal(cluster._serviceableNodeIds.length, 1); + assert.deepEqual(cluster._serviceableNodeIds, ['CORRECT']); + }); +})();