Skip to content

Commit 41e5f10

Browse files
author
Ian Zapolsky
committed
Changes to correct retrySync behavior
1 parent c6ab05a commit 41e5f10

File tree

1 file changed

+35
-16
lines changed

1 file changed

+35
-16
lines changed

replication/binlogsyncer.go

+35-16
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ type BinlogSyncer struct {
117117

118118
nextPos Position
119119

120-
gset GTIDSet
120+
currGset GTIDSet
121+
prevGset GTIDSet
121122

122123
running bool
123124

@@ -379,7 +380,7 @@ func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {
379380
func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) {
380381
log.Infof("begin to sync binlog from GTID set %s", gset)
381382

382-
b.gset = gset
383+
b.prevGset = gset
383384

384385
b.m.Lock()
385386
defer b.m.Unlock()
@@ -572,9 +573,9 @@ func (b *BinlogSyncer) retrySync() error {
572573

573574
b.parser.Reset()
574575

575-
if b.gset != nil {
576-
log.Infof("begin to re-sync from %s", b.gset.String())
577-
if err := b.prepareSyncGTID(b.gset); err != nil {
576+
if b.prevGset != nil {
577+
log.Infof("begin to re-sync from %s", b.prevGset.String())
578+
if err := b.prepareSyncGTID(b.prevGset); err != nil {
578579
return errors.Trace(err)
579580
}
580581
} else {
@@ -640,7 +641,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
640641

641642
// we meet connection error, should re-connect again with
642643
// last nextPos or nextGTID we got.
643-
if len(b.nextPos.Name) == 0 && b.gset == nil {
644+
if len(b.nextPos.Name) == 0 && b.prevGset == nil {
644645
// we can't get the correct position, close.
645646
s.closeWithError(err)
646647
return
@@ -730,26 +731,44 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
730731
b.nextPos.Pos = uint32(event.Position)
731732
log.Infof("rotate to %s", b.nextPos)
732733
case *GTIDEvent:
733-
if b.gset == nil {
734+
if b.prevGset == nil {
734735
break
735736
}
736737
u, _ := uuid.FromBytes(event.SID)
737-
err := b.gset.Update(fmt.Sprintf("%s:%d", u.String(), event.GNO))
738-
if err != nil {
739-
return errors.Trace(err)
738+
if b.currGset == nil {
739+
gset, err := ParseGTIDSet(MySQLFlavor, u.String())
740+
if err != nil {
741+
return errors.Trace(err)
742+
}
743+
b.currGset = gset
744+
} else {
745+
err := b.currGset.Update(fmt.Sprintf("%s:%d", u.String(), event.GNO))
746+
if err != nil {
747+
return errors.Trace(err)
748+
}
740749
}
741750
case *MariadbGTIDEvent:
742-
if b.gset == nil {
751+
if b.prevGset == nil {
743752
break
744753
}
745754
GTID := event.GTID
746-
err := b.gset.Update(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber))
747-
if err != nil {
748-
return errors.Trace(err)
755+
if b.currGset == nil {
756+
gset, err := ParseGTIDSet(MariaDBFlavor, GTID.String())
757+
if err != nil {
758+
return errors.Trace(err)
759+
}
760+
b.currGset = gset
761+
} else {
762+
err := b.currGset.Update(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber))
763+
if err != nil {
764+
return errors.Trace(err)
765+
}
749766
}
750767
case *XIDEvent:
768+
b.prevGset = b.currGset.Clone()
751769
event.GSet = b.getGtidSet()
752770
case *QueryEvent:
771+
b.prevGset = b.currGset.Clone()
753772
event.GSet = b.getGtidSet()
754773
}
755774

@@ -775,10 +794,10 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
775794
}
776795

777796
func (b *BinlogSyncer) getGtidSet() GTIDSet {
778-
if b.gset == nil {
797+
if b.prevGset == nil {
779798
return nil
780799
}
781-
return b.gset.Clone()
800+
return b.prevGset.Clone()
782801
}
783802

784803
// LastConnectionID returns last connectionID.

0 commit comments

Comments
 (0)