-
Notifications
You must be signed in to change notification settings - Fork 1k
on reconnect in the middle of transaction make sure to reread interrupted transaction #420
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
866edb9
ae858cb
f6e1379
697f001
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -120,7 +120,7 @@ type BinlogSyncer struct { | |
|
||
nextPos Position | ||
|
||
gset GTIDSet | ||
prevGset, currGset GTIDSet | ||
|
||
running bool | ||
|
||
|
@@ -376,7 +376,7 @@ func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) { | |
func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) { | ||
log.Infof("begin to sync binlog from GTID set %s", gset) | ||
|
||
b.gset = gset | ||
b.prevGset = gset | ||
|
||
b.m.Lock() | ||
defer b.m.Unlock() | ||
|
@@ -385,6 +385,10 @@ func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) { | |
return nil, errors.Trace(errSyncRunning) | ||
} | ||
|
||
// establishing network connection here and will start getting binlog events from "gset + 1", thus until first | ||
// MariadbGTIDEvent/GTIDEvent event is received - we effectively do not have a "current GTID" | ||
b.currGset = nil | ||
|
||
if err := b.prepare(); err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
|
@@ -569,9 +573,14 @@ func (b *BinlogSyncer) retrySync() error { | |
|
||
b.parser.Reset() | ||
|
||
if b.gset != nil { | ||
log.Infof("begin to re-sync from %s", b.gset.String()) | ||
if err := b.prepareSyncGTID(b.gset); err != nil { | ||
if b.prevGset != nil { | ||
msg := fmt.Sprintf("begin to re-sync from %s", b.prevGset.String()) | ||
if b.currGset != nil { | ||
msg = fmt.Sprintf("%v (last read GTID=%v)", msg, b.currGset) | ||
} | ||
log.Infof(msg) | ||
|
||
if err := b.prepareSyncGTID(b.prevGset); err != nil { | ||
return errors.Trace(err) | ||
} | ||
} else { | ||
|
@@ -604,6 +613,10 @@ func (b *BinlogSyncer) prepareSyncPos(pos Position) error { | |
func (b *BinlogSyncer) prepareSyncGTID(gset GTIDSet) error { | ||
var err error | ||
|
||
// re establishing network connection here and will start getting binlog events from "gset + 1", thus until first | ||
// MariadbGTIDEvent/GTIDEvent event is received - we effectively do not have a "current GTID" | ||
b.currGset = nil | ||
|
||
if err = b.prepare(); err != nil { | ||
return errors.Trace(err) | ||
} | ||
|
@@ -643,7 +656,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { | |
log.Error(err) | ||
// we meet connection error, should re-connect again with | ||
// last nextPos or nextGTID we got. | ||
if len(b.nextPos.Name) == 0 && b.gset == nil { | ||
if len(b.nextPos.Name) == 0 && b.prevGset == nil { | ||
// we can't get the correct position, close. | ||
s.closeWithError(err) | ||
return | ||
|
@@ -733,33 +746,53 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { | |
// Some events like FormatDescriptionEvent return 0, ignore. | ||
b.nextPos.Pos = e.Header.LogPos | ||
} | ||
|
||
getCurrentGtidSet := func() GTIDSet { | ||
if b.currGset == nil { | ||
return nil | ||
} | ||
return b.currGset.Clone() | ||
} | ||
|
||
advanceCurrentGtidSet := func(gtid string) error { | ||
if b.currGset == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. initially |
||
b.currGset = b.prevGset.Clone() | ||
} | ||
prev := b.currGset.Clone() | ||
err := b.currGset.Update(gtid) | ||
if err == nil { | ||
b.prevGset = prev | ||
} | ||
return err | ||
} | ||
|
||
switch event := e.Event.(type) { | ||
case *RotateEvent: | ||
b.nextPos.Name = string(event.NextLogName) | ||
b.nextPos.Pos = uint32(event.Position) | ||
log.Infof("rotate to %s", b.nextPos) | ||
case *GTIDEvent: | ||
if b.gset == nil { | ||
if b.prevGset == nil { | ||
break | ||
} | ||
u, _ := uuid.FromBytes(event.SID) | ||
err := b.gset.Update(fmt.Sprintf("%s:%d", u.String(), event.GNO)) | ||
err := advanceCurrentGtidSet(fmt.Sprintf("%s:%d", u.String(), event.GNO)) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
case *MariadbGTIDEvent: | ||
if b.gset == nil { | ||
if b.prevGset == nil { | ||
break | ||
} | ||
GTID := event.GTID | ||
err := b.gset.Update(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)) | ||
err := advanceCurrentGtidSet(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
case *XIDEvent: | ||
event.GSet = b.getGtidSet() | ||
event.GSet = getCurrentGtidSet() | ||
case *QueryEvent: | ||
event.GSet = b.getGtidSet() | ||
event.GSet = getCurrentGtidSet() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
|
||
needStop := false | ||
|
@@ -783,13 +816,6 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { | |
return nil | ||
} | ||
|
||
func (b *BinlogSyncer) getGtidSet() GTIDSet { | ||
if b.gset == nil { | ||
return nil | ||
} | ||
return b.gset.Clone() | ||
} | ||
|
||
// LastConnectionID returns last connectionID. | ||
func (b *BinlogSyncer) LastConnectionID() uint32 { | ||
return b.lastConnectionID | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move these two functions to outside?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can, however having those functions local emphasizes that they are used/called only in scope of
parseEvent
.