Skip to content

Commit e5d4674

Browse files
committed
Work in progress...convert to more efficient reader
1 parent cccf84e commit e5d4674

File tree

13 files changed

+1249
-61
lines changed

13 files changed

+1249
-61
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ build/
55
node_modules/
66
package-lock.json
77
*.swp
8+
dist
+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"name": "pg-packet-stream",
3+
"version": "1.0.0",
4+
"main": "dist/index.js",
5+
"license": "MIT",
6+
"devDependencies": {
7+
"@types/node": "^12.12.21",
8+
"chunky": "^0.0.0",
9+
"mocha": "^6.2.2",
10+
"typescript": "^3.7.3"
11+
},
12+
"scripts": {
13+
"test": "mocha -r ts-node/register src/**/*.test.ts"
14+
},
15+
"dependencies": {
16+
"@types/chai": "^4.2.7",
17+
"@types/mocha": "^5.2.7",
18+
"chai": "^4.2.0",
19+
"mocha": "^6.2.2",
20+
"ts-node": "^8.5.4"
21+
}
22+
}
+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import 'mocha';
2+
import { PgPacketStream, Packet } from './'
3+
import { expect } from 'chai'
4+
import chunky from 'chunky'
5+
6+
const consume = async (stream: PgPacketStream, count: number): Promise<Packet[]> => {
7+
const result: Packet[] = [];
8+
9+
return new Promise((resolve) => {
10+
const read = () => {
11+
stream.once('readable', () => {
12+
let packet;
13+
while (packet = stream.read()) {
14+
result.push(packet)
15+
}
16+
if (result.length === count) {
17+
resolve(result);
18+
} else {
19+
read()
20+
}
21+
22+
})
23+
}
24+
read()
25+
})
26+
}
27+
28+
const emptyMessage = Buffer.from([0x0a, 0x00, 0x00, 0x00, 0x04])
29+
const oneByteMessage = Buffer.from([0x0b, 0x00, 0x00, 0x00, 0x05, 0x0a])
30+
const bigMessage = Buffer.from([0x0f, 0x00, 0x00, 0x00, 0x14, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e0, 0x0f])
31+
32+
describe('PgPacketStream', () => {
33+
it('should chunk a perfect input packet', async () => {
34+
const stream = new PgPacketStream()
35+
stream.write(Buffer.from([0x01, 0x00, 0x00, 0x00, 0x04]))
36+
stream.end()
37+
const buffers = await consume(stream, 1)
38+
expect(buffers).to.have.length(1)
39+
expect(buffers[0].packet).to.deep.equal(Buffer.from([0x1, 0x00, 0x00, 0x00, 0x04]))
40+
});
41+
42+
it('should read 2 chunks into perfect input packet', async () => {
43+
const stream = new PgPacketStream()
44+
stream.write(Buffer.from([0x01, 0x00, 0x00, 0x00, 0x08]))
45+
stream.write(Buffer.from([0x1, 0x2, 0x3, 0x4]))
46+
stream.end()
47+
const buffers = await consume(stream, 1)
48+
expect(buffers).to.have.length(1)
49+
expect(buffers[0].packet).to.deep.equal(Buffer.from([0x1, 0x00, 0x00, 0x00, 0x08, 0x1, 0x2, 0x3, 0x4]))
50+
});
51+
52+
it('should read a bunch of big messages', async () => {
53+
const stream = new PgPacketStream();
54+
let totalBuffer = Buffer.allocUnsafe(0);
55+
const num = 2;
56+
for (let i = 0; i < 2; i++) {
57+
totalBuffer = Buffer.concat([totalBuffer, bigMessage, bigMessage])
58+
}
59+
const chunks = chunky(totalBuffer)
60+
for (const chunk of chunks) {
61+
stream.write(chunk)
62+
}
63+
stream.end()
64+
const messages = await consume(stream, num * 2)
65+
expect(messages.map(x => x.code)).to.eql(new Array(num * 2).fill(0x0f))
66+
})
67+
68+
it('should read multiple messages in a single chunk', async () => {
69+
const stream = new PgPacketStream()
70+
stream.write(Buffer.from([0x01, 0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x04]))
71+
stream.end()
72+
const buffers = await consume(stream, 2)
73+
expect(buffers).to.have.length(2)
74+
expect(buffers[0].packet).to.deep.equal(Buffer.from([0x1, 0x00, 0x00, 0x00, 0x04]))
75+
expect(buffers[1].packet).to.deep.equal(Buffer.from([0x1, 0x00, 0x00, 0x00, 0x04]))
76+
});
77+
78+
it('should read multiple chunks into multiple packets', async () => {
79+
const stream = new PgPacketStream()
80+
stream.write(Buffer.from([0x01, 0x00, 0x00, 0x00, 0x05, 0x0a, 0x01, 0x00, 0x00, 0x00, 0x05, 0x0b]))
81+
stream.write(Buffer.from([0x01, 0x00, 0x00]));
82+
stream.write(Buffer.from([0x00, 0x06, 0x0c, 0x0d, 0x03, 0x00, 0x00, 0x00, 0x04]))
83+
stream.end()
84+
const buffers = await consume(stream, 4)
85+
expect(buffers).to.have.length(4)
86+
expect(buffers[0].packet).to.deep.equal(Buffer.from([0x1, 0x00, 0x00, 0x00, 0x05, 0x0a]))
87+
expect(buffers[1].packet).to.deep.equal(Buffer.from([0x1, 0x00, 0x00, 0x00, 0x05, 0x0b]))
88+
expect(buffers[2].packet).to.deep.equal(Buffer.from([0x1, 0x00, 0x00, 0x00, 0x06, 0x0c, 0x0d]))
89+
expect(buffers[3].packet).to.deep.equal(Buffer.from([0x3, 0x00, 0x00, 0x00, 0x04]))
90+
});
91+
92+
it('reads packet that spans multiple chunks', async () => {
93+
const stream = new PgPacketStream()
94+
stream.write(Buffer.from([0x0d, 0x00, 0x00, 0x00]))
95+
stream.write(Buffer.from([0x09])) // length
96+
stream.write(Buffer.from([0x0a, 0x0b, 0x0c, 0x0d]))
97+
stream.write(Buffer.from([0x0a, 0x0b, 0x0c, 0x0d]))
98+
stream.write(Buffer.from([0x0a, 0x0b, 0x0c, 0x0d]))
99+
stream.end()
100+
const buffers = await consume(stream, 1)
101+
expect(buffers).to.have.length(1)
102+
})
103+
});
+177
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
import { Transform, TransformCallback, TransformOptions } from 'stream';
2+
import assert from 'assert'
3+
4+
// Sanity-check export left over from project scaffolding.
export const hello = (): string => {
  return 'Hello world!';
};
5+
6+
// this is a single byte - the message-type code that prefixes every packet
const CODE_LENGTH = 1;
// this is a Uint32 - the big-endian length field that follows the code
// (the length value excludes the code byte but includes itself)
const LEN_LENGTH = 4;

// A raw protocol packet pushed downstream: the one-byte type code plus the
// complete packet bytes (code + length field + body).
export type Packet = {
  code: number;
  packet: Buffer;
}
15+
16+
type FieldFormat = "text" | "binary"
17+
18+
class Field {
19+
constructor(public name: string) {
20+
21+
}
22+
23+
}
24+
25+
const emptyBuffer = Buffer.allocUnsafe(0);
26+
27+
class BufferReader {
28+
private buffer: Buffer = emptyBuffer;
29+
constructor(private offset: number = 0) {
30+
31+
}
32+
33+
public setBuffer(offset: number, buffer: Buffer): void {
34+
this.offset = offset;
35+
this.buffer = buffer;
36+
}
37+
38+
public int16() {
39+
const result = this.buffer.readInt16BE(this.offset);
40+
this.offset += 2;
41+
return result;
42+
}
43+
44+
public int32() {
45+
const result = this.buffer.readInt32BE(this.offset);
46+
this.offset += 4;
47+
return result;
48+
}
49+
50+
public string(length: number): string {
51+
// TODO(bmc): support non-utf8 encoding
52+
const result = this.buffer.toString('utf8', this.offset, this.offset + length)
53+
this.offset += length;
54+
return result;
55+
}
56+
57+
public bytes(length: number): Buffer {
58+
const result = this.buffer.slice(this.offset, this.offset + length);
59+
this.offset += length;
60+
return result
61+
}
62+
}
63+
64+
// Parse mode for field values; only 'text' is implemented below.
type Mode = 'text' | 'binary';

// Standard Transform options plus the parse mode.
type StreamOptions = TransformOptions & {
  mode: Mode
}

// Singleton messages for fixed-size, body-less protocol responses.
// length 5 = 1 code byte + 4 length bytes.
const parseComplete = {
  name: 'parseComplete',
  length: 5,
};

const bindComplete = {
  name: 'bindComplete',
  length: 5,
}

const closeComplete = {
  name: 'closeComplete',
  length: 5,
}
84+
85+
export class PgPacketStream extends Transform {
86+
private remainingBuffer: Buffer = emptyBuffer;
87+
private reader = new BufferReader();
88+
private mode: Mode;
89+
90+
constructor(opts: StreamOptions) {
91+
super({
92+
...opts,
93+
readableObjectMode: true
94+
})
95+
if (opts.mode === 'binary') {
96+
throw new Error('Binary mode not supported yet')
97+
}
98+
this.mode = opts.mode;
99+
}
100+
101+
public _transform(buffer: Buffer, encoding: string, callback: TransformCallback) {
102+
const combinedBuffer = this.remainingBuffer.byteLength ? Buffer.concat([this.remainingBuffer, buffer], this.remainingBuffer.length + buffer.length) : buffer;
103+
let offset = 0;
104+
while ((offset + CODE_LENGTH + LEN_LENGTH) <= combinedBuffer.byteLength) {
105+
// code is 1 byte long - it identifies the message type
106+
const code = combinedBuffer[offset];
107+
108+
// length is 1 Uint32BE - it is the length of the message EXCLUDING the code
109+
const length = combinedBuffer.readUInt32BE(offset + CODE_LENGTH);
110+
111+
const fullMessageLength = CODE_LENGTH + length;
112+
113+
if (fullMessageLength + offset <= combinedBuffer.byteLength) {
114+
this.handlePacket(offset, code, length, combinedBuffer);
115+
offset += fullMessageLength;
116+
} else {
117+
break;
118+
}
119+
}
120+
121+
if (offset === combinedBuffer.byteLength) {
122+
this.remainingBuffer = emptyBuffer;
123+
} else {
124+
this.remainingBuffer = combinedBuffer.slice(offset)
125+
}
126+
127+
callback(null);
128+
}
129+
130+
private handlePacket(offset: number, code: number, length: number, combinedBuffer: Buffer) {
131+
switch (code) {
132+
case 0x44: // D
133+
this.parseDataRowMessage(offset, length, combinedBuffer);
134+
break;
135+
case 0x32: // 2
136+
this.emit('message', bindComplete);
137+
break;
138+
case 0x31: // 1
139+
this.emit('message', parseComplete);
140+
break;
141+
case 0x33: // 3
142+
this.emit('message', closeComplete);
143+
break;
144+
default:
145+
const packet = combinedBuffer.slice(offset, CODE_LENGTH + length + offset)
146+
this.push({ code, length, packet, buffer: packet.slice(5) })
147+
}
148+
}
149+
150+
public _flush(callback: TransformCallback) {
151+
}
152+
153+
private parseDataRowMessage(offset: number, length: number, bytes: Buffer) {
154+
this.reader.setBuffer(offset + 5, bytes);
155+
const fieldCount = this.reader.int16();
156+
const fields: any[] = new Array(fieldCount);
157+
for (let i = 0; i < fieldCount; i++) {
158+
const len = this.reader.int32();
159+
if (len === -1) {
160+
fields[i] = null
161+
} else if (this.mode === 'text') {
162+
fields[i] = this.reader.string(len)
163+
}
164+
}
165+
const message = new DataRowMessage(length, fields);
166+
this.emit('message', message);
167+
}
168+
}
169+
170+
171+
class DataRowMessage {
172+
public readonly fieldCount: number;
173+
public readonly name: string = 'dataRow'
174+
constructor(public length: number, public fields: any[]) {
175+
this.fieldCount = fields.length;
176+
}
177+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
// Ambient shim: 'chunky' ships no type declarations, so treat it as untyped.
declare module 'chunky'
+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
{
2+
"compilerOptions": {
3+
"module": "commonjs",
4+
"esModuleInterop": true,
5+
"allowSyntheticDefaultImports": true,
6+
"strict": true,
7+
"target": "es6",
8+
"noImplicitAny": true,
9+
"moduleResolution": "node",
10+
"sourceMap": true,
11+
"outDir": "dist",
12+
"baseUrl": ".",
13+
"paths": {
14+
"*": [
15+
"node_modules/*",
16+
"src/types/*"
17+
]
18+
}
19+
},
20+
"include": [
21+
"src/**/*"
22+
]
23+
}

packages/pg/bench.js

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
const pg = require("./lib");
2+
const pool = new pg.Pool()
3+
4+
// Benchmark query: a parameterized select against pg_type, wide enough to
// exercise the extended (parse/bind/execute) protocol and row parsing.
const q = {
  text:
    "select typname, typnamespace, typowner, typlen, typbyval, typcategory, typispreferred, typisdefined, typdelim, typrelid, typelem, typarray from pg_type where typtypmod = $1 and typisdefined = $2",
  values: [-1, true]
};

// Run the benchmark query once. rowMode 'array' skips row->object mapping.
// Now returns the query result (was assigned to an unused local and
// discarded); existing callers that ignore the return value are unaffected.
const exec = async client => {
  return client.query({
    text: q.text,
    values: q.values,
    rowMode: "array"
  });
};
17+
18+
// Repeatedly run the benchmark query for roughly `time` milliseconds and
// return how many queries completed (always at least one).
const bench = async (client, time) => {
  const startedAt = Date.now();
  let completed = 0;
  do {
    await exec(client);
    completed += 1;
  } while (Date.now() - startedAt <= time);
  return completed;
};
29+
30+
// Connect, warm up for 1 second, then measure queries-per-second over a
// 5-second window and print the results.
const run = async () => {
  const client = new pg.Client();
  await client.connect();
  // Warmup so connection/prepared-statement setup doesn't skew the numbers.
  await bench(client, 1000);
  console.log("warmup done");
  const seconds = 5;
  const queries = await bench(client, seconds * 1000);
  console.log("queries:", queries);
  console.log("qps", queries / seconds);
  await client.end();
};

run().catch(e => console.error(e) || process.exit(-1));

0 commit comments

Comments
 (0)