Skip to content

Commit 9ba49b7

Browse files
authored
Merge pull request #2241 from PruvoNet/#2240
fix: major performance issues with bytea performance #2240
2 parents 3447319 + 1d3f155 commit 9ba49b7

File tree

2 files changed

+69
-22
lines changed

2 files changed

+69
-22
lines changed

packages/pg-protocol/src/parser.ts

+51-19
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@ const enum MessageCodes {
7474
export type MessageCallback = (msg: BackendMessage) => void
7575

7676
export class Parser {
77-
private remainingBuffer: Buffer = emptyBuffer
77+
private buffer: Buffer = emptyBuffer
78+
private bufferLength: number = 0
79+
private bufferOffset: number = 0
7880
private reader = new BufferReader()
7981
private mode: Mode
8082

@@ -86,35 +88,65 @@ export class Parser {
8688
}
8789

8890
public parse(buffer: Buffer, callback: MessageCallback) {
89-
let combinedBuffer = buffer
90-
if (this.remainingBuffer.byteLength) {
91-
combinedBuffer = Buffer.allocUnsafe(this.remainingBuffer.byteLength + buffer.byteLength)
92-
this.remainingBuffer.copy(combinedBuffer)
93-
buffer.copy(combinedBuffer, this.remainingBuffer.byteLength)
94-
}
95-
let offset = 0
96-
while (offset + HEADER_LENGTH <= combinedBuffer.byteLength) {
91+
this.mergeBuffer(buffer)
92+
const bufferFullLength = this.bufferOffset + this.bufferLength
93+
let offset = this.bufferOffset
94+
while (offset + HEADER_LENGTH <= bufferFullLength) {
9795
// code is 1 byte long - it identifies the message type
98-
const code = combinedBuffer[offset]
99-
96+
const code = this.buffer[offset]
10097
// length is 1 Uint32BE - it is the length of the message EXCLUDING the code
101-
const length = combinedBuffer.readUInt32BE(offset + CODE_LENGTH)
102-
98+
const length = this.buffer.readUInt32BE(offset + CODE_LENGTH)
10399
const fullMessageLength = CODE_LENGTH + length
104-
105-
if (fullMessageLength + offset <= combinedBuffer.byteLength) {
106-
const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer)
100+
if (fullMessageLength + offset <= bufferFullLength) {
101+
const message = this.handlePacket(offset + HEADER_LENGTH, code, length, this.buffer)
107102
callback(message)
108103
offset += fullMessageLength
109104
} else {
110105
break
111106
}
112107
}
108+
if (offset === bufferFullLength) {
109+
// No more use for the buffer
110+
this.buffer = emptyBuffer
111+
this.bufferLength = 0
112+
this.bufferOffset = 0
113+
} else {
114+
// Adjust the cursors of remainingBuffer
115+
this.bufferLength = bufferFullLength - offset
116+
this.bufferOffset = offset
117+
}
118+
}
113119

114-
if (offset === combinedBuffer.byteLength) {
115-
this.remainingBuffer = emptyBuffer
120+
private mergeBuffer(buffer: Buffer): void {
121+
if (this.bufferLength > 0) {
122+
const newLength = this.bufferLength + buffer.byteLength
123+
const newFullLength = newLength + this.bufferOffset
124+
if (newFullLength > this.buffer.byteLength) {
125+
// We can't concat the new buffer with the remaining one
126+
let newBuffer: Buffer
127+
if (newLength <= this.buffer.byteLength && this.bufferOffset >= this.bufferLength) {
128+
// We can move the relevant part to the beginning of the buffer instead of allocating a new buffer
129+
newBuffer = this.buffer
130+
} else {
131+
// Allocate a new larger buffer
132+
let newBufferLength = this.buffer.byteLength * 2
133+
while (newLength >= newBufferLength) {
134+
newBufferLength *= 2
135+
}
136+
newBuffer = Buffer.allocUnsafe(newBufferLength)
137+
}
138+
// Move the remaining buffer to the new one
139+
this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset + this.bufferLength)
140+
this.buffer = newBuffer
141+
this.bufferOffset = 0
142+
}
143+
// Concat the new buffer with the remaining one
144+
buffer.copy(this.buffer, this.bufferOffset + this.bufferLength)
145+
this.bufferLength = newLength
116146
} else {
117-
this.remainingBuffer = combinedBuffer.slice(offset)
147+
this.buffer = buffer
148+
this.bufferOffset = 0
149+
this.bufferLength = buffer.byteLength
118150
}
119151
}
120152

packages/pg/bench.js

+18-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
const pg = require('./lib')
2-
const pool = new pg.Pool()
32

43
const params = {
54
text:
@@ -17,7 +16,7 @@ const seq = {
1716
}
1817

1918
const exec = async (client, q) => {
20-
const result = await client.query({
19+
await client.query({
2120
text: q.text,
2221
values: q.values,
2322
rowMode: 'array',
@@ -39,7 +38,9 @@ const bench = async (client, q, time) => {
3938
const run = async () => {
4039
const client = new pg.Client()
4140
await client.connect()
41+
console.log('start')
4242
await client.query('CREATE TEMP TABLE foobar(name TEXT, age NUMERIC)')
43+
await client.query('CREATE TEMP TABLE buf(name TEXT, data BYTEA)')
4344
await bench(client, params, 1000)
4445
console.log('warmup done')
4546
const seconds = 5
@@ -61,7 +62,21 @@ const run = async () => {
6162
console.log('insert queries:', queries)
6263
console.log('qps', queries / seconds)
6364
console.log('on my laptop best so far seen 5799 qps')
64-
console.log()
65+
66+
console.log('')
67+
console.log('Warming up bytea test')
68+
await client.query({
69+
text: 'INSERT INTO buf(name, data) VALUES ($1, $2)',
70+
values: ['test', Buffer.allocUnsafe(104857600)],
71+
})
72+
console.log('bytea warmup done')
73+
const start = Date.now()
74+
const results = await client.query('SELECT * FROM buf')
75+
const time = Date.now() - start
76+
console.log('bytea time:', time, 'ms')
77+
console.log('bytea length:', results.rows[0].data.byteLength, 'bytes')
78+
console.log('on my laptop best so far seen 1107ms and 104857600 bytes')
79+
6580
await client.end()
6681
await client.end()
6782
}

0 commit comments

Comments
 (0)