
packets: implemented compression protocol #649


Closed · wants to merge 37 commits

Commits (37)
e6c682c
packets: implemented compression protocol
Jul 31, 2017
77f6792
packets: implemented compression protocol CR changes
Aug 16, 2017
a0cf94b
packets: implemented compression protocol: remove bytes.Reset for bac…
Aug 16, 2017
4cdff28
Merge branch 'master' of https://github.com/go-sql-driver/mysql
Oct 8, 2017
d0ea1a4
reading working
Aug 18, 2017
477c9f8
writerly changes
Aug 18, 2017
996ed2d
PR 649: adding compression (second code review)
Oct 8, 2017
f74faed
do not query max_allowed_packet by default (#680)
julienschmidt Oct 12, 2017
b3a093e
packets: do not call function on nulled value (#678)
julienschmidt Oct 16, 2017
5eaa5ff
ColumnType interfaces (#667)
julienschmidt Oct 17, 2017
ee46028
Add Aurora errno to rejectReadOnly check (#634)
jeffcharles Oct 17, 2017
93aed73
allow successful TravisCI runs in forks (#639)
jmhodges Oct 17, 2017
4f10ee5
Drop support for Go 1.6 and lower (#696)
julienschmidt Nov 12, 2017
59b0f90
Make gofmt happy (#704)
julienschmidt Nov 14, 2017
3fbf53a
Added support for custom string types in ConvertValue. (#623)
dsmontoya Nov 15, 2017
f9c6a2c
Implement NamedValueChecker for mysqlConn (#690)
pushrax Nov 16, 2017
6046bf0
Fix Valuers by returning driver.ErrSkip if couldn't convert type inte…
randomjunk Nov 16, 2017
385673a
statement: Fix conversion of Valuer (#710)
linxGnu Nov 17, 2017
9031984
Fixed imports for appengine/cloudsql (#700)
rrbrussell Nov 17, 2017
6992fad
Fix tls=true didn't work with host without port (#718)
methane Dec 4, 2017
386f84b
Differentiate between BINARY and CHAR (#724)
kwoodhouse93 Jan 10, 2018
f853432
Test with latest Go patch versions (#693)
AlekSi Jan 10, 2018
d1a8b86
Fix prepared statement (#734)
methane Jan 13, 2018
3167920
driver.ErrBadConn when init packet read fails (#736)
Jan 25, 2018
fb33a2c
packets: implemented compression protocol
Jul 31, 2017
f174605
packets: implemented compression protocol CR changes
Aug 16, 2017
dbd1e2b
third code review changes
Mar 23, 2018
3e12e32
PR 649: minor cleanup
Mar 23, 2018
17a06f1
Merge branch 'master' into master
methane Mar 26, 2018
60bdaec
Sort AUTHORS
methane Mar 26, 2018
422ab6f
Update dsn.go
methane Mar 26, 2018
26ea544
cr4 changes
Jul 23, 2018
f339392
Merge branch 'master' into cr4
Jul 23, 2018
3e559a8
saving work with SimpleReader present
Sep 12, 2018
6ceaef6
removed buf from mysqlConn
Sep 26, 2018
97afd8d
Merge pull request #1 from bLamarche413/cr4
Oct 8, 2018
f617170
removed comment
Oct 8, 2018
1 change: 1 addition & 0 deletions AUTHORS
@@ -15,6 +15,7 @@ Aaron Hopkins <go-sql-driver at die.net>
Achille Roussel <achille.roussel at gmail.com>
Arne Hormann <arnehormann at gmail.com>
Asta Xie <xiemengjun at gmail.com>
B Lamarche <blam413 at gmail.com>
Bulat Gaifullin <gaifullinbf at gmail.com>
Carlos Nieto <jose.carlos at menteslibres.net>
Chris Moos <chris at tech9computers.com>
1 change: 1 addition & 0 deletions benchmark_test.go
@@ -224,6 +224,7 @@ func BenchmarkInterpolation(b *testing.B) {
maxWriteSize: maxPacketSize - 1,
buf: newBuffer(nil),
}
mc.reader = &mc.buf

args := []driver.Value{
int64(42424242),
217 changes: 217 additions & 0 deletions compress.go
@@ -0,0 +1,217 @@
package mysql

import (
"bytes"
"compress/zlib"
"io"
)

const (
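// payloads shorter than minCompressLength bytes are sent uncompressed;
// the header's uncompressed-length field is then set to 0 (see Write below)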
minCompressLength = 50
)

type packetReader interface {

Member:
This is also used when compression is not in use. It is probably better to move this interface to connection.go.

readNext(need int) ([]byte, error)
}

type compressedReader struct {
buf packetReader
bytesBuf []byte
mc *mysqlConn
}

type compressedWriter struct {
connWriter io.Writer
mc *mysqlConn
}

func NewCompressedReader(buf packetReader, mc *mysqlConn) *compressedReader {
return &compressedReader{
buf: buf,
bytesBuf: make([]byte, 0),
mc: mc,
}
}

func NewCompressedWriter(connWriter io.Writer, mc *mysqlConn) *compressedWriter {

Member:
Should these functions be public API?

Member (@julienschmidt, Mar 26, 2018):
@methane: nope. This should be unexported.

Author:
done!

return &compressedWriter{
connWriter: connWriter,
mc: mc,
}
}

func (cr *compressedReader) readNext(need int) ([]byte, error) {
for len(cr.bytesBuf) < need {
err := cr.uncompressPacket()
if err != nil {
return nil, err
}
}

data := make([]byte, need)

copy(data, cr.bytesBuf[:len(data)])

cr.bytesBuf = cr.bytesBuf[len(data):]

return data, nil
}

func (cr *compressedReader) uncompressPacket() error {
header, err := cr.buf.readNext(7) // size of compressed header

if err != nil {
return err
}

// compressed header structure
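// 7-byte layout, integers little-endian:
//   bytes 0-2  length of the compressed payload that follows
//   byte  3    compression sequence number
//   bytes 4-6  length of the payload after decompression (0 = payload sent uncompressed)
// e.g. 0x64 0x00 0x00 0x03 0x96 0x00 0x00 = 100 compressed bytes, sequence 3, 150 bytes once uncompressed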
comprLength := int(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16)
uncompressedLength := int(uint32(header[4]) | uint32(header[5])<<8 | uint32(header[6])<<16)
compressionSequence := uint8(header[3])

if compressionSequence != cr.mc.compressionSequence {
return ErrPktSync
}

cr.mc.compressionSequence++

comprData, err := cr.buf.readNext(comprLength)
if err != nil {
return err
}

// if payload is uncompressed, its length will be specified as zero, and its
// true length is contained in comprLength
if uncompressedLength == 0 {
cr.bytesBuf = append(cr.bytesBuf, comprData...)
return nil
}

// write comprData to a bytes.buffer, then read it using zlib into data
var b bytes.Buffer
b.Write(comprData)
r, err := zlib.NewReader(&b)

if r != nil {
defer r.Close()
}

if err != nil {
return err
}

data := make([]byte, uncompressedLength)
lenRead := 0

// http://grokbase.com/t/gg/golang-nuts/146y9ppn6b/go-nuts-stream-compression-with-compress-flate
for lenRead < uncompressedLength {

tmp := data[lenRead:]

n, err := r.Read(tmp)
lenRead += n

if err == io.EOF {
if lenRead < uncompressedLength {
return io.ErrUnexpectedEOF
}
break
}

if err != nil {
return err
}
}

cr.bytesBuf = append(cr.bytesBuf, data...)

return nil
}
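
As an aside, the manual read loop above is close to what io.ReadFull already provides; a rough equivalent, as a sketch only (readUncompressed is a hypothetical helper, not part of this PR):

func readUncompressed(r io.Reader, uncompressedLength int) ([]byte, error) {
	data := make([]byte, uncompressedLength)
	// io.ReadFull reads exactly len(data) bytes, returning
	// io.ErrUnexpectedEOF if the stream ends early, which matches
	// the loop's handling of a truncated payload.
	if _, err := io.ReadFull(r, data); err != nil {
		return nil, err
	}
	return data, nil
}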

func (cw *compressedWriter) Write(data []byte) (int, error) {
// when asked to write an empty packet, do nothing
if len(data) == 0 {
return 0, nil
}
totalBytes := len(data)

length := len(data) - 4

maxPayloadLength := maxPacketSize - 4

for length >= maxPayloadLength {

Member:
Why are both length := len(data) - 4 and maxPayloadLength := maxPacketSize - 4 required? How about this?

maxPayload := maxPacketSize - 4

for len(data) > maxPayload {
...
    data = data[maxPayload:]
}
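
Applied to this function, that simplification could look roughly like the following sketch (the per-chunk compression is taken unchanged from the code below; only the loop bookkeeping differs):

maxPayload := maxPacketSize - 4

for len(data) > maxPayload {
	// compress one maxPayload-sized chunk and send it
	var b bytes.Buffer
	w := zlib.NewWriter(&b)
	if _, err := w.Write(data[:maxPayload]); err != nil {
		w.Close()
		return 0, err
	}
	w.Close()
	if err := cw.writeComprPacketToNetwork(b.Bytes(), maxPayload); err != nil {
		return 0, err
	}
	data = data[maxPayload:]
}
// the remaining len(data) <= maxPayload bytes are handled after the loop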

// cut off a slice of size max payload length
dataSmall := data[:maxPayloadLength]
lenSmall := len(dataSmall)

var b bytes.Buffer
writer := zlib.NewWriter(&b)

Member:
bytes.Buffer and zlib's writer are resettable. At the very least, move them before the loop.

Author:
I can cache those on the compressedWriter and reset both. This is a more significant gain than simply declaring them before the loop, since in the majority of use cases the loop is not actually hit.
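
A minimal sketch of that caching idea, assuming hypothetical zBuf and zw fields on compressedWriter (not necessarily what the PR ends up with):

type compressedWriter struct {
	connWriter io.Writer
	mc         *mysqlConn
	zBuf       bytes.Buffer // reused across Write calls
	zw         *zlib.Writer // reused via Reset
}

func (cw *compressedWriter) compress(payload []byte) ([]byte, error) {
	cw.zBuf.Reset()
	if cw.zw == nil {
		cw.zw = zlib.NewWriter(&cw.zBuf)
	} else {
		// (*zlib.Writer).Reset re-targets the cached writer at the cleared buffer
		cw.zw.Reset(&cw.zBuf)
	}
	if _, err := cw.zw.Write(payload); err != nil {
		return nil, err
	}
	if err := cw.zw.Close(); err != nil {
		return nil, err
	}
	// the returned slice is only valid until the next call resets zBuf
	return cw.zBuf.Bytes(), nil
}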

_, err := writer.Write(dataSmall)
writer.Close()
if err != nil {
return 0, err
}

err = cw.writeComprPacketToNetwork(b.Bytes(), lenSmall)
if err != nil {
return 0, err
}

length -= maxPayloadLength
data = data[maxPayloadLength:]
}

lenSmall := len(data)

// do not compress if packet is too small
if lenSmall < minCompressLength {
err := cw.writeComprPacketToNetwork(data, 0)
if err != nil {
return 0, err
}

return totalBytes, nil
}

var b bytes.Buffer
writer := zlib.NewWriter(&b)

_, err := writer.Write(data)
writer.Close()

if err != nil {
return 0, err
}

err = cw.writeComprPacketToNetwork(b.Bytes(), lenSmall)

if err != nil {
return 0, err
}
return totalBytes, nil
}

func (cw *compressedWriter) writeComprPacketToNetwork(data []byte, uncomprLength int) error {

Member:
This name seems a bit long; writeCompressedPacket() may be a better name. payload is better than data, too.

Author:
I'll change data to payload. As for compressedWriter's writeComprPacketToNetwork, I was thinking of renaming it to writeToNetwork for readability and brevity. Giving compressedWriter both a Write and a WriteCompressedPacket method would not clarify the difference between the two -- namely, that writeComprPacketToNetwork does the actual network write, whereas Write is the top-level function that performs all the needed steps.

data = append([]byte{0, 0, 0, 0, 0, 0, 0}, data...)

Member:
superfluous whitespace

Author:
done!

comprLength := len(data) - 7

// compression header
data[0] = byte(0xff & comprLength)
data[1] = byte(0xff & (comprLength >> 8))
data[2] = byte(0xff & (comprLength >> 16))

data[3] = cw.mc.compressionSequence

// this value is never greater than maxPayloadLength
data[4] = byte(0xff & uncomprLength)
data[5] = byte(0xff & (uncomprLength >> 8))
data[6] = byte(0xff & (uncomprLength >> 16))

if _, err := cw.connWriter.Write(data); err != nil {
return err
}

cw.mc.compressionSequence++
return nil
}