diff --git a/canal/sync.go b/canal/sync.go index 5523d9fe4..ff2eccfd1 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -96,12 +96,17 @@ func (c *Canal) runSyncBinlog() error { // we only focus row based event err = c.handleRowsEvent(ev) if err != nil { - e := errors.Cause(err) - // if error is not ErrExcludedTable or ErrTableNotExist or ErrMissingTableMeta, stop canal - if e != ErrExcludedTable && - e != schema.ErrTableNotExist && - e != schema.ErrMissingTableMeta { - c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err) + c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err) + return errors.Trace(err) + } + continue + case *replication.TransactionPayloadEvent: + // handle subevent row by row + ev := ev.Event.(*replication.TransactionPayloadEvent) + for _, subEvent := range ev.Events { + err = c.handleRowsEvent(subEvent) + if err != nil { + c.cfg.Logger.Errorf("handle transaction payload rows event at (%s, %d) error %v", pos.Name, curPos, err) return errors.Trace(err) } } @@ -232,11 +237,17 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { ev := e.Event.(*replication.RowsEvent) // Caveat: table may be altered at runtime. - schema := string(ev.Table.Schema) - table := string(ev.Table.Table) + schemaName := string(ev.Table.Schema) + tableName := string(ev.Table.Table) - t, err := c.GetTable(schema, table) + t, err := c.GetTable(schemaName, tableName) if err != nil { + e := errors.Cause(err) + // ignore errors below + if e == ErrExcludedTable || e == schema.ErrTableNotExist || e == schema.ErrMissingTableMeta { + err = nil + } + return err } var action string