Skip to content

Commit e2a3113

Browse files
committed
Merge pull request #522 from ifsnow/pool-cluster
Add PoolCluster Feature.
2 parents 0e3988c + d7b8bdc commit e2a3113

File tree

5 files changed

+547
-2
lines changed

5 files changed

+547
-2
lines changed

Readme.md

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,62 @@ addition to those options pools accept a few extras:
270270
before returning an error from `getConnection`. If set to `0`, there is no
271271
limit to the number of queued connection requests. (Default: `0`)
272272

273+
## PoolCluster
274+
275+
PoolCluster provides multiple hosts connection. (group & retry & selector)
276+
277+
```js
278+
// create
279+
var poolCluster = mysql.createPoolCluster();
280+
281+
poolCluster.add(config); // anonymous group
282+
poolCluster.add('MASTER', masterConfig);
283+
poolCluster.add('SLAVE1', slave1Config);
284+
poolCluster.add('SLAVE2', slave2Config);
285+
286+
// Target Group : ALL(anonymous, MASTER, SLAVE1-2), Selector : round-robin(default)
287+
poolCluster.getConnection(function (err, connection) {});
288+
289+
// Target Group : MASTER, Selector : round-robin
290+
poolCluster.getConnection('MASTER', function (err, connection) {});
291+
292+
// Target Group : SLAVE1-2, Selector : order
293+
// If can't connect to SLAVE1, return SLAVE2. (remove SLAVE1 in the cluster)
294+
poolCluster.on('remove', function (nodeId) {
295+
console.log('REMOVED NODE : ' + nodeId); // nodeId = SLAVE1
296+
});
297+
298+
poolCluster.getConnection('SLAVE*', 'ORDER', function (err, connection) {});
299+
300+
// of namespace : of(pattern, selector)
301+
poolCluster.of('*').getConnection(function (err, connection) {});
302+
303+
var pool = poolCluster.of('SLAVE*', 'RANDOM')
304+
pool.getConnection(function (err, connection) {});
305+
pool.getConnection(function (err, connection) {});
306+
307+
// destroy
308+
poolCluster.end();
309+
```
310+
311+
## PoolCluster Option
312+
* `canRetry`: If `true`, `PoolCluster` will attempt to reconnect when connection fails. (Default: `true`)
313+
* `removeNodeErrorCount`: If connection fails, node's `errorCount` increases.
314+
When `errorCount` is greater than `removeNodeErrorCount`, remove a node in the `PoolCluster`. (Default: `5`)
315+
* `defaultSelector`: The default selector. (Default: `RR`)
316+
* `RR`: Select one alternately. (Round-Robin)
317+
* `RANDOM`: Select the node by random function.
318+
* `ORDER`: Select the first node available unconditionally.
319+
320+
```js
321+
var clusterConfig = {
322+
removeNodeErrorCount: 1, // Remove the node immediately when connection fails.
323+
defaultSelector: 'ORDER'
324+
};
325+
326+
var poolCluster = mysql.createPoolCluster(clusterConfig);
327+
```
328+
273329
## Switching users / altering connection state
274330

275331
MySQL offers a changeUser command that allows you to alter the current user and
@@ -886,7 +942,7 @@ For example, if you have an installation of mysql running on localhost:3306 and
886942
* Make sure the database (e.g. 'test') you want to use exists and the user you entered has the proper rights to use the test database. (E.g. do not forget to execute the SQL-command ```FLUSH PRIVILEGES``` after you have created the user.)
887943
* In a DOS-box (or CMD-shell) in the folder of your application run ```npm install mysql --dev``` or in the mysql folder (```node_modules\mysql```), run ```npm install --dev```. (This will install additional developer-dependencies for node-mysql.)
888944
* Run ```npm test mysql``` in your applications folder or ```npm test``` in the mysql subfolder.
889-
* If you want to log the output into a file use ```npm test mysql > test.log``` or ```npm test > test.log```.
945+
* If you want to log the output into a file use ```npm test mysql > test.log``` or ```npm test > test.log```.
890946

891947
## Todo
892948

index.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ var Types = require('./lib/protocol/constants/types');
44
var SqlString = require('./lib/protocol/SqlString');
55
var Pool = require('./lib/Pool');
66
var PoolConfig = require('./lib/PoolConfig');
7+
var PoolCluster = require('./lib/PoolCluster');
78

89
exports.createConnection = function(config) {
910
return new Connection({config: new ConnectionConfig(config)});
@@ -13,6 +14,10 @@ exports.createPool = function(config) {
1314
return new Pool({config: new PoolConfig(config)});
1415
};
1516

17+
exports.createPoolCluster = function(config) {
18+
return new PoolCluster(config);
19+
};
20+
1621
exports.createQuery = Connection.createQuery;
1722

1823
exports.Types = Types;

lib/PoolCluster.js

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
var Pool = require('./Pool');
2+
var PoolConfig = require('./PoolConfig');
3+
var Util = require('util');
4+
var EventEmitter = require('events').EventEmitter;
5+
6+
module.exports = PoolCluster;
7+
8+
/**
9+
* PoolCluster
10+
*/
11+
function PoolCluster(config) {
12+
EventEmitter.call(this);
13+
14+
config = config || {};
15+
this._canRetry = typeof config.canRetry === 'undefined' ? true : config.canRetry;
16+
this._removeNodeErrorCount = config.removeNodeErrorCount || 5;
17+
this._defaultSelector = config.defaultSelector || 'RR';
18+
19+
this._closed = false;
20+
this._lastId = 0;
21+
this._nodes = {};
22+
this._serviceableNodeIds = [];
23+
this._namespaces = {};
24+
this._findCaches = {};
25+
}
26+
27+
Util.inherits(PoolCluster, EventEmitter);
28+
29+
PoolCluster.prototype.of = function(pattern, selector) {
30+
pattern = pattern || '*';
31+
32+
selector = selector || this._defaultSelector;
33+
selector = selector.toUpperCase();
34+
if (typeof Selector[selector] === 'undefined') {
35+
selector = this._defaultSelector;
36+
}
37+
38+
var key = pattern + selector;
39+
40+
if (typeof this._namespaces[key] === 'undefined') {
41+
this._namespaces[key] = new PoolNamespace(this, pattern, selector);
42+
}
43+
44+
return this._namespaces[key];
45+
};
46+
47+
PoolCluster.prototype.add = function(id, config) {
48+
if (typeof id === 'object') {
49+
config = id;
50+
id = 'CLUSTER::' + (++this._lastId);
51+
}
52+
53+
if (typeof this._nodes[id] === 'undefined') {
54+
this._nodes[id] = {
55+
id: id,
56+
errorCount: 0,
57+
pool: new Pool({config: new PoolConfig(config)})
58+
};
59+
60+
this._serviceableNodeIds.push(id);
61+
62+
this._clearFindCaches();
63+
}
64+
};
65+
66+
PoolCluster.prototype.getConnection = function(pattern, selector, cb) {
67+
if (typeof pattern === 'function') {
68+
cb = pattern;
69+
namespace = this.of();
70+
} else {
71+
if (typeof selector === 'function') {
72+
cb = selector;
73+
selector = this._defaultSelector;
74+
}
75+
76+
namespace = this.of(pattern, selector);
77+
}
78+
79+
namespace.getConnection(cb);
80+
};
81+
82+
PoolCluster.prototype.end = function() {
83+
if (this._closed) {
84+
return;
85+
}
86+
87+
this._closed = true;
88+
89+
for (var id in this._nodes) {
90+
this._nodes[id].pool.end();
91+
}
92+
};
93+
94+
PoolCluster.prototype._findNodeIds = function(pattern) {
95+
if (typeof this._findCaches[pattern] !== 'undefined') {
96+
return this._findCaches[pattern];
97+
}
98+
99+
var foundNodeIds;
100+
101+
if (pattern === '*') { // all
102+
foundNodeIds = this._serviceableNodeIds;
103+
} else if (typeof this._serviceableNodeIds[pattern] !== 'undefined') { // one
104+
foundNodeIds = [pattern];
105+
} else { // wild matching
106+
var keyword = pattern.substring(pattern.length - 1, 0);
107+
108+
foundNodeIds = this._serviceableNodeIds.filter(function (id) {
109+
return id.indexOf(keyword) === 0;
110+
});
111+
}
112+
113+
this._findCaches[pattern] = foundNodeIds;
114+
115+
return foundNodeIds;
116+
};
117+
118+
PoolCluster.prototype._getNode = function(id) {
119+
return this._nodes[id] || null;
120+
};
121+
122+
PoolCluster.prototype._increaseErrorCount = function(node) {
123+
if (++node.errorCount >= this._removeNodeErrorCount) {
124+
var index = this._serviceableNodeIds.indexOf(node.id);
125+
if (index !== -1) {
126+
this._serviceableNodeIds.splice(index, 1);
127+
delete this._nodes[node.id];
128+
129+
this._clearFindCaches();
130+
131+
node.pool.end();
132+
133+
this.emit('remove', node.id);
134+
}
135+
}
136+
};
137+
138+
PoolCluster.prototype._decreaseErrorCount = function(node) {
139+
if (node.errorCount > 0) {
140+
--node.errorCount;
141+
}
142+
};
143+
144+
PoolCluster.prototype._getConnection = function(node, cb) {
145+
var self = this;
146+
147+
node.pool.getConnection(function (err, connection) {
148+
if (err) {
149+
self._increaseErrorCount(node);
150+
151+
if (self._canRetry) {
152+
console.warn('[Error] PoolCluster : ' + err);
153+
return cb(null, 'retry');
154+
} else {
155+
return cb(err);
156+
}
157+
} else {
158+
self._decreaseErrorCount(node);
159+
}
160+
161+
connection._clusterId = node.id;
162+
163+
cb(null, connection);
164+
});
165+
};
166+
167+
PoolCluster.prototype._clearFindCaches = function() {
168+
this._findCaches = {};
169+
};
170+
171+
/**
172+
* PoolNamespace
173+
*/
174+
function PoolNamespace(cluster, pattern, selector) {
175+
this._cluster = cluster;
176+
this._pattern = pattern;
177+
this._selector = new Selector[selector]();
178+
}
179+
180+
PoolNamespace.prototype.getConnection = function(cb) {
181+
var clusterNode = this._getClusterNode();
182+
183+
if (clusterNode === null) {
184+
return cb(new Error('Pool does Not exists.'));
185+
}
186+
187+
this._cluster._getConnection(clusterNode, function(err, connection) {
188+
if (err) {
189+
return cb(err);
190+
}
191+
192+
if (connection === 'retry') {
193+
return this.getConnection(cb);
194+
}
195+
196+
cb(null, connection);
197+
}.bind(this));
198+
};
199+
200+
PoolNamespace.prototype._getClusterNode = function() {
201+
var foundNodeIds = this._cluster._findNodeIds(this._pattern);
202+
203+
if (foundNodeIds.length === 0) {
204+
return null;
205+
}
206+
207+
var nodeId = (foundNodeIds.length === 1) ? foundNodeIds[0] : this._selector(foundNodeIds);
208+
209+
return this._cluster._getNode(nodeId);
210+
};
211+
212+
/**
213+
* Selector
214+
*/
215+
var Selector = {};
216+
217+
Selector.RR = function () {
218+
var index = 0;
219+
220+
return function(clusterIds) {
221+
if (index >= clusterIds.length) {
222+
index = 0;
223+
}
224+
225+
var clusterId = clusterIds[index++];
226+
227+
return clusterId;
228+
};
229+
};
230+
231+
Selector.RANDOM = function () {
232+
return function(clusterIds) {
233+
return clusterIds[Math.floor(Math.random() * clusterIds.length)];
234+
};
235+
};
236+
237+
Selector.ORDER = function () {
238+
return function(clusterIds) {
239+
return clusterIds[0];
240+
};
241+
};

test/common.js

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ common.createPool = function(config) {
3535
return Mysql.createPool(config);
3636
};
3737

38+
common.createPoolCluster = function(config) {
39+
config = mergeTestConfig(config);
40+
config.createConnection = common.createConnection;
41+
return Mysql.createPoolCluster(config);
42+
};
43+
3844
common.createFakeServer = function(options) {
3945
return new FakeServer(_.extend({}, options));
4046
};
@@ -45,7 +51,11 @@ common.useTestDb = function(connection) {
4551
});
4652

4753
connection.query('USE ' + common.testDatabase);
48-
}
54+
};
55+
56+
common.getTestConfig = function(config) {
57+
return mergeTestConfig(config);
58+
};
4959

5060
function mergeTestConfig(config) {
5161
if (common.isTravis()) {

0 commit comments

Comments
 (0)