Skip to content

on reconnect in the middle of transaction make sure to reread interrupted transaction #420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 32 additions & 19 deletions replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type BinlogSyncer struct {

nextPos Position

gset GTIDSet
prevGset, currGset GTIDSet

running bool

Expand Down Expand Up @@ -376,7 +376,7 @@ func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {
func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) {
log.Infof("begin to sync binlog from GTID set %s", gset)

b.gset = gset
b.prevGset = gset

b.m.Lock()
defer b.m.Unlock()
Expand Down Expand Up @@ -569,9 +569,9 @@ func (b *BinlogSyncer) retrySync() error {

b.parser.Reset()

if b.gset != nil {
log.Infof("begin to re-sync from %s", b.gset.String())
if err := b.prepareSyncGTID(b.gset); err != nil {
if b.prevGset != nil {
log.Infof("begin to re-sync from %s", b.prevGset.String())
if err := b.prepareSyncGTID(b.prevGset); err != nil {
return errors.Trace(err)
}
} else {
Expand Down Expand Up @@ -643,7 +643,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
log.Error(err)
// we meet connection error, should re-connect again with
// last nextPos or nextGTID we got.
if len(b.nextPos.Name) == 0 && b.gset == nil {
if len(b.nextPos.Name) == 0 && b.prevGset == nil {
// we can't get the correct position, close.
s.closeWithError(err)
return
Expand Down Expand Up @@ -733,33 +733,53 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
// Some events like FormatDescriptionEvent return 0, ignore.
b.nextPos.Pos = e.Header.LogPos
}

getCurrentGtidSet := func() GTIDSet {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move these two functions to outside?

Copy link
Contributor Author

@gleonid gleonid Sep 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can, however having those functions local emphasizes that they are used/called only in scope of parseEvent.

if b.currGset == nil {
return nil
}
return b.currGset.Clone()
}

advanceCurrentGtidSet := func(gtid string) error {
if b.currGset == nil {
Copy link
Contributor Author

@gleonid gleonid Sep 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initially currGset is nil, while prevGset is set
purpose of b.currGset = b.prevGset.Clone() to once allocate memory for currGset so b.currGset.Update call on line 749 can be made

b.currGset = b.prevGset.Clone()
}
prev := b.currGset.Clone()
err := b.currGset.Update(gtid)
if err == nil {
b.prevGset = prev
}
return err
}

switch event := e.Event.(type) {
case *RotateEvent:
b.nextPos.Name = string(event.NextLogName)
b.nextPos.Pos = uint32(event.Position)
log.Infof("rotate to %s", b.nextPos)
case *GTIDEvent:
if b.gset == nil {
if b.prevGset == nil {
break
}
u, _ := uuid.FromBytes(event.SID)
err := b.gset.Update(fmt.Sprintf("%s:%d", u.String(), event.GNO))
err := advanceCurrentGtidSet(fmt.Sprintf("%s:%d", u.String(), event.GNO))
if err != nil {
return errors.Trace(err)
}
case *MariadbGTIDEvent:
if b.gset == nil {
if b.prevGset == nil {
break
}
GTID := event.GTID
err := b.gset.Update(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber))
err := advanceCurrentGtidSet(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber))
if err != nil {
return errors.Trace(err)
}
case *XIDEvent:
event.GSet = b.getGtidSet()
event.GSet = getCurrentGtidSet()
case *QueryEvent:
event.GSet = b.getGtidSet()
event.GSet = getCurrentGtidSet()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getCurrentGtidSet is a replacement for BinlogSyncer.getGtidSet()
a. getCurrentGtidSet returns currGset if set
b. it became local function to emphasize scope of usage

}

needStop := false
Expand All @@ -783,13 +803,6 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
return nil
}

func (b *BinlogSyncer) getGtidSet() GTIDSet {
if b.gset == nil {
return nil
}
return b.gset.Clone()
}

// LastConnectionID returns last connectionID.
func (b *BinlogSyncer) LastConnectionID() uint32 {
return b.lastConnectionID
Expand Down