Skip to content

Commit e2786c8

Browse files
committed
Start to add mycanal
1 parent 366588c commit e2786c8

File tree

15 files changed

+1609
-0
lines changed

15 files changed

+1609
-0
lines changed

go.mod

+16
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,22 @@ module github.com/huangjunwen/golibs
33
go 1.13
44

55
require (
6+
github.com/Microsoft/go-winio v0.4.14 // indirect
7+
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
8+
github.com/containerd/continuity v0.0.0-20200228182428-0f16d7a0959c // indirect
9+
github.com/go-sql-driver/mysql v1.4.1
610
github.com/huangjunwen/tstsvc v0.6.0
11+
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
12+
github.com/kr/pretty v0.1.0 // indirect
13+
github.com/ory/dockertest v3.3.5+incompatible // indirect
14+
github.com/pkg/errors v0.9.1
15+
github.com/satori/go.uuid v1.2.0
16+
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24
17+
github.com/siddontang/go-mysql v0.0.0-20200622032841-a8c16ae9a9cb
18+
github.com/sirupsen/logrus v1.5.0 // indirect
719
github.com/stretchr/testify v1.6.1
20+
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect
21+
golang.org/x/sys v0.0.0-20200409092240-59c9f1ba88fa // indirect
22+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
23+
gopkg.in/volatiletech/null.v6 v6.0.0-20170828023728-0bef4e07ae1b
824
)

go.sum

+62
Large diffs are not rendered by default.

mycanal/_tests/compatible_test.go

+273
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
package tests
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
"log"
8+
"reflect"
9+
"testing"
10+
"time"
11+
12+
tstmysql "github.com/huangjunwen/tstsvc/mysql"
13+
"github.com/ory/dockertest"
14+
"github.com/stretchr/testify/assert"
15+
16+
. "github.com/huangjunwen/golibs/mycanal"
17+
"github.com/huangjunwen/golibs/mycanal/fulldump"
18+
"github.com/huangjunwen/golibs/mycanal/incrdump"
19+
"github.com/huangjunwen/golibs/sqlh"
20+
)
21+
22+
type diffValue struct {
23+
ColName string
24+
FullDumpVal interface{}
25+
IncrDumpVal interface{}
26+
}
27+
28+
func TestCompatible(t *testing.T) {
29+
30+
var err error
31+
assert := assert.New(t)
32+
33+
var resMySQL *tstmysql.Resource
34+
{
35+
resMySQL, err = tstmysql.Run(&tstmysql.Options{
36+
Tag: "8.0.19",
37+
BaseRunOptions: dockertest.RunOptions{
38+
Cmd: []string{
39+
"--gtid-mode=ON",
40+
"--enforce-gtid-consistency=ON",
41+
"--log-bin=/var/lib/mysql/binlog",
42+
"--server-id=1",
43+
"--binlog-format=ROW",
44+
"--binlog-row-image=full",
45+
"--binlog-row-metadata=full",
46+
},
47+
},
48+
})
49+
if err != nil {
50+
log.Panic(err)
51+
}
52+
defer resMySQL.Close()
53+
log.Printf("MySQL server started.\n")
54+
}
55+
56+
cfg := Config{
57+
Host: "localhost",
58+
Port: resMySQL.Options.HostPort,
59+
User: "root",
60+
Password: resMySQL.Options.RootPassword,
61+
}
62+
fullDumpCfg := &FullDumpConfig{
63+
Config: cfg,
64+
}
65+
incrDumpCfg := &IncrDumpConfig{
66+
Config: cfg,
67+
ServerId: 1001,
68+
}
69+
70+
var db *sql.DB
71+
{
72+
db, err = resMySQL.Client()
73+
if err != nil {
74+
log.Panic(err)
75+
}
76+
defer db.Close()
77+
log.Printf("MySQL client created.\n")
78+
}
79+
80+
_, err = db.Exec(`CREATE TABLE tst._types (
81+
id int unsigned primary key auto_increment,
82+
83+
nb_bit bit(64),
84+
mb_bit bit(64) not null default '1011',
85+
86+
ins_tinyint tinyint,
87+
ins_smallint smallint,
88+
ins_mediumint mediumint,
89+
ins_int int,
90+
ins_bigint bigint,
91+
ins_decimal decimal(65,30),
92+
ins_float float,
93+
ins_double double,
94+
inu_tinyint tinyint unsigned,
95+
inu_smallint smallint unsigned,
96+
inu_mediumint mediumint unsigned,
97+
inu_int int unsigned,
98+
inu_bigint bigint unsigned,
99+
inu_decimal decimal(65,30) unsigned,
100+
inu_float float unsigned,
101+
inu_double double unsigned,
102+
103+
ims_tinyint tinyint not null default -128,
104+
ims_smallint smallint not null default -32768,
105+
ims_mediumint mediumint not null default -8388608,
106+
ims_int int not null default -2147483648,
107+
ims_bigint bigint not null default -9223372036854775808,
108+
ims_decimal decimal(65,30) not null default '-77777.77707',
109+
ims_float float not null default -3.1415,
110+
ims_double double not null default -2.7182,
111+
imu_tinyint tinyint unsigned not null default 255,
112+
imu_smallint smallint unsigned not null default 65535,
113+
imu_mediumint mediumint unsigned not null default 16777215,
114+
imu_int int unsigned not null default 4294967295,
115+
imu_bigint bigint unsigned not null default 18446744073709551615,
116+
imu_decimal decimal(65,30) unsigned not null default '88888.88808',
117+
imu_float float unsigned not null default 3.1415,
118+
imu_double double unsigned not null default 2.7182,
119+
120+
tn_year year,
121+
tn_date date,
122+
tn_time time,
123+
tn_ftime time(6),
124+
tn_datetime datetime,
125+
tn_fdatetime datetime(6),
126+
tn_timestamp timestamp,
127+
tn_ftimestamp timestamp(6),
128+
129+
tm_year year not null default '2020',
130+
tm_date date not null default '2020-02-20',
131+
tm_time time not null default '20:20:20',
132+
tm_ftime time(6) not null default '20:20:20.123456',
133+
tm_datetime datetime not null default '2020-02-20 20:20:20',
134+
tm_fdatetime datetime(6) not null default '2020-02-20 20:20:20.123456',
135+
tm_timestamp timestamp not null default current_timestamp,
136+
tm_ftimestamp timestamp(6) not null default current_timestamp(6),
137+
138+
cn_char char(255),
139+
cn_varchar varchar(255),
140+
cn_binary binary(64),
141+
cn_varbinary varbinary(64),
142+
cn_tinyblob tinyblob,
143+
cn_blob blob,
144+
cn_mediumblob mediumblob,
145+
cn_longblob longblob,
146+
cn_tinytext tinytext,
147+
cn_text text,
148+
cn_mediumtext mediumtext,
149+
cn_longtext longtext,
150+
151+
cm_char char(255) not null default 'char',
152+
cm_varchar varchar(255) not null default 'varchar',
153+
cm_binary binary(64) not null default 'binary\0binary\0',
154+
cm_varbinary varbinary(64) not null default 'varbinary\0varbinary\0',
155+
cm_tinyblob tinyblob default ('tinyblob\0tinyblob\0'),
156+
cm_blob blob default ('blob\0blob\0'),
157+
cm_mediumblob mediumblob default ('mediumblob\0mediumblob\0'),
158+
cm_longblob longblob default ('longblob\0longblob\0'),
159+
cm_tinytext tinytext default ('tinytext'),
160+
cm_text text default ('text'),
161+
cm_mediumtext mediumtext default ('mediumtext'),
162+
cm_longtext longtext default ('longtext'),
163+
164+
en_enum enum('a', 'b', 'c'),
165+
em_enum enum('a', 'b', 'c') default 'a',
166+
167+
sn_set set('w', 'x', 'y', 'z'),
168+
sm_set set('w', 'x', 'y', 'z') default 'w,y',
169+
170+
jn_json json,
171+
jm_json json default ('{"a": "b", "c":[]}'),
172+
173+
g_geometry geometry default null
174+
175+
)`)
176+
if err != nil {
177+
log.Panic(err)
178+
}
179+
180+
_, err = db.Exec("INSERT INTO tst._types (id) VALUES (?)", 1)
181+
if err != nil {
182+
log.Panic(err)
183+
}
184+
185+
var gset string
186+
var fullDumpVals map[string]interface{}
187+
gset, err = fulldump.FullDump(context.Background(), fullDumpCfg, func(ctx context.Context, q sqlh.Queryer) error {
188+
iter, err := fulldump.FullTableQuery(ctx, q, "tst", "_types")
189+
if err != nil {
190+
return err
191+
}
192+
defer iter(false)
193+
194+
fullDumpVals, err = iter(true)
195+
if err != nil {
196+
return err
197+
}
198+
199+
return nil
200+
})
201+
202+
assert.NoError(err)
203+
204+
_, err = db.Exec("DELETE FROM tst._types")
205+
assert.NoError(err)
206+
207+
// Capture the deletion
208+
var rowDeletion *incrdump.RowDeletion
209+
ctx, cancel := context.WithCancel(context.Background())
210+
err = incrdump.IncrDump(
211+
ctx,
212+
incrDumpCfg,
213+
gset,
214+
func(ctx context.Context, e interface{}) error {
215+
switch ev := e.(type) {
216+
case *incrdump.RowDeletion:
217+
rowDeletion = ev
218+
cancel()
219+
}
220+
return nil
221+
},
222+
)
223+
assert.NoError(err)
224+
incrDumpVals := rowDeletion.BeforeDataMap()
225+
colNames := rowDeletion.ColumnNames()
226+
227+
fmtValue := func(v interface{}) string {
228+
switch val := v.(type) {
229+
case time.Time:
230+
return fmt.Sprintf("time.Time<%s>", val.Format(time.RFC3339Nano))
231+
232+
case string:
233+
return fmt.Sprintf("%+q", val)
234+
235+
case nil:
236+
return "<nil>"
237+
238+
default:
239+
return fmt.Sprintf("%T<%#v>", val, val)
240+
241+
}
242+
}
243+
244+
diffs := []diffValue{}
245+
246+
for _, colName := range colNames {
247+
fullDumpVal := fullDumpVals[colName]
248+
incrDumpVal := incrDumpVals[colName]
249+
fmt.Println(
250+
colName,
251+
fmtValue(fullDumpVal),
252+
fmtValue(incrDumpVal),
253+
)
254+
if !reflect.DeepEqual(fullDumpVal, incrDumpVal) {
255+
diffs = append(diffs, diffValue{
256+
ColName: colName,
257+
FullDumpVal: fullDumpVal,
258+
IncrDumpVal: incrDumpVal,
259+
})
260+
}
261+
}
262+
263+
fmt.Println("----------- Diff --------------")
264+
265+
for _, d := range diffs {
266+
fmt.Println(
267+
d.ColName,
268+
fmtValue(d.FullDumpVal),
269+
fmtValue(d.IncrDumpVal),
270+
)
271+
}
272+
273+
}

mycanal/config.go

+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package mycanal
2+
3+
import (
4+
"database/sql"
5+
"fmt"
6+
7+
"github.com/go-sql-driver/mysql"
8+
"github.com/siddontang/go-mysql/replication"
9+
)
10+
11+
type Config struct {
12+
// Host of MySQL server.
13+
Host string `json:"host"`
14+
15+
// Port of MySQL server.
16+
Port uint16 `json:"port"`
17+
18+
// User for connection.
19+
User string `json:"user"`
20+
21+
// Password for connection.
22+
Password string `json:"password"`
23+
24+
// Charset for connecting.
25+
Charset string `json:"charset"`
26+
}
27+
28+
type FullDumpConfig struct {
29+
Config
30+
}
31+
32+
type IncrDumpConfig struct {
33+
Config
34+
35+
// ServerId is the server id for the slave.
36+
ServerId uint32 `json:"serverId"`
37+
}
38+
39+
func (cfg *FullDumpConfig) ToDriverCfg() *mysql.Config {
40+
ret := mysql.NewConfig()
41+
ret.Net = "tcp"
42+
ret.Addr = fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
43+
ret.User = cfg.User
44+
ret.Passwd = cfg.Password
45+
ret.ParseTime = true
46+
ret.InterpolateParams = true
47+
if ret.Params == nil {
48+
ret.Params = map[string]string{}
49+
}
50+
ret.Params["charset"] = cfg.getCharset()
51+
return ret
52+
}
53+
54+
func (cfg *FullDumpConfig) Client() (*sql.DB, error) {
55+
return sql.Open("mysql", cfg.ToDriverCfg().FormatDSN())
56+
}
57+
58+
func (cfg *IncrDumpConfig) ToDriverCfg() replication.BinlogSyncerConfig {
59+
return replication.BinlogSyncerConfig{
60+
ServerID: cfg.ServerId,
61+
Host: cfg.Host,
62+
Port: cfg.Port,
63+
User: cfg.User,
64+
Password: cfg.Password,
65+
Charset: cfg.getCharset(),
66+
ParseTime: true,
67+
UseDecimal: true,
68+
}
69+
}
70+
71+
func (cfg *Config) getCharset() string {
72+
if cfg.Charset != "" {
73+
return cfg.Charset
74+
}
75+
return "utf8mb4"
76+
}

0 commit comments

Comments
 (0)