diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 3a5a60643..db24302c8 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -120,7 +120,7 @@ type BinlogSyncer struct { nextPos Position - gset GTIDSet + prevGset, currGset GTIDSet running bool @@ -376,7 +376,7 @@ func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) { func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) { log.Infof("begin to sync binlog from GTID set %s", gset) - b.gset = gset + b.prevGset = gset b.m.Lock() defer b.m.Unlock() @@ -385,6 +385,10 @@ func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) { return nil, errors.Trace(errSyncRunning) } + // establishing network connection here and will start getting binlog events from "gset + 1", thus until first + // MariadbGTIDEvent/GTIDEvent event is received - we effectively do not have a "current GTID" + b.currGset = nil + if err := b.prepare(); err != nil { return nil, errors.Trace(err) } @@ -569,9 +573,14 @@ func (b *BinlogSyncer) retrySync() error { b.parser.Reset() - if b.gset != nil { - log.Infof("begin to re-sync from %s", b.gset.String()) - if err := b.prepareSyncGTID(b.gset); err != nil { + if b.prevGset != nil { + msg := fmt.Sprintf("begin to re-sync from %s", b.prevGset.String()) + if b.currGset != nil { + msg = fmt.Sprintf("%v (last read GTID=%v)", msg, b.currGset) + } + log.Infof(msg) + + if err := b.prepareSyncGTID(b.prevGset); err != nil { return errors.Trace(err) } } else { @@ -604,6 +613,10 @@ func (b *BinlogSyncer) prepareSyncPos(pos Position) error { func (b *BinlogSyncer) prepareSyncGTID(gset GTIDSet) error { var err error + // re establishing network connection here and will start getting binlog events from "gset + 1", thus until first + // MariadbGTIDEvent/GTIDEvent event is received - we effectively do not have a "current GTID" + b.currGset = nil + if err = b.prepare(); err != nil { return errors.Trace(err) } @@ -643,7 +656,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { log.Error(err) // we meet connection error, should re-connect again with // last nextPos or nextGTID we got. - if len(b.nextPos.Name) == 0 && b.gset == nil { + if len(b.nextPos.Name) == 0 && b.prevGset == nil { // we can't get the correct position, close. s.closeWithError(err) return @@ -733,33 +746,53 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { // Some events like FormatDescriptionEvent return 0, ignore. b.nextPos.Pos = e.Header.LogPos } + + getCurrentGtidSet := func() GTIDSet { + if b.currGset == nil { + return nil + } + return b.currGset.Clone() + } + + advanceCurrentGtidSet := func(gtid string) error { + if b.currGset == nil { + b.currGset = b.prevGset.Clone() + } + prev := b.currGset.Clone() + err := b.currGset.Update(gtid) + if err == nil { + b.prevGset = prev + } + return err + } + switch event := e.Event.(type) { case *RotateEvent: b.nextPos.Name = string(event.NextLogName) b.nextPos.Pos = uint32(event.Position) log.Infof("rotate to %s", b.nextPos) case *GTIDEvent: - if b.gset == nil { + if b.prevGset == nil { break } u, _ := uuid.FromBytes(event.SID) - err := b.gset.Update(fmt.Sprintf("%s:%d", u.String(), event.GNO)) + err := advanceCurrentGtidSet(fmt.Sprintf("%s:%d", u.String(), event.GNO)) if err != nil { return errors.Trace(err) } case *MariadbGTIDEvent: - if b.gset == nil { + if b.prevGset == nil { break } GTID := event.GTID - err := b.gset.Update(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)) + err := advanceCurrentGtidSet(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)) if err != nil { return errors.Trace(err) } case *XIDEvent: - event.GSet = b.getGtidSet() + event.GSet = getCurrentGtidSet() case *QueryEvent: - event.GSet = b.getGtidSet() + event.GSet = getCurrentGtidSet() } needStop := false @@ -783,13 +816,6 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { return nil } -func (b *BinlogSyncer) getGtidSet() GTIDSet { - if b.gset == nil { - return nil - } - return b.gset.Clone() -} - // LastConnectionID returns last connectionID. func (b *BinlogSyncer) LastConnectionID() uint32 { return b.lastConnectionID