Skip to content

Add PoolCluster Feature. #522

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 27, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,62 @@ addition to those options pools accept a few extras:
before returning an error from `getConnection`. If set to `0`, there is no
limit to the number of queued connection requests. (Default: `0`)

## PoolCluster

PoolCluster provides multiple hosts connection. (group & retry & selector)

```js
// create
var poolCluster = mysql.createPoolCluster();

poolCluster.add(config); // anonymous group
poolCluster.add('MASTER', masterConfig);
poolCluster.add('SLAVE1', slave1Config);
poolCluster.add('SLAVE2', slave2Config);

// Target Group : ALL(anonymous, MASTER, SLAVE1-2), Selector : round-robin(default)
poolCluster.getConnection(function (err, connection) {});

// Target Group : MASTER, Selector : round-robin
poolCluster.getConnection('MASTER', function (err, connection) {});

// Target Group : SLAVE1-2, Selector : order
// If can't connect to SLAVE1, return SLAVE2. (remove SLAVE1 in the cluster)
poolCluster.on('remove', function (nodeId) {
console.log('REMOVED NODE : ' + nodeId); // nodeId = SLAVE1
});

poolCluster.getConnection('SLAVE*', 'ORDER', function (err, connection) {});

// of namespace : of(pattern, selector)
poolCluster.of('*').getConnection(function (err, connection) {});

var pool = poolCluster.of('SLAVE*', 'RANDOM')
pool.getConnection(function (err, connection) {});
pool.getConnection(function (err, connection) {});

// destroy
poolCluster.end();
```

## PoolCluster Option
* `canRetry`: If `true`, `PoolCluster` will attempt to reconnect when connection fails. (Default: `true`)
* `removeNodeErrorCount`: If connection fails, node's `errorCount` increases.
When `errorCount` is greater than `removeNodeErrorCount`, remove a node in the `PoolCluster`. (Default: `5`)
* `defaultSelector`: The default selector. (Default: `RR`)
* `RR`: Select one alternately. (Round-Robin)
* `RANDOM`: Select the node by random function.
* `ORDER`: Select the first node available unconditionally.

```js
var clusterConfig = {
removeNodeErrorCount: 1, // Remove the node immediately when connection fails.
defaultSelector: 'ORDER'
};

var poolCluster = mysql.createPoolCluster(clusterConfig);
```

## Switching users / altering connection state

MySQL offers a changeUser command that allows you to alter the current user and
Expand Down Expand Up @@ -886,7 +942,7 @@ For example, if you have an installation of mysql running on localhost:3306 and
* Make sure the database (e.g. 'test') you want to use exists and the user you entered has the proper rights to use the test database. (E.g. do not forget to execute the SQL-command ```FLUSH PRIVILEGES``` after you have created the user.)
* In a DOS-box (or CMD-shell) in the folder of your application run ```npm install mysql --dev``` or in the mysql folder (```node_modules\mysql```), run ```npm install --dev```. (This will install additional developer-dependencies for node-mysql.)
* Run ```npm test mysql``` in your applications folder or ```npm test``` in the mysql subfolder.
* If you want to log the output into a file use ```npm test mysql > test.log``` or ```npm test > test.log```.
* If you want to log the output into a file use ```npm test mysql > test.log``` or ```npm test > test.log```.

## Todo

Expand Down
5 changes: 5 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ var Types = require('./lib/protocol/constants/types');
var SqlString = require('./lib/protocol/SqlString');
var Pool = require('./lib/Pool');
var PoolConfig = require('./lib/PoolConfig');
var PoolCluster = require('./lib/PoolCluster');

exports.createConnection = function(config) {
return new Connection({config: new ConnectionConfig(config)});
Expand All @@ -13,6 +14,10 @@ exports.createPool = function(config) {
return new Pool({config: new PoolConfig(config)});
};

exports.createPoolCluster = function(config) {
return new PoolCluster(config);
};

exports.createQuery = Connection.createQuery;

exports.Types = Types;
Expand Down
241 changes: 241 additions & 0 deletions lib/PoolCluster.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
var Pool = require('./Pool');
var PoolConfig = require('./PoolConfig');
var Util = require('util');
var EventEmitter = require('events').EventEmitter;

module.exports = PoolCluster;

/**
* PoolCluster
*/
function PoolCluster(config) {
EventEmitter.call(this);

config = config || {};
this._canRetry = typeof config.canRetry === 'undefined' ? true : config.canRetry;
this._removeNodeErrorCount = config.removeNodeErrorCount || 5;
this._defaultSelector = config.defaultSelector || 'RR';

this._closed = false;
this._lastId = 0;
this._nodes = {};
this._serviceableNodeIds = [];
this._namespaces = {};
this._findCaches = {};
}

Util.inherits(PoolCluster, EventEmitter);

PoolCluster.prototype.of = function(pattern, selector) {
pattern = pattern || '*';

selector = selector || this._defaultSelector;
selector = selector.toUpperCase();
if (typeof Selector[selector] === 'undefined') {
selector = this._defaultSelector;
}

var key = pattern + selector;

if (typeof this._namespaces[key] === 'undefined') {
this._namespaces[key] = new PoolNamespace(this, pattern, selector);
}

return this._namespaces[key];
};

PoolCluster.prototype.add = function(id, config) {
if (typeof id === 'object') {
config = id;
id = 'CLUSTER::' + (++this._lastId);
}

if (typeof this._nodes[id] === 'undefined') {
this._nodes[id] = {
id: id,
errorCount: 0,
pool: new Pool({config: new PoolConfig(config)})
};

this._serviceableNodeIds.push(id);

this._clearFindCaches();
}
};

PoolCluster.prototype.getConnection = function(pattern, selector, cb) {
if (typeof pattern === 'function') {
cb = pattern;
namespace = this.of();
} else {
if (typeof selector === 'function') {
cb = selector;
selector = this._defaultSelector;
}

namespace = this.of(pattern, selector);
}

namespace.getConnection(cb);
};

PoolCluster.prototype.end = function() {
if (this._closed) {
return;
}

this._closed = true;

for (var id in this._nodes) {
this._nodes[id].pool.end();
}
};

PoolCluster.prototype._findNodeIds = function(pattern) {
if (typeof this._findCaches[pattern] !== 'undefined') {
return this._findCaches[pattern];
}

var foundNodeIds;

if (pattern === '*') { // all
foundNodeIds = this._serviceableNodeIds;
} else if (typeof this._serviceableNodeIds[pattern] !== 'undefined') { // one
foundNodeIds = [pattern];
} else { // wild matching
var keyword = pattern.substring(pattern.length - 1, 0);

foundNodeIds = this._serviceableNodeIds.filter(function (id) {
return id.indexOf(keyword) === 0;
});
}

this._findCaches[pattern] = foundNodeIds;

return foundNodeIds;
};

PoolCluster.prototype._getNode = function(id) {
return this._nodes[id] || null;
};

PoolCluster.prototype._increaseErrorCount = function(node) {
if (++node.errorCount >= this._removeNodeErrorCount) {
var index = this._serviceableNodeIds.indexOf(node.id);
if (index !== -1) {
this._serviceableNodeIds.splice(index, 1);
delete this._nodes[node.id];

this._clearFindCaches();

node.pool.end();

this.emit('remove', node.id);
}
}
};

PoolCluster.prototype._decreaseErrorCount = function(node) {
if (node.errorCount > 0) {
--node.errorCount;
}
};

PoolCluster.prototype._getConnection = function(node, cb) {
var self = this;

node.pool.getConnection(function (err, connection) {
if (err) {
self._increaseErrorCount(node);

if (self._canRetry) {
console.warn('[Error] PoolCluster : ' + err);
return cb(null, 'retry');
} else {
return cb(err);
}
} else {
self._decreaseErrorCount(node);
}

connection._clusterId = node.id;

cb(null, connection);
});
};

PoolCluster.prototype._clearFindCaches = function() {
this._findCaches = {};
};

/**
* PoolNamespace
*/
function PoolNamespace(cluster, pattern, selector) {
this._cluster = cluster;
this._pattern = pattern;
this._selector = new Selector[selector]();
}

PoolNamespace.prototype.getConnection = function(cb) {
var clusterNode = this._getClusterNode();

if (clusterNode === null) {
return cb(new Error('Pool does Not exists.'));
}

this._cluster._getConnection(clusterNode, function(err, connection) {
if (err) {
return cb(err);
}

if (connection === 'retry') {
return this.getConnection(cb);
}

cb(null, connection);
}.bind(this));
};

PoolNamespace.prototype._getClusterNode = function() {
var foundNodeIds = this._cluster._findNodeIds(this._pattern);

if (foundNodeIds.length === 0) {
return null;
}

var nodeId = (foundNodeIds.length === 1) ? foundNodeIds[0] : this._selector(foundNodeIds);

return this._cluster._getNode(nodeId);
};

/**
* Selector
*/
var Selector = {};

Selector.RR = function () {
var index = 0;

return function(clusterIds) {
if (index >= clusterIds.length) {
index = 0;
}

var clusterId = clusterIds[index++];

return clusterId;
};
};

Selector.RANDOM = function () {
return function(clusterIds) {
return clusterIds[Math.floor(Math.random() * clusterIds.length)];
};
};

Selector.ORDER = function () {
return function(clusterIds) {
return clusterIds[0];
};
};
12 changes: 11 additions & 1 deletion test/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ common.createPool = function(config) {
return Mysql.createPool(config);
};

common.createPoolCluster = function(config) {
config = mergeTestConfig(config);
config.createConnection = common.createConnection;
return Mysql.createPoolCluster(config);
};

common.createFakeServer = function(options) {
return new FakeServer(_.extend({}, options));
};
Expand All @@ -45,7 +51,11 @@ common.useTestDb = function(connection) {
});

connection.query('USE ' + common.testDatabase);
}
};

common.getTestConfig = function(config) {
return mergeTestConfig(config);
};

function mergeTestConfig(config) {
if (common.isTravis()) {
Expand Down
Loading