Skip to content

parser: allow user-defined rows_event decode func #737

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ type BinlogSyncerConfig struct {

// Set Dialer
Dialer client.Dialer

RowsEventDecodeFunc func(*RowsEvent, []byte) error
}

// BinlogSyncer syncs binlog event from server.
Expand Down Expand Up @@ -176,6 +178,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {
b.parser.SetTimestampStringLocation(b.cfg.TimestampStringLocation)
b.parser.SetUseDecimal(b.cfg.UseDecimal)
b.parser.SetVerifyChecksum(b.cfg.VerifyChecksum)
b.parser.SetRowsEventDecodeFunc(b.cfg.RowsEventDecodeFunc)
b.running = false
b.ctx, b.cancel = context.WithCancel(context.Background())

Expand Down
14 changes: 13 additions & 1 deletion replication/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type BinlogParser struct {
useDecimal bool
ignoreJSONDecodeErr bool
verifyChecksum bool

rowsEventDecodeFunc func(*RowsEvent, []byte) error
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can set the default value to (*RowsEvent).Decode, so in parseEvent we don't need to check it many times.

}

func NewBinlogParser() *BinlogParser {
Expand Down Expand Up @@ -212,6 +214,10 @@ func (p *BinlogParser) SetFlavor(flavor string) {
p.flavor = flavor
}

func (p *BinlogParser) SetRowsEventDecodeFunc(rowsEventDecodeFunc func(*RowsEvent, []byte) error) {
p.rowsEventDecodeFunc = rowsEventDecodeFunc
}

func (p *BinlogParser) parseHeader(data []byte) (*EventHeader, error) {
h := new(EventHeader)
err := h.Decode(data)
Expand Down Expand Up @@ -297,7 +303,13 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
}
}

if err := e.Decode(data); err != nil {
var err error
if re, ok := e.(*RowsEvent); ok && p.rowsEventDecodeFunc != nil {
err = p.rowsEventDecodeFunc(re, data)
} else {
err = e.Decode(data)
}
if err != nil {
return nil, &EventError{h, err.Error(), data}
}

Expand Down
34 changes: 34 additions & 0 deletions replication/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package replication

import (
"bytes"
"testing"

. "github.com/pingcap/check"
"github.com/stretchr/testify/require"
)

func (t *testSyncerSuite) TestIndexOutOfRange(c *C) {
Expand Down Expand Up @@ -76,3 +78,35 @@ func (t *testSyncerSuite) TestParseEvent(c *C) {
c.Assert(err2, IsNil)
}
}

func TestDecodeRowsEvent(t *testing.T) {
testCases := []struct {
byteData []byte
eventSize uint32
eventType EventType
}{
// FORMAT_DESCRIPTION_EVENT
{[]byte{0x64, 0x61, 0x72, 0x63, 0xf, 0xb, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0x7b, 0x0, 0x0, 0x0, 0x1, 0x0, 0x4, 0x0, 0x35, 0x2e, 0x37, 0x2e, 0x32, 0x32, 0x2d, 0x6c, 0x6f, 0x67, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x64, 0x61, 0x72, 0x63, 0x13, 0x38, 0xd, 0x0, 0x8, 0x0, 0x12, 0x0, 0x4, 0x4, 0x4, 0x4, 0x12, 0x0, 0x0, 0x5f, 0x0, 0x4, 0x1a, 0x8, 0x0, 0x0, 0x0, 0x8, 0x8, 0x8, 0x2, 0x0, 0x0, 0x0, 0xa, 0xa, 0xa, 0x2a, 0x2a, 0x0, 0x12, 0x34, 0x0, 0x1, 0xb8, 0x78, 0x9d, 0xfe}, uint32(119), FORMAT_DESCRIPTION_EVENT},
// TABLE MAP EVENT tb(INT)
{[]byte{0x8d, 0x61, 0x72, 0x63, 0x13, 0xb, 0x0, 0x0, 0x0, 0x2c, 0x0, 0x0, 0x0, 0xa7, 0x0, 0x0, 0x0, 0x1, 0x0, 0x6c, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x2, 0x64, 0x62, 0x0, 0x3, 0x74, 0x62, 0x6c, 0x0, 0x1, 0x3, 0x0, 0x0, 0x63, 0x17, 0xe6, 0xf0}, uint32(44), TABLE_MAP_EVENT},
// rows INT(1)
{[]byte{0xb6, 0x61, 0x72, 0x63, 0x1e, 0xb, 0x0, 0x0, 0x0, 0x28, 0x0, 0x0, 0x0, 0xcf, 0x0, 0x0, 0x0, 0x1, 0x0, 0x6c, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x2, 0x0, 0x1, 0xff, 0x0, 0x1, 0x0, 0x0, 0x0, 0xf9, 0xf7, 0x89, 0x2a}, uint32(40), WRITE_ROWS_EVENTv2},
// TABLE MAP EVENT tb(TINY)
{[]byte{0x22, 0x6c, 0x72, 0x63, 0x13, 0xb, 0x0, 0x0, 0x0, 0x2e, 0x0, 0x0, 0x0, 0xfd, 0x0, 0x0, 0x0, 0x1, 0x0, 0x76, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x3, 0x64, 0x62, 0x31, 0x0, 0x4, 0x74, 0x62, 0x6c, 0x31, 0x0, 0x1, 0x1, 0x0, 0x0, 0x32, 0xec, 0x2f, 0x4}, uint32(46), TABLE_MAP_EVENT},
// rows LONG(1)
// panic if not set rows event decode func
{[]byte{0xeb, 0x64, 0x72, 0x63, 0x1e, 0xb, 0x0, 0x0, 0x0, 0x2d, 0x0, 0x0, 0x0, 0x2a, 0x1, 0x0, 0x0, 0x1, 0x0, 0x76, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x2, 0x0, 0x1, 0xff, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x6e, 0xef, 0xb2, 0xb1}, uint32(45), WRITE_ROWS_EVENTv2},
}

parser := NewBinlogParser()
parser.SetRowsEventDecodeFunc(func(re *RowsEvent, bs []byte) error {
_, err := re.DecodeHeader(bs)
return err
})
for _, tc := range testCases {
e, err := parser.Parse(tc.byteData)
require.NoError(t, err)
require.Equal(t, e.Header.EventType, tc.eventType)
require.Equal(t, e.Header.EventSize, tc.eventSize)
}
}
23 changes: 18 additions & 5 deletions replication/row_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ type RowsEvent struct {
ignoreJSONDecodeErr bool
}

func (e *RowsEvent) Decode(data []byte) (err2 error) {
func (e *RowsEvent) DecodeHeader(data []byte) (int, error) {
pos := 0
e.TableID = FixedLengthInt(data[0:e.tableIDSize])
pos += e.tableIDSize
Expand Down Expand Up @@ -890,14 +890,19 @@ func (e *RowsEvent) Decode(data []byte) (err2 error) {
e.Table, ok = e.tables[e.TableID]
if !ok {
if len(e.tables) > 0 {
return errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID)
return 0, errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID)
} else {
return errors.Annotatef(errMissingTableMapEvent, "table id %d", e.TableID)
return 0, errors.Annotatef(errMissingTableMapEvent, "table id %d", e.TableID)
}
}
return pos, nil
}

var err error

func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) {
var (
n int
err error
)
// ... repeat rows until event-end
defer func() {
if r := recover(); r != nil {
Expand Down Expand Up @@ -932,6 +937,14 @@ func (e *RowsEvent) Decode(data []byte) (err2 error) {
return nil
}

func (e *RowsEvent) Decode(data []byte) error {
pos, err := e.DecodeHeader(data)
if err != nil {
return err
}
return e.DecodeData(pos, data)
}

func isBitSet(bitmap []byte, i int) bool {
return bitmap[i>>3]&(1<<(uint(i)&7)) > 0
}
Expand Down