diff --git a/notes/field_type.md b/notes/field_type.md new file mode 100644 index 000000000..6ceb1b42f --- /dev/null +++ b/notes/field_type.md @@ -0,0 +1,87 @@ + +There are several `types` for each field (column) type, such as `type()`/`real_type()/binlog_type()` ... + +By default, `real_type()` and `binlog_type()` are the same as `type()`: + +``` + // From mysql-8.0/sql/field.h Field + + // ... + virtual enum_field_types real_type() const { return type(); } + virtual enum_field_types binlog_type() const { + /* + Binlog stores field->type() as type code by default. + This puts MYSQL_TYPE_STRING in case of CHAR, VARCHAR, SET and ENUM, + with extra data type details put into metadata. + + We cannot store field->type() in case of temporal types with + fractional seconds: TIME(n), DATETIME(n) and TIMESTAMP(n), + because binlog records with MYSQL_TYPE_TIME, MYSQL_TYPE_DATETIME + type codes do not have metadata. + So for temporal data types with fractional seconds we'll store + real_type() type codes instead, i.e. + MYSQL_TYPE_TIME2, MYSQL_TYPE_DATETIME2, MYSQL_TYPE_TIMESTAMP2, + and put precision into metatada. + + Note: perhaps binlog should eventually be modified to store + real_type() instead of type() for all column types. + */ + return type(); + } + // ... +``` + +Here is a list collected from `mysql-8.0/sql/field.h`: + +``` ++------------------------------------------------------------+-------------------------------+-----------------------+------------------------+ +| Field | type() | real_type() | binlog_type() | ++------------------------------------------------------------+-------------------------------+-----------------------+------------------------+ +| | | | | +| Field (abstract) | | | | +| | | | | | +| +--Field_bit | MYSQL_TYPE_BIT | | | +| | +--Field_bit_as_char | | | | +| | | | | | +| +--Field_num (abstract) | | | | +| | | +--Field_real (abstract) | | | | +| | | +--Field_decimal | MYSQL_TYPE_DECIMAL | | | +| | | +--Field_float | MYSQL_TYPE_FLOAT | | | +| | | +--Field_double | MYSQL_TYPE_DOUBLE | | | +| | | | | | | +| | +--Field_new_decimal | MYSQL_TYPE_NEWDECIMAL | | | +| | +--Field_short | MYSQL_TYPE_SHORT | | | +| | +--Field_medium | MYSQL_TYPE_INT24 | | | +| | +--Field_long | MYSQL_TYPE_LONG | | | +| | +--Field_longlong | MYSQL_TYPE_LONGLONG | | | +| | +--Field_tiny | MYSQL_TYPE_TINY | | | +| | +--Field_year | MYSQL_TYPE_YEAR | | | +| | | | | | +| +--Field_str (abstract) | | | | +| | +--Field_longstr | | | | +| | | +--Field_string | MYSQL_TYPE_STRING | MYSQL_TYPE_STRING | | +| | | +--Field_varstring | MYSQL_TYPE_VARCHAR | MYSQL_TYPE_VARCHAR | | +| | | +--Field_blob | MYSQL_TYPE_BLOB | | | +| | | +--Field_geom | MYSQL_TYPE_GEOMETRY | | | +| | | +--Field_json | MYSQL_TYPE_JSON | | | +| | | +--Field_typed_array | real_type_to_type(m_elt_type) | m_elt_type | MYSQL_TYPE_TYPED_ARRAY | +| | | | | | | +| | +--Field_null | MYSQL_TYPE_NULL | | | +| | +--Field_enum | MYSQL_TYPE_STRING | MYSQL_TYPE_ENUM | | +| | +--Field_set | | MYSQL_TYPE_SET | | +| | | | | | +| +--Field_temporal (abstract) | | | | +| +--Field_time_common (abstract) | | | | +| | +--Field_time | MYSQL_TYPE_TIME | | | +| | +--Field_timef | MYSQL_TYPE_TIME | MYSQL_TYPE_TIME2 | MYSQL_TYPE_TIME2 | +| | | | | | +| +--Field_temporal_with_date (abstract) | | | | +| +--Field_newdate | MYSQL_TYPE_DATE | MYSQL_TYPE_NEWDATE | | +| +--Field_temporal_with_date_and_time (abstract) | | | | +| +--Field_timestamp | MYSQL_TYPE_TIMESTAMP | | | +| +--Field_datetime | MYSQL_TYPE_DATETIME | | | +| +--Field_temporal_with_date_and_timef (abstract) | | | | +| +--Field_timestampf | MYSQL_TYPE_TIMESTAMP | MYSQL_TYPE_TIMESTAMP2 | MYSQL_TYPE_TIMESTAMP2 | +| +--Field_datetimef | MYSQL_TYPE_DATETIME | MYSQL_TYPE_DATETIME2 | MYSQL_TYPE_DATETIME2 | ++------------------------------------------------------------+-------------------------------+-----------------------+------------------------+ +``` diff --git a/replication/const.go b/replication/const.go index 3420b5d63..6230257fc 100644 --- a/replication/const.go +++ b/replication/const.go @@ -188,7 +188,7 @@ func (e EventType) String() string { return "ViewChangeEvent" case XA_PREPARE_LOG_EVENT: return "XAPrepareLogEvent" - + default: return "UnknownEvent" } @@ -202,3 +202,18 @@ const ( BINLOG_CHECKSUM_ALG_UNDEF byte = 255 // special value to tag undetermined yet checksum // or events from checksum-unaware servers ) + +// These are TABLE_MAP_EVENT's optional metadata field type, from: libbinlogevents/include/rows_event.h +const ( + TABLE_MAP_OPT_META_SIGNEDNESS byte = iota + 1 + TABLE_MAP_OPT_META_DEFAULT_CHARSET + TABLE_MAP_OPT_META_COLUMN_CHARSET + TABLE_MAP_OPT_META_COLUMN_NAME + TABLE_MAP_OPT_META_SET_STR_VALUE + TABLE_MAP_OPT_META_ENUM_STR_VALUE + TABLE_MAP_OPT_META_GEOMETRY_TYPE + TABLE_MAP_OPT_META_SIMPLE_PRIMARY_KEY + TABLE_MAP_OPT_META_PRIMARY_KEY_WITH_PREFIX + TABLE_MAP_OPT_META_ENUM_AND_SET_DEFAULT_CHARSET + TABLE_MAP_OPT_META_ENUM_AND_SET_COLUMN_CHARSET +) diff --git a/replication/event.go b/replication/event.go index 5c44140ab..e5457c8cb 100644 --- a/replication/event.go +++ b/replication/event.go @@ -21,6 +21,7 @@ const ( LogicalTimestampTypeCode = 2 PartLogicalTimestampLength = 8 BinlogChecksumLength = 4 + UndefinedServerVer = 999999 // UNDEFINED_SERVER_VERSION ) type BinlogEvent struct { @@ -225,31 +226,31 @@ type PreviousGTIDsEvent struct { func (e *PreviousGTIDsEvent) Decode(data []byte) error { var previousGTIDSets []string pos := 0 - uuidCount := binary.LittleEndian.Uint16(data[pos:pos+8]) + uuidCount := binary.LittleEndian.Uint16(data[pos : pos+8]) pos += 8 - for i := uint16(0);i < uuidCount; i++ { - uuid := e.decodeUuid(data[pos:pos+16]) + for i := uint16(0); i < uuidCount; i++ { + uuid := e.decodeUuid(data[pos : pos+16]) pos += 16 - sliceCount := binary.LittleEndian.Uint16(data[pos:pos+8]) + sliceCount := binary.LittleEndian.Uint16(data[pos : pos+8]) pos += 8 var intervals []string - for i := uint16(0);i < sliceCount; i++ { - start := e.decodeInterval(data[pos:pos+8]) + for i := uint16(0); i < sliceCount; i++ { + start := e.decodeInterval(data[pos : pos+8]) pos += 8 - stop := e.decodeInterval(data[pos:pos+8]) + stop := e.decodeInterval(data[pos : pos+8]) pos += 8 interval := "" if stop == start+1 { - interval = fmt.Sprintf("%d",start) - }else { - interval = fmt.Sprintf("%d-%d",start,stop-1) + interval = fmt.Sprintf("%d", start) + } else { + interval = fmt.Sprintf("%d-%d", start, stop-1) } - intervals = append(intervals,interval) + intervals = append(intervals, interval) } - previousGTIDSets = append(previousGTIDSets,fmt.Sprintf("%s:%s",uuid,strings.Join(intervals,":"))) + previousGTIDSets = append(previousGTIDSets, fmt.Sprintf("%s:%s", uuid, strings.Join(intervals, ":"))) } - e.GTIDSets = fmt.Sprintf("%s",strings.Join(previousGTIDSets,",")) + e.GTIDSets = fmt.Sprintf("%s", strings.Join(previousGTIDSets, ",")) return nil } @@ -259,8 +260,8 @@ func (e *PreviousGTIDsEvent) Dump(w io.Writer) { } func (e *PreviousGTIDsEvent) decodeUuid(data []byte) string { - return fmt.Sprintf("%s-%s-%s-%s-%s",hex.EncodeToString(data[0:4]),hex.EncodeToString(data[4:6]), - hex.EncodeToString(data[6:8]),hex.EncodeToString(data[8:10]),hex.EncodeToString(data[10:])) + return fmt.Sprintf("%s-%s-%s-%s-%s", hex.EncodeToString(data[0:4]), hex.EncodeToString(data[4:6]), + hex.EncodeToString(data[6:8]), hex.EncodeToString(data[8:10]), hex.EncodeToString(data[10:])) } func (e *PreviousGTIDsEvent) decodeInterval(data []byte) uint64 { @@ -349,6 +350,20 @@ type GTIDEvent struct { GNO int64 LastCommitted int64 SequenceNumber int64 + + // ImmediateCommitTimestamp/OriginalCommitTimestamp are introduced in MySQL-8.0.1, see: + // https://mysqlhighavailability.com/replication-features-in-mysql-8-0-1/ + ImmediateCommitTimestamp uint64 + OriginalCommitTimestamp uint64 + + // Total transaction length (including this GTIDEvent), introduced in MySQL-8.0.2, see: + // https://mysqlhighavailability.com/taking-advantage-of-new-transaction-length-metadata/ + TransactionLength uint64 + + // ImmediateServerVersion/OriginalServerVersion are introduced in MySQL-8.0.14, see + // https://dev.mysql.com/doc/refman/8.0/en/replication-compatibility.html + ImmediateServerVersion uint32 + OriginalServerVersion uint32 } func (e *GTIDEvent) Decode(data []byte) error { @@ -359,26 +374,99 @@ func (e *GTIDEvent) Decode(data []byte) error { pos += SidLength e.GNO = int64(binary.LittleEndian.Uint64(data[pos:])) pos += 8 + if len(data) >= 42 { if uint8(data[pos]) == LogicalTimestampTypeCode { pos++ e.LastCommitted = int64(binary.LittleEndian.Uint64(data[pos:])) pos += PartLogicalTimestampLength e.SequenceNumber = int64(binary.LittleEndian.Uint64(data[pos:])) + pos += 8 + + // IMMEDIATE_COMMIT_TIMESTAMP_LENGTH = 7 + if len(data)-pos < 7 { + return nil + } + e.ImmediateCommitTimestamp = FixedLengthInt(data[pos : pos+7]) + pos += 7 + if (e.ImmediateCommitTimestamp & (uint64(1) << 55)) != 0 { + // If the most significant bit set, another 7 byte follows representing OriginalCommitTimestamp + e.ImmediateCommitTimestamp &= ^(uint64(1) << 55) + e.OriginalCommitTimestamp = FixedLengthInt(data[pos : pos+7]) + pos += 7 + + } else { + // Otherwise OriginalCommitTimestamp == ImmediateCommitTimestamp + e.OriginalCommitTimestamp = e.ImmediateCommitTimestamp + + } + + // TRANSACTION_LENGTH_MIN_LENGTH = 1 + if len(data)-pos < 1 { + return nil + } + var n int + e.TransactionLength, _, n = LengthEncodedInt(data[pos:]) + pos += n + + // IMMEDIATE_SERVER_VERSION_LENGTH = 4 + e.ImmediateServerVersion = UndefinedServerVer + e.OriginalServerVersion = UndefinedServerVer + if len(data)-pos < 4 { + return nil + } + e.ImmediateServerVersion = binary.LittleEndian.Uint32(data[pos:]) + pos += 4 + if (e.ImmediateServerVersion & (uint32(1) << 31)) != 0 { + // If the most significant bit set, another 4 byte follows representing OriginalServerVersion + e.ImmediateServerVersion &= ^(uint32(1) << 31) + e.OriginalServerVersion = binary.LittleEndian.Uint32(data[pos:]) + pos += 4 + + } else { + // Otherwise OriginalServerVersion == ImmediateServerVersion + e.OriginalServerVersion = e.ImmediateServerVersion + + } + } } return nil } func (e *GTIDEvent) Dump(w io.Writer) { + fmtTime := func(t time.Time) string { + if t.IsZero() { + return "" + } + return t.Format(time.RFC3339Nano) + } + fmt.Fprintf(w, "Commit flag: %d\n", e.CommitFlag) u, _ := uuid.FromBytes(e.SID) fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO) fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted) fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber) + fmt.Fprintf(w, "Immediate commmit timestamp: %d (%s)\n", e.ImmediateCommitTimestamp, fmtTime(e.ImmediateCommitTime())) + fmt.Fprintf(w, "Orignal commmit timestamp: %d (%s)\n", e.OriginalCommitTimestamp, fmtTime(e.OriginalCommitTime())) + fmt.Fprintf(w, "Transaction length: %d\n", e.TransactionLength) + fmt.Fprintf(w, "Immediate server version: %d\n", e.ImmediateServerVersion) + fmt.Fprintf(w, "Orignal server version: %d\n", e.OriginalServerVersion) fmt.Fprintln(w) } +// ImmediateCommitTime returns the commit time of this trx on the immediate server +// or zero time if not available. +func (e *GTIDEvent) ImmediateCommitTime() time.Time { + return microSecTimestampToTime(e.ImmediateCommitTimestamp) +} + +// OriginalCommitTime returns the commit time of this trx on the original server +// or zero time if not available. +func (e *GTIDEvent) OriginalCommitTime() time.Time { + return microSecTimestampToTime(e.OriginalCommitTimestamp) +} + type BeginLoadQueryEvent struct { FileID uint32 BlockData []byte diff --git a/replication/event_test.go b/replication/event_test.go index c410018bb..fd0642817 100644 --- a/replication/event_test.go +++ b/replication/event_test.go @@ -48,3 +48,54 @@ func (_ *testDecodeSuite) TestMariadbGTIDEvent(c *C) { c.Assert(ev.IsGroupCommit(), Equals, true) c.Assert(ev.CommitID, Equals, uint64(0x1716151413121110)) } + +func (_ *testDecodeSuite) TestGTIDEventMysql8NewFields(c *C) { + + testcases := []struct { + data []byte + expectImmediateCommitTimestamp uint64 + expectOriginalCommitTimestamp uint64 + expectTransactoinLength uint64 + expectImmediateServerVersion uint32 + expectOriginalServerVersion uint32 + }{ + { + // master: mysql-5.7, slave: mysql-8.0 + data: []byte("\x00Z\xa7*\u007fD\xa8\x11\xea\x94\u007f\x02B\xac\x19\x00\x02\x02\x01\x00\x00\x00\x00\x00\x00\x02v\x00\x00\x00\x00\x00\x00\x00w\x00\x00\x00\x00\x00\x00\x00\xc1G\x81\x16x\xa0\x85\x00\x00\x00\x00\x00\x00\x00\xfc\xc5\x03\x938\x01\x80\x00\x00\x00\x00"), + expectImmediateCommitTimestamp: 1583812517644225, + expectOriginalCommitTimestamp: 0, + expectTransactoinLength: 965, + expectImmediateServerVersion: 80019, + expectOriginalServerVersion: 0, + }, + { + // mysql-5.7 only + data: []byte("\x00Z\xa7*\u007fD\xa8\x11\xea\x94\u007f\x02B\xac\x19\x00\x02\x03\x01\x00\x00\x00\x00\x00\x00\x025\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00\x00\x00\x00\x00"), + expectImmediateCommitTimestamp: 0, + expectOriginalCommitTimestamp: 0, + expectTransactoinLength: 0, + expectImmediateServerVersion: 0, + expectOriginalServerVersion: 0, + }, + { + // mysql-8.0 only + data: []byte("\x00\\\xcc\x103D\xa8\x11\xea\xbdY\x02B\xac\x19\x00\x03w\x00\x00\x00\x00\x00\x00\x00\x02x\x00\x00\x00\x00\x00\x00\x00y\x00\x00\x00\x00\x00\x00\x00j0\xb1>x\xa0\x05\xfc\xc3\x03\x938\x01\x00"), + expectImmediateCommitTimestamp: 1583813191872618, + expectOriginalCommitTimestamp: 1583813191872618, + expectTransactoinLength: 963, + expectImmediateServerVersion: 80019, + expectOriginalServerVersion: 80019, + }, + } + + for _, tc := range testcases { + ev := new(GTIDEvent) + err := ev.Decode(tc.data) + c.Assert(err, IsNil) + c.Assert(ev.ImmediateCommitTimestamp, Equals, tc.expectImmediateCommitTimestamp) + c.Assert(ev.OriginalCommitTimestamp, Equals, tc.expectOriginalCommitTimestamp) + c.Assert(ev.ImmediateServerVersion, Equals, tc.expectImmediateServerVersion) + c.Assert(ev.OriginalServerVersion, Equals, tc.expectOriginalServerVersion) + } + +} diff --git a/replication/row_event.go b/replication/row_event.go index 0c8f41a60..6427c89cc 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -34,6 +34,52 @@ type TableMapEvent struct { //len = (ColumnCount + 7) / 8 NullBitmap []byte + + /* + The followings are available only after MySQL-8.0.1 or MariaDB-10.5.0 + see: + - https://dev.mysql.com/doc/refman/8.0/en/replication-options-binary-log.html#sysvar_binlog_row_metadata + - https://mysqlhighavailability.com/more-metadata-is-written-into-binary-log/ + - https://jira.mariadb.org/browse/MDEV-20477 + */ + + // SignednessBitmap stores signedness info for numeric columns. + SignednessBitmap []byte + + // DefaultCharset/ColumnCharset stores collation info for character columns. + + // DefaultCharset[0] is the default collation of character columns. + // For character columns that have different charset, + // (character column index, column collation) pairs follows + DefaultCharset []uint64 + // ColumnCharset contains collation sequence for all character columns + ColumnCharset []uint64 + + // SetStrValue stores values for set columns. + SetStrValue [][][]byte + setStrValueString [][]string + + // EnumStrValue stores values for enum columns. + EnumStrValue [][][]byte + enumStrValueString [][]string + + // ColumnName list all column names. + ColumnName [][]byte + columnNameString []string // the same as ColumnName in string type, just for reuse + + // GeometryType stores real type for geometry columns. + GeometryType []uint64 + + // PrimaryKey is a sequence of column indexes of primary key. + PrimaryKey []uint64 + + // PrimaryKeyPrefix is the prefix length used for each column of primary key. + // 0 means that the whole column length is used. + PrimaryKeyPrefix []uint64 + + // EnumSetDefaultCharset/EnumSetColumnCharset is similar to DefaultCharset/ColumnCharset but for enum/set columns. + EnumSetDefaultCharset []uint64 + EnumSetColumnCharset []uint64 } func (e *TableMapEvent) Decode(data []byte) error { @@ -88,7 +134,11 @@ func (e *TableMapEvent) Decode(data []byte) error { e.NullBitmap = data[pos : pos+nullBitmapSize] - // TODO: handle optional field meta + pos += nullBitmapSize + + if err = e.decodeOptionalMeta(data[pos:]); err != nil { + return err + } return nil } @@ -186,6 +236,171 @@ func (e *TableMapEvent) decodeMeta(data []byte) error { return nil } +func (e *TableMapEvent) decodeOptionalMeta(data []byte) (err error) { + + pos := 0 + for pos < len(data) { + + // optional metadata fields are stored in Type, Length, Value(TLV) format + // Type takes 1 byte. Length is a packed integer value. Values takes Length bytes + t := data[pos] + pos++ + + l, _, n := LengthEncodedInt(data[pos:]) + pos += n + + v := data[pos : pos+int(l)] + pos += int(l) + + switch t { + case TABLE_MAP_OPT_META_SIGNEDNESS: + e.SignednessBitmap = v + + case TABLE_MAP_OPT_META_DEFAULT_CHARSET: + e.DefaultCharset, err = e.decodeDefaultCharset(v) + if err != nil { + return err + } + + case TABLE_MAP_OPT_META_COLUMN_CHARSET: + e.ColumnCharset, err = e.decodeIntSeq(v) + if err != nil { + return err + } + + case TABLE_MAP_OPT_META_COLUMN_NAME: + if err = e.decodeColumnNames(v); err != nil { + return err + } + + case TABLE_MAP_OPT_META_SET_STR_VALUE: + e.SetStrValue, err = e.decodeStrValue(v) + if err != nil { + return err + } + + case TABLE_MAP_OPT_META_ENUM_STR_VALUE: + e.EnumStrValue, err = e.decodeStrValue(v) + if err != nil { + return err + } + + case TABLE_MAP_OPT_META_GEOMETRY_TYPE: + e.GeometryType, err = e.decodeIntSeq(v) + if err != nil { + return err + } + + case TABLE_MAP_OPT_META_SIMPLE_PRIMARY_KEY: + if err = e.decodeSimplePrimaryKey(v); err != nil { + return err + } + + case TABLE_MAP_OPT_META_PRIMARY_KEY_WITH_PREFIX: + if err = e.decodePrimaryKeyWithPrefix(v); err != nil { + return err + } + + case TABLE_MAP_OPT_META_ENUM_AND_SET_DEFAULT_CHARSET: + e.EnumSetDefaultCharset, err = e.decodeDefaultCharset(v) + if err != nil { + return err + } + + case TABLE_MAP_OPT_META_ENUM_AND_SET_COLUMN_CHARSET: + e.EnumSetColumnCharset, err = e.decodeIntSeq(v) + if err != nil { + return err + } + + default: + // Ignore for future extension + } + } + + return nil +} + +func (e *TableMapEvent) decodeIntSeq(v []byte) (ret []uint64, err error) { + p := 0 + for p < len(v) { + i, _, n := LengthEncodedInt(v[p:]) + p += n + ret = append(ret, i) + } + return +} + +func (e *TableMapEvent) decodeDefaultCharset(v []byte) (ret []uint64, err error) { + ret, err = e.decodeIntSeq(v) + if err != nil { + return + } + if len(ret)%2 != 1 { + return nil, errors.Errorf("Expect odd item in DefaultCharset but got %d", len(ret)) + } + return +} + +func (e *TableMapEvent) decodeColumnNames(v []byte) error { + p := 0 + e.ColumnName = make([][]byte, 0, e.ColumnCount) + for p < len(v) { + n := int(v[p]) + p++ + e.ColumnName = append(e.ColumnName, v[p:p+n]) + p += n + } + + if len(e.ColumnName) != int(e.ColumnCount) { + return errors.Errorf("Expect %d column names but got %d", e.ColumnCount, len(e.ColumnName)) + } + return nil +} + +func (e *TableMapEvent) decodeStrValue(v []byte) (ret [][][]byte, err error) { + p := 0 + for p < len(v) { + nVal, _, n := LengthEncodedInt(v[p:]) + p += n + vals := make([][]byte, 0, int(nVal)) + for i := 0; i < int(nVal); i++ { + val, _, n, err := LengthEncodedString(v[p:]) + if err != nil { + return nil, err + } + p += n + vals = append(vals, val) + } + ret = append(ret, vals) + } + return +} + +func (e *TableMapEvent) decodeSimplePrimaryKey(v []byte) error { + p := 0 + for p < len(v) { + i, _, n := LengthEncodedInt(v[p:]) + e.PrimaryKey = append(e.PrimaryKey, i) + e.PrimaryKeyPrefix = append(e.PrimaryKeyPrefix, 0) + p += n + } + return nil +} + +func (e *TableMapEvent) decodePrimaryKeyWithPrefix(v []byte) error { + p := 0 + for p < len(v) { + i, _, n := LengthEncodedInt(v[p:]) + e.PrimaryKey = append(e.PrimaryKey, i) + p += n + i, _, n = LengthEncodedInt(v[p:]) + e.PrimaryKeyPrefix = append(e.PrimaryKeyPrefix, i) + p += n + } + return nil +} + func (e *TableMapEvent) Dump(w io.Writer) { fmt.Fprintf(w, "TableID: %d\n", e.TableID) fmt.Fprintf(w, "TableID size: %d\n", e.tableIDSize) @@ -195,9 +410,88 @@ func (e *TableMapEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Column count: %d\n", e.ColumnCount) fmt.Fprintf(w, "Column type: \n%s", hex.Dump(e.ColumnType)) fmt.Fprintf(w, "NULL bitmap: \n%s", hex.Dump(e.NullBitmap)) + + fmt.Fprintf(w, "Signedness bitmap: \n%s", hex.Dump(e.SignednessBitmap)) + fmt.Fprintf(w, "Default charset: %v\n", e.DefaultCharset) + fmt.Fprintf(w, "Column charset: %v\n", e.ColumnCharset) + fmt.Fprintf(w, "Set str value: %v\n", e.SetStrValueString()) + fmt.Fprintf(w, "Enum str value: %v\n", e.EnumStrValueString()) + fmt.Fprintf(w, "Column name: %v\n", e.ColumnNameString()) + fmt.Fprintf(w, "Geometry type: %v\n", e.GeometryType) + fmt.Fprintf(w, "Primary key: %v\n", e.PrimaryKey) + fmt.Fprintf(w, "Primary key prefix: %v\n", e.PrimaryKeyPrefix) + fmt.Fprintf(w, "Enum/set default charset: %v\n", e.EnumSetDefaultCharset) + fmt.Fprintf(w, "Enum/set column charset: %v\n", e.EnumSetColumnCharset) + fmt.Fprintln(w) } +// Nullable returns the nullablity of the i-th column. +// If null bits are not available, available is false. +// i must be in range [0, ColumnCount). +func (e *TableMapEvent) Nullable(i int) (available, nullable bool) { + if len(e.NullBitmap) == 0 { + return + } + return true, e.NullBitmap[i/8]&(1<