Skip to content

Commit 3cd9768

Browse files
gleonidFu Wenhui
authored andcommitted
on reconnect in the middle of transaction make sure to reread interrupted transaction (go-mysql-org#420)
* on reconnect in the middle of transaction make sure to read interrupted transaction from the beginning this is another stab at issues go-mysql-org#414 and go-mysql-org#416
1 parent 7a07094 commit 3cd9768

File tree

1 file changed

+45
-19
lines changed

1 file changed

+45
-19
lines changed

replication/binlogsyncer.go

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ type BinlogSyncer struct {
118118

119119
nextPos Position
120120

121-
gset GTIDSet
121+
prevGset, currGset GTIDSet
122122

123123
running bool
124124

@@ -380,7 +380,7 @@ func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {
380380
func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) {
381381
log.Infof("begin to sync binlog from GTID set %s", gset)
382382

383-
b.gset = gset
383+
b.prevGset = gset
384384

385385
b.m.Lock()
386386
defer b.m.Unlock()
@@ -389,6 +389,10 @@ func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) {
389389
return nil, errors.Trace(errSyncRunning)
390390
}
391391

392+
// establishing network connection here and will start getting binlog events from "gset + 1", thus until first
393+
// MariadbGTIDEvent/GTIDEvent event is received - we effectively do not have a "current GTID"
394+
b.currGset = nil
395+
392396
if err := b.prepare(); err != nil {
393397
return nil, errors.Trace(err)
394398
}
@@ -573,9 +577,14 @@ func (b *BinlogSyncer) retrySync() error {
573577

574578
b.parser.Reset()
575579

576-
if b.gset != nil {
577-
log.Infof("begin to re-sync from %s", b.gset.String())
578-
if err := b.prepareSyncGTID(b.gset); err != nil {
580+
if b.prevGset != nil {
581+
msg := fmt.Sprintf("begin to re-sync from %s", b.prevGset.String())
582+
if b.currGset != nil {
583+
msg = fmt.Sprintf("%v (last read GTID=%v)", msg, b.currGset)
584+
}
585+
log.Infof(msg)
586+
587+
if err := b.prepareSyncGTID(b.prevGset); err != nil {
579588
return errors.Trace(err)
580589
}
581590
} else {
@@ -608,6 +617,10 @@ func (b *BinlogSyncer) prepareSyncPos(pos Position) error {
608617
func (b *BinlogSyncer) prepareSyncGTID(gset GTIDSet) error {
609618
var err error
610619

620+
// re establishing network connection here and will start getting binlog events from "gset + 1", thus until first
621+
// MariadbGTIDEvent/GTIDEvent event is received - we effectively do not have a "current GTID"
622+
b.currGset = nil
623+
611624
if err = b.prepare(); err != nil {
612625
return errors.Trace(err)
613626
}
@@ -644,7 +657,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
644657

645658
// we meet connection error, should re-connect again with
646659
// last nextPos or nextGTID we got.
647-
if len(b.nextPos.Name) == 0 && b.gset == nil {
660+
if len(b.nextPos.Name) == 0 && b.prevGset == nil {
648661
// we can't get the correct position, close.
649662
s.closeWithError(err)
650663
return
@@ -732,33 +745,53 @@ func (b *BinlogSyncer) parseEvent(spanContext opentracing.SpanContext, s *Binlog
732745
// Some events like FormatDescriptionEvent return 0, ignore.
733746
b.nextPos.Pos = e.Header.LogPos
734747
}
748+
749+
getCurrentGtidSet := func() GTIDSet {
750+
if b.currGset == nil {
751+
return nil
752+
}
753+
return b.currGset.Clone()
754+
}
755+
756+
advanceCurrentGtidSet := func(gtid string) error {
757+
if b.currGset == nil {
758+
b.currGset = b.prevGset.Clone()
759+
}
760+
prev := b.currGset.Clone()
761+
err := b.currGset.Update(gtid)
762+
if err == nil {
763+
b.prevGset = prev
764+
}
765+
return err
766+
}
767+
735768
switch event := e.Event.(type) {
736769
case *RotateEvent:
737770
b.nextPos.Name = string(event.NextLogName)
738771
b.nextPos.Pos = uint32(event.Position)
739772
log.Infof("rotate to %s", b.nextPos)
740773
case *GTIDEvent:
741-
if b.gset == nil {
774+
if b.prevGset == nil {
742775
break
743776
}
744777
u, _ := uuid.FromBytes(event.SID)
745-
err := b.gset.Update(fmt.Sprintf("%s:%d", u.String(), event.GNO))
778+
err := advanceCurrentGtidSet(fmt.Sprintf("%s:%d", u.String(), event.GNO))
746779
if err != nil {
747780
return errors.Trace(err)
748781
}
749782
case *MariadbGTIDEvent:
750-
if b.gset == nil {
783+
if b.prevGset == nil {
751784
break
752785
}
753786
GTID := event.GTID
754-
err := b.gset.Update(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber))
787+
err := advanceCurrentGtidSet(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber))
755788
if err != nil {
756789
return errors.Trace(err)
757790
}
758791
case *XIDEvent:
759-
event.GSet = b.getGtidSet()
792+
event.GSet = getCurrentGtidSet()
760793
case *QueryEvent:
761-
event.GSet = b.getGtidSet()
794+
event.GSet = getCurrentGtidSet()
762795
}
763796

764797
needStop := false
@@ -782,13 +815,6 @@ func (b *BinlogSyncer) parseEvent(spanContext opentracing.SpanContext, s *Binlog
782815
return nil
783816
}
784817

785-
func (b *BinlogSyncer) getGtidSet() GTIDSet {
786-
if b.gset == nil {
787-
return nil
788-
}
789-
return b.gset.Clone()
790-
}
791-
792818
// LastConnectionID returns last connectionID.
793819
func (b *BinlogSyncer) LastConnectionID() uint32 {
794820
return b.lastConnectionID

0 commit comments

Comments
 (0)