Skip to content

Commit 9470c45

Browse files
committed
fix(NODE-6630): read all messages in buffer when chunk arrives
1 parent 4f03359 commit 9470c45

File tree

2 files changed

+83
-11
lines changed

2 files changed

+83
-11
lines changed

src/cmap/connection.ts

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import {
2727
MongoNetworkTimeoutError,
2828
MongoOperationTimeoutError,
2929
MongoParseError,
30+
MongoRuntimeError,
3031
MongoServerError,
3132
MongoUnexpectedServerResponseError
3233
} from '../error';
@@ -791,22 +792,46 @@ export class SizedMessageTransform extends Transform {
791792
}
792793

793794
this.bufferPool.append(chunk);
794-
const sizeOfMessage = this.bufferPool.getInt32();
795795

796-
if (sizeOfMessage == null) {
797-
return callback();
798-
}
796+
while (this.bufferPool.length) {
797+
// While there are any bytes in the buffer
799798

800-
if (sizeOfMessage < 0) {
801-
return callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}, too small`));
802-
}
799+
// Try to fetch a size from the top 4 bytes
800+
const sizeOfMessage = this.bufferPool.getInt32();
801+
802+
if (sizeOfMessage == null) {
803+
// Not even an int32 worth of data. Stop the loop, we need more chunks.
804+
break;
805+
}
803806

804-
if (sizeOfMessage > this.bufferPool.length) {
805-
return callback();
807+
if (sizeOfMessage < 0) {
808+
// The size in the message has a negative value, this is probably corruption, throw:
809+
return callback(new MongoParseError(`Message size cannot be negative: ${sizeOfMessage}`));
810+
}
811+
812+
if (sizeOfMessage > this.bufferPool.length) {
813+
// We do not have enough bytes to make a sizeOfMessage chunk
814+
break;
815+
}
816+
817+
// Add a message to the stream
818+
const message = this.bufferPool.read(sizeOfMessage);
819+
820+
if (!this.push(message)) {
821+
// We only subscribe to data events so we should never get backpressure
822+
// if we do, we do not have the handling for it.
823+
return callback(
824+
new MongoRuntimeError(`SizedMessageTransform does not support backpressure`)
825+
);
826+
}
806827
}
807828

808-
const message = this.bufferPool.read(sizeOfMessage);
809-
return callback(null, message);
829+
callback();
830+
}
831+
832+
override pipe<T extends NodeJS.WritableStream>(destination: T, options?: { end?: boolean }): T {
833+
destination.on('drain', this.emit.bind('drain'));
834+
return super.pipe(destination, options);
810835
}
811836
}
812837

test/unit/cmap/connection.test.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Socket } from 'node:net';
2+
import { Writable } from 'node:stream';
23

34
import { expect } from 'chai';
45
import * as sinon from 'sinon';
@@ -11,7 +12,9 @@ import {
1112
MongoClientAuthProviders,
1213
MongoDBCollectionNamespace,
1314
MongoNetworkTimeoutError,
15+
MongoRuntimeError,
1416
ns,
17+
promiseWithResolvers,
1518
SizedMessageTransform
1619
} from '../../mongodb';
1720
import * as mock from '../../tools/mongodb-mock/index';
@@ -333,5 +336,49 @@ describe('new Connection()', function () {
333336
expect(stream.read(1)).to.deep.equal(Buffer.from([6, 0, 0, 0, 5, 6]));
334337
expect(stream.read(1)).to.equal(null);
335338
});
339+
340+
it('parses many wire messages when chunk arrives', function () {
341+
const stream = new SizedMessageTransform({ connection: {} as any });
342+
343+
let dataCount = 0;
344+
stream.on('data', () => {
345+
dataCount += 1;
346+
});
347+
348+
// 3 messages of size 8
349+
stream.write(
350+
Buffer.from([
351+
...[8, 0, 0, 0, 0, 0, 0, 0],
352+
...[8, 0, 0, 0, 0, 0, 0, 0],
353+
...[8, 0, 0, 0, 0, 0, 0, 0]
354+
])
355+
);
356+
357+
expect(dataCount).to.equal(3);
358+
});
359+
360+
it('waits for a drain event when destination needs backpressure', async function () {
361+
const stream = new SizedMessageTransform({ connection: {} as any });
362+
const destination = new Writable({
363+
highWaterMark: 1,
364+
objectMode: true,
365+
write: (chunk, encoding, callback) => {
366+
void stream;
367+
setTimeout(1).then(() => callback());
368+
}
369+
});
370+
371+
// 1000 messages of size 8
372+
stream.write(
373+
Buffer.from(Array.from({ length: 1000 }, () => [8, 0, 0, 0, 0, 0, 0, 0]).flat(1))
374+
);
375+
376+
const { promise, resolve, reject } = promiseWithResolvers();
377+
378+
stream.on('error', reject).pipe(destination).on('error', reject).on('finish', resolve);
379+
380+
const error = await promise.catch(error => error);
381+
expect(error).to.be.instanceOf(MongoRuntimeError);
382+
});
336383
});
337384
});

0 commit comments

Comments
 (0)