diff --git a/canal/canal.go b/canal/canal.go index 048ddcce6..67c61ddec 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -258,7 +258,7 @@ func (c *Canal) Close() { c.conn = nil c.connLock.Unlock() - _ = c.eventHandler.OnPosSynced(c.master.Position(), c.master.GTIDSet(), true) + _ = c.eventHandler.OnPosSynced(nil, c.master.Position(), c.master.GTIDSet(), true) } func (c *Canal) WaitDumpDone() <-chan struct{} { diff --git a/canal/canal_test.go b/canal/canal_test.go index 509c419e3..c48f2b058 100644 --- a/canal/canal_test.go +++ b/canal/canal_test.go @@ -123,7 +123,7 @@ func (h *testEventHandler) String() string { return "testEventHandler" } -func (h *testEventHandler) OnPosSynced(p mysql.Position, set mysql.GTIDSet, f bool) error { +func (h *testEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error { return nil } diff --git a/canal/dump.go b/canal/dump.go index 67b3c0027..3a6a9e9db 100644 --- a/canal/dump.go +++ b/canal/dump.go @@ -176,7 +176,7 @@ func (c *Canal) dump() error { pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)} c.master.Update(pos) c.master.UpdateGTIDSet(h.gset) - if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), true); err != nil { + if err := c.eventHandler.OnPosSynced(nil, pos, c.master.GTIDSet(), true); err != nil { return errors.Trace(err) } var startPos fmt.Stringer = pos diff --git a/canal/handler.go b/canal/handler.go index 80a10a956..71eccd74c 100644 --- a/canal/handler.go +++ b/canal/handler.go @@ -6,32 +6,38 @@ import ( ) type EventHandler interface { - OnRotate(rotateEvent *replication.RotateEvent) error + OnRotate(header *replication.EventHeader, rotateEvent *replication.RotateEvent) error // OnTableChanged is called when the table is created, altered, renamed or dropped. // You need to clear the associated data like cache with the table. // It will be called before OnDDL. - OnTableChanged(schema string, table string) error - OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error + OnTableChanged(header *replication.EventHeader, schema string, table string) error + OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error OnRow(e *RowsEvent) error - OnXID(nextPos mysql.Position) error - OnGTID(gtid mysql.GTIDSet) error + OnXID(header *replication.EventHeader, nextPos mysql.Position) error + OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error // OnPosSynced Use your own way to sync position. When force is true, sync position immediately. - OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error + OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error String() string } type DummyEventHandler struct { } -func (h *DummyEventHandler) OnRotate(*replication.RotateEvent) error { return nil } -func (h *DummyEventHandler) OnTableChanged(schema string, table string) error { return nil } -func (h *DummyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error { +func (h *DummyEventHandler) OnRotate(*replication.EventHeader, *replication.RotateEvent) error { + return nil +} +func (h *DummyEventHandler) OnTableChanged(*replication.EventHeader, string, string) error { + return nil +} +func (h *DummyEventHandler) OnDDL(*replication.EventHeader, mysql.Position, *replication.QueryEvent) error { + return nil +} +func (h *DummyEventHandler) OnRow(*RowsEvent) error { return nil } +func (h *DummyEventHandler) OnXID(*replication.EventHeader, mysql.Position) error { return nil } +func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.GTIDSet) error { return nil } +func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position, mysql.GTIDSet, bool) error { return nil } -func (h *DummyEventHandler) OnRow(*RowsEvent) error { return nil } -func (h *DummyEventHandler) OnXID(mysql.Position) error { return nil } -func (h *DummyEventHandler) OnGTID(mysql.GTIDSet) error { return nil } -func (h *DummyEventHandler) OnPosSynced(mysql.Position, mysql.GTIDSet, bool) error { return nil } func (h *DummyEventHandler) String() string { return "DummyEventHandler" } diff --git a/canal/sync.go b/canal/sync.go index 1c9c8d308..abd835c96 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -95,7 +95,7 @@ func (c *Canal) runSyncBinlog() error { c.cfg.Logger.Infof("rotate binlog to %s", pos) savePos = true force = true - if err = c.eventHandler.OnRotate(e); err != nil { + if err = c.eventHandler.OnRotate(ev.Header, e); err != nil { return errors.Trace(err) } case *replication.RowsEvent: @@ -115,7 +115,7 @@ func (c *Canal) runSyncBinlog() error { case *replication.XIDEvent: savePos = true // try to save the position later - if err := c.eventHandler.OnXID(pos); err != nil { + if err := c.eventHandler.OnXID(ev.Header, pos); err != nil { return errors.Trace(err) } if e.GSet != nil { @@ -127,7 +127,7 @@ func (c *Canal) runSyncBinlog() error { if err != nil { return errors.Trace(err) } - if err := c.eventHandler.OnGTID(gtid); err != nil { + if err := c.eventHandler.OnGTID(ev.Header, gtid); err != nil { return errors.Trace(err) } case *replication.GTIDEvent: @@ -136,7 +136,7 @@ func (c *Canal) runSyncBinlog() error { if err != nil { return errors.Trace(err) } - if err := c.eventHandler.OnGTID(gtid); err != nil { + if err := c.eventHandler.OnGTID(ev.Header, gtid); err != nil { return errors.Trace(err) } case *replication.QueryEvent: @@ -151,7 +151,7 @@ func (c *Canal) runSyncBinlog() error { if node.db == "" { node.db = string(e.Schema) } - if err = c.updateTable(node.db, node.table); err != nil { + if err = c.updateTable(ev.Header, node.db, node.table); err != nil { return errors.Trace(err) } } @@ -159,7 +159,7 @@ func (c *Canal) runSyncBinlog() error { savePos = true force = true // Now we only handle Table Changed DDL, maybe we will support more later. - if err = c.eventHandler.OnDDL(pos, e); err != nil { + if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil { return errors.Trace(err) } } @@ -176,7 +176,7 @@ func (c *Canal) runSyncBinlog() error { c.master.UpdateTimestamp(ev.Header.Timestamp) fakeRotateLogName = "" - if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), force); err != nil { + if err := c.eventHandler.OnPosSynced(ev.Header, pos, c.master.GTIDSet(), force); err != nil { return errors.Trace(err) } } @@ -228,10 +228,10 @@ func parseStmt(stmt ast.StmtNode) (ns []*node) { return ns } -func (c *Canal) updateTable(db, table string) (err error) { +func (c *Canal) updateTable(header *replication.EventHeader, db, table string) (err error) { c.ClearTableCache([]byte(db), []byte(table)) c.cfg.Logger.Infof("table structure changed, clear table cache: %s.%s\n", db, table) - if err = c.eventHandler.OnTableChanged(db, table); err != nil && errors.Cause(err) != schema.ErrTableNotExist { + if err = c.eventHandler.OnTableChanged(header, db, table); err != nil && errors.Cause(err) != schema.ErrTableNotExist { return errors.Trace(err) } return