diff --git a/Readme.md b/Readme.md index 3d5069341..85ad4ba23 100644 --- a/Readme.md +++ b/Readme.md @@ -452,24 +452,47 @@ trying to gracefully shutdown a server. To end all the connections in the pool, use the `end` method on the pool: ```js -pool.end(function (err) { +pool.end({ + gracefulExit: true +}, function (err) { // all connections in the pool have ended }); ``` -The `end` method takes an _optional_ callback that you can use to know when -all the connections are ended. +The `end` method takes two _optional_ arguments: -**Once `pool.end` is called, `pool.getConnection` and other operations -can no longer be performed.** Wait until all connections in the pool are -released before calling `pool.end`. If you use the shortcut method -`pool.query`, in place of `pool.getConnection` → `connection.query` → -`connection.release`, wait until it completes. +* `options`: + + * `gracefulExit`: Determines whether to end gracefully. If `true`, every +`pool.getConnection` or `pool.query` called before `pool.end` will complete. +If `false`, only commands / queries already in progress will complete, +others will fail. (Default: `false`) + +* `callback`: Will be called when all the connections are ended. + +**Once `pool.end()` has been called, `pool.getConnection` and other operations +can no longer be performed** + +### Under the hood + +If `options.gracefulExit` is set to `true`, after calling `pool.end` the poll will +enter into the `pendingClose` state, all former or queued queries will still +complete. But the pool will no longer accept new queries. + +This works by NOT queueing the `QUIT` packet on all the connections until there +is no connection in the aquiring state and no queued queries. All established +connections will still queue queries which were added before calling `pool.end`. + +If `options.gracefulExit` is set to `false`, `pool.end` immediately calls +`connection.end` on every active connection in the pool. This queues a `QUIT` +packet on the connection and sets a flag to prevent `pool.getConnection` from +creating new connections. All commands / queries already in progress will complete, +but new commands won't execute. + +Wait until all connections in the pool are released before calling `pool.end`. +If you use the shortcut method `pool.query`, in place of `pool.getConnection` → +`connection.query` → `connection.release`, wait until it completes. -`pool.end` calls `connection.end` on every active connection in the pool. -This queues a `QUIT` packet on the connection and sets a flag to prevent -`pool.getConnection` from creating new connections. All commands / queries -already in progress will complete, but new commands won't execute. ## PoolCluster diff --git a/lib/Pool.js b/lib/Pool.js index 87a40114a..24781c654 100644 --- a/lib/Pool.js +++ b/lib/Pool.js @@ -17,9 +17,13 @@ function Pool(options) { this._freeConnections = []; this._connectionQueue = []; this._closed = false; + this._pendingClose = false; + this._endCallback = function (err) { + if (err) throw err; + }; } -Pool.prototype.getConnection = function (cb) { +Pool.prototype.getConnection = function (cb, _queued) { if (this._closed) { var err = new Error('Pool is closed.'); @@ -33,12 +37,21 @@ Pool.prototype.getConnection = function (cb) { var connection; var pool = this; - if (this._freeConnections.length > 0) { + if (this._freeConnections.length > 0 && (!this._pendingClose || _queued)) { connection = this._freeConnections.shift(); this.acquireConnection(connection, cb); return; } + if (this._pendingClose) { + var err = new Error('Pool is closed.'); + err.code = 'POOL_CLOSED'; + process.nextTick(function () { + cb(err); + }); + return; + } + if (this.config.connectionLimit === 0 || this._allConnections.length < this.config.connectionLimit) { connection = new PoolConnection(this, { config: this.config.newConnectionConfig() }); @@ -141,6 +154,12 @@ Pool.prototype.releaseConnection = function releaseConnection(connection) { this._freeConnections.push(connection); this.emit('release', connection); } + + if (this._pendingClose) { + this.end({ + gracefulExit: true + }); + } } if (this._closed) { @@ -154,19 +173,23 @@ Pool.prototype.releaseConnection = function releaseConnection(connection) { }); } else if (this._connectionQueue.length) { // get connection with next waiting callback - this.getConnection(this._connectionQueue.shift()); + this.getConnection(this._connectionQueue.shift(), true); } }; -Pool.prototype.end = function (cb) { - this._closed = true; +Pool.prototype.end = function (options, cb) { + if (!cb && typeof options === 'function') { + cb = options; + options = {}; + } if (typeof cb !== 'function') { - cb = function (err) { - if (err) throw err; - }; + cb = this._endCallback; } + options = options || {}; + + var readyToEnd = false; var calledBack = false; var waitingClose = 0; @@ -177,14 +200,26 @@ Pool.prototype.end = function (cb) { } } - while (this._allConnections.length !== 0) { - waitingClose++; - this._purgeConnection(this._allConnections[0], onEnd); + if (this._acquiringConnections.length === 0 && this._connectionQueue.length === 0) { + readyToEnd = true; } - if (waitingClose === 0) { - process.nextTick(onEnd); + if (!options.gracefulExit || readyToEnd) { + this._closed = true; + + while (this._allConnections.length !== 0) { + waitingClose++; + this._purgeConnection(this._allConnections[0], onEnd); + } + + if (waitingClose === 0) { + process.nextTick(onEnd); + } + return; } + + this._pendingClose = true; + this._endCallback = cb; }; Pool.prototype.query = function (sql, values, cb) { diff --git a/lib/PoolCluster.js b/lib/PoolCluster.js index d0aed2c75..39c739700 100644 --- a/lib/PoolCluster.js +++ b/lib/PoolCluster.js @@ -58,7 +58,12 @@ PoolCluster.prototype.add = function add(id, config) { this._clearFindCaches(); }; -PoolCluster.prototype.end = function end(callback) { +PoolCluster.prototype.end = function end(options, callback) { + if (!callback && typeof options === 'function') { + callback = options; + options = {}; + } + var cb = callback !== undefined ? callback : _cb; @@ -90,7 +95,7 @@ PoolCluster.prototype.end = function end(callback) { var node = this._nodes[nodeId]; waitingClose++; - node.pool.end(onEnd); + node.pool.end(options, onEnd); } if (waitingClose === 0) { diff --git a/test/unit/pool-cluster/test-end-graceful.js b/test/unit/pool-cluster/test-end-graceful.js new file mode 100644 index 000000000..6ee2b6ca2 --- /dev/null +++ b/test/unit/pool-cluster/test-end-graceful.js @@ -0,0 +1,33 @@ +var assert = require('assert'); +var common = require('../../common'); +var cluster = common.createPoolCluster(); +var server = common.createFakeServer(); + +var poolConfig = common.getTestConfig({port: common.fakeServerPort}); +cluster.add('SLAVE1', poolConfig); +cluster.add('SLAVE2', poolConfig); + +server.listen(common.fakeServerPort, function(err) { + assert.ifError(err); + + var pool = cluster.of('SLAVE*', 'ORDER'); + + pool.query('SELECT 1', function (err, rows) { + assert.ifError(err); + assert.equal(rows.length, 1); + assert.equal(rows[0]['1'], 1); + + pool.query('SELECT 2', function (err, rows) { + assert.ifError(err); + assert.equal(rows.length, 1); + assert.equal(rows[0]['2'], 2); + }); + + cluster.end({ + gracefulExit: true + }, function (err) { + assert.ifError(err); + server.destroy(); + }); + }); +}); diff --git a/test/unit/pool-cluster/test-end-query.js b/test/unit/pool-cluster/test-end-query.js new file mode 100644 index 000000000..1c999ef6b --- /dev/null +++ b/test/unit/pool-cluster/test-end-query.js @@ -0,0 +1,31 @@ +var assert = require('assert'); +var common = require('../../common'); +var cluster = common.createPoolCluster(); +var server = common.createFakeServer(); + +var poolConfig = common.getTestConfig({port: common.fakeServerPort}); +cluster.add('SLAVE1', poolConfig); +cluster.add('SLAVE2', poolConfig); + +server.listen(common.fakeServerPort, function(err) { + assert.ifError(err); + + var pool = cluster.of('SLAVE*', 'ORDER'); + + pool.query('SELECT 1', function (err, rows) { + assert.ifError(err); + assert.equal(rows.length, 1); + assert.equal(rows[0]['1'], 1); + + pool.query('SELECT 2', function (err) { + assert.ok(err); + assert.equal(err.message, 'Pool is closed.'); + assert.equal(err.code, 'POOL_CLOSED'); + }); + + cluster.end(function (err) { + assert.ifError(err); + server.destroy(); + }); + }); +}); diff --git a/test/unit/pool/test-graceful-exit-ping.js b/test/unit/pool/test-graceful-exit-ping.js new file mode 100644 index 000000000..5cfae044d --- /dev/null +++ b/test/unit/pool/test-graceful-exit-ping.js @@ -0,0 +1,47 @@ +var common = require('../../common'); +var assert = require('assert'); +var pool = common.createPool({ + connectionLimit : 1, + port : common.fakeServerPort, + queueLimit : 5, + waitForConnections : true +}); + +var server = common.createFakeServer(); + +server.listen(common.fakeServerPort, function (err) { + assert.ifError(err); + + pool.getConnection(function (err, conn) { + assert.ifError(err); + conn.release(); + + pool.getConnection(function (err, conn) { + assert.ifError(err); + conn.release(); + }); + + pool.end({ + gracefulExit: true + }, function (err) { + assert.ifError(err); + server.destroy(); + }); + + pool.getConnection(function (err) { + assert.ok(err); + assert.equal(err.message, 'Pool is closed.'); + assert.equal(err.code, 'POOL_CLOSED'); + }); + }); +}); + +server.on('connection', function (conn) { + conn.handshake(); + conn.on('ping', function () { + setTimeout(function () { + conn._sendPacket(new common.Packets.OkPacket()); + conn._parser.resetPacketNumber(); + }, 100); + }); +}); diff --git a/test/unit/pool/test-graceful-exit-query.js b/test/unit/pool/test-graceful-exit-query.js new file mode 100644 index 000000000..32f93c731 --- /dev/null +++ b/test/unit/pool/test-graceful-exit-query.js @@ -0,0 +1,38 @@ +var common = require('../../common'); +var assert = require('assert'); +var pool = common.createPool({ + connectionLimit : 2, + port : common.fakeServerPort, + queueLimit : 5, + waitForConnections : true +}); + +var server = common.createFakeServer(); + +server.listen(common.fakeServerPort, function (err) { + assert.ifError(err); + + pool.getConnection(function (err, conn) { + assert.ifError(err); + + pool.getConnection(function (err, conn2) { + assert.ifError(err); + + pool.query('SELECT 1', function (err, rows) { + assert.ifError(err); + assert.equal(rows.length, 1); + assert.equal(rows[0]['1'], 1); + }); + + pool.end({ + gracefulExit: true + }, function (err) { + assert.ifError(err); + server.destroy(); + }); + + conn.release(); + conn2.release(); + }); + }); +}); diff --git a/test/unit/pool/test-graceful-exit-queued.js b/test/unit/pool/test-graceful-exit-queued.js new file mode 100644 index 000000000..96553bafe --- /dev/null +++ b/test/unit/pool/test-graceful-exit-queued.js @@ -0,0 +1,38 @@ +var common = require('../../common'); +var assert = require('assert'); +var pool = common.createPool({ + connectionLimit : 1, + port : common.fakeServerPort, + queueLimit : 5, + waitForConnections : true +}); + +var server = common.createFakeServer(); + +server.listen(common.fakeServerPort, function (err) { + assert.ifError(err); + + pool.getConnection(function (err, conn) { + assert.ifError(err); + + pool.end({ + gracefulExit: true + }, function (err) { + assert.ifError(err); + server.destroy(); + }); + + pool.getConnection(function (err) { + assert.ok(err); + assert.equal(err.message, 'Pool is closed.'); + assert.equal(err.code, 'POOL_CLOSED'); + }); + + conn.release(); + }); + + pool.getConnection(function (err, conn) { + assert.ifError(err); + conn.release(); + }); +});