From 3526dcceef94aef2ee1664d02bf886c7a972e161 Mon Sep 17 00:00:00 2001 From: jianhaiqing Date: Mon, 21 Oct 2019 22:33:15 +0800 Subject: [PATCH 01/15] add gtid set of MySQL after mysqldump #439 --- canal/canal.go | 24 ++++++++ canal/canal_test.go | 130 +++++++++++++++++++++++++++++++++++++++++++- canal/config.go | 5 ++ canal/dump.go | 10 ++++ dump/dump.go | 11 ++++ dump/dump_test.go | 5 ++ dump/parser.go | 51 ++++++++++++++++- 7 files changed, 234 insertions(+), 2 deletions(-) diff --git a/canal/canal.go b/canal/canal.go index 9714adc95..a287d6b98 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -120,6 +120,23 @@ func NewCanal(cfg *Config) (*Canal, error) { return c, nil } +func (c *Canal) validateSetGtidPurged() error { + gtidPuged := strings.ToLower(c.cfg.Dump.GtidPurged) + if gtidPuged == "none" { + return nil + } else if gtidPuged == "auto" { + res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "gtid_mode";`) + if err != nil { + return errors.Trace(err) + } else if f, _ := res.GetString(0, 1); f != "ON" { + return errors.Errorf("set-gtid-purged: %s, gtid_mode should be ON, but now is %s", c.cfg.Dump.GtidPurged, f) + } + return nil + } + + return errors.Errorf("set_gtid_purged: on OR auto can be set, current is %s", gtidPuged) +} + func (c *Canal) prepareDumper() error { var err error dumpPath := c.cfg.Dump.ExecutionPath @@ -128,6 +145,12 @@ func (c *Canal) prepareDumper() error { return nil } + // validate c.cfg.Dump.GtidPurged) + err = c.validateSetGtidPurged() + if err != nil { + return err + } + if c.dumper, err = dump.NewDumper(dumpPath, c.cfg.Addr, c.cfg.User, c.cfg.Password); err != nil { return errors.Trace(err) @@ -153,6 +176,7 @@ func (c *Canal) prepareDumper() error { c.dumper.SetWhere(c.cfg.Dump.Where) c.dumper.SkipMasterData(c.cfg.Dump.SkipMasterData) + c.dumper.SetGtidPurged(strings.ToLower(c.cfg.Dump.GtidPurged)) c.dumper.SetMaxAllowedPacket(c.cfg.Dump.MaxAllowedPacketMB) c.dumper.SetProtocol(c.cfg.Dump.Protocol) // Use hex blob for mysqldump diff --git a/canal/canal_test.go b/canal/canal_test.go index edd83fc02..a93fe0e0a 100755 --- a/canal/canal_test.go +++ b/canal/canal_test.go @@ -1,8 +1,13 @@ package canal import ( + "bufio" "flag" "fmt" + "io" + "regexp" + "strconv" + "strings" "testing" "time" @@ -14,7 +19,10 @@ import ( "github.com/siddontang/go-mysql/replication" ) -var testHost = flag.String("host", "127.0.0.1", "MySQL host") +var ( + ErrSkip = errors.New("Handler error, but skipped") + testHost = flag.String("host", "127.0.0.1", "MySQL host") +) func Test(t *testing.T) { TestingT(t) @@ -74,6 +82,126 @@ func (s *canalTestSuite) SetUpSuite(c *C) { }() } +func TestCanalHandler(t *testing.T) { + oneGtidExp := regexp.MustCompile("SET @@GLOBAL.GTID_PURGED='(.+)'") + mutilGtidStartExp := regexp.MustCompile("SET @@GLOBAL.GTID_PURGED='(.+),") + midUuidSet := regexp.MustCompile("(^\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+),") + endUuidSet := regexp.MustCompile("(^\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)'") + binlogExp := regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);") + tbls := []struct { + input string + expected string + }{ + {`SET @@GLOBAL.GTID_PURGED='071a84e8-b253-11e8-8472-005056a27e86:1-76, +2337be48-0456-11e9-bd1c-00505690543b:1-7, +41d816cd-0455-11e9-be42-005056901a22:1-2, +5f1eea9e-b1e5-11e8-bc77-005056a221ed:1-144609156, +75848cdb-8131-11e7-b6fc-1c1b0de85e7b:1-151378598, +780ad602-0456-11e9-8bcd-005056901a22:1-516653148, +92809ddd-1e3c-11e9-9d04-00505690f6ab:1-11858565, +c59598c7-0467-11e9-bbbe-005056901a22:1-226464969, +cbd7809d-0433-11e9-b1cf-00505690543b:1-18233950, +cca778e9-8cdf-11e8-94d0-005056a247b1:1-303899574, +cf80679b-7695-11e8-8873-1c1b0d9a4ab9:1-12836047, +d0951f24-1e21-11e9-bb2e-00505690b730:1-4758092, +e7574090-b123-11e8-8bb4-005056a29643:1-12'`, "071a84e8-b253-11e8-8472-005056a27e86:1-76,2337be48-0456-11e9-bd1c-00505690543b:1-7,41d816cd-0455-11e9-be42-005056901a22:1-2,5f1eea9e-b1e5-11e8-bc77-005056a221ed:1-144609156,75848cdb-8131-11e7-b6fc-1c1b0de85e7b:1-151378598,780ad602-0456-11e9-8bcd-005056901a22:1-516653148,92809ddd-1e3c-11e9-9d04-00505690f6ab:1-11858565,c59598c7-0467-11e9-bbbe-005056901a22:1-226464969,cbd7809d-0433-11e9-b1cf-00505690543b:1-18233950,cca778e9-8cdf-11e8-94d0-005056a247b1:1-303899574,cf80679b-7695-11e8-8873-1c1b0d9a4ab9:1-12836047,d0951f24-1e21-11e9-bb2e-00505690b730:1-4758092,e7574090-b123-11e8-8bb4-005056a29643:1-12"}, + {`SET @@GLOBAL.GTID_PURGED='071a84e8-b253-11e8-8472-005056a27e86:1-76, +2337be48-0456-11e9-bd1c-00505690543b:1-7';`, "071a84e8-b253-11e8-8472-005056a27e86:1-76,2337be48-0456-11e9-bd1c-00505690543b:1-7"}, + {`SET @@GLOBAL.GTID_PURGED='c0977f88-3104-11e9-81e1-00505690245b:1-274559';`, "c0977f88-3104-11e9-81e1-00505690245b:1-274559"}, + {`CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.008995', MASTER_LOG_POS=102052485;`, ""}, + } + + for _, tt := range tbls { + h := dumpParseHandler{} + reader := strings.NewReader(tt.input) + newReader := bufio.NewReader(reader) + var binlogParsed bool + var gtidDoneParsed bool + var mutilGtidParsed bool + parseBinlogPos := true + for { + bytes, _, err := newReader.ReadLine() + line := string(bytes) + if err != io.EOF { + fmt.Println(string(line)) + } else { + break + } + if parseBinlogPos && !gtidDoneParsed && !binlogParsed { + if m := oneGtidExp.FindAllStringSubmatch(line, -1); len(m) == 1 { + gset := m[0][1] + if err := h.UpdateGtidFromPurged(gset); err != nil { + errors.Trace(err) + } + gtidDoneParsed = true + } + if m := mutilGtidStartExp.FindAllStringSubmatch(line, -1); len(m) == 1 { + gset := m[0][1] + if err := h.UpdateGtidFromPurged(gset); err != nil { + errors.Trace(err) + } + mutilGtidParsed = true + } + + if mutilGtidParsed && !gtidDoneParsed { + if m := midUuidSet.FindAllStringSubmatch(line, -1); len(m) == 1 { + gset := m[0][1] + if err := h.UpdateGtidFromPurged(gset); err != nil { + errors.Trace(err) + } + + } + + if m := endUuidSet.FindAllStringSubmatch(line, -1); len(m) == 1 { + gset := m[0][1] + if err := h.UpdateGtidFromPurged(gset); err != nil { + errors.Trace(err) + } + gtidDoneParsed = true + } + + } + } + + if parseBinlogPos && !binlogParsed { + if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 { + name := m[0][1] + pos, err := strconv.ParseUint(m[0][2], 10, 64) + if err != nil { + errors.Errorf("parse binlog %v err, invalid number", line) + } + + if err = h.BinLog(name, pos); err != nil && err != ErrSkip { + errors.Trace(err) + } + + binlogParsed = true + gtidDoneParsed = true + } + } + + } + + if tt.expected == "" { + if h.gset != nil { + log.Fatalf("expected nil, but get %v", h.gset) + } + continue + } + expectedGtidset, err := mysql.ParseGTIDSet("mysql", tt.expected) + if err != nil { + log.Fatalf("Gtid:%s failed parsed, err: %v", tt.expected, err) + } + if !expectedGtidset.Equal(h.gset) { + log.Fatalf("expected:%v , but get: %v", expectedGtidset, h.gset) + } + + // c.Assert(expectedGtidset.Equal(h.gset), IsTrue) + // c.Logf("parsed gtidset: %v", h.gset) + } + +} + func (s *canalTestSuite) TearDownSuite(c *C) { // To test the heartbeat and read timeout,so need to sleep 1 seconds without data transmission c.Logf("Start testing the heartbeat and read timeout") diff --git a/canal/config.go b/canal/config.go index 103a6cf73..05814fc01 100644 --- a/canal/config.go +++ b/canal/config.go @@ -34,6 +34,9 @@ type DumpConfig struct { // 'FLUSH TABLES WITH READ LOCK' SkipMasterData bool `toml:"skip_master_data"` + // set --set-gtid-purged none, auto; none for gtid is disabled or "version too low", auto for gtid_mode=on; + GtidPurged string `toml:"set_gtid_purged"` + // Set to change the default max_allowed_packet size MaxAllowedPacketMB int `toml:"max_allowed_packet_mb"` @@ -113,6 +116,8 @@ func NewDefaultConfig() *Config { c.Dump.ExecutionPath = "mysqldump" c.Dump.DiscardErr = true c.Dump.SkipMasterData = false + // add default value to disable mysqldump --set-gtid-purged + c.Dump.GtidPurged = "none" return c } diff --git a/canal/dump.go b/canal/dump.go index e4f712d66..96931c466 100644 --- a/canal/dump.go +++ b/canal/dump.go @@ -27,6 +27,15 @@ func (h *dumpParseHandler) BinLog(name string, pos uint64) error { return nil } +func (h *dumpParseHandler) UpdateGtidFromPurged(gtidsets string) (err error) { + if h.gset != nil { + err = h.gset.Update(gtidsets) + } else { + h.gset, err = mysql.ParseGTIDSet("mysql", gtidsets) + } + return err +} + func (h *dumpParseHandler) Data(db string, table string, values []string) error { if err := h.c.ctx.Err(); err != nil { return err @@ -158,6 +167,7 @@ func (c *Canal) dump() error { pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)} c.master.Update(pos) + c.master.UpdateGTIDSet(h.gset) if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), true); err != nil { return errors.Trace(err) } diff --git a/dump/dump.go b/dump/dump.go index 53c3ebc43..128e4d9f8 100644 --- a/dump/dump.go +++ b/dump/dump.go @@ -37,6 +37,7 @@ type Dumper struct { masterDataSkipped bool maxAllowedPacket int + gtidPurged string hexBlob bool } @@ -87,6 +88,11 @@ func (d *Dumper) SkipMasterData(v bool) { d.masterDataSkipped = v } +// SetGtidPurged: none, auto; none for gtid is disabled or "version too low", auto for gtid_mode=on; +func (d *Dumper) SetGtidPurged(gtid string) { + d.gtidPurged = gtid +} + func (d *Dumper) SetMaxAllowedPacket(i int) { d.maxAllowedPacket = i } @@ -143,6 +149,10 @@ func (d *Dumper) Dump(w io.Writer) error { args = append(args, "--master-data") } + if d.gtidPurged == "auto" { + args = append(args, fmt.Sprintf("--set-gtid-purged=%s", d.gtidPurged)) + } + if d.maxAllowedPacket > 0 { // mysqldump param should be --max-allowed-packet=%dM not be --max_allowed_packet=%dM args = append(args, fmt.Sprintf("--max-allowed-packet=%dM", d.maxAllowedPacket)) @@ -215,6 +225,7 @@ func (d *Dumper) DumpAndParse(h ParseHandler) error { done := make(chan error, 1) go func() { + // TODO: set_gtid_purged indicate if parse SET @@GLOBAL.GTID_PURGED='c0977f88-3104-11e9-81e1-00505690245b:1-274559' OR NOT; err := Parse(r, h, !d.masterDataSkipped) r.CloseWithError(err) done <- err diff --git a/dump/dump_test.go b/dump/dump_test.go index eed4c7507..7594b9054 100644 --- a/dump/dump_test.go +++ b/dump/dump_test.go @@ -36,6 +36,7 @@ func (s *schemaTestSuite) SetUpSuite(c *C) { c.Assert(err, IsNil) s.d, err = NewDumper(*execution, fmt.Sprintf("%s:%d", *host, *port), "root", "") + s.d.gtidPurged = "none" c.Assert(err, IsNil) c.Assert(s.d, NotNil) @@ -119,6 +120,10 @@ func (h *testParseHandler) BinLog(name string, pos uint64) error { return nil } +func (h *testParseHandler) UpdateGtidFromPurged(gtidsets string) (err error) { + return nil +} + func (h *testParseHandler) Data(schema string, table string, values []string) error { return nil } diff --git a/dump/parser.go b/dump/parser.go index f0222a9eb..828433408 100644 --- a/dump/parser.go +++ b/dump/parser.go @@ -19,15 +19,24 @@ var ( type ParseHandler interface { // Parse CHANGE MASTER TO MASTER_LOG_FILE=name, MASTER_LOG_POS=pos; BinLog(name string, pos uint64) error - + UpdateGtidFromPurged(gtidsets string) error Data(schema string, table string, values []string) error } +var oneGtidExp *regexp.Regexp +var mutilGtidStartExp *regexp.Regexp +var midUuidSet *regexp.Regexp +var endUuidSet *regexp.Regexp var binlogExp *regexp.Regexp var useExp *regexp.Regexp var valuesExp *regexp.Regexp func init() { + oneGtidExp = regexp.MustCompile("SET @@GLOBAL.GTID_PURGED='(.+)'") + mutilGtidStartExp = regexp.MustCompile("SET @@GLOBAL.GTID_PURGED='(.+),") + midUuidSet = regexp.MustCompile("(^\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+),") + endUuidSet = regexp.MustCompile("(^\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)'") + binlogExp = regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);") useExp = regexp.MustCompile("^USE `(.+)`;") valuesExp = regexp.MustCompile("^INSERT INTO `(.+?)` VALUES \\((.+)\\);$") @@ -40,6 +49,8 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error { var db string var binlogParsed bool + var gtidDoneParsed bool + var mutilGtidParsed bool for { line, err := rb.ReadString('\n') @@ -54,6 +65,43 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error { return c == '\r' || c == '\n' }) + // parsed gtid set from mysqldump,refer to canal_test.go TestDumperHandler function + if parseBinlogPos && !gtidDoneParsed && !binlogParsed { + if m := oneGtidExp.FindAllStringSubmatch(line, -1); len(m) == 1 { + gset := m[0][1] + if err := h.UpdateGtidFromPurged(gset); err != nil { + return errors.Trace(err) + } + gtidDoneParsed = true + } + if m := mutilGtidStartExp.FindAllStringSubmatch(line, -1); len(m) == 1 { + gset := m[0][1] + if err := h.UpdateGtidFromPurged(gset); err != nil { + return errors.Trace(err) + } + mutilGtidParsed = true + } + + if mutilGtidParsed && !gtidDoneParsed { + if m := midUuidSet.FindAllStringSubmatch(line, -1); len(m) == 1 { + gset := m[0][1] + if err := h.UpdateGtidFromPurged(gset); err != nil { + return errors.Trace(err) + } + + } + + if m := endUuidSet.FindAllStringSubmatch(line, -1); len(m) == 1 { + gset := m[0][1] + if err := h.UpdateGtidFromPurged(gset); err != nil { + return errors.Trace(err) + } + gtidDoneParsed = true + } + + } + } + if parseBinlogPos && !binlogParsed { if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 { name := m[0][1] @@ -67,6 +115,7 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error { } binlogParsed = true + gtidDoneParsed = true } } From f00e6d017b582143361dfb41385c1787417141d5 Mon Sep 17 00:00:00 2001 From: jianhaiqing Date: Wed, 6 Nov 2019 22:34:33 +0800 Subject: [PATCH 02/15] fix set-gtid-purged error tracing output. --- canal/canal.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/canal/canal.go b/canal/canal.go index a287d6b98..f0255ed93 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -134,7 +134,7 @@ func (c *Canal) validateSetGtidPurged() error { return nil } - return errors.Errorf("set_gtid_purged: on OR auto can be set, current is %s", gtidPuged) + return errors.Errorf("set-gtid-purged: none or auto can be set, current is %s", gtidPuged) } func (c *Canal) prepareDumper() error { From 883f5616b159a7741512823e755e5f52924d8507 Mon Sep 17 00:00:00 2001 From: jianhaiqing Date: Fri, 8 Nov 2019 00:01:13 +0800 Subject: [PATCH 03/15] extract GetGtidMode --- canal/canal.go | 21 +++++++++++++++------ canal/canal_test.go | 3 --- dump/parser.go | 2 +- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/canal/canal.go b/canal/canal.go index f0255ed93..4e50a5c34 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -125,16 +125,25 @@ func (c *Canal) validateSetGtidPurged() error { if gtidPuged == "none" { return nil } else if gtidPuged == "auto" { - res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "gtid_mode";`) - if err != nil { - return errors.Trace(err) - } else if f, _ := res.GetString(0, 1); f != "ON" { - return errors.Errorf("set-gtid-purged: %s, gtid_mode should be ON, but now is %s", c.cfg.Dump.GtidPurged, f) + isOn, err := c.GetGtidMode() + if !isOn { + return err } return nil } - return errors.Errorf("set-gtid-purged: none or auto can be set, current is %s", gtidPuged) + return errors.Errorf("set-gtid-purged: none or auto can be set, current is %s", gtidPuged) +} + +// if MySQL gtid_mode is on, return true, otherwise false +func (c *Canal) GetGtidMode() (bool, error) { + res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "gtid_mode";`) + if err != nil { + return false, errors.Trace(err) + } else if f, _ := res.GetString(0, 1); strings.ToLower(f) != "on" { + return false, errors.Errorf("set-gtid-purged: %s,gtid_mode should be on, but now is %s", c.cfg.Dump.GtidPurged, f) + } + return true, nil } func (c *Canal) prepareDumper() error { diff --git a/canal/canal_test.go b/canal/canal_test.go index a93fe0e0a..372a7b3b3 100755 --- a/canal/canal_test.go +++ b/canal/canal_test.go @@ -195,9 +195,6 @@ e7574090-b123-11e8-8bb4-005056a29643:1-12'`, "071a84e8-b253-11e8-8472-005056a27e if !expectedGtidset.Equal(h.gset) { log.Fatalf("expected:%v , but get: %v", expectedGtidset, h.gset) } - - // c.Assert(expectedGtidset.Equal(h.gset), IsTrue) - // c.Logf("parsed gtidset: %v", h.gset) } } diff --git a/dump/parser.go b/dump/parser.go index 828433408..10b2d8201 100644 --- a/dump/parser.go +++ b/dump/parser.go @@ -65,7 +65,7 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error { return c == '\r' || c == '\n' }) - // parsed gtid set from mysqldump,refer to canal_test.go TestDumperHandler function + // parsed gtid set from mysqldump, refer to canal_test.go TestDumperHandler function if parseBinlogPos && !gtidDoneParsed && !binlogParsed { if m := oneGtidExp.FindAllStringSubmatch(line, -1); len(m) == 1 { gset := m[0][1] From 48303e55e6860efabaaa25eaf474ecbed0225a59 Mon Sep 17 00:00:00 2001 From: jianhaiqing Date: Mon, 2 Dec 2019 00:11:31 +0800 Subject: [PATCH 04/15] refactor gtid parsing for mysqldump output. --- canal/canal.go | 10 ++-- canal/canal_test.go | 122 -------------------------------------------- canal/config.go | 5 +- canal/dump.go | 2 +- dump/dump_test.go | 108 ++++++++++++++++++++++++++++++++++++++- dump/parser.go | 77 ++++++++++++++++------------ 6 files changed, 161 insertions(+), 163 deletions(-) diff --git a/canal/canal.go b/canal/canal.go index 4e50a5c34..7167789a6 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -120,12 +120,15 @@ func NewCanal(cfg *Config) (*Canal, error) { return c, nil } +// validateSetGtidPurged: check config parameter set-gtid-purged is supported by MySQL at the moment +// an error is supposed to return,if set-gtid-purged:auto is set, but MySQL's gtid_mode is off, or GTID is not supported by MySQL server, +// otherwise, nil is returned. func (c *Canal) validateSetGtidPurged() error { gtidPuged := strings.ToLower(c.cfg.Dump.GtidPurged) if gtidPuged == "none" { return nil } else if gtidPuged == "auto" { - isOn, err := c.GetGtidMode() + isOn, err := c.IsGtidModeEnabled() if !isOn { return err } @@ -135,12 +138,13 @@ func (c *Canal) validateSetGtidPurged() error { return errors.Errorf("set-gtid-purged: none or auto can be set, current is %s", gtidPuged) } -// if MySQL gtid_mode is on, return true, otherwise false -func (c *Canal) GetGtidMode() (bool, error) { +// IsGtidModeEnabled: return true when gtid_mode of MySQL set to on, otherwise return false +func (c *Canal) IsGtidModeEnabled() (bool, error) { res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "gtid_mode";`) if err != nil { return false, errors.Trace(err) } else if f, _ := res.GetString(0, 1); strings.ToLower(f) != "on" { + // if MySQL's gtid_mode is set to off or gtid_mode is not supported return false, errors.Errorf("set-gtid-purged: %s,gtid_mode should be on, but now is %s", c.cfg.Dump.GtidPurged, f) } return true, nil diff --git a/canal/canal_test.go b/canal/canal_test.go index 372a7b3b3..39851f26d 100755 --- a/canal/canal_test.go +++ b/canal/canal_test.go @@ -1,13 +1,8 @@ package canal import ( - "bufio" "flag" "fmt" - "io" - "regexp" - "strconv" - "strings" "testing" "time" @@ -82,123 +77,6 @@ func (s *canalTestSuite) SetUpSuite(c *C) { }() } -func TestCanalHandler(t *testing.T) { - oneGtidExp := regexp.MustCompile("SET @@GLOBAL.GTID_PURGED='(.+)'") - mutilGtidStartExp := regexp.MustCompile("SET @@GLOBAL.GTID_PURGED='(.+),") - midUuidSet := regexp.MustCompile("(^\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+),") - endUuidSet := regexp.MustCompile("(^\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)'") - binlogExp := regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);") - tbls := []struct { - input string - expected string - }{ - {`SET @@GLOBAL.GTID_PURGED='071a84e8-b253-11e8-8472-005056a27e86:1-76, -2337be48-0456-11e9-bd1c-00505690543b:1-7, -41d816cd-0455-11e9-be42-005056901a22:1-2, -5f1eea9e-b1e5-11e8-bc77-005056a221ed:1-144609156, -75848cdb-8131-11e7-b6fc-1c1b0de85e7b:1-151378598, -780ad602-0456-11e9-8bcd-005056901a22:1-516653148, -92809ddd-1e3c-11e9-9d04-00505690f6ab:1-11858565, -c59598c7-0467-11e9-bbbe-005056901a22:1-226464969, -cbd7809d-0433-11e9-b1cf-00505690543b:1-18233950, -cca778e9-8cdf-11e8-94d0-005056a247b1:1-303899574, -cf80679b-7695-11e8-8873-1c1b0d9a4ab9:1-12836047, -d0951f24-1e21-11e9-bb2e-00505690b730:1-4758092, -e7574090-b123-11e8-8bb4-005056a29643:1-12'`, "071a84e8-b253-11e8-8472-005056a27e86:1-76,2337be48-0456-11e9-bd1c-00505690543b:1-7,41d816cd-0455-11e9-be42-005056901a22:1-2,5f1eea9e-b1e5-11e8-bc77-005056a221ed:1-144609156,75848cdb-8131-11e7-b6fc-1c1b0de85e7b:1-151378598,780ad602-0456-11e9-8bcd-005056901a22:1-516653148,92809ddd-1e3c-11e9-9d04-00505690f6ab:1-11858565,c59598c7-0467-11e9-bbbe-005056901a22:1-226464969,cbd7809d-0433-11e9-b1cf-00505690543b:1-18233950,cca778e9-8cdf-11e8-94d0-005056a247b1:1-303899574,cf80679b-7695-11e8-8873-1c1b0d9a4ab9:1-12836047,d0951f24-1e21-11e9-bb2e-00505690b730:1-4758092,e7574090-b123-11e8-8bb4-005056a29643:1-12"}, - {`SET @@GLOBAL.GTID_PURGED='071a84e8-b253-11e8-8472-005056a27e86:1-76, -2337be48-0456-11e9-bd1c-00505690543b:1-7';`, "071a84e8-b253-11e8-8472-005056a27e86:1-76,2337be48-0456-11e9-bd1c-00505690543b:1-7"}, - {`SET @@GLOBAL.GTID_PURGED='c0977f88-3104-11e9-81e1-00505690245b:1-274559';`, "c0977f88-3104-11e9-81e1-00505690245b:1-274559"}, - {`CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.008995', MASTER_LOG_POS=102052485;`, ""}, - } - - for _, tt := range tbls { - h := dumpParseHandler{} - reader := strings.NewReader(tt.input) - newReader := bufio.NewReader(reader) - var binlogParsed bool - var gtidDoneParsed bool - var mutilGtidParsed bool - parseBinlogPos := true - for { - bytes, _, err := newReader.ReadLine() - line := string(bytes) - if err != io.EOF { - fmt.Println(string(line)) - } else { - break - } - if parseBinlogPos && !gtidDoneParsed && !binlogParsed { - if m := oneGtidExp.FindAllStringSubmatch(line, -1); len(m) == 1 { - gset := m[0][1] - if err := h.UpdateGtidFromPurged(gset); err != nil { - errors.Trace(err) - } - gtidDoneParsed = true - } - if m := mutilGtidStartExp.FindAllStringSubmatch(line, -1); len(m) == 1 { - gset := m[0][1] - if err := h.UpdateGtidFromPurged(gset); err != nil { - errors.Trace(err) - } - mutilGtidParsed = true - } - - if mutilGtidParsed && !gtidDoneParsed { - if m := midUuidSet.FindAllStringSubmatch(line, -1); len(m) == 1 { - gset := m[0][1] - if err := h.UpdateGtidFromPurged(gset); err != nil { - errors.Trace(err) - } - - } - - if m := endUuidSet.FindAllStringSubmatch(line, -1); len(m) == 1 { - gset := m[0][1] - if err := h.UpdateGtidFromPurged(gset); err != nil { - errors.Trace(err) - } - gtidDoneParsed = true - } - - } - } - - if parseBinlogPos && !binlogParsed { - if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 { - name := m[0][1] - pos, err := strconv.ParseUint(m[0][2], 10, 64) - if err != nil { - errors.Errorf("parse binlog %v err, invalid number", line) - } - - if err = h.BinLog(name, pos); err != nil && err != ErrSkip { - errors.Trace(err) - } - - binlogParsed = true - gtidDoneParsed = true - } - } - - } - - if tt.expected == "" { - if h.gset != nil { - log.Fatalf("expected nil, but get %v", h.gset) - } - continue - } - expectedGtidset, err := mysql.ParseGTIDSet("mysql", tt.expected) - if err != nil { - log.Fatalf("Gtid:%s failed parsed, err: %v", tt.expected, err) - } - if !expectedGtidset.Equal(h.gset) { - log.Fatalf("expected:%v , but get: %v", expectedGtidset, h.gset) - } - } - -} - func (s *canalTestSuite) TearDownSuite(c *C) { // To test the heartbeat and read timeout,so need to sleep 1 seconds without data transmission c.Logf("Start testing the heartbeat and read timeout") diff --git a/canal/config.go b/canal/config.go index 05814fc01..5c53fbb2f 100644 --- a/canal/config.go +++ b/canal/config.go @@ -34,7 +34,10 @@ type DumpConfig struct { // 'FLUSH TABLES WITH READ LOCK' SkipMasterData bool `toml:"skip_master_data"` - // set --set-gtid-purged none, auto; none for gtid is disabled or "version too low", auto for gtid_mode=on; + // Only MySQL's GTID is supported, cause mysqldump's parameter is a little different between MySQL and MariaDB + // mysqldump of MySQL uses --set-gtid-purged, MariaDB uses --gtid instead. + // GtidPurged:auto when MySQL supports GTID,and GtidPurged:none for gtid is disabled,or gtid is not supported, + // or you strongly wants file-position replication. GtidPurged string `toml:"set_gtid_purged"` // Set to change the default max_allowed_packet size diff --git a/canal/dump.go b/canal/dump.go index 96931c466..ddb57a427 100644 --- a/canal/dump.go +++ b/canal/dump.go @@ -27,7 +27,7 @@ func (h *dumpParseHandler) BinLog(name string, pos uint64) error { return nil } -func (h *dumpParseHandler) UpdateGtidFromPurged(gtidsets string) (err error) { +func (h *dumpParseHandler) GtidSet(gtidsets string) (err error) { if h.gset != nil { err = h.gset.Update(gtidsets) } else { diff --git a/dump/dump_test.go b/dump/dump_test.go index 7594b9054..10c3eae7a 100644 --- a/dump/dump_test.go +++ b/dump/dump_test.go @@ -1,14 +1,22 @@ package dump import ( + "bufio" "bytes" "flag" "fmt" + "io" "io/ioutil" "os" + "regexp" + "strconv" "strings" "testing" + "github.com/pingcap/errors" + "github.com/siddontang/go-log/log" + "github.com/siddontang/go-mysql/mysql" + . "github.com/pingcap/check" "github.com/siddontang/go-mysql/client" ) @@ -114,20 +122,116 @@ func (s *schemaTestSuite) TestDump(c *C) { } type testParseHandler struct { + gset mysql.GTIDSet } func (h *testParseHandler) BinLog(name string, pos uint64) error { return nil } -func (h *testParseHandler) UpdateGtidFromPurged(gtidsets string) (err error) { - return nil +func (h *testParseHandler) GtidSet(gtidsets string) (err error) { + if h.gset != nil { + err = h.gset.Update(gtidsets) + } else { + h.gset, err = mysql.ParseGTIDSet("mysql", gtidsets) + } + return err } func (h *testParseHandler) Data(schema string, table string, values []string) error { return nil } +func TestParseGtidStrFromMysqlDump(t *testing.T) { + binlogExp := regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);") + tbls := []struct { + input string + expected string + }{ + {`SET @@GLOBAL.GTID_PURGED='071a84e8-b253-11e8-8472-005056a27e86:1-76, +2337be48-0456-11e9-bd1c-00505690543b:1-7, +41d816cd-0455-11e9-be42-005056901a22:1-2, +5f1eea9e-b1e5-11e8-bc77-005056a221ed:1-144609156, +75848cdb-8131-11e7-b6fc-1c1b0de85e7b:1-151378598, +780ad602-0456-11e9-8bcd-005056901a22:1-516653148, +92809ddd-1e3c-11e9-9d04-00505690f6ab:1-11858565, +c59598c7-0467-11e9-bbbe-005056901a22:1-226464969, +cbd7809d-0433-11e9-b1cf-00505690543b:1-18233950, +cca778e9-8cdf-11e8-94d0-005056a247b1:1-303899574, +cf80679b-7695-11e8-8873-1c1b0d9a4ab9:1-12836047, +d0951f24-1e21-11e9-bb2e-00505690b730:1-4758092, +e7574090-b123-11e8-8bb4-005056a29643:1-12'`, "071a84e8-b253-11e8-8472-005056a27e86:1-76,2337be48-0456-11e9-bd1c-00505690543b:1-7,41d816cd-0455-11e9-be42-005056901a22:1-2,5f1eea9e-b1e5-11e8-bc77-005056a221ed:1-144609156,75848cdb-8131-11e7-b6fc-1c1b0de85e7b:1-151378598,780ad602-0456-11e9-8bcd-005056901a22:1-516653148,92809ddd-1e3c-11e9-9d04-00505690f6ab:1-11858565,c59598c7-0467-11e9-bbbe-005056901a22:1-226464969,cbd7809d-0433-11e9-b1cf-00505690543b:1-18233950,cca778e9-8cdf-11e8-94d0-005056a247b1:1-303899574,cf80679b-7695-11e8-8873-1c1b0d9a4ab9:1-12836047,d0951f24-1e21-11e9-bb2e-00505690b730:1-4758092,e7574090-b123-11e8-8bb4-005056a29643:1-12"}, + {`SET @@GLOBAL.GTID_PURGED='071a84e8-b253-11e8-8472-005056a27e86:1-76, +2337be48-0456-11e9-bd1c-00505690543b:1-7';`, "071a84e8-b253-11e8-8472-005056a27e86:1-76,2337be48-0456-11e9-bd1c-00505690543b:1-7"}, + {`SET @@GLOBAL.GTID_PURGED='c0977f88-3104-11e9-81e1-00505690245b:1-274559';`, "c0977f88-3104-11e9-81e1-00505690245b:1-274559"}, + {`CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.008995', MASTER_LOG_POS=102052485;`, ""}, + } + + for _, tt := range tbls { + h := testParseHandler{nil} + reader := strings.NewReader(tt.input) + newReader := bufio.NewReader(reader) + var binlogParsed bool + var gtidDoneParsed bool + var mutilGtidParsed bool + parseBinlogPos := true + for { + bytes, _, err := newReader.ReadLine() + line := string(bytes) + if err != io.EOF { + fmt.Println(string(line)) + } else { + break + } + + // begin parsed gtid + if parseBinlogPos && !gtidDoneParsed && !binlogParsed { + gtidStr, IsMultiSetReturned, IsDoneOfGtidParsed := ParseGtidStrFromMysqlDump(line, mutilGtidParsed) + if err := h.GtidSet(gtidStr); err != nil { + mutilGtidParsed = IsMultiSetReturned + gtidDoneParsed = IsDoneOfGtidParsed + if err != nil { + errors.Errorf("ParseGtidSetFromMysqlDump err: %v", err) + } + } + + if parseBinlogPos && !binlogParsed { + if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 { + name := m[0][1] + pos, err := strconv.ParseUint(m[0][2], 10, 64) + if err != nil { + errors.Errorf("parse binlog %v err, invalid number", line) + } + + if err = h.BinLog(name, pos); err != nil && err != ErrSkip { + errors.Trace(err) + } + + binlogParsed = true + gtidDoneParsed = true + } + } + + } + } + + if tt.expected == "" { + if h.gset != nil && h.gset.String() != "" { + log.Fatalf("expected nil, but get %v", h.gset) + } + continue + } + expectedGtidset, err := mysql.ParseGTIDSet("mysql", tt.expected) + if err != nil { + log.Fatalf("Gtid:%s failed parsed, err: %v", tt.expected, err) + } + if !expectedGtidset.Equal(h.gset) { + log.Fatalf("expected:%v , but get: %v", expectedGtidset, h.gset) + } + } + +} + func (s *parserTestSuite) TestParseFindTable(c *C) { tbl := []struct { sql string diff --git a/dump/parser.go b/dump/parser.go index 10b2d8201..eca31abef 100644 --- a/dump/parser.go +++ b/dump/parser.go @@ -19,7 +19,7 @@ var ( type ParseHandler interface { // Parse CHANGE MASTER TO MASTER_LOG_FILE=name, MASTER_LOG_POS=pos; BinLog(name string, pos uint64) error - UpdateGtidFromPurged(gtidsets string) error + GtidSet(gtidsets string) error Data(schema string, table string, values []string) error } @@ -32,7 +32,12 @@ var useExp *regexp.Regexp var valuesExp *regexp.Regexp func init() { + //SET @@GLOBAL.GTID_PURGED='1638041a-0457-11e9-bb9f-00505690b730:1-429405150'; oneGtidExp = regexp.MustCompile("SET @@GLOBAL.GTID_PURGED='(.+)'") + + //SET @@GLOBAL.GTID_PURGED='071a84e8-b253-11e8-8472-005056a27e86:1-76, // start of multi-gtid set + //2337be48-0456-11e9-bd1c-00505690543b:1-7, // middle gtid set + //41d816cd-0455-11e9-be42-005056901a22:1-2'; // end gtid set mutilGtidStartExp = regexp.MustCompile("SET @@GLOBAL.GTID_PURGED='(.+),") midUuidSet = regexp.MustCompile("(^\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+),") endUuidSet = regexp.MustCompile("(^\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)'") @@ -42,6 +47,36 @@ func init() { valuesExp = regexp.MustCompile("^INSERT INTO `(.+?)` VALUES \\((.+)\\);$") } +// ParseGtidSetFromMysqlDump: Parsing Gtid from mysqldump output, +// IsMultiGtidSet default false at the very beginning +func ParseGtidStrFromMysqlDump(line string, IsMultiGtidSet bool) (gtidstr string, IsMultiSetReturned, IsDoneOfGtidParsed bool) { + if m := oneGtidExp.FindAllStringSubmatch(line, -1); len(m) == 1 { + gtidstr = m[0][1] + IsMultiSetReturned = false + IsDoneOfGtidParsed = true + } + if m := mutilGtidStartExp.FindAllStringSubmatch(line, -1); len(m) == 1 { + gtidstr = m[0][1] + IsMultiSetReturned = true + IsDoneOfGtidParsed = false + } + + if IsMultiGtidSet { + if m := midUuidSet.FindAllStringSubmatch(line, -1); len(m) == 1 { + gtidstr = m[0][1] + IsMultiSetReturned = true + IsDoneOfGtidParsed = false + } + if m := endUuidSet.FindAllStringSubmatch(line, -1); len(m) == 1 { + gtidstr = m[0][1] + IsMultiSetReturned = true + IsDoneOfGtidParsed = true + } + + } + return gtidstr, IsMultiSetReturned, IsDoneOfGtidParsed +} + // Parse the dump data with Dumper generate. // It can not parse all the data formats with mysqldump outputs func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error { @@ -51,7 +86,6 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error { var binlogParsed bool var gtidDoneParsed bool var mutilGtidParsed bool - for { line, err := rb.ReadString('\n') if err != nil && err != io.EOF { @@ -66,40 +100,15 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error { }) // parsed gtid set from mysqldump, refer to canal_test.go TestDumperHandler function - if parseBinlogPos && !gtidDoneParsed && !binlogParsed { - if m := oneGtidExp.FindAllStringSubmatch(line, -1); len(m) == 1 { - gset := m[0][1] - if err := h.UpdateGtidFromPurged(gset); err != nil { - return errors.Trace(err) - } - gtidDoneParsed = true - } - if m := mutilGtidStartExp.FindAllStringSubmatch(line, -1); len(m) == 1 { - gset := m[0][1] - if err := h.UpdateGtidFromPurged(gset); err != nil { - return errors.Trace(err) - } - mutilGtidParsed = true - } - - if mutilGtidParsed && !gtidDoneParsed { - if m := midUuidSet.FindAllStringSubmatch(line, -1); len(m) == 1 { - gset := m[0][1] - if err := h.UpdateGtidFromPurged(gset); err != nil { - return errors.Trace(err) - } - - } - - if m := endUuidSet.FindAllStringSubmatch(line, -1); len(m) == 1 { - gset := m[0][1] - if err := h.UpdateGtidFromPurged(gset); err != nil { - return errors.Trace(err) - } - gtidDoneParsed = true - } + // begin parsed gtid + if parseBinlogPos && !gtidDoneParsed && !binlogParsed { + gtidStr, IsMultiSetReturned, IsDoneOfGtidParsed := ParseGtidStrFromMysqlDump(line, mutilGtidParsed) + if err := h.GtidSet(gtidStr); err != nil { + return errors.Trace(err) } + mutilGtidParsed = IsMultiSetReturned + gtidDoneParsed = IsDoneOfGtidParsed } if parseBinlogPos && !binlogParsed { From e82c075d88c05ef1f4a66cf900921744728a0c92 Mon Sep 17 00:00:00 2001 From: jianhaiqing Date: Mon, 2 Dec 2019 09:24:07 +0800 Subject: [PATCH 05/15] remove coupling with dumphandler, and fix test err --- dump/dump_test.go | 48 +++++++++++++++++++++-------------------------- 1 file changed, 21 insertions(+), 27 deletions(-) diff --git a/dump/dump_test.go b/dump/dump_test.go index 10c3eae7a..28913ce06 100644 --- a/dump/dump_test.go +++ b/dump/dump_test.go @@ -9,7 +9,6 @@ import ( "io/ioutil" "os" "regexp" - "strconv" "strings" "testing" @@ -122,7 +121,6 @@ func (s *schemaTestSuite) TestDump(c *C) { } type testParseHandler struct { - gset mysql.GTIDSet } func (h *testParseHandler) BinLog(name string, pos uint64) error { @@ -130,18 +128,26 @@ func (h *testParseHandler) BinLog(name string, pos uint64) error { } func (h *testParseHandler) GtidSet(gtidsets string) (err error) { - if h.gset != nil { - err = h.gset.Update(gtidsets) - } else { - h.gset, err = mysql.ParseGTIDSet("mysql", gtidsets) - } - return err + return nil } func (h *testParseHandler) Data(schema string, table string, values []string) error { return nil } +type GtidParseTest struct { + gset mysql.GTIDSet +} + +func (h *GtidParseTest) UpdateGtidSet(gtidStr string) (err error) { + if h.gset != nil { + err = h.gset.Update(gtidStr) + } else { + h.gset, err = mysql.ParseGTIDSet("mysql", gtidStr) + } + return err +} + func TestParseGtidStrFromMysqlDump(t *testing.T) { binlogExp := regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);") tbls := []struct { @@ -168,9 +174,9 @@ e7574090-b123-11e8-8bb4-005056a29643:1-12'`, "071a84e8-b253-11e8-8472-005056a27e } for _, tt := range tbls { - h := testParseHandler{nil} reader := strings.NewReader(tt.input) newReader := bufio.NewReader(reader) + var gtidParse = new(GtidParseTest) var binlogParsed bool var gtidDoneParsed bool var mutilGtidParsed bool @@ -178,16 +184,14 @@ e7574090-b123-11e8-8bb4-005056a29643:1-12'`, "071a84e8-b253-11e8-8472-005056a27e for { bytes, _, err := newReader.ReadLine() line := string(bytes) - if err != io.EOF { - fmt.Println(string(line)) - } else { + if err == io.EOF { break } // begin parsed gtid if parseBinlogPos && !gtidDoneParsed && !binlogParsed { gtidStr, IsMultiSetReturned, IsDoneOfGtidParsed := ParseGtidStrFromMysqlDump(line, mutilGtidParsed) - if err := h.GtidSet(gtidStr); err != nil { + if err := gtidParse.UpdateGtidSet(gtidStr); err != nil { mutilGtidParsed = IsMultiSetReturned gtidDoneParsed = IsDoneOfGtidParsed if err != nil { @@ -197,16 +201,6 @@ e7574090-b123-11e8-8bb4-005056a29643:1-12'`, "071a84e8-b253-11e8-8472-005056a27e if parseBinlogPos && !binlogParsed { if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 { - name := m[0][1] - pos, err := strconv.ParseUint(m[0][2], 10, 64) - if err != nil { - errors.Errorf("parse binlog %v err, invalid number", line) - } - - if err = h.BinLog(name, pos); err != nil && err != ErrSkip { - errors.Trace(err) - } - binlogParsed = true gtidDoneParsed = true } @@ -216,8 +210,8 @@ e7574090-b123-11e8-8bb4-005056a29643:1-12'`, "071a84e8-b253-11e8-8472-005056a27e } if tt.expected == "" { - if h.gset != nil && h.gset.String() != "" { - log.Fatalf("expected nil, but get %v", h.gset) + if gtidParse.gset != nil && gtidParse.gset.String() != "" { + log.Fatalf("expected nil, but get %v", gtidParse.gset) } continue } @@ -225,8 +219,8 @@ e7574090-b123-11e8-8bb4-005056a29643:1-12'`, "071a84e8-b253-11e8-8472-005056a27e if err != nil { log.Fatalf("Gtid:%s failed parsed, err: %v", tt.expected, err) } - if !expectedGtidset.Equal(h.gset) { - log.Fatalf("expected:%v , but get: %v", expectedGtidset, h.gset) + if !expectedGtidset.Equal(gtidParse.gset) { + log.Fatalf("expected:%v , but get: %v", expectedGtidset, gtidParse.gset) } } From f709dd1bd20cde8b9f268d142e0f4d10049fd923 Mon Sep 17 00:00:00 2001 From: jianhaiqing Date: Mon, 2 Dec 2019 11:15:53 +0800 Subject: [PATCH 06/15] fix bug for parsing empty string of gtid, and failed to startWithGtid --- canal/sync.go | 2 +- dump/parser.go | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/canal/sync.go b/canal/sync.go index 89bc0159d..386725062 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -16,7 +16,7 @@ import ( func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) { gset := c.master.GTIDSet() - if gset == nil { + if gset == nil || (gset != nil && gset.String() == "") { pos := c.master.Position() s, err := c.syncer.StartSync(pos) if err != nil { diff --git a/dump/parser.go b/dump/parser.go index eca31abef..ac1276fc7 100644 --- a/dump/parser.go +++ b/dump/parser.go @@ -104,8 +104,11 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error { // begin parsed gtid if parseBinlogPos && !gtidDoneParsed && !binlogParsed { gtidStr, IsMultiSetReturned, IsDoneOfGtidParsed := ParseGtidStrFromMysqlDump(line, mutilGtidParsed) - if err := h.GtidSet(gtidStr); err != nil { - return errors.Trace(err) + + if gtidStr != "" { + if err := h.GtidSet(gtidStr); err != nil { + return errors.Trace(err) + } } mutilGtidParsed = IsMultiSetReturned gtidDoneParsed = IsDoneOfGtidParsed From 0c6da888837cf44c5e26f6732a458cebfb1b4e61 Mon Sep 17 00:00:00 2001 From: jianhaiqing Date: Mon, 2 Dec 2019 23:02:45 +0800 Subject: [PATCH 07/15] remove set-gtid-purged config --- canal/canal.go | 37 ------------------------------------- canal/config.go | 8 -------- dump/dump.go | 5 ----- 3 files changed, 50 deletions(-) diff --git a/canal/canal.go b/canal/canal.go index b8a80dde8..f603b01ed 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -120,36 +120,6 @@ func NewCanal(cfg *Config) (*Canal, error) { return c, nil } -// validateSetGtidPurged: check config parameter set-gtid-purged is supported by MySQL at the moment -// an error is supposed to return,if set-gtid-purged:auto is set, but MySQL's gtid_mode is off, or GTID is not supported by MySQL server, -// otherwise, nil is returned. -func (c *Canal) validateSetGtidPurged() error { - gtidPuged := strings.ToLower(c.cfg.Dump.GtidPurged) - if gtidPuged == "none" { - return nil - } else if gtidPuged == "auto" { - isOn, err := c.IsGtidModeEnabled() - if !isOn { - return err - } - return nil - } - - return errors.Errorf("set-gtid-purged: none or auto can be set, current is %s", gtidPuged) -} - -// IsGtidModeEnabled: return true when gtid_mode of MySQL set to on, otherwise return false -func (c *Canal) IsGtidModeEnabled() (bool, error) { - res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "gtid_mode";`) - if err != nil { - return false, errors.Trace(err) - } else if f, _ := res.GetString(0, 1); strings.ToLower(f) != "on" { - // if MySQL's gtid_mode is set to off or gtid_mode is not supported - return false, errors.Errorf("set-gtid-purged: %s,gtid_mode should be on, but now is %s", c.cfg.Dump.GtidPurged, f) - } - return true, nil -} - func (c *Canal) prepareDumper() error { var err error dumpPath := c.cfg.Dump.ExecutionPath @@ -158,12 +128,6 @@ func (c *Canal) prepareDumper() error { return nil } - // validate c.cfg.Dump.GtidPurged) - err = c.validateSetGtidPurged() - if err != nil { - return err - } - if c.dumper, err = dump.NewDumper(dumpPath, c.cfg.Addr, c.cfg.User, c.cfg.Password); err != nil { return errors.Trace(err) @@ -189,7 +153,6 @@ func (c *Canal) prepareDumper() error { c.dumper.SetWhere(c.cfg.Dump.Where) c.dumper.SkipMasterData(c.cfg.Dump.SkipMasterData) - c.dumper.SetGtidPurged(strings.ToLower(c.cfg.Dump.GtidPurged)) c.dumper.SetMaxAllowedPacket(c.cfg.Dump.MaxAllowedPacketMB) c.dumper.SetProtocol(c.cfg.Dump.Protocol) c.dumper.SetExtraOptions(c.cfg.Dump.ExtraOptions) diff --git a/canal/config.go b/canal/config.go index 7ba192bc2..121e09ea5 100644 --- a/canal/config.go +++ b/canal/config.go @@ -34,12 +34,6 @@ type DumpConfig struct { // 'FLUSH TABLES WITH READ LOCK' SkipMasterData bool `toml:"skip_master_data"` - // Only MySQL's GTID is supported, cause mysqldump's parameter is a little different between MySQL and MariaDB - // mysqldump of MySQL uses --set-gtid-purged, MariaDB uses --gtid instead. - // GtidPurged:auto when MySQL supports GTID,and GtidPurged:none for gtid is disabled,or gtid is not supported, - // or you strongly wants file-position replication. - GtidPurged string `toml:"set_gtid_purged"` - // Set to change the default max_allowed_packet size MaxAllowedPacketMB int `toml:"max_allowed_packet_mb"` @@ -122,8 +116,6 @@ func NewDefaultConfig() *Config { c.Dump.ExecutionPath = "mysqldump" c.Dump.DiscardErr = true c.Dump.SkipMasterData = false - // add default value to disable mysqldump --set-gtid-purged - c.Dump.GtidPurged = "none" return c } diff --git a/dump/dump.go b/dump/dump.go index eb3cf03d4..87825fe62 100644 --- a/dump/dump.go +++ b/dump/dump.go @@ -95,11 +95,6 @@ func (d *Dumper) SkipMasterData(v bool) { d.masterDataSkipped = v } -// SetGtidPurged: none, auto; none for gtid is disabled or "version too low", auto for gtid_mode=on; -func (d *Dumper) SetGtidPurged(gtid string) { - d.gtidPurged = gtid -} - func (d *Dumper) SetMaxAllowedPacket(i int) { d.maxAllowedPacket = i } From df4fecdd319998184ee116bfa20b782f3d1bdfe4 Mon Sep 17 00:00:00 2001 From: jianhaiqing Date: Wed, 4 Dec 2019 00:03:36 +0800 Subject: [PATCH 08/15] parse gtid --- dump/dump_test.go | 31 +++++++++++-------------- dump/parser.go | 59 ++++++++++------------------------------------- 2 files changed, 25 insertions(+), 65 deletions(-) diff --git a/dump/dump_test.go b/dump/dump_test.go index 28913ce06..5ae7fa876 100644 --- a/dump/dump_test.go +++ b/dump/dump_test.go @@ -13,6 +13,7 @@ import ( "testing" "github.com/pingcap/errors" + "github.com/siddontang/go-log/log" "github.com/siddontang/go-mysql/mysql" @@ -150,6 +151,7 @@ func (h *GtidParseTest) UpdateGtidSet(gtidStr string) (err error) { func TestParseGtidStrFromMysqlDump(t *testing.T) { binlogExp := regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);") + gtidExp := regexp.MustCompile("(\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)") tbls := []struct { input string expected string @@ -178,8 +180,6 @@ e7574090-b123-11e8-8bb4-005056a29643:1-12'`, "071a84e8-b253-11e8-8472-005056a27e newReader := bufio.NewReader(reader) var gtidParse = new(GtidParseTest) var binlogParsed bool - var gtidDoneParsed bool - var mutilGtidParsed bool parseBinlogPos := true for { bytes, _, err := newReader.ReadLine() @@ -188,25 +188,20 @@ e7574090-b123-11e8-8bb4-005056a29643:1-12'`, "071a84e8-b253-11e8-8472-005056a27e break } - // begin parsed gtid - if parseBinlogPos && !gtidDoneParsed && !binlogParsed { - gtidStr, IsMultiSetReturned, IsDoneOfGtidParsed := ParseGtidStrFromMysqlDump(line, mutilGtidParsed) - if err := gtidParse.UpdateGtidSet(gtidStr); err != nil { - mutilGtidParsed = IsMultiSetReturned - gtidDoneParsed = IsDoneOfGtidParsed - if err != nil { - errors.Errorf("ParseGtidSetFromMysqlDump err: %v", err) + if parseBinlogPos && !binlogParsed { + if m := gtidExp.FindAllStringSubmatch(line, -1); len(m) == 1 { + gtidStr := m[0][1] + if gtidStr != "" { + if err := gtidParse.UpdateGtidSet(gtidStr); err != nil { + errors.Errorf("gtid failed to parsed: %v", err) + } } } - - if parseBinlogPos && !binlogParsed { - if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 { - binlogParsed = true - gtidDoneParsed = true - } + if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 { + binlogParsed = true } - } + } if tt.expected == "" { @@ -220,7 +215,7 @@ e7574090-b123-11e8-8bb4-005056a29643:1-12'`, "071a84e8-b253-11e8-8472-005056a27e log.Fatalf("Gtid:%s failed parsed, err: %v", tt.expected, err) } if !expectedGtidset.Equal(gtidParse.gset) { - log.Fatalf("expected:%v , but get: %v", expectedGtidset, gtidParse.gset) + log.Fatalf("expected:%v, but get: %v", expectedGtidset, gtidParse.gset) } } diff --git a/dump/parser.go b/dump/parser.go index ac1276fc7..60919de45 100644 --- a/dump/parser.go +++ b/dump/parser.go @@ -30,6 +30,7 @@ var endUuidSet *regexp.Regexp var binlogExp *regexp.Regexp var useExp *regexp.Regexp var valuesExp *regexp.Regexp +var gtidExp *regexp.Regexp func init() { //SET @@GLOBAL.GTID_PURGED='1638041a-0457-11e9-bb9f-00505690b730:1-429405150'; @@ -45,36 +46,7 @@ func init() { binlogExp = regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);") useExp = regexp.MustCompile("^USE `(.+)`;") valuesExp = regexp.MustCompile("^INSERT INTO `(.+?)` VALUES \\((.+)\\);$") -} - -// ParseGtidSetFromMysqlDump: Parsing Gtid from mysqldump output, -// IsMultiGtidSet default false at the very beginning -func ParseGtidStrFromMysqlDump(line string, IsMultiGtidSet bool) (gtidstr string, IsMultiSetReturned, IsDoneOfGtidParsed bool) { - if m := oneGtidExp.FindAllStringSubmatch(line, -1); len(m) == 1 { - gtidstr = m[0][1] - IsMultiSetReturned = false - IsDoneOfGtidParsed = true - } - if m := mutilGtidStartExp.FindAllStringSubmatch(line, -1); len(m) == 1 { - gtidstr = m[0][1] - IsMultiSetReturned = true - IsDoneOfGtidParsed = false - } - - if IsMultiGtidSet { - if m := midUuidSet.FindAllStringSubmatch(line, -1); len(m) == 1 { - gtidstr = m[0][1] - IsMultiSetReturned = true - IsDoneOfGtidParsed = false - } - if m := endUuidSet.FindAllStringSubmatch(line, -1); len(m) == 1 { - gtidstr = m[0][1] - IsMultiSetReturned = true - IsDoneOfGtidParsed = true - } - - } - return gtidstr, IsMultiSetReturned, IsDoneOfGtidParsed + gtidExp = regexp.MustCompile("(\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)") } // Parse the dump data with Dumper generate. @@ -84,8 +56,7 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error { var db string var binlogParsed bool - var gtidDoneParsed bool - var mutilGtidParsed bool + for { line, err := rb.ReadString('\n') if err != nil && err != io.EOF { @@ -99,22 +70,17 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error { return c == '\r' || c == '\n' }) - // parsed gtid set from mysqldump, refer to canal_test.go TestDumperHandler function - - // begin parsed gtid - if parseBinlogPos && !gtidDoneParsed && !binlogParsed { - gtidStr, IsMultiSetReturned, IsDoneOfGtidParsed := ParseGtidStrFromMysqlDump(line, mutilGtidParsed) - - if gtidStr != "" { - if err := h.GtidSet(gtidStr); err != nil { - return errors.Trace(err) + if parseBinlogPos && !binlogParsed { + // parsed gtid set from mysqldump + // gtid comes before binlog file-positon + if m := gtidExp.FindAllStringSubmatch(line, -1); len(m) == 1 { + gtidStr := m[0][1] + if gtidStr != "" { + if err := h.GtidSet(gtidStr); err != nil { + return errors.Trace(err) + } } } - mutilGtidParsed = IsMultiSetReturned - gtidDoneParsed = IsDoneOfGtidParsed - } - - if parseBinlogPos && !binlogParsed { if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 { name := m[0][1] pos, err := strconv.ParseUint(m[0][2], 10, 64) @@ -127,7 +93,6 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error { } binlogParsed = true - gtidDoneParsed = true } } From b8c46636cc61f390aa12f12dd7fb8917039cc5a8 Mon Sep 17 00:00:00 2001 From: jianhaiqing Date: Wed, 4 Dec 2019 00:14:35 +0800 Subject: [PATCH 09/15] remove useless comment --- dump/dump.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/dump/dump.go b/dump/dump.go index 87825fe62..540c36c1a 100644 --- a/dump/dump.go +++ b/dump/dump.go @@ -39,7 +39,6 @@ type Dumper struct { masterDataSkipped bool maxAllowedPacket int - gtidPurged string hexBlob bool } @@ -151,10 +150,6 @@ func (d *Dumper) Dump(w io.Writer) error { args = append(args, "--master-data") } - if d.gtidPurged == "auto" { - args = append(args, fmt.Sprintf("--set-gtid-purged=%s", d.gtidPurged)) - } - if d.maxAllowedPacket > 0 { // mysqldump param should be --max-allowed-packet=%dM not be --max_allowed_packet=%dM args = append(args, fmt.Sprintf("--max-allowed-packet=%dM", d.maxAllowedPacket)) @@ -231,7 +226,6 @@ func (d *Dumper) DumpAndParse(h ParseHandler) error { done := make(chan error, 1) go func() { - // TODO: set_gtid_purged indicate if parse SET @@GLOBAL.GTID_PURGED='c0977f88-3104-11e9-81e1-00505690245b:1-274559' OR NOT; err := Parse(r, h, !d.masterDataSkipped) r.CloseWithError(err) done <- err From e6d71607fd28ef899e5143adf1bb6a36dbcf099c Mon Sep 17 00:00:00 2001 From: jianhaiqing Date: Wed, 4 Dec 2019 00:21:53 +0800 Subject: [PATCH 10/15] remove unrelated comment --- canal/canal_test.go | 5 +---- dump/dump_test.go | 1 - dump/parser.go | 15 +-------------- 3 files changed, 2 insertions(+), 19 deletions(-) diff --git a/canal/canal_test.go b/canal/canal_test.go index 39851f26d..edd83fc02 100755 --- a/canal/canal_test.go +++ b/canal/canal_test.go @@ -14,10 +14,7 @@ import ( "github.com/siddontang/go-mysql/replication" ) -var ( - ErrSkip = errors.New("Handler error, but skipped") - testHost = flag.String("host", "127.0.0.1", "MySQL host") -) +var testHost = flag.String("host", "127.0.0.1", "MySQL host") func Test(t *testing.T) { TestingT(t) diff --git a/dump/dump_test.go b/dump/dump_test.go index 5ae7fa876..6dd6e2a89 100644 --- a/dump/dump_test.go +++ b/dump/dump_test.go @@ -44,7 +44,6 @@ func (s *schemaTestSuite) SetUpSuite(c *C) { c.Assert(err, IsNil) s.d, err = NewDumper(*execution, fmt.Sprintf("%s:%d", *host, *port), "root", "") - s.d.gtidPurged = "none" c.Assert(err, IsNil) c.Assert(s.d, NotNil) diff --git a/dump/parser.go b/dump/parser.go index 60919de45..33876e0ce 100644 --- a/dump/parser.go +++ b/dump/parser.go @@ -23,29 +23,16 @@ type ParseHandler interface { Data(schema string, table string, values []string) error } -var oneGtidExp *regexp.Regexp -var mutilGtidStartExp *regexp.Regexp -var midUuidSet *regexp.Regexp -var endUuidSet *regexp.Regexp var binlogExp *regexp.Regexp var useExp *regexp.Regexp var valuesExp *regexp.Regexp var gtidExp *regexp.Regexp func init() { - //SET @@GLOBAL.GTID_PURGED='1638041a-0457-11e9-bb9f-00505690b730:1-429405150'; - oneGtidExp = regexp.MustCompile("SET @@GLOBAL.GTID_PURGED='(.+)'") - - //SET @@GLOBAL.GTID_PURGED='071a84e8-b253-11e8-8472-005056a27e86:1-76, // start of multi-gtid set - //2337be48-0456-11e9-bd1c-00505690543b:1-7, // middle gtid set - //41d816cd-0455-11e9-be42-005056901a22:1-2'; // end gtid set - mutilGtidStartExp = regexp.MustCompile("SET @@GLOBAL.GTID_PURGED='(.+),") - midUuidSet = regexp.MustCompile("(^\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+),") - endUuidSet = regexp.MustCompile("(^\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)'") - binlogExp = regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);") useExp = regexp.MustCompile("^USE `(.+)`;") valuesExp = regexp.MustCompile("^INSERT INTO `(.+?)` VALUES \\((.+)\\);$") + //SET @@GLOBAL.GTID_PURGED='1638041a-0457-11e9-bb9f-00505690b730:1-429405150'; gtidExp = regexp.MustCompile("(\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)") } From af20eddfaf06192bbc11c28cf02c906d83983009 Mon Sep 17 00:00:00 2001 From: jianhaiqing Date: Thu, 5 Dec 2019 23:25:35 +0800 Subject: [PATCH 11/15] Add test for Parse func, and fix MysqlGTIDSet.Equal --- dump/dump_test.go | 66 +++++++++++++++++---------------------------- mysql/mysql_gtid.go | 7 ++++- 2 files changed, 30 insertions(+), 43 deletions(-) diff --git a/dump/dump_test.go b/dump/dump_test.go index 6dd6e2a89..ecf90ad17 100644 --- a/dump/dump_test.go +++ b/dump/dump_test.go @@ -1,19 +1,14 @@ package dump import ( - "bufio" "bytes" "flag" "fmt" - "io" "io/ioutil" "os" - "regexp" "strings" "testing" - "github.com/pingcap/errors" - "github.com/siddontang/go-log/log" "github.com/siddontang/go-mysql/mysql" @@ -121,6 +116,7 @@ func (s *schemaTestSuite) TestDump(c *C) { } type testParseHandler struct { + gset mysql.GTIDSet } func (h *testParseHandler) BinLog(name string, pos uint64) error { @@ -128,7 +124,12 @@ func (h *testParseHandler) BinLog(name string, pos uint64) error { } func (h *testParseHandler) GtidSet(gtidsets string) (err error) { - return nil + if h.gset != nil { + err = h.gset.Update(gtidsets) + } else { + h.gset, err = mysql.ParseGTIDSet("mysql", gtidsets) + } + return err } func (h *testParseHandler) Data(schema string, table string, values []string) error { @@ -148,9 +149,9 @@ func (h *GtidParseTest) UpdateGtidSet(gtidStr string) (err error) { return err } -func TestParseGtidStrFromMysqlDump(t *testing.T) { - binlogExp := regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);") - gtidExp := regexp.MustCompile("(\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)") +func TestParse4GtidExp(t *testing.T) { + // binlogExp := regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);") + // gtidExp := regexp.MustCompile("(\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)") tbls := []struct { input string expected string @@ -167,54 +168,35 @@ cbd7809d-0433-11e9-b1cf-00505690543b:1-18233950, cca778e9-8cdf-11e8-94d0-005056a247b1:1-303899574, cf80679b-7695-11e8-8873-1c1b0d9a4ab9:1-12836047, d0951f24-1e21-11e9-bb2e-00505690b730:1-4758092, -e7574090-b123-11e8-8bb4-005056a29643:1-12'`, "071a84e8-b253-11e8-8472-005056a27e86:1-76,2337be48-0456-11e9-bd1c-00505690543b:1-7,41d816cd-0455-11e9-be42-005056901a22:1-2,5f1eea9e-b1e5-11e8-bc77-005056a221ed:1-144609156,75848cdb-8131-11e7-b6fc-1c1b0de85e7b:1-151378598,780ad602-0456-11e9-8bcd-005056901a22:1-516653148,92809ddd-1e3c-11e9-9d04-00505690f6ab:1-11858565,c59598c7-0467-11e9-bbbe-005056901a22:1-226464969,cbd7809d-0433-11e9-b1cf-00505690543b:1-18233950,cca778e9-8cdf-11e8-94d0-005056a247b1:1-303899574,cf80679b-7695-11e8-8873-1c1b0d9a4ab9:1-12836047,d0951f24-1e21-11e9-bb2e-00505690b730:1-4758092,e7574090-b123-11e8-8bb4-005056a29643:1-12"}, +e7574090-b123-11e8-8bb4-005056a29643:1-12' +`, "071a84e8-b253-11e8-8472-005056a27e86:1-76,2337be48-0456-11e9-bd1c-00505690543b:1-7,41d816cd-0455-11e9-be42-005056901a22:1-2,5f1eea9e-b1e5-11e8-bc77-005056a221ed:1-144609156,75848cdb-8131-11e7-b6fc-1c1b0de85e7b:1-151378598,780ad602-0456-11e9-8bcd-005056901a22:1-516653148,92809ddd-1e3c-11e9-9d04-00505690f6ab:1-11858565,c59598c7-0467-11e9-bbbe-005056901a22:1-226464969,cbd7809d-0433-11e9-b1cf-00505690543b:1-18233950,cca778e9-8cdf-11e8-94d0-005056a247b1:1-303899574,cf80679b-7695-11e8-8873-1c1b0d9a4ab9:1-12836047,d0951f24-1e21-11e9-bb2e-00505690b730:1-4758092,e7574090-b123-11e8-8bb4-005056a29643:1-12"}, {`SET @@GLOBAL.GTID_PURGED='071a84e8-b253-11e8-8472-005056a27e86:1-76, -2337be48-0456-11e9-bd1c-00505690543b:1-7';`, "071a84e8-b253-11e8-8472-005056a27e86:1-76,2337be48-0456-11e9-bd1c-00505690543b:1-7"}, - {`SET @@GLOBAL.GTID_PURGED='c0977f88-3104-11e9-81e1-00505690245b:1-274559';`, "c0977f88-3104-11e9-81e1-00505690245b:1-274559"}, +2337be48-0456-11e9-bd1c-00505690543b:1-7'; +`, "071a84e8-b253-11e8-8472-005056a27e86:1-76,2337be48-0456-11e9-bd1c-00505690543b:1-7"}, + {`SET @@GLOBAL.GTID_PURGED='c0977f88-3104-11e9-81e1-00505690245b:1-274559'; +`, "c0977f88-3104-11e9-81e1-00505690245b:1-274559"}, {`CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.008995', MASTER_LOG_POS=102052485;`, ""}, } for _, tt := range tbls { reader := strings.NewReader(tt.input) - newReader := bufio.NewReader(reader) - var gtidParse = new(GtidParseTest) - var binlogParsed bool - parseBinlogPos := true - for { - bytes, _, err := newReader.ReadLine() - line := string(bytes) - if err == io.EOF { - break - } + var handler = new(testParseHandler) - if parseBinlogPos && !binlogParsed { - if m := gtidExp.FindAllStringSubmatch(line, -1); len(m) == 1 { - gtidStr := m[0][1] - if gtidStr != "" { - if err := gtidParse.UpdateGtidSet(gtidStr); err != nil { - errors.Errorf("gtid failed to parsed: %v", err) - } - } - } - if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 { - binlogParsed = true - } - } - - } + Parse(reader, handler, true) if tt.expected == "" { - if gtidParse.gset != nil && gtidParse.gset.String() != "" { - log.Fatalf("expected nil, but get %v", gtidParse.gset) + if handler.gset != nil { + log.Fatalf("expected nil, but get %v", handler.gset) + } else { + continue } - continue } expectedGtidset, err := mysql.ParseGTIDSet("mysql", tt.expected) if err != nil { log.Fatalf("Gtid:%s failed parsed, err: %v", tt.expected, err) } - if !expectedGtidset.Equal(gtidParse.gset) { - log.Fatalf("expected:%v, but get: %v", expectedGtidset, gtidParse.gset) + if !expectedGtidset.Equal(handler.gset) { + log.Fatalf("expected:%v, but get: %v", expectedGtidset, handler.gset) } } diff --git a/mysql/mysql_gtid.go b/mysql/mysql_gtid.go index 0c12d0924..8974b6004 100644 --- a/mysql/mysql_gtid.go +++ b/mysql/mysql_gtid.go @@ -9,8 +9,9 @@ import ( "strconv" "strings" + uuid "github.com/satori/go.uuid" + "github.com/pingcap/errors" - "github.com/satori/go.uuid" "github.com/siddontang/go/hack" ) @@ -398,6 +399,10 @@ func (s *MysqlGTIDSet) Equal(o GTIDSet) bool { return false } + if len(sub.Sets) != len(s.Sets) { + return false + } + for key, set := range sub.Sets { o, ok := s.Sets[key] if !ok { From a3a3677ff9a31ac8d3aa6c74a933c1390f570899 Mon Sep 17 00:00:00 2001 From: jianhaiqing Date: Fri, 6 Dec 2019 00:27:47 +0800 Subject: [PATCH 12/15] format the code, and add more comment --- dump/dump_test.go | 5 ++--- dump/parser.go | 1 + 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dump/dump_test.go b/dump/dump_test.go index ecf90ad17..68096dffa 100644 --- a/dump/dump_test.go +++ b/dump/dump_test.go @@ -9,11 +9,10 @@ import ( "strings" "testing" - "github.com/siddontang/go-log/log" - "github.com/siddontang/go-mysql/mysql" - . "github.com/pingcap/check" + "github.com/siddontang/go-log/log" "github.com/siddontang/go-mysql/client" + "github.com/siddontang/go-mysql/mysql" ) // use docker mysql for test diff --git a/dump/parser.go b/dump/parser.go index 33876e0ce..2b99fb864 100644 --- a/dump/parser.go +++ b/dump/parser.go @@ -32,6 +32,7 @@ func init() { binlogExp = regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);") useExp = regexp.MustCompile("^USE `(.+)`;") valuesExp = regexp.MustCompile("^INSERT INTO `(.+?)` VALUES \\((.+)\\);$") + // The pattern will only match MySQL GTID, as you know SET GLOBAL gtid_slave_pos='0-1-4' is used for MariaDB. //SET @@GLOBAL.GTID_PURGED='1638041a-0457-11e9-bb9f-00505690b730:1-429405150'; gtidExp = regexp.MustCompile("(\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)") } From 4b8fe0e73ac85ba3ca0201c783858d5d1dad9a17 Mon Sep 17 00:00:00 2001 From: jianhaiqing Date: Fri, 6 Dec 2019 08:54:58 +0800 Subject: [PATCH 13/15] format the code --- mysql/mysql_gtid.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mysql/mysql_gtid.go b/mysql/mysql_gtid.go index 8974b6004..151581f85 100644 --- a/mysql/mysql_gtid.go +++ b/mysql/mysql_gtid.go @@ -9,9 +9,8 @@ import ( "strconv" "strings" - uuid "github.com/satori/go.uuid" - "github.com/pingcap/errors" + uuid "github.com/satori/go.uuid" "github.com/siddontang/go/hack" ) From fa1f5d5971f7e49814bfd790aaa17b2565e10264 Mon Sep 17 00:00:00 2001 From: jianhaiqing Date: Fri, 6 Dec 2019 12:31:05 +0800 Subject: [PATCH 14/15] use Check to run unit test --- dump/dump_test.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/dump/dump_test.go b/dump/dump_test.go index 68096dffa..e42e8c1be 100644 --- a/dump/dump_test.go +++ b/dump/dump_test.go @@ -10,7 +10,6 @@ import ( "testing" . "github.com/pingcap/check" - "github.com/siddontang/go-log/log" "github.com/siddontang/go-mysql/client" "github.com/siddontang/go-mysql/mysql" ) @@ -148,7 +147,7 @@ func (h *GtidParseTest) UpdateGtidSet(gtidStr string) (err error) { return err } -func TestParse4GtidExp(t *testing.T) { +func (s *parserTestSuite) TestParseGtidExp(c *C) { // binlogExp := regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);") // gtidExp := regexp.MustCompile("(\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)") tbls := []struct { @@ -185,18 +184,14 @@ e7574090-b123-11e8-8bb4-005056a29643:1-12' if tt.expected == "" { if handler.gset != nil { - log.Fatalf("expected nil, but get %v", handler.gset) + c.Assert(handler.gset, IsNil) } else { continue } } expectedGtidset, err := mysql.ParseGTIDSet("mysql", tt.expected) - if err != nil { - log.Fatalf("Gtid:%s failed parsed, err: %v", tt.expected, err) - } - if !expectedGtidset.Equal(handler.gset) { - log.Fatalf("expected:%v, but get: %v", expectedGtidset, handler.gset) - } + c.Assert(err, IsNil) + c.Assert(expectedGtidset.Equal(handler.gset), IsTrue) } } From 3c663a5fb1877caed777bb95e3a44d533662769f Mon Sep 17 00:00:00 2001 From: jianhaiqing Date: Sat, 7 Dec 2019 23:00:46 +0800 Subject: [PATCH 15/15] remove redundant condition --- canal/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/canal/sync.go b/canal/sync.go index 386725062..0b5a0dc99 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -16,7 +16,7 @@ import ( func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) { gset := c.master.GTIDSet() - if gset == nil || (gset != nil && gset.String() == "") { + if gset == nil || gset.String() == "" { pos := c.master.Position() s, err := c.syncer.StartSync(pos) if err != nil {