From ec808226be20e202eed03cc6e19452764e2f789b Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Wed, 16 Nov 2022 14:23:35 +0800 Subject: [PATCH 1/2] add RowsEventDecodeFunc --- replication/binlogsyncer.go | 3 +++ replication/parser.go | 14 +++++++++++++- replication/parser_test.go | 34 ++++++++++++++++++++++++++++++++++ replication/row_event.go | 23 ++++++++++++++++++----- 4 files changed, 68 insertions(+), 6 deletions(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 01c4252d8..082bdde98 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -118,6 +118,8 @@ type BinlogSyncerConfig struct { // Set Dialer Dialer client.Dialer + + RowsEventDecodeFunc func(*RowsEvent, []byte) error } // BinlogSyncer syncs binlog event from server. @@ -176,6 +178,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { b.parser.SetTimestampStringLocation(b.cfg.TimestampStringLocation) b.parser.SetUseDecimal(b.cfg.UseDecimal) b.parser.SetVerifyChecksum(b.cfg.VerifyChecksum) + b.parser.SetRowsEventDecodeFunc(b.cfg.RowsEventDecodeFunc) b.running = false b.ctx, b.cancel = context.WithCancel(context.Background()) diff --git a/replication/parser.go b/replication/parser.go index 7688fe7b0..329159d26 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -40,6 +40,8 @@ type BinlogParser struct { useDecimal bool ignoreJSONDecodeErr bool verifyChecksum bool + + rowsEventDecodeFunc func(*RowsEvent, []byte) error } func NewBinlogParser() *BinlogParser { @@ -212,6 +214,10 @@ func (p *BinlogParser) SetFlavor(flavor string) { p.flavor = flavor } +func (p *BinlogParser) SetRowsEventDecodeFunc(rowsEventDecodeFunc func(*RowsEvent, []byte) error) { + p.rowsEventDecodeFunc = rowsEventDecodeFunc +} + func (p *BinlogParser) parseHeader(data []byte) (*EventHeader, error) { h := new(EventHeader) err := h.Decode(data) @@ -297,7 +303,13 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( } } - if err := e.Decode(data); err != nil { + var err error + if re, ok := e.(*RowsEvent); ok && p.rowsEventDecodeFunc != nil { + err = p.rowsEventDecodeFunc(re, data) + } else { + err = e.Decode(data) + } + if err != nil { return nil, &EventError{h, err.Error(), data} } diff --git a/replication/parser_test.go b/replication/parser_test.go index 2f957198c..ddf6fe51e 100644 --- a/replication/parser_test.go +++ b/replication/parser_test.go @@ -2,8 +2,10 @@ package replication import ( "bytes" + "testing" . "github.com/pingcap/check" + "github.com/stretchr/testify/require" ) func (t *testSyncerSuite) TestIndexOutOfRange(c *C) { @@ -76,3 +78,35 @@ func (t *testSyncerSuite) TestParseEvent(c *C) { c.Assert(err2, IsNil) } } + +func TestDecodeRowsEvent(t *testing.T) { + testCases := []struct { + byteData []byte + eventSize uint32 + eventType EventType + }{ + // FORMAT_DESCRIPTION_EVENT + {[]byte{0x64, 0x61, 0x72, 0x63, 0xf, 0xb, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0x7b, 0x0, 0x0, 0x0, 0x1, 0x0, 0x4, 0x0, 0x35, 0x2e, 0x37, 0x2e, 0x32, 0x32, 0x2d, 0x6c, 0x6f, 0x67, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x64, 0x61, 0x72, 0x63, 0x13, 0x38, 0xd, 0x0, 0x8, 0x0, 0x12, 0x0, 0x4, 0x4, 0x4, 0x4, 0x12, 0x0, 0x0, 0x5f, 0x0, 0x4, 0x1a, 0x8, 0x0, 0x0, 0x0, 0x8, 0x8, 0x8, 0x2, 0x0, 0x0, 0x0, 0xa, 0xa, 0xa, 0x2a, 0x2a, 0x0, 0x12, 0x34, 0x0, 0x1, 0xb8, 0x78, 0x9d, 0xfe}, uint32(119), FORMAT_DESCRIPTION_EVENT}, + // TABLE MAP EVENT tb(INT) + {[]byte{0x8d, 0x61, 0x72, 0x63, 0x13, 0xb, 0x0, 0x0, 0x0, 0x2c, 0x0, 0x0, 0x0, 0xa7, 0x0, 0x0, 0x0, 0x1, 0x0, 0x6c, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x2, 0x64, 0x62, 0x0, 0x3, 0x74, 0x62, 0x6c, 0x0, 0x1, 0x3, 0x0, 0x0, 0x63, 0x17, 0xe6, 0xf0}, uint32(44), TABLE_MAP_EVENT}, + // rows INT(1) + {[]byte{0xb6, 0x61, 0x72, 0x63, 0x1e, 0xb, 0x0, 0x0, 0x0, 0x28, 0x0, 0x0, 0x0, 0xcf, 0x0, 0x0, 0x0, 0x1, 0x0, 0x6c, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x2, 0x0, 0x1, 0xff, 0x0, 0x1, 0x0, 0x0, 0x0, 0xf9, 0xf7, 0x89, 0x2a}, uint32(40), WRITE_ROWS_EVENTv2}, + // TABLE MAP EVENT tb(TINY) + {[]byte{0x22, 0x6c, 0x72, 0x63, 0x13, 0xb, 0x0, 0x0, 0x0, 0x2e, 0x0, 0x0, 0x0, 0xfd, 0x0, 0x0, 0x0, 0x1, 0x0, 0x76, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x3, 0x64, 0x62, 0x31, 0x0, 0x4, 0x74, 0x62, 0x6c, 0x31, 0x0, 0x1, 0x1, 0x0, 0x0, 0x32, 0xec, 0x2f, 0x4}, uint32(46), TABLE_MAP_EVENT}, + // rows LONG(1) + // panic if not set rows event decode func + {[]byte{0xeb, 0x64, 0x72, 0x63, 0x1e, 0xb, 0x0, 0x0, 0x0, 0x2d, 0x0, 0x0, 0x0, 0x2a, 0x1, 0x0, 0x0, 0x1, 0x0, 0x76, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x2, 0x0, 0x1, 0xff, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x6e, 0xef, 0xb2, 0xb1}, uint32(45), WRITE_ROWS_EVENTv2}, + } + + parser := NewBinlogParser() + parser.SetRowsEventDecodeFunc(func(re *RowsEvent, bs []byte) error { + _, err := re.DecodeHeader(bs) + return err + }) + for _, tc := range testCases { + e, err := parser.Parse(tc.byteData) + require.NoError(t, err) + require.Equal(t, e.Header.EventType, tc.eventType) + require.Equal(t, e.Header.EventSize, tc.eventSize) + } +} diff --git a/replication/row_event.go b/replication/row_event.go index 5f17e2b9d..8c02e1685 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -857,7 +857,7 @@ type RowsEvent struct { ignoreJSONDecodeErr bool } -func (e *RowsEvent) Decode(data []byte) (err2 error) { +func (e *RowsEvent) DecodeHeader(data []byte) (int, error) { pos := 0 e.TableID = FixedLengthInt(data[0:e.tableIDSize]) pos += e.tableIDSize @@ -890,14 +890,19 @@ func (e *RowsEvent) Decode(data []byte) (err2 error) { e.Table, ok = e.tables[e.TableID] if !ok { if len(e.tables) > 0 { - return errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID) + return 0, errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID) } else { - return errors.Annotatef(errMissingTableMapEvent, "table id %d", e.TableID) + return 0, errors.Annotatef(errMissingTableMapEvent, "table id %d", e.TableID) } } + return pos, nil +} - var err error - +func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) { + var ( + n int + err error + ) // ... repeat rows until event-end defer func() { if r := recover(); r != nil { @@ -932,6 +937,14 @@ func (e *RowsEvent) Decode(data []byte) (err2 error) { return nil } +func (e *RowsEvent) Decode(data []byte) error { + pos, err := e.DecodeHeader(data) + if err != nil { + return err + } + return e.DecodeData(pos, data) +} + func isBitSet(bitmap []byte, i int) bool { return bitmap[i>>3]&(1<<(uint(i)&7)) > 0 } From b59a00a53e30183dbab36ef434320f1da5e21f54 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Wed, 16 Nov 2022 17:03:10 +0800 Subject: [PATCH 2/2] use checker instead of require --- replication/parser_test.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/replication/parser_test.go b/replication/parser_test.go index ddf6fe51e..a9d3100c4 100644 --- a/replication/parser_test.go +++ b/replication/parser_test.go @@ -2,10 +2,8 @@ package replication import ( "bytes" - "testing" . "github.com/pingcap/check" - "github.com/stretchr/testify/require" ) func (t *testSyncerSuite) TestIndexOutOfRange(c *C) { @@ -79,7 +77,7 @@ func (t *testSyncerSuite) TestParseEvent(c *C) { } } -func TestDecodeRowsEvent(t *testing.T) { +func (t *testSyncerSuite) TestRowsEventDecodeFunc(c *C) { testCases := []struct { byteData []byte eventSize uint32 @@ -105,8 +103,8 @@ func TestDecodeRowsEvent(t *testing.T) { }) for _, tc := range testCases { e, err := parser.Parse(tc.byteData) - require.NoError(t, err) - require.Equal(t, e.Header.EventType, tc.eventType) - require.Equal(t, e.Header.EventSize, tc.eventSize) + c.Assert(err, IsNil) + c.Assert(e.Header.EventType, Equals, tc.eventType) + c.Assert(e.Header.EventSize, Equals, tc.eventSize) } }