@@ -120,7 +120,7 @@ type BinlogSyncer struct {
120
120
121
121
nextPos Position
122
122
123
- gset GTIDSet
123
+ prevGset , currGset GTIDSet
124
124
125
125
running bool
126
126
@@ -376,7 +376,7 @@ func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {
376
376
func (b * BinlogSyncer ) StartSyncGTID (gset GTIDSet ) (* BinlogStreamer , error ) {
377
377
log .Infof ("begin to sync binlog from GTID set %s" , gset )
378
378
379
- b .gset = gset
379
+ b .prevGset = gset
380
380
381
381
b .m .Lock ()
382
382
defer b .m .Unlock ()
@@ -569,9 +569,9 @@ func (b *BinlogSyncer) retrySync() error {
569
569
570
570
b .parser .Reset ()
571
571
572
- if b .gset != nil {
573
- log .Infof ("begin to re-sync from %s" , b .gset .String ())
574
- if err := b .prepareSyncGTID (b .gset ); err != nil {
572
+ if b .prevGset != nil {
573
+ log .Infof ("begin to re-sync from %s" , b .prevGset .String ())
574
+ if err := b .prepareSyncGTID (b .prevGset ); err != nil {
575
575
return errors .Trace (err )
576
576
}
577
577
} else {
@@ -643,7 +643,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
643
643
log .Error (err )
644
644
// we meet connection error, should re-connect again with
645
645
// last nextPos or nextGTID we got.
646
- if len (b .nextPos .Name ) == 0 && b .gset == nil {
646
+ if len (b .nextPos .Name ) == 0 && b .prevGset == nil {
647
647
// we can't get the correct position, close.
648
648
s .closeWithError (err )
649
649
return
@@ -733,33 +733,53 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
733
733
// Some events like FormatDescriptionEvent return 0, ignore.
734
734
b .nextPos .Pos = e .Header .LogPos
735
735
}
736
+
737
+ getCurrentGtidSet := func () GTIDSet {
738
+ if b .currGset == nil {
739
+ return nil
740
+ }
741
+ return b .currGset .Clone ()
742
+ }
743
+
744
+ advanceCurrentGtidSet := func (gtid string ) error {
745
+ if b .currGset == nil {
746
+ b .currGset = b .prevGset .Clone ()
747
+ }
748
+ prev := b .currGset .Clone ()
749
+ err := b .currGset .Update (gtid )
750
+ if err == nil {
751
+ b .prevGset = prev
752
+ }
753
+ return err
754
+ }
755
+
736
756
switch event := e .Event .(type ) {
737
757
case * RotateEvent :
738
758
b .nextPos .Name = string (event .NextLogName )
739
759
b .nextPos .Pos = uint32 (event .Position )
740
760
log .Infof ("rotate to %s" , b .nextPos )
741
761
case * GTIDEvent :
742
- if b .gset == nil {
762
+ if b .prevGset == nil {
743
763
break
744
764
}
745
765
u , _ := uuid .FromBytes (event .SID )
746
- err := b . gset . Update (fmt .Sprintf ("%s:%d" , u .String (), event .GNO ))
766
+ err := advanceCurrentGtidSet (fmt .Sprintf ("%s:%d" , u .String (), event .GNO ))
747
767
if err != nil {
748
768
return errors .Trace (err )
749
769
}
750
770
case * MariadbGTIDEvent :
751
- if b .gset == nil {
771
+ if b .prevGset == nil {
752
772
break
753
773
}
754
774
GTID := event .GTID
755
- err := b . gset . Update (fmt .Sprintf ("%d-%d-%d" , GTID .DomainID , GTID .ServerID , GTID .SequenceNumber ))
775
+ err := advanceCurrentGtidSet (fmt .Sprintf ("%d-%d-%d" , GTID .DomainID , GTID .ServerID , GTID .SequenceNumber ))
756
776
if err != nil {
757
777
return errors .Trace (err )
758
778
}
759
779
case * XIDEvent :
760
- event .GSet = b . getGtidSet ()
780
+ event .GSet = getCurrentGtidSet ()
761
781
case * QueryEvent :
762
- event .GSet = b . getGtidSet ()
782
+ event .GSet = getCurrentGtidSet ()
763
783
}
764
784
765
785
needStop := false
@@ -783,13 +803,6 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
783
803
return nil
784
804
}
785
805
786
- func (b * BinlogSyncer ) getGtidSet () GTIDSet {
787
- if b .gset == nil {
788
- return nil
789
- }
790
- return b .gset .Clone ()
791
- }
792
-
793
806
// LastConnectionID returns last connectionID.
794
807
func (b * BinlogSyncer ) LastConnectionID () uint32 {
795
808
return b .lastConnectionID
0 commit comments