From 0be0f1f14b462d33ca0da613ea0d6cb7c3953747 Mon Sep 17 00:00:00 2001 From: huangjunwei Date: Fri, 6 Oct 2023 14:24:13 +0800 Subject: [PATCH 1/3] handle subevents in transaction payload event --- canal/sync.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/canal/sync.go b/canal/sync.go index 5523d9fe4..8fe566d04 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -106,6 +106,23 @@ func (c *Canal) runSyncBinlog() error { } } continue + case *replication.TransactionPayloadEvent: + // handle subevent row by row + ev := ev.Event.(*replication.TransactionPayloadEvent) + for _, subEvent := range ev.Events { + err = c.handleRowsEvent(subEvent) + if err != nil { + e := errors.Cause(err) + // if error is not ErrExcludedTable or ErrTableNotExist or ErrMissingTableMeta, stop canal + if e != ErrExcludedTable && + e != schema.ErrTableNotExist && + e != schema.ErrMissingTableMeta { + c.cfg.Logger.Errorf("handle transaction payload rows event at (%s, %d) error %v", pos.Name, curPos, err) + return errors.Trace(err) + } + } + } + continue case *replication.XIDEvent: savePos = true // try to save the position later From 130a049cfa4f484c6bdd16d3a9b4c5880edd8673 Mon Sep 17 00:00:00 2001 From: huangjunwei Date: Sun, 8 Oct 2023 21:56:29 +0800 Subject: [PATCH 2/3] encap duplicate code --- canal/sync.go | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/canal/sync.go b/canal/sync.go index 8fe566d04..69f58c824 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -96,14 +96,8 @@ func (c *Canal) runSyncBinlog() error { // we only focus row based event err = c.handleRowsEvent(ev) if err != nil { - e := errors.Cause(err) - // if error is not ErrExcludedTable or ErrTableNotExist or ErrMissingTableMeta, stop canal - if e != ErrExcludedTable && - e != schema.ErrTableNotExist && - e != schema.ErrMissingTableMeta { - c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err) - return errors.Trace(err) - } + c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err) + return errors.Trace(err) } continue case *replication.TransactionPayloadEvent: @@ -112,14 +106,8 @@ func (c *Canal) runSyncBinlog() error { for _, subEvent := range ev.Events { err = c.handleRowsEvent(subEvent) if err != nil { - e := errors.Cause(err) - // if error is not ErrExcludedTable or ErrTableNotExist or ErrMissingTableMeta, stop canal - if e != ErrExcludedTable && - e != schema.ErrTableNotExist && - e != schema.ErrMissingTableMeta { - c.cfg.Logger.Errorf("handle transaction payload rows event at (%s, %d) error %v", pos.Name, curPos, err) - return errors.Trace(err) - } + c.cfg.Logger.Errorf("handle transaction payload rows event at (%s, %d) error %v", pos.Name, curPos, err) + return errors.Trace(err) } } continue @@ -249,10 +237,10 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { ev := e.Event.(*replication.RowsEvent) // Caveat: table may be altered at runtime. - schema := string(ev.Table.Schema) - table := string(ev.Table.Table) + schemaName := string(ev.Table.Schema) + tableName := string(ev.Table.Table) - t, err := c.GetTable(schema, table) + t, err := c.GetTable(schemaName, tableName) if err != nil { return err } @@ -268,7 +256,17 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { return errors.Errorf("%s not supported now", e.Header.EventType) } events := newRowsEvent(t, action, ev.Rows, e.Header) - return c.eventHandler.OnRow(events) + err = c.eventHandler.OnRow(events) + if err != nil { + e := errors.Cause(err) + if e != ErrExcludedTable && + e != schema.ErrTableNotExist && + e != schema.ErrMissingTableMeta { + return err + } + } + + return nil } func (c *Canal) FlushBinlog() error { From 69234e2a2d0d83d2c47734c4a718a2ad0e79fdc4 Mon Sep 17 00:00:00 2001 From: huangjunwei Date: Mon, 9 Oct 2023 21:47:24 +0800 Subject: [PATCH 3/3] fix error handle --- canal/sync.go | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/canal/sync.go b/canal/sync.go index 69f58c824..ff2eccfd1 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -242,6 +242,12 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { t, err := c.GetTable(schemaName, tableName) if err != nil { + e := errors.Cause(err) + // ignore errors below + if e == ErrExcludedTable || e == schema.ErrTableNotExist || e == schema.ErrMissingTableMeta { + err = nil + } + return err } var action string @@ -256,17 +262,7 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { return errors.Errorf("%s not supported now", e.Header.EventType) } events := newRowsEvent(t, action, ev.Rows, e.Header) - err = c.eventHandler.OnRow(events) - if err != nil { - e := errors.Cause(err) - if e != ErrExcludedTable && - e != schema.ErrTableNotExist && - e != schema.ErrMissingTableMeta { - return err - } - } - - return nil + return c.eventHandler.OnRow(events) } func (c *Canal) FlushBinlog() error {