@@ -43,6 +43,11 @@ func (c *Canal) runSyncBinlog() error {
43
43
44
44
savePos := false
45
45
force := false
46
+
47
+ // The name of the binlog file received in the fake rotate event.
48
+ // It must be preserved until the new position is saved.
49
+ fakeRotateLogName := ""
50
+
46
51
for {
47
52
ev , err := s .GetEvent (c .ctx )
48
53
if err != nil {
@@ -51,11 +56,21 @@ func (c *Canal) runSyncBinlog() error {
51
56
52
57
// Update the delay between the Canal and the Master before the handler hooks are called
53
58
c .updateReplicationDelay (ev )
54
- // if log pos equal zero ,it is a fake rotate event,ignore it.
55
- // see https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/rpl_binlog_sender.cc#L899
59
+
60
+ // If log pos equals zero then the received event is a fake rotate event and
61
+ // contains only a name of the next binlog file
62
+ // See https://github.com/mysql/mysql-server/blob/8e797a5d6eb3a87f16498edcb7261a75897babae/sql/rpl_binlog_sender.h#L235
63
+ // and https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/rpl_binlog_sender.cc#L899
56
64
if ev .Header .LogPos == 0 {
65
+ switch e := ev .Event .(type ) {
66
+ case * replication.RotateEvent :
67
+ fakeRotateLogName = string (e .NextLogName )
68
+ log .Infof ("received fake rotate event, next log name is %s" , e .NextLogName )
69
+ }
70
+
57
71
continue
58
72
}
73
+
59
74
savePos = false
60
75
force = false
61
76
pos := c .master .Position ()
@@ -64,6 +79,11 @@ func (c *Canal) runSyncBinlog() error {
64
79
// next binlog pos
65
80
pos .Pos = ev .Header .LogPos
66
81
82
+ // new file name received in the fake rotate event
83
+ if fakeRotateLogName != "" {
84
+ pos .Name = fakeRotateLogName
85
+ }
86
+
67
87
// We only save position with RotateEvent and XIDEvent.
68
88
// For RowsEvent, we can't save the position until meeting XIDEvent
69
89
// which tells the whole transaction is over.
@@ -154,6 +174,8 @@ func (c *Canal) runSyncBinlog() error {
154
174
if savePos {
155
175
c .master .Update (pos )
156
176
c .master .UpdateTimestamp (ev .Header .Timestamp )
177
+ fakeRotateLogName = ""
178
+
157
179
if err := c .eventHandler .OnPosSynced (pos , c .master .GTIDSet (), force ); err != nil {
158
180
return errors .Trace (err )
159
181
}
0 commit comments