-
Notifications
You must be signed in to change notification settings - Fork 1k
Add more MySQL-8.0 meta data to GTIDEvent and TableMapEvent #468
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
514d075
1e585a9
782e710
3e9f1d7
5dd4c32
c61b8a9
c66d0a6
d3fef7b
a624aa6
27d6b7e
fcf8f97
9ddfbf2
efddbd1
52c09af
41ecfa6
3e4d0b3
6881af6
2a96a8c
02e8369
7626785
569203c
b758aa5
d9d4b74
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ const ( | |
LogicalTimestampTypeCode = 2 | ||
PartLogicalTimestampLength = 8 | ||
BinlogChecksumLength = 4 | ||
UndefinedServerVer = 999999 // UNDEFINED_SERVER_VERSION | ||
) | ||
|
||
type BinlogEvent struct { | ||
|
@@ -349,6 +350,18 @@ type GTIDEvent struct { | |
GNO int64 | ||
LastCommitted int64 | ||
SequenceNumber int64 | ||
|
||
// The followings are available only after MySQL-8.0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about adding more details about these fields? ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No problem |
||
|
||
ImmediateCommitTimestamp uint64 | ||
OriginalCommitTimestamp uint64 | ||
|
||
// Total transaction length (including this GTIDEvent), see: | ||
// https://mysqlhighavailability.com/taking-advantage-of-new-transaction-length-metadata/ | ||
TransactionLength uint64 | ||
|
||
ImmediateServerVersion uint32 | ||
OriginalServerVersion uint32 | ||
} | ||
|
||
func (e *GTIDEvent) Decode(data []byte) error { | ||
|
@@ -359,26 +372,99 @@ func (e *GTIDEvent) Decode(data []byte) error { | |
pos += SidLength | ||
e.GNO = int64(binary.LittleEndian.Uint64(data[pos:])) | ||
pos += 8 | ||
|
||
if len(data) >= 42 { | ||
if uint8(data[pos]) == LogicalTimestampTypeCode { | ||
pos++ | ||
e.LastCommitted = int64(binary.LittleEndian.Uint64(data[pos:])) | ||
pos += PartLogicalTimestampLength | ||
e.SequenceNumber = int64(binary.LittleEndian.Uint64(data[pos:])) | ||
pos += 8 | ||
|
||
// IMMEDIATE_COMMIT_TIMESTAMP_LENGTH = 7 | ||
if len(data)-pos < 7 { | ||
return nil | ||
} | ||
e.ImmediateCommitTimestamp = BytesToUint64(data[pos : pos+7]) | ||
pos += 7 | ||
if (e.ImmediateCommitTimestamp & (uint64(1) << 55)) != 0 { | ||
// If the most significant bit set, another 7 byte follows representing OriginalCommitTimestamp | ||
e.ImmediateCommitTimestamp &= ^(uint64(1) << 55) | ||
e.OriginalCommitTimestamp = BytesToUint64(data[pos : pos+7]) | ||
pos += 7 | ||
|
||
} else { | ||
// Otherwise OriginalCommitTimestamp == ImmediateCommitTimestamp | ||
e.OriginalCommitTimestamp = e.ImmediateCommitTimestamp | ||
|
||
} | ||
|
||
// TRANSACTION_LENGTH_MIN_LENGTH = 1 | ||
if len(data)-pos < 1 { | ||
return nil | ||
} | ||
var n int | ||
e.TransactionLength, _, n = LengthEncodedInt(data[pos:]) | ||
pos += n | ||
|
||
// IMMEDIATE_SERVER_VERSION_LENGTH = 4 | ||
e.ImmediateServerVersion = UndefinedServerVer | ||
e.OriginalServerVersion = UndefinedServerVer | ||
if len(data)-pos < 4 { | ||
return nil | ||
} | ||
e.ImmediateServerVersion = binary.LittleEndian.Uint32(data[pos:]) | ||
pos += 4 | ||
if (e.ImmediateServerVersion & (uint32(1) << 31)) != 0 { | ||
// If the most significant bit set, another 4 byte follows representing OriginalServerVersion | ||
e.ImmediateServerVersion &= ^(uint32(1) << 31) | ||
e.OriginalServerVersion = binary.LittleEndian.Uint32(data[pos:]) | ||
pos += 4 | ||
|
||
} else { | ||
// Otherwise OriginalServerVersion == ImmediateServerVersion | ||
e.OriginalServerVersion = e.ImmediateServerVersion | ||
|
||
} | ||
|
||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (e *GTIDEvent) Dump(w io.Writer) { | ||
fmtTime := func(t time.Time) string { | ||
if t.IsZero() { | ||
return "<n/a>" | ||
} | ||
return t.Format(time.RFC3339Nano) | ||
} | ||
|
||
fmt.Fprintf(w, "Commit flag: %d\n", e.CommitFlag) | ||
u, _ := uuid.FromBytes(e.SID) | ||
fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO) | ||
fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted) | ||
fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber) | ||
fmt.Fprintf(w, "Immediate commmit timestamp: %d (%s)\n", e.ImmediateCommitTimestamp, fmtTime(e.ImmediateCommitTime())) | ||
fmt.Fprintf(w, "Orignal commmit timestamp: %d (%s)\n", e.OriginalCommitTimestamp, fmtTime(e.OriginalCommitTime())) | ||
fmt.Fprintf(w, "Transaction length: %d\n", e.TransactionLength) | ||
fmt.Fprintf(w, "Immediate server version: %d\n", e.ImmediateServerVersion) | ||
fmt.Fprintf(w, "Orignal server version: %d\n", e.OriginalServerVersion) | ||
fmt.Fprintln(w) | ||
} | ||
|
||
// ImmediateCommitTime returns the commit time of this trx on the immediate server | ||
// or zero time if not available. | ||
func (e *GTIDEvent) ImmediateCommitTime() time.Time { | ||
return microSecTimestampToTime(e.ImmediateCommitTimestamp) | ||
} | ||
|
||
// OriginalCommitTime returns the commit time of this trx on the original server | ||
// or zero time if not available. | ||
func (e *GTIDEvent) OriginalCommitTime() time.Time { | ||
return microSecTimestampToTime(e.OriginalCommitTimestamp) | ||
} | ||
|
||
type BeginLoadQueryEvent struct { | ||
FileID uint32 | ||
BlockData []byte | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about returning an error instead of
panic
as otherfunc
did in this file?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No problem