Skip to content

Commit 4a8b771

Browse files
committed
Ensure callbacks are executed for all queued queries after connection-level errors
Separates socket errors from error messages, sends socket errors to all queries in the queue, marks clients as unusable after socket errors. This is not very pleasant but should maintain backwards compatibility…?
1 parent 93fe0c7 commit 4a8b771

File tree

4 files changed

+69
-17
lines changed

4 files changed

+69
-17
lines changed

lib/client.js

+52-8
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ var Client = function (config) {
3636
this._connecting = false
3737
this._connected = false
3838
this._connectionError = false
39+
this._queryable = true
3940

4041
this.connection = c.connection || new Connection({
4142
stream: c.stream,
@@ -126,15 +127,39 @@ Client.prototype.connect = function (callback) {
126127
}
127128

128129
const connectedErrorHandler = (err) => {
130+
this._queryable = false
131+
132+
const enqueueError = (query) => {
133+
process.nextTick(() => {
134+
query.handleError(err, con)
135+
})
136+
}
137+
129138
if (this.activeQuery) {
130-
var activeQuery = self.activeQuery
139+
enqueueError(this.activeQuery)
131140
this.activeQuery = null
132-
return activeQuery.handleError(err, con)
133141
}
142+
143+
this.queryQueue.forEach(enqueueError)
144+
this.queryQueue = []
145+
134146
this.emit('error', err)
135147
}
136148

149+
const connectedErrorMessageHandler = (msg) => {
150+
const activeQuery = this.activeQuery
151+
152+
if (!activeQuery) {
153+
connectedErrorHandler(msg)
154+
return
155+
}
156+
157+
this.activeQuery = null
158+
activeQuery.handleError(msg, con)
159+
}
160+
137161
con.on('error', connectingErrorHandler)
162+
con.on('errorMessage', connectingErrorHandler)
138163

139164
// hook up query handling events to connection
140165
// after the connection initially becomes ready for queries
@@ -143,7 +168,9 @@ Client.prototype.connect = function (callback) {
143168
self._connected = true
144169
self._attachListeners(con)
145170
con.removeListener('error', connectingErrorHandler)
171+
con.removeListener('errorMessage', connectingErrorHandler)
146172
con.on('error', connectedErrorHandler)
173+
con.on('errorMessage', connectedErrorMessageHandler)
147174

148175
// process possible callback argument to Client#connect
149176
if (callback) {
@@ -353,7 +380,13 @@ Client.prototype._pulseQueryQueue = function () {
353380
if (this.activeQuery) {
354381
this.readyForQuery = false
355382
this.hasExecuted = true
356-
this.activeQuery.submit(this.connection)
383+
384+
const queryError = this.activeQuery.submit(this.connection)
385+
if (queryError) {
386+
this.activeQuery.handleError(queryError, this.connection)
387+
this.readyForQuery = true
388+
this._pulseQueryQueue()
389+
}
357390
} else if (this.hasExecuted) {
358391
this.activeQuery = null
359392
this.emit('drain')
@@ -389,25 +422,36 @@ Client.prototype.query = function (config, values, callback) {
389422
query._result._getTypeParser = this._types.getTypeParser.bind(this._types)
390423
}
391424

425+
if (!this._queryable) {
426+
query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection)
427+
return
428+
}
429+
430+
if (this._ending) {
431+
query.handleError(new Error('Client was closed and is not queryable'), this.connection)
432+
return
433+
}
434+
392435
this.queryQueue.push(query)
393436
this._pulseQueryQueue()
394437
return result
395438
}
396439

397440
Client.prototype.end = function (cb) {
398441
this._ending = true
442+
399443
if (this.activeQuery) {
400444
// if we have an active query we need to force a disconnect
401445
// on the socket - otherwise a hung query could block end forever
402-
this.connection.stream.destroy(new Error('Connection terminated by user'))
403-
return cb ? cb() : Promise.resolve()
446+
this.connection.stream.destroy()
447+
} else {
448+
this.connection.end()
404449
}
450+
405451
if (cb) {
406-
this.connection.end()
407452
this.connection.once('end', cb)
408453
} else {
409-
return new global.Promise((resolve, reject) => {
410-
this.connection.end()
454+
return new Promise((resolve) => {
411455
this.connection.once('end', resolve)
412456
})
413457
}

lib/connection.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,11 @@ Connection.prototype.attachListeners = function (stream) {
112112
var packet = self._reader.read()
113113
while (packet) {
114114
var msg = self.parseMessage(packet)
115+
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
115116
if (self._emitMessage) {
116117
self.emit('message', msg)
117118
}
118-
self.emit(msg.name, msg)
119+
self.emit(eventName, msg)
119120
packet = self._reader.read()
120121
}
121122
})

lib/query.js

+3-8
Original file line numberDiff line numberDiff line change
@@ -147,22 +147,17 @@ Query.prototype.handleError = function (err, connection) {
147147

148148
Query.prototype.submit = function (connection) {
149149
if (typeof this.text !== 'string' && typeof this.name !== 'string') {
150-
const err = new Error('A query must have either text or a name. Supplying neither is unsupported.')
151-
connection.emit('error', err)
152-
connection.emit('readyForQuery')
153-
return
150+
return new Error('A query must have either text or a name. Supplying neither is unsupported.')
154151
}
155152
if (this.values && !Array.isArray(this.values)) {
156-
const err = new Error('Query values must be an array')
157-
connection.emit('error', err)
158-
connection.emit('readyForQuery')
159-
return
153+
return new Error('Query values must be an array')
160154
}
161155
if (this.requiresPreparation()) {
162156
this.prepare(connection)
163157
} else {
164158
connection.query(this.text)
165159
}
160+
return null
166161
}
167162

168163
Query.prototype.hasBeenParsed = function (connection) {

test/integration/client/error-handling-tests.js

+12
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,18 @@ suite.test('re-using connections results in promise rejection', (done) => {
5050
})
5151
})
5252

53+
suite.test('using a client after closing it results in error', (done) => {
54+
const client = new Client()
55+
client.connect(() => {
56+
client.end(assert.calls(() => {
57+
client.query('SELECT 1', assert.calls((err) => {
58+
assert.equal(err.message, 'Client was closed and is not queryable')
59+
done()
60+
}))
61+
}))
62+
})
63+
})
64+
5365
suite.test('query receives error on client shutdown', function (done) {
5466
var client = new Client()
5567
client.connect(assert.success(function () {

0 commit comments

Comments
 (0)