Skip to content

Add more MySQL-8.0 meta data to GTIDEvent and TableMapEvent #468

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Mar 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
514d075
Add some optional meta fields for TABLE_MAP_EVENT: signedness/column
huangjunwen Jan 29, 2020
1e585a9
Add mysql 8.0 new fields to GTIDEvent
huangjunwen Jan 30, 2020
782e710
Add some timestamp helper methods for GTIDEvent
huangjunwen Jan 30, 2020
3e9f1d7
Move some code
huangjunwen Jan 30, 2020
5dd4c32
Rename and move a function BytesToUint64 to mysql package
huangjunwen Jan 30, 2020
c61b8a9
Add some helper methods to TableMapEvent
huangjunwen Jan 30, 2020
c66d0a6
Make TableMapEvent's Dump prettier
huangjunwen Jan 31, 2020
d3fef7b
Remove unneccesary code
huangjunwen Jan 31, 2020
a624aa6
Fix compile error before go 1.13: shift count type int, must be unsig…
huangjunwen Jan 31, 2020
27d6b7e
Extract charset/collation info from TableMapEvent
huangjunwen Feb 1, 2020
fcf8f97
Minor fix: do not allocate collation map if not neccessary
huangjunwen Feb 1, 2020
9ddfbf2
Merge branch 'master' into master
siddontang Feb 22, 2020
efddbd1
Remove BytesToUint64 since FixedLengthInt already serve the same purpose
huangjunwen Mar 8, 2020
52c09af
Add note for field type
huangjunwen Mar 8, 2020
41ecfa6
Fix CollationMap error when the table contains enum or set fields
huangjunwen Mar 8, 2020
3e4d0b3
Refactor decodeOptionalMeta
huangjunwen Mar 8, 2020
6881af6
Add support for other table map optional meta as well
huangjunwen Mar 9, 2020
2a96a8c
Remove helper functions in TableMapEvent now. Need more investigation…
huangjunwen Mar 10, 2020
02e8369
Add some test cases for TabeMapEvent optional meta decode
huangjunwen Mar 10, 2020
7626785
Remove type.go
huangjunwen Mar 10, 2020
569203c
Merge branch 'master' of github.com:huangjunwen/go-mysql
huangjunwen Mar 10, 2020
b758aa5
Add some comments about new fields in GTIDEvent..
huangjunwen Mar 10, 2020
d9d4b74
Add more test for GTIDEvent and TableMapEvent
huangjunwen Mar 10, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions notes/field_type.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@

There are several `types` for each field (column) type, such as `type()`/`real_type()/binlog_type()` ...

By default, `real_type()` and `binlog_type()` are the same as `type()`:

```
// From mysql-8.0/sql/field.h Field

// ...
virtual enum_field_types real_type() const { return type(); }
virtual enum_field_types binlog_type() const {
/*
Binlog stores field->type() as type code by default.
This puts MYSQL_TYPE_STRING in case of CHAR, VARCHAR, SET and ENUM,
with extra data type details put into metadata.

We cannot store field->type() in case of temporal types with
fractional seconds: TIME(n), DATETIME(n) and TIMESTAMP(n),
because binlog records with MYSQL_TYPE_TIME, MYSQL_TYPE_DATETIME
type codes do not have metadata.
So for temporal data types with fractional seconds we'll store
real_type() type codes instead, i.e.
MYSQL_TYPE_TIME2, MYSQL_TYPE_DATETIME2, MYSQL_TYPE_TIMESTAMP2,
and put precision into metatada.

Note: perhaps binlog should eventually be modified to store
real_type() instead of type() for all column types.
*/
return type();
}
// ...
```

Here is a list collected from `mysql-8.0/sql/field.h`:

```
+------------------------------------------------------------+-------------------------------+-----------------------+------------------------+
| Field | type() | real_type() | binlog_type() |
+------------------------------------------------------------+-------------------------------+-----------------------+------------------------+
| | | | |
| Field (abstract) | | | |
| | | | | |
| +--Field_bit | MYSQL_TYPE_BIT | | |
| | +--Field_bit_as_char | | | |
| | | | | |
| +--Field_num (abstract) | | | |
| | | +--Field_real (abstract) | | | |
| | | +--Field_decimal | MYSQL_TYPE_DECIMAL | | |
| | | +--Field_float | MYSQL_TYPE_FLOAT | | |
| | | +--Field_double | MYSQL_TYPE_DOUBLE | | |
| | | | | | |
| | +--Field_new_decimal | MYSQL_TYPE_NEWDECIMAL | | |
| | +--Field_short | MYSQL_TYPE_SHORT | | |
| | +--Field_medium | MYSQL_TYPE_INT24 | | |
| | +--Field_long | MYSQL_TYPE_LONG | | |
| | +--Field_longlong | MYSQL_TYPE_LONGLONG | | |
| | +--Field_tiny | MYSQL_TYPE_TINY | | |
| | +--Field_year | MYSQL_TYPE_YEAR | | |
| | | | | |
| +--Field_str (abstract) | | | |
| | +--Field_longstr | | | |
| | | +--Field_string | MYSQL_TYPE_STRING | MYSQL_TYPE_STRING | |
| | | +--Field_varstring | MYSQL_TYPE_VARCHAR | MYSQL_TYPE_VARCHAR | |
| | | +--Field_blob | MYSQL_TYPE_BLOB | | |
| | | +--Field_geom | MYSQL_TYPE_GEOMETRY | | |
| | | +--Field_json | MYSQL_TYPE_JSON | | |
| | | +--Field_typed_array | real_type_to_type(m_elt_type) | m_elt_type | MYSQL_TYPE_TYPED_ARRAY |
| | | | | | |
| | +--Field_null | MYSQL_TYPE_NULL | | |
| | +--Field_enum | MYSQL_TYPE_STRING | MYSQL_TYPE_ENUM | |
| | +--Field_set | | MYSQL_TYPE_SET | |
| | | | | |
| +--Field_temporal (abstract) | | | |
| +--Field_time_common (abstract) | | | |
| | +--Field_time | MYSQL_TYPE_TIME | | |
| | +--Field_timef | MYSQL_TYPE_TIME | MYSQL_TYPE_TIME2 | MYSQL_TYPE_TIME2 |
| | | | | |
| +--Field_temporal_with_date (abstract) | | | |
| +--Field_newdate | MYSQL_TYPE_DATE | MYSQL_TYPE_NEWDATE | |
| +--Field_temporal_with_date_and_time (abstract) | | | |
| +--Field_timestamp | MYSQL_TYPE_TIMESTAMP | | |
| +--Field_datetime | MYSQL_TYPE_DATETIME | | |
| +--Field_temporal_with_date_and_timef (abstract) | | | |
| +--Field_timestampf | MYSQL_TYPE_TIMESTAMP | MYSQL_TYPE_TIMESTAMP2 | MYSQL_TYPE_TIMESTAMP2 |
| +--Field_datetimef | MYSQL_TYPE_DATETIME | MYSQL_TYPE_DATETIME2 | MYSQL_TYPE_DATETIME2 |
+------------------------------------------------------------+-------------------------------+-----------------------+------------------------+
```
17 changes: 16 additions & 1 deletion replication/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (e EventType) String() string {
return "ViewChangeEvent"
case XA_PREPARE_LOG_EVENT:
return "XAPrepareLogEvent"

default:
return "UnknownEvent"
}
Expand All @@ -202,3 +202,18 @@ const (
BINLOG_CHECKSUM_ALG_UNDEF byte = 255 // special value to tag undetermined yet checksum
// or events from checksum-unaware servers
)

// These are TABLE_MAP_EVENT's optional metadata field type, from: libbinlogevents/include/rows_event.h
const (
TABLE_MAP_OPT_META_SIGNEDNESS byte = iota + 1
TABLE_MAP_OPT_META_DEFAULT_CHARSET
TABLE_MAP_OPT_META_COLUMN_CHARSET
TABLE_MAP_OPT_META_COLUMN_NAME
TABLE_MAP_OPT_META_SET_STR_VALUE
TABLE_MAP_OPT_META_ENUM_STR_VALUE
TABLE_MAP_OPT_META_GEOMETRY_TYPE
TABLE_MAP_OPT_META_SIMPLE_PRIMARY_KEY
TABLE_MAP_OPT_META_PRIMARY_KEY_WITH_PREFIX
TABLE_MAP_OPT_META_ENUM_AND_SET_DEFAULT_CHARSET
TABLE_MAP_OPT_META_ENUM_AND_SET_COLUMN_CHARSET
)
118 changes: 103 additions & 15 deletions replication/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
LogicalTimestampTypeCode = 2
PartLogicalTimestampLength = 8
BinlogChecksumLength = 4
UndefinedServerVer = 999999 // UNDEFINED_SERVER_VERSION
)

type BinlogEvent struct {
Expand Down Expand Up @@ -225,31 +226,31 @@ type PreviousGTIDsEvent struct {
func (e *PreviousGTIDsEvent) Decode(data []byte) error {
var previousGTIDSets []string
pos := 0
uuidCount := binary.LittleEndian.Uint16(data[pos:pos+8])
uuidCount := binary.LittleEndian.Uint16(data[pos : pos+8])
pos += 8

for i := uint16(0);i < uuidCount; i++ {
uuid := e.decodeUuid(data[pos:pos+16])
for i := uint16(0); i < uuidCount; i++ {
uuid := e.decodeUuid(data[pos : pos+16])
pos += 16
sliceCount := binary.LittleEndian.Uint16(data[pos:pos+8])
sliceCount := binary.LittleEndian.Uint16(data[pos : pos+8])
pos += 8
var intervals []string
for i := uint16(0);i < sliceCount; i++ {
start := e.decodeInterval(data[pos:pos+8])
for i := uint16(0); i < sliceCount; i++ {
start := e.decodeInterval(data[pos : pos+8])
pos += 8
stop := e.decodeInterval(data[pos:pos+8])
stop := e.decodeInterval(data[pos : pos+8])
pos += 8
interval := ""
if stop == start+1 {
interval = fmt.Sprintf("%d",start)
}else {
interval = fmt.Sprintf("%d-%d",start,stop-1)
interval = fmt.Sprintf("%d", start)
} else {
interval = fmt.Sprintf("%d-%d", start, stop-1)
}
intervals = append(intervals,interval)
intervals = append(intervals, interval)
}
previousGTIDSets = append(previousGTIDSets,fmt.Sprintf("%s:%s",uuid,strings.Join(intervals,":")))
previousGTIDSets = append(previousGTIDSets, fmt.Sprintf("%s:%s", uuid, strings.Join(intervals, ":")))
}
e.GTIDSets = fmt.Sprintf("%s",strings.Join(previousGTIDSets,","))
e.GTIDSets = fmt.Sprintf("%s", strings.Join(previousGTIDSets, ","))
return nil
}

Expand All @@ -259,8 +260,8 @@ func (e *PreviousGTIDsEvent) Dump(w io.Writer) {
}

func (e *PreviousGTIDsEvent) decodeUuid(data []byte) string {
return fmt.Sprintf("%s-%s-%s-%s-%s",hex.EncodeToString(data[0:4]),hex.EncodeToString(data[4:6]),
hex.EncodeToString(data[6:8]),hex.EncodeToString(data[8:10]),hex.EncodeToString(data[10:]))
return fmt.Sprintf("%s-%s-%s-%s-%s", hex.EncodeToString(data[0:4]), hex.EncodeToString(data[4:6]),
hex.EncodeToString(data[6:8]), hex.EncodeToString(data[8:10]), hex.EncodeToString(data[10:]))
}

func (e *PreviousGTIDsEvent) decodeInterval(data []byte) uint64 {
Expand Down Expand Up @@ -349,6 +350,20 @@ type GTIDEvent struct {
GNO int64
LastCommitted int64
SequenceNumber int64

// ImmediateCommitTimestamp/OriginalCommitTimestamp are introduced in MySQL-8.0.1, see:
// https://mysqlhighavailability.com/replication-features-in-mysql-8-0-1/
ImmediateCommitTimestamp uint64
OriginalCommitTimestamp uint64

// Total transaction length (including this GTIDEvent), introduced in MySQL-8.0.2, see:
// https://mysqlhighavailability.com/taking-advantage-of-new-transaction-length-metadata/
TransactionLength uint64

// ImmediateServerVersion/OriginalServerVersion are introduced in MySQL-8.0.14, see
// https://dev.mysql.com/doc/refman/8.0/en/replication-compatibility.html
ImmediateServerVersion uint32
OriginalServerVersion uint32
}

func (e *GTIDEvent) Decode(data []byte) error {
Expand All @@ -359,26 +374,99 @@ func (e *GTIDEvent) Decode(data []byte) error {
pos += SidLength
e.GNO = int64(binary.LittleEndian.Uint64(data[pos:]))
pos += 8

if len(data) >= 42 {
if uint8(data[pos]) == LogicalTimestampTypeCode {
pos++
e.LastCommitted = int64(binary.LittleEndian.Uint64(data[pos:]))
pos += PartLogicalTimestampLength
e.SequenceNumber = int64(binary.LittleEndian.Uint64(data[pos:]))
pos += 8

// IMMEDIATE_COMMIT_TIMESTAMP_LENGTH = 7
if len(data)-pos < 7 {
return nil
}
e.ImmediateCommitTimestamp = FixedLengthInt(data[pos : pos+7])
pos += 7
if (e.ImmediateCommitTimestamp & (uint64(1) << 55)) != 0 {
// If the most significant bit set, another 7 byte follows representing OriginalCommitTimestamp
e.ImmediateCommitTimestamp &= ^(uint64(1) << 55)
e.OriginalCommitTimestamp = FixedLengthInt(data[pos : pos+7])
pos += 7

} else {
// Otherwise OriginalCommitTimestamp == ImmediateCommitTimestamp
e.OriginalCommitTimestamp = e.ImmediateCommitTimestamp

}

// TRANSACTION_LENGTH_MIN_LENGTH = 1
if len(data)-pos < 1 {
return nil
}
var n int
e.TransactionLength, _, n = LengthEncodedInt(data[pos:])
pos += n

// IMMEDIATE_SERVER_VERSION_LENGTH = 4
e.ImmediateServerVersion = UndefinedServerVer
e.OriginalServerVersion = UndefinedServerVer
if len(data)-pos < 4 {
return nil
}
e.ImmediateServerVersion = binary.LittleEndian.Uint32(data[pos:])
pos += 4
if (e.ImmediateServerVersion & (uint32(1) << 31)) != 0 {
// If the most significant bit set, another 4 byte follows representing OriginalServerVersion
e.ImmediateServerVersion &= ^(uint32(1) << 31)
e.OriginalServerVersion = binary.LittleEndian.Uint32(data[pos:])
pos += 4

} else {
// Otherwise OriginalServerVersion == ImmediateServerVersion
e.OriginalServerVersion = e.ImmediateServerVersion

}

}
}
return nil
}

func (e *GTIDEvent) Dump(w io.Writer) {
fmtTime := func(t time.Time) string {
if t.IsZero() {
return "<n/a>"
}
return t.Format(time.RFC3339Nano)
}

fmt.Fprintf(w, "Commit flag: %d\n", e.CommitFlag)
u, _ := uuid.FromBytes(e.SID)
fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO)
fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted)
fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber)
fmt.Fprintf(w, "Immediate commmit timestamp: %d (%s)\n", e.ImmediateCommitTimestamp, fmtTime(e.ImmediateCommitTime()))
fmt.Fprintf(w, "Orignal commmit timestamp: %d (%s)\n", e.OriginalCommitTimestamp, fmtTime(e.OriginalCommitTime()))
fmt.Fprintf(w, "Transaction length: %d\n", e.TransactionLength)
fmt.Fprintf(w, "Immediate server version: %d\n", e.ImmediateServerVersion)
fmt.Fprintf(w, "Orignal server version: %d\n", e.OriginalServerVersion)
fmt.Fprintln(w)
}

// ImmediateCommitTime returns the commit time of this trx on the immediate server
// or zero time if not available.
func (e *GTIDEvent) ImmediateCommitTime() time.Time {
return microSecTimestampToTime(e.ImmediateCommitTimestamp)
}

// OriginalCommitTime returns the commit time of this trx on the original server
// or zero time if not available.
func (e *GTIDEvent) OriginalCommitTime() time.Time {
return microSecTimestampToTime(e.OriginalCommitTimestamp)
}

type BeginLoadQueryEvent struct {
FileID uint32
BlockData []byte
Expand Down
51 changes: 51 additions & 0 deletions replication/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,54 @@ func (_ *testDecodeSuite) TestMariadbGTIDEvent(c *C) {
c.Assert(ev.IsGroupCommit(), Equals, true)
c.Assert(ev.CommitID, Equals, uint64(0x1716151413121110))
}

func (_ *testDecodeSuite) TestGTIDEventMysql8NewFields(c *C) {

testcases := []struct {
data []byte
expectImmediateCommitTimestamp uint64
expectOriginalCommitTimestamp uint64
expectTransactoinLength uint64
expectImmediateServerVersion uint32
expectOriginalServerVersion uint32
}{
{
// master: mysql-5.7, slave: mysql-8.0
data: []byte("\x00Z\xa7*\u007fD\xa8\x11\xea\x94\u007f\x02B\xac\x19\x00\x02\x02\x01\x00\x00\x00\x00\x00\x00\x02v\x00\x00\x00\x00\x00\x00\x00w\x00\x00\x00\x00\x00\x00\x00\xc1G\x81\x16x\xa0\x85\x00\x00\x00\x00\x00\x00\x00\xfc\xc5\x03\x938\x01\x80\x00\x00\x00\x00"),
expectImmediateCommitTimestamp: 1583812517644225,
expectOriginalCommitTimestamp: 0,
expectTransactoinLength: 965,
expectImmediateServerVersion: 80019,
expectOriginalServerVersion: 0,
},
{
// mysql-5.7 only
data: []byte("\x00Z\xa7*\u007fD\xa8\x11\xea\x94\u007f\x02B\xac\x19\x00\x02\x03\x01\x00\x00\x00\x00\x00\x00\x025\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00\x00\x00\x00\x00"),
expectImmediateCommitTimestamp: 0,
expectOriginalCommitTimestamp: 0,
expectTransactoinLength: 0,
expectImmediateServerVersion: 0,
expectOriginalServerVersion: 0,
},
{
// mysql-8.0 only
data: []byte("\x00\\\xcc\x103D\xa8\x11\xea\xbdY\x02B\xac\x19\x00\x03w\x00\x00\x00\x00\x00\x00\x00\x02x\x00\x00\x00\x00\x00\x00\x00y\x00\x00\x00\x00\x00\x00\x00j0\xb1>x\xa0\x05\xfc\xc3\x03\x938\x01\x00"),
expectImmediateCommitTimestamp: 1583813191872618,
expectOriginalCommitTimestamp: 1583813191872618,
expectTransactoinLength: 963,
expectImmediateServerVersion: 80019,
expectOriginalServerVersion: 80019,
},
}

for _, tc := range testcases {
ev := new(GTIDEvent)
err := ev.Decode(tc.data)
c.Assert(err, IsNil)
c.Assert(ev.ImmediateCommitTimestamp, Equals, tc.expectImmediateCommitTimestamp)
c.Assert(ev.OriginalCommitTimestamp, Equals, tc.expectOriginalCommitTimestamp)
c.Assert(ev.ImmediateServerVersion, Equals, tc.expectImmediateServerVersion)
c.Assert(ev.OriginalServerVersion, Equals, tc.expectOriginalServerVersion)
}

}
Loading