Skip to content

Commit 49d58c4

Browse files
authored
Merge pull request #737 from GMHDBJD/addRowsEventDecodeFunc
parser: allow user-defined rows_event decode func
2 parents 0cba5f5 + b59a00a commit 49d58c4

File tree

4 files changed

+66
-6
lines changed

4 files changed

+66
-6
lines changed

replication/binlogsyncer.go

+3
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ type BinlogSyncerConfig struct {
118118

119119
// Set Dialer
120120
Dialer client.Dialer
121+
122+
RowsEventDecodeFunc func(*RowsEvent, []byte) error
121123
}
122124

123125
// BinlogSyncer syncs binlog event from server.
@@ -176,6 +178,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {
176178
b.parser.SetTimestampStringLocation(b.cfg.TimestampStringLocation)
177179
b.parser.SetUseDecimal(b.cfg.UseDecimal)
178180
b.parser.SetVerifyChecksum(b.cfg.VerifyChecksum)
181+
b.parser.SetRowsEventDecodeFunc(b.cfg.RowsEventDecodeFunc)
179182
b.running = false
180183
b.ctx, b.cancel = context.WithCancel(context.Background())
181184

replication/parser.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ type BinlogParser struct {
4040
useDecimal bool
4141
ignoreJSONDecodeErr bool
4242
verifyChecksum bool
43+
44+
rowsEventDecodeFunc func(*RowsEvent, []byte) error
4345
}
4446

4547
func NewBinlogParser() *BinlogParser {
@@ -212,6 +214,10 @@ func (p *BinlogParser) SetFlavor(flavor string) {
212214
p.flavor = flavor
213215
}
214216

217+
func (p *BinlogParser) SetRowsEventDecodeFunc(rowsEventDecodeFunc func(*RowsEvent, []byte) error) {
218+
p.rowsEventDecodeFunc = rowsEventDecodeFunc
219+
}
220+
215221
func (p *BinlogParser) parseHeader(data []byte) (*EventHeader, error) {
216222
h := new(EventHeader)
217223
err := h.Decode(data)
@@ -297,7 +303,13 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
297303
}
298304
}
299305

300-
if err := e.Decode(data); err != nil {
306+
var err error
307+
if re, ok := e.(*RowsEvent); ok && p.rowsEventDecodeFunc != nil {
308+
err = p.rowsEventDecodeFunc(re, data)
309+
} else {
310+
err = e.Decode(data)
311+
}
312+
if err != nil {
301313
return nil, &EventError{h, err.Error(), data}
302314
}
303315

replication/parser_test.go

+32
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,35 @@ func (t *testSyncerSuite) TestParseEvent(c *C) {
7676
c.Assert(err2, IsNil)
7777
}
7878
}
79+
80+
func (t *testSyncerSuite) TestRowsEventDecodeFunc(c *C) {
81+
testCases := []struct {
82+
byteData []byte
83+
eventSize uint32
84+
eventType EventType
85+
}{
86+
// FORMAT_DESCRIPTION_EVENT
87+
{[]byte{0x64, 0x61, 0x72, 0x63, 0xf, 0xb, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0x7b, 0x0, 0x0, 0x0, 0x1, 0x0, 0x4, 0x0, 0x35, 0x2e, 0x37, 0x2e, 0x32, 0x32, 0x2d, 0x6c, 0x6f, 0x67, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x64, 0x61, 0x72, 0x63, 0x13, 0x38, 0xd, 0x0, 0x8, 0x0, 0x12, 0x0, 0x4, 0x4, 0x4, 0x4, 0x12, 0x0, 0x0, 0x5f, 0x0, 0x4, 0x1a, 0x8, 0x0, 0x0, 0x0, 0x8, 0x8, 0x8, 0x2, 0x0, 0x0, 0x0, 0xa, 0xa, 0xa, 0x2a, 0x2a, 0x0, 0x12, 0x34, 0x0, 0x1, 0xb8, 0x78, 0x9d, 0xfe}, uint32(119), FORMAT_DESCRIPTION_EVENT},
88+
// TABLE MAP EVENT tb(INT)
89+
{[]byte{0x8d, 0x61, 0x72, 0x63, 0x13, 0xb, 0x0, 0x0, 0x0, 0x2c, 0x0, 0x0, 0x0, 0xa7, 0x0, 0x0, 0x0, 0x1, 0x0, 0x6c, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x2, 0x64, 0x62, 0x0, 0x3, 0x74, 0x62, 0x6c, 0x0, 0x1, 0x3, 0x0, 0x0, 0x63, 0x17, 0xe6, 0xf0}, uint32(44), TABLE_MAP_EVENT},
90+
// rows INT(1)
91+
{[]byte{0xb6, 0x61, 0x72, 0x63, 0x1e, 0xb, 0x0, 0x0, 0x0, 0x28, 0x0, 0x0, 0x0, 0xcf, 0x0, 0x0, 0x0, 0x1, 0x0, 0x6c, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x2, 0x0, 0x1, 0xff, 0x0, 0x1, 0x0, 0x0, 0x0, 0xf9, 0xf7, 0x89, 0x2a}, uint32(40), WRITE_ROWS_EVENTv2},
92+
// TABLE MAP EVENT tb(TINY)
93+
{[]byte{0x22, 0x6c, 0x72, 0x63, 0x13, 0xb, 0x0, 0x0, 0x0, 0x2e, 0x0, 0x0, 0x0, 0xfd, 0x0, 0x0, 0x0, 0x1, 0x0, 0x76, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x3, 0x64, 0x62, 0x31, 0x0, 0x4, 0x74, 0x62, 0x6c, 0x31, 0x0, 0x1, 0x1, 0x0, 0x0, 0x32, 0xec, 0x2f, 0x4}, uint32(46), TABLE_MAP_EVENT},
94+
// rows LONG(1)
95+
// panic if not set rows event decode func
96+
{[]byte{0xeb, 0x64, 0x72, 0x63, 0x1e, 0xb, 0x0, 0x0, 0x0, 0x2d, 0x0, 0x0, 0x0, 0x2a, 0x1, 0x0, 0x0, 0x1, 0x0, 0x76, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x2, 0x0, 0x1, 0xff, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x6e, 0xef, 0xb2, 0xb1}, uint32(45), WRITE_ROWS_EVENTv2},
97+
}
98+
99+
parser := NewBinlogParser()
100+
parser.SetRowsEventDecodeFunc(func(re *RowsEvent, bs []byte) error {
101+
_, err := re.DecodeHeader(bs)
102+
return err
103+
})
104+
for _, tc := range testCases {
105+
e, err := parser.Parse(tc.byteData)
106+
c.Assert(err, IsNil)
107+
c.Assert(e.Header.EventType, Equals, tc.eventType)
108+
c.Assert(e.Header.EventSize, Equals, tc.eventSize)
109+
}
110+
}

replication/row_event.go

+18-5
Original file line numberDiff line numberDiff line change
@@ -857,7 +857,7 @@ type RowsEvent struct {
857857
ignoreJSONDecodeErr bool
858858
}
859859

860-
func (e *RowsEvent) Decode(data []byte) (err2 error) {
860+
func (e *RowsEvent) DecodeHeader(data []byte) (int, error) {
861861
pos := 0
862862
e.TableID = FixedLengthInt(data[0:e.tableIDSize])
863863
pos += e.tableIDSize
@@ -890,14 +890,19 @@ func (e *RowsEvent) Decode(data []byte) (err2 error) {
890890
e.Table, ok = e.tables[e.TableID]
891891
if !ok {
892892
if len(e.tables) > 0 {
893-
return errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID)
893+
return 0, errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID)
894894
} else {
895-
return errors.Annotatef(errMissingTableMapEvent, "table id %d", e.TableID)
895+
return 0, errors.Annotatef(errMissingTableMapEvent, "table id %d", e.TableID)
896896
}
897897
}
898+
return pos, nil
899+
}
898900

899-
var err error
900-
901+
func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) {
902+
var (
903+
n int
904+
err error
905+
)
901906
// ... repeat rows until event-end
902907
defer func() {
903908
if r := recover(); r != nil {
@@ -932,6 +937,14 @@ func (e *RowsEvent) Decode(data []byte) (err2 error) {
932937
return nil
933938
}
934939

940+
func (e *RowsEvent) Decode(data []byte) error {
941+
pos, err := e.DecodeHeader(data)
942+
if err != nil {
943+
return err
944+
}
945+
return e.DecodeData(pos, data)
946+
}
947+
935948
func isBitSet(bitmap []byte, i int) bool {
936949
return bitmap[i>>3]&(1<<(uint(i)&7)) > 0
937950
}

0 commit comments

Comments
 (0)