diff --git a/canal/dump.go b/canal/dump.go index 74086be6c..ff01dc880 100644 --- a/canal/dump.go +++ b/canal/dump.go @@ -27,6 +27,15 @@ func (h *dumpParseHandler) BinLog(name string, pos uint64) error { return nil } +func (h *dumpParseHandler) GtidSet(gtidsets string) (err error) { + if h.gset != nil { + err = h.gset.Update(gtidsets) + } else { + h.gset, err = mysql.ParseGTIDSet("mysql", gtidsets) + } + return err +} + func (h *dumpParseHandler) Data(db string, table string, values []string) error { if err := h.c.ctx.Err(); err != nil { return err @@ -167,6 +176,7 @@ func (c *Canal) dump() error { pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)} c.master.Update(pos) + c.master.UpdateGTIDSet(h.gset) if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), true); err != nil { return errors.Trace(err) } diff --git a/canal/sync.go b/canal/sync.go index 89bc0159d..0b5a0dc99 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -16,7 +16,7 @@ import ( func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) { gset := c.master.GTIDSet() - if gset == nil { + if gset == nil || gset.String() == "" { pos := c.master.Position() s, err := c.syncer.StartSync(pos) if err != nil { diff --git a/dump/dump_test.go b/dump/dump_test.go index eed4c7507..e42e8c1be 100644 --- a/dump/dump_test.go +++ b/dump/dump_test.go @@ -11,6 +11,7 @@ import ( . "github.com/pingcap/check" "github.com/siddontang/go-mysql/client" + "github.com/siddontang/go-mysql/mysql" ) // use docker mysql for test @@ -113,16 +114,88 @@ func (s *schemaTestSuite) TestDump(c *C) { } type testParseHandler struct { + gset mysql.GTIDSet } func (h *testParseHandler) BinLog(name string, pos uint64) error { return nil } +func (h *testParseHandler) GtidSet(gtidsets string) (err error) { + if h.gset != nil { + err = h.gset.Update(gtidsets) + } else { + h.gset, err = mysql.ParseGTIDSet("mysql", gtidsets) + } + return err +} + func (h *testParseHandler) Data(schema string, table string, values []string) error { return nil } +type GtidParseTest struct { + gset mysql.GTIDSet +} + +func (h *GtidParseTest) UpdateGtidSet(gtidStr string) (err error) { + if h.gset != nil { + err = h.gset.Update(gtidStr) + } else { + h.gset, err = mysql.ParseGTIDSet("mysql", gtidStr) + } + return err +} + +func (s *parserTestSuite) TestParseGtidExp(c *C) { + // binlogExp := regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);") + // gtidExp := regexp.MustCompile("(\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)") + tbls := []struct { + input string + expected string + }{ + {`SET @@GLOBAL.GTID_PURGED='071a84e8-b253-11e8-8472-005056a27e86:1-76, +2337be48-0456-11e9-bd1c-00505690543b:1-7, +41d816cd-0455-11e9-be42-005056901a22:1-2, +5f1eea9e-b1e5-11e8-bc77-005056a221ed:1-144609156, +75848cdb-8131-11e7-b6fc-1c1b0de85e7b:1-151378598, +780ad602-0456-11e9-8bcd-005056901a22:1-516653148, +92809ddd-1e3c-11e9-9d04-00505690f6ab:1-11858565, +c59598c7-0467-11e9-bbbe-005056901a22:1-226464969, +cbd7809d-0433-11e9-b1cf-00505690543b:1-18233950, +cca778e9-8cdf-11e8-94d0-005056a247b1:1-303899574, +cf80679b-7695-11e8-8873-1c1b0d9a4ab9:1-12836047, +d0951f24-1e21-11e9-bb2e-00505690b730:1-4758092, +e7574090-b123-11e8-8bb4-005056a29643:1-12' +`, "071a84e8-b253-11e8-8472-005056a27e86:1-76,2337be48-0456-11e9-bd1c-00505690543b:1-7,41d816cd-0455-11e9-be42-005056901a22:1-2,5f1eea9e-b1e5-11e8-bc77-005056a221ed:1-144609156,75848cdb-8131-11e7-b6fc-1c1b0de85e7b:1-151378598,780ad602-0456-11e9-8bcd-005056901a22:1-516653148,92809ddd-1e3c-11e9-9d04-00505690f6ab:1-11858565,c59598c7-0467-11e9-bbbe-005056901a22:1-226464969,cbd7809d-0433-11e9-b1cf-00505690543b:1-18233950,cca778e9-8cdf-11e8-94d0-005056a247b1:1-303899574,cf80679b-7695-11e8-8873-1c1b0d9a4ab9:1-12836047,d0951f24-1e21-11e9-bb2e-00505690b730:1-4758092,e7574090-b123-11e8-8bb4-005056a29643:1-12"}, + {`SET @@GLOBAL.GTID_PURGED='071a84e8-b253-11e8-8472-005056a27e86:1-76, +2337be48-0456-11e9-bd1c-00505690543b:1-7'; +`, "071a84e8-b253-11e8-8472-005056a27e86:1-76,2337be48-0456-11e9-bd1c-00505690543b:1-7"}, + {`SET @@GLOBAL.GTID_PURGED='c0977f88-3104-11e9-81e1-00505690245b:1-274559'; +`, "c0977f88-3104-11e9-81e1-00505690245b:1-274559"}, + {`CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.008995', MASTER_LOG_POS=102052485;`, ""}, + } + + for _, tt := range tbls { + reader := strings.NewReader(tt.input) + var handler = new(testParseHandler) + + Parse(reader, handler, true) + + if tt.expected == "" { + if handler.gset != nil { + c.Assert(handler.gset, IsNil) + } else { + continue + } + } + expectedGtidset, err := mysql.ParseGTIDSet("mysql", tt.expected) + c.Assert(err, IsNil) + c.Assert(expectedGtidset.Equal(handler.gset), IsTrue) + } + +} + func (s *parserTestSuite) TestParseFindTable(c *C) { tbl := []struct { sql string diff --git a/dump/parser.go b/dump/parser.go index f0222a9eb..2b99fb864 100644 --- a/dump/parser.go +++ b/dump/parser.go @@ -19,18 +19,22 @@ var ( type ParseHandler interface { // Parse CHANGE MASTER TO MASTER_LOG_FILE=name, MASTER_LOG_POS=pos; BinLog(name string, pos uint64) error - + GtidSet(gtidsets string) error Data(schema string, table string, values []string) error } var binlogExp *regexp.Regexp var useExp *regexp.Regexp var valuesExp *regexp.Regexp +var gtidExp *regexp.Regexp func init() { binlogExp = regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);") useExp = regexp.MustCompile("^USE `(.+)`;") valuesExp = regexp.MustCompile("^INSERT INTO `(.+?)` VALUES \\((.+)\\);$") + // The pattern will only match MySQL GTID, as you know SET GLOBAL gtid_slave_pos='0-1-4' is used for MariaDB. + //SET @@GLOBAL.GTID_PURGED='1638041a-0457-11e9-bb9f-00505690b730:1-429405150'; + gtidExp = regexp.MustCompile("(\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)") } // Parse the dump data with Dumper generate. @@ -55,6 +59,16 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error { }) if parseBinlogPos && !binlogParsed { + // parsed gtid set from mysqldump + // gtid comes before binlog file-positon + if m := gtidExp.FindAllStringSubmatch(line, -1); len(m) == 1 { + gtidStr := m[0][1] + if gtidStr != "" { + if err := h.GtidSet(gtidStr); err != nil { + return errors.Trace(err) + } + } + } if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 { name := m[0][1] pos, err := strconv.ParseUint(m[0][2], 10, 64) diff --git a/mysql/mysql_gtid.go b/mysql/mysql_gtid.go index 0c12d0924..151581f85 100644 --- a/mysql/mysql_gtid.go +++ b/mysql/mysql_gtid.go @@ -10,7 +10,7 @@ import ( "strings" "github.com/pingcap/errors" - "github.com/satori/go.uuid" + uuid "github.com/satori/go.uuid" "github.com/siddontang/go/hack" ) @@ -398,6 +398,10 @@ func (s *MysqlGTIDSet) Equal(o GTIDSet) bool { return false } + if len(sub.Sets) != len(s.Sets) { + return false + } + for key, set := range sub.Sets { o, ok := s.Sets[key] if !ok {