Skip to content

Commit f0f8617

Browse files
hjweddielance6716
andauthored
feat: handle subevents in transaction payload event (#827)
* handle subevents in transaction payload event * encap duplicate code * fix error handle --------- Co-authored-by: lance6716 <[email protected]>
1 parent 3606141 commit f0f8617

File tree

1 file changed

+20
-9
lines changed

1 file changed

+20
-9
lines changed

canal/sync.go

+20-9
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,17 @@ func (c *Canal) runSyncBinlog() error {
9696
// we only focus row based event
9797
err = c.handleRowsEvent(ev)
9898
if err != nil {
99-
e := errors.Cause(err)
100-
// if error is not ErrExcludedTable or ErrTableNotExist or ErrMissingTableMeta, stop canal
101-
if e != ErrExcludedTable &&
102-
e != schema.ErrTableNotExist &&
103-
e != schema.ErrMissingTableMeta {
104-
c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
99+
c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
100+
return errors.Trace(err)
101+
}
102+
continue
103+
case *replication.TransactionPayloadEvent:
104+
// handle subevent row by row
105+
ev := ev.Event.(*replication.TransactionPayloadEvent)
106+
for _, subEvent := range ev.Events {
107+
err = c.handleRowsEvent(subEvent)
108+
if err != nil {
109+
c.cfg.Logger.Errorf("handle transaction payload rows event at (%s, %d) error %v", pos.Name, curPos, err)
105110
return errors.Trace(err)
106111
}
107112
}
@@ -232,11 +237,17 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
232237
ev := e.Event.(*replication.RowsEvent)
233238

234239
// Caveat: table may be altered at runtime.
235-
schema := string(ev.Table.Schema)
236-
table := string(ev.Table.Table)
240+
schemaName := string(ev.Table.Schema)
241+
tableName := string(ev.Table.Table)
237242

238-
t, err := c.GetTable(schema, table)
243+
t, err := c.GetTable(schemaName, tableName)
239244
if err != nil {
245+
e := errors.Cause(err)
246+
// ignore errors below
247+
if e == ErrExcludedTable || e == schema.ErrTableNotExist || e == schema.ErrMissingTableMeta {
248+
err = nil
249+
}
250+
240251
return err
241252
}
242253
var action string

0 commit comments

Comments
 (0)