Skip to content

Commit 08dd4b5

Browse files
jianhaiqingsiddontang
authored andcommitted
parse gtid for parseHandler if MySQL works in GTID_MODE, and begin to startWithGTID after mysqldump is done (go-mysql-org#444)
* add gtid set of MySQL after mysqldump go-mysql-org#439
1 parent 07ca784 commit 08dd4b5

File tree

5 files changed

+104
-3
lines changed

5 files changed

+104
-3
lines changed

canal/dump.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,15 @@ func (h *dumpParseHandler) BinLog(name string, pos uint64) error {
2727
return nil
2828
}
2929

30+
func (h *dumpParseHandler) GtidSet(gtidsets string) (err error) {
31+
if h.gset != nil {
32+
err = h.gset.Update(gtidsets)
33+
} else {
34+
h.gset, err = mysql.ParseGTIDSet("mysql", gtidsets)
35+
}
36+
return err
37+
}
38+
3039
func (h *dumpParseHandler) Data(db string, table string, values []string) error {
3140
if err := h.c.ctx.Err(); err != nil {
3241
return err
@@ -167,6 +176,7 @@ func (c *Canal) dump() error {
167176

168177
pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)}
169178
c.master.Update(pos)
179+
c.master.UpdateGTIDSet(h.gset)
170180
if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), true); err != nil {
171181
return errors.Trace(err)
172182
}

canal/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616

1717
func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
1818
gset := c.master.GTIDSet()
19-
if gset == nil {
19+
if gset == nil || gset.String() == "" {
2020
pos := c.master.Position()
2121
s, err := c.syncer.StartSync(pos)
2222
if err != nil {

dump/dump_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
. "github.com/pingcap/check"
1313
"github.com/siddontang/go-mysql/client"
14+
"github.com/siddontang/go-mysql/mysql"
1415
)
1516

1617
// use docker mysql for test
@@ -113,16 +114,88 @@ func (s *schemaTestSuite) TestDump(c *C) {
113114
}
114115

115116
type testParseHandler struct {
117+
gset mysql.GTIDSet
116118
}
117119

118120
func (h *testParseHandler) BinLog(name string, pos uint64) error {
119121
return nil
120122
}
121123

124+
func (h *testParseHandler) GtidSet(gtidsets string) (err error) {
125+
if h.gset != nil {
126+
err = h.gset.Update(gtidsets)
127+
} else {
128+
h.gset, err = mysql.ParseGTIDSet("mysql", gtidsets)
129+
}
130+
return err
131+
}
132+
122133
func (h *testParseHandler) Data(schema string, table string, values []string) error {
123134
return nil
124135
}
125136

137+
type GtidParseTest struct {
138+
gset mysql.GTIDSet
139+
}
140+
141+
func (h *GtidParseTest) UpdateGtidSet(gtidStr string) (err error) {
142+
if h.gset != nil {
143+
err = h.gset.Update(gtidStr)
144+
} else {
145+
h.gset, err = mysql.ParseGTIDSet("mysql", gtidStr)
146+
}
147+
return err
148+
}
149+
150+
func (s *parserTestSuite) TestParseGtidExp(c *C) {
151+
// binlogExp := regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);")
152+
// gtidExp := regexp.MustCompile("(\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)")
153+
tbls := []struct {
154+
input string
155+
expected string
156+
}{
157+
{`SET @@GLOBAL.GTID_PURGED='071a84e8-b253-11e8-8472-005056a27e86:1-76,
158+
2337be48-0456-11e9-bd1c-00505690543b:1-7,
159+
41d816cd-0455-11e9-be42-005056901a22:1-2,
160+
5f1eea9e-b1e5-11e8-bc77-005056a221ed:1-144609156,
161+
75848cdb-8131-11e7-b6fc-1c1b0de85e7b:1-151378598,
162+
780ad602-0456-11e9-8bcd-005056901a22:1-516653148,
163+
92809ddd-1e3c-11e9-9d04-00505690f6ab:1-11858565,
164+
c59598c7-0467-11e9-bbbe-005056901a22:1-226464969,
165+
cbd7809d-0433-11e9-b1cf-00505690543b:1-18233950,
166+
cca778e9-8cdf-11e8-94d0-005056a247b1:1-303899574,
167+
cf80679b-7695-11e8-8873-1c1b0d9a4ab9:1-12836047,
168+
d0951f24-1e21-11e9-bb2e-00505690b730:1-4758092,
169+
e7574090-b123-11e8-8bb4-005056a29643:1-12'
170+
`, "071a84e8-b253-11e8-8472-005056a27e86:1-76,2337be48-0456-11e9-bd1c-00505690543b:1-7,41d816cd-0455-11e9-be42-005056901a22:1-2,5f1eea9e-b1e5-11e8-bc77-005056a221ed:1-144609156,75848cdb-8131-11e7-b6fc-1c1b0de85e7b:1-151378598,780ad602-0456-11e9-8bcd-005056901a22:1-516653148,92809ddd-1e3c-11e9-9d04-00505690f6ab:1-11858565,c59598c7-0467-11e9-bbbe-005056901a22:1-226464969,cbd7809d-0433-11e9-b1cf-00505690543b:1-18233950,cca778e9-8cdf-11e8-94d0-005056a247b1:1-303899574,cf80679b-7695-11e8-8873-1c1b0d9a4ab9:1-12836047,d0951f24-1e21-11e9-bb2e-00505690b730:1-4758092,e7574090-b123-11e8-8bb4-005056a29643:1-12"},
171+
{`SET @@GLOBAL.GTID_PURGED='071a84e8-b253-11e8-8472-005056a27e86:1-76,
172+
2337be48-0456-11e9-bd1c-00505690543b:1-7';
173+
`, "071a84e8-b253-11e8-8472-005056a27e86:1-76,2337be48-0456-11e9-bd1c-00505690543b:1-7"},
174+
{`SET @@GLOBAL.GTID_PURGED='c0977f88-3104-11e9-81e1-00505690245b:1-274559';
175+
`, "c0977f88-3104-11e9-81e1-00505690245b:1-274559"},
176+
{`CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.008995', MASTER_LOG_POS=102052485;`, ""},
177+
}
178+
179+
for _, tt := range tbls {
180+
reader := strings.NewReader(tt.input)
181+
var handler = new(testParseHandler)
182+
183+
Parse(reader, handler, true)
184+
185+
if tt.expected == "" {
186+
if handler.gset != nil {
187+
c.Assert(handler.gset, IsNil)
188+
} else {
189+
continue
190+
}
191+
}
192+
expectedGtidset, err := mysql.ParseGTIDSet("mysql", tt.expected)
193+
c.Assert(err, IsNil)
194+
c.Assert(expectedGtidset.Equal(handler.gset), IsTrue)
195+
}
196+
197+
}
198+
126199
func (s *parserTestSuite) TestParseFindTable(c *C) {
127200
tbl := []struct {
128201
sql string

dump/parser.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,22 @@ var (
1919
type ParseHandler interface {
2020
// Parse CHANGE MASTER TO MASTER_LOG_FILE=name, MASTER_LOG_POS=pos;
2121
BinLog(name string, pos uint64) error
22-
22+
GtidSet(gtidsets string) error
2323
Data(schema string, table string, values []string) error
2424
}
2525

2626
var binlogExp *regexp.Regexp
2727
var useExp *regexp.Regexp
2828
var valuesExp *regexp.Regexp
29+
var gtidExp *regexp.Regexp
2930

3031
func init() {
3132
binlogExp = regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);")
3233
useExp = regexp.MustCompile("^USE `(.+)`;")
3334
valuesExp = regexp.MustCompile("^INSERT INTO `(.+?)` VALUES \\((.+)\\);$")
35+
// The pattern will only match MySQL GTID, as you know SET GLOBAL gtid_slave_pos='0-1-4' is used for MariaDB.
36+
//SET @@GLOBAL.GTID_PURGED='1638041a-0457-11e9-bb9f-00505690b730:1-429405150';
37+
gtidExp = regexp.MustCompile("(\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)")
3438
}
3539

3640
// Parse the dump data with Dumper generate.
@@ -55,6 +59,16 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error {
5559
})
5660

5761
if parseBinlogPos && !binlogParsed {
62+
// parsed gtid set from mysqldump
63+
// gtid comes before binlog file-positon
64+
if m := gtidExp.FindAllStringSubmatch(line, -1); len(m) == 1 {
65+
gtidStr := m[0][1]
66+
if gtidStr != "" {
67+
if err := h.GtidSet(gtidStr); err != nil {
68+
return errors.Trace(err)
69+
}
70+
}
71+
}
5872
if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 {
5973
name := m[0][1]
6074
pos, err := strconv.ParseUint(m[0][2], 10, 64)

mysql/mysql_gtid.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"strings"
1111

1212
"github.com/pingcap/errors"
13-
"github.com/satori/go.uuid"
13+
uuid "github.com/satori/go.uuid"
1414
"github.com/siddontang/go/hack"
1515
)
1616

@@ -398,6 +398,10 @@ func (s *MysqlGTIDSet) Equal(o GTIDSet) bool {
398398
return false
399399
}
400400

401+
if len(sub.Sets) != len(s.Sets) {
402+
return false
403+
}
404+
401405
for key, set := range sub.Sets {
402406
o, ok := s.Sets[key]
403407
if !ok {

0 commit comments

Comments
 (0)