Skip to content

packets: implemented compression protocol #649

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
e6c682c
packets: implemented compression protocol
Jul 31, 2017
77f6792
packets: implemented compression protocol CR changes
Aug 16, 2017
a0cf94b
packets: implemented compression protocol: remove bytes.Reset for bac…
Aug 16, 2017
4cdff28
Merge branch 'master' of https://github.com/go-sql-driver/mysql
Oct 8, 2017
d0ea1a4
reading working
Aug 18, 2017
477c9f8
writerly changes
Aug 18, 2017
996ed2d
PR 649: adding compression (second code review)
Oct 8, 2017
f74faed
do not query max_allowed_packet by default (#680)
julienschmidt Oct 12, 2017
b3a093e
packets: do not call function on nulled value (#678)
julienschmidt Oct 16, 2017
5eaa5ff
ColumnType interfaces (#667)
julienschmidt Oct 17, 2017
ee46028
Add Aurora errno to rejectReadOnly check (#634)
jeffcharles Oct 17, 2017
93aed73
allow successful TravisCI runs in forks (#639)
jmhodges Oct 17, 2017
4f10ee5
Drop support for Go 1.6 and lower (#696)
julienschmidt Nov 12, 2017
59b0f90
Make gofmt happy (#704)
julienschmidt Nov 14, 2017
3fbf53a
Added support for custom string types in ConvertValue. (#623)
dsmontoya Nov 15, 2017
f9c6a2c
Implement NamedValueChecker for mysqlConn (#690)
pushrax Nov 16, 2017
6046bf0
Fix Valuers by returning driver.ErrSkip if couldn't convert type inte…
randomjunk Nov 16, 2017
385673a
statement: Fix conversion of Valuer (#710)
linxGnu Nov 17, 2017
9031984
Fixed imports for appengine/cloudsql (#700)
rrbrussell Nov 17, 2017
6992fad
Fix tls=true didn't work with host without port (#718)
methane Dec 4, 2017
386f84b
Differentiate between BINARY and CHAR (#724)
kwoodhouse93 Jan 10, 2018
f853432
Test with latest Go patch versions (#693)
AlekSi Jan 10, 2018
d1a8b86
Fix prepared statement (#734)
methane Jan 13, 2018
3167920
driver.ErrBadConn when init packet read fails (#736)
Jan 25, 2018
fb33a2c
packets: implemented compression protocol
Jul 31, 2017
f174605
packets: implemented compression protocol CR changes
Aug 16, 2017
dbd1e2b
third code review changes
Mar 23, 2018
3e12e32
PR 649: minor cleanup
Mar 23, 2018
17a06f1
Merge branch 'master' into master
methane Mar 26, 2018
60bdaec
Sort AUTHORS
methane Mar 26, 2018
422ab6f
Update dsn.go
methane Mar 26, 2018
26ea544
cr4 changes
Jul 23, 2018
f339392
Merge branch 'master' into cr4
Jul 23, 2018
3e559a8
saving work with SimpleReader present
Sep 12, 2018
6ceaef6
removed buf from mysqlConn
Sep 26, 2018
97afd8d
Merge pull request #1 from bLamarche413/cr4
Oct 8, 2018
f617170
removed comment
Oct 8, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Alexey Palazhchenko <alexey.palazhchenko at gmail.com>
Andrew Reid <andrew.reid at tixtrack.com>
Arne Hormann <arnehormann at gmail.com>
Asta Xie <xiemengjun at gmail.com>
B Lamarche <blam413 at gmail.com>
Bulat Gaifullin <gaifullinbf at gmail.com>
Carlos Nieto <jose.carlos at menteslibres.net>
Chris Moos <chris at tech9computers.com>
Expand Down
4 changes: 2 additions & 2 deletions benchmark_go18_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func benchmarkQueryContext(b *testing.B, db *sql.DB, p int) {
}

func BenchmarkQueryContext(b *testing.B) {
db := initDB(b,
db := initDB(b, false,
"DROP TABLE IF EXISTS foo",
"CREATE TABLE foo (id INT PRIMARY KEY, val CHAR(50))",
`INSERT INTO foo VALUES (1, "one")`,
Expand Down Expand Up @@ -78,7 +78,7 @@ func benchmarkExecContext(b *testing.B, db *sql.DB, p int) {
}

func BenchmarkExecContext(b *testing.B) {
db := initDB(b,
db := initDB(b, false,
"DROP TABLE IF EXISTS foo",
"CREATE TABLE foo (id INT PRIMARY KEY, val CHAR(50))",
`INSERT INTO foo VALUES (1, "one")`,
Expand Down
20 changes: 17 additions & 3 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,13 @@ func (tb *TB) checkStmt(stmt *sql.Stmt, err error) *sql.Stmt {
return stmt
}

func initDB(b *testing.B, queries ...string) *sql.DB {
func initDB(b *testing.B, useCompression bool, queries ...string) *sql.DB {
tb := (*TB)(b)
db := tb.checkDB(sql.Open("mysql", dsn))
comprStr := ""
if useCompression {
comprStr = "&compress=1"
}
db := tb.checkDB(sql.Open("mysql", dsn+comprStr))
for _, query := range queries {
if _, err := db.Exec(query); err != nil {
b.Fatalf("error on %q: %v", query, err)
Expand All @@ -57,10 +61,19 @@ func initDB(b *testing.B, queries ...string) *sql.DB {
const concurrencyLevel = 10

// BenchmarkQuery benchmarks queries over a connection without compression.
func BenchmarkQuery(b *testing.B) {
	benchmarkQueryHelper(b, false)
}

// BenchmarkQueryCompression benchmarks the same queries with the
// compression protocol enabled (compress=1 in the DSN).
func BenchmarkQueryCompression(b *testing.B) {
	benchmarkQueryHelper(b, true)
}

func benchmarkQueryHelper(b *testing.B, compr bool) {

tb := (*TB)(b)
b.StopTimer()
b.ReportAllocs()
db := initDB(b,
db := initDB(b, compr,
"DROP TABLE IF EXISTS foo",
"CREATE TABLE foo (id INT PRIMARY KEY, val CHAR(50))",
`INSERT INTO foo VALUES (1, "one")`,
Expand Down Expand Up @@ -220,6 +233,7 @@ func BenchmarkInterpolation(b *testing.B) {
maxWriteSize: maxPacketSize - 1,
buf: newBuffer(nil),
}
mc.reader = &mc.buf

args := []driver.Value{
int64(42424242),
Expand Down
234 changes: 234 additions & 0 deletions compress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package mysql

import (
"bytes"
"compress/zlib"
"io"
)

const (
	// minCompressLength is the payload size (bytes) below which
	// compressedWriter.Write sends the packet uncompressed, since
	// compressing tiny packets costs more than it saves.
	minCompressLength = 50
)

// compressedReader reads MySQL compressed-protocol packets from the
// underlying packet reader and buffers the decompressed payload bytes.
type compressedReader struct {
	buf      packetReader  // source of raw compressed-protocol packets
	bytesBuf []byte        // decompressed bytes not yet handed to callers
	mc       *mysqlConn    // connection owning the compressionSequence counter
	zr       io.ReadCloser // lazily created zlib reader, reused via zlib.Resetter
}

// compressedWriter frames outgoing packets with the MySQL compressed
// protocol, zlib-compressing payloads when it is worthwhile.
type compressedWriter struct {
	connWriter io.Writer    // underlying network writer
	mc         *mysqlConn   // connection owning the compressionSequence counter
	zw         *zlib.Writer // reused zlib writer, retargeted with Reset per packet
}

// NewCompressedReader returns a compressedReader that decompresses
// packets read from buf on behalf of connection mc.
func NewCompressedReader(buf packetReader, mc *mysqlConn) *compressedReader {
	return &compressedReader{
		buf: buf,
		// bytesBuf is left nil: a nil slice is the idiomatic empty slice
		// in Go and behaves identically under append/len, avoiding a
		// pointless zero-length allocation.
		mc: mc,
	}
}

// NewCompressedWriter returns a compressedWriter that sends compressed
// packets to connWriter on behalf of connection mc.
func NewCompressedWriter(connWriter io.Writer, mc *mysqlConn) *compressedWriter {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these functions be public API?

Copy link
Member

@julienschmidt julienschmidt Mar 26, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@methane: nope. This should be unexported.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

	return &compressedWriter{
		connWriter: connWriter,
		mc:         mc,
		// the zlib writer must be constructed against some destination;
		// Write retargets it onto a per-packet buffer via Reset.
		zw: zlib.NewWriter(new(bytes.Buffer)),
Copy link
Member

@julienschmidt julienschmidt Oct 12, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it really need to introduce two extra buffers?
Would it make sense to combine the reader and writer (e.g. as a virtual buffer) and share some data, like the mc and the buffer?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has been a while since I've looked at my own code so I may be wrong about this, but between compressedReader and compressedWriter, it appears the only thing they are sharing is a reference to mc? The reader uses a slice, but that slice needs to act as memory between reads so I don't think it would make sense to share it. Please let me know what you meant by this comment!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reader uses a slice, but that slice needs to act as memory between reads so I don't think it would make sense to share it

That answers my question. Currently we only have one buffer for each connection, which is shared between reads and writes.

	}
}

// readNext returns the next need bytes of decompressed payload, pulling
// additional compressed packets off the wire until enough is buffered.
// The returned slice aliases the internal buffer and is only valid until
// the next call.
func (cr *compressedReader) readNext(need int) ([]byte, error) {
	// Refill until the request can be satisfied from the buffer.
	for len(cr.bytesBuf) < need {
		if err := cr.uncompressPacket(); err != nil {
			return nil, err
		}
	}

	out := cr.bytesBuf[:need]
	cr.bytesBuf = cr.bytesBuf[need:]
	return out, nil
}

// uncompressPacket reads one compressed-protocol packet from the wire,
// verifies its sequence number, inflates (or, for packets sent
// uncompressed, copies) its payload, and appends the resulting bytes to
// cr.bytesBuf. Returns ErrPktSync on a sequence mismatch.
func (cr *compressedReader) uncompressPacket() error {
	header, err := cr.buf.readNext(7) // size of compressed header

	if err != nil {
		return err
	}

	// compressed header structure: 3-byte compressed length, 1-byte
	// sequence number, 3-byte length-before-compression (little-endian).
	comprLength := int(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16)
	uncompressedLength := int(uint32(header[4]) | uint32(header[5])<<8 | uint32(header[6])<<16)
	compressionSequence := uint8(header[3])

	if compressionSequence != cr.mc.compressionSequence {
		return ErrPktSync
	}

	cr.mc.compressionSequence++

	comprData, err := cr.buf.readNext(comprLength)
	if err != nil {
		return err
	}

	// if payload is uncompressed, its length will be specified as zero, and its
	// true length is contained in comprLength
	if uncompressedLength == 0 {
		cr.bytesBuf = append(cr.bytesBuf, comprData...)
		return nil
	}

	// inflate comprData with a zlib reader, reusing the reader across
	// packets via zlib.Resetter to avoid re-allocating its state
	br := bytes.NewReader(comprData)

	if cr.zr == nil {
		cr.zr, err = zlib.NewReader(br)
	} else {
		err = cr.zr.(zlib.Resetter).Reset(br, nil)
	}

	if err != nil {
		return err
	}

	defer cr.zr.Close()

	// use existing capacity in bytesBuf if possible; otherwise grow it
	// once to hold the whole decompressed payload
	offset := len(cr.bytesBuf)
	if cap(cr.bytesBuf)-offset < uncompressedLength {
		old := cr.bytesBuf
		cr.bytesBuf = make([]byte, offset, offset+uncompressedLength)
		copy(cr.bytesBuf, old)
	}

	// data is a view of bytesBuf's spare capacity: decompress directly
	// into place
	data := cr.bytesBuf[offset : offset+uncompressedLength]

	lenRead := 0

	// zlib readers may return short reads; loop until the full payload
	// has been read.
	// http://grokbase.com/t/gg/golang-nuts/146y9ppn6b/go-nuts-stream-compression-with-compress-flate
	for lenRead < uncompressedLength {
		n, err := cr.zr.Read(data[lenRead:])
		lenRead += n

		if err == io.EOF {
			if lenRead < uncompressedLength {
				return io.ErrUnexpectedEOF
			}
			break
		}

		if err != nil {
			return err
		}
	}

	// data already lives inside bytesBuf's backing array (capacity was
	// ensured above), so extending the slice length publishes the bytes;
	// appending data to bytesBuf here would only perform a redundant
	// self-copy.
	cr.bytesBuf = cr.bytesBuf[:offset+uncompressedLength]

	return nil
}

// Write frames data (a complete MySQL packet, including its 4-byte
// packet header) into one or more compressed-protocol packets and sends
// them over the connection. Payloads shorter than minCompressLength, or
// payloads that zlib fails to shrink, are framed uncompressed. Returns
// the number of input bytes consumed.
func (cw *compressedWriter) Write(data []byte) (int, error) {
	// when asked to write an empty packet, do nothing
	if len(data) == 0 {
		return 0, nil
	}

	totalBytes := len(data)
	length := len(data) - 4
	maxPayloadLength := maxPacketSize - 4
	// placeholder for the 7-byte compressed-packet header; the real
	// header values are filled in by writeToNetwork
	blankHeader := make([]byte, 7)

	// split payloads that exceed the protocol maximum across multiple
	// compressed packets
	for length >= maxPayloadLength {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why both of length := len(data) - 4 and maxPayloadLength := maxPacketSize - 4 required?
How about this?

maxPayload := maxPacketSize - 4

for len(data) > maxPayload {
...
    data = data[maxPayload:]
}

		payload := data[:maxPayloadLength]
		payloadLen := len(payload)

		// compress the chunk into a fresh buffer, leaving room for the header
		bytesBuf := &bytes.Buffer{}
		bytesBuf.Write(blankHeader)
		cw.zw.Reset(bytesBuf)
		_, err := cw.zw.Write(payload)
		if err != nil {
			return 0, err
		}
		cw.zw.Close()

		// if compression expands the payload, do not compress
		compressedPayload := bytesBuf.Bytes()
		if len(compressedPayload) > maxPayloadLength {
			compressedPayload = append(blankHeader, payload...)
			// uncompressed-length 0 marks the payload as not compressed
			payloadLen = 0
		}

		err = cw.writeToNetwork(compressedPayload, payloadLen)

		if err != nil {
			return 0, err
		}

		length -= maxPayloadLength
		data = data[maxPayloadLength:]
	}

	// remaining (final) chunk
	payloadLen := len(data)

	// do not attempt compression if packet is too small
	if payloadLen < minCompressLength {
		err := cw.writeToNetwork(append(blankHeader, data...), 0)
		if err != nil {
			return 0, err
		}
		return totalBytes, nil
	}

	bytesBuf := &bytes.Buffer{}
	bytesBuf.Write(blankHeader)
	cw.zw.Reset(bytesBuf)
	_, err := cw.zw.Write(data)
	if err != nil {
		return 0, err
	}
	cw.zw.Close()

	compressedPayload := bytesBuf.Bytes()

	// again, fall back to an uncompressed frame if zlib grew the payload
	if len(compressedPayload) > len(data) {
		compressedPayload = append(blankHeader, data...)
		payloadLen = 0
	}

	// add header and send over the wire
	err = cw.writeToNetwork(compressedPayload, payloadLen)
	if err != nil {
		return 0, err
	}

	return totalBytes, nil

}

// writeToNetwork fills in the 7-byte compressed-packet header at the
// front of data and writes the whole frame to the underlying connection.
// uncomprLength is the payload's size before compression, or 0 when the
// payload is being sent uncompressed.
func (cw *compressedWriter) writeToNetwork(data []byte, uncomprLength int) error {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

superfluous whitespace

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

	// payload length excludes the 7 header bytes at the front
	comprLength := len(data) - 7

	// compression header: 3-byte little-endian compressed length
	data[0] = byte(0xff & comprLength)
	data[1] = byte(0xff & (comprLength >> 8))
	data[2] = byte(0xff & (comprLength >> 16))

	data[3] = cw.mc.compressionSequence

	// this value is never greater than maxPayloadLength
	data[4] = byte(0xff & uncomprLength)
	data[5] = byte(0xff & (uncomprLength >> 8))
	data[6] = byte(0xff & (uncomprLength >> 16))

	if _, err := cw.connWriter.Write(data); err != nil {
		return err
	}

	// only advance the sequence counter after a successful write
	cw.mc.compressionSequence++
	return nil
}
Loading