Skip to content

Commit acade6e

Browse files
authored
Merge branch 'master' into master
2 parents 7d032f3 + 8250ec4 commit acade6e

File tree

8 files changed

+95
-7
lines changed

8 files changed

+95
-7
lines changed

README.md

+11
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,17 @@
22

33
A pure go library to handle MySQL network protocol and replication.
44

5+
## Call for Committer/Maintainer
6+
7+
Sorry that I have no enough time to maintain this project wholly, if you like this project and want to help me improve it continuously, please contact me through email ([email protected]).
8+
9+
Requirement: In the email, you should list somethings(including but not limited to below) to make me believe we can work together.
10+
11+
+ Your GitHub ID
12+
+ The contributions to go-mysql before, including PRs or Issues.
13+
+ The reason why you can improve go-mysql.
14+
15+
516
## Replication
617

718
Replication package handles MySQL replication protocol like [python-mysql-replication](https://github.com/noplay/python-mysql-replication).

canal/canal.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,7 @@ func (c *Canal) prepareSyncer() error {
425425
SemiSyncEnabled: c.cfg.SemiSyncEnabled,
426426
MaxReconnectAttempts: c.cfg.MaxReconnectAttempts,
427427
TimestampStringLocation: c.cfg.TimestampStringLocation,
428+
TLSConfig: c.cfg.TLSConfig,
428429
}
429430

430431
if strings.Contains(c.cfg.Addr, "/") {
@@ -453,11 +454,16 @@ func (c *Canal) prepareSyncer() error {
453454
func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error) {
454455
c.connLock.Lock()
455456
defer c.connLock.Unlock()
456-
457+
argF := make([]func(*client.Conn), 0)
458+
if c.cfg.TLSConfig != nil {
459+
argF = append(argF, func(conn *client.Conn) {
460+
conn.SetTLSConfig(c.cfg.TLSConfig)
461+
})
462+
}
457463
retryNum := 3
458464
for i := 0; i < retryNum; i++ {
459465
if c.conn == nil {
460-
c.conn, err = client.Connect(c.cfg.Addr, c.cfg.User, c.cfg.Password, "")
466+
c.conn, err = client.Connect(c.cfg.Addr, c.cfg.User, c.cfg.Password, "", argF...)
461467
if err != nil {
462468
return nil, errors.Trace(err)
463469
}

canal/config.go

+4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package canal
22

33
import (
4+
"crypto/tls"
45
"io/ioutil"
56
"math/rand"
67
"time"
@@ -79,6 +80,9 @@ type Config struct {
7980
// Set to change the maximum number of attempts to re-establish a broken
8081
// connection
8182
MaxReconnectAttempts int `toml:"max_reconnect_attempts"`
83+
84+
// Set TLS config
85+
TLSConfig *tls.Config
8286
}
8387

8488
func NewConfigWithFile(name string) (*Config, error) {

cmd/go-canal/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func main() {
5656
os.Exit(1)
5757
}
5858

59-
if len(*ignoreTables) == 0 {
59+
if len(*ignoreTables) > 0 {
6060
subs := strings.Split(*ignoreTables, ",")
6161
for _, sub := range subs {
6262
if seps := strings.Split(sub, "."); len(seps) == 2 {

mysql/resultset.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func (p RowData) ParseText(f []*Field) ([]interface{}, error) {
4242

4343
switch f[i].Type {
4444
case MYSQL_TYPE_TINY, MYSQL_TYPE_SHORT, MYSQL_TYPE_INT24,
45-
MYSQL_TYPE_LONGLONG, MYSQL_TYPE_YEAR:
45+
MYSQL_TYPE_LONGLONG, MYSQL_TYPE_LONG, MYSQL_TYPE_YEAR:
4646
if isUnsigned {
4747
data[i], err = strconv.ParseUint(string(v), 10, 64)
4848
} else {

replication/row_event.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -1109,7 +1109,12 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{
11091109

11101110
case MYSQL_TYPE_YEAR:
11111111
n = 1
1112-
v = int(data[0]) + 1900
1112+
year := int(data[0])
1113+
if year == 0 {
1114+
v = year
1115+
} else {
1116+
v = year + 1900
1117+
}
11131118
case MYSQL_TYPE_ENUM:
11141119
l := meta & 0xFF
11151120
switch l {

schema/schema.go

+46-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package schema
77
import (
88
"database/sql"
99
"fmt"
10+
"strconv"
1011
"strings"
1112

1213
"github.com/pingcap/errors"
@@ -23,7 +24,7 @@ const (
2324
TYPE_FLOAT // float, double
2425
TYPE_ENUM // enum
2526
TYPE_SET // set
26-
TYPE_STRING // other
27+
TYPE_STRING // char, varchar, etc.
2728
TYPE_DATETIME // datetime
2829
TYPE_TIMESTAMP // timestamp
2930
TYPE_DATE // date
@@ -32,6 +33,8 @@ const (
3233
TYPE_JSON // json
3334
TYPE_DECIMAL // decimal
3435
TYPE_MEDIUM_INT
36+
TYPE_BINARY // binary, varbinary
37+
TYPE_POINT // coordinates
3538
)
3639

3740
type TableColumn struct {
@@ -41,8 +44,11 @@ type TableColumn struct {
4144
RawType string
4245
IsAuto bool
4346
IsUnsigned bool
47+
IsVirtual bool
4448
EnumValues []string
4549
SetValues []string
50+
FixedSize uint
51+
MaxSize uint
4652
}
4753

4854
type Index struct {
@@ -94,6 +100,14 @@ func (ta *Table) AddColumn(name string, columnType string, collation string, ext
94100
")"),
95101
"'", "", -1),
96102
",")
103+
} else if strings.HasPrefix(columnType, "binary") {
104+
ta.Columns[index].Type = TYPE_BINARY
105+
size := getSizeFromColumnType(columnType)
106+
ta.Columns[index].MaxSize = size
107+
ta.Columns[index].FixedSize = size
108+
} else if strings.HasPrefix(columnType, "varbinary") {
109+
ta.Columns[index].Type = TYPE_BINARY
110+
ta.Columns[index].MaxSize = getSizeFromColumnType(columnType)
97111
} else if strings.HasPrefix(columnType, "datetime") {
98112
ta.Columns[index].Type = TYPE_DATETIME
99113
} else if strings.HasPrefix(columnType, "timestamp") {
@@ -106,12 +120,20 @@ func (ta *Table) AddColumn(name string, columnType string, collation string, ext
106120
ta.Columns[index].Type = TYPE_BIT
107121
} else if strings.HasPrefix(columnType, "json") {
108122
ta.Columns[index].Type = TYPE_JSON
123+
} else if strings.Contains(columnType, "point") {
124+
ta.Columns[index].Type = TYPE_POINT
109125
} else if strings.Contains(columnType, "mediumint") {
110126
ta.Columns[index].Type = TYPE_MEDIUM_INT
111127
} else if strings.Contains(columnType, "int") || strings.HasPrefix(columnType, "year") {
112128
ta.Columns[index].Type = TYPE_NUMBER
129+
} else if strings.HasPrefix(columnType, "char") {
130+
ta.Columns[index].Type = TYPE_STRING
131+
size := getSizeFromColumnType(columnType)
132+
ta.Columns[index].FixedSize = size
133+
ta.Columns[index].MaxSize = size
113134
} else {
114135
ta.Columns[index].Type = TYPE_STRING
136+
ta.Columns[index].MaxSize = getSizeFromColumnType(columnType)
115137
}
116138

117139
if strings.Contains(columnType, "unsigned") || strings.Contains(columnType, "zerofill") {
@@ -121,7 +143,30 @@ func (ta *Table) AddColumn(name string, columnType string, collation string, ext
121143

122144
if extra == "auto_increment" {
123145
ta.Columns[index].IsAuto = true
146+
} else if extra == "VIRTUAL GENERATED" {
147+
ta.Columns[index].IsVirtual = true
148+
}
149+
}
150+
151+
func getSizeFromColumnType(columnType string) uint {
152+
startIndex := strings.Index(columnType, "(")
153+
if startIndex < 0 {
154+
return 0
155+
}
156+
157+
// we are searching for the first () and there may not be any closing
158+
// brackets before the opening, so no need search at the offset from the
159+
// opening ones
160+
endIndex := strings.Index(columnType, ")")
161+
if startIndex < 0 || endIndex < 0 || startIndex > endIndex {
162+
return 0
163+
}
164+
165+
i, err := strconv.Atoi(columnType[startIndex+1:endIndex])
166+
if err != nil || i < 0 {
167+
return 0
124168
}
169+
return uint(i)
125170
}
126171

127172
func (ta *Table) FindColumn(name string) int {

schema/schema_test.go

+18-1
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ func (s *schemaTestSuite) TestSchema(c *C) {
6262
zfint INT ZEROFILL,
6363
name_ucs VARCHAR(256) CHARACTER SET ucs2,
6464
name_utf8 VARCHAR(256) CHARACTER SET utf8,
65+
name_char CHAR(10),
66+
name_binary BINARY(11),
67+
name_varbinary VARBINARY(12),
6568
PRIMARY KEY(id2, id),
6669
UNIQUE (id1),
6770
INDEX name_idx (name)
@@ -74,20 +77,34 @@ func (s *schemaTestSuite) TestSchema(c *C) {
7477
ta, err := NewTable(s.conn, "test", "schema_test")
7578
c.Assert(err, IsNil)
7679

77-
c.Assert(ta.Columns, HasLen, 12)
80+
c.Assert(ta.Columns, HasLen, 15)
7881
c.Assert(ta.Indexes, HasLen, 3)
7982
c.Assert(ta.PKColumns, DeepEquals, []int{2, 0})
8083
c.Assert(ta.Indexes[0].Columns, HasLen, 2)
8184
c.Assert(ta.Indexes[0].Name, Equals, "PRIMARY")
8285
c.Assert(ta.Indexes[2].Name, Equals, "name_idx")
86+
c.Assert(ta.Columns[3].Type, Equals, TYPE_STRING)
87+
c.Assert(ta.Columns[3].MaxSize, Equals, uint(256))
88+
c.Assert(ta.Columns[3].FixedSize, Equals, uint(0))
8389
c.Assert(ta.Columns[4].EnumValues, DeepEquals, []string{"appointing", "serving", "abnormal", "stop", "noaftermarket", "finish", "financial_audit"})
8490
c.Assert(ta.Columns[5].SetValues, DeepEquals, []string{"a", "b", "c"})
8591
c.Assert(ta.Columns[7].Type, Equals, TYPE_DECIMAL)
8692
c.Assert(ta.Columns[0].IsUnsigned, IsFalse)
8793
c.Assert(ta.Columns[8].IsUnsigned, IsTrue)
8894
c.Assert(ta.Columns[9].IsUnsigned, IsTrue)
8995
c.Assert(ta.Columns[10].Collation, Matches, "^ucs2.*")
96+
c.Assert(ta.Columns[10].MaxSize, Equals, uint(256))
97+
c.Assert(ta.Columns[10].FixedSize, Equals, uint(0))
9098
c.Assert(ta.Columns[11].Collation, Matches, "^utf8.*")
99+
c.Assert(ta.Columns[12].Type, Equals, TYPE_STRING)
100+
c.Assert(ta.Columns[12].MaxSize, Equals, uint(10))
101+
c.Assert(ta.Columns[12].FixedSize, Equals, uint(10))
102+
c.Assert(ta.Columns[13].Type, Equals, TYPE_BINARY)
103+
c.Assert(ta.Columns[13].MaxSize, Equals, uint(11))
104+
c.Assert(ta.Columns[13].FixedSize, Equals, uint(11))
105+
c.Assert(ta.Columns[14].Type, Equals, TYPE_BINARY)
106+
c.Assert(ta.Columns[14].MaxSize, Equals, uint(12))
107+
c.Assert(ta.Columns[14].FixedSize, Equals, uint(0))
91108

92109
taSqlDb, err := NewTableFromSqlDB(s.sqlDB, "test", "schema_test")
93110
c.Assert(err, IsNil)

0 commit comments

Comments
 (0)