Skip to content

Commit 0a0bc59

Browse files
committed
Merge commit '2dc47b30c819a1f195a0471bf7688411e3dbc5a1' to implement partial support for compressed protocol (reads compressed data sent by server).
2 parents 0e6f861 + 2dc47b3 commit 0a0bc59

File tree

10 files changed

+529
-219
lines changed

10 files changed

+529
-219
lines changed

Readme.md

+105-145
Large diffs are not rendered by default.

lib/ConnectionConfig.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ function ConnectionConfig(options) {
2222
this.insecureAuth = options.insecureAuth || false;
2323
this.supportBigNumbers = options.supportBigNumbers || false;
2424
this.bigNumberStrings = options.bigNumberStrings || false;
25+
this.compress = options.compress || false;
2526
this.dateStrings = options.dateStrings || false;
2627
this.debug = options.debug;
2728
this.trace = options.trace !== false;
@@ -98,7 +99,6 @@ ConnectionConfig.getCharsetNumber = function getCharsetNumber(charset) {
9899

99100
ConnectionConfig.getDefaultFlags = function getDefaultFlags(options) {
100101
var defaultFlags = [
101-
'-COMPRESS', // Compression protocol *NOT* supported
102102
'-CONNECT_ATTRS', // Does *NOT* send connection attributes in Protocol::HandshakeResponse41
103103
'+CONNECT_WITH_DB', // One can specify db on connect in Handshake Response Packet
104104
'+FOUND_ROWS', // Send found rows instead of affected rows

lib/protocol/BufferList.js

+9
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,12 @@ BufferList.prototype.push = function push(buf) {
2323
this.bufs.push(buf);
2424
this.size += buf.length;
2525
};
26+
27+
BufferList.prototype.unshift = function unshift(buf) {
28+
if (!buf || !buf.length) {
29+
return;
30+
}
31+
32+
this.bufs.unshift(buf);
33+
this.size += buf.length;
34+
};
+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
module.exports = CompressedPacketHeader;
2+
function CompressedPacketHeader(options) {
3+
options = options || {};
4+
5+
this.length = options.length || 0;
6+
this.sequence = options.sequence || 0;
7+
this.size = options.size || 0;
8+
}
9+
10+
CompressedPacketHeader.fromBuffer = function fromBuffer(buffer) {
11+
var length = 0;
12+
var offset = 0;
13+
var sequence = 0;
14+
var size = 0;
15+
16+
for (var i = 0; i < 3; i++) {
17+
length |= buffer[offset++] << (i * 8);
18+
}
19+
20+
sequence = buffer[offset++];
21+
22+
for (var i = 0; i < 3; i++) {
23+
size |= buffer[offset++] << (i * 8);
24+
}
25+
26+
return new CompressedPacketHeader({
27+
length : length,
28+
sequence : sequence,
29+
size : size
30+
});
31+
};

lib/protocol/PacketWriter.js

+127-19
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,86 @@
1-
var BIT_16 = Math.pow(2, 16);
2-
var BIT_24 = Math.pow(2, 24);
3-
var BUFFER_ALLOC_SIZE = Math.pow(2, 8);
1+
var BIT_16 = Math.pow(2, 16);
2+
var BIT_24 = Math.pow(2, 24);
3+
var BUFFER_ALLOC_SIZE = Math.pow(2, 8);
4+
var COMPRESSED_PACKET_HEADER_LENGTH = 7;
45
// The maximum precision JS Numbers can hold precisely
56
// Don't panic: Good enough to represent byte values up to 8192 TB
67
var IEEE_754_BINARY_64_PRECISION = Math.pow(2, 53);
8+
var PACKET_HEADER_LENGTH = 4;
79
var MAX_PACKET_LENGTH = Math.pow(2, 24) - 1;
8-
var Buffer = require('safe-buffer').Buffer;
10+
11+
var Buffer = require('safe-buffer').Buffer;
12+
var BufferList = require('./BufferList');
13+
var EventEmitter = require('events').EventEmitter;
14+
var Util = require('util');
15+
var Zlib = require('zlib');
916

1017
module.exports = PacketWriter;
1118
function PacketWriter() {
12-
this._buffer = null;
13-
this._offset = 0;
19+
this._buffer = null;
20+
this._deflateQueue = [];
21+
this._deflating = false;
22+
this._offset = 0;
23+
this._sync = false;
1424
}
25+
Util.inherits(PacketWriter, EventEmitter);
1526

16-
PacketWriter.prototype.toBuffer = function toBuffer(parser) {
27+
PacketWriter.prototype.finalize = function finalize(parser) {
1728
if (!this._buffer) {
1829
this._buffer = Buffer.alloc(0);
1930
this._offset = 0;
2031
}
2132

33+
var maxPacketLength = parser._compressed
34+
? MAX_PACKET_LENGTH - PACKET_HEADER_LENGTH
35+
: MAX_PACKET_LENGTH;
36+
2237
var buffer = this._buffer;
2338
var length = this._offset;
24-
var packets = Math.floor(length / MAX_PACKET_LENGTH) + 1;
25-
26-
this._buffer = Buffer.allocUnsafe(length + packets * 4);
27-
this._offset = 0;
39+
var packets = Math.floor(length / maxPacketLength) + 1;
2840

2941
for (var packet = 0; packet < packets; packet++) {
3042
var isLast = (packet + 1 === packets);
3143
var packetLength = (isLast)
32-
? length % MAX_PACKET_LENGTH
33-
: MAX_PACKET_LENGTH;
44+
? length % maxPacketLength
45+
: maxPacketLength;
3446

35-
var packetNumber = parser.incrementPacketNumber();
47+
var num = parser.incrementPacketNumber();
48+
var start = packet * maxPacketLength;
49+
var end = start + packetLength;
50+
var buf = this._toPacket(num, buffer.slice(start, end));
51+
52+
if (parser._compressed) {
53+
num = parser.incrementCompressedPacketNumber();
54+
55+
if (this._sync) {
56+
buf = this._toCompressedPacket(num, buf);
57+
} else {
58+
this._toCompressedPacketAsync(num, buf);
59+
buf = null;
60+
}
61+
}
62+
63+
if (buf) {
64+
this.emit('data', buf);
65+
}
66+
}
67+
};
3668

37-
this.writeUnsignedNumber(3, packetLength);
38-
this.writeUnsignedNumber(1, packetNumber);
69+
PacketWriter.prototype.toBuffer = function toBuffer(parser) {
70+
var bufs = new BufferList();
3971

40-
var start = packet * MAX_PACKET_LENGTH;
41-
var end = start + packetLength;
72+
this.on('data', function (data) {
73+
bufs.push(data);
74+
});
75+
76+
this._sync = true;
77+
this.finalize(parser);
4278

43-
this.writeBuffer(buffer.slice(start, end));
79+
this._buffer = Buffer.allocUnsafe(bufs.size);
80+
this._offset = 0;
81+
82+
while (bufs.size > 0) {
83+
this._offset += bufs.shift().copy(this._buffer, this._offset);
4484
}
4585

4686
return this._buffer;
@@ -209,3 +249,71 @@ PacketWriter.prototype._allocate = function _allocate(bytes) {
209249
this._buffer = Buffer.alloc(newSize);
210250
oldBuffer.copy(this._buffer);
211251
};
252+
253+
PacketWriter.prototype._deflateNextPacket = function _deflateNextPacket() {
254+
if (this._deflating) {
255+
return;
256+
}
257+
258+
var item = this._deflateQueue.shift();
259+
var buf = item[1];
260+
var num = item[0];
261+
var len = buf.length;
262+
var self = this;
263+
264+
this._deflating = true;
265+
Zlib.deflate(buf, function (err, data) {
266+
if (err) {
267+
self.emit('error', err);
268+
return;
269+
}
270+
271+
self._deflating = false;
272+
self.emit('data', self._toCompressedPacket(num, data, len));
273+
self._deflateNextPacket();
274+
});
275+
};
276+
277+
PacketWriter.prototype._toCompressedPacket = function _toCompressedPacket(num, buf, len) {
278+
var origBuffer = this._buffer;
279+
var origOffset = this._offset;
280+
281+
this._buffer = Buffer.allocUnsafe(buf.length + COMPRESSED_PACKET_HEADER_LENGTH);
282+
this._offset = 0;
283+
284+
this.writeUnsignedNumber(3, buf.length);
285+
this.writeUnsignedNumber(1, num);
286+
this.writeUnsignedNumber(3, (len || 0));
287+
this.writeBuffer(buf);
288+
289+
var packet = this._buffer;
290+
291+
this._buffer = origBuffer;
292+
this._offset = origOffset;
293+
294+
return packet;
295+
};
296+
297+
PacketWriter.prototype._toCompressedPacketAsync = function _toCompressedPacketAsync(num, buf) {
298+
this._deflateQueue.push(buf);
299+
this._deflateNextPacket();
300+
};
301+
302+
PacketWriter.prototype._toPacket = function _toPacket(num, buf) {
303+
var origBuffer = this._buffer;
304+
var origOffset = this._offset;
305+
306+
this._buffer = Buffer.allocUnsafe(buf.length + PACKET_HEADER_LENGTH);
307+
this._offset = 0;
308+
309+
this.writeUnsignedNumber(3, buf.length);
310+
this.writeUnsignedNumber(1, num);
311+
this.writeBuffer(buf);
312+
313+
var packet = this._buffer;
314+
315+
this._buffer = origBuffer;
316+
this._offset = origOffset;
317+
318+
return packet;
319+
};

0 commit comments

Comments
 (0)