Skip to content

Commit 65458c6

Browse files
authored
Merge pull request #1 from siddontang/master
update code in my fork
2 parents a8c16ae + 0c5789d commit 65458c6

19 files changed

+641
-297
lines changed

.travis.yml

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ language: go
22

33
go:
44
- "1.11"
5+
- "1.14"
6+
- tip
57

68
services:
79
mysql

README.md

+52-36
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,18 @@ You can use it as a MySQL slave to sync binlog from master then do something, li
2323

2424
```go
2525
import (
26-
"github.com/siddontang/go-mysql/replication"
27-
"os"
26+
"github.com/siddontang/go-mysql/replication"
27+
"os"
2828
)
2929
// Create a binlog syncer with a unique server id, the server id must be different from other MySQL's.
3030
// flavor is mysql or mariadb
3131
cfg := replication.BinlogSyncerConfig {
32-
ServerID: 100,
33-
Flavor: "mysql",
34-
Host: "127.0.0.1",
35-
Port: 3306,
36-
User: "root",
37-
Password: "",
32+
ServerID: 100,
33+
Flavor: "mysql",
34+
Host: "127.0.0.1",
35+
Port: 3306,
36+
User: "root",
37+
Password: "",
3838
}
3939
syncer := replication.NewBinlogSyncer(cfg)
4040

@@ -47,23 +47,23 @@ streamer, _ := syncer.StartSync(mysql.Position{binlogFile, binlogPos})
4747
// the mariadb GTID set likes this "0-1-100"
4848

4949
for {
50-
ev, _ := streamer.GetEvent(context.Background())
51-
// Dump event
52-
ev.Dump(os.Stdout)
50+
ev, _ := streamer.GetEvent(context.Background())
51+
// Dump event
52+
ev.Dump(os.Stdout)
5353
}
5454

5555
// or we can use a timeout context
5656
for {
57-
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
58-
ev, err := s.GetEvent(ctx)
59-
cancel()
57+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
58+
ev, err := s.GetEvent(ctx)
59+
cancel()
6060

61-
if err == context.DeadlineExceeded {
62-
// meet timeout
63-
continue
64-
}
61+
if err == context.DeadlineExceeded {
62+
// meet timeout
63+
continue
64+
}
6565

66-
ev.Dump(os.Stdout)
66+
ev.Dump(os.Stdout)
6767
}
6868
```
6969

@@ -117,16 +117,16 @@ cfg.Dump.Tables = []string{"canal_test"}
117117
c, err := NewCanal(cfg)
118118

119119
type MyEventHandler struct {
120-
DummyEventHandler
120+
DummyEventHandler
121121
}
122122

123123
func (h *MyEventHandler) OnRow(e *RowsEvent) error {
124-
log.Infof("%s %v\n", e.Action, e.Rows)
125-
return nil
124+
log.Infof("%s %v\n", e.Action, e.Rows)
125+
return nil
126126
}
127127

128128
func (h *MyEventHandler) String() string {
129-
return "MyEventHandler"
129+
return "MyEventHandler"
130130
}
131131

132132
// Register a handler to handle RowsEvent
@@ -146,7 +146,7 @@ Client package supports a simple MySQL connection driver which you can use it to
146146

147147
```go
148148
import (
149-
"github.com/siddontang/go-mysql/client"
149+
"github.com/siddontang/go-mysql/client"
150150
)
151151

152152
// Connect MySQL at 127.0.0.1:3306, with user root, an empty password and database test
@@ -155,7 +155,7 @@ conn, _ := client.Connect("127.0.0.1:3306", "root", "", "test")
155155
// Or to use SSL/TLS connection if MySQL server supports TLS
156156
//conn, _ := client.Connect("127.0.0.1:3306", "root", "", "test", func(c *Conn) {c.UseSSL(true)})
157157

158-
// or to set your own client-side certificates for identity verification for security
158+
// Or to set your own client-side certificates for identity verification for security
159159
//tlsConfig := NewClientTLSConfig(caPem, certPem, keyPem, false, "your-server-name")
160160
//conn, _ := client.Connect("127.0.0.1:3306", "root", "", "test", func(c *Conn) {c.SetTLSConfig(tlsConfig)})
161161

@@ -166,13 +166,29 @@ r, _ := conn.Execute(`insert into table (id, name) values (1, "abc")`)
166166

167167
// Get last insert id
168168
println(r.InsertId)
169+
// Or affected rows count
170+
println(r.AffectedRows)
169171

170172
// Select
171-
r, _ := conn.Execute(`select id, name from table where id = 1`)
173+
r, err := conn.Execute(`select id, name from table where id = 1`)
174+
175+
// Close result for reuse memory (it's not necessary but very useful)
176+
defer r.Close()
172177

173178
// Handle resultset
174179
v, _ := r.GetInt(0, 0)
175-
v, _ = r.GetIntByName(0, "id")
180+
v, _ = r.GetIntByName(0, "id")
181+
182+
// Direct access to fields
183+
for _, row := range r.Values {
184+
for _, val := range row {
185+
_ = val.Value() // interface{}
186+
// or
187+
if val.Type == mysql.FieldValueTypeFloat {
188+
_ = val.AsFloat64() // float64
189+
}
190+
}
191+
}
176192
```
177193

178194
Tested MySQL versions for the client include:
@@ -191,8 +207,8 @@ so that most MySQL clients should be able to connect to the Server without modif
191207

192208
```go
193209
import (
194-
"github.com/siddontang/go-mysql/server"
195-
"net"
210+
"github.com/siddontang/go-mysql/server"
211+
"net"
196212
)
197213

198214
l, _ := net.Listen("tcp", "127.0.0.1:4000")
@@ -204,7 +220,7 @@ c, _ := l.Accept()
204220
conn, _ := server.NewConn(c, "root", "", server.EmptyHandler{})
205221

206222
for {
207-
conn.HandleCommand()
223+
conn.HandleCommand()
208224
}
209225
```
210226

@@ -243,16 +259,16 @@ Driver is the package that you can use go-mysql with go database/sql like other
243259
package main
244260

245261
import (
246-
"database/sql"
262+
"database/sql"
247263

248-
_ "github.com/siddontang/go-mysql/driver"
264+
_ "github.com/siddontang/go-mysql/driver"
249265
)
250266

251267
func main() {
252-
// dsn format: "user:password@addr?dbname"
253-
dsn := "[email protected]:3306?test"
254-
db, _ := sql.Open(dsn)
255-
db.Close()
268+
// dsn format: "user:password@addr?dbname"
269+
dsn := "[email protected]:3306?test"
270+
db, _ := sql.Open(dsn)
271+
db.Close()
256272
}
257273
```
258274

canal/canal.go

+1
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,7 @@ func (c *Canal) prepareSyncer() error {
424424
ParseTime: c.cfg.ParseTime,
425425
SemiSyncEnabled: c.cfg.SemiSyncEnabled,
426426
MaxReconnectAttempts: c.cfg.MaxReconnectAttempts,
427+
DisableRetrySync: c.cfg.DisableRetrySync,
427428
TimestampStringLocation: c.cfg.TimestampStringLocation,
428429
TLSConfig: c.cfg.TLSConfig,
429430
}

canal/config.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,13 @@ type Config struct {
7777
// SemiSyncEnabled enables semi-sync or not.
7878
SemiSyncEnabled bool `toml:"semi_sync_enabled"`
7979

80-
// Set to change the maximum number of attempts to re-establish a broken
81-
// connection
80+
// maximum number of attempts to re-establish a broken connection, zero or negative number means infinite retry.
81+
// this configuration will not work if DisableRetrySync is true
8282
MaxReconnectAttempts int `toml:"max_reconnect_attempts"`
8383

84+
// whether disable re-sync for broken connection
85+
DisableRetrySync bool `toml:"disable_retry_sync"`
86+
8487
// Set TLS config
8588
TLSConfig *tls.Config
8689
}

client/req.go

+11-15
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package client
22

3+
import (
4+
"github.com/siddontang/go-mysql/utils"
5+
)
6+
37
func (c *Conn) writeCommand(command byte) error {
48
c.ResetSequence()
59

@@ -16,28 +20,20 @@ func (c *Conn) writeCommandBuf(command byte, arg []byte) error {
1620
c.ResetSequence()
1721

1822
length := len(arg) + 1
19-
20-
data := make([]byte, length+4)
21-
23+
data := utils.ByteSliceGet(length + 4)
2224
data[4] = command
2325

2426
copy(data[5:], arg)
2527

26-
return c.WritePacket(data)
27-
}
28-
29-
func (c *Conn) writeCommandStr(command byte, arg string) error {
30-
c.ResetSequence()
31-
32-
length := len(arg) + 1
33-
34-
data := make([]byte, length+4)
28+
err := c.WritePacket(data)
3529

36-
data[4] = command
30+
utils.ByteSlicePut(data)
3731

38-
copy(data[5:], arg)
32+
return err
33+
}
3934

40-
return c.WritePacket(data)
35+
func (c *Conn) writeCommandStr(command byte, arg string) error {
36+
return c.writeCommandBuf(command, utils.StringToByteSlice(arg))
4137
}
4238

4339
func (c *Conn) writeCommandUint32(command byte, arg uint32) error {

client/resp.go

+30-25
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
package client
22

33
import (
4-
"encoding/binary"
5-
64
"bytes"
75
"crypto/rsa"
86
"crypto/x509"
7+
"encoding/binary"
98
"encoding/pem"
109

1110
"github.com/pingcap/errors"
1211
. "github.com/siddontang/go-mysql/mysql"
12+
"github.com/siddontang/go-mysql/utils"
1313
"github.com/siddontang/go/hack"
1414
)
1515

@@ -212,40 +212,35 @@ func (c *Conn) readOK() (*Result, error) {
212212
}
213213

214214
func (c *Conn) readResult(binary bool) (*Result, error) {
215-
data, err := c.ReadPacket()
215+
firstPkgBuf, err := c.ReadPacketReuseMem(utils.ByteSliceGet(16)[:0])
216+
defer utils.ByteSlicePut(firstPkgBuf)
217+
216218
if err != nil {
217219
return nil, errors.Trace(err)
218220
}
219221

220-
if data[0] == OK_HEADER {
221-
return c.handleOKPacket(data)
222-
} else if data[0] == ERR_HEADER {
223-
return nil, c.handleErrorPacket(data)
224-
} else if data[0] == LocalInFile_HEADER {
222+
if firstPkgBuf[0] == OK_HEADER {
223+
return c.handleOKPacket(firstPkgBuf)
224+
} else if firstPkgBuf[0] == ERR_HEADER {
225+
return nil, c.handleErrorPacket(append([]byte{}, firstPkgBuf...))
226+
} else if firstPkgBuf[0] == LocalInFile_HEADER {
225227
return nil, ErrMalformPacket
226228
}
227229

228-
return c.readResultset(data, binary)
230+
return c.readResultset(firstPkgBuf, binary)
229231
}
230232

231233
func (c *Conn) readResultset(data []byte, binary bool) (*Result, error) {
232-
result := &Result{
233-
Status: 0,
234-
InsertId: 0,
235-
AffectedRows: 0,
236-
237-
Resultset: &Resultset{},
238-
}
239-
240234
// column count
241235
count, _, n := LengthEncodedInt(data)
242236

243237
if n-len(data) != 0 {
244238
return nil, ErrMalformPacket
245239
}
246240

247-
result.Fields = make([]*Field, count)
248-
result.FieldNames = make(map[string]int, count)
241+
result := &Result{
242+
Resultset: NewResultset(int(count)),
243+
}
249244

250245
if err := c.readResultColumns(result); err != nil {
251246
return nil, errors.Trace(err)
@@ -263,10 +258,12 @@ func (c *Conn) readResultColumns(result *Result) (err error) {
263258
var data []byte
264259

265260
for {
266-
data, err = c.ReadPacket()
261+
rawPkgLen := len(result.RawPkg)
262+
result.RawPkg, err = c.ReadPacketReuseMem(result.RawPkg)
267263
if err != nil {
268264
return
269265
}
266+
data = result.RawPkg[rawPkgLen:]
270267

271268
// EOF Packet
272269
if c.isEOFPacket(data) {
@@ -284,7 +281,10 @@ func (c *Conn) readResultColumns(result *Result) (err error) {
284281
return
285282
}
286283

287-
result.Fields[i], err = FieldData(data).Parse()
284+
if result.Fields[i] == nil {
285+
result.Fields[i] = &Field{}
286+
}
287+
err = result.Fields[i].Parse(data)
288288
if err != nil {
289289
return
290290
}
@@ -299,11 +299,12 @@ func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) {
299299
var data []byte
300300

301301
for {
302-
data, err = c.ReadPacket()
303-
302+
rawPkgLen := len(result.RawPkg)
303+
result.RawPkg, err = c.ReadPacketReuseMem(result.RawPkg)
304304
if err != nil {
305305
return
306306
}
307+
data = result.RawPkg[rawPkgLen:]
307308

308309
// EOF Packet
309310
if c.isEOFPacket(data) {
@@ -324,10 +325,14 @@ func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) {
324325
result.RowDatas = append(result.RowDatas, data)
325326
}
326327

327-
result.Values = make([][]interface{}, len(result.RowDatas))
328+
if cap(result.Values) < len(result.RowDatas) {
329+
result.Values = make([][]FieldValue, len(result.RowDatas))
330+
} else {
331+
result.Values = result.Values[:len(result.RowDatas)]
332+
}
328333

329334
for i := range result.Values {
330-
result.Values[i], err = result.RowDatas[i].Parse(result.Fields, isBinary)
335+
result.Values[i], err = result.RowDatas[i].Parse(result.Fields, isBinary, result.Values[i])
331336

332337
if err != nil {
333338
return errors.Trace(err)
File renamed without changes.

0 commit comments

Comments
 (0)