Skip to content

Commit 7a62847

Browse files
authored
Merge pull request #468 from huangjunwen/master
Add more MySQL-8.0 meta data to GTIDEvent and TableMapEvent
2 parents 12e8984 + d9d4b74 commit 7a62847

File tree

7 files changed

+802
-17
lines changed

7 files changed

+802
-17
lines changed

notes/field_type.md

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
2+
There are several `types` for each field (column) type, such as `type()`/`real_type()/binlog_type()` ...
3+
4+
By default, `real_type()` and `binlog_type()` are the same as `type()`:
5+
6+
```
7+
// From mysql-8.0/sql/field.h Field
8+
9+
// ...
10+
virtual enum_field_types real_type() const { return type(); }
11+
virtual enum_field_types binlog_type() const {
12+
/*
13+
Binlog stores field->type() as type code by default.
14+
This puts MYSQL_TYPE_STRING in case of CHAR, VARCHAR, SET and ENUM,
15+
with extra data type details put into metadata.
16+
17+
We cannot store field->type() in case of temporal types with
18+
fractional seconds: TIME(n), DATETIME(n) and TIMESTAMP(n),
19+
because binlog records with MYSQL_TYPE_TIME, MYSQL_TYPE_DATETIME
20+
type codes do not have metadata.
21+
So for temporal data types with fractional seconds we'll store
22+
real_type() type codes instead, i.e.
23+
MYSQL_TYPE_TIME2, MYSQL_TYPE_DATETIME2, MYSQL_TYPE_TIMESTAMP2,
24+
and put precision into metatada.
25+
26+
Note: perhaps binlog should eventually be modified to store
27+
real_type() instead of type() for all column types.
28+
*/
29+
return type();
30+
}
31+
// ...
32+
```
33+
34+
Here is a list collected from `mysql-8.0/sql/field.h`:
35+
36+
```
37+
+------------------------------------------------------------+-------------------------------+-----------------------+------------------------+
38+
| Field | type() | real_type() | binlog_type() |
39+
+------------------------------------------------------------+-------------------------------+-----------------------+------------------------+
40+
| | | | |
41+
| Field (abstract) | | | |
42+
| | | | | |
43+
| +--Field_bit | MYSQL_TYPE_BIT | | |
44+
| | +--Field_bit_as_char | | | |
45+
| | | | | |
46+
| +--Field_num (abstract) | | | |
47+
| | | +--Field_real (abstract) | | | |
48+
| | | +--Field_decimal | MYSQL_TYPE_DECIMAL | | |
49+
| | | +--Field_float | MYSQL_TYPE_FLOAT | | |
50+
| | | +--Field_double | MYSQL_TYPE_DOUBLE | | |
51+
| | | | | | |
52+
| | +--Field_new_decimal | MYSQL_TYPE_NEWDECIMAL | | |
53+
| | +--Field_short | MYSQL_TYPE_SHORT | | |
54+
| | +--Field_medium | MYSQL_TYPE_INT24 | | |
55+
| | +--Field_long | MYSQL_TYPE_LONG | | |
56+
| | +--Field_longlong | MYSQL_TYPE_LONGLONG | | |
57+
| | +--Field_tiny | MYSQL_TYPE_TINY | | |
58+
| | +--Field_year | MYSQL_TYPE_YEAR | | |
59+
| | | | | |
60+
| +--Field_str (abstract) | | | |
61+
| | +--Field_longstr | | | |
62+
| | | +--Field_string | MYSQL_TYPE_STRING | MYSQL_TYPE_STRING | |
63+
| | | +--Field_varstring | MYSQL_TYPE_VARCHAR | MYSQL_TYPE_VARCHAR | |
64+
| | | +--Field_blob | MYSQL_TYPE_BLOB | | |
65+
| | | +--Field_geom | MYSQL_TYPE_GEOMETRY | | |
66+
| | | +--Field_json | MYSQL_TYPE_JSON | | |
67+
| | | +--Field_typed_array | real_type_to_type(m_elt_type) | m_elt_type | MYSQL_TYPE_TYPED_ARRAY |
68+
| | | | | | |
69+
| | +--Field_null | MYSQL_TYPE_NULL | | |
70+
| | +--Field_enum | MYSQL_TYPE_STRING | MYSQL_TYPE_ENUM | |
71+
| | +--Field_set | | MYSQL_TYPE_SET | |
72+
| | | | | |
73+
| +--Field_temporal (abstract) | | | |
74+
| +--Field_time_common (abstract) | | | |
75+
| | +--Field_time | MYSQL_TYPE_TIME | | |
76+
| | +--Field_timef | MYSQL_TYPE_TIME | MYSQL_TYPE_TIME2 | MYSQL_TYPE_TIME2 |
77+
| | | | | |
78+
| +--Field_temporal_with_date (abstract) | | | |
79+
| +--Field_newdate | MYSQL_TYPE_DATE | MYSQL_TYPE_NEWDATE | |
80+
| +--Field_temporal_with_date_and_time (abstract) | | | |
81+
| +--Field_timestamp | MYSQL_TYPE_TIMESTAMP | | |
82+
| +--Field_datetime | MYSQL_TYPE_DATETIME | | |
83+
| +--Field_temporal_with_date_and_timef (abstract) | | | |
84+
| +--Field_timestampf | MYSQL_TYPE_TIMESTAMP | MYSQL_TYPE_TIMESTAMP2 | MYSQL_TYPE_TIMESTAMP2 |
85+
| +--Field_datetimef | MYSQL_TYPE_DATETIME | MYSQL_TYPE_DATETIME2 | MYSQL_TYPE_DATETIME2 |
86+
+------------------------------------------------------------+-------------------------------+-----------------------+------------------------+
87+
```

replication/const.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func (e EventType) String() string {
188188
return "ViewChangeEvent"
189189
case XA_PREPARE_LOG_EVENT:
190190
return "XAPrepareLogEvent"
191-
191+
192192
default:
193193
return "UnknownEvent"
194194
}
@@ -202,3 +202,18 @@ const (
202202
BINLOG_CHECKSUM_ALG_UNDEF byte = 255 // special value to tag undetermined yet checksum
203203
// or events from checksum-unaware servers
204204
)
205+
206+
// These are TABLE_MAP_EVENT's optional metadata field type, from: libbinlogevents/include/rows_event.h
207+
const (
208+
TABLE_MAP_OPT_META_SIGNEDNESS byte = iota + 1
209+
TABLE_MAP_OPT_META_DEFAULT_CHARSET
210+
TABLE_MAP_OPT_META_COLUMN_CHARSET
211+
TABLE_MAP_OPT_META_COLUMN_NAME
212+
TABLE_MAP_OPT_META_SET_STR_VALUE
213+
TABLE_MAP_OPT_META_ENUM_STR_VALUE
214+
TABLE_MAP_OPT_META_GEOMETRY_TYPE
215+
TABLE_MAP_OPT_META_SIMPLE_PRIMARY_KEY
216+
TABLE_MAP_OPT_META_PRIMARY_KEY_WITH_PREFIX
217+
TABLE_MAP_OPT_META_ENUM_AND_SET_DEFAULT_CHARSET
218+
TABLE_MAP_OPT_META_ENUM_AND_SET_COLUMN_CHARSET
219+
)

replication/event.go

+103-15
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const (
2121
LogicalTimestampTypeCode = 2
2222
PartLogicalTimestampLength = 8
2323
BinlogChecksumLength = 4
24+
UndefinedServerVer = 999999 // UNDEFINED_SERVER_VERSION
2425
)
2526

2627
type BinlogEvent struct {
@@ -225,31 +226,31 @@ type PreviousGTIDsEvent struct {
225226
func (e *PreviousGTIDsEvent) Decode(data []byte) error {
226227
var previousGTIDSets []string
227228
pos := 0
228-
uuidCount := binary.LittleEndian.Uint16(data[pos:pos+8])
229+
uuidCount := binary.LittleEndian.Uint16(data[pos : pos+8])
229230
pos += 8
230231

231-
for i := uint16(0);i < uuidCount; i++ {
232-
uuid := e.decodeUuid(data[pos:pos+16])
232+
for i := uint16(0); i < uuidCount; i++ {
233+
uuid := e.decodeUuid(data[pos : pos+16])
233234
pos += 16
234-
sliceCount := binary.LittleEndian.Uint16(data[pos:pos+8])
235+
sliceCount := binary.LittleEndian.Uint16(data[pos : pos+8])
235236
pos += 8
236237
var intervals []string
237-
for i := uint16(0);i < sliceCount; i++ {
238-
start := e.decodeInterval(data[pos:pos+8])
238+
for i := uint16(0); i < sliceCount; i++ {
239+
start := e.decodeInterval(data[pos : pos+8])
239240
pos += 8
240-
stop := e.decodeInterval(data[pos:pos+8])
241+
stop := e.decodeInterval(data[pos : pos+8])
241242
pos += 8
242243
interval := ""
243244
if stop == start+1 {
244-
interval = fmt.Sprintf("%d",start)
245-
}else {
246-
interval = fmt.Sprintf("%d-%d",start,stop-1)
245+
interval = fmt.Sprintf("%d", start)
246+
} else {
247+
interval = fmt.Sprintf("%d-%d", start, stop-1)
247248
}
248-
intervals = append(intervals,interval)
249+
intervals = append(intervals, interval)
249250
}
250-
previousGTIDSets = append(previousGTIDSets,fmt.Sprintf("%s:%s",uuid,strings.Join(intervals,":")))
251+
previousGTIDSets = append(previousGTIDSets, fmt.Sprintf("%s:%s", uuid, strings.Join(intervals, ":")))
251252
}
252-
e.GTIDSets = fmt.Sprintf("%s",strings.Join(previousGTIDSets,","))
253+
e.GTIDSets = fmt.Sprintf("%s", strings.Join(previousGTIDSets, ","))
253254
return nil
254255
}
255256

@@ -259,8 +260,8 @@ func (e *PreviousGTIDsEvent) Dump(w io.Writer) {
259260
}
260261

261262
func (e *PreviousGTIDsEvent) decodeUuid(data []byte) string {
262-
return fmt.Sprintf("%s-%s-%s-%s-%s",hex.EncodeToString(data[0:4]),hex.EncodeToString(data[4:6]),
263-
hex.EncodeToString(data[6:8]),hex.EncodeToString(data[8:10]),hex.EncodeToString(data[10:]))
263+
return fmt.Sprintf("%s-%s-%s-%s-%s", hex.EncodeToString(data[0:4]), hex.EncodeToString(data[4:6]),
264+
hex.EncodeToString(data[6:8]), hex.EncodeToString(data[8:10]), hex.EncodeToString(data[10:]))
264265
}
265266

266267
func (e *PreviousGTIDsEvent) decodeInterval(data []byte) uint64 {
@@ -349,6 +350,20 @@ type GTIDEvent struct {
349350
GNO int64
350351
LastCommitted int64
351352
SequenceNumber int64
353+
354+
// ImmediateCommitTimestamp/OriginalCommitTimestamp are introduced in MySQL-8.0.1, see:
355+
// https://mysqlhighavailability.com/replication-features-in-mysql-8-0-1/
356+
ImmediateCommitTimestamp uint64
357+
OriginalCommitTimestamp uint64
358+
359+
// Total transaction length (including this GTIDEvent), introduced in MySQL-8.0.2, see:
360+
// https://mysqlhighavailability.com/taking-advantage-of-new-transaction-length-metadata/
361+
TransactionLength uint64
362+
363+
// ImmediateServerVersion/OriginalServerVersion are introduced in MySQL-8.0.14, see
364+
// https://dev.mysql.com/doc/refman/8.0/en/replication-compatibility.html
365+
ImmediateServerVersion uint32
366+
OriginalServerVersion uint32
352367
}
353368

354369
func (e *GTIDEvent) Decode(data []byte) error {
@@ -359,26 +374,99 @@ func (e *GTIDEvent) Decode(data []byte) error {
359374
pos += SidLength
360375
e.GNO = int64(binary.LittleEndian.Uint64(data[pos:]))
361376
pos += 8
377+
362378
if len(data) >= 42 {
363379
if uint8(data[pos]) == LogicalTimestampTypeCode {
364380
pos++
365381
e.LastCommitted = int64(binary.LittleEndian.Uint64(data[pos:]))
366382
pos += PartLogicalTimestampLength
367383
e.SequenceNumber = int64(binary.LittleEndian.Uint64(data[pos:]))
384+
pos += 8
385+
386+
// IMMEDIATE_COMMIT_TIMESTAMP_LENGTH = 7
387+
if len(data)-pos < 7 {
388+
return nil
389+
}
390+
e.ImmediateCommitTimestamp = FixedLengthInt(data[pos : pos+7])
391+
pos += 7
392+
if (e.ImmediateCommitTimestamp & (uint64(1) << 55)) != 0 {
393+
// If the most significant bit set, another 7 byte follows representing OriginalCommitTimestamp
394+
e.ImmediateCommitTimestamp &= ^(uint64(1) << 55)
395+
e.OriginalCommitTimestamp = FixedLengthInt(data[pos : pos+7])
396+
pos += 7
397+
398+
} else {
399+
// Otherwise OriginalCommitTimestamp == ImmediateCommitTimestamp
400+
e.OriginalCommitTimestamp = e.ImmediateCommitTimestamp
401+
402+
}
403+
404+
// TRANSACTION_LENGTH_MIN_LENGTH = 1
405+
if len(data)-pos < 1 {
406+
return nil
407+
}
408+
var n int
409+
e.TransactionLength, _, n = LengthEncodedInt(data[pos:])
410+
pos += n
411+
412+
// IMMEDIATE_SERVER_VERSION_LENGTH = 4
413+
e.ImmediateServerVersion = UndefinedServerVer
414+
e.OriginalServerVersion = UndefinedServerVer
415+
if len(data)-pos < 4 {
416+
return nil
417+
}
418+
e.ImmediateServerVersion = binary.LittleEndian.Uint32(data[pos:])
419+
pos += 4
420+
if (e.ImmediateServerVersion & (uint32(1) << 31)) != 0 {
421+
// If the most significant bit set, another 4 byte follows representing OriginalServerVersion
422+
e.ImmediateServerVersion &= ^(uint32(1) << 31)
423+
e.OriginalServerVersion = binary.LittleEndian.Uint32(data[pos:])
424+
pos += 4
425+
426+
} else {
427+
// Otherwise OriginalServerVersion == ImmediateServerVersion
428+
e.OriginalServerVersion = e.ImmediateServerVersion
429+
430+
}
431+
368432
}
369433
}
370434
return nil
371435
}
372436

373437
func (e *GTIDEvent) Dump(w io.Writer) {
438+
fmtTime := func(t time.Time) string {
439+
if t.IsZero() {
440+
return "<n/a>"
441+
}
442+
return t.Format(time.RFC3339Nano)
443+
}
444+
374445
fmt.Fprintf(w, "Commit flag: %d\n", e.CommitFlag)
375446
u, _ := uuid.FromBytes(e.SID)
376447
fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO)
377448
fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted)
378449
fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber)
450+
fmt.Fprintf(w, "Immediate commmit timestamp: %d (%s)\n", e.ImmediateCommitTimestamp, fmtTime(e.ImmediateCommitTime()))
451+
fmt.Fprintf(w, "Orignal commmit timestamp: %d (%s)\n", e.OriginalCommitTimestamp, fmtTime(e.OriginalCommitTime()))
452+
fmt.Fprintf(w, "Transaction length: %d\n", e.TransactionLength)
453+
fmt.Fprintf(w, "Immediate server version: %d\n", e.ImmediateServerVersion)
454+
fmt.Fprintf(w, "Orignal server version: %d\n", e.OriginalServerVersion)
379455
fmt.Fprintln(w)
380456
}
381457

458+
// ImmediateCommitTime returns the commit time of this trx on the immediate server
459+
// or zero time if not available.
460+
func (e *GTIDEvent) ImmediateCommitTime() time.Time {
461+
return microSecTimestampToTime(e.ImmediateCommitTimestamp)
462+
}
463+
464+
// OriginalCommitTime returns the commit time of this trx on the original server
465+
// or zero time if not available.
466+
func (e *GTIDEvent) OriginalCommitTime() time.Time {
467+
return microSecTimestampToTime(e.OriginalCommitTimestamp)
468+
}
469+
382470
type BeginLoadQueryEvent struct {
383471
FileID uint32
384472
BlockData []byte

replication/event_test.go

+51
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,54 @@ func (_ *testDecodeSuite) TestMariadbGTIDEvent(c *C) {
4848
c.Assert(ev.IsGroupCommit(), Equals, true)
4949
c.Assert(ev.CommitID, Equals, uint64(0x1716151413121110))
5050
}
51+
52+
func (_ *testDecodeSuite) TestGTIDEventMysql8NewFields(c *C) {
53+
54+
testcases := []struct {
55+
data []byte
56+
expectImmediateCommitTimestamp uint64
57+
expectOriginalCommitTimestamp uint64
58+
expectTransactoinLength uint64
59+
expectImmediateServerVersion uint32
60+
expectOriginalServerVersion uint32
61+
}{
62+
{
63+
// master: mysql-5.7, slave: mysql-8.0
64+
data: []byte("\x00Z\xa7*\u007fD\xa8\x11\xea\x94\u007f\x02B\xac\x19\x00\x02\x02\x01\x00\x00\x00\x00\x00\x00\x02v\x00\x00\x00\x00\x00\x00\x00w\x00\x00\x00\x00\x00\x00\x00\xc1G\x81\x16x\xa0\x85\x00\x00\x00\x00\x00\x00\x00\xfc\xc5\x03\x938\x01\x80\x00\x00\x00\x00"),
65+
expectImmediateCommitTimestamp: 1583812517644225,
66+
expectOriginalCommitTimestamp: 0,
67+
expectTransactoinLength: 965,
68+
expectImmediateServerVersion: 80019,
69+
expectOriginalServerVersion: 0,
70+
},
71+
{
72+
// mysql-5.7 only
73+
data: []byte("\x00Z\xa7*\u007fD\xa8\x11\xea\x94\u007f\x02B\xac\x19\x00\x02\x03\x01\x00\x00\x00\x00\x00\x00\x025\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00\x00\x00\x00\x00"),
74+
expectImmediateCommitTimestamp: 0,
75+
expectOriginalCommitTimestamp: 0,
76+
expectTransactoinLength: 0,
77+
expectImmediateServerVersion: 0,
78+
expectOriginalServerVersion: 0,
79+
},
80+
{
81+
// mysql-8.0 only
82+
data: []byte("\x00\\\xcc\x103D\xa8\x11\xea\xbdY\x02B\xac\x19\x00\x03w\x00\x00\x00\x00\x00\x00\x00\x02x\x00\x00\x00\x00\x00\x00\x00y\x00\x00\x00\x00\x00\x00\x00j0\xb1>x\xa0\x05\xfc\xc3\x03\x938\x01\x00"),
83+
expectImmediateCommitTimestamp: 1583813191872618,
84+
expectOriginalCommitTimestamp: 1583813191872618,
85+
expectTransactoinLength: 963,
86+
expectImmediateServerVersion: 80019,
87+
expectOriginalServerVersion: 80019,
88+
},
89+
}
90+
91+
for _, tc := range testcases {
92+
ev := new(GTIDEvent)
93+
err := ev.Decode(tc.data)
94+
c.Assert(err, IsNil)
95+
c.Assert(ev.ImmediateCommitTimestamp, Equals, tc.expectImmediateCommitTimestamp)
96+
c.Assert(ev.OriginalCommitTimestamp, Equals, tc.expectOriginalCommitTimestamp)
97+
c.Assert(ev.ImmediateServerVersion, Equals, tc.expectImmediateServerVersion)
98+
c.Assert(ev.OriginalServerVersion, Equals, tc.expectOriginalServerVersion)
99+
}
100+
101+
}

0 commit comments

Comments
 (0)