Skip to content

Commit daa7523

Browse files
committed
Remove a bunch of additional code
1 parent e3cb40c commit daa7523

File tree

4 files changed

+28
-49
lines changed

4 files changed

+28
-49
lines changed

packages/pg-query-stream/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ I'm very open to contribution! Open a pull request with your code or idea and w
4747

4848
The MIT License (MIT)
4949

50-
Copyright (c) 2013 Brian M. Carlson
50+
Copyright (c) 2013-2019 Brian M. Carlson
5151

5252
Permission is hereby granted, free of charge, to any person obtaining a copy
5353
of this software and associated documentation files (the "Software"), to deal

packages/pg-query-stream/index.js

Lines changed: 3 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,6 @@ class PgQueryStream extends Readable {
88
super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: batchSize })
99
this.cursor = new Cursor(text, values, config)
1010

11-
this._reading = false
12-
this._callbacks = []
13-
this._err = undefined;
14-
1511
// delegate Submittable callbacks to cursor
1612
this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
1713
this.handleDataRow = this.cursor.handleDataRow.bind(this.cursor)
@@ -25,42 +21,16 @@ class PgQueryStream extends Readable {
2521
this.cursor.submit(connection)
2622
}
2723

28-
close(callback) {
29-
if (this.destroyed) {
30-
if (callback) setImmediate(callback)
31-
} else {
32-
if (callback) this.once('close', callback)
33-
this.destroy()
34-
}
35-
}
36-
37-
_close() {
24+
_destroy(_err, cb) {
3825
this.cursor.close((err) => {
39-
let cb
40-
while ((cb = this._callbacks.pop())) cb(err || this._err)
26+
cb && cb(err || _err)
4127
})
4228
}
4329

44-
_destroy(_err, callback) {
45-
this._err = _err;
46-
this._callbacks.push(callback)
47-
if (!this._reading) {
48-
this._close()
49-
}
50-
}
51-
5230
// https://nodejs.org/api/stream.html#stream_readable_read_size_1
5331
_read(size) {
54-
// Prevent _destroy() from closing while reading
55-
this._reading = true
56-
5732
this.cursor.read(size, (err, rows, result) => {
58-
this._reading = false
59-
60-
if (this.destroyed) {
61-
// Destroyed while reading?
62-
this._close()
63-
} else if (err) {
33+
if (err) {
6434
// https://nodejs.org/api/stream.html#stream_errors_while_reading
6535
this.destroy(err)
6636
} else {

packages/pg-query-stream/test/async-iterator.es6

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ describe('Async iterator', () => {
5959
const pool = new pg.Pool({ max: 1 })
6060
const client = await pool.connect()
6161
const rows = []
62-
for await (const row of client.query(new QueryStream(queryText, []))) {
62+
for await (const row of client.query(new QueryStream(queryText, [], { batchSize: 1 }))) {
6363
rows.push(row)
6464
break;
6565
}

packages/pg-query-stream/test/close.js

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ var concat = require('concat-stream')
44
var QueryStream = require('../')
55
var helper = require('./helper')
66

7+
if (process.version.startsWith('v8.')) {
8+
return console.error('warning! node versions less than 10lts no longer supported & stream closing semantics may not behave properly');
9+
}
10+
711
helper('close', function (client) {
812
it('emits close', function (done) {
913
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], { batchSize: 2, highWaterMark: 2 })
@@ -23,27 +27,32 @@ helper('early close', function (client) {
2327
query.read()
2428
})
2529
query.once('readable', function () {
26-
query.close()
30+
query.destroy()
2731
})
2832
query.on('close', function () {
2933
assert(readCount < 10, 'should not have read more than 10 rows')
3034
done()
3135
})
3236
})
33-
})
3437

35-
helper('close callback', function (client) {
36-
it('notifies an optional callback when the conneciton is closed', function (done) {
37-
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], { batchSize: 2, highWaterMark: 2 })
38-
var query = client.query(stream)
39-
query.once('readable', function () { // only reading once
40-
query.read()
41-
})
42-
query.once('readable', function () {
43-
query.close(function () {
44-
// nothing to assert. This test will time out if the callback does not work.
45-
done()
46-
})
38+
it('can destroy stream while reading', function (done) {
39+
var stream = new QueryStream('SELECT * FROM generate_series(0, 100), pg_sleep(1)')
40+
client.query(stream)
41+
stream.on('data', () => done(new Error('stream should not have returned rows')))
42+
setTimeout(() => {
43+
stream.destroy()
44+
stream.on('close', done)
45+
}, 100)
46+
})
47+
48+
it('can destroy stream while reading an error', function (done) {
49+
var stream = new QueryStream('SELECT * from pg_sleep(1), basdfasdf;')
50+
client.query(stream)
51+
stream.on('data', () => done(new Error('stream should not have returned rows')))
52+
stream.once('error', () => {
53+
stream.destroy()
54+
// wait a bit to let any other errors shake through
55+
setTimeout(done, 100)
4756
})
4857
})
4958
})

0 commit comments

Comments
 (0)