Skip to content

Commit e35272c

Browse files
authored
Fix bug in handling sub events of replication.TransactionPayloadEvent (#875)
1 parent eb2c6d1 commit e35272c

File tree

1 file changed

+98
-91
lines changed

1 file changed

+98
-91
lines changed

canal/sync.go

+98-91
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,6 @@ func (c *Canal) runSyncBinlog() error {
3838
return err
3939
}
4040

41-
savePos := false
42-
force := false
43-
4441
for {
4542
ev, err := s.GetEvent(c.ctx)
4643
if err != nil {
@@ -69,110 +66,120 @@ func (c *Canal) runSyncBinlog() error {
6966
}
7067
}
7168

72-
savePos = false
73-
force = false
74-
pos := c.master.Position()
69+
err = c.handleEvent(ev)
70+
if err != nil {
71+
return err
72+
}
73+
}
74+
}
7575

76-
curPos := pos.Pos
76+
func (c *Canal) handleEvent(ev *replication.BinlogEvent) error {
77+
savePos := false
78+
force := false
79+
pos := c.master.Position()
80+
var err error
7781

78-
// next binlog pos
79-
pos.Pos = ev.Header.LogPos
82+
curPos := pos.Pos
8083

81-
// We only save position with RotateEvent and XIDEvent.
82-
// For RowsEvent, we can't save the position until meeting XIDEvent
83-
// which tells the whole transaction is over.
84-
// TODO: If we meet any DDL query, we must save too.
85-
switch e := ev.Event.(type) {
86-
case *replication.RotateEvent:
87-
pos.Name = string(e.NextLogName)
88-
pos.Pos = uint32(e.Position)
89-
c.cfg.Logger.Infof("rotate binlog to %s", pos)
90-
savePos = true
91-
force = true
92-
if err = c.eventHandler.OnRotate(ev.Header, e); err != nil {
93-
return errors.Trace(err)
94-
}
95-
case *replication.RowsEvent:
96-
// we only focus row based event
97-
err = c.handleRowsEvent(ev)
84+
// next binlog pos
85+
pos.Pos = ev.Header.LogPos
86+
87+
// We only save position with RotateEvent and XIDEvent.
88+
// For RowsEvent, we can't save the position until meeting XIDEvent
89+
// which tells the whole transaction is over.
90+
// TODO: If we meet any DDL query, we must save too.
91+
switch e := ev.Event.(type) {
92+
case *replication.RotateEvent:
93+
pos.Name = string(e.NextLogName)
94+
pos.Pos = uint32(e.Position)
95+
c.cfg.Logger.Infof("rotate binlog to %s", pos)
96+
savePos = true
97+
force = true
98+
if err = c.eventHandler.OnRotate(ev.Header, e); err != nil {
99+
return errors.Trace(err)
100+
}
101+
case *replication.RowsEvent:
102+
// we only focus row based event
103+
err = c.handleRowsEvent(ev)
104+
if err != nil {
105+
c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
106+
return errors.Trace(err)
107+
}
108+
return nil
109+
case *replication.TransactionPayloadEvent:
110+
// handle subevent row by row
111+
ev := ev.Event.(*replication.TransactionPayloadEvent)
112+
for _, subEvent := range ev.Events {
113+
err = c.handleEvent(subEvent)
98114
if err != nil {
99-
c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
115+
c.cfg.Logger.Errorf("handle transaction payload subevent at (%s, %d) error %v", pos.Name, curPos, err)
100116
return errors.Trace(err)
101117
}
102-
continue
103-
case *replication.TransactionPayloadEvent:
104-
// handle subevent row by row
105-
ev := ev.Event.(*replication.TransactionPayloadEvent)
106-
for _, subEvent := range ev.Events {
107-
err = c.handleRowsEvent(subEvent)
108-
if err != nil {
109-
c.cfg.Logger.Errorf("handle transaction payload rows event at (%s, %d) error %v", pos.Name, curPos, err)
118+
}
119+
return nil
120+
case *replication.XIDEvent:
121+
savePos = true
122+
// try to save the position later
123+
if err := c.eventHandler.OnXID(ev.Header, pos); err != nil {
124+
return errors.Trace(err)
125+
}
126+
if e.GSet != nil {
127+
c.master.UpdateGTIDSet(e.GSet)
128+
}
129+
case *replication.MariadbGTIDEvent:
130+
if err := c.eventHandler.OnGTID(ev.Header, e); err != nil {
131+
return errors.Trace(err)
132+
}
133+
case *replication.GTIDEvent:
134+
if err := c.eventHandler.OnGTID(ev.Header, e); err != nil {
135+
return errors.Trace(err)
136+
}
137+
case *replication.RowsQueryEvent:
138+
if err := c.eventHandler.OnRowsQueryEvent(e); err != nil {
139+
return errors.Trace(err)
140+
}
141+
case *replication.QueryEvent:
142+
stmts, _, err := c.parser.Parse(string(e.Query), "", "")
143+
if err != nil {
144+
c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
145+
return nil
146+
}
147+
for _, stmt := range stmts {
148+
nodes := parseStmt(stmt)
149+
for _, node := range nodes {
150+
if node.db == "" {
151+
node.db = string(e.Schema)
152+
}
153+
if err = c.updateTable(ev.Header, node.db, node.table); err != nil {
110154
return errors.Trace(err)
111155
}
112156
}
113-
continue
114-
case *replication.XIDEvent:
115-
savePos = true
116-
// try to save the position later
117-
if err := c.eventHandler.OnXID(ev.Header, pos); err != nil {
118-
return errors.Trace(err)
119-
}
120-
if e.GSet != nil {
121-
c.master.UpdateGTIDSet(e.GSet)
122-
}
123-
case *replication.MariadbGTIDEvent:
124-
if err := c.eventHandler.OnGTID(ev.Header, e); err != nil {
125-
return errors.Trace(err)
126-
}
127-
case *replication.GTIDEvent:
128-
if err := c.eventHandler.OnGTID(ev.Header, e); err != nil {
129-
return errors.Trace(err)
130-
}
131-
case *replication.RowsQueryEvent:
132-
if err := c.eventHandler.OnRowsQueryEvent(e); err != nil {
133-
return errors.Trace(err)
134-
}
135-
case *replication.QueryEvent:
136-
stmts, _, err := c.parser.Parse(string(e.Query), "", "")
137-
if err != nil {
138-
c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
139-
continue
140-
}
141-
for _, stmt := range stmts {
142-
nodes := parseStmt(stmt)
143-
for _, node := range nodes {
144-
if node.db == "" {
145-
node.db = string(e.Schema)
146-
}
147-
if err = c.updateTable(ev.Header, node.db, node.table); err != nil {
148-
return errors.Trace(err)
149-
}
150-
}
151-
if len(nodes) > 0 {
152-
savePos = true
153-
force = true
154-
// Now we only handle Table Changed DDL, maybe we will support more later.
155-
if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
156-
return errors.Trace(err)
157-
}
157+
if len(nodes) > 0 {
158+
savePos = true
159+
force = true
160+
// Now we only handle Table Changed DDL, maybe we will support more later.
161+
if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
162+
return errors.Trace(err)
158163
}
159164
}
160-
if savePos && e.GSet != nil {
161-
c.master.UpdateGTIDSet(e.GSet)
162-
}
163-
default:
164-
continue
165165
}
166+
if savePos && e.GSet != nil {
167+
c.master.UpdateGTIDSet(e.GSet)
168+
}
169+
default:
170+
return nil
171+
}
166172

167-
if savePos {
168-
c.master.Update(pos)
169-
c.master.UpdateTimestamp(ev.Header.Timestamp)
173+
if savePos {
174+
c.master.Update(pos)
175+
c.master.UpdateTimestamp(ev.Header.Timestamp)
170176

171-
if err := c.eventHandler.OnPosSynced(ev.Header, pos, c.master.GTIDSet(), force); err != nil {
172-
return errors.Trace(err)
173-
}
177+
if err := c.eventHandler.OnPosSynced(ev.Header, pos, c.master.GTIDSet(), force); err != nil {
178+
return errors.Trace(err)
174179
}
175180
}
181+
182+
return nil
176183
}
177184

178185
type node struct {

0 commit comments

Comments
 (0)