Skip to content

Commit 8bbd42a

Browse files
committed
back port connection leak fix from 2.0
1 parent bcf5cb6 commit 8bbd42a

File tree

1 file changed

+126
-46
lines changed

1 file changed

+126
-46
lines changed

index.js

+126-46
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
var inherits = require('util').inherits
22
var EventEmitter = require('events').EventEmitter
33
var Transaction = require('any-db-transaction')
4-
var Pool = require('generic-pool').Pool
5-
var once = require('once')
4+
var GenericPool = require('generic-pool').Pool
65
var chain = require('./lib/chain')
76

87
module.exports = ConnectionPool
@@ -17,12 +16,14 @@ function ConnectionPool(adapter, connParams, options) {
1716

1817
options = options || {}
1918
connParams = connParams || {}
20-
21-
if (options.create || options.destroy) {
22-
throw new Error("Use onConnect/reset options instead of create/destroy.")
19+
if (options.create) {
20+
console.warn("PoolConfig.create ignored, use PoolConfig.onConnect instead")
21+
}
22+
if (options.destroy) {
23+
console.warn("PoolConfig.destroy ignored")
2324
}
2425

25-
if (connParams.adapter == 'sqlite3'
26+
if (adapter.name == 'sqlite3'
2627
&& /:memory:$/i.test(connParams.database)
2728
&& (options.min > 1 || options.max > 1))
2829
{
@@ -34,74 +35,142 @@ function ConnectionPool(adapter, connParams, options) {
3435
if (options.min) options.min = 1
3536
options.max = 1
3637
}
37-
38+
39+
var onConnect = options.onConnect || function (c, done) { done(null, c) }
40+
3841
var poolOpts = {
3942
min: options.min || 0,
4043
max: options.max || 10,
4144
create: function (ready) {
4245
adapter.createConnection(connParams, function (err, conn) {
4346
if (err) return ready(err);
44-
else if (options.onConnect) options.onConnect(conn, ready)
45-
else ready(null, conn)
47+
48+
onConnect(conn, function (err, connection) {
49+
if (err) return ready(err);
50+
conn.on('error', self._handleIdleError)
51+
ready(null, connection)
52+
})
4653
})
4754
},
48-
destroy: function (conn) {
49-
conn.end()
50-
conn._events = {}
55+
56+
destroy: function (connection) {
57+
connection.removeAllListeners()
58+
connection.on('error', function () {})
59+
connection.end()
5160
},
5261

53-
log: options.log
62+
log: options.log,
63+
64+
idleTimeoutMillis: options.idleTimeout,
65+
reapIntervalMillis: options.reapInterval,
66+
}
67+
68+
if (options.hasOwnProperty('refreshIdle')) {
69+
poolOpts.refreshIdle = options.refreshIdle
5470
}
5571

56-
var resetSteps = [];
72+
this._pool = new GenericPool(poolOpts)
73+
74+
var resetSteps = []
5775
if (adapter.reset) resetSteps.unshift(adapter.reset)
5876
if (options.reset) resetSteps.unshift(options.reset)
59-
this.adapter = adapter.name
60-
this._adapter = adapter
77+
this.adapter = adapter
6178
this._reset = chain(resetSteps)
62-
this._pool = new Pool(poolOpts)
79+
80+
this._shouldDestroyConnection = options.shouldDestroyConnection || function (err) { return true }
81+
82+
var self = this
83+
self._handleIdleError = function (err) {
84+
var connection = this
85+
self._maybeDestroy(connection, err)
86+
self.emit('error', err, connection)
87+
}
6388
}
6489

6590
ConnectionPool.prototype.query = function (statement, params, callback) {
6691
var self = this
67-
, query = this._adapter.createQuery(statement, params, callback)
68-
69-
this.acquire(function (err, conn) {
70-
if (err) {
71-
if (typeof params === 'function') {
72-
return params(err)
73-
} else if (callback) {
74-
return callback(err);
75-
} else {
76-
return query.emit('error', err);
77-
}
92+
, query = this.adapter.createQuery(statement, params, callback)
93+
, connection = null
94+
95+
if (query.callback) {
96+
callback = query.callback
97+
query.callback = function (err, result) {
98+
self._maybeDestroy(connection, err)
99+
callback(err, result)
78100
}
79-
conn.query(query);
80-
self.emit('query', query)
81-
var release = once(self.release.bind(self, conn))
82-
query.once('end', release).once('error', function (err) {
83-
release()
84-
// If this was the only error listener, re-emit the error.
101+
} else {
102+
var finished = false
103+
query.once('end', function () {
104+
if (!finished) {
105+
finished = true
106+
self.release(connection)
107+
}
108+
})
109+
query.once('error', function (err) {
110+
if (!finished) {
111+
finished = true
112+
self._maybeDestroy(connection, err)
113+
}
114+
// If this was the only error listener, re-emit the error from the pool.
85115
if (!this.listeners('error').length) {
86-
this.emit('error', err)
116+
self.emit('error', err, query)
87117
}
88118
})
119+
}
120+
121+
/**
122+
* if a connection cannot be acquired, or emits an 'error' event while a
123+
* query is in progress, the error should be handled by the query object.
124+
*/
125+
var handleConnectionError = function (error) {
126+
self._maybeDestroy(connection, error)
127+
if (query.callback) {
128+
query.callback(error)
129+
} else {
130+
query.emit('error', error)
131+
}
132+
}
133+
134+
this.acquire(function (err, connection_) {
135+
if (err) {
136+
return handleConnectionError(err)
137+
}
138+
139+
140+
// expose the connection to everything else in the outer scope
141+
connection = connection_
142+
143+
// attach error event listener to the connection
144+
connection.on('error', handleConnectionError)
145+
query.once('end', function () {
146+
connection.removeListener('error', handleConnectionError)
147+
})
148+
149+
connection.query(query);
150+
self.emit('query', query)
89151
})
90152

91153
return query
92154
}
93155

94156
ConnectionPool.prototype.acquire = function (callback) {
95-
this.emit('acquire')
96-
this._pool.acquire(callback);
157+
var self = this
158+
self._pool.acquire(function (err, connection) {
159+
if (err) return callback(err);
160+
connection.removeListener('error', self._handleIdleError)
161+
self.emit('acquire', connection)
162+
callback(null, connection)
163+
});
97164
}
98165

99166
ConnectionPool.prototype.release = function (connection) {
100-
this.emit('release')
101-
var pool = this._pool
102-
this._reset(connection, function (err) {
103-
if (err) return pool.destroy(connection)
104-
pool.release(connection)
167+
var self = this
168+
self.emit('release', connection)
169+
connection.removeAllListeners()
170+
self._reset(connection, function (err) {
171+
if (err) return self.destroy(connection);
172+
connection.on('error', self._handleIdleError)
173+
self._pool.release(connection)
105174
})
106175
}
107176

@@ -112,14 +181,25 @@ ConnectionPool.prototype.destroy = function (connection) {
112181
ConnectionPool.prototype.close = function (callback) {
113182
var self = this
114183
this._pool.drain(function () {
115-
self._pool.destroyAllNow()
116-
self.emit('close')
117-
if (callback) callback()
184+
self._pool.destroyAllNow(function () {
185+
self.emit('close')
186+
if (callback) callback()
187+
})
118188
})
119189
}
120190

191+
ConnectionPool.prototype._maybeDestroy = function (connection, error) {
192+
if (connection) {
193+
if (error && this._shouldDestroyConnection(error)) {
194+
this.destroy(connection)
195+
} else {
196+
this.release(connection)
197+
}
198+
}
199+
}
200+
121201
ConnectionPool.prototype.begin = function (beginStatement, callback) {
122-
var tx = Transaction.begin(this._adapter.createQuery, beginStatement, callback)
202+
var tx = Transaction.begin(this.adapter.createQuery, beginStatement, callback)
123203

124204
var pool = this
125205
this.acquire(function (err, connection) {

0 commit comments

Comments
 (0)