Skip to content

Commit 2f69868

Browse files
authored
Merge branch 'master' into reuse-buf
2 parents d7f3683 + 423b04c commit 2f69868

27 files changed

+522
-47
lines changed

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,8 @@ err := conn.ExecuteSelectStreaming(`select id, name from table LIMIT 100500`, &r
237237
// Copy it if you need.
238238
// ...
239239
}
240-
return false, nil
241-
})
240+
return nil
241+
}, nil)
242242

243243
// ...
244244
```

canal/canal.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ func (c *Canal) CheckBinlogRowImage(image string) error {
386386
// need to check MySQL binlog row image? full, minimal or noblob?
387387
// now only log
388388
if c.cfg.Flavor == mysql.MySQLFlavor {
389-
if res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "binlog_row_image"`); err != nil {
389+
if res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE 'binlog_row_image'`); err != nil {
390390
return errors.Trace(err)
391391
} else {
392392
// MySQL has binlog row image from 5.6, so older will return empty
@@ -401,7 +401,7 @@ func (c *Canal) CheckBinlogRowImage(image string) error {
401401
}
402402

403403
func (c *Canal) checkBinlogRowFormat() error {
404-
res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "binlog_format";`)
404+
res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE 'binlog_format';`)
405405
if err != nil {
406406
return errors.Trace(err)
407407
} else if f, _ := res.GetString(0, 1); f != "ROW" {

canal/sync.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ func parseStmt(stmt ast.StmtNode) (ns []*node) {
223223
case *ast.TruncateTableStmt:
224224
n := &node{
225225
db: t.Table.Schema.String(),
226-
table: t.Table.Schema.String(),
226+
table: t.Table.Name.String(),
227227
}
228228
ns = []*node{n}
229229
}

client/auth.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,18 @@ func (c *Conn) writeAuthHandshake() error {
140140
if !authPluginAllowed(c.authPluginName) {
141141
return fmt.Errorf("unknow auth plugin name '%s'", c.authPluginName)
142142
}
143-
// Adjust client capability flags based on server support
143+
144+
// Set default client capabilities that reflect the abilities of this library
144145
capability := CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION |
145-
CLIENT_LONG_PASSWORD | CLIENT_TRANSACTIONS | CLIENT_PLUGIN_AUTH | c.capability&CLIENT_LONG_FLAG
146+
CLIENT_LONG_PASSWORD | CLIENT_TRANSACTIONS | CLIENT_PLUGIN_AUTH
147+
// Adjust client capability flags based on server support
148+
capability |= c.capability & CLIENT_LONG_FLAG
149+
// Adjust client capability flags on specific client requests
150+
// Only flags that would make any sense setting and aren't handled elsewhere
151+
// in the library are supported here
152+
capability |= c.ccaps&CLIENT_FOUND_ROWS | c.ccaps&CLIENT_IGNORE_SPACE |
153+
c.ccaps&CLIENT_MULTI_STATEMENTS | c.ccaps&CLIENT_MULTI_RESULTS |
154+
c.ccaps&CLIENT_PS_MULTI_RESULTS | c.ccaps&CLIENT_CONNECT_ATTRS
146155

147156
// To enable TLS / SSL
148157
if c.tlsConfig != nil {

client/client_test.go

+45
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package client
22

33
import (
4+
"encoding/json"
45
"flag"
56
"fmt"
67
"strings"
@@ -82,6 +83,7 @@ func (s *clientTestSuite) testConn_CreateTable(c *C) {
8283
e enum("test1", "test2"),
8384
u tinyint unsigned,
8485
i tinyint,
86+
j json,
8587
PRIMARY KEY (id)
8688
) ENGINE=InnoDB DEFAULT CHARSET=utf8`
8789

@@ -94,6 +96,41 @@ func (s *clientTestSuite) TestConn_Ping(c *C) {
9496
c.Assert(err, IsNil)
9597
}
9698

99+
func (s *clientTestSuite) TestConn_SetCapability(c *C) {
100+
caps := []uint32{
101+
mysql.CLIENT_LONG_PASSWORD,
102+
mysql.CLIENT_FOUND_ROWS,
103+
mysql.CLIENT_LONG_FLAG,
104+
mysql.CLIENT_CONNECT_WITH_DB,
105+
mysql.CLIENT_NO_SCHEMA,
106+
mysql.CLIENT_COMPRESS,
107+
mysql.CLIENT_ODBC,
108+
mysql.CLIENT_LOCAL_FILES,
109+
mysql.CLIENT_IGNORE_SPACE,
110+
mysql.CLIENT_PROTOCOL_41,
111+
mysql.CLIENT_INTERACTIVE,
112+
mysql.CLIENT_SSL,
113+
mysql.CLIENT_IGNORE_SIGPIPE,
114+
mysql.CLIENT_TRANSACTIONS,
115+
mysql.CLIENT_RESERVED,
116+
mysql.CLIENT_SECURE_CONNECTION,
117+
mysql.CLIENT_MULTI_STATEMENTS,
118+
mysql.CLIENT_MULTI_RESULTS,
119+
mysql.CLIENT_PS_MULTI_RESULTS,
120+
mysql.CLIENT_PLUGIN_AUTH,
121+
mysql.CLIENT_CONNECT_ATTRS,
122+
mysql.CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA,
123+
}
124+
125+
for _, cap := range caps {
126+
c.Assert(s.c.ccaps&cap > 0, IsFalse)
127+
s.c.SetCapability(cap)
128+
c.Assert(s.c.ccaps&cap > 0, IsTrue)
129+
s.c.UnsetCapability(cap)
130+
c.Assert(s.c.ccaps&cap > 0, IsFalse)
131+
}
132+
}
133+
97134
// NOTE for MySQL 5.5 and 5.6, server side has to config SSL to pass the TLS test, otherwise, it will throw error that
98135
// MySQL server does not support TLS required by the client. However, for MySQL 5.7 and above, auto generated certificates
99136
// are used by default so that manual config is no longer necessary.
@@ -149,6 +186,14 @@ func (s *clientTestSuite) TestConn_Insert(c *C) {
149186
c.Assert(pkg.AffectedRows, Equals, uint64(1))
150187
}
151188

189+
func (s *clientTestSuite) TestConn_Insert2(c *C) {
190+
str := `insert into mixer_test_conn (id, j) values(?, ?)`
191+
j := json.RawMessage(`[]`)
192+
pkg, err := s.c.Execute(str, []interface{}{2, j}...)
193+
c.Assert(err, IsNil)
194+
c.Assert(pkg.AffectedRows, Equals, uint64(1))
195+
}
196+
152197
func (s *clientTestSuite) TestConn_Select(c *C) {
153198
str := `select str, f, e from mixer_test_conn where id = 1`
154199

client/conn.go

+20-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ type Conn struct {
2121
tlsConfig *tls.Config
2222
proto string
2323

24+
// server capabilities
2425
capability uint32
26+
// client-set capabilities only
27+
ccaps uint32
2528

2629
status uint16
2730

@@ -36,6 +39,9 @@ type Conn struct {
3639
// This function will be called for every row in resultset from ExecuteSelectStreaming.
3740
type SelectPerRowCallback func(row []FieldValue) error
3841

42+
// This function will be called once per result from ExecuteSelectStreaming
43+
type SelectPerResultCallback func(result *Result) error
44+
3945
func getNetProto(addr string) string {
4046
proto := "tcp"
4147
if strings.Contains(addr, "/") {
@@ -120,6 +126,16 @@ func (c *Conn) Ping() error {
120126
return nil
121127
}
122128

129+
// SetCapability enables the use of a specific capability
130+
func (c *Conn) SetCapability(cap uint32) {
131+
c.ccaps |= cap
132+
}
133+
134+
// UnsetCapability disables the use of a specific capability
135+
func (c *Conn) UnsetCapability(cap uint32) {
136+
c.ccaps &= ^cap
137+
}
138+
123139
// UseSSL: use default SSL
124140
// pass to options when connect
125141
func (c *Conn) UseSSL(insecureSkipVerify bool) {
@@ -170,6 +186,7 @@ func (c *Conn) Execute(command string, args ...interface{}) (*Result, error) {
170186

171187
// ExecuteSelectStreaming will call perRowCallback for every row in resultset
172188
// WITHOUT saving any row data to Result.{Values/RawPkg/RowDatas} fields.
189+
// When given, perResultCallback will be called once per result
173190
//
174191
// ExecuteSelectStreaming should be used only for SELECT queries with a large response resultset for memory preserving.
175192
//
@@ -180,14 +197,14 @@ func (c *Conn) Execute(command string, args ...interface{}) (*Result, error) {
180197
// // Use the row as you want.
181198
// // You must not save FieldValue.AsString() value after this callback is done. Copy it if you need.
182199
// return nil
183-
// })
200+
// }, nil)
184201
//
185-
func (c *Conn) ExecuteSelectStreaming(command string, result *Result, perRowCallback SelectPerRowCallback) error {
202+
func (c *Conn) ExecuteSelectStreaming(command string, result *Result, perRowCallback SelectPerRowCallback, perResultCallback SelectPerResultCallback) error {
186203
if err := c.writeCommandStr(COM_QUERY, command); err != nil {
187204
return errors.Trace(err)
188205
}
189206

190-
return c.readResultStreaming(false, result, perRowCallback)
207+
return c.readResultStreaming(false, result, perRowCallback, perResultCallback)
191208
}
192209

193210
func (c *Conn) Begin() error {

client/pool.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -466,11 +466,12 @@ func (pool *Pool) startNewConnections(count int) {
466466

467467
func (pool *Pool) ping(conn *Conn) error {
468468
deadline := time.Now().Add(100 * time.Millisecond)
469-
_ = conn.SetWriteDeadline(deadline)
470-
_ = conn.SetReadDeadline(deadline)
469+
_ = conn.SetDeadline(deadline)
471470
err := conn.Ping()
472471
if err != nil {
473472
pool.logFunc(`Pool: ping query fail: %s`, err.Error())
473+
} else {
474+
_ = conn.SetDeadline(time.Time{})
474475
}
475476
return err
476477
}

client/resp.go

+17-3
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ func (c *Conn) handleAuthResult() error {
129129
if data[0] == CACHE_SHA2_FAST_AUTH {
130130
if _, err = c.readOK(); err == nil {
131131
return nil // auth successful
132+
} else {
133+
return err
132134
}
133135
} else if data[0] == CACHE_SHA2_FULL_AUTH {
134136
// need full authentication
@@ -233,7 +235,7 @@ func (c *Conn) readResult(binary bool) (*Result, error) {
233235
return c.readResultset(firstPkgBuf, binary)
234236
}
235237

236-
func (c *Conn) readResultStreaming(binary bool, result *Result, perRowCb SelectPerRowCallback) error {
238+
func (c *Conn) readResultStreaming(binary bool, result *Result, perRowCb SelectPerRowCallback, perResCb SelectPerResultCallback) error {
237239
firstPkgBuf, err := c.ReadPacketReuseMem(utils.ByteSliceGet(16)[:0])
238240
defer utils.ByteSlicePut(firstPkgBuf)
239241

@@ -267,7 +269,7 @@ func (c *Conn) readResultStreaming(binary bool, result *Result, perRowCb SelectP
267269
return ErrMalformPacket
268270
}
269271

270-
return c.readResultsetStreaming(firstPkgBuf, binary, result, perRowCb)
272+
return c.readResultsetStreaming(firstPkgBuf, binary, result, perRowCb, perResCb)
271273
}
272274

273275
func (c *Conn) readResultset(data []byte, binary bool) (*Result, error) {
@@ -293,7 +295,7 @@ func (c *Conn) readResultset(data []byte, binary bool) (*Result, error) {
293295
return result, nil
294296
}
295297

296-
func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *Result, perRowCb SelectPerRowCallback) error {
298+
func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *Result, perRowCb SelectPerRowCallback, perResCb SelectPerResultCallback) error {
297299
columnCount, _, n := LengthEncodedInt(data)
298300

299301
if n-len(data) != 0 {
@@ -307,14 +309,26 @@ func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *Result,
307309
result.Reset(int(columnCount))
308310
}
309311

312+
// this is a streaming resultset
313+
result.Resultset.Streaming = true
314+
310315
if err := c.readResultColumns(result); err != nil {
311316
return errors.Trace(err)
312317
}
313318

319+
if perResCb != nil {
320+
if err := perResCb(result); err != nil {
321+
return err
322+
}
323+
}
324+
314325
if err := c.readResultRowsStreaming(result, binary, perRowCb); err != nil {
315326
return errors.Trace(err)
316327
}
317328

329+
// this resultset is done streaming
330+
result.Resultset.StreamingDone = true
331+
318332
return nil
319333
}
320334

client/stmt.go

+12
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package client
22

33
import (
44
"encoding/binary"
5+
"encoding/json"
56
"fmt"
67
"math"
78

@@ -38,6 +39,14 @@ func (s *Stmt) Execute(args ...interface{}) (*Result, error) {
3839
return s.conn.readResult(true)
3940
}
4041

42+
func (s *Stmt) ExecuteSelectStreaming(result *Result, perRowCb SelectPerRowCallback, perResCb SelectPerResultCallback, args ...interface{}) error {
43+
if err := s.write(args...); err != nil {
44+
return errors.Trace(err)
45+
}
46+
47+
return s.conn.readResultStreaming(true, result, perRowCb, perResCb)
48+
}
49+
4150
func (s *Stmt) Close() error {
4251
if err := s.conn.writeCommandUint32(COM_STMT_CLOSE, s.id); err != nil {
4352
return errors.Trace(err)
@@ -127,6 +136,9 @@ func (s *Stmt) write(args ...interface{}) error {
127136
case []byte:
128137
paramTypes[i<<1] = MYSQL_TYPE_STRING
129138
paramValues[i] = append(PutLengthEncodedInt(uint64(len(v))), v...)
139+
case json.RawMessage:
140+
paramTypes[i<<1] = MYSQL_TYPE_STRING
141+
paramValues[i] = append(PutLengthEncodedInt(uint64(len(v))), v...)
130142
default:
131143
return fmt.Errorf("invalid argument type %T", args[i])
132144
}

cmd/go-mysqlbinlog/main.go

+22-4
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ var flavor = flag.String("flavor", "mysql", "Flavor: mysql or mariadb")
2424

2525
var file = flag.String("file", "", "Binlog filename")
2626
var pos = flag.Int("pos", 4, "Binlog position")
27+
var gtid = flag.String("gtid", "", "Binlog GTID set that this slave has executed")
2728

2829
var semiSync = flag.Bool("semisync", false, "Support semi sync")
2930
var backupPath = flag.String("backup_path", "", "backup path to store binlog files")
@@ -57,10 +58,27 @@ func main() {
5758
return
5859
}
5960
} else {
60-
s, err := b.StartSync(pos)
61-
if err != nil {
62-
fmt.Printf("Start sync error: %v\n", errors.ErrorStack(err))
63-
return
61+
var (
62+
s *replication.BinlogStreamer
63+
err error
64+
)
65+
if len(*gtid) > 0 {
66+
gset, err := mysql.ParseGTIDSet(*flavor, *gtid)
67+
if err != nil {
68+
fmt.Printf("Failed to parse gtid %s with flavor %s, error: %v\n",
69+
*gtid, *flavor, errors.ErrorStack(err))
70+
}
71+
s, err = b.StartSyncGTID(gset)
72+
if err != nil {
73+
fmt.Printf("Start sync by GTID error: %v\n", errors.ErrorStack(err))
74+
return
75+
}
76+
} else {
77+
s, err = b.StartSync(pos)
78+
if err != nil {
79+
fmt.Printf("Start sync error: %v\n", errors.ErrorStack(err))
80+
return
81+
}
6482
}
6583

6684
for {

dump/dumper.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,9 @@ func (d *Dumper) Dump(w io.Writer) error {
164164
}
165165

166166
args = append(args, fmt.Sprintf("--user=%s", d.User))
167-
args = append(args, fmt.Sprintf("--password=%s", d.Password))
167+
passwordArg := fmt.Sprintf("--password=%s", d.Password)
168+
args = append(args, passwordArg)
169+
passwordArgIndex := len(args) - 1
168170

169171
if !d.masterDataSkipped {
170172
args = append(args, "--master-data")
@@ -238,7 +240,9 @@ func (d *Dumper) Dump(w io.Writer) error {
238240
}
239241
}
240242

243+
args[passwordArgIndex] = "--password=******"
241244
log.Infof("exec mysqldump with %v", args)
245+
args[passwordArgIndex] = passwordArg
242246
cmd := exec.Command(d.ExecutionPath, args...)
243247

244248
cmd.Stderr = d.ErrOut

mysql/mysql_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -223,3 +223,9 @@ func (t *mysqlTestSuite) TestMysqlUUIDClone(c *check.C) {
223223
clone := us.Clone()
224224
c.Assert(clone.String(), check.Equals, "de278ad0-2106-11e4-9f8e-6edd0ca20947:1-2")
225225
}
226+
227+
func (t *mysqlTestSuite) TestMysqlEmptyDecode(c *check.C) {
228+
_, isNull, n := LengthEncodedInt(nil)
229+
c.Assert(isNull, check.IsTrue)
230+
c.Assert(n, check.Equals, 0)
231+
}

mysql/resultset.go

+3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ type Resultset struct {
1717
RawPkg []byte
1818

1919
RowDatas []RowData
20+
21+
Streaming bool
22+
StreamingDone bool
2023
}
2124

2225
var (

0 commit comments

Comments
 (0)