From 71573d34905e1bd9c230f8654a111f8b6f2e5755 Mon Sep 17 00:00:00 2001 From: BLAZZ Date: Fri, 25 Nov 2022 22:21:31 +0800 Subject: [PATCH 1/4] EventHandlerV2 support handle event with replication.EventHeader --- canal/canal.go | 7 +++++- canal/dump.go | 15 +++++++++--- canal/handler.go | 21 +++++++++++++++++ canal/sync.go | 59 +++++++++++++++++++++++++++++++++++++++--------- 4 files changed, 87 insertions(+), 15 deletions(-) diff --git a/canal/canal.go b/canal/canal.go index 27799de80..dad0ca14f 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -36,6 +36,7 @@ type Canal struct { syncer *replication.BinlogSyncer eventHandler EventHandler + eventHandlerV2 EventHandlerV2 connLock sync.Mutex conn *client.Conn @@ -248,7 +249,11 @@ func (c *Canal) Close() { c.conn = nil c.connLock.Unlock() - _ = c.eventHandler.OnPosSynced(c.master.Position(), c.master.GTIDSet(), true) + if c.eventHandlerV2 == nil { + _ = c.eventHandler.OnPosSynced(c.master.Position(), c.master.GTIDSet(), true) + } else { + _ = c.eventHandlerV2.OnPosSynced(nil, c.master.Position(), c.master.GTIDSet(), true) + } } func (c *Canal) WaitDumpDone() <-chan struct{} { diff --git a/canal/dump.go b/canal/dump.go index 67b3c0027..793e6d05f 100644 --- a/canal/dump.go +++ b/canal/dump.go @@ -110,7 +110,11 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error } events := newRowsEvent(tableInfo, InsertAction, [][]interface{}{vs}, nil) - return h.c.eventHandler.OnRow(events) + if h.c.eventHandlerV2 == nil { + return h.c.eventHandler.OnRow(events) + } else { + return h.c.eventHandlerV2.OnRow(events) + } } func (c *Canal) AddDumpDatabases(dbs ...string) { @@ -175,8 +179,13 @@ func (c *Canal) dump() error { pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)} c.master.Update(pos) - c.master.UpdateGTIDSet(h.gset) - if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), true); err != nil { + c.master.UpdateGTIDSet(h.gset)var err error + if c.eventHandlerV2 == nil { + err = c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), true) + } else { + err = c.eventHandlerV2.OnPosSynced(nil, pos, c.master.GTIDSet(), true) + } + if err != nil { return errors.Trace(err) } var startPos fmt.Stringer = pos diff --git a/canal/handler.go b/canal/handler.go index 80a10a956..f7700fe86 100644 --- a/canal/handler.go +++ b/canal/handler.go @@ -40,3 +40,24 @@ func (h *DummyEventHandler) String() string { return "DummyEventHandler" } func (c *Canal) SetEventHandler(h EventHandler) { c.eventHandler = h } + +// EventHandlerV2 can process event with replication.EventHeader +type EventHandlerV2 interface { + OnRotate(header *replication.EventHeader, rotateEvent *replication.RotateEvent) error + // OnTableChanged is called when the table is created, altered, renamed or dropped. + // You need to clear the associated data like cache with the table. + // It will be called before OnDDL. + OnTableChanged(header *replication.EventHeader, schema string, table string) error + OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error + OnRow(e *RowsEvent) error + OnXID(header *replication.EventHeader, nextPos mysql.Position) error + OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error + // OnPosSynced Use your own way to sync position. When force is true, sync position immediately. + OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error + String() string +} + +// SetEventHandlerV2 to registers EventHandlerV2 handler replace the EventHandler +func (c *Canal) SetEventHandlerV2(h EventHandlerV2) { + c.eventHandlerV2 = h +} diff --git a/canal/sync.go b/canal/sync.go index 1c9c8d308..0ade16232 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -95,9 +95,12 @@ func (c *Canal) runSyncBinlog() error { c.cfg.Logger.Infof("rotate binlog to %s", pos) savePos = true force = true - if err = c.eventHandler.OnRotate(e); err != nil { - return errors.Trace(err) + if c.eventHandlerV2 == nil { + err = c.eventHandler.OnRotate(e) + } else { + err = c.eventHandlerV2.OnRotate(ev.Header, e) } + if err != nil { case *replication.RowsEvent: // we only focus row based event err = c.handleRowsEvent(ev) @@ -115,7 +118,12 @@ func (c *Canal) runSyncBinlog() error { case *replication.XIDEvent: savePos = true // try to save the position later - if err := c.eventHandler.OnXID(pos); err != nil { + if c.eventHandlerV2 == nil { + err = c.eventHandler.OnXID(pos) + } else { + err = c.eventHandlerV2.OnXID(ev.Header, pos) + } + if err != nil { return errors.Trace(err) } if e.GSet != nil { @@ -127,7 +135,12 @@ func (c *Canal) runSyncBinlog() error { if err != nil { return errors.Trace(err) } - if err := c.eventHandler.OnGTID(gtid); err != nil { + if c.eventHandlerV2 == nil { + err = c.eventHandler.OnGTID(gtid) + } else { + err = c.eventHandlerV2.OnGTID(ev.Header, gtid) + } + if err != nil { return errors.Trace(err) } case *replication.GTIDEvent: @@ -136,7 +149,12 @@ func (c *Canal) runSyncBinlog() error { if err != nil { return errors.Trace(err) } - if err := c.eventHandler.OnGTID(gtid); err != nil { + if c.eventHandlerV2 == nil { + err = c.eventHandler.OnGTID(gtid) + } else { + err = c.eventHandlerV2.OnGTID(ev.Header, gtid) + } + if err != nil { return errors.Trace(err) } case *replication.QueryEvent: @@ -151,7 +169,7 @@ func (c *Canal) runSyncBinlog() error { if node.db == "" { node.db = string(e.Schema) } - if err = c.updateTable(node.db, node.table); err != nil { + if err = c.updateTable(ev.Header, node.db, node.table); err != nil { return errors.Trace(err) } } @@ -159,7 +177,12 @@ func (c *Canal) runSyncBinlog() error { savePos = true force = true // Now we only handle Table Changed DDL, maybe we will support more later. - if err = c.eventHandler.OnDDL(pos, e); err != nil { + if c.eventHandlerV2 == nil { + err = c.eventHandler.OnDDL(pos, e) + } else { + err = c.eventHandlerV2.OnDDL(ev.Header, pos, e) + } + if err != nil { return errors.Trace(err) } } @@ -176,7 +199,12 @@ func (c *Canal) runSyncBinlog() error { c.master.UpdateTimestamp(ev.Header.Timestamp) fakeRotateLogName = "" - if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), force); err != nil { + if c.eventHandlerV2 == nil { + err = c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), force) + } else { + err = c.eventHandlerV2.OnPosSynced(ev.Header, pos, c.master.GTIDSet(), force) + } + if err != nil { return errors.Trace(err) } } @@ -228,10 +256,15 @@ func parseStmt(stmt ast.StmtNode) (ns []*node) { return ns } -func (c *Canal) updateTable(db, table string) (err error) { +func (c *Canal) updateTable(header *replication.EventHeader, db, table string) (err error) { c.ClearTableCache([]byte(db), []byte(table)) c.cfg.Logger.Infof("table structure changed, clear table cache: %s.%s\n", db, table) - if err = c.eventHandler.OnTableChanged(db, table); err != nil && errors.Cause(err) != schema.ErrTableNotExist { + if c.eventHandlerV2 == nil { + err = c.eventHandler.OnTableChanged(db, table) + } else { + err = c.eventHandlerV2.OnTableChanged(header, db, table) + } + if err != nil && errors.Cause(err) != schema.ErrTableNotExist { return errors.Trace(err) } return @@ -268,7 +301,11 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { return errors.Errorf("%s not supported now", e.Header.EventType) } events := newRowsEvent(t, action, ev.Rows, e.Header) - return c.eventHandler.OnRow(events) + if c.eventHandlerV2 == nil { + return c.eventHandler.OnRow(events) + } else { + return c.eventHandlerV2.OnRow(events) + } } func (c *Canal) FlushBinlog() error { From 68d659f430437bdf484c2b47558331f5ae845cfc Mon Sep 17 00:00:00 2001 From: BLAZZ Date: Fri, 25 Nov 2022 22:43:44 +0800 Subject: [PATCH 2/4] fix sytanx error sytanx error --- canal/dump.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/canal/dump.go b/canal/dump.go index 793e6d05f..c8cf4a842 100644 --- a/canal/dump.go +++ b/canal/dump.go @@ -179,7 +179,8 @@ func (c *Canal) dump() error { pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)} c.master.Update(pos) - c.master.UpdateGTIDSet(h.gset)var err error + c.master.UpdateGTIDSet(h.gset) + var err error if c.eventHandlerV2 == nil { err = c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), true) } else { From e98e7673f680773cadc9460919b0e8bc5474ae13 Mon Sep 17 00:00:00 2001 From: BLAZZ Date: Tue, 29 Nov 2022 20:08:49 +0800 Subject: [PATCH 3/4] change EventHandlerV2 to EventHandler --- canal/canal.go | 7 +----- canal/dump.go | 14 ++---------- canal/handler.go | 53 +++++++++++++++++----------------------------- canal/sync.go | 55 ++++++++---------------------------------------- 4 files changed, 31 insertions(+), 98 deletions(-) diff --git a/canal/canal.go b/canal/canal.go index dad0ca14f..2e69632dd 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -36,7 +36,6 @@ type Canal struct { syncer *replication.BinlogSyncer eventHandler EventHandler - eventHandlerV2 EventHandlerV2 connLock sync.Mutex conn *client.Conn @@ -249,11 +248,7 @@ func (c *Canal) Close() { c.conn = nil c.connLock.Unlock() - if c.eventHandlerV2 == nil { - _ = c.eventHandler.OnPosSynced(c.master.Position(), c.master.GTIDSet(), true) - } else { - _ = c.eventHandlerV2.OnPosSynced(nil, c.master.Position(), c.master.GTIDSet(), true) - } + _ = c.eventHandler.OnPosSynced(nil, c.master.Position(), c.master.GTIDSet(), true) } func (c *Canal) WaitDumpDone() <-chan struct{} { diff --git a/canal/dump.go b/canal/dump.go index c8cf4a842..3a6a9e9db 100644 --- a/canal/dump.go +++ b/canal/dump.go @@ -110,11 +110,7 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error } events := newRowsEvent(tableInfo, InsertAction, [][]interface{}{vs}, nil) - if h.c.eventHandlerV2 == nil { - return h.c.eventHandler.OnRow(events) - } else { - return h.c.eventHandlerV2.OnRow(events) - } + return h.c.eventHandler.OnRow(events) } func (c *Canal) AddDumpDatabases(dbs ...string) { @@ -180,13 +176,7 @@ func (c *Canal) dump() error { pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)} c.master.Update(pos) c.master.UpdateGTIDSet(h.gset) - var err error - if c.eventHandlerV2 == nil { - err = c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), true) - } else { - err = c.eventHandlerV2.OnPosSynced(nil, pos, c.master.GTIDSet(), true) - } - if err != nil { + if err := c.eventHandler.OnPosSynced(nil, pos, c.master.GTIDSet(), true); err != nil { return errors.Trace(err) } var startPos fmt.Stringer = pos diff --git a/canal/handler.go b/canal/handler.go index f7700fe86..71eccd74c 100644 --- a/canal/handler.go +++ b/canal/handler.go @@ -6,32 +6,38 @@ import ( ) type EventHandler interface { - OnRotate(rotateEvent *replication.RotateEvent) error + OnRotate(header *replication.EventHeader, rotateEvent *replication.RotateEvent) error // OnTableChanged is called when the table is created, altered, renamed or dropped. // You need to clear the associated data like cache with the table. // It will be called before OnDDL. - OnTableChanged(schema string, table string) error - OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error + OnTableChanged(header *replication.EventHeader, schema string, table string) error + OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error OnRow(e *RowsEvent) error - OnXID(nextPos mysql.Position) error - OnGTID(gtid mysql.GTIDSet) error + OnXID(header *replication.EventHeader, nextPos mysql.Position) error + OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error // OnPosSynced Use your own way to sync position. When force is true, sync position immediately. - OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error + OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error String() string } type DummyEventHandler struct { } -func (h *DummyEventHandler) OnRotate(*replication.RotateEvent) error { return nil } -func (h *DummyEventHandler) OnTableChanged(schema string, table string) error { return nil } -func (h *DummyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error { +func (h *DummyEventHandler) OnRotate(*replication.EventHeader, *replication.RotateEvent) error { + return nil +} +func (h *DummyEventHandler) OnTableChanged(*replication.EventHeader, string, string) error { + return nil +} +func (h *DummyEventHandler) OnDDL(*replication.EventHeader, mysql.Position, *replication.QueryEvent) error { + return nil +} +func (h *DummyEventHandler) OnRow(*RowsEvent) error { return nil } +func (h *DummyEventHandler) OnXID(*replication.EventHeader, mysql.Position) error { return nil } +func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.GTIDSet) error { return nil } +func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position, mysql.GTIDSet, bool) error { return nil } -func (h *DummyEventHandler) OnRow(*RowsEvent) error { return nil } -func (h *DummyEventHandler) OnXID(mysql.Position) error { return nil } -func (h *DummyEventHandler) OnGTID(mysql.GTIDSet) error { return nil } -func (h *DummyEventHandler) OnPosSynced(mysql.Position, mysql.GTIDSet, bool) error { return nil } func (h *DummyEventHandler) String() string { return "DummyEventHandler" } @@ -40,24 +46,3 @@ func (h *DummyEventHandler) String() string { return "DummyEventHandler" } func (c *Canal) SetEventHandler(h EventHandler) { c.eventHandler = h } - -// EventHandlerV2 can process event with replication.EventHeader -type EventHandlerV2 interface { - OnRotate(header *replication.EventHeader, rotateEvent *replication.RotateEvent) error - // OnTableChanged is called when the table is created, altered, renamed or dropped. - // You need to clear the associated data like cache with the table. - // It will be called before OnDDL. - OnTableChanged(header *replication.EventHeader, schema string, table string) error - OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error - OnRow(e *RowsEvent) error - OnXID(header *replication.EventHeader, nextPos mysql.Position) error - OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error - // OnPosSynced Use your own way to sync position. When force is true, sync position immediately. - OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error - String() string -} - -// SetEventHandlerV2 to registers EventHandlerV2 handler replace the EventHandler -func (c *Canal) SetEventHandlerV2(h EventHandlerV2) { - c.eventHandlerV2 = h -} diff --git a/canal/sync.go b/canal/sync.go index 0ade16232..abd835c96 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -95,12 +95,9 @@ func (c *Canal) runSyncBinlog() error { c.cfg.Logger.Infof("rotate binlog to %s", pos) savePos = true force = true - if c.eventHandlerV2 == nil { - err = c.eventHandler.OnRotate(e) - } else { - err = c.eventHandlerV2.OnRotate(ev.Header, e) + if err = c.eventHandler.OnRotate(ev.Header, e); err != nil { + return errors.Trace(err) } - if err != nil { case *replication.RowsEvent: // we only focus row based event err = c.handleRowsEvent(ev) @@ -118,12 +115,7 @@ func (c *Canal) runSyncBinlog() error { case *replication.XIDEvent: savePos = true // try to save the position later - if c.eventHandlerV2 == nil { - err = c.eventHandler.OnXID(pos) - } else { - err = c.eventHandlerV2.OnXID(ev.Header, pos) - } - if err != nil { + if err := c.eventHandler.OnXID(ev.Header, pos); err != nil { return errors.Trace(err) } if e.GSet != nil { @@ -135,12 +127,7 @@ func (c *Canal) runSyncBinlog() error { if err != nil { return errors.Trace(err) } - if c.eventHandlerV2 == nil { - err = c.eventHandler.OnGTID(gtid) - } else { - err = c.eventHandlerV2.OnGTID(ev.Header, gtid) - } - if err != nil { + if err := c.eventHandler.OnGTID(ev.Header, gtid); err != nil { return errors.Trace(err) } case *replication.GTIDEvent: @@ -149,12 +136,7 @@ func (c *Canal) runSyncBinlog() error { if err != nil { return errors.Trace(err) } - if c.eventHandlerV2 == nil { - err = c.eventHandler.OnGTID(gtid) - } else { - err = c.eventHandlerV2.OnGTID(ev.Header, gtid) - } - if err != nil { + if err := c.eventHandler.OnGTID(ev.Header, gtid); err != nil { return errors.Trace(err) } case *replication.QueryEvent: @@ -177,12 +159,7 @@ func (c *Canal) runSyncBinlog() error { savePos = true force = true // Now we only handle Table Changed DDL, maybe we will support more later. - if c.eventHandlerV2 == nil { - err = c.eventHandler.OnDDL(pos, e) - } else { - err = c.eventHandlerV2.OnDDL(ev.Header, pos, e) - } - if err != nil { + if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil { return errors.Trace(err) } } @@ -199,12 +176,7 @@ func (c *Canal) runSyncBinlog() error { c.master.UpdateTimestamp(ev.Header.Timestamp) fakeRotateLogName = "" - if c.eventHandlerV2 == nil { - err = c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), force) - } else { - err = c.eventHandlerV2.OnPosSynced(ev.Header, pos, c.master.GTIDSet(), force) - } - if err != nil { + if err := c.eventHandler.OnPosSynced(ev.Header, pos, c.master.GTIDSet(), force); err != nil { return errors.Trace(err) } } @@ -259,12 +231,7 @@ func parseStmt(stmt ast.StmtNode) (ns []*node) { func (c *Canal) updateTable(header *replication.EventHeader, db, table string) (err error) { c.ClearTableCache([]byte(db), []byte(table)) c.cfg.Logger.Infof("table structure changed, clear table cache: %s.%s\n", db, table) - if c.eventHandlerV2 == nil { - err = c.eventHandler.OnTableChanged(db, table) - } else { - err = c.eventHandlerV2.OnTableChanged(header, db, table) - } - if err != nil && errors.Cause(err) != schema.ErrTableNotExist { + if err = c.eventHandler.OnTableChanged(header, db, table); err != nil && errors.Cause(err) != schema.ErrTableNotExist { return errors.Trace(err) } return @@ -301,11 +268,7 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { return errors.Errorf("%s not supported now", e.Header.EventType) } events := newRowsEvent(t, action, ev.Rows, e.Header) - if c.eventHandlerV2 == nil { - return c.eventHandler.OnRow(events) - } else { - return c.eventHandlerV2.OnRow(events) - } + return c.eventHandler.OnRow(events) } func (c *Canal) FlushBinlog() error { From 986b50fd928ca5c14da6862556b0b72905592a24 Mon Sep 17 00:00:00 2001 From: BLAZZ Date: Wed, 30 Nov 2022 10:00:09 +0800 Subject: [PATCH 4/4] change test case handler impl --- canal/canal_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/canal/canal_test.go b/canal/canal_test.go index 509c419e3..c48f2b058 100644 --- a/canal/canal_test.go +++ b/canal/canal_test.go @@ -123,7 +123,7 @@ func (h *testEventHandler) String() string { return "testEventHandler" } -func (h *testEventHandler) OnPosSynced(p mysql.Position, set mysql.GTIDSet, f bool) error { +func (h *testEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error { return nil }