Skip to content

Add some TableMapEvent helper methods #482

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {

b.cfg = cfg
b.parser = NewBinlogParser()
b.parser.SetFlavor(cfg.Flavor)
b.parser.SetRawMode(b.cfg.RawModeEnabled)
b.parser.SetParseTime(b.cfg.ParseTime)
b.parser.SetTimestampStringLocation(b.cfg.TimestampStringLocation)
Expand Down
11 changes: 10 additions & 1 deletion replication/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ var (
)

type BinlogParser struct {
// "mysql" or "mariadb", if not set, use "mysql" by default
flavor string

format *FormatDescriptionEvent

tables map[uint64]*TableMapEvent
Expand Down Expand Up @@ -200,6 +203,10 @@ func (p *BinlogParser) SetVerifyChecksum(verify bool) {
p.verifyChecksum = verify
}

func (p *BinlogParser) SetFlavor(flavor string) {
p.flavor = flavor
}

func (p *BinlogParser) parseHeader(data []byte) (*EventHeader, error) {
h := new(EventHeader)
err := h.Decode(data)
Expand Down Expand Up @@ -234,7 +241,9 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
case XID_EVENT:
e = &XIDEvent{}
case TABLE_MAP_EVENT:
te := &TableMapEvent{}
te := &TableMapEvent{
flavor: p.flavor,
}
if p.format.EventTypeHeaderLengths[TABLE_MAP_EVENT-1] == 6 {
te.tableIDSize = 4
} else {
Expand Down
324 changes: 324 additions & 0 deletions replication/row_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
var errMissingTableMapEvent = errors.New("invalid table id, no corresponding table map event")

type TableMapEvent struct {
flavor string
tableIDSize int

TableID uint64
Expand Down Expand Up @@ -423,6 +424,116 @@ func (e *TableMapEvent) Dump(w io.Writer) {
fmt.Fprintf(w, "Enum/set default charset: %v\n", e.EnumSetDefaultCharset)
fmt.Fprintf(w, "Enum/set column charset: %v\n", e.EnumSetColumnCharset)

unsignedMap := e.UnsignedMap()
fmt.Fprintf(w, "UnsignedMap: %#v\n", unsignedMap)

collationMap := e.CollationMap()
fmt.Fprintf(w, "CollationMap: %#v\n", collationMap)

enumSetCollationMap := e.EnumSetCollationMap()
fmt.Fprintf(w, "EnumSetCollationMap: %#v\n", enumSetCollationMap)

enumStrValueMap := e.EnumStrValueMap()
fmt.Fprintf(w, "EnumStrValueMap: %#v\n", enumStrValueMap)

setStrValueMap := e.SetStrValueMap()
fmt.Fprintf(w, "SetStrValueMap: %#v\n", setStrValueMap)

geometryTypeMap := e.GeometryTypeMap()
fmt.Fprintf(w, "GeometryTypeMap: %#v\n", geometryTypeMap)

nameMaxLen := 0
for _, name := range e.ColumnName {
if len(name) > nameMaxLen {
nameMaxLen = len(name)
}
}
nameFmt := " %s"
if nameMaxLen > 0 {
nameFmt = fmt.Sprintf(" %%-%ds", nameMaxLen)
}

primaryKey := map[int]struct{}{}
for _, pk := range e.PrimaryKey {
primaryKey[int(pk)] = struct{}{}
}

fmt.Fprintf(w, "Columns: \n")
for i := 0; i < int(e.ColumnCount); i++ {
if len(e.ColumnName) == 0 {
fmt.Fprintf(w, nameFmt, "<n/a>")
} else {
fmt.Fprintf(w, nameFmt, e.ColumnName[i])
}

fmt.Fprintf(w, " type=%-3d", e.realType(i))

if e.IsNumericColumn(i) {
if len(unsignedMap) == 0 {
fmt.Fprintf(w, " unsigned=<n/a>")
} else if unsignedMap[i] {
fmt.Fprintf(w, " unsigned=yes")
} else {
fmt.Fprintf(w, " unsigned=no ")
}
}
if e.IsCharacterColumn(i) {
if len(collationMap) == 0 {
fmt.Fprintf(w, " collation=<n/a>")
} else {
fmt.Fprintf(w, " collation=%d ", collationMap[i])
}
}
if e.IsEnumColumn(i) {
if len(enumSetCollationMap) == 0 {
fmt.Fprintf(w, " enum_collation=<n/a>")
} else {
fmt.Fprintf(w, " enum_collation=%d", enumSetCollationMap[i])
}

if len(enumStrValueMap) == 0 {
fmt.Fprintf(w, " enum=<n/a>")
} else {
fmt.Fprintf(w, " enum=%v", enumStrValueMap[i])
}
}
if e.IsSetColumn(i) {
if len(enumSetCollationMap) == 0 {
fmt.Fprintf(w, " set_collation=<n/a>")
} else {
fmt.Fprintf(w, " set_collation=%d", enumSetCollationMap[i])
}

if len(setStrValueMap) == 0 {
fmt.Fprintf(w, " set=<n/a>")
} else {
fmt.Fprintf(w, " set=%v", setStrValueMap[i])
}
}
if e.IsGeometryColumn(i) {
if len(geometryTypeMap) == 0 {
fmt.Fprintf(w, " geometry_type=<n/a>")
} else {
fmt.Fprintf(w, " geometry_type=%v", geometryTypeMap[i])
}
}

available, nullable := e.Nullable(i)
if !available {
fmt.Fprintf(w, " null=<n/a>")
} else if nullable {
fmt.Fprintf(w, " null=yes")
} else {
fmt.Fprintf(w, " null=no ")
}

if _, ok := primaryKey[i]; ok {
fmt.Fprintf(w, " pri")
}

fmt.Fprintf(w, "\n")
}

fmt.Fprintln(w)
}

Expand Down Expand Up @@ -492,6 +603,219 @@ func (e *TableMapEvent) bytesSlice2StrSlice(src [][]byte) []string {
return ret
}

// UnsignedMap returns a map: column index -> unsigned.
// Note that only numeric columns will be returned.
// nil is returned if not available or no numeric columns at all.
func (e *TableMapEvent) UnsignedMap() map[int]bool {
if len(e.SignednessBitmap) == 0 {
return nil
}
p := 0
ret := make(map[int]bool)
for i := 0; i < int(e.ColumnCount); i++ {
if !e.IsNumericColumn(i) {
continue
}
ret[i] = e.SignednessBitmap[p/8]&(1<<uint(7-p%8)) != 0
p++
}
return ret
}

// CollationMap returns a map: column index -> collation id.
// Note that only character columns will be returned.
// nil is returned if not available or no character columns at all.
func (e *TableMapEvent) CollationMap() map[int]uint64 {
return e.collationMap(e.IsCharacterColumn, e.DefaultCharset, e.ColumnCharset)
}

// EnumSetCollationMap returns a map: column index -> collation id.
// Note that only enum or set columns will be returned.
// nil is returned if not available or no enum/set columns at all.
func (e *TableMapEvent) EnumSetCollationMap() map[int]uint64 {
return e.collationMap(e.IsEnumOrSetColumn, e.EnumSetDefaultCharset, e.EnumSetColumnCharset)
}

func (e *TableMapEvent) collationMap(includeType func(int) bool, defaultCharset, columnCharset []uint64) map[int]uint64 {

if len(defaultCharset) != 0 {
defaultCollation := defaultCharset[0]

// character column index -> collation
collations := make(map[int]uint64)
for i := 1; i < len(defaultCharset); i += 2 {
collations[int(defaultCharset[i])] = defaultCharset[i+1]
}

p := 0
ret := make(map[int]uint64)
for i := 0; i < int(e.ColumnCount); i++ {
if !includeType(i) {
continue
}

if collation, ok := collations[p]; ok {
ret[i] = collation
} else {
ret[i] = defaultCollation
}
p++
}

return ret
}

if len(columnCharset) != 0 {

p := 0
ret := make(map[int]uint64)
for i := 0; i < int(e.ColumnCount); i++ {
if !includeType(i) {
continue
}

ret[i] = columnCharset[p]
p++
}

return ret
}

return nil
}

// EnumStrValueMap returns a map: column index -> enum string value.
// Note that only enum columns will be returned.
// nil is returned if not available or no enum columns at all.
func (e *TableMapEvent) EnumStrValueMap() map[int][]string {
return e.strValueMap(e.IsEnumColumn, e.EnumStrValueString())
}

// SetStrValueMap returns a map: column index -> set string value.
// Note that only set columns will be returned.
// nil is returned if not available or no set columns at all.
func (e *TableMapEvent) SetStrValueMap() map[int][]string {
return e.strValueMap(e.IsSetColumn, e.SetStrValueString())
}

func (e *TableMapEvent) strValueMap(includeType func(int) bool, strValue [][]string) map[int][]string {
if len(strValue) == 0 {
return nil
}
p := 0
ret := make(map[int][]string)
for i := 0; i < int(e.ColumnCount); i++ {
if !includeType(i) {
continue
}
ret[i] = strValue[p]
p++
}
return ret
}

// GeometryTypeMap returns a map: column index -> geometry type.
// Note that only geometry columns will be returned.
// nil is returned if not available or no geometry columns at all.
func (e *TableMapEvent) GeometryTypeMap() map[int]uint64 {
if len(e.GeometryType) == 0 {
return nil
}
p := 0
ret := make(map[int]uint64)
for i := 0; i < int(e.ColumnCount); i++ {
if !e.IsGeometryColumn(i) {
continue
}

ret[i] = e.GeometryType[p]
p++
}
return ret
}

// Below realType and IsXXXColumn are base from:
// table_def::type in sql/rpl_utility.h
// Table_map_log_event::print_columns in mysql-8.0/sql/log_event.cc and mariadb-10.5/sql/log_event_client.cc

func (e *TableMapEvent) realType(i int) byte {

typ := e.ColumnType[i]

switch typ {
case MYSQL_TYPE_STRING:
rtyp := byte(e.ColumnMeta[i] >> 8)
if rtyp == MYSQL_TYPE_ENUM || rtyp == MYSQL_TYPE_SET {
return rtyp
}

case MYSQL_TYPE_DATE:
return MYSQL_TYPE_NEWDATE
}

return typ
}

func (e *TableMapEvent) IsNumericColumn(i int) bool {

switch e.realType(i) {
case MYSQL_TYPE_TINY,
MYSQL_TYPE_SHORT,
MYSQL_TYPE_INT24,
MYSQL_TYPE_LONG,
MYSQL_TYPE_LONGLONG,
MYSQL_TYPE_NEWDECIMAL,
MYSQL_TYPE_FLOAT,
MYSQL_TYPE_DOUBLE:
return true

default:
return false
}

}

// IsCharacterColumn returns true if the column type is considered as character type.
// Note that JSON/GEOMETRY types are treated as character type in mariadb.
// (JSON is an alias for LONGTEXT in mariadb: https://mariadb.com/kb/en/json-data-type/)
func (e *TableMapEvent) IsCharacterColumn(i int) bool {

switch e.realType(i) {
case MYSQL_TYPE_STRING,
MYSQL_TYPE_VAR_STRING,
MYSQL_TYPE_VARCHAR,
MYSQL_TYPE_BLOB:
return true

case MYSQL_TYPE_GEOMETRY:
if e.flavor == "mariadb" {
return true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about highlight this special behavior as the comments of this func (IsCharacterColumn)?

}
return false

default:
return false
}

}

func (e *TableMapEvent) IsEnumColumn(i int) bool {
return e.realType(i) == MYSQL_TYPE_ENUM
}

func (e *TableMapEvent) IsSetColumn(i int) bool {
return e.realType(i) == MYSQL_TYPE_SET
}

func (e *TableMapEvent) IsGeometryColumn(i int) bool {
return e.realType(i) == MYSQL_TYPE_GEOMETRY
}

func (e *TableMapEvent) IsEnumOrSetColumn(i int) bool {
rtyp := e.realType(i)
return rtyp == MYSQL_TYPE_ENUM || rtyp == MYSQL_TYPE_SET
}

// RowsEventStmtEndFlag is set in the end of the statement.
const RowsEventStmtEndFlag = 0x01

Expand Down
Loading