Skip to content

fix(canal): fix mysql unsigned medium int #399

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion canal/canal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,16 @@ func (s *canalTestSuite) SetUpSuite(c *C) {
id int AUTO_INCREMENT,
content blob DEFAULT NULL,
name varchar(100),
mi mediumint(8) NOT NULL DEFAULT 0,
umi mediumint(8) unsigned NOT NULL DEFAULT 0,
PRIMARY KEY(id)
)ENGINE=innodb;
`

s.execute(c, sql)

s.execute(c, "DELETE FROM test.canal_test")
s.execute(c, "INSERT INTO test.canal_test (content, name) VALUES (?, ?), (?, ?), (?, ?)", "1", "a", `\0\ndsfasdf`, "b", "", "c")
s.execute(c, "INSERT INTO test.canal_test (content, name, mi, umi) VALUES (?, ?, ?, ?), (?, ?, ?, ?), (?, ?, ?, ?)", "1", "a", 0, 0, `\0\ndsfasdf`, "b", 1, 16777215, "", "c", -1, 1)

s.execute(c, "SET GLOBAL binlog_format = 'ROW'")

Expand Down Expand Up @@ -96,6 +98,10 @@ type testEventHandler struct {

func (h *testEventHandler) OnRow(e *RowsEvent) error {
log.Infof("OnRow %s %v\n", e.Action, e.Rows)
umi, ok := e.Rows[0][4].(uint32) // 4th col is umi. mysqldump gives uint64 instead of uint32
if ok && (umi != 0 && umi != 1 && umi != 16777215) {
return fmt.Errorf("invalid unsigned medium int %d", umi)
}
return nil
}

Expand All @@ -113,6 +119,7 @@ func (s *canalTestSuite) TestCanal(c *C) {
for i := 1; i < 10; i++ {
s.execute(c, "INSERT INTO test.canal_test (name) VALUES (?)", fmt.Sprintf("%d", i))
}
s.execute(c, "INSERT INTO test.canal_test (mi,umi) VALUES (?,?), (?,?), (?,?)", 0, 0, -1, 16777215, 1, 1)
s.execute(c, "ALTER TABLE test.canal_test ADD `age` INT(5) NOT NULL AFTER `name`")
s.execute(c, "INSERT INTO test.canal_test (name,age) VALUES (?,?)", "d", "18")

Expand Down
2 changes: 1 addition & 1 deletion canal/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error
} else if v == "_binary ''" {
vs[i] = []byte{}
} else if v[0] != '\'' {
if tableInfo.Columns[i].Type == schema.TYPE_NUMBER {
if tableInfo.Columns[i].Type == schema.TYPE_NUMBER || tableInfo.Columns[i].Type == schema.TYPE_MEDIUM_INT {
n, err := strconv.ParseInt(v, 10, 64)
if err != nil {
return fmt.Errorf("parse row %v at %d error %v, int expected", values, i, err)
Expand Down
16 changes: 14 additions & 2 deletions canal/rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package canal

import (
"fmt"

"github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go-mysql/schema"
)
Expand Down Expand Up @@ -41,6 +40,8 @@ func newRowsEvent(table *schema.Table, action string, rows [][]interface{}, head
return e
}

const maxMediumintUnsigned int32 = 16777215

func (r *RowsEvent) handleUnsigned() {
// Handle Unsigned Columns here, for binlog replication, we can't know the integer is unsigned or not,
// so we use int type but this may cause overflow outside sometimes, so we must convert to the really .
Expand All @@ -57,7 +58,18 @@ func (r *RowsEvent) handleUnsigned() {
case int16:
r.Rows[i][index] = uint16(t)
case int32:
r.Rows[i][index] = uint32(t)
if r.Table.Columns[i].Type == schema.TYPE_MEDIUM_INT {
// problem with mediumint is that it's a 3-byte type. There is no compatible golang type to match that.
// So to convert from negative to positive we'd need to convert the value manually
if i >= 0 {
r.Rows[i][index] = uint32(t)
} else {
r.Rows[i][index] = uint32(maxMediumintUnsigned + t + 1)
}
return
} else {
r.Rows[i][index] = uint32(t)
}
case int64:
r.Rows[i][index] = uint64(t)
case int:
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 h1:USx2/E1bX46VG32FIw034Au6seQ2fY9NEILmNh/UlQg=
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ=
github.com/pingcap/errors v0.11.0 h1:DCJQB8jrHbQ1VVlMFIrbj2ApScNNotVmkSNplu2yUt4=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
Expand Down
5 changes: 4 additions & 1 deletion schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var HAHealthCheckSchema = "mysql.ha_health_check"

// Different column type
const (
TYPE_NUMBER = iota + 1 // tinyint, smallint, mediumint, int, bigint, year
TYPE_NUMBER = iota + 1 // tinyint, smallint, int, bigint, year
TYPE_FLOAT // float, double
TYPE_ENUM // enum
TYPE_SET // set
Expand All @@ -31,6 +31,7 @@ const (
TYPE_BIT // bit
TYPE_JSON // json
TYPE_DECIMAL // decimal
TYPE_MEDIUM_INT
)

type TableColumn struct {
Expand Down Expand Up @@ -105,6 +106,8 @@ func (ta *Table) AddColumn(name string, columnType string, collation string, ext
ta.Columns[index].Type = TYPE_BIT
} else if strings.HasPrefix(columnType, "json") {
ta.Columns[index].Type = TYPE_JSON
} else if strings.Contains(columnType, "mediumint") {
ta.Columns[index].Type = TYPE_MEDIUM_INT
} else if strings.Contains(columnType, "int") || strings.HasPrefix(columnType, "year") {
ta.Columns[index].Type = TYPE_NUMBER
} else {
Expand Down