Skip to content

Commit 06a439e

Browse files
authored
Merge branch 'master' into selectStreaming
2 parents 1ba17ce + 7fd0cb3 commit 06a439e

28 files changed

+685
-83
lines changed

canal/canal.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ func (c *Canal) CheckBinlogRowImage(image string) error {
386386
// need to check MySQL binlog row image? full, minimal or noblob?
387387
// now only log
388388
if c.cfg.Flavor == mysql.MySQLFlavor {
389-
if res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "binlog_row_image"`); err != nil {
389+
if res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE 'binlog_row_image'`); err != nil {
390390
return errors.Trace(err)
391391
} else {
392392
// MySQL has binlog row image from 5.6, so older will return empty
@@ -401,7 +401,7 @@ func (c *Canal) CheckBinlogRowImage(image string) error {
401401
}
402402

403403
func (c *Canal) checkBinlogRowFormat() error {
404-
res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "binlog_format";`)
404+
res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE 'binlog_format';`)
405405
if err != nil {
406406
return errors.Trace(err)
407407
} else if f, _ := res.GetString(0, 1); f != "ROW" {

canal/sync.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ func parseStmt(stmt ast.StmtNode) (ns []*node) {
223223
case *ast.TruncateTableStmt:
224224
n := &node{
225225
db: t.Table.Schema.String(),
226-
table: t.Table.Schema.String(),
226+
table: t.Table.Name.String(),
227227
}
228228
ns = []*node{n}
229229
}

client/auth.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,18 @@ func (c *Conn) writeAuthHandshake() error {
140140
if !authPluginAllowed(c.authPluginName) {
141141
return fmt.Errorf("unknow auth plugin name '%s'", c.authPluginName)
142142
}
143-
// Adjust client capability flags based on server support
143+
144+
// Set default client capabilities that reflect the abilities of this library
144145
capability := CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION |
145-
CLIENT_LONG_PASSWORD | CLIENT_TRANSACTIONS | CLIENT_PLUGIN_AUTH | c.capability&CLIENT_LONG_FLAG
146+
CLIENT_LONG_PASSWORD | CLIENT_TRANSACTIONS | CLIENT_PLUGIN_AUTH
147+
// Adjust client capability flags based on server support
148+
capability |= c.capability & CLIENT_LONG_FLAG
149+
// Adjust client capability flags on specific client requests
150+
// Only flags that would make any sense setting and aren't handled elsewhere
151+
// in the library are supported here
152+
capability |= c.ccaps&CLIENT_FOUND_ROWS | c.ccaps&CLIENT_IGNORE_SPACE |
153+
c.ccaps&CLIENT_MULTI_STATEMENTS | c.ccaps&CLIENT_MULTI_RESULTS |
154+
c.ccaps&CLIENT_PS_MULTI_RESULTS | c.ccaps&CLIENT_CONNECT_ATTRS
146155

147156
// To enable TLS / SSL
148157
if c.tlsConfig != nil {

client/client_test.go

+45
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package client
22

33
import (
4+
"encoding/json"
45
"flag"
56
"fmt"
67
"strings"
@@ -82,6 +83,7 @@ func (s *clientTestSuite) testConn_CreateTable(c *C) {
8283
e enum("test1", "test2"),
8384
u tinyint unsigned,
8485
i tinyint,
86+
j json,
8587
PRIMARY KEY (id)
8688
) ENGINE=InnoDB DEFAULT CHARSET=utf8`
8789

@@ -94,6 +96,41 @@ func (s *clientTestSuite) TestConn_Ping(c *C) {
9496
c.Assert(err, IsNil)
9597
}
9698

99+
func (s *clientTestSuite) TestConn_SetCapability(c *C) {
100+
caps := []uint32{
101+
mysql.CLIENT_LONG_PASSWORD,
102+
mysql.CLIENT_FOUND_ROWS,
103+
mysql.CLIENT_LONG_FLAG,
104+
mysql.CLIENT_CONNECT_WITH_DB,
105+
mysql.CLIENT_NO_SCHEMA,
106+
mysql.CLIENT_COMPRESS,
107+
mysql.CLIENT_ODBC,
108+
mysql.CLIENT_LOCAL_FILES,
109+
mysql.CLIENT_IGNORE_SPACE,
110+
mysql.CLIENT_PROTOCOL_41,
111+
mysql.CLIENT_INTERACTIVE,
112+
mysql.CLIENT_SSL,
113+
mysql.CLIENT_IGNORE_SIGPIPE,
114+
mysql.CLIENT_TRANSACTIONS,
115+
mysql.CLIENT_RESERVED,
116+
mysql.CLIENT_SECURE_CONNECTION,
117+
mysql.CLIENT_MULTI_STATEMENTS,
118+
mysql.CLIENT_MULTI_RESULTS,
119+
mysql.CLIENT_PS_MULTI_RESULTS,
120+
mysql.CLIENT_PLUGIN_AUTH,
121+
mysql.CLIENT_CONNECT_ATTRS,
122+
mysql.CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA,
123+
}
124+
125+
for _, cap := range caps {
126+
c.Assert(s.c.ccaps&cap > 0, IsFalse)
127+
s.c.SetCapability(cap)
128+
c.Assert(s.c.ccaps&cap > 0, IsTrue)
129+
s.c.UnsetCapability(cap)
130+
c.Assert(s.c.ccaps&cap > 0, IsFalse)
131+
}
132+
}
133+
97134
// NOTE for MySQL 5.5 and 5.6, server side has to config SSL to pass the TLS test, otherwise, it will throw error that
98135
// MySQL server does not support TLS required by the client. However, for MySQL 5.7 and above, auto generated certificates
99136
// are used by default so that manual config is no longer necessary.
@@ -149,6 +186,14 @@ func (s *clientTestSuite) TestConn_Insert(c *C) {
149186
c.Assert(pkg.AffectedRows, Equals, uint64(1))
150187
}
151188

189+
func (s *clientTestSuite) TestConn_Insert2(c *C) {
190+
str := `insert into mixer_test_conn (id, j) values(?, ?)`
191+
j := json.RawMessage(`[]`)
192+
pkg, err := s.c.Execute(str, []interface{}{2, j}...)
193+
c.Assert(err, IsNil)
194+
c.Assert(pkg.AffectedRows, Equals, uint64(1))
195+
}
196+
152197
func (s *clientTestSuite) TestConn_Select(c *C) {
153198
str := `select str, f, e from mixer_test_conn where id = 1`
154199

client/conn.go

+13
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ type Conn struct {
2121
tlsConfig *tls.Config
2222
proto string
2323

24+
// server capabilities
2425
capability uint32
26+
// client-set capabilities only
27+
ccaps uint32
2528

2629
status uint16
2730

@@ -123,6 +126,16 @@ func (c *Conn) Ping() error {
123126
return nil
124127
}
125128

129+
// SetCapability enables the use of a specific capability
130+
func (c *Conn) SetCapability(cap uint32) {
131+
c.ccaps |= cap
132+
}
133+
134+
// UnsetCapability disables the use of a specific capability
135+
func (c *Conn) UnsetCapability(cap uint32) {
136+
c.ccaps &= ^cap
137+
}
138+
126139
// UseSSL: use default SSL
127140
// pass to options when connect
128141
func (c *Conn) UseSSL(insecureSkipVerify bool) {

client/pool.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -466,11 +466,12 @@ func (pool *Pool) startNewConnections(count int) {
466466

467467
func (pool *Pool) ping(conn *Conn) error {
468468
deadline := time.Now().Add(100 * time.Millisecond)
469-
_ = conn.SetWriteDeadline(deadline)
470-
_ = conn.SetReadDeadline(deadline)
469+
_ = conn.SetDeadline(deadline)
471470
err := conn.Ping()
472471
if err != nil {
473472
pool.logFunc(`Pool: ping query fail: %s`, err.Error())
473+
} else {
474+
_ = conn.SetDeadline(time.Time{})
474475
}
475476
return err
476477
}

client/resp.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ func (c *Conn) handleOKPacket(data []byte) (*Result, error) {
5252
pos += 2
5353

5454
//todo:strict_mode, check warnings as error
55-
//Warnings := binary.LittleEndian.Uint16(data[pos:])
56-
//pos += 2
55+
r.Warnings = binary.LittleEndian.Uint16(data[pos:])
56+
pos += 2
5757
} else if c.capability&CLIENT_TRANSACTIONS > 0 {
5858
r.Status = binary.LittleEndian.Uint16(data[pos:])
5959
c.status = r.Status
@@ -254,6 +254,7 @@ func (c *Conn) readResultStreaming(binary bool, result *Result, perRowCb SelectP
254254
result.Status = okResult.Status
255255
result.AffectedRows = okResult.AffectedRows
256256
result.InsertId = okResult.InsertId
257+
result.Warnings = okResult.Warnings
257258
if result.Resultset == nil {
258259
result.Resultset = NewResultset(0)
259260
} else {
@@ -344,7 +345,7 @@ func (c *Conn) readResultColumns(result *Result) (err error) {
344345
// EOF Packet
345346
if c.isEOFPacket(data) {
346347
if c.capability&CLIENT_PROTOCOL_41 > 0 {
347-
//result.Warnings = binary.LittleEndian.Uint16(data[1:])
348+
result.Warnings = binary.LittleEndian.Uint16(data[1:])
348349
//todo add strict_mode, warning will be treat as error
349350
result.Status = binary.LittleEndian.Uint16(data[3:])
350351
c.status = result.Status
@@ -385,7 +386,7 @@ func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) {
385386
// EOF Packet
386387
if c.isEOFPacket(data) {
387388
if c.capability&CLIENT_PROTOCOL_41 > 0 {
388-
//result.Warnings = binary.LittleEndian.Uint16(data[1:])
389+
result.Warnings = binary.LittleEndian.Uint16(data[1:])
389390
//todo add strict_mode, warning will be treat as error
390391
result.Status = binary.LittleEndian.Uint16(data[3:])
391392
c.status = result.Status
@@ -433,7 +434,7 @@ func (c *Conn) readResultRowsStreaming(result *Result, isBinary bool, perRowCb S
433434
// EOF Packet
434435
if c.isEOFPacket(data) {
435436
if c.capability&CLIENT_PROTOCOL_41 > 0 {
436-
// result.Warnings = binary.LittleEndian.Uint16(data[1:])
437+
result.Warnings = binary.LittleEndian.Uint16(data[1:])
437438
// todo add strict_mode, warning will be treat as error
438439
result.Status = binary.LittleEndian.Uint16(data[3:])
439440
c.status = result.Status

client/stmt.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package client
22

33
import (
44
"encoding/binary"
5+
"encoding/json"
56
"fmt"
67
"math"
78

@@ -13,8 +14,9 @@ type Stmt struct {
1314
conn *Conn
1415
id uint32
1516

16-
params int
17-
columns int
17+
params int
18+
columns int
19+
warnings int
1820
}
1921

2022
func (s *Stmt) ParamNum() int {
@@ -25,6 +27,10 @@ func (s *Stmt) ColumnNum() int {
2527
return s.columns
2628
}
2729

30+
func (s *Stmt) WarningsNum() int {
31+
return s.warnings
32+
}
33+
2834
func (s *Stmt) Execute(args ...interface{}) (*Result, error) {
2935
if err := s.write(args...); err != nil {
3036
return nil, errors.Trace(err)
@@ -130,6 +136,9 @@ func (s *Stmt) write(args ...interface{}) error {
130136
case []byte:
131137
paramTypes[i<<1] = MYSQL_TYPE_STRING
132138
paramValues[i] = append(PutLengthEncodedInt(uint64(len(v))), v...)
139+
case json.RawMessage:
140+
paramTypes[i<<1] = MYSQL_TYPE_STRING
141+
paramValues[i] = append(PutLengthEncodedInt(uint64(len(v))), v...)
133142
default:
134143
return fmt.Errorf("invalid argument type %T", args[i])
135144
}
@@ -204,7 +213,8 @@ func (c *Conn) Prepare(query string) (*Stmt, error) {
204213
pos += 2
205214

206215
//warnings
207-
//warnings = binary.LittleEndian.Uint16(data[pos:])
216+
s.warnings = int(binary.LittleEndian.Uint16(data[pos:]))
217+
pos += 2
208218

209219
if s.params > 0 {
210220
if err := s.conn.readUntilEOF(); err != nil {

cmd/go-mysqlbinlog/main.go

+24-5
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ import (
99
"fmt"
1010
"os"
1111

12+
"github.com/pingcap/errors"
13+
1214
"github.com/go-mysql-org/go-mysql/mysql"
1315
"github.com/go-mysql-org/go-mysql/replication"
14-
"github.com/pingcap/errors"
1516
)
1617

1718
var host = flag.String("host", "127.0.0.1", "MySQL host")
@@ -23,6 +24,7 @@ var flavor = flag.String("flavor", "mysql", "Flavor: mysql or mariadb")
2324

2425
var file = flag.String("file", "", "Binlog filename")
2526
var pos = flag.Int("pos", 4, "Binlog position")
27+
var gtid = flag.String("gtid", "", "Binlog GTID set that this slave has executed")
2628

2729
var semiSync = flag.Bool("semisync", false, "Support semi sync")
2830
var backupPath = flag.String("backup_path", "", "backup path to store binlog files")
@@ -56,10 +58,27 @@ func main() {
5658
return
5759
}
5860
} else {
59-
s, err := b.StartSync(pos)
60-
if err != nil {
61-
fmt.Printf("Start sync error: %v\n", errors.ErrorStack(err))
62-
return
61+
var (
62+
s *replication.BinlogStreamer
63+
err error
64+
)
65+
if len(*gtid) > 0 {
66+
gset, err := mysql.ParseGTIDSet(*flavor, *gtid)
67+
if err != nil {
68+
fmt.Printf("Failed to parse gtid %s with flavor %s, error: %v\n",
69+
*gtid, *flavor, errors.ErrorStack(err))
70+
}
71+
s, err = b.StartSyncGTID(gset)
72+
if err != nil {
73+
fmt.Printf("Start sync by GTID error: %v\n", errors.ErrorStack(err))
74+
return
75+
}
76+
} else {
77+
s, err = b.StartSync(pos)
78+
if err != nil {
79+
fmt.Printf("Start sync error: %v\n", errors.ErrorStack(err))
80+
return
81+
}
6382
}
6483

6584
for {

dump/dumper.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,9 @@ func (d *Dumper) Dump(w io.Writer) error {
164164
}
165165

166166
args = append(args, fmt.Sprintf("--user=%s", d.User))
167-
args = append(args, fmt.Sprintf("--password=%s", d.Password))
167+
passwordArg := fmt.Sprintf("--password=%s", d.Password)
168+
args = append(args, passwordArg)
169+
passwordArgIndex := len(args) - 1
168170

169171
if !d.masterDataSkipped {
170172
args = append(args, "--master-data")
@@ -238,7 +240,9 @@ func (d *Dumper) Dump(w io.Writer) error {
238240
}
239241
}
240242

243+
args[passwordArgIndex] = "--password=******"
241244
log.Infof("exec mysqldump with %v", args)
245+
args[passwordArgIndex] = passwordArg
242246
cmd := exec.Command(d.ExecutionPath, args...)
243247

244248
cmd.Stderr = d.ErrOut

mysql/mysql_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -223,3 +223,9 @@ func (t *mysqlTestSuite) TestMysqlUUIDClone(c *check.C) {
223223
clone := us.Clone()
224224
c.Assert(clone.String(), check.Equals, "de278ad0-2106-11e4-9f8e-6edd0ca20947:1-2")
225225
}
226+
227+
func (t *mysqlTestSuite) TestMysqlEmptyDecode(c *check.C) {
228+
_, isNull, n := LengthEncodedInt(nil)
229+
c.Assert(isNull, check.IsTrue)
230+
c.Assert(n, check.Equals, 0)
231+
}

mysql/result.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package mysql
22

33
type Result struct {
4-
Status uint16
4+
Status uint16
5+
Warnings uint16
56

67
InsertId uint64
78
AffectedRows uint64

mysql/rowdata.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (p RowData) ParseBinary(f []*Field, dst []FieldValue) ([]FieldValue, error)
177177
case MYSQL_TYPE_DECIMAL, MYSQL_TYPE_NEWDECIMAL, MYSQL_TYPE_VARCHAR,
178178
MYSQL_TYPE_BIT, MYSQL_TYPE_ENUM, MYSQL_TYPE_SET, MYSQL_TYPE_TINY_BLOB,
179179
MYSQL_TYPE_MEDIUM_BLOB, MYSQL_TYPE_LONG_BLOB, MYSQL_TYPE_BLOB,
180-
MYSQL_TYPE_VAR_STRING, MYSQL_TYPE_STRING, MYSQL_TYPE_GEOMETRY:
180+
MYSQL_TYPE_VAR_STRING, MYSQL_TYPE_STRING, MYSQL_TYPE_GEOMETRY, MYSQL_TYPE_JSON:
181181
v, isNull, n, err = LengthEncodedString(p[pos:])
182182
pos += n
183183
if err != nil {

mysql/util.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func BFixedLengthInt(buf []byte) uint64 {
142142

143143
func LengthEncodedInt(b []byte) (num uint64, isNull bool, n int) {
144144
if len(b) == 0 {
145-
return 0, true, 1
145+
return 0, true, 0
146146
}
147147

148148
switch b[0] {

replication/const.go

+8
Original file line numberDiff line numberDiff line change
@@ -217,3 +217,11 @@ const (
217217
TABLE_MAP_OPT_META_ENUM_AND_SET_DEFAULT_CHARSET
218218
TABLE_MAP_OPT_META_ENUM_AND_SET_COLUMN_CHARSET
219219
)
220+
221+
type IntVarEventType byte
222+
223+
const (
224+
INVALID IntVarEventType = iota
225+
LAST_INSERT_ID
226+
INSERT_ID
227+
)

0 commit comments

Comments
 (0)