From 00609a45758936d9cd2efea3941faec8c79185aa Mon Sep 17 00:00:00 2001 From: renhongdi Date: Mon, 1 Jul 2019 12:09:04 +0800 Subject: [PATCH 1/6] fix(canal): fix mysql unsigned medium int --- canal/rows.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/canal/rows.go b/canal/rows.go index e246ee5a2..f7a045084 100644 --- a/canal/rows.go +++ b/canal/rows.go @@ -2,6 +2,7 @@ package canal import ( "fmt" + "strings" "github.com/siddontang/go-mysql/replication" "github.com/siddontang/go-mysql/schema" @@ -57,7 +58,14 @@ func (r *RowsEvent) handleUnsigned() { case int16: r.Rows[i][index] = uint16(t) case int32: - r.Rows[i][index] = uint32(t) + if strings.Contains(strings.ToLower(r.Table.Columns[i].RawType), "mediumint") { + b0 := byte(t & 0xFF) + b1 := byte(t >> 8) + b2 := byte(t >> 16) + r.Rows[i][index] = uint32(uint32(b0) | uint32(b1)<<8 | uint32(b2)<<16) + } else { + r.Rows[i][index] = uint32(t) + } case int64: r.Rows[i][index] = uint64(t) case int: From 7a090e849f9038c2ffff5abf97a7a18401b83597 Mon Sep 17 00:00:00 2001 From: renhongdi Date: Mon, 8 Jul 2019 14:44:14 +0800 Subject: [PATCH 2/6] fix(canal): port solution from gh-ost and add test --- canal/canal_test.go | 10 +++++++++- canal/rows.go | 14 ++++++++++---- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/canal/canal_test.go b/canal/canal_test.go index 2bd5f6c41..b7b778d86 100755 --- a/canal/canal_test.go +++ b/canal/canal_test.go @@ -53,6 +53,8 @@ func (s *canalTestSuite) SetUpSuite(c *C) { id int AUTO_INCREMENT, content blob DEFAULT NULL, name varchar(100), + mi mediumint(8) NOT NULL DEFAULT 0, + umi mediumint(8) unsigned NOT NULL DEFAULT 0, PRIMARY KEY(id) )ENGINE=innodb; ` @@ -60,7 +62,7 @@ func (s *canalTestSuite) SetUpSuite(c *C) { s.execute(c, sql) s.execute(c, "DELETE FROM test.canal_test") - s.execute(c, "INSERT INTO test.canal_test (content, name) VALUES (?, ?), (?, ?), (?, ?)", "1", "a", `\0\ndsfasdf`, "b", "", "c") + s.execute(c, "INSERT INTO test.canal_test (content, name, mi, umi) VALUES (?, ?, ?, ?), (?, ?, ?, ?), (?, ?, ?, ?)", "1", "a", 0, 0, `\0\ndsfasdf`, "b", 1, 16777215, "", "c", -1, 1) s.execute(c, "SET GLOBAL binlog_format = 'ROW'") @@ -96,6 +98,9 @@ type testEventHandler struct { func (h *testEventHandler) OnRow(e *RowsEvent) error { log.Infof("OnRow %s %v\n", e.Action, e.Rows) + if e.Rows[0][4].(uint32) < 0 { + return fmt.Errorf("invalid unsigned medium int %v", e.Rows[0][4]) + } return nil } @@ -113,6 +118,9 @@ func (s *canalTestSuite) TestCanal(c *C) { for i := 1; i < 10; i++ { s.execute(c, "INSERT INTO test.canal_test (name) VALUES (?)", fmt.Sprintf("%d", i)) } + s.execute(c, "INSERT INTO test.canal_test (mi,umi) VALUES (?,?)", 0, 0) + s.execute(c, "INSERT INTO test.canal_test (mi,umi) VALUES (?,?)", -1, 16777215) + s.execute(c, "INSERT INTO test.canal_test (mi,umi) VALUES (?,?)", 1, 1) s.execute(c, "ALTER TABLE test.canal_test ADD `age` INT(5) NOT NULL AFTER `name`") s.execute(c, "INSERT INTO test.canal_test (name,age) VALUES (?,?)", "d", "18") diff --git a/canal/rows.go b/canal/rows.go index f7a045084..fba1aaa0a 100644 --- a/canal/rows.go +++ b/canal/rows.go @@ -42,6 +42,8 @@ func newRowsEvent(table *schema.Table, action string, rows [][]interface{}, head return e } +const maxMediumintUnsigned int32 = 16777215 + func (r *RowsEvent) handleUnsigned() { // Handle Unsigned Columns here, for binlog replication, we can't know the integer is unsigned or not, // so we use int type but this may cause overflow outside sometimes, so we must convert to the really . @@ -59,10 +61,14 @@ func (r *RowsEvent) handleUnsigned() { r.Rows[i][index] = uint16(t) case int32: if strings.Contains(strings.ToLower(r.Table.Columns[i].RawType), "mediumint") { - b0 := byte(t & 0xFF) - b1 := byte(t >> 8) - b2 := byte(t >> 16) - r.Rows[i][index] = uint32(uint32(b0) | uint32(b1)<<8 | uint32(b2)<<16) + // problem with mediumint is that it's a 3-byte type. There is no compatible golang type to match that. + // So to convert from negative to positive we'd need to convert the value manually + if i >= 0 { + r.Rows[i][index] = uint32(t) + } else { + r.Rows[i][index] = uint32(maxMediumintUnsigned + t + 1) + } + return } else { r.Rows[i][index] = uint32(t) } From 7648b71b3cdb01c0fda7a791488e44eda3ce4962 Mon Sep 17 00:00:00 2001 From: renhongdi Date: Mon, 8 Jul 2019 14:53:32 +0800 Subject: [PATCH 3/6] fix test --- canal/canal_test.go | 9 ++++----- go.sum | 1 + 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/canal/canal_test.go b/canal/canal_test.go index b7b778d86..356cae0cf 100755 --- a/canal/canal_test.go +++ b/canal/canal_test.go @@ -98,8 +98,9 @@ type testEventHandler struct { func (h *testEventHandler) OnRow(e *RowsEvent) error { log.Infof("OnRow %s %v\n", e.Action, e.Rows) - if e.Rows[0][4].(uint32) < 0 { - return fmt.Errorf("invalid unsigned medium int %v", e.Rows[0][4]) + umi := e.Rows[0][4].(uint32) // 4th col is umi + if umi != 0 && umi != 1 && umi != 16777215 { + return fmt.Errorf("invalid unsigned medium int %d", umi) } return nil } @@ -118,9 +119,7 @@ func (s *canalTestSuite) TestCanal(c *C) { for i := 1; i < 10; i++ { s.execute(c, "INSERT INTO test.canal_test (name) VALUES (?)", fmt.Sprintf("%d", i)) } - s.execute(c, "INSERT INTO test.canal_test (mi,umi) VALUES (?,?)", 0, 0) - s.execute(c, "INSERT INTO test.canal_test (mi,umi) VALUES (?,?)", -1, 16777215) - s.execute(c, "INSERT INTO test.canal_test (mi,umi) VALUES (?,?)", 1, 1) + s.execute(c, "INSERT INTO test.canal_test (mi,umi) VALUES (?,?), (?,?), (?,?)", 0, 0, -1, 16777215, 1, 1) s.execute(c, "ALTER TABLE test.canal_test ADD `age` INT(5) NOT NULL AFTER `name`") s.execute(c, "INSERT INTO test.canal_test (name,age) VALUES (?,?)", "d", "18") diff --git a/go.sum b/go.sum index 9c11a7ab9..c9dd16fbd 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,7 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 h1:USx2/E1bX46VG32FIw034Au6seQ2fY9NEILmNh/UlQg= github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ= github.com/pingcap/errors v0.11.0 h1:DCJQB8jrHbQ1VVlMFIrbj2ApScNNotVmkSNplu2yUt4= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= From fdb765858695f827cb9f2015a4ca0ece91d502f4 Mon Sep 17 00:00:00 2001 From: renhongdi Date: Mon, 8 Jul 2019 14:58:45 +0800 Subject: [PATCH 4/6] fix test --- canal/canal_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/canal/canal_test.go b/canal/canal_test.go index 356cae0cf..edd83fc02 100755 --- a/canal/canal_test.go +++ b/canal/canal_test.go @@ -98,8 +98,8 @@ type testEventHandler struct { func (h *testEventHandler) OnRow(e *RowsEvent) error { log.Infof("OnRow %s %v\n", e.Action, e.Rows) - umi := e.Rows[0][4].(uint32) // 4th col is umi - if umi != 0 && umi != 1 && umi != 16777215 { + umi, ok := e.Rows[0][4].(uint32) // 4th col is umi. mysqldump gives uint64 instead of uint32 + if ok && (umi != 0 && umi != 1 && umi != 16777215) { return fmt.Errorf("invalid unsigned medium int %d", umi) } return nil From 1b5a71413e0e5981c4165f11db535fd759bddb99 Mon Sep 17 00:00:00 2001 From: renhongdi Date: Mon, 8 Jul 2019 19:23:20 +0800 Subject: [PATCH 5/6] add medium int column type --- canal/rows.go | 4 +--- schema/schema.go | 5 ++++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/canal/rows.go b/canal/rows.go index fba1aaa0a..f464acc25 100644 --- a/canal/rows.go +++ b/canal/rows.go @@ -2,8 +2,6 @@ package canal import ( "fmt" - "strings" - "github.com/siddontang/go-mysql/replication" "github.com/siddontang/go-mysql/schema" ) @@ -60,7 +58,7 @@ func (r *RowsEvent) handleUnsigned() { case int16: r.Rows[i][index] = uint16(t) case int32: - if strings.Contains(strings.ToLower(r.Table.Columns[i].RawType), "mediumint") { + if r.Table.Columns[i].Type == schema.TYPE_MEDIUM_INT { // problem with mediumint is that it's a 3-byte type. There is no compatible golang type to match that. // So to convert from negative to positive we'd need to convert the value manually if i >= 0 { diff --git a/schema/schema.go b/schema/schema.go index ef5cf33a5..0c4f6344e 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -19,7 +19,7 @@ var HAHealthCheckSchema = "mysql.ha_health_check" // Different column type const ( - TYPE_NUMBER = iota + 1 // tinyint, smallint, mediumint, int, bigint, year + TYPE_NUMBER = iota + 1 // tinyint, smallint, int, bigint, year TYPE_FLOAT // float, double TYPE_ENUM // enum TYPE_SET // set @@ -31,6 +31,7 @@ const ( TYPE_BIT // bit TYPE_JSON // json TYPE_DECIMAL // decimal + TYPE_MEDIUM_INT ) type TableColumn struct { @@ -105,6 +106,8 @@ func (ta *Table) AddColumn(name string, columnType string, collation string, ext ta.Columns[index].Type = TYPE_BIT } else if strings.HasPrefix(columnType, "json") { ta.Columns[index].Type = TYPE_JSON + } else if strings.Contains(columnType, "mediumint") { + ta.Columns[index].Type = TYPE_MEDIUM_INT } else if strings.Contains(columnType, "int") || strings.HasPrefix(columnType, "year") { ta.Columns[index].Type = TYPE_NUMBER } else { From 053929629ec87f18529a2bd3d22397a5645a370a Mon Sep 17 00:00:00 2001 From: renhongdi Date: Tue, 9 Jul 2019 14:23:06 +0800 Subject: [PATCH 6/6] fix dump --- canal/dump.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/canal/dump.go b/canal/dump.go index 9ab0a948b..e4f712d66 100644 --- a/canal/dump.go +++ b/canal/dump.go @@ -52,7 +52,7 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error } else if v == "_binary ''" { vs[i] = []byte{} } else if v[0] != '\'' { - if tableInfo.Columns[i].Type == schema.TYPE_NUMBER { + if tableInfo.Columns[i].Type == schema.TYPE_NUMBER || tableInfo.Columns[i].Type == schema.TYPE_MEDIUM_INT { n, err := strconv.ParseInt(v, 10, 64) if err != nil { return fmt.Errorf("parse row %v at %d error %v, int expected", values, i, err)