Skip to content

Commit b977b5e

Browse files
authored
Merge branch 'master' into writeResultsetRefactor
2 parents 9db4d69 + 5399994 commit b977b5e

File tree

14 files changed

+243
-52
lines changed

14 files changed

+243
-52
lines changed

canal/canal.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ func (c *Canal) CheckBinlogRowImage(image string) error {
386386
// need to check MySQL binlog row image? full, minimal or noblob?
387387
// now only log
388388
if c.cfg.Flavor == mysql.MySQLFlavor {
389-
if res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "binlog_row_image"`); err != nil {
389+
if res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE 'binlog_row_image'`); err != nil {
390390
return errors.Trace(err)
391391
} else {
392392
// MySQL has binlog row image from 5.6, so older will return empty
@@ -401,7 +401,7 @@ func (c *Canal) CheckBinlogRowImage(image string) error {
401401
}
402402

403403
func (c *Canal) checkBinlogRowFormat() error {
404-
res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "binlog_format";`)
404+
res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE 'binlog_format';`)
405405
if err != nil {
406406
return errors.Trace(err)
407407
} else if f, _ := res.GetString(0, 1); f != "ROW" {

client/pool.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -466,11 +466,12 @@ func (pool *Pool) startNewConnections(count int) {
466466

467467
func (pool *Pool) ping(conn *Conn) error {
468468
deadline := time.Now().Add(100 * time.Millisecond)
469-
_ = conn.SetWriteDeadline(deadline)
470-
_ = conn.SetReadDeadline(deadline)
469+
_ = conn.SetDeadline(deadline)
471470
err := conn.Ping()
472471
if err != nil {
473472
pool.logFunc(`Pool: ping query fail: %s`, err.Error())
473+
} else {
474+
_ = conn.SetDeadline(time.Time{})
474475
}
475476
return err
476477
}

client/resp.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ func (c *Conn) handleOKPacket(data []byte) (*Result, error) {
5252
pos += 2
5353

5454
//todo:strict_mode, check warnings as error
55-
//Warnings := binary.LittleEndian.Uint16(data[pos:])
56-
//pos += 2
55+
r.Warnings = binary.LittleEndian.Uint16(data[pos:])
56+
pos += 2
5757
} else if c.capability&CLIENT_TRANSACTIONS > 0 {
5858
r.Status = binary.LittleEndian.Uint16(data[pos:])
5959
c.status = r.Status
@@ -254,6 +254,7 @@ func (c *Conn) readResultStreaming(binary bool, result *Result, perRowCb SelectP
254254
result.Status = okResult.Status
255255
result.AffectedRows = okResult.AffectedRows
256256
result.InsertId = okResult.InsertId
257+
result.Warnings = okResult.Warnings
257258
if result.Resultset == nil {
258259
result.Resultset = NewResultset(0)
259260
} else {
@@ -332,7 +333,7 @@ func (c *Conn) readResultColumns(result *Result) (err error) {
332333
// EOF Packet
333334
if c.isEOFPacket(data) {
334335
if c.capability&CLIENT_PROTOCOL_41 > 0 {
335-
//result.Warnings = binary.LittleEndian.Uint16(data[1:])
336+
result.Warnings = binary.LittleEndian.Uint16(data[1:])
336337
//todo add strict_mode, warning will be treat as error
337338
result.Status = binary.LittleEndian.Uint16(data[3:])
338339
c.status = result.Status
@@ -373,7 +374,7 @@ func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) {
373374
// EOF Packet
374375
if c.isEOFPacket(data) {
375376
if c.capability&CLIENT_PROTOCOL_41 > 0 {
376-
//result.Warnings = binary.LittleEndian.Uint16(data[1:])
377+
result.Warnings = binary.LittleEndian.Uint16(data[1:])
377378
//todo add strict_mode, warning will be treat as error
378379
result.Status = binary.LittleEndian.Uint16(data[3:])
379380
c.status = result.Status
@@ -421,7 +422,7 @@ func (c *Conn) readResultRowsStreaming(result *Result, isBinary bool, perRowCb S
421422
// EOF Packet
422423
if c.isEOFPacket(data) {
423424
if c.capability&CLIENT_PROTOCOL_41 > 0 {
424-
// result.Warnings = binary.LittleEndian.Uint16(data[1:])
425+
result.Warnings = binary.LittleEndian.Uint16(data[1:])
425426
// todo add strict_mode, warning will be treat as error
426427
result.Status = binary.LittleEndian.Uint16(data[3:])
427428
c.status = result.Status

client/stmt.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ type Stmt struct {
1313
conn *Conn
1414
id uint32
1515

16-
params int
17-
columns int
16+
params int
17+
columns int
18+
warnings int
1819
}
1920

2021
func (s *Stmt) ParamNum() int {
@@ -25,6 +26,10 @@ func (s *Stmt) ColumnNum() int {
2526
return s.columns
2627
}
2728

29+
func (s *Stmt) WarningsNum() int {
30+
return s.warnings
31+
}
32+
2833
func (s *Stmt) Execute(args ...interface{}) (*Result, error) {
2934
if err := s.write(args...); err != nil {
3035
return nil, errors.Trace(err)
@@ -196,7 +201,8 @@ func (c *Conn) Prepare(query string) (*Stmt, error) {
196201
pos += 2
197202

198203
//warnings
199-
//warnings = binary.LittleEndian.Uint16(data[pos:])
204+
s.warnings = int(binary.LittleEndian.Uint16(data[pos:]))
205+
pos += 2
200206

201207
if s.params > 0 {
202208
if err := s.conn.readUntilEOF(); err != nil {

cmd/go-mysqlbinlog/main.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ import (
99
"fmt"
1010
"os"
1111

12+
"github.com/pingcap/errors"
13+
1214
"github.com/go-mysql-org/go-mysql/mysql"
1315
"github.com/go-mysql-org/go-mysql/replication"
14-
"github.com/pingcap/errors"
1516
)
1617

1718
var host = flag.String("host", "127.0.0.1", "MySQL host")

mysql/mysql_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -223,3 +223,9 @@ func (t *mysqlTestSuite) TestMysqlUUIDClone(c *check.C) {
223223
clone := us.Clone()
224224
c.Assert(clone.String(), check.Equals, "de278ad0-2106-11e4-9f8e-6edd0ca20947:1-2")
225225
}
226+
227+
func (t *mysqlTestSuite) TestMysqlEmptyDecode(c *check.C) {
228+
_, isNull, n := LengthEncodedInt(nil)
229+
c.Assert(isNull, check.IsTrue)
230+
c.Assert(n, check.Equals, 0)
231+
}

mysql/result.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package mysql
22

33
type Result struct {
4-
Status uint16
4+
Status uint16
5+
Warnings uint16
56

67
InsertId uint64
78
AffectedRows uint64

mysql/util.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func BFixedLengthInt(buf []byte) uint64 {
142142

143143
func LengthEncodedInt(b []byte) (num uint64, isNull bool, n int) {
144144
if len(b) == 0 {
145-
return 0, true, 1
145+
return 0, true, 0
146146
}
147147

148148
switch b[0] {

replication/row_event.go

+56-20
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package replication
22

33
import (
4-
"bytes"
54
"encoding/binary"
65
"encoding/hex"
76
"fmt"
87
"io"
98
"strconv"
9+
"strings"
1010
"time"
1111

1212
"github.com/pingcap/errors"
@@ -1172,20 +1172,29 @@ func decodeString(data []byte, length int) (v string, n int) {
11721172
return
11731173
}
11741174

1175+
// ref: https://github.com/mysql/mysql-server/blob/a9b0c712de3509d8d08d3ba385d41a4df6348775/strings/decimal.c#L137
11751176
const digitsPerInteger int = 9
11761177

11771178
var compressedBytes = []int{0, 1, 1, 2, 2, 3, 3, 4, 4, 4}
11781179

11791180
func decodeDecimalDecompressValue(compIndx int, data []byte, mask uint8) (size int, value uint32) {
11801181
size = compressedBytes[compIndx]
1181-
databuff := make([]byte, size)
1182-
for i := 0; i < size; i++ {
1183-
databuff[i] = data[i] ^ mask
1182+
switch size {
1183+
case 0:
1184+
case 1:
1185+
value = uint32(data[0] ^ mask)
1186+
case 2:
1187+
value = uint32(data[1]^mask) | uint32(data[0]^mask)<<8
1188+
case 3:
1189+
value = uint32(data[2]^mask) | uint32(data[1]^mask)<<8 | uint32(data[0]^mask)<<16
1190+
case 4:
1191+
value = uint32(data[3]^mask) | uint32(data[2]^mask)<<8 | uint32(data[1]^mask)<<16 | uint32(data[0]^mask)<<24
11841192
}
1185-
value = uint32(BFixedLengthInt(databuff))
11861193
return
11871194
}
11881195

1196+
var zeros = [digitsPerInteger]byte{48, 48, 48, 48, 48, 48, 48, 48, 48}
1197+
11891198
func decodeDecimal(data []byte, precision int, decimals int, useDecimal bool) (interface{}, int, error) {
11901199
//see python mysql replication and https://github.com/jeremycole/mysql_binlog
11911200
integral := precision - decimals
@@ -1207,7 +1216,8 @@ func decodeDecimal(data []byte, precision int, decimals int, useDecimal bool) (i
12071216
// The sign is encoded in the high bit of the the byte
12081217
// But this bit can also be used in the value
12091218
value := uint32(data[0])
1210-
var res bytes.Buffer
1219+
var res strings.Builder
1220+
res.Grow(precision + 2)
12111221
var mask uint32 = 0
12121222
if value&0x80 == 0 {
12131223
mask = uint32((1 << 32) - 1)
@@ -1217,35 +1227,61 @@ func decodeDecimal(data []byte, precision int, decimals int, useDecimal bool) (i
12171227
//clear sign
12181228
data[0] ^= 0x80
12191229

1230+
zeroLeading := true
1231+
12201232
pos, value := decodeDecimalDecompressValue(compIntegral, data, uint8(mask))
1221-
res.WriteString(fmt.Sprintf("%d", value))
1233+
if value != 0 {
1234+
zeroLeading = false
1235+
res.WriteString(strconv.FormatUint(uint64(value), 10))
1236+
}
12221237

12231238
for i := 0; i < uncompIntegral; i++ {
12241239
value = binary.BigEndian.Uint32(data[pos:]) ^ mask
12251240
pos += 4
1226-
res.WriteString(fmt.Sprintf("%09d", value))
1241+
if zeroLeading {
1242+
if value != 0 {
1243+
zeroLeading = false
1244+
res.WriteString(strconv.FormatUint(uint64(value), 10))
1245+
}
1246+
} else {
1247+
toWrite := strconv.FormatUint(uint64(value), 10)
1248+
res.Write(zeros[:digitsPerInteger-len(toWrite)])
1249+
res.WriteString(toWrite)
1250+
}
12271251
}
12281252

1229-
res.WriteString(".")
1230-
1231-
for i := 0; i < uncompFractional; i++ {
1232-
value = binary.BigEndian.Uint32(data[pos:]) ^ mask
1233-
pos += 4
1234-
res.WriteString(fmt.Sprintf("%09d", value))
1253+
if zeroLeading {
1254+
res.WriteString("0")
12351255
}
12361256

1237-
if size, value := decodeDecimalDecompressValue(compFractional, data[pos:], uint8(mask)); size > 0 {
1238-
res.WriteString(fmt.Sprintf("%0*d", compFractional, value))
1239-
pos += size
1257+
if pos < len(data) {
1258+
res.WriteString(".")
1259+
1260+
for i := 0; i < uncompFractional; i++ {
1261+
value = binary.BigEndian.Uint32(data[pos:]) ^ mask
1262+
pos += 4
1263+
toWrite := strconv.FormatUint(uint64(value), 10)
1264+
res.Write(zeros[:digitsPerInteger-len(toWrite)])
1265+
res.WriteString(toWrite)
1266+
}
1267+
1268+
if size, value := decodeDecimalDecompressValue(compFractional, data[pos:], uint8(mask)); size > 0 {
1269+
toWrite := strconv.FormatUint(uint64(value), 10)
1270+
padding := compFractional - len(toWrite)
1271+
if padding > 0 {
1272+
res.Write(zeros[:padding])
1273+
}
1274+
res.WriteString(toWrite)
1275+
pos += size
1276+
}
12401277
}
12411278

12421279
if useDecimal {
1243-
f, err := decimal.NewFromString(hack.String(res.Bytes()))
1280+
f, err := decimal.NewFromString(res.String())
12441281
return f, pos, err
12451282
}
12461283

1247-
f, err := strconv.ParseFloat(hack.String(res.Bytes()), 64)
1248-
return f, pos, err
1284+
return res.String(), pos, nil
12491285
}
12501286

12511287
func decodeBit(data []byte, nbits int, length int) (value int64, err error) {

0 commit comments

Comments
 (0)