From 514d075349668f938db1cba08de7e611b8de1f25 Mon Sep 17 00:00:00 2001 From: jayven Date: Wed, 29 Jan 2020 18:18:39 +0800 Subject: [PATCH 01/21] Add some optional meta fields for TABLE_MAP_EVENT: signedness/column names/primary key --- replication/const.go | 17 +++++++- replication/row_event.go | 94 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 107 insertions(+), 4 deletions(-) diff --git a/replication/const.go b/replication/const.go index 3420b5d63..6230257fc 100644 --- a/replication/const.go +++ b/replication/const.go @@ -188,7 +188,7 @@ func (e EventType) String() string { return "ViewChangeEvent" case XA_PREPARE_LOG_EVENT: return "XAPrepareLogEvent" - + default: return "UnknownEvent" } @@ -202,3 +202,18 @@ const ( BINLOG_CHECKSUM_ALG_UNDEF byte = 255 // special value to tag undetermined yet checksum // or events from checksum-unaware servers ) + +// These are TABLE_MAP_EVENT's optional metadata field type, from: libbinlogevents/include/rows_event.h +const ( + TABLE_MAP_OPT_META_SIGNEDNESS byte = iota + 1 + TABLE_MAP_OPT_META_DEFAULT_CHARSET + TABLE_MAP_OPT_META_COLUMN_CHARSET + TABLE_MAP_OPT_META_COLUMN_NAME + TABLE_MAP_OPT_META_SET_STR_VALUE + TABLE_MAP_OPT_META_ENUM_STR_VALUE + TABLE_MAP_OPT_META_GEOMETRY_TYPE + TABLE_MAP_OPT_META_SIMPLE_PRIMARY_KEY + TABLE_MAP_OPT_META_PRIMARY_KEY_WITH_PREFIX + TABLE_MAP_OPT_META_ENUM_AND_SET_DEFAULT_CHARSET + TABLE_MAP_OPT_META_ENUM_AND_SET_COLUMN_CHARSET +) diff --git a/replication/row_event.go b/replication/row_event.go index 0c8f41a60..51cc50ddb 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -32,8 +32,17 @@ type TableMapEvent struct { ColumnType []byte ColumnMeta []uint16 - //len = (ColumnCount + 7) / 8 - NullBitmap []byte + NullBitmap []byte // len = (ColumnCount + 7) / 8 + + // The followings are available only after MySQL-8.0.1, see: `--binlog_row_metadata` and + // https://mysqlhighavailability.com/more-metadata-is-written-into-binary-log/ + + OptionalMeta []byte + + SignednessBitmap []byte // len = (ColumnCount + 7) / 8 + ColumnName [][]byte + PrimaryKey []uint64 // A sequence of column indexes + PrimaryKeyPrefix []uint64 // Prefix length 0 means that the whole column value is used } func (e *TableMapEvent) Decode(data []byte) error { @@ -88,7 +97,16 @@ func (e *TableMapEvent) Decode(data []byte) error { e.NullBitmap = data[pos : pos+nullBitmapSize] - // TODO: handle optional field meta + pos += nullBitmapSize + + e.OptionalMeta = data[pos:] + if len(e.OptionalMeta) == 0 { + return nil + } + + if err = e.decodeOptionalMeta(e.OptionalMeta); err != nil { + return err + } return nil } @@ -186,6 +204,68 @@ func (e *TableMapEvent) decodeMeta(data []byte) error { return nil } +func (e *TableMapEvent) decodeOptionalMeta(data []byte) error { + + pos := 0 + for pos < len(data) { + + // optional metadata fields are stored in Type, Length, Value(TLV) format + // Type takes 1 byte. Length is a packed integer value. Values takes Length bytes + t := data[pos] + pos++ + + l, _, n := LengthEncodedInt(data[pos:]) + pos += n + + v := data[pos : pos+int(l)] + pos += int(l) + + switch t { + case TABLE_MAP_OPT_META_SIGNEDNESS: + e.SignednessBitmap = v + + case TABLE_MAP_OPT_META_COLUMN_NAME: + p := 0 + e.ColumnName = make([][]byte, 0, e.ColumnCount) + for p < len(v) { + n := int(v[p]) + p++ + e.ColumnName = append(e.ColumnName, v[p:p+n]) + p += n + } + + if len(e.ColumnName) != int(e.ColumnCount) { + return errors.Errorf("Expect %d column names but got %d", e.ColumnCount, len(e.ColumnName)) + } + + case TABLE_MAP_OPT_META_SIMPLE_PRIMARY_KEY: + p := 0 + for p < len(v) { + i, _, n := LengthEncodedInt(v[p:]) + e.PrimaryKey = append(e.PrimaryKey, i) + e.PrimaryKeyPrefix = append(e.PrimaryKeyPrefix, 0) + p += n + } + + case TABLE_MAP_OPT_META_PRIMARY_KEY_WITH_PREFIX: + p := 0 + for p < len(v) { + i, _, n := LengthEncodedInt(v[p:]) + e.PrimaryKey = append(e.PrimaryKey, i) + p += n + i, _, n = LengthEncodedInt(v[p:]) + e.PrimaryKeyPrefix = append(e.PrimaryKeyPrefix, i) + p += n + } + + default: + } + + } + + return nil +} + func (e *TableMapEvent) Dump(w io.Writer) { fmt.Fprintf(w, "TableID: %d\n", e.TableID) fmt.Fprintf(w, "TableID size: %d\n", e.tableIDSize) @@ -195,6 +275,14 @@ func (e *TableMapEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Column count: %d\n", e.ColumnCount) fmt.Fprintf(w, "Column type: \n%s", hex.Dump(e.ColumnType)) fmt.Fprintf(w, "NULL bitmap: \n%s", hex.Dump(e.NullBitmap)) + fmt.Fprintf(w, "Optional meta: \n%s", hex.Dump(e.OptionalMeta)) + fmt.Fprintf(w, "Signedness bitmap\n%s", hex.Dump(e.SignednessBitmap)) + fmt.Fprintf(w, "Column name: \n") + for _, name := range e.ColumnName { + fmt.Fprintf(w, " %s\n", name) + } + fmt.Fprintf(w, "Primary key: %v\n", e.PrimaryKey) + fmt.Fprintf(w, "Primary key prefix: %v\n", e.PrimaryKeyPrefix) fmt.Fprintln(w) } From 1e585a90d0cc3321458f6ced0db74161ce464427 Mon Sep 17 00:00:00 2001 From: jayven Date: Thu, 30 Jan 2020 10:01:59 +0800 Subject: [PATCH 02/21] Add mysql 8.0 new fields to GTIDEvent --- replication/event.go | 67 ++++++++++++++++++++++++++++++++++++++++++++ replication/util.go | 31 ++++++++++++++++++++ 2 files changed, 98 insertions(+) create mode 100644 replication/util.go diff --git a/replication/event.go b/replication/event.go index 837055c1c..7d9195560 100644 --- a/replication/event.go +++ b/replication/event.go @@ -20,6 +20,7 @@ const ( LogicalTimestampTypeCode = 2 PartLogicalTimestampLength = 8 BinlogChecksumLength = 4 + UndefinedServerVer = 999999 // UNDEFINED_SERVER_VERSION ) type BinlogEvent struct { @@ -299,6 +300,18 @@ type GTIDEvent struct { GNO int64 LastCommitted int64 SequenceNumber int64 + + // The followings are available only after MySQL-8.0 + + ImmediateCommitTimestamp uint64 + OriginalCommitTimestamp uint64 + + // Total transaction length (including this GTIDEvent), see: + // https://mysqlhighavailability.com/taking-advantage-of-new-transaction-length-metadata/ + TransactionLength uint64 + + ImmediateServerVersion uint32 + OriginalServerVersion uint32 } func (e *GTIDEvent) Decode(data []byte) error { @@ -309,12 +322,61 @@ func (e *GTIDEvent) Decode(data []byte) error { pos += SidLength e.GNO = int64(binary.LittleEndian.Uint64(data[pos:])) pos += 8 + if len(data) >= 42 { if uint8(data[pos]) == LogicalTimestampTypeCode { pos++ e.LastCommitted = int64(binary.LittleEndian.Uint64(data[pos:])) pos += PartLogicalTimestampLength e.SequenceNumber = int64(binary.LittleEndian.Uint64(data[pos:])) + pos += 8 + + // IMMEDIATE_COMMIT_TIMESTAMP_LENGTH = 7 + if len(data)-pos < 7 { + return nil + } + e.ImmediateCommitTimestamp = LittleEndianUint64(data[pos : pos+7]) + pos += 7 + if (e.ImmediateCommitTimestamp & (uint64(1) << 55)) != 0 { + // If the most significant bit set, another 7 byte follows representing OriginalCommitTimestamp + e.ImmediateCommitTimestamp &= ^(uint64(1) << 55) + e.OriginalCommitTimestamp = LittleEndianUint64(data[pos : pos+7]) + pos += 7 + + } else { + // Otherwise OriginalCommitTimestamp == ImmediateCommitTimestamp + e.OriginalCommitTimestamp = e.ImmediateCommitTimestamp + + } + + // TRANSACTION_LENGTH_MIN_LENGTH = 1 + if len(data)-pos < 1 { + return nil + } + var n int + e.TransactionLength, _, n = LengthEncodedInt(data[pos:]) + pos += n + + // IMMEDIATE_SERVER_VERSION_LENGTH = 4 + e.ImmediateServerVersion = UndefinedServerVer + e.OriginalServerVersion = UndefinedServerVer + if len(data)-pos < 4 { + return nil + } + e.ImmediateServerVersion = binary.LittleEndian.Uint32(data[pos:]) + pos += 4 + if (e.ImmediateServerVersion & (uint32(1) << 31)) != 0 { + // If the most significant bit set, another 4 byte follows representing OriginalServerVersion + e.ImmediateServerVersion &= ^(uint32(1) << 31) + e.OriginalServerVersion = binary.LittleEndian.Uint32(data[pos:]) + pos += 4 + + } else { + // Otherwise OriginalServerVersion == ImmediateServerVersion + e.OriginalServerVersion = e.ImmediateServerVersion + + } + } } return nil @@ -326,6 +388,11 @@ func (e *GTIDEvent) Dump(w io.Writer) { fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO) fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted) fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber) + fmt.Fprintf(w, "Immediate commmit timestamp: %d\n", e.ImmediateCommitTimestamp) + fmt.Fprintf(w, "Orignal commmit timestamp: %d\n", e.OriginalCommitTimestamp) + fmt.Fprintf(w, "Transaction length: %d\n", e.TransactionLength) + fmt.Fprintf(w, "Immediate server version: %d\n", e.ImmediateServerVersion) + fmt.Fprintf(w, "Orignal server version: %d\n", e.OriginalServerVersion) fmt.Fprintln(w) } diff --git a/replication/util.go b/replication/util.go new file mode 100644 index 000000000..b503e19b6 --- /dev/null +++ b/replication/util.go @@ -0,0 +1,31 @@ +package replication + +func LittleEndianUint64(b []byte) uint64 { + var ret uint64 + switch len(b) { + case 8: + ret |= (uint64(b[7]) << 56) + fallthrough + case 7: + ret |= (uint64(b[6]) << 48) + fallthrough + case 6: + ret |= (uint64(b[5]) << 40) + fallthrough + case 5: + ret |= (uint64(b[4]) << 32) + fallthrough + case 4: + ret |= (uint64(b[3]) << 24) + fallthrough + case 3: + ret |= (uint64(b[2]) << 16) + fallthrough + case 2: + ret |= (uint64(b[1]) << 8) + fallthrough + case 1: + ret |= uint64(b[0]) + } + return ret +} From 782e7102e701a822b408295be21947ec2ee5697f Mon Sep 17 00:00:00 2001 From: jayven Date: Thu, 30 Jan 2020 14:22:23 +0800 Subject: [PATCH 03/21] Add some timestamp helper methods for GTIDEvent --- replication/event.go | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/replication/event.go b/replication/event.go index 7d9195560..20e871b40 100644 --- a/replication/event.go +++ b/replication/event.go @@ -388,14 +388,40 @@ func (e *GTIDEvent) Dump(w io.Writer) { fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO) fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted) fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber) - fmt.Fprintf(w, "Immediate commmit timestamp: %d\n", e.ImmediateCommitTimestamp) - fmt.Fprintf(w, "Orignal commmit timestamp: %d\n", e.OriginalCommitTimestamp) + fmt.Fprintf(w, "Immediate commmit timestamp: %d (%s)\n", e.ImmediateCommitTimestamp, e.fmtTime(e.ImmediateCommitTime())) + fmt.Fprintf(w, "Orignal commmit timestamp: %d (%s)\n", e.OriginalCommitTimestamp, e.fmtTime(e.OriginalCommitTime())) fmt.Fprintf(w, "Transaction length: %d\n", e.TransactionLength) fmt.Fprintf(w, "Immediate server version: %d\n", e.ImmediateServerVersion) fmt.Fprintf(w, "Orignal server version: %d\n", e.OriginalServerVersion) fmt.Fprintln(w) } +// ImmediateCommitTime returns the commit time of this trx on the immediate server +// or zero time if not available. +func (e *GTIDEvent) ImmediateCommitTime() time.Time { + return e.microsecTimestamp2Time(e.ImmediateCommitTimestamp) +} + +// OriginalCommitTime returns the commit time of this trx on the original server +// or zero time if not available. +func (e *GTIDEvent) OriginalCommitTime() time.Time { + return e.microsecTimestamp2Time(e.OriginalCommitTimestamp) +} + +func (e *GTIDEvent) microsecTimestamp2Time(ts uint64) time.Time { + if ts == 0 { + return time.Time{} + } + return time.Unix(int64(ts/1000000), int64(ts%1000000)*1000) +} + +func (e *GTIDEvent) fmtTime(t time.Time) string { + if t.IsZero() { + return "N/A" + } + return t.Format(time.RFC3339Nano) +} + type BeginLoadQueryEvent struct { FileID uint32 BlockData []byte From 3e9f1d735d4ac742d0e5840195a4217afba1026d Mon Sep 17 00:00:00 2001 From: jayven Date: Thu, 30 Jan 2020 16:13:02 +0800 Subject: [PATCH 04/21] Move some code --- replication/event.go | 29 +++++++++++------------------ replication/time.go | 7 +++++++ 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/replication/event.go b/replication/event.go index 20e871b40..cdf2b1aaf 100644 --- a/replication/event.go +++ b/replication/event.go @@ -383,13 +383,20 @@ func (e *GTIDEvent) Decode(data []byte) error { } func (e *GTIDEvent) Dump(w io.Writer) { + fmtTime := func(t time.Time) string { + if t.IsZero() { + return "N/A" + } + return t.Format(time.RFC3339Nano) + } + fmt.Fprintf(w, "Commit flag: %d\n", e.CommitFlag) u, _ := uuid.FromBytes(e.SID) fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO) fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted) fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber) - fmt.Fprintf(w, "Immediate commmit timestamp: %d (%s)\n", e.ImmediateCommitTimestamp, e.fmtTime(e.ImmediateCommitTime())) - fmt.Fprintf(w, "Orignal commmit timestamp: %d (%s)\n", e.OriginalCommitTimestamp, e.fmtTime(e.OriginalCommitTime())) + fmt.Fprintf(w, "Immediate commmit timestamp: %d (%s)\n", e.ImmediateCommitTimestamp, fmtTime(e.ImmediateCommitTime())) + fmt.Fprintf(w, "Orignal commmit timestamp: %d (%s)\n", e.OriginalCommitTimestamp, fmtTime(e.OriginalCommitTime())) fmt.Fprintf(w, "Transaction length: %d\n", e.TransactionLength) fmt.Fprintf(w, "Immediate server version: %d\n", e.ImmediateServerVersion) fmt.Fprintf(w, "Orignal server version: %d\n", e.OriginalServerVersion) @@ -399,27 +406,13 @@ func (e *GTIDEvent) Dump(w io.Writer) { // ImmediateCommitTime returns the commit time of this trx on the immediate server // or zero time if not available. func (e *GTIDEvent) ImmediateCommitTime() time.Time { - return e.microsecTimestamp2Time(e.ImmediateCommitTimestamp) + return microSecTimestampToTime(e.ImmediateCommitTimestamp) } // OriginalCommitTime returns the commit time of this trx on the original server // or zero time if not available. func (e *GTIDEvent) OriginalCommitTime() time.Time { - return e.microsecTimestamp2Time(e.OriginalCommitTimestamp) -} - -func (e *GTIDEvent) microsecTimestamp2Time(ts uint64) time.Time { - if ts == 0 { - return time.Time{} - } - return time.Unix(int64(ts/1000000), int64(ts%1000000)*1000) -} - -func (e *GTIDEvent) fmtTime(t time.Time) string { - if t.IsZero() { - return "N/A" - } - return t.Format(time.RFC3339Nano) + return microSecTimestampToTime(e.OriginalCommitTimestamp) } type BeginLoadQueryEvent struct { diff --git a/replication/time.go b/replication/time.go index 99614edca..2adc832f4 100644 --- a/replication/time.go +++ b/replication/time.go @@ -50,6 +50,13 @@ func formatBeforeUnixZeroTime(year, month, day, hour, minute, second, frac, dec return s[0 : len(s)-(6-dec)] } +func microSecTimestampToTime(ts uint64) time.Time { + if ts == 0 { + return time.Time{} + } + return time.Unix(int64(ts/1000000), int64(ts%1000000)*1000) +} + func init() { fracTimeFormat = make([]string, 7) fracTimeFormat[0] = "2006-01-02 15:04:05" From 5dd4c32af5c6b2d282d63750cf4cc6e43e4f3b1e Mon Sep 17 00:00:00 2001 From: jayven Date: Thu, 30 Jan 2020 18:30:16 +0800 Subject: [PATCH 05/21] Rename and move a function BytesToUint64 to mysql package --- mysql/util.go | 31 +++++++++++++++++++++++++++++++ replication/event.go | 4 ++-- replication/util.go | 31 ------------------------------- 3 files changed, 33 insertions(+), 33 deletions(-) delete mode 100644 replication/util.go diff --git a/mysql/util.go b/mysql/util.go index 5ab653227..b4f6df05a 100644 --- a/mysql/util.go +++ b/mysql/util.go @@ -259,6 +259,37 @@ func Uint64ToBytes(n uint64) []byte { } } +func BytesToUint64(b []byte) (u uint64) { + switch len(b) { + case 8: + u |= (uint64(b[7]) << 56) + fallthrough + case 7: + u |= (uint64(b[6]) << 48) + fallthrough + case 6: + u |= (uint64(b[5]) << 40) + fallthrough + case 5: + u |= (uint64(b[4]) << 32) + fallthrough + case 4: + u |= (uint64(b[3]) << 24) + fallthrough + case 3: + u |= (uint64(b[2]) << 16) + fallthrough + case 2: + u |= (uint64(b[1]) << 8) + fallthrough + case 1: + u |= uint64(b[0]) + default: + panic(fmt.Errorf("BytesToUint64 byte slice length must be in range [1, 8]")) + } + return +} + func FormatBinaryDate(n int, data []byte) ([]byte, error) { switch n { case 0: diff --git a/replication/event.go b/replication/event.go index cdf2b1aaf..ebc953c6f 100644 --- a/replication/event.go +++ b/replication/event.go @@ -335,12 +335,12 @@ func (e *GTIDEvent) Decode(data []byte) error { if len(data)-pos < 7 { return nil } - e.ImmediateCommitTimestamp = LittleEndianUint64(data[pos : pos+7]) + e.ImmediateCommitTimestamp = BytesToUint64(data[pos : pos+7]) pos += 7 if (e.ImmediateCommitTimestamp & (uint64(1) << 55)) != 0 { // If the most significant bit set, another 7 byte follows representing OriginalCommitTimestamp e.ImmediateCommitTimestamp &= ^(uint64(1) << 55) - e.OriginalCommitTimestamp = LittleEndianUint64(data[pos : pos+7]) + e.OriginalCommitTimestamp = BytesToUint64(data[pos : pos+7]) pos += 7 } else { diff --git a/replication/util.go b/replication/util.go deleted file mode 100644 index b503e19b6..000000000 --- a/replication/util.go +++ /dev/null @@ -1,31 +0,0 @@ -package replication - -func LittleEndianUint64(b []byte) uint64 { - var ret uint64 - switch len(b) { - case 8: - ret |= (uint64(b[7]) << 56) - fallthrough - case 7: - ret |= (uint64(b[6]) << 48) - fallthrough - case 6: - ret |= (uint64(b[5]) << 40) - fallthrough - case 5: - ret |= (uint64(b[4]) << 32) - fallthrough - case 4: - ret |= (uint64(b[3]) << 24) - fallthrough - case 3: - ret |= (uint64(b[2]) << 16) - fallthrough - case 2: - ret |= (uint64(b[1]) << 8) - fallthrough - case 1: - ret |= uint64(b[0]) - } - return ret -} From c61b8a9d1fe5c84971b4e2f3b810cf75161b2954 Mon Sep 17 00:00:00 2001 From: jayven Date: Thu, 30 Jan 2020 20:21:40 +0800 Subject: [PATCH 06/21] Add some helper methods to TableMapEvent --- mysql/type.go | 21 +++++++++ replication/row_event.go | 92 +++++++++++++++++++++++++++++++++++----- 2 files changed, 102 insertions(+), 11 deletions(-) create mode 100644 mysql/type.go diff --git a/mysql/type.go b/mysql/type.go new file mode 100644 index 000000000..cdb3bb8af --- /dev/null +++ b/mysql/type.go @@ -0,0 +1,21 @@ +package mysql + +// IsNumericType returns true if the given type is numeric. +func IsNumericType(typ byte) bool { + switch typ { + case MYSQL_TYPE_TINY, + MYSQL_TYPE_SHORT, + MYSQL_TYPE_INT24, + MYSQL_TYPE_LONG, + MYSQL_TYPE_LONGLONG, + MYSQL_TYPE_FLOAT, + MYSQL_TYPE_DOUBLE, + MYSQL_TYPE_DECIMAL, + MYSQL_TYPE_NEWDECIMAL: + return true + + default: + return false + } + +} diff --git a/replication/row_event.go b/replication/row_event.go index 51cc50ddb..2264a05bb 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -37,9 +37,9 @@ type TableMapEvent struct { // The followings are available only after MySQL-8.0.1, see: `--binlog_row_metadata` and // https://mysqlhighavailability.com/more-metadata-is-written-into-binary-log/ - OptionalMeta []byte + optionalMeta []byte - SignednessBitmap []byte // len = (ColumnCount + 7) / 8 + SignednessBitmap []byte ColumnName [][]byte PrimaryKey []uint64 // A sequence of column indexes PrimaryKeyPrefix []uint64 // Prefix length 0 means that the whole column value is used @@ -99,12 +99,12 @@ func (e *TableMapEvent) Decode(data []byte) error { pos += nullBitmapSize - e.OptionalMeta = data[pos:] - if len(e.OptionalMeta) == 0 { + e.optionalMeta = data[pos:] + if len(e.optionalMeta) == 0 { return nil } - if err = e.decodeOptionalMeta(e.OptionalMeta); err != nil { + if err = e.decodeOptionalMeta(e.optionalMeta); err != nil { return err } @@ -259,6 +259,7 @@ func (e *TableMapEvent) decodeOptionalMeta(data []byte) error { } default: + // TODO: other meta } } @@ -275,17 +276,86 @@ func (e *TableMapEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Column count: %d\n", e.ColumnCount) fmt.Fprintf(w, "Column type: \n%s", hex.Dump(e.ColumnType)) fmt.Fprintf(w, "NULL bitmap: \n%s", hex.Dump(e.NullBitmap)) - fmt.Fprintf(w, "Optional meta: \n%s", hex.Dump(e.OptionalMeta)) - fmt.Fprintf(w, "Signedness bitmap\n%s", hex.Dump(e.SignednessBitmap)) - fmt.Fprintf(w, "Column name: \n") - for _, name := range e.ColumnName { - fmt.Fprintf(w, " %s\n", name) - } + fmt.Fprintf(w, "Optional meta: \n%s", hex.Dump(e.optionalMeta)) + fmt.Fprintf(w, "Signedness bitmap: \n%s", hex.Dump(e.SignednessBitmap)) fmt.Fprintf(w, "Primary key: %v\n", e.PrimaryKey) fmt.Fprintf(w, "Primary key prefix: %v\n", e.PrimaryKeyPrefix) + + colNameArr := e.ColumnNameArray() + nullArr := e.NullableArray() + unsignedArr := e.UnsignedArray() + fmt.Fprintf(w, "Columns: \n") + for i := 0; i < int(e.ColumnCount); i++ { + if colNameArr != nil { + fmt.Fprintf(w, " %s", colNameArr[i]) + } else { + fmt.Fprintf(w, " ") + } + + fmt.Fprintf(w, " type:%d", e.ColumnType[i]) + + if unsignedArr != nil && unsignedArr[i] { + fmt.Fprintf(w, " unsigned") + } + + if nullArr != nil { + if nullArr[i] { + fmt.Fprintf(w, " null") + } else { + fmt.Fprintf(w, " notnull") + } + } + + fmt.Fprintf(w, "\n") + } fmt.Fprintln(w) } +// NullableArray returns an array of nullablity for each column: true if the column is nullable. +// It returns nil if not available. +func (e *TableMapEvent) NullableArray() []bool { + if len(e.NullBitmap) == 0 { + return nil + } + ret := make([]bool, e.ColumnCount) + for i := 0; i < len(ret); i++ { + ret[i] = e.NullBitmap[i/8]&(1<<(i%8)) != 0 + } + return ret +} + +// ColumnNameArray returns an array of column names. +// It returns nil if not available. +func (e *TableMapEvent) ColumnNameArray() []string { + if len(e.ColumnName) == 0 { + return nil + } + ret := make([]string, e.ColumnCount) + for i := 0; i < len(ret); i++ { + ret[i] = string(e.ColumnName[i]) + } + return ret +} + +// UnsignedArray returns an array of signedness for each column: true if the column is numeric and it's unsigned. +// It returns nil if not available. +func (e *TableMapEvent) UnsignedArray() []bool { + if len(e.SignednessBitmap) == 0 { + return nil + } + p := 0 + ret := make([]bool, e.ColumnCount) + for i := 0; i < len(ret); i++ { + if !IsNumericType(e.ColumnType[i]) { + ret[i] = false + continue + } + ret[i] = e.SignednessBitmap[p/8]&(1<<(7-p%8)) != 0 + p++ + } + return ret +} + // RowsEventStmtEndFlag is set in the end of the statement. const RowsEventStmtEndFlag = 0x01 From c66d0a6dc8e7904b189671cb97f9b07b0eb1af85 Mon Sep 17 00:00:00 2001 From: jayven Date: Fri, 31 Jan 2020 08:05:35 +0800 Subject: [PATCH 07/21] Make TableMapEvent's Dump prettier --- replication/row_event.go | 110 +++++++++++++++++++++------------------ 1 file changed, 60 insertions(+), 50 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index 2264a05bb..c0f661bdc 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -32,14 +32,14 @@ type TableMapEvent struct { ColumnType []byte ColumnMeta []uint16 - NullBitmap []byte // len = (ColumnCount + 7) / 8 + //len = (ColumnCount + 7) / 8 + NullBitmap []byte // The followings are available only after MySQL-8.0.1, see: `--binlog_row_metadata` and // https://mysqlhighavailability.com/more-metadata-is-written-into-binary-log/ - optionalMeta []byte - SignednessBitmap []byte + ColumnName [][]byte PrimaryKey []uint64 // A sequence of column indexes PrimaryKeyPrefix []uint64 // Prefix length 0 means that the whole column value is used @@ -99,12 +99,12 @@ func (e *TableMapEvent) Decode(data []byte) error { pos += nullBitmapSize - e.optionalMeta = data[pos:] - if len(e.optionalMeta) == 0 { + optionalMeta := data[pos:] + if len(optionalMeta) == 0 { return nil } - if err = e.decodeOptionalMeta(e.optionalMeta); err != nil { + if err = e.decodeOptionalMeta(optionalMeta); err != nil { return err } @@ -276,78 +276,88 @@ func (e *TableMapEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Column count: %d\n", e.ColumnCount) fmt.Fprintf(w, "Column type: \n%s", hex.Dump(e.ColumnType)) fmt.Fprintf(w, "NULL bitmap: \n%s", hex.Dump(e.NullBitmap)) - fmt.Fprintf(w, "Optional meta: \n%s", hex.Dump(e.optionalMeta)) + fmt.Fprintf(w, "Signedness bitmap: \n%s", hex.Dump(e.SignednessBitmap)) fmt.Fprintf(w, "Primary key: %v\n", e.PrimaryKey) fmt.Fprintf(w, "Primary key prefix: %v\n", e.PrimaryKeyPrefix) - colNameArr := e.ColumnNameArray() - nullArr := e.NullableArray() - unsignedArr := e.UnsignedArray() + unsignedMap := e.UnsignedMap() + + nameMaxLen := 0 + for _, name := range e.ColumnName { + if len(name) > nameMaxLen { + nameMaxLen = len(name) + } + } + nameFmt := " %s" + if nameMaxLen > 0 { + nameFmt = fmt.Sprintf(" %%-%ds", nameMaxLen) + } + + primaryKey := map[int]struct{}{} + for _, pk := range e.PrimaryKey { + primaryKey[int(pk)] = struct{}{} + } + fmt.Fprintf(w, "Columns: \n") for i := 0; i < int(e.ColumnCount); i++ { - if colNameArr != nil { - fmt.Fprintf(w, " %s", colNameArr[i]) + if len(e.ColumnName) == 0 { + fmt.Fprintf(w, nameFmt, "") } else { - fmt.Fprintf(w, " ") + fmt.Fprintf(w, nameFmt, e.ColumnName[i]) } - fmt.Fprintf(w, " type:%d", e.ColumnType[i]) + fmt.Fprintf(w, " type=%-3d", e.ColumnType[i]) - if unsignedArr != nil && unsignedArr[i] { - fmt.Fprintf(w, " unsigned") - } - - if nullArr != nil { - if nullArr[i] { - fmt.Fprintf(w, " null") + if IsNumericType(e.ColumnType[i]) { + if unsignedMap == nil { + fmt.Fprintf(w, " unsigned=") + } else if unsignedMap[i] { + fmt.Fprintf(w, " unsigned=yes") } else { - fmt.Fprintf(w, " notnull") + fmt.Fprintf(w, " unsigned=no ") } } + available, nullable := e.Nullable(i) + if !available { + fmt.Fprintf(w, " null=") + } else if nullable { + fmt.Fprintf(w, " null=yes") + } else { + fmt.Fprintf(w, " null=no ") + } + + if _, ok := primaryKey[i]; ok { + fmt.Fprintf(w, " pri") + } + fmt.Fprintf(w, "\n") } fmt.Fprintln(w) } -// NullableArray returns an array of nullablity for each column: true if the column is nullable. -// It returns nil if not available. -func (e *TableMapEvent) NullableArray() []bool { +// Nullable returns the nullablity of the i-th column. +// If null bits are not available, available is false. +// i must be in range [0, ColumnCount). +func (e *TableMapEvent) Nullable(i int) (available, nullable bool) { if len(e.NullBitmap) == 0 { - return nil - } - ret := make([]bool, e.ColumnCount) - for i := 0; i < len(ret); i++ { - ret[i] = e.NullBitmap[i/8]&(1<<(i%8)) != 0 - } - return ret -} - -// ColumnNameArray returns an array of column names. -// It returns nil if not available. -func (e *TableMapEvent) ColumnNameArray() []string { - if len(e.ColumnName) == 0 { - return nil + return } - ret := make([]string, e.ColumnCount) - for i := 0; i < len(ret); i++ { - ret[i] = string(e.ColumnName[i]) - } - return ret + return true, e.NullBitmap[i/8]&(1<<(i%8)) != 0 } -// UnsignedArray returns an array of signedness for each column: true if the column is numeric and it's unsigned. -// It returns nil if not available. -func (e *TableMapEvent) UnsignedArray() []bool { +// UnsignedMap returns a map: column index -> unsigned. +// Note that only numeric columns will be returned. +// If signedness bits are not available, nil is returned. +func (e *TableMapEvent) UnsignedMap() map[int]bool { if len(e.SignednessBitmap) == 0 { return nil } p := 0 - ret := make([]bool, e.ColumnCount) - for i := 0; i < len(ret); i++ { + ret := make(map[int]bool) + for i := 0; i < int(e.ColumnCount); i++ { if !IsNumericType(e.ColumnType[i]) { - ret[i] = false continue } ret[i] = e.SignednessBitmap[p/8]&(1<<(7-p%8)) != 0 From d3fef7b1ddc1d684c57f00dd211e35e1881f65af Mon Sep 17 00:00:00 2001 From: jayven Date: Fri, 31 Jan 2020 09:56:23 +0800 Subject: [PATCH 08/21] Remove unneccesary code --- replication/row_event.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index c0f661bdc..bacf0b1aa 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -99,12 +99,7 @@ func (e *TableMapEvent) Decode(data []byte) error { pos += nullBitmapSize - optionalMeta := data[pos:] - if len(optionalMeta) == 0 { - return nil - } - - if err = e.decodeOptionalMeta(optionalMeta); err != nil { + if err = e.decodeOptionalMeta(data[pos:]); err != nil { return err } From a624aa6cbff8c063fefc2d78e652701e198a8b22 Mon Sep 17 00:00:00 2001 From: jayven Date: Fri, 31 Jan 2020 10:56:40 +0800 Subject: [PATCH 09/21] Fix compile error before go 1.13: shift count type int, must be unsigned integer --- replication/row_event.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index bacf0b1aa..60c6afdd7 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -339,7 +339,7 @@ func (e *TableMapEvent) Nullable(i int) (available, nullable bool) { if len(e.NullBitmap) == 0 { return } - return true, e.NullBitmap[i/8]&(1<<(i%8)) != 0 + return true, e.NullBitmap[i/8]&(1< unsigned. @@ -355,7 +355,7 @@ func (e *TableMapEvent) UnsignedMap() map[int]bool { if !IsNumericType(e.ColumnType[i]) { continue } - ret[i] = e.SignednessBitmap[p/8]&(1<<(7-p%8)) != 0 + ret[i] = e.SignednessBitmap[p/8]&(1< Date: Sat, 1 Feb 2020 19:06:45 +0800 Subject: [PATCH 10/21] Extract charset/collation info from TableMapEvent --- mysql/type.go | 21 ---------- replication/event.go | 2 +- replication/row_event.go | 90 +++++++++++++++++++++++++++++++++++++++- replication/type.go | 40 ++++++++++++++++++ 4 files changed, 129 insertions(+), 24 deletions(-) delete mode 100644 mysql/type.go create mode 100644 replication/type.go diff --git a/mysql/type.go b/mysql/type.go deleted file mode 100644 index cdb3bb8af..000000000 --- a/mysql/type.go +++ /dev/null @@ -1,21 +0,0 @@ -package mysql - -// IsNumericType returns true if the given type is numeric. -func IsNumericType(typ byte) bool { - switch typ { - case MYSQL_TYPE_TINY, - MYSQL_TYPE_SHORT, - MYSQL_TYPE_INT24, - MYSQL_TYPE_LONG, - MYSQL_TYPE_LONGLONG, - MYSQL_TYPE_FLOAT, - MYSQL_TYPE_DOUBLE, - MYSQL_TYPE_DECIMAL, - MYSQL_TYPE_NEWDECIMAL: - return true - - default: - return false - } - -} diff --git a/replication/event.go b/replication/event.go index ebc953c6f..7bc8dd7b6 100644 --- a/replication/event.go +++ b/replication/event.go @@ -385,7 +385,7 @@ func (e *GTIDEvent) Decode(data []byte) error { func (e *GTIDEvent) Dump(w io.Writer) { fmtTime := func(t time.Time) string { if t.IsZero() { - return "N/A" + return "" } return t.Format(time.RFC3339Nano) } diff --git a/replication/row_event.go b/replication/row_event.go index 60c6afdd7..2ea193593 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -40,6 +40,13 @@ type TableMapEvent struct { SignednessBitmap []byte + // DefaultCharset[0] is the default collation; + // For character columns that have different charset, + // (character column index, column collation) pairs follows + DefaultCharset []uint64 + // ColumnCharset contains collation sequence for all character columns + ColumnCharset []uint64 + ColumnName [][]byte PrimaryKey []uint64 // A sequence of column indexes PrimaryKeyPrefix []uint64 // Prefix length 0 means that the whole column value is used @@ -219,6 +226,22 @@ func (e *TableMapEvent) decodeOptionalMeta(data []byte) error { case TABLE_MAP_OPT_META_SIGNEDNESS: e.SignednessBitmap = v + case TABLE_MAP_OPT_META_DEFAULT_CHARSET: + p := 0 + for p < len(v) { + c, _, n := LengthEncodedInt(v[p:]) + p += n + e.DefaultCharset = append(e.DefaultCharset, c) + } + + case TABLE_MAP_OPT_META_COLUMN_CHARSET: + p := 0 + for p < len(v) { + c, _, n := LengthEncodedInt(v[p:]) + p += n + e.ColumnCharset = append(e.ColumnCharset, c) + } + case TABLE_MAP_OPT_META_COLUMN_NAME: p := 0 e.ColumnName = make([][]byte, 0, e.ColumnCount) @@ -273,10 +296,15 @@ func (e *TableMapEvent) Dump(w io.Writer) { fmt.Fprintf(w, "NULL bitmap: \n%s", hex.Dump(e.NullBitmap)) fmt.Fprintf(w, "Signedness bitmap: \n%s", hex.Dump(e.SignednessBitmap)) + fmt.Fprintf(w, "Default charset: %v\n", e.DefaultCharset) + fmt.Fprintf(w, "Column charset: %v\n", e.ColumnCharset) fmt.Fprintf(w, "Primary key: %v\n", e.PrimaryKey) fmt.Fprintf(w, "Primary key prefix: %v\n", e.PrimaryKeyPrefix) unsignedMap := e.UnsignedMap() + fmt.Fprintf(w, "UnsignedMap: %#v\n", unsignedMap) + collationMap := e.CollationMap() + fmt.Fprintf(w, "CollationMap: %#v\n", collationMap) nameMaxLen := 0 for _, name := range e.ColumnName { @@ -305,13 +333,19 @@ func (e *TableMapEvent) Dump(w io.Writer) { fmt.Fprintf(w, " type=%-3d", e.ColumnType[i]) if IsNumericType(e.ColumnType[i]) { - if unsignedMap == nil { + if len(unsignedMap) == 0 { fmt.Fprintf(w, " unsigned=") } else if unsignedMap[i] { fmt.Fprintf(w, " unsigned=yes") } else { fmt.Fprintf(w, " unsigned=no ") } + } else if IsCharacterType(e.ColumnType[i]) { + if len(collationMap) == 0 { + fmt.Fprintf(w, " collation=") + } else { + fmt.Fprintf(w, " collation=%d ", collationMap[i]) + } } available, nullable := e.Nullable(i) @@ -344,7 +378,7 @@ func (e *TableMapEvent) Nullable(i int) (available, nullable bool) { // UnsignedMap returns a map: column index -> unsigned. // Note that only numeric columns will be returned. -// If signedness bits are not available, nil is returned. +// nil is returned if not available or no numeric columns at all. func (e *TableMapEvent) UnsignedMap() map[int]bool { if len(e.SignednessBitmap) == 0 { return nil @@ -355,12 +389,64 @@ func (e *TableMapEvent) UnsignedMap() map[int]bool { if !IsNumericType(e.ColumnType[i]) { continue } + ret[i] = e.SignednessBitmap[p/8]&(1< collation id. +// Note that only character columns will be returned. +// nil is returned if not available or no character columns at all. +func (e *TableMapEvent) CollationMap() map[int]uint64 { + + ret := make(map[int]uint64) + + if len(e.DefaultCharset) != 0 { + defaultCollation := e.DefaultCharset[0] + + // character column index -> collation + collations := make(map[int]uint64) + for i := 1; i < len(e.DefaultCharset); i += 2 { + collations[int(e.DefaultCharset[i])] = e.DefaultCharset[i+1] + } + + p := 0 + for i := 0; i < int(e.ColumnCount); i++ { + if !IsCharacterType(e.ColumnType[i]) { + continue + } + + if collation, ok := collations[p]; ok { + ret[i] = collation + } else { + ret[i] = defaultCollation + } + p++ + } + + return ret + } + + if len(e.ColumnCharset) != 0 { + + p := 0 + for i := 0; i < int(e.ColumnCount); i++ { + if !IsCharacterType(e.ColumnType[i]) { + continue + } + + ret[i] = e.ColumnCharset[p] + p++ + } + + return ret + } + + return nil +} + // RowsEventStmtEndFlag is set in the end of the statement. const RowsEventStmtEndFlag = 0x01 diff --git a/replication/type.go b/replication/type.go new file mode 100644 index 000000000..d3a5b790b --- /dev/null +++ b/replication/type.go @@ -0,0 +1,40 @@ +package replication + +import ( + . "github.com/siddontang/go-mysql/mysql" +) + +// IsNumericType returns true if the given type is numeric type. From: sql/log_event.cc and sql/field.h +func IsNumericType(typ byte) bool { + switch typ { + case MYSQL_TYPE_TINY, + MYSQL_TYPE_SHORT, + MYSQL_TYPE_INT24, + MYSQL_TYPE_LONG, + MYSQL_TYPE_LONGLONG, + MYSQL_TYPE_FLOAT, + MYSQL_TYPE_DOUBLE, + MYSQL_TYPE_DECIMAL, + MYSQL_TYPE_NEWDECIMAL: + return true + + default: + return false + } + +} + +// IsCharacterType returns true if the given type is character type. From: sql/log_event.cc +func IsCharacterType(typ byte) bool { + switch typ { + case MYSQL_TYPE_STRING, + MYSQL_TYPE_VAR_STRING, + MYSQL_TYPE_VARCHAR, + MYSQL_TYPE_BLOB: + return true + + default: + return false + } + +} From fcf8f975c057c3a5782f64c36557faf6ddc0589f Mon Sep 17 00:00:00 2001 From: jayven Date: Sat, 1 Feb 2020 19:45:55 +0800 Subject: [PATCH 11/21] Minor fix: do not allocate collation map if not neccessary --- replication/row_event.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index 2ea193593..4d61de112 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -401,8 +401,6 @@ func (e *TableMapEvent) UnsignedMap() map[int]bool { // nil is returned if not available or no character columns at all. func (e *TableMapEvent) CollationMap() map[int]uint64 { - ret := make(map[int]uint64) - if len(e.DefaultCharset) != 0 { defaultCollation := e.DefaultCharset[0] @@ -413,6 +411,7 @@ func (e *TableMapEvent) CollationMap() map[int]uint64 { } p := 0 + ret := make(map[int]uint64) for i := 0; i < int(e.ColumnCount); i++ { if !IsCharacterType(e.ColumnType[i]) { continue @@ -432,6 +431,7 @@ func (e *TableMapEvent) CollationMap() map[int]uint64 { if len(e.ColumnCharset) != 0 { p := 0 + ret := make(map[int]uint64) for i := 0; i < int(e.ColumnCount); i++ { if !IsCharacterType(e.ColumnType[i]) { continue From efddbd15a01250df00305de1e436f956213c32dd Mon Sep 17 00:00:00 2001 From: jayven Date: Sun, 8 Mar 2020 15:40:51 +0800 Subject: [PATCH 12/21] Remove BytesToUint64 since FixedLengthInt already serve the same purpose --- mysql/util.go | 31 ------------------------------- replication/event.go | 4 ++-- 2 files changed, 2 insertions(+), 33 deletions(-) diff --git a/mysql/util.go b/mysql/util.go index b4f6df05a..5ab653227 100644 --- a/mysql/util.go +++ b/mysql/util.go @@ -259,37 +259,6 @@ func Uint64ToBytes(n uint64) []byte { } } -func BytesToUint64(b []byte) (u uint64) { - switch len(b) { - case 8: - u |= (uint64(b[7]) << 56) - fallthrough - case 7: - u |= (uint64(b[6]) << 48) - fallthrough - case 6: - u |= (uint64(b[5]) << 40) - fallthrough - case 5: - u |= (uint64(b[4]) << 32) - fallthrough - case 4: - u |= (uint64(b[3]) << 24) - fallthrough - case 3: - u |= (uint64(b[2]) << 16) - fallthrough - case 2: - u |= (uint64(b[1]) << 8) - fallthrough - case 1: - u |= uint64(b[0]) - default: - panic(fmt.Errorf("BytesToUint64 byte slice length must be in range [1, 8]")) - } - return -} - func FormatBinaryDate(n int, data []byte) ([]byte, error) { switch n { case 0: diff --git a/replication/event.go b/replication/event.go index 7bc8dd7b6..e1b7da37a 100644 --- a/replication/event.go +++ b/replication/event.go @@ -335,12 +335,12 @@ func (e *GTIDEvent) Decode(data []byte) error { if len(data)-pos < 7 { return nil } - e.ImmediateCommitTimestamp = BytesToUint64(data[pos : pos+7]) + e.ImmediateCommitTimestamp = FixedLengthInt(data[pos : pos+7]) pos += 7 if (e.ImmediateCommitTimestamp & (uint64(1) << 55)) != 0 { // If the most significant bit set, another 7 byte follows representing OriginalCommitTimestamp e.ImmediateCommitTimestamp &= ^(uint64(1) << 55) - e.OriginalCommitTimestamp = BytesToUint64(data[pos : pos+7]) + e.OriginalCommitTimestamp = FixedLengthInt(data[pos : pos+7]) pos += 7 } else { From 52c09af3dd949f0388ca5df19e0b259a36fd1d80 Mon Sep 17 00:00:00 2001 From: jayven Date: Sun, 8 Mar 2020 15:52:01 +0800 Subject: [PATCH 13/21] Add note for field type --- notes/field_type.md | 87 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 notes/field_type.md diff --git a/notes/field_type.md b/notes/field_type.md new file mode 100644 index 000000000..6ceb1b42f --- /dev/null +++ b/notes/field_type.md @@ -0,0 +1,87 @@ + +There are several `types` for each field (column) type, such as `type()`/`real_type()/binlog_type()` ... + +By default, `real_type()` and `binlog_type()` are the same as `type()`: + +``` + // From mysql-8.0/sql/field.h Field + + // ... + virtual enum_field_types real_type() const { return type(); } + virtual enum_field_types binlog_type() const { + /* + Binlog stores field->type() as type code by default. + This puts MYSQL_TYPE_STRING in case of CHAR, VARCHAR, SET and ENUM, + with extra data type details put into metadata. + + We cannot store field->type() in case of temporal types with + fractional seconds: TIME(n), DATETIME(n) and TIMESTAMP(n), + because binlog records with MYSQL_TYPE_TIME, MYSQL_TYPE_DATETIME + type codes do not have metadata. + So for temporal data types with fractional seconds we'll store + real_type() type codes instead, i.e. + MYSQL_TYPE_TIME2, MYSQL_TYPE_DATETIME2, MYSQL_TYPE_TIMESTAMP2, + and put precision into metatada. + + Note: perhaps binlog should eventually be modified to store + real_type() instead of type() for all column types. + */ + return type(); + } + // ... +``` + +Here is a list collected from `mysql-8.0/sql/field.h`: + +``` ++------------------------------------------------------------+-------------------------------+-----------------------+------------------------+ +| Field | type() | real_type() | binlog_type() | ++------------------------------------------------------------+-------------------------------+-----------------------+------------------------+ +| | | | | +| Field (abstract) | | | | +| | | | | | +| +--Field_bit | MYSQL_TYPE_BIT | | | +| | +--Field_bit_as_char | | | | +| | | | | | +| +--Field_num (abstract) | | | | +| | | +--Field_real (abstract) | | | | +| | | +--Field_decimal | MYSQL_TYPE_DECIMAL | | | +| | | +--Field_float | MYSQL_TYPE_FLOAT | | | +| | | +--Field_double | MYSQL_TYPE_DOUBLE | | | +| | | | | | | +| | +--Field_new_decimal | MYSQL_TYPE_NEWDECIMAL | | | +| | +--Field_short | MYSQL_TYPE_SHORT | | | +| | +--Field_medium | MYSQL_TYPE_INT24 | | | +| | +--Field_long | MYSQL_TYPE_LONG | | | +| | +--Field_longlong | MYSQL_TYPE_LONGLONG | | | +| | +--Field_tiny | MYSQL_TYPE_TINY | | | +| | +--Field_year | MYSQL_TYPE_YEAR | | | +| | | | | | +| +--Field_str (abstract) | | | | +| | +--Field_longstr | | | | +| | | +--Field_string | MYSQL_TYPE_STRING | MYSQL_TYPE_STRING | | +| | | +--Field_varstring | MYSQL_TYPE_VARCHAR | MYSQL_TYPE_VARCHAR | | +| | | +--Field_blob | MYSQL_TYPE_BLOB | | | +| | | +--Field_geom | MYSQL_TYPE_GEOMETRY | | | +| | | +--Field_json | MYSQL_TYPE_JSON | | | +| | | +--Field_typed_array | real_type_to_type(m_elt_type) | m_elt_type | MYSQL_TYPE_TYPED_ARRAY | +| | | | | | | +| | +--Field_null | MYSQL_TYPE_NULL | | | +| | +--Field_enum | MYSQL_TYPE_STRING | MYSQL_TYPE_ENUM | | +| | +--Field_set | | MYSQL_TYPE_SET | | +| | | | | | +| +--Field_temporal (abstract) | | | | +| +--Field_time_common (abstract) | | | | +| | +--Field_time | MYSQL_TYPE_TIME | | | +| | +--Field_timef | MYSQL_TYPE_TIME | MYSQL_TYPE_TIME2 | MYSQL_TYPE_TIME2 | +| | | | | | +| +--Field_temporal_with_date (abstract) | | | | +| +--Field_newdate | MYSQL_TYPE_DATE | MYSQL_TYPE_NEWDATE | | +| +--Field_temporal_with_date_and_time (abstract) | | | | +| +--Field_timestamp | MYSQL_TYPE_TIMESTAMP | | | +| +--Field_datetime | MYSQL_TYPE_DATETIME | | | +| +--Field_temporal_with_date_and_timef (abstract) | | | | +| +--Field_timestampf | MYSQL_TYPE_TIMESTAMP | MYSQL_TYPE_TIMESTAMP2 | MYSQL_TYPE_TIMESTAMP2 | +| +--Field_datetimef | MYSQL_TYPE_DATETIME | MYSQL_TYPE_DATETIME2 | MYSQL_TYPE_DATETIME2 | ++------------------------------------------------------------+-------------------------------+-----------------------+------------------------+ +``` From 41ecfa61734cc0357be7449ff0ff634279242ac9 Mon Sep 17 00:00:00 2001 From: jayven Date: Sun, 8 Mar 2020 16:56:14 +0800 Subject: [PATCH 14/21] Fix CollationMap error when the table contains enum or set fields --- replication/row_event.go | 56 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 52 insertions(+), 4 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index 4d61de112..3e0891e3e 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -330,7 +330,7 @@ func (e *TableMapEvent) Dump(w io.Writer) { fmt.Fprintf(w, nameFmt, e.ColumnName[i]) } - fmt.Fprintf(w, " type=%-3d", e.ColumnType[i]) + fmt.Fprintf(w, " type=%-3d", e.realType(i)) if IsNumericType(e.ColumnType[i]) { if len(unsignedMap) == 0 { @@ -340,7 +340,7 @@ func (e *TableMapEvent) Dump(w io.Writer) { } else { fmt.Fprintf(w, " unsigned=no ") } - } else if IsCharacterType(e.ColumnType[i]) { + } else if e.isCharacterField(i) { if len(collationMap) == 0 { fmt.Fprintf(w, " collation=") } else { @@ -413,7 +413,7 @@ func (e *TableMapEvent) CollationMap() map[int]uint64 { p := 0 ret := make(map[int]uint64) for i := 0; i < int(e.ColumnCount); i++ { - if !IsCharacterType(e.ColumnType[i]) { + if !e.isCharacterField(i) { continue } @@ -433,7 +433,7 @@ func (e *TableMapEvent) CollationMap() map[int]uint64 { p := 0 ret := make(map[int]uint64) for i := 0; i < int(e.ColumnCount); i++ { - if !IsCharacterType(e.ColumnType[i]) { + if !e.isCharacterField(i) { continue } @@ -447,6 +447,54 @@ func (e *TableMapEvent) CollationMap() map[int]uint64 { return nil } +// Get the `real_type` of column i. Note that types stored in ColumnType are the `binlog_type`. +// See: mysql-8.0/sql/rpl_utility.h table_def::type +// Also see: notes/field_type.md +func (e *TableMapEvent) realType(i int) byte { + + typ := e.ColumnType[i] + meta := e.ColumnMeta[i] + + switch typ { + case MYSQL_TYPE_STRING: + realTyp := byte(meta >> 8) + if realTyp == MYSQL_TYPE_ENUM || realTyp == MYSQL_TYPE_SET { + return realTyp + } + + case MYSQL_TYPE_DATE: + return MYSQL_TYPE_NEWDATE + + } + + return typ + +} + +// Returns true if the i-th column is numeric field. +// See: mysql-8.0/sql/log_event.cc is_numeric_field +func (e *TableMapEvent) isNumericField(i int) bool { + return IsNumericType(e.ColumnType[i]) +} + +// Returns true if the i-th column is character field. +// See: mysql-8.0/sql/log_event.cc is_character_field +func (e *TableMapEvent) isCharacterField(i int) bool { + return IsCharacterType(e.realType(i)) +} + +// Returns true if the i-th column is enum field. +// See: mysql-8.0/sql/log_event.cc is_enum_field +func (e *TableMapEvent) isEnumField(i int) bool { + return e.realType(i) == MYSQL_TYPE_ENUM +} + +// Returns true if the i-th column is set field. +// See: mysql-8.0/sql/log_event.cc is_set_field +func (e *TableMapEvent) isSetField(i int) bool { + return e.realType(i) == MYSQL_TYPE_SET +} + // RowsEventStmtEndFlag is set in the end of the statement. const RowsEventStmtEndFlag = 0x01 From 3e4d0b3bc90cb37d1919ebbb81413a277762b7c7 Mon Sep 17 00:00:00 2001 From: jayven Date: Sun, 8 Mar 2020 17:43:47 +0800 Subject: [PATCH 15/21] Refactor decodeOptionalMeta --- replication/row_event.go | 116 ++++++++++++++++++++++++++------------- 1 file changed, 78 insertions(+), 38 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index 3e0891e3e..1e785ff3d 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -40,7 +40,7 @@ type TableMapEvent struct { SignednessBitmap []byte - // DefaultCharset[0] is the default collation; + // DefaultCharset[0] is the default collation of character columns. // For character columns that have different charset, // (character column index, column collation) pairs follows DefaultCharset []uint64 @@ -206,7 +206,7 @@ func (e *TableMapEvent) decodeMeta(data []byte) error { return nil } -func (e *TableMapEvent) decodeOptionalMeta(data []byte) error { +func (e *TableMapEvent) decodeOptionalMeta(data []byte) (err error) { pos := 0 for pos < len(data) { @@ -227,53 +227,30 @@ func (e *TableMapEvent) decodeOptionalMeta(data []byte) error { e.SignednessBitmap = v case TABLE_MAP_OPT_META_DEFAULT_CHARSET: - p := 0 - for p < len(v) { - c, _, n := LengthEncodedInt(v[p:]) - p += n - e.DefaultCharset = append(e.DefaultCharset, c) + e.DefaultCharset, err = e.decodeDefaultCharset(v) + if err != nil { + return err } case TABLE_MAP_OPT_META_COLUMN_CHARSET: - p := 0 - for p < len(v) { - c, _, n := LengthEncodedInt(v[p:]) - p += n - e.ColumnCharset = append(e.ColumnCharset, c) + e.ColumnCharset, err = e.decodeColumnCharset(v) + if err != nil { + return err } case TABLE_MAP_OPT_META_COLUMN_NAME: - p := 0 - e.ColumnName = make([][]byte, 0, e.ColumnCount) - for p < len(v) { - n := int(v[p]) - p++ - e.ColumnName = append(e.ColumnName, v[p:p+n]) - p += n - } - - if len(e.ColumnName) != int(e.ColumnCount) { - return errors.Errorf("Expect %d column names but got %d", e.ColumnCount, len(e.ColumnName)) + if err = e.decodeColumnNames(v); err != nil { + return err } case TABLE_MAP_OPT_META_SIMPLE_PRIMARY_KEY: - p := 0 - for p < len(v) { - i, _, n := LengthEncodedInt(v[p:]) - e.PrimaryKey = append(e.PrimaryKey, i) - e.PrimaryKeyPrefix = append(e.PrimaryKeyPrefix, 0) - p += n + if err = e.decodeSimplePrimaryKey(v); err != nil { + return err } case TABLE_MAP_OPT_META_PRIMARY_KEY_WITH_PREFIX: - p := 0 - for p < len(v) { - i, _, n := LengthEncodedInt(v[p:]) - e.PrimaryKey = append(e.PrimaryKey, i) - p += n - i, _, n = LengthEncodedInt(v[p:]) - e.PrimaryKeyPrefix = append(e.PrimaryKeyPrefix, i) - p += n + if err = e.decodePrimaryKeyWithPrefix(v); err != nil { + return err } default: @@ -285,6 +262,70 @@ func (e *TableMapEvent) decodeOptionalMeta(data []byte) error { return nil } +func (e *TableMapEvent) decodeDefaultCharset(v []byte) (ret []uint64, err error) { + p := 0 + for p < len(v) { + c, _, n := LengthEncodedInt(v[p:]) + p += n + ret = append(ret, c) + } + + if len(ret)%2 != 1 { + return nil, errors.Errorf("Expect odd item in DefaultCharset but got %d", len(ret)) + } + return +} + +func (e *TableMapEvent) decodeColumnCharset(v []byte) (ret []uint64, err error) { + p := 0 + for p < len(v) { + c, _, n := LengthEncodedInt(v[p:]) + p += n + ret = append(ret, c) + } + return +} + +func (e *TableMapEvent) decodeColumnNames(v []byte) error { + p := 0 + e.ColumnName = make([][]byte, 0, e.ColumnCount) + for p < len(v) { + n := int(v[p]) + p++ + e.ColumnName = append(e.ColumnName, v[p:p+n]) + p += n + } + + if len(e.ColumnName) != int(e.ColumnCount) { + return errors.Errorf("Expect %d column names but got %d", e.ColumnCount, len(e.ColumnName)) + } + return nil +} + +func (e *TableMapEvent) decodeSimplePrimaryKey(v []byte) error { + p := 0 + for p < len(v) { + i, _, n := LengthEncodedInt(v[p:]) + e.PrimaryKey = append(e.PrimaryKey, i) + e.PrimaryKeyPrefix = append(e.PrimaryKeyPrefix, 0) + p += n + } + return nil +} + +func (e *TableMapEvent) decodePrimaryKeyWithPrefix(v []byte) error { + p := 0 + for p < len(v) { + i, _, n := LengthEncodedInt(v[p:]) + e.PrimaryKey = append(e.PrimaryKey, i) + p += n + i, _, n = LengthEncodedInt(v[p:]) + e.PrimaryKeyPrefix = append(e.PrimaryKeyPrefix, i) + p += n + } + return nil +} + func (e *TableMapEvent) Dump(w io.Writer) { fmt.Fprintf(w, "TableID: %d\n", e.TableID) fmt.Fprintf(w, "TableID size: %d\n", e.tableIDSize) @@ -464,7 +505,6 @@ func (e *TableMapEvent) realType(i int) byte { case MYSQL_TYPE_DATE: return MYSQL_TYPE_NEWDATE - } return typ From 6881af6ced8a136060bf32099be713b3bac7072f Mon Sep 17 00:00:00 2001 From: jayven Date: Mon, 9 Mar 2020 08:07:24 +0800 Subject: [PATCH 16/21] Add support for other table map optional meta as well --- replication/row_event.go | 285 +++++++++++++++++++++++++++++++++------ 1 file changed, 242 insertions(+), 43 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index 1e785ff3d..b04a8f24b 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -35,11 +35,18 @@ type TableMapEvent struct { //len = (ColumnCount + 7) / 8 NullBitmap []byte - // The followings are available only after MySQL-8.0.1, see: `--binlog_row_metadata` and - // https://mysqlhighavailability.com/more-metadata-is-written-into-binary-log/ + /* + The followings are available only after MySQL-8.0.1, see: `--binlog_row_metadata` and + https://mysqlhighavailability.com/more-metadata-is-written-into-binary-log/ + */ + // SignednessBitmap stores signedness info for numeric columns. + // Usually you should use UnsignedMap() instead. SignednessBitmap []byte + // DefaultCharset/ColumnCharset stores collation info for character columns. + // Usually you should use CollationMap() instead. + // DefaultCharset[0] is the default collation of character columns. // For character columns that have different charset, // (character column index, column collation) pairs follows @@ -47,9 +54,32 @@ type TableMapEvent struct { // ColumnCharset contains collation sequence for all character columns ColumnCharset []uint64 - ColumnName [][]byte - PrimaryKey []uint64 // A sequence of column indexes - PrimaryKeyPrefix []uint64 // Prefix length 0 means that the whole column value is used + // SetStrValue stores values for set columns. + // Usually you should use SetStrValueMap() instead. + SetStrValue [][][]byte + + // EnumStrValue stores values for enum columns. + // Usually you should use EnumStrValueMap() instead. + EnumStrValue [][][]byte + + // ColumnName list all column names. + ColumnName [][]byte + + // GeometryType stores real type for geometry columns. + // Usually you should use GeometryTypeMap() instead. + GeometryType []uint64 + + // PrimaryKey is a sequence of column indexes of primary key. + PrimaryKey []uint64 + + // PrimaryKeyPrefix is the prefix length used for each column of primary key. + // 0 means that the whole column length is used. + PrimaryKeyPrefix []uint64 + + // EnumSetDefaultCharset/EnumSetColumnCharset is similar to DefaultCharset/ColumnCharset but for enum/set columns. + // Usually you should use EnumSetCollationMap() instead. + EnumSetDefaultCharset []uint64 + EnumSetColumnCharset []uint64 } func (e *TableMapEvent) Decode(data []byte) error { @@ -233,7 +263,7 @@ func (e *TableMapEvent) decodeOptionalMeta(data []byte) (err error) { } case TABLE_MAP_OPT_META_COLUMN_CHARSET: - e.ColumnCharset, err = e.decodeColumnCharset(v) + e.ColumnCharset, err = e.decodeIntSeq(v) if err != nil { return err } @@ -243,6 +273,24 @@ func (e *TableMapEvent) decodeOptionalMeta(data []byte) (err error) { return err } + case TABLE_MAP_OPT_META_SET_STR_VALUE: + e.SetStrValue, err = e.decodeStrValue(v) + if err != nil { + return err + } + + case TABLE_MAP_OPT_META_ENUM_STR_VALUE: + e.EnumStrValue, err = e.decodeStrValue(v) + if err != nil { + return err + } + + case TABLE_MAP_OPT_META_GEOMETRY_TYPE: + e.GeometryType, err = e.decodeIntSeq(v) + if err != nil { + return err + } + case TABLE_MAP_OPT_META_SIMPLE_PRIMARY_KEY: if err = e.decodeSimplePrimaryKey(v); err != nil { return err @@ -253,35 +301,43 @@ func (e *TableMapEvent) decodeOptionalMeta(data []byte) (err error) { return err } + case TABLE_MAP_OPT_META_ENUM_AND_SET_DEFAULT_CHARSET: + e.EnumSetDefaultCharset, err = e.decodeDefaultCharset(v) + if err != nil { + return err + } + + case TABLE_MAP_OPT_META_ENUM_AND_SET_COLUMN_CHARSET: + e.EnumSetColumnCharset, err = e.decodeIntSeq(v) + if err != nil { + return err + } + default: - // TODO: other meta + // Ignore for future extension } - } return nil } -func (e *TableMapEvent) decodeDefaultCharset(v []byte) (ret []uint64, err error) { +func (e *TableMapEvent) decodeIntSeq(v []byte) (ret []uint64, err error) { p := 0 for p < len(v) { - c, _, n := LengthEncodedInt(v[p:]) + i, _, n := LengthEncodedInt(v[p:]) p += n - ret = append(ret, c) - } - - if len(ret)%2 != 1 { - return nil, errors.Errorf("Expect odd item in DefaultCharset but got %d", len(ret)) + ret = append(ret, i) } return } -func (e *TableMapEvent) decodeColumnCharset(v []byte) (ret []uint64, err error) { - p := 0 - for p < len(v) { - c, _, n := LengthEncodedInt(v[p:]) - p += n - ret = append(ret, c) +func (e *TableMapEvent) decodeDefaultCharset(v []byte) (ret []uint64, err error) { + ret, err = e.decodeIntSeq(v) + if err != nil { + return + } + if len(ret)%2 != 1 { + return nil, errors.Errorf("Expect odd item in DefaultCharset but got %d", len(ret)) } return } @@ -302,6 +358,25 @@ func (e *TableMapEvent) decodeColumnNames(v []byte) error { return nil } +func (e *TableMapEvent) decodeStrValue(v []byte) (ret [][][]byte, err error) { + p := 0 + for p < len(v) { + nVal, _, n := LengthEncodedInt(v[p:]) + p += n + vals := make([][]byte, 0, int(nVal)) + for i := 0; i < int(nVal); i++ { + val, _, n, err := LengthEncodedString(v[p:]) + if err != nil { + return nil, err + } + p += n + vals = append(vals, val) + } + ret = append(ret, vals) + } + return +} + func (e *TableMapEvent) decodeSimplePrimaryKey(v []byte) error { p := 0 for p < len(v) { @@ -339,14 +414,32 @@ func (e *TableMapEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Signedness bitmap: \n%s", hex.Dump(e.SignednessBitmap)) fmt.Fprintf(w, "Default charset: %v\n", e.DefaultCharset) fmt.Fprintf(w, "Column charset: %v\n", e.ColumnCharset) + fmt.Fprintf(w, "Set str value: %v\n", e.SetStrValue) + fmt.Fprintf(w, "Enum str value: %v\n", e.EnumStrValue) + fmt.Fprintf(w, "Geometry type: %v\n", e.GeometryType) fmt.Fprintf(w, "Primary key: %v\n", e.PrimaryKey) fmt.Fprintf(w, "Primary key prefix: %v\n", e.PrimaryKeyPrefix) + fmt.Fprintf(w, "Enum/set default charset: %v\n", e.EnumSetDefaultCharset) + fmt.Fprintf(w, "Enum/set column charset: %v\n", e.EnumSetColumnCharset) unsignedMap := e.UnsignedMap() fmt.Fprintf(w, "UnsignedMap: %#v\n", unsignedMap) + collationMap := e.CollationMap() fmt.Fprintf(w, "CollationMap: %#v\n", collationMap) + enumSetCollationMap := e.EnumSetCollationMap() + fmt.Fprintf(w, "EnumSetCollationMap: %#v\n", enumSetCollationMap) + + enumStrValueMap := e.EnumStrValueMap() + fmt.Fprintf(w, "EnumStrValueMap: %#v\n", enumStrValueMap) + + setStrValueMap := e.SetStrValueMap() + fmt.Fprintf(w, "SetStrValueMap: %#v\n", setStrValueMap) + + geometryTypeMap := e.GeometryTypeMap() + fmt.Fprintf(w, "GeometryTypeMap: %#v\n", geometryTypeMap) + nameMaxLen := 0 for _, name := range e.ColumnName { if len(name) > nameMaxLen { @@ -373,7 +466,7 @@ func (e *TableMapEvent) Dump(w io.Writer) { fmt.Fprintf(w, " type=%-3d", e.realType(i)) - if IsNumericType(e.ColumnType[i]) { + if e.IsNumericColumn(i) { if len(unsignedMap) == 0 { fmt.Fprintf(w, " unsigned=") } else if unsignedMap[i] { @@ -381,12 +474,42 @@ func (e *TableMapEvent) Dump(w io.Writer) { } else { fmt.Fprintf(w, " unsigned=no ") } - } else if e.isCharacterField(i) { + } else if e.IsCharacterColumn(i) { if len(collationMap) == 0 { fmt.Fprintf(w, " collation=") } else { fmt.Fprintf(w, " collation=%d ", collationMap[i]) } + } else if e.IsEnumColumn(i) { + if len(enumStrValueMap) == 0 { + fmt.Fprintf(w, " enum=") + } else { + fmt.Fprintf(w, " enum=%v", enumStrValueMap[i]) + } + + if len(enumSetCollationMap) == 0 { + fmt.Fprintf(w, " enum_collation=") + } else { + fmt.Fprintf(w, " enum_collation=%d", enumSetCollationMap[i]) + } + } else if e.IsSetColumn(i) { + if len(setStrValueMap) == 0 { + fmt.Fprintf(w, " set=") + } else { + fmt.Fprintf(w, " set=%v", setStrValueMap[i]) + } + + if len(enumSetCollationMap) == 0 { + fmt.Fprintf(w, " set_collation=") + } else { + fmt.Fprintf(w, " set_collation=%d", enumSetCollationMap[i]) + } + } else if e.IsGeometryColumn(i) { + if len(geometryTypeMap) == 0 { + fmt.Fprintf(w, " geometry_type=") + } else { + fmt.Fprintf(w, " geometry_type=%v", geometryTypeMap[i]) + } } available, nullable := e.Nullable(i) @@ -427,7 +550,7 @@ func (e *TableMapEvent) UnsignedMap() map[int]bool { p := 0 ret := make(map[int]bool) for i := 0; i < int(e.ColumnCount); i++ { - if !IsNumericType(e.ColumnType[i]) { + if !e.IsNumericColumn(i) { continue } @@ -441,20 +564,31 @@ func (e *TableMapEvent) UnsignedMap() map[int]bool { // Note that only character columns will be returned. // nil is returned if not available or no character columns at all. func (e *TableMapEvent) CollationMap() map[int]uint64 { + return e.collationMap(e.IsCharacterColumn, e.DefaultCharset, e.ColumnCharset) +} + +// EnumSetCollationMap returns a map: column index -> collation id. +// Note that only enum or set columns will be returned. +// nil is returned if not available or no enum/set columns at all. +func (e *TableMapEvent) EnumSetCollationMap() map[int]uint64 { + return e.collationMap(e.IsEnumOrSetColumn, e.EnumSetDefaultCharset, e.EnumSetColumnCharset) +} - if len(e.DefaultCharset) != 0 { - defaultCollation := e.DefaultCharset[0] +func (e *TableMapEvent) collationMap(includeType func(int) bool, defaultCharset, columnCharset []uint64) map[int]uint64 { + + if len(defaultCharset) != 0 { + defaultCollation := defaultCharset[0] // character column index -> collation collations := make(map[int]uint64) - for i := 1; i < len(e.DefaultCharset); i += 2 { - collations[int(e.DefaultCharset[i])] = e.DefaultCharset[i+1] + for i := 1; i < len(defaultCharset); i += 2 { + collations[int(defaultCharset[i])] = defaultCharset[i+1] } p := 0 ret := make(map[int]uint64) for i := 0; i < int(e.ColumnCount); i++ { - if !e.isCharacterField(i) { + if !includeType(i) { continue } @@ -469,16 +603,16 @@ func (e *TableMapEvent) CollationMap() map[int]uint64 { return ret } - if len(e.ColumnCharset) != 0 { + if len(columnCharset) != 0 { p := 0 ret := make(map[int]uint64) for i := 0; i < int(e.ColumnCount); i++ { - if !e.isCharacterField(i) { + if !includeType(i) { continue } - ret[i] = e.ColumnCharset[p] + ret[i] = columnCharset[p] p++ } @@ -488,6 +622,59 @@ func (e *TableMapEvent) CollationMap() map[int]uint64 { return nil } +// EnumStrValueMap returns a map: column index -> enum string value. +// Note that only enum columns will be returned. +// nil is returned if not available or no enum columns at all. +func (e *TableMapEvent) EnumStrValueMap() map[int][]string { + return e.strValueMap(e.EnumStrValue, e.IsEnumColumn) +} + +// SetStrValueMap returns a map: column index -> set string value. +// Note that only set columns will be returned. +// nil is returned if not available or no set columns at all. +func (e *TableMapEvent) SetStrValueMap() map[int][]string { + return e.strValueMap(e.SetStrValue, e.IsSetColumn) +} + +func (e *TableMapEvent) strValueMap(strValue [][][]byte, includeType func(int) bool) map[int][]string { + if len(strValue) == 0 { + return nil + } + p := 0 + ret := make(map[int][]string) + for i := 0; i < int(e.ColumnCount); i++ { + if !includeType(i) { + continue + } + vals := []string{} + for _, val := range strValue[p] { + vals = append(vals, string(val)) + } + ret[i] = vals + } + return ret +} + +// GeometryTypeMap returns a map: column index -> geometry type. +// Note that only geometry columns will be returned. +// nil is returned if not available or no geometry columns at all. +func (e *TableMapEvent) GeometryTypeMap() map[int]uint64 { + if len(e.GeometryType) == 0 { + return nil + } + p := 0 + ret := make(map[int]uint64) + for i := 0; i < int(e.ColumnCount); i++ { + if !e.IsGeometryColumn(i) { + continue + } + + ret[i] = e.GeometryType[p] + p++ + } + return ret +} + // Get the `real_type` of column i. Note that types stored in ColumnType are the `binlog_type`. // See: mysql-8.0/sql/rpl_utility.h table_def::type // Also see: notes/field_type.md @@ -498,9 +685,9 @@ func (e *TableMapEvent) realType(i int) byte { switch typ { case MYSQL_TYPE_STRING: - realTyp := byte(meta >> 8) - if realTyp == MYSQL_TYPE_ENUM || realTyp == MYSQL_TYPE_SET { - return realTyp + rt := byte(meta >> 8) + if rt == MYSQL_TYPE_ENUM || rt == MYSQL_TYPE_SET { + return rt } case MYSQL_TYPE_DATE: @@ -511,30 +698,42 @@ func (e *TableMapEvent) realType(i int) byte { } -// Returns true if the i-th column is numeric field. +// IsNumericColumn returns true if the i-th column is numeric field. // See: mysql-8.0/sql/log_event.cc is_numeric_field -func (e *TableMapEvent) isNumericField(i int) bool { +func (e *TableMapEvent) IsNumericColumn(i int) bool { return IsNumericType(e.ColumnType[i]) } -// Returns true if the i-th column is character field. +// IsCharacterColumn returns true if the i-th column is character field. // See: mysql-8.0/sql/log_event.cc is_character_field -func (e *TableMapEvent) isCharacterField(i int) bool { +func (e *TableMapEvent) IsCharacterColumn(i int) bool { return IsCharacterType(e.realType(i)) } -// Returns true if the i-th column is enum field. +// IsEnumColumn returns true if the i-th column is enum field. // See: mysql-8.0/sql/log_event.cc is_enum_field -func (e *TableMapEvent) isEnumField(i int) bool { +func (e *TableMapEvent) IsEnumColumn(i int) bool { return e.realType(i) == MYSQL_TYPE_ENUM } -// Returns true if the i-th column is set field. +// IsSetColumn returns true if the i-th column is set field. // See: mysql-8.0/sql/log_event.cc is_set_field -func (e *TableMapEvent) isSetField(i int) bool { +func (e *TableMapEvent) IsSetColumn(i int) bool { return e.realType(i) == MYSQL_TYPE_SET } +// IsEnumOrSetColumn returns true if the i-th column is enum or set field. +func (e *TableMapEvent) IsEnumOrSetColumn(i int) bool { + rt := e.realType(i) + return rt == MYSQL_TYPE_ENUM || rt == MYSQL_TYPE_SET +} + +// IsGeometryColumn returns true if the i-th column is geometry field. +// See: mysql-8.0/sql/log_event.cc is_geometry_field +func (e *TableMapEvent) IsGeometryColumn(i int) bool { + return e.realType(i) == MYSQL_TYPE_GEOMETRY +} + // RowsEventStmtEndFlag is set in the end of the statement. const RowsEventStmtEndFlag = 0x01 From 2a96a8c0c821962f58c21e8f8b77f5087502ecc0 Mon Sep 17 00:00:00 2001 From: jayven Date: Tue, 10 Mar 2020 08:02:33 +0800 Subject: [PATCH 17/21] Remove helper functions in TableMapEvent now. Need more investigation on types to implement them correctly. --- replication/row_event.go | 346 ++++++--------------------------------- 1 file changed, 52 insertions(+), 294 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index b04a8f24b..ceddcc5a2 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -36,16 +36,17 @@ type TableMapEvent struct { NullBitmap []byte /* - The followings are available only after MySQL-8.0.1, see: `--binlog_row_metadata` and - https://mysqlhighavailability.com/more-metadata-is-written-into-binary-log/ + The followings are available only after MySQL-8.0.1 or MariaDB-10.5.0 + see: + - https://dev.mysql.com/doc/refman/8.0/en/replication-options-binary-log.html#sysvar_binlog_row_metadata + - https://mysqlhighavailability.com/more-metadata-is-written-into-binary-log/ + - https://jira.mariadb.org/browse/MDEV-20477 */ // SignednessBitmap stores signedness info for numeric columns. - // Usually you should use UnsignedMap() instead. SignednessBitmap []byte // DefaultCharset/ColumnCharset stores collation info for character columns. - // Usually you should use CollationMap() instead. // DefaultCharset[0] is the default collation of character columns. // For character columns that have different charset, @@ -55,18 +56,18 @@ type TableMapEvent struct { ColumnCharset []uint64 // SetStrValue stores values for set columns. - // Usually you should use SetStrValueMap() instead. - SetStrValue [][][]byte + SetStrValue [][][]byte + setStrValueString [][]string // EnumStrValue stores values for enum columns. - // Usually you should use EnumStrValueMap() instead. - EnumStrValue [][][]byte + EnumStrValue [][][]byte + enumStrValueString [][]string // ColumnName list all column names. - ColumnName [][]byte + ColumnName [][]byte + columnNameString []string // GeometryType stores real type for geometry columns. - // Usually you should use GeometryTypeMap() instead. GeometryType []uint64 // PrimaryKey is a sequence of column indexes of primary key. @@ -77,7 +78,6 @@ type TableMapEvent struct { PrimaryKeyPrefix []uint64 // EnumSetDefaultCharset/EnumSetColumnCharset is similar to DefaultCharset/ColumnCharset but for enum/set columns. - // Usually you should use EnumSetCollationMap() instead. EnumSetDefaultCharset []uint64 EnumSetColumnCharset []uint64 } @@ -414,119 +414,15 @@ func (e *TableMapEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Signedness bitmap: \n%s", hex.Dump(e.SignednessBitmap)) fmt.Fprintf(w, "Default charset: %v\n", e.DefaultCharset) fmt.Fprintf(w, "Column charset: %v\n", e.ColumnCharset) - fmt.Fprintf(w, "Set str value: %v\n", e.SetStrValue) - fmt.Fprintf(w, "Enum str value: %v\n", e.EnumStrValue) + fmt.Fprintf(w, "Set str value: %v\n", e.SetStrValueString()) + fmt.Fprintf(w, "Enum str value: %v\n", e.EnumStrValueString()) + fmt.Fprintf(w, "Column name: %v\n", e.ColumnNameString()) fmt.Fprintf(w, "Geometry type: %v\n", e.GeometryType) fmt.Fprintf(w, "Primary key: %v\n", e.PrimaryKey) fmt.Fprintf(w, "Primary key prefix: %v\n", e.PrimaryKeyPrefix) fmt.Fprintf(w, "Enum/set default charset: %v\n", e.EnumSetDefaultCharset) fmt.Fprintf(w, "Enum/set column charset: %v\n", e.EnumSetColumnCharset) - unsignedMap := e.UnsignedMap() - fmt.Fprintf(w, "UnsignedMap: %#v\n", unsignedMap) - - collationMap := e.CollationMap() - fmt.Fprintf(w, "CollationMap: %#v\n", collationMap) - - enumSetCollationMap := e.EnumSetCollationMap() - fmt.Fprintf(w, "EnumSetCollationMap: %#v\n", enumSetCollationMap) - - enumStrValueMap := e.EnumStrValueMap() - fmt.Fprintf(w, "EnumStrValueMap: %#v\n", enumStrValueMap) - - setStrValueMap := e.SetStrValueMap() - fmt.Fprintf(w, "SetStrValueMap: %#v\n", setStrValueMap) - - geometryTypeMap := e.GeometryTypeMap() - fmt.Fprintf(w, "GeometryTypeMap: %#v\n", geometryTypeMap) - - nameMaxLen := 0 - for _, name := range e.ColumnName { - if len(name) > nameMaxLen { - nameMaxLen = len(name) - } - } - nameFmt := " %s" - if nameMaxLen > 0 { - nameFmt = fmt.Sprintf(" %%-%ds", nameMaxLen) - } - - primaryKey := map[int]struct{}{} - for _, pk := range e.PrimaryKey { - primaryKey[int(pk)] = struct{}{} - } - - fmt.Fprintf(w, "Columns: \n") - for i := 0; i < int(e.ColumnCount); i++ { - if len(e.ColumnName) == 0 { - fmt.Fprintf(w, nameFmt, "") - } else { - fmt.Fprintf(w, nameFmt, e.ColumnName[i]) - } - - fmt.Fprintf(w, " type=%-3d", e.realType(i)) - - if e.IsNumericColumn(i) { - if len(unsignedMap) == 0 { - fmt.Fprintf(w, " unsigned=") - } else if unsignedMap[i] { - fmt.Fprintf(w, " unsigned=yes") - } else { - fmt.Fprintf(w, " unsigned=no ") - } - } else if e.IsCharacterColumn(i) { - if len(collationMap) == 0 { - fmt.Fprintf(w, " collation=") - } else { - fmt.Fprintf(w, " collation=%d ", collationMap[i]) - } - } else if e.IsEnumColumn(i) { - if len(enumStrValueMap) == 0 { - fmt.Fprintf(w, " enum=") - } else { - fmt.Fprintf(w, " enum=%v", enumStrValueMap[i]) - } - - if len(enumSetCollationMap) == 0 { - fmt.Fprintf(w, " enum_collation=") - } else { - fmt.Fprintf(w, " enum_collation=%d", enumSetCollationMap[i]) - } - } else if e.IsSetColumn(i) { - if len(setStrValueMap) == 0 { - fmt.Fprintf(w, " set=") - } else { - fmt.Fprintf(w, " set=%v", setStrValueMap[i]) - } - - if len(enumSetCollationMap) == 0 { - fmt.Fprintf(w, " set_collation=") - } else { - fmt.Fprintf(w, " set_collation=%d", enumSetCollationMap[i]) - } - } else if e.IsGeometryColumn(i) { - if len(geometryTypeMap) == 0 { - fmt.Fprintf(w, " geometry_type=") - } else { - fmt.Fprintf(w, " geometry_type=%v", geometryTypeMap[i]) - } - } - - available, nullable := e.Nullable(i) - if !available { - fmt.Fprintf(w, " null=") - } else if nullable { - fmt.Fprintf(w, " null=yes") - } else { - fmt.Fprintf(w, " null=no ") - } - - if _, ok := primaryKey[i]; ok { - fmt.Fprintf(w, " pri") - } - - fmt.Fprintf(w, "\n") - } fmt.Fprintln(w) } @@ -540,200 +436,62 @@ func (e *TableMapEvent) Nullable(i int) (available, nullable bool) { return true, e.NullBitmap[i/8]&(1< unsigned. -// Note that only numeric columns will be returned. -// nil is returned if not available or no numeric columns at all. -func (e *TableMapEvent) UnsignedMap() map[int]bool { - if len(e.SignednessBitmap) == 0 { - return nil - } - p := 0 - ret := make(map[int]bool) - for i := 0; i < int(e.ColumnCount); i++ { - if !e.IsNumericColumn(i) { - continue +// SetStrValueString returns values for set columns as string slices. +// nil is returned if not available or no set columns at all. +func (e *TableMapEvent) SetStrValueString() [][]string { + if e.setStrValueString == nil { + if len(e.SetStrValue) == 0 { + return nil + } + e.setStrValueString = make([][]string, 0, len(e.SetStrValue)) + for _, vals := range e.SetStrValue { + e.setStrValueString = append( + e.setStrValueString, + e.bytesSlice2StrSlice(vals), + ) } - - ret[i] = e.SignednessBitmap[p/8]&(1< collation id. -// Note that only character columns will be returned. -// nil is returned if not available or no character columns at all. -func (e *TableMapEvent) CollationMap() map[int]uint64 { - return e.collationMap(e.IsCharacterColumn, e.DefaultCharset, e.ColumnCharset) -} - -// EnumSetCollationMap returns a map: column index -> collation id. -// Note that only enum or set columns will be returned. -// nil is returned if not available or no enum/set columns at all. -func (e *TableMapEvent) EnumSetCollationMap() map[int]uint64 { - return e.collationMap(e.IsEnumOrSetColumn, e.EnumSetDefaultCharset, e.EnumSetColumnCharset) -} - -func (e *TableMapEvent) collationMap(includeType func(int) bool, defaultCharset, columnCharset []uint64) map[int]uint64 { - - if len(defaultCharset) != 0 { - defaultCollation := defaultCharset[0] - - // character column index -> collation - collations := make(map[int]uint64) - for i := 1; i < len(defaultCharset); i += 2 { - collations[int(defaultCharset[i])] = defaultCharset[i+1] - } - - p := 0 - ret := make(map[int]uint64) - for i := 0; i < int(e.ColumnCount); i++ { - if !includeType(i) { - continue - } - - if collation, ok := collations[p]; ok { - ret[i] = collation - } else { - ret[i] = defaultCollation - } - p++ +// EnumStrValueString returns values for enum columns as string slices. +// nil is returned if not available or no enum columns at all. +func (e *TableMapEvent) EnumStrValueString() [][]string { + if e.enumStrValueString == nil { + if len(e.EnumStrValue) == 0 { + return nil } - - return ret - } - - if len(columnCharset) != 0 { - - p := 0 - ret := make(map[int]uint64) - for i := 0; i < int(e.ColumnCount); i++ { - if !includeType(i) { - continue - } - - ret[i] = columnCharset[p] - p++ + e.enumStrValueString = make([][]string, 0, len(e.EnumStrValue)) + for _, vals := range e.EnumStrValue { + e.enumStrValueString = append( + e.enumStrValueString, + e.bytesSlice2StrSlice(vals), + ) } - - return ret } - - return nil + return e.enumStrValueString } -// EnumStrValueMap returns a map: column index -> enum string value. -// Note that only enum columns will be returned. -// nil is returned if not available or no enum columns at all. -func (e *TableMapEvent) EnumStrValueMap() map[int][]string { - return e.strValueMap(e.EnumStrValue, e.IsEnumColumn) -} - -// SetStrValueMap returns a map: column index -> set string value. -// Note that only set columns will be returned. -// nil is returned if not available or no set columns at all. -func (e *TableMapEvent) SetStrValueMap() map[int][]string { - return e.strValueMap(e.SetStrValue, e.IsSetColumn) -} - -func (e *TableMapEvent) strValueMap(strValue [][][]byte, includeType func(int) bool) map[int][]string { - if len(strValue) == 0 { - return nil +// ColumnNameString returns column names as string slice. +// nil is returned if not available. +func (e *TableMapEvent) ColumnNameString() []string { + if e.columnNameString == nil { + e.columnNameString = e.bytesSlice2StrSlice(e.ColumnName) } - p := 0 - ret := make(map[int][]string) - for i := 0; i < int(e.ColumnCount); i++ { - if !includeType(i) { - continue - } - vals := []string{} - for _, val := range strValue[p] { - vals = append(vals, string(val)) - } - ret[i] = vals - } - return ret + return e.columnNameString } -// GeometryTypeMap returns a map: column index -> geometry type. -// Note that only geometry columns will be returned. -// nil is returned if not available or no geometry columns at all. -func (e *TableMapEvent) GeometryTypeMap() map[int]uint64 { - if len(e.GeometryType) == 0 { +func (e *TableMapEvent) bytesSlice2StrSlice(src [][]byte) []string { + if src == nil { return nil } - p := 0 - ret := make(map[int]uint64) - for i := 0; i < int(e.ColumnCount); i++ { - if !e.IsGeometryColumn(i) { - continue - } - - ret[i] = e.GeometryType[p] - p++ + ret := make([]string, 0, len(src)) + for _, item := range src { + ret = append(ret, string(item)) } return ret } -// Get the `real_type` of column i. Note that types stored in ColumnType are the `binlog_type`. -// See: mysql-8.0/sql/rpl_utility.h table_def::type -// Also see: notes/field_type.md -func (e *TableMapEvent) realType(i int) byte { - - typ := e.ColumnType[i] - meta := e.ColumnMeta[i] - - switch typ { - case MYSQL_TYPE_STRING: - rt := byte(meta >> 8) - if rt == MYSQL_TYPE_ENUM || rt == MYSQL_TYPE_SET { - return rt - } - - case MYSQL_TYPE_DATE: - return MYSQL_TYPE_NEWDATE - } - - return typ - -} - -// IsNumericColumn returns true if the i-th column is numeric field. -// See: mysql-8.0/sql/log_event.cc is_numeric_field -func (e *TableMapEvent) IsNumericColumn(i int) bool { - return IsNumericType(e.ColumnType[i]) -} - -// IsCharacterColumn returns true if the i-th column is character field. -// See: mysql-8.0/sql/log_event.cc is_character_field -func (e *TableMapEvent) IsCharacterColumn(i int) bool { - return IsCharacterType(e.realType(i)) -} - -// IsEnumColumn returns true if the i-th column is enum field. -// See: mysql-8.0/sql/log_event.cc is_enum_field -func (e *TableMapEvent) IsEnumColumn(i int) bool { - return e.realType(i) == MYSQL_TYPE_ENUM -} - -// IsSetColumn returns true if the i-th column is set field. -// See: mysql-8.0/sql/log_event.cc is_set_field -func (e *TableMapEvent) IsSetColumn(i int) bool { - return e.realType(i) == MYSQL_TYPE_SET -} - -// IsEnumOrSetColumn returns true if the i-th column is enum or set field. -func (e *TableMapEvent) IsEnumOrSetColumn(i int) bool { - rt := e.realType(i) - return rt == MYSQL_TYPE_ENUM || rt == MYSQL_TYPE_SET -} - -// IsGeometryColumn returns true if the i-th column is geometry field. -// See: mysql-8.0/sql/log_event.cc is_geometry_field -func (e *TableMapEvent) IsGeometryColumn(i int) bool { - return e.realType(i) == MYSQL_TYPE_GEOMETRY -} - // RowsEventStmtEndFlag is set in the end of the statement. const RowsEventStmtEndFlag = 0x01 From 02e8369096c073bcb3cb6cbf923224ff0cc8f0f2 Mon Sep 17 00:00:00 2001 From: jayven Date: Tue, 10 Mar 2020 08:28:49 +0800 Subject: [PATCH 18/21] Add some test cases for TabeMapEvent optional meta decode --- replication/row_event_test.go | 214 ++++++++++++++++++++++++++++++++++ 1 file changed, 214 insertions(+) diff --git a/replication/row_event_test.go b/replication/row_event_test.go index 8b066bad6..d608915c5 100644 --- a/replication/row_event_test.go +++ b/replication/row_event_test.go @@ -763,3 +763,217 @@ func (_ *testDecodeSuite) TestDecodeDatetime2(c *C) { } } } + +func (_ *testDecodeSuite) TestTableMapOptMetaNames(c *C) { + /* + CREATE TABLE `_types` ( + `b_bit` bit(64) NOT NULL DEFAULT b'0', + + `n_boolean` boolean not null default '0', + `n_tinyint` tinyint not null default '0', + `n_smallint` smallint not null default '0', + `n_mediumint` mediumint not null default '0', + `n_int` int not null default '0', + `n_bigint` bigint not null default '0', + `n_decimal` decimal(65,30) not null default '0', + `n_float` float not null default '0', + `n_double` double not null default '0', + + `nu_tinyint` tinyint unsigned not null default '0', + `nu_smallint` smallint unsigned not null default '0', + `nu_mediumint` mediumint unsigned not null default '0', + `nu_int` int unsigned not null default '0', + `nu_bigint` bigint unsigned not null default '0', + `nu_decimal` decimal(65,30) unsigned not null default '0', + `nu_float` float unsigned not null default '0', + `nu_double` double unsigned not null default '0', + + `t_year` year default null, + `t_date` date default null, + `t_time` time default null, + `t_ftime` time(6) default null, + `t_datetime` datetime default null, + `t_fdatetime` datetime(6) default null, + `t_timestamp` timestamp default current_timestamp, + `t_ftimestamp` timestamp(6) default current_timestamp(6), + + `c_char` char(255) not null default '', + `c_varchar` varchar(255) not null default '', + `c_binary` binary(64) not null default '', + `c_varbinary` varbinary(64) not null default '', + `c_tinyblob` tinyblob, + `c_blob` blob, + `c_mediumblob` mediumblob, + `c_longblob` longblob, + `c_tinytext` tinytext, + `c_text` text, + `c_mediumtext` mediumtext, + `c_longtext` longtext, + + `e_enum` enum('a','b') default 'a', + `s_set` set('1','2') default '1', + `g_geometry` geometry DEFAULT NULL, + `j_json` json DEFAULT NULL + ); + insert into _types values (); + */ + colNames := []string{ + "b_bit", + "n_boolean", + "n_tinyint", + "n_smallint", + "n_mediumint", + "n_int", + "n_bigint", + "n_decimal", + "n_float", + "n_double", + "nu_tinyint", + "nu_smallint", + "nu_mediumint", + "nu_int", + "nu_bigint", + "nu_decimal", + "nu_float", + "nu_double", + "t_year", + "t_date", + "t_time", + "t_ftime", + "t_datetime", + "t_fdatetime", + "t_timestamp", + "t_ftimestamp", + "c_char", + "c_varchar", + "c_binary", + "c_varbinary", + "c_tinyblob", + "c_blob", + "c_mediumblob", + "c_longblob", + "c_tinytext", + "c_text", + "c_mediumtext", + "c_longtext", + "e_enum", + "s_set", + "g_geometry", + "j_json", + } + enumVals := [][]string{{"a", "b"}} + setVals := [][]string{{"1", "2"}} + + testcases := []struct { + data []byte + hasNames bool + }{ + // mysql 5.7 + {data: []byte("u\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x06_types\x00*\x10\x01\x01\x02\t\x03\b\xf6\x04\x05\x01\x02\t\x03\b\xf6\x04\x05\r\n\x13\x13\x12\x12\x11\x11\xfe\x0f\xfe\x0f\xfc\xfc\xfc\xfc\xfc\xfc\xfc\xfc\xfe\xfe\xff\xf5&\x00\bA\x1e\x04\bA\x1e\x04\b\x00\x06\x00\x06\x00\x06\xce\xfc\xfc\x03\xfe@@\x00\x01\x02\x03\x04\x01\x02\x03\x04\xf7\x01\xf8\x01\x04\x04\x00\x00\xfc\xc0\xff\x03")}, + // mysql 8.0 + {data: []byte("j\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x06_types\x00*\x10\x01\x01\x02\t\x03\b\xf6\x04\x05\x01\x02\t\x03\b\xf6\x04\x05\r\n\x13\x13\x12\x12\x11\x11\xfe\x0f\xfe\x0f\xfc\xfc\xfc\xfc\xfc\xfc\xfc\xfc\xfe\xfe\xff\xf5&\x00\bA\x1e\x04\bA\x1e\x04\b\x00\x06\x00\x06\x00\x06\xce\xfc\xfc\x03\xfe@@\x00\x01\x02\x03\x04\x01\x02\x03\x04\xf7\x01\xf8\x01\x04\x04\x00\x00\xfc\xc3\xff\x03\x01\x03\x00\u007f\x80\x03\f\xe0\xe0??????\xe0\xe0\xe0\xe0\a\x01\x00\x04\xfc\x94\x01\x05b_bit\tn_boolean\tn_tinyint\nn_smallint\vn_mediumint\x05n_int\bn_bigint\tn_decimal\an_float\bn_double\nnu_tinyint\vnu_smallint\fnu_mediumint\x06nu_int\tnu_bigint\nnu_decimal\bnu_float\tnu_double\x06t_year\x06t_date\x06t_time\at_ftime\nt_datetime\vt_fdatetime\vt_timestamp\ft_ftimestamp\x06c_char\tc_varchar\bc_binary\vc_varbinary\nc_tinyblob\x06c_blob\fc_mediumblob\nc_longblob\nc_tinytext\x06c_text\fc_mediumtext\nc_longtext\x06e_enum\x05s_set\ng_geometry\x06j_json\n\x01\xe0\x05\x05\x02\x011\x012\x06\x05\x02\x01a\x01b"), hasNames: true}, + // mariadb 10.4 + {data: []byte("\x1b\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x06_types\x00*\x10\x01\x01\x02\t\x03\b\xf6\x04\x05\x01\x02\t\x03\b\xf6\x04\x05\r\n\x13\x13\x12\x12\x11\x11\xfe\x0f\xfe\x0f\xfc\xfc\xfc\xfc\xfc\xfc\xfc\xfc\xfe\xfe\xff\xfc&\x00\bA\x1e\x04\bA\x1e\x04\b\x00\x06\x00\x06\x00\x06\xce\xfc\xfc\x03\xfe@@\x00\x01\x02\x03\x04\x01\x02\x03\x04\xf7\x01\xf8\x01\x04\x04\x00\x00\xfc\xc0\xff\x03")}, + // mariadb 10.5 + {data: []byte("\x1a\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x06_types\x00*\x10\x01\x01\x02\t\x03\b\xf6\x04\x05\x01\x02\t\x03\b\xf6\x04\x05\r\n\x13\x13\x12\x12\x11\x11\xfe\x0f\xfe\x0f\xfc\xfc\xfc\xfc\xfc\xfc\xfc\xfc\xfe\xfe\xff\xfc&\x00\bA\x1e\x04\bA\x1e\x04\b\x00\x06\x00\x06\x00\x06\xce\xfc\xfc\x03\xfe@@\x00\x01\x02\x03\x04\x01\x02\x03\x04\xf7\x01\xf8\x01\x04\x04\x00\x00\xfc\xc0\xff\x03\x01\x03\x00\u007f\xc0\x03\x0e\xe0\xe0??????\xe0\xe0\xe0\xe0?.\a\x01\x00\x04\xfc\x94\x01\x05b_bit\tn_boolean\tn_tinyint\nn_smallint\vn_mediumint\x05n_int\bn_bigint\tn_decimal\an_float\bn_double\nnu_tinyint\vnu_smallint\fnu_mediumint\x06nu_int\tnu_bigint\nnu_decimal\bnu_float\tnu_double\x06t_year\x06t_date\x06t_time\at_ftime\nt_datetime\vt_fdatetime\vt_timestamp\ft_ftimestamp\x06c_char\tc_varchar\bc_binary\vc_varbinary\nc_tinyblob\x06c_blob\fc_mediumblob\nc_longblob\nc_tinytext\x06c_text\fc_mediumtext\nc_longtext\x06e_enum\x05s_set\ng_geometry\x06j_json\n\x01\xe0\x05\x05\x02\x011\x012\x06\x05\x02\x01a\x01b"), hasNames: true}, + } + + for _, tc := range testcases { + + tableMapEvent := new(TableMapEvent) + tableMapEvent.tableIDSize = 6 + err := tableMapEvent.Decode(tc.data) + c.Assert(err, IsNil) + + if tc.hasNames { + c.Assert(tableMapEvent.ColumnNameString(), DeepEquals, colNames) + c.Assert(tableMapEvent.SetStrValueString(), DeepEquals, setVals) + c.Assert(tableMapEvent.EnumStrValueString(), DeepEquals, enumVals) + } else { + c.Assert(tableMapEvent.ColumnNameString(), DeepEquals, []string(nil)) + c.Assert(tableMapEvent.SetStrValueString(), DeepEquals, [][]string(nil)) + c.Assert(tableMapEvent.EnumStrValueString(), DeepEquals, [][]string(nil)) + } + } + +} + +func (_ *testDecodeSuite) TestTableMapOptMetaPrimaryKey(c *C) { + /* + create table _prim (id2 int, col varchar(30), id1 bigint, primary key (id1, id2)); + */ + case1PrimaryKey := []uint64{2, 0} + case1PrimaryKeyPrefix := []uint64{0, 0} + + /* + create table _prim2 (col1 int, id1 char(10), col2 int, id2 varchar(20), primary key (id1, id2(10))); + */ + case2PrimaryKey := []uint64{1, 3} + case2PrimaryKeyPrefix := []uint64{0, 10} + + testcases := []struct { + data []byte + expectedPrimaryKey []uint64 + expectedPrimaryKeyPrefix []uint64 + }{ + { + // mysql 5.7, case1 + data: []byte("w\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x05_prim\x00\x03\x03\x0f\b\x02x\x00\x02"), + expectedPrimaryKey: []uint64(nil), + expectedPrimaryKeyPrefix: []uint64(nil), + }, + { + // mysql 8.0, case1 + data: []byte("l\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x05_prim\x00\x03\x03\x0f\b\x02x\x00\x02\x01\x01\x00\x02\x01\xe0\x04\f\x03id2\x03col\x03id1\b\x02\x02\x00"), + expectedPrimaryKey: case1PrimaryKey, + expectedPrimaryKeyPrefix: case1PrimaryKeyPrefix, + }, + { + // mariadb 10.4, case1 + data: []byte("\x1c\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x05_prim\x00\x03\x03\x0f\b\x02x\x00\x02"), + expectedPrimaryKey: []uint64(nil), + expectedPrimaryKeyPrefix: []uint64(nil), + }, + { + // mariadb 10.5, case1 + data: []byte("\x1b\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x05_prim\x00\x03\x03\x0f\b\x02x\x00\x02\x01\x01\x00\x02\x01\xe0\x04\f\x03id2\x03col\x03id1\b\x02\x02\x00"), + expectedPrimaryKey: case1PrimaryKey, + expectedPrimaryKeyPrefix: case1PrimaryKeyPrefix, + }, + { + // mysql 5.7, case2 + data: []byte("y\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x06_prim2\x00\x04\x03\xfe\x03\x0f\x04\xfe(P\x00\x05"), + expectedPrimaryKey: []uint64(nil), + expectedPrimaryKeyPrefix: []uint64(nil), + }, + { + // mysql 8.0, case2 + data: []byte("m\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x06_prim2\x00\x04\x03\xfe\x03\x0f\x04\xfe(P\x00\x05\x01\x01\x00\x02\x01\xe0\x04\x12\x04col1\x03id1\x04col2\x03id2\t\x04\x01\x00\x03\n"), + expectedPrimaryKey: case2PrimaryKey, + expectedPrimaryKeyPrefix: case2PrimaryKeyPrefix, + }, + { + // mariadb 10.4, case2 + data: []byte("\x1d\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x06_prim2\x00\x04\x03\xfe\x03\x0f\x04\xfe(P\x00\x05"), + expectedPrimaryKey: []uint64(nil), + expectedPrimaryKeyPrefix: []uint64(nil), + }, + { + // mariadb 10.5, case2 + data: []byte("\x1c\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x06_prim2\x00\x04\x03\xfe\x03\x0f\x04\xfe(P\x00\x05\x01\x01\x00\x02\x01\xe0\x04\x12\x04col1\x03id1\x04col2\x03id2\t\x04\x01\x00\x03\n"), + expectedPrimaryKey: case2PrimaryKey, + expectedPrimaryKeyPrefix: case2PrimaryKeyPrefix, + }, + } + + for _, tc := range testcases { + + tableMapEvent := new(TableMapEvent) + tableMapEvent.tableIDSize = 6 + err := tableMapEvent.Decode(tc.data) + c.Assert(err, IsNil) + c.Assert(tableMapEvent.PrimaryKey, DeepEquals, tc.expectedPrimaryKey) + c.Assert(tableMapEvent.PrimaryKeyPrefix, DeepEquals, tc.expectedPrimaryKeyPrefix) + } + +} From 76267855ceb526a8ed8c7ac0d9c794f8ee2574c1 Mon Sep 17 00:00:00 2001 From: jayven Date: Tue, 10 Mar 2020 09:00:24 +0800 Subject: [PATCH 19/21] Remove type.go --- replication/type.go | 40 ---------------------------------------- 1 file changed, 40 deletions(-) delete mode 100644 replication/type.go diff --git a/replication/type.go b/replication/type.go deleted file mode 100644 index d3a5b790b..000000000 --- a/replication/type.go +++ /dev/null @@ -1,40 +0,0 @@ -package replication - -import ( - . "github.com/siddontang/go-mysql/mysql" -) - -// IsNumericType returns true if the given type is numeric type. From: sql/log_event.cc and sql/field.h -func IsNumericType(typ byte) bool { - switch typ { - case MYSQL_TYPE_TINY, - MYSQL_TYPE_SHORT, - MYSQL_TYPE_INT24, - MYSQL_TYPE_LONG, - MYSQL_TYPE_LONGLONG, - MYSQL_TYPE_FLOAT, - MYSQL_TYPE_DOUBLE, - MYSQL_TYPE_DECIMAL, - MYSQL_TYPE_NEWDECIMAL: - return true - - default: - return false - } - -} - -// IsCharacterType returns true if the given type is character type. From: sql/log_event.cc -func IsCharacterType(typ byte) bool { - switch typ { - case MYSQL_TYPE_STRING, - MYSQL_TYPE_VAR_STRING, - MYSQL_TYPE_VARCHAR, - MYSQL_TYPE_BLOB: - return true - - default: - return false - } - -} From b758aa5d741b9ebdc6cf76b82fe62a6d14bb257f Mon Sep 17 00:00:00 2001 From: jayven Date: Tue, 10 Mar 2020 09:24:22 +0800 Subject: [PATCH 20/21] Add some comments about new fields in GTIDEvent.. --- replication/event.go | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/replication/event.go b/replication/event.go index 979de05ae..e5457c8cb 100644 --- a/replication/event.go +++ b/replication/event.go @@ -226,31 +226,31 @@ type PreviousGTIDsEvent struct { func (e *PreviousGTIDsEvent) Decode(data []byte) error { var previousGTIDSets []string pos := 0 - uuidCount := binary.LittleEndian.Uint16(data[pos:pos+8]) + uuidCount := binary.LittleEndian.Uint16(data[pos : pos+8]) pos += 8 - for i := uint16(0);i < uuidCount; i++ { - uuid := e.decodeUuid(data[pos:pos+16]) + for i := uint16(0); i < uuidCount; i++ { + uuid := e.decodeUuid(data[pos : pos+16]) pos += 16 - sliceCount := binary.LittleEndian.Uint16(data[pos:pos+8]) + sliceCount := binary.LittleEndian.Uint16(data[pos : pos+8]) pos += 8 var intervals []string - for i := uint16(0);i < sliceCount; i++ { - start := e.decodeInterval(data[pos:pos+8]) + for i := uint16(0); i < sliceCount; i++ { + start := e.decodeInterval(data[pos : pos+8]) pos += 8 - stop := e.decodeInterval(data[pos:pos+8]) + stop := e.decodeInterval(data[pos : pos+8]) pos += 8 interval := "" if stop == start+1 { - interval = fmt.Sprintf("%d",start) - }else { - interval = fmt.Sprintf("%d-%d",start,stop-1) + interval = fmt.Sprintf("%d", start) + } else { + interval = fmt.Sprintf("%d-%d", start, stop-1) } - intervals = append(intervals,interval) + intervals = append(intervals, interval) } - previousGTIDSets = append(previousGTIDSets,fmt.Sprintf("%s:%s",uuid,strings.Join(intervals,":"))) + previousGTIDSets = append(previousGTIDSets, fmt.Sprintf("%s:%s", uuid, strings.Join(intervals, ":"))) } - e.GTIDSets = fmt.Sprintf("%s",strings.Join(previousGTIDSets,",")) + e.GTIDSets = fmt.Sprintf("%s", strings.Join(previousGTIDSets, ",")) return nil } @@ -260,8 +260,8 @@ func (e *PreviousGTIDsEvent) Dump(w io.Writer) { } func (e *PreviousGTIDsEvent) decodeUuid(data []byte) string { - return fmt.Sprintf("%s-%s-%s-%s-%s",hex.EncodeToString(data[0:4]),hex.EncodeToString(data[4:6]), - hex.EncodeToString(data[6:8]),hex.EncodeToString(data[8:10]),hex.EncodeToString(data[10:])) + return fmt.Sprintf("%s-%s-%s-%s-%s", hex.EncodeToString(data[0:4]), hex.EncodeToString(data[4:6]), + hex.EncodeToString(data[6:8]), hex.EncodeToString(data[8:10]), hex.EncodeToString(data[10:])) } func (e *PreviousGTIDsEvent) decodeInterval(data []byte) uint64 { @@ -351,15 +351,17 @@ type GTIDEvent struct { LastCommitted int64 SequenceNumber int64 - // The followings are available only after MySQL-8.0 - + // ImmediateCommitTimestamp/OriginalCommitTimestamp are introduced in MySQL-8.0.1, see: + // https://mysqlhighavailability.com/replication-features-in-mysql-8-0-1/ ImmediateCommitTimestamp uint64 OriginalCommitTimestamp uint64 - // Total transaction length (including this GTIDEvent), see: + // Total transaction length (including this GTIDEvent), introduced in MySQL-8.0.2, see: // https://mysqlhighavailability.com/taking-advantage-of-new-transaction-length-metadata/ TransactionLength uint64 + // ImmediateServerVersion/OriginalServerVersion are introduced in MySQL-8.0.14, see + // https://dev.mysql.com/doc/refman/8.0/en/replication-compatibility.html ImmediateServerVersion uint32 OriginalServerVersion uint32 } From d9d4b74ac3411180688d55cdb93a019223a812aa Mon Sep 17 00:00:00 2001 From: jayven Date: Tue, 10 Mar 2020 12:24:11 +0800 Subject: [PATCH 21/21] Add more test for GTIDEvent and TableMapEvent --- replication/event_test.go | 51 +++++++++++++++++++++++++++++++++++ replication/row_event.go | 2 +- replication/row_event_test.go | 31 ++++++++++++++++++++- 3 files changed, 82 insertions(+), 2 deletions(-) diff --git a/replication/event_test.go b/replication/event_test.go index c410018bb..fd0642817 100644 --- a/replication/event_test.go +++ b/replication/event_test.go @@ -48,3 +48,54 @@ func (_ *testDecodeSuite) TestMariadbGTIDEvent(c *C) { c.Assert(ev.IsGroupCommit(), Equals, true) c.Assert(ev.CommitID, Equals, uint64(0x1716151413121110)) } + +func (_ *testDecodeSuite) TestGTIDEventMysql8NewFields(c *C) { + + testcases := []struct { + data []byte + expectImmediateCommitTimestamp uint64 + expectOriginalCommitTimestamp uint64 + expectTransactoinLength uint64 + expectImmediateServerVersion uint32 + expectOriginalServerVersion uint32 + }{ + { + // master: mysql-5.7, slave: mysql-8.0 + data: []byte("\x00Z\xa7*\u007fD\xa8\x11\xea\x94\u007f\x02B\xac\x19\x00\x02\x02\x01\x00\x00\x00\x00\x00\x00\x02v\x00\x00\x00\x00\x00\x00\x00w\x00\x00\x00\x00\x00\x00\x00\xc1G\x81\x16x\xa0\x85\x00\x00\x00\x00\x00\x00\x00\xfc\xc5\x03\x938\x01\x80\x00\x00\x00\x00"), + expectImmediateCommitTimestamp: 1583812517644225, + expectOriginalCommitTimestamp: 0, + expectTransactoinLength: 965, + expectImmediateServerVersion: 80019, + expectOriginalServerVersion: 0, + }, + { + // mysql-5.7 only + data: []byte("\x00Z\xa7*\u007fD\xa8\x11\xea\x94\u007f\x02B\xac\x19\x00\x02\x03\x01\x00\x00\x00\x00\x00\x00\x025\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00\x00\x00\x00\x00"), + expectImmediateCommitTimestamp: 0, + expectOriginalCommitTimestamp: 0, + expectTransactoinLength: 0, + expectImmediateServerVersion: 0, + expectOriginalServerVersion: 0, + }, + { + // mysql-8.0 only + data: []byte("\x00\\\xcc\x103D\xa8\x11\xea\xbdY\x02B\xac\x19\x00\x03w\x00\x00\x00\x00\x00\x00\x00\x02x\x00\x00\x00\x00\x00\x00\x00y\x00\x00\x00\x00\x00\x00\x00j0\xb1>x\xa0\x05\xfc\xc3\x03\x938\x01\x00"), + expectImmediateCommitTimestamp: 1583813191872618, + expectOriginalCommitTimestamp: 1583813191872618, + expectTransactoinLength: 963, + expectImmediateServerVersion: 80019, + expectOriginalServerVersion: 80019, + }, + } + + for _, tc := range testcases { + ev := new(GTIDEvent) + err := ev.Decode(tc.data) + c.Assert(err, IsNil) + c.Assert(ev.ImmediateCommitTimestamp, Equals, tc.expectImmediateCommitTimestamp) + c.Assert(ev.OriginalCommitTimestamp, Equals, tc.expectOriginalCommitTimestamp) + c.Assert(ev.ImmediateServerVersion, Equals, tc.expectImmediateServerVersion) + c.Assert(ev.OriginalServerVersion, Equals, tc.expectOriginalServerVersion) + } + +} diff --git a/replication/row_event.go b/replication/row_event.go index ceddcc5a2..6427c89cc 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -65,7 +65,7 @@ type TableMapEvent struct { // ColumnName list all column names. ColumnName [][]byte - columnNameString []string + columnNameString []string // the same as ColumnName in string type, just for reuse // GeometryType stores real type for geometry columns. GeometryType []uint64 diff --git a/replication/row_event_test.go b/replication/row_event_test.go index d608915c5..4b624df50 100644 --- a/replication/row_event_test.go +++ b/replication/row_event_test.go @@ -764,6 +764,36 @@ func (_ *testDecodeSuite) TestDecodeDatetime2(c *C) { } } +func (_ *testDecodeSuite) TestTableMapNullable(c *C) { + /* + create table _null (c1 int null, c2 int not null default '2', c3 timestamp default now(), c4 text); + */ + nullables := []bool{true, false, false, true} + testcases := [][]byte{ + // mysql 5.7 + []byte("z\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x05_null\x00\x04\x03\x03\x11\xfc\x02\x00\x02\t"), + // mysql 8.0 + []byte("z\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x05_null\x00\x04\x03\x03\x11\xfc\x02\x00\x02\t\x01\x01\x00\x02\x01\xe0\x04\f\x02c1\x02c2\x02c3\x02c4"), + // mariadb 10.4 + []byte("\x1e\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x05_null\x00\x04\x03\x03\x11\xfc\x02\x00\x02\t"), + // mariadb 10.5 + []byte("\x1d\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x05_null\x00\x04\x03\x03\x11\xfc\x02\x00\x02\t\x01\x01\x00\x02\x01\xe0\x04\f\x02c1\x02c2\x02c3\x02c4"), + } + + for _, tc := range testcases { + tableMapEvent := new(TableMapEvent) + tableMapEvent.tableIDSize = 6 + err := tableMapEvent.Decode(tc) + c.Assert(err, IsNil) + c.Assert(int(tableMapEvent.ColumnCount), Equals, len(nullables)) + for i := 0; i < int(tableMapEvent.ColumnCount); i++ { + available, nullable := tableMapEvent.Nullable(i) + c.Assert(available, Equals, true) + c.Assert(nullable, Equals, nullables[i]) + } + } +} + func (_ *testDecodeSuite) TestTableMapOptMetaNames(c *C) { /* CREATE TABLE `_types` ( @@ -967,7 +997,6 @@ func (_ *testDecodeSuite) TestTableMapOptMetaPrimaryKey(c *C) { } for _, tc := range testcases { - tableMapEvent := new(TableMapEvent) tableMapEvent.tableIDSize = 6 err := tableMapEvent.Decode(tc.data)