Skip to content

Commit a378b15

Browse files
committed
add gtid set of MySQL after mysqldump #439
truncate os.Hostname if lengther than 60
1 parent e4fc336 commit a378b15

File tree

9 files changed

+270
-4
lines changed

9 files changed

+270
-4
lines changed

canal/canal.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,23 @@ func NewCanal(cfg *Config) (*Canal, error) {
120120
return c, nil
121121
}
122122

123+
func (c *Canal) validateSetGtidPurged() error {
124+
gtidPuged := strings.ToLower(c.cfg.Dump.GtidPurged)
125+
if gtidPuged == "none" {
126+
return nil
127+
} else if gtidPuged == "auto" {
128+
res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "gtid_mode";`)
129+
if err != nil {
130+
return errors.Trace(err)
131+
} else if f, _ := res.GetString(0, 1); f != "ON" {
132+
return errors.Errorf("set-gtid-purged: %s, gtid_mode should be ON %s, but now is ", c.cfg.Dump.GtidPurged, f)
133+
}
134+
return nil
135+
}
136+
137+
return errors.Errorf("set_gtid_purged: on OR auto can be set, current is %s", gtidPuged)
138+
}
139+
123140
func (c *Canal) prepareDumper() error {
124141
var err error
125142
dumpPath := c.cfg.Dump.ExecutionPath
@@ -128,6 +145,12 @@ func (c *Canal) prepareDumper() error {
128145
return nil
129146
}
130147

148+
// validate c.cfg.Dump.GtidPurged)
149+
err = c.validateSetGtidPurged()
150+
if err != nil {
151+
return err
152+
}
153+
131154
if c.dumper, err = dump.NewDumper(dumpPath,
132155
c.cfg.Addr, c.cfg.User, c.cfg.Password); err != nil {
133156
return errors.Trace(err)
@@ -153,6 +176,7 @@ func (c *Canal) prepareDumper() error {
153176

154177
c.dumper.SetWhere(c.cfg.Dump.Where)
155178
c.dumper.SkipMasterData(c.cfg.Dump.SkipMasterData)
179+
c.dumper.SetGtidPurged(strings.ToLower(c.cfg.Dump.GtidPurged))
156180
c.dumper.SetMaxAllowedPacket(c.cfg.Dump.MaxAllowedPacketMB)
157181
c.dumper.SetProtocol(c.cfg.Dump.Protocol)
158182
// Use hex blob for mysqldump

canal/canal_test.go

Lines changed: 129 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
package canal
22

33
import (
4+
"bufio"
45
"flag"
56
"fmt"
7+
"io"
8+
"regexp"
9+
"strconv"
10+
"strings"
611
"testing"
712
"time"
813

@@ -14,7 +19,10 @@ import (
1419
"github.com/siddontang/go-mysql/replication"
1520
)
1621

17-
var testHost = flag.String("host", "127.0.0.1", "MySQL host")
22+
var (
23+
ErrSkip = errors.New("Handler error, but skipped")
24+
testHost = flag.String("host", "127.0.0.1", "MySQL host")
25+
)
1826

1927
func Test(t *testing.T) {
2028
TestingT(t)
@@ -74,6 +82,126 @@ func (s *canalTestSuite) SetUpSuite(c *C) {
7482
}()
7583
}
7684

85+
func TestCanalHandler(t *testing.T) {
86+
oneGtidExp := regexp.MustCompile("SET @@GLOBAL.GTID_PURGED='(.+)'")
87+
mutilGtidStartExp := regexp.MustCompile("SET @@GLOBAL.GTID_PURGED='(.+),")
88+
midUuidSet := regexp.MustCompile("(^\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+),")
89+
endUuidSet := regexp.MustCompile("(^\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)'")
90+
binlogExp := regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);")
91+
tbls := []struct {
92+
input string
93+
expected string
94+
}{
95+
{`SET @@GLOBAL.GTID_PURGED='071a84e8-b253-11e8-8472-005056a27e86:1-76,
96+
2337be48-0456-11e9-bd1c-00505690543b:1-7,
97+
41d816cd-0455-11e9-be42-005056901a22:1-2,
98+
5f1eea9e-b1e5-11e8-bc77-005056a221ed:1-144609156,
99+
75848cdb-8131-11e7-b6fc-1c1b0de85e7b:1-151378598,
100+
780ad602-0456-11e9-8bcd-005056901a22:1-516653148,
101+
92809ddd-1e3c-11e9-9d04-00505690f6ab:1-11858565,
102+
c59598c7-0467-11e9-bbbe-005056901a22:1-226464969,
103+
cbd7809d-0433-11e9-b1cf-00505690543b:1-18233950,
104+
cca778e9-8cdf-11e8-94d0-005056a247b1:1-303899574,
105+
cf80679b-7695-11e8-8873-1c1b0d9a4ab9:1-12836047,
106+
d0951f24-1e21-11e9-bb2e-00505690b730:1-4758092,
107+
e7574090-b123-11e8-8bb4-005056a29643:1-12'`, "071a84e8-b253-11e8-8472-005056a27e86:1-76,2337be48-0456-11e9-bd1c-00505690543b:1-7,41d816cd-0455-11e9-be42-005056901a22:1-2,5f1eea9e-b1e5-11e8-bc77-005056a221ed:1-144609156,75848cdb-8131-11e7-b6fc-1c1b0de85e7b:1-151378598,780ad602-0456-11e9-8bcd-005056901a22:1-516653148,92809ddd-1e3c-11e9-9d04-00505690f6ab:1-11858565,c59598c7-0467-11e9-bbbe-005056901a22:1-226464969,cbd7809d-0433-11e9-b1cf-00505690543b:1-18233950,cca778e9-8cdf-11e8-94d0-005056a247b1:1-303899574,cf80679b-7695-11e8-8873-1c1b0d9a4ab9:1-12836047,d0951f24-1e21-11e9-bb2e-00505690b730:1-4758092,e7574090-b123-11e8-8bb4-005056a29643:1-12"},
108+
{`SET @@GLOBAL.GTID_PURGED='071a84e8-b253-11e8-8472-005056a27e86:1-76,
109+
2337be48-0456-11e9-bd1c-00505690543b:1-7';`, "071a84e8-b253-11e8-8472-005056a27e86:1-76,2337be48-0456-11e9-bd1c-00505690543b:1-7"},
110+
{`SET @@GLOBAL.GTID_PURGED='c0977f88-3104-11e9-81e1-00505690245b:1-274559';`, "c0977f88-3104-11e9-81e1-00505690245b:1-274559"},
111+
{`CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.008995', MASTER_LOG_POS=102052485;`, ""},
112+
}
113+
114+
for _, tt := range tbls {
115+
h := dumpParseHandler{}
116+
reader := strings.NewReader(tt.input)
117+
newReader := bufio.NewReader(reader)
118+
var binlogParsed bool
119+
var gtidDoneParsed bool
120+
var mutilGtidParsed bool
121+
parseBinlogPos := true
122+
for {
123+
bytes, _, err := newReader.ReadLine()
124+
line := string(bytes)
125+
if err != io.EOF {
126+
fmt.Println(string(line))
127+
} else {
128+
break
129+
}
130+
if parseBinlogPos && !gtidDoneParsed && !binlogParsed {
131+
if m := oneGtidExp.FindAllStringSubmatch(line, -1); len(m) == 1 {
132+
gset := m[0][1]
133+
if err := h.UpdateGtidFromPurged(gset); err != nil {
134+
errors.Trace(err)
135+
}
136+
gtidDoneParsed = true
137+
}
138+
if m := mutilGtidStartExp.FindAllStringSubmatch(line, -1); len(m) == 1 {
139+
gset := m[0][1]
140+
if err := h.UpdateGtidFromPurged(gset); err != nil {
141+
errors.Trace(err)
142+
}
143+
mutilGtidParsed = true
144+
}
145+
146+
if mutilGtidParsed && !gtidDoneParsed {
147+
if m := midUuidSet.FindAllStringSubmatch(line, -1); len(m) == 1 {
148+
gset := m[0][1]
149+
if err := h.UpdateGtidFromPurged(gset); err != nil {
150+
errors.Trace(err)
151+
}
152+
153+
}
154+
155+
if m := endUuidSet.FindAllStringSubmatch(line, -1); len(m) == 1 {
156+
gset := m[0][1]
157+
if err := h.UpdateGtidFromPurged(gset); err != nil {
158+
errors.Trace(err)
159+
}
160+
gtidDoneParsed = true
161+
}
162+
163+
}
164+
}
165+
166+
if parseBinlogPos && !binlogParsed {
167+
if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 {
168+
name := m[0][1]
169+
pos, err := strconv.ParseUint(m[0][2], 10, 64)
170+
if err != nil {
171+
errors.Errorf("parse binlog %v err, invalid number", line)
172+
}
173+
174+
if err = h.BinLog(name, pos); err != nil && err != ErrSkip {
175+
errors.Trace(err)
176+
}
177+
178+
binlogParsed = true
179+
gtidDoneParsed = true
180+
}
181+
}
182+
183+
}
184+
185+
if tt.expected == "" {
186+
if h.gset != nil {
187+
log.Fatalf("expected nil, but get %v", h.gset)
188+
}
189+
continue
190+
}
191+
expectedGtidset, err := mysql.ParseGTIDSet("mysql", tt.expected)
192+
if err != nil {
193+
log.Fatalf("Gtid:%s failed parsed, err: %v", tt.expected, err)
194+
}
195+
if !expectedGtidset.Equal(h.gset) {
196+
log.Fatalf("expected:%v , but get: %v", expectedGtidset, h.gset)
197+
}
198+
199+
// c.Assert(expectedGtidset.Equal(h.gset), IsTrue)
200+
// c.Logf("parsed gtidset: %v", h.gset)
201+
}
202+
203+
}
204+
77205
func (s *canalTestSuite) TearDownSuite(c *C) {
78206
// To test the heartbeat and read timeout,so need to sleep 1 seconds without data transmission
79207
c.Logf("Start testing the heartbeat and read timeout")

canal/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ type DumpConfig struct {
3434
// 'FLUSH TABLES WITH READ LOCK'
3535
SkipMasterData bool `toml:"skip_master_data"`
3636

37+
// set --set-gtid-purged none, auto; none for gtid is disabled or "version too low", auto for gtid_mode=on;
38+
GtidPurged string `toml:"set_gtid_purged"`
39+
3740
// Set to change the default max_allowed_packet size
3841
MaxAllowedPacketMB int `toml:"max_allowed_packet_mb"`
3942

@@ -113,6 +116,8 @@ func NewDefaultConfig() *Config {
113116
c.Dump.ExecutionPath = "mysqldump"
114117
c.Dump.DiscardErr = true
115118
c.Dump.SkipMasterData = false
119+
// add default value to disable mysqldump --set-gtid-purged
120+
c.Dump.GtidPurged = "none"
116121

117122
return c
118123
}

canal/dump.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,15 @@ func (h *dumpParseHandler) BinLog(name string, pos uint64) error {
2727
return nil
2828
}
2929

30+
func (h *dumpParseHandler) UpdateGtidFromPurged(gtidsets string) (err error) {
31+
if h.gset != nil {
32+
err = h.gset.Update(gtidsets)
33+
} else {
34+
h.gset, err = mysql.ParseGTIDSet("mysql", gtidsets)
35+
}
36+
return err
37+
}
38+
3039
func (h *dumpParseHandler) Data(db string, table string, values []string) error {
3140
if err := h.c.ctx.Err(); err != nil {
3241
return err
@@ -158,6 +167,7 @@ func (c *Canal) dump() error {
158167

159168
pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)}
160169
c.master.Update(pos)
170+
c.master.UpdateGTIDSet(h.gset)
161171
if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), true); err != nil {
162172
return errors.Trace(err)
163173
}

dump/dump.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type Dumper struct {
3737

3838
masterDataSkipped bool
3939
maxAllowedPacket int
40+
gtidPurged string
4041
hexBlob bool
4142
}
4243

@@ -87,6 +88,11 @@ func (d *Dumper) SkipMasterData(v bool) {
8788
d.masterDataSkipped = v
8889
}
8990

91+
// SetGtidPurged: none, auto; none for gtid is disabled or "version too low", auto for gtid_mode=on;
92+
func (d *Dumper) SetGtidPurged(gtid string) {
93+
d.gtidPurged = gtid
94+
}
95+
9096
func (d *Dumper) SetMaxAllowedPacket(i int) {
9197
d.maxAllowedPacket = i
9298
}
@@ -143,6 +149,10 @@ func (d *Dumper) Dump(w io.Writer) error {
143149
args = append(args, "--master-data")
144150
}
145151

152+
if d.gtidPurged == "auto" {
153+
args = append(args, fmt.Sprintf("--set-gtid-purged=%s", d.gtidPurged))
154+
}
155+
146156
if d.maxAllowedPacket > 0 {
147157
// mysqldump param should be --max-allowed-packet=%dM not be --max_allowed_packet=%dM
148158
args = append(args, fmt.Sprintf("--max-allowed-packet=%dM", d.maxAllowedPacket))
@@ -215,6 +225,7 @@ func (d *Dumper) DumpAndParse(h ParseHandler) error {
215225

216226
done := make(chan error, 1)
217227
go func() {
228+
// TODO: set_gtid_purged indicate if parse SET @@GLOBAL.GTID_PURGED='c0977f88-3104-11e9-81e1-00505690245b:1-274559' OR NOT;
218229
err := Parse(r, h, !d.masterDataSkipped)
219230
r.CloseWithError(err)
220231
done <- err

dump/dump_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func (s *schemaTestSuite) SetUpSuite(c *C) {
3636
c.Assert(err, IsNil)
3737

3838
s.d, err = NewDumper(*execution, fmt.Sprintf("%s:%d", *host, *port), "root", "")
39+
s.d.gtidPurged = "none"
3940
c.Assert(err, IsNil)
4041
c.Assert(s.d, NotNil)
4142

@@ -119,6 +120,10 @@ func (h *testParseHandler) BinLog(name string, pos uint64) error {
119120
return nil
120121
}
121122

123+
func (h *testParseHandler) UpdateGtidFromPurged(gtidsets string) (err error) {
124+
return nil
125+
}
126+
122127
func (h *testParseHandler) Data(schema string, table string, values []string) error {
123128
return nil
124129
}

dump/parser.go

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,24 @@ var (
1919
type ParseHandler interface {
2020
// Parse CHANGE MASTER TO MASTER_LOG_FILE=name, MASTER_LOG_POS=pos;
2121
BinLog(name string, pos uint64) error
22-
22+
UpdateGtidFromPurged(gtidsets string) error
2323
Data(schema string, table string, values []string) error
2424
}
2525

26+
var oneGtidExp *regexp.Regexp
27+
var mutilGtidStartExp *regexp.Regexp
28+
var midUuidSet *regexp.Regexp
29+
var endUuidSet *regexp.Regexp
2630
var binlogExp *regexp.Regexp
2731
var useExp *regexp.Regexp
2832
var valuesExp *regexp.Regexp
2933

3034
func init() {
35+
oneGtidExp = regexp.MustCompile("SET @@GLOBAL.GTID_PURGED='(.+)'")
36+
mutilGtidStartExp = regexp.MustCompile("SET @@GLOBAL.GTID_PURGED='(.+),")
37+
midUuidSet = regexp.MustCompile("(^\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+),")
38+
endUuidSet = regexp.MustCompile("(^\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)'")
39+
3140
binlogExp = regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);")
3241
useExp = regexp.MustCompile("^USE `(.+)`;")
3342
valuesExp = regexp.MustCompile("^INSERT INTO `(.+?)` VALUES \\((.+)\\);$")
@@ -40,6 +49,8 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error {
4049

4150
var db string
4251
var binlogParsed bool
52+
var gtidDoneParsed bool
53+
var mutilGtidParsed bool
4354

4455
for {
4556
line, err := rb.ReadString('\n')
@@ -54,19 +65,57 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error {
5465
return c == '\r' || c == '\n'
5566
})
5667

68+
// parsed gtid set from mysqldump,refer to canal_test.go TestDumperHandler function
69+
if parseBinlogPos && !gtidDoneParsed && !binlogParsed {
70+
if m := oneGtidExp.FindAllStringSubmatch(line, -1); len(m) == 1 {
71+
gset := m[0][1]
72+
if err := h.UpdateGtidFromPurged(gset); err != nil {
73+
errors.Trace(err)
74+
}
75+
gtidDoneParsed = true
76+
}
77+
if m := mutilGtidStartExp.FindAllStringSubmatch(line, -1); len(m) == 1 {
78+
gset := m[0][1]
79+
if err := h.UpdateGtidFromPurged(gset); err != nil {
80+
errors.Trace(err)
81+
}
82+
mutilGtidParsed = true
83+
}
84+
85+
if mutilGtidParsed && !gtidDoneParsed {
86+
if m := midUuidSet.FindAllStringSubmatch(line, -1); len(m) == 1 {
87+
gset := m[0][1]
88+
if err := h.UpdateGtidFromPurged(gset); err != nil {
89+
errors.Trace(err)
90+
}
91+
92+
}
93+
94+
if m := endUuidSet.FindAllStringSubmatch(line, -1); len(m) == 1 {
95+
gset := m[0][1]
96+
if err := h.UpdateGtidFromPurged(gset); err != nil {
97+
errors.Trace(err)
98+
}
99+
gtidDoneParsed = true
100+
}
101+
102+
}
103+
}
104+
57105
if parseBinlogPos && !binlogParsed {
58106
if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 {
59107
name := m[0][1]
60108
pos, err := strconv.ParseUint(m[0][2], 10, 64)
61109
if err != nil {
62-
return errors.Errorf("parse binlog %v err, invalid number", line)
110+
errors.Errorf("parse binlog %v err, invalid number", line)
63111
}
64112

65113
if err = h.BinLog(name, pos); err != nil && err != ErrSkip {
66-
return errors.Trace(err)
114+
errors.Trace(err)
67115
}
68116

69117
binlogParsed = true
118+
gtidDoneParsed = true
70119
}
71120
}
72121

0 commit comments

Comments
 (0)