Skip to content

Commit 376753e

Browse files
gleonidsiddontang
authored andcommitted
on reconnect in the middle of transaction make sure to reread interrupted transaction (#420)
* on reconnect in the middle of transaction make sure to read interrupted transaction from the beginning this is another stab at issues #414 and #416
1 parent 1dc6383 commit 376753e

File tree

1 file changed

+45
-19
lines changed

1 file changed

+45
-19
lines changed

replication/binlogsyncer.go

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ type BinlogSyncer struct {
120120

121121
nextPos Position
122122

123-
gset GTIDSet
123+
prevGset, currGset GTIDSet
124124

125125
running bool
126126

@@ -376,7 +376,7 @@ func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {
376376
func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) {
377377
log.Infof("begin to sync binlog from GTID set %s", gset)
378378

379-
b.gset = gset
379+
b.prevGset = gset
380380

381381
b.m.Lock()
382382
defer b.m.Unlock()
@@ -385,6 +385,10 @@ func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) {
385385
return nil, errors.Trace(errSyncRunning)
386386
}
387387

388+
// establishing network connection here and will start getting binlog events from "gset + 1", thus until first
389+
// MariadbGTIDEvent/GTIDEvent event is received - we effectively do not have a "current GTID"
390+
b.currGset = nil
391+
388392
if err := b.prepare(); err != nil {
389393
return nil, errors.Trace(err)
390394
}
@@ -569,9 +573,14 @@ func (b *BinlogSyncer) retrySync() error {
569573

570574
b.parser.Reset()
571575

572-
if b.gset != nil {
573-
log.Infof("begin to re-sync from %s", b.gset.String())
574-
if err := b.prepareSyncGTID(b.gset); err != nil {
576+
if b.prevGset != nil {
577+
msg := fmt.Sprintf("begin to re-sync from %s", b.prevGset.String())
578+
if b.currGset != nil {
579+
msg = fmt.Sprintf("%v (last read GTID=%v)", msg, b.currGset)
580+
}
581+
log.Infof(msg)
582+
583+
if err := b.prepareSyncGTID(b.prevGset); err != nil {
575584
return errors.Trace(err)
576585
}
577586
} else {
@@ -604,6 +613,10 @@ func (b *BinlogSyncer) prepareSyncPos(pos Position) error {
604613
func (b *BinlogSyncer) prepareSyncGTID(gset GTIDSet) error {
605614
var err error
606615

616+
// re establishing network connection here and will start getting binlog events from "gset + 1", thus until first
617+
// MariadbGTIDEvent/GTIDEvent event is received - we effectively do not have a "current GTID"
618+
b.currGset = nil
619+
607620
if err = b.prepare(); err != nil {
608621
return errors.Trace(err)
609622
}
@@ -643,7 +656,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
643656
log.Error(err)
644657
// we meet connection error, should re-connect again with
645658
// last nextPos or nextGTID we got.
646-
if len(b.nextPos.Name) == 0 && b.gset == nil {
659+
if len(b.nextPos.Name) == 0 && b.prevGset == nil {
647660
// we can't get the correct position, close.
648661
s.closeWithError(err)
649662
return
@@ -733,33 +746,53 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
733746
// Some events like FormatDescriptionEvent return 0, ignore.
734747
b.nextPos.Pos = e.Header.LogPos
735748
}
749+
750+
getCurrentGtidSet := func() GTIDSet {
751+
if b.currGset == nil {
752+
return nil
753+
}
754+
return b.currGset.Clone()
755+
}
756+
757+
advanceCurrentGtidSet := func(gtid string) error {
758+
if b.currGset == nil {
759+
b.currGset = b.prevGset.Clone()
760+
}
761+
prev := b.currGset.Clone()
762+
err := b.currGset.Update(gtid)
763+
if err == nil {
764+
b.prevGset = prev
765+
}
766+
return err
767+
}
768+
736769
switch event := e.Event.(type) {
737770
case *RotateEvent:
738771
b.nextPos.Name = string(event.NextLogName)
739772
b.nextPos.Pos = uint32(event.Position)
740773
log.Infof("rotate to %s", b.nextPos)
741774
case *GTIDEvent:
742-
if b.gset == nil {
775+
if b.prevGset == nil {
743776
break
744777
}
745778
u, _ := uuid.FromBytes(event.SID)
746-
err := b.gset.Update(fmt.Sprintf("%s:%d", u.String(), event.GNO))
779+
err := advanceCurrentGtidSet(fmt.Sprintf("%s:%d", u.String(), event.GNO))
747780
if err != nil {
748781
return errors.Trace(err)
749782
}
750783
case *MariadbGTIDEvent:
751-
if b.gset == nil {
784+
if b.prevGset == nil {
752785
break
753786
}
754787
GTID := event.GTID
755-
err := b.gset.Update(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber))
788+
err := advanceCurrentGtidSet(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber))
756789
if err != nil {
757790
return errors.Trace(err)
758791
}
759792
case *XIDEvent:
760-
event.GSet = b.getGtidSet()
793+
event.GSet = getCurrentGtidSet()
761794
case *QueryEvent:
762-
event.GSet = b.getGtidSet()
795+
event.GSet = getCurrentGtidSet()
763796
}
764797

765798
needStop := false
@@ -783,13 +816,6 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
783816
return nil
784817
}
785818

786-
func (b *BinlogSyncer) getGtidSet() GTIDSet {
787-
if b.gset == nil {
788-
return nil
789-
}
790-
return b.gset.Clone()
791-
}
792-
793819
// LastConnectionID returns last connectionID.
794820
func (b *BinlogSyncer) LastConnectionID() uint32 {
795821
return b.lastConnectionID

0 commit comments

Comments
 (0)