From b87730b24454cbef3ed77a05dadab81be890e091 Mon Sep 17 00:00:00 2001 From: Maurus Cuelenaere Date: Wed, 11 Oct 2023 14:42:17 +0200 Subject: [PATCH 1/2] refactor: move decompression into DecodeData method --- replication/row_event.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index 87fe49dbf..d4eaf7105 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -1018,6 +1018,14 @@ func (e *RowsEvent) decodeExtraData(data []byte) (err2 error) { } func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) { + if e.compressed { + data, err2 = DecompressMariadbData(data[pos:]) + if err2 != nil { + //nolint:nakedret + return + } + } + // Rows_log_event::print_verbose() var ( @@ -1073,13 +1081,6 @@ func (e *RowsEvent) Decode(data []byte) error { if err != nil { return err } - if e.compressed { - uncompressedData, err := DecompressMariadbData(data[pos:]) - if err != nil { - return err - } - return e.DecodeData(0, uncompressedData) - } return e.DecodeData(pos, data) } From c59e3811434ef680d68690b043d41aebd8c0df0c Mon Sep 17 00:00:00 2001 From: Maurus Cuelenaere Date: Wed, 11 Oct 2023 14:42:45 +0200 Subject: [PATCH 2/2] feat: skip decoding of row events for tables we are not interested in --- canal/canal.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/canal/canal.go b/canal/canal.go index 6c07bb741..989c4e297 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -451,6 +451,19 @@ func (c *Canal) prepareSyncer() error { Logger: c.cfg.Logger, Dialer: c.cfg.Dialer, Localhost: c.cfg.Localhost, + RowsEventDecodeFunc: func(event *replication.RowsEvent, data []byte) error { + pos, err := event.DecodeHeader(data) + if err != nil { + return err + } + + key := fmt.Sprintf("%s.%s", string(event.Table.Schema), string(event.Table.Table)) + if !c.checkTableMatch(key) { + return nil + } + + return event.DecodeData(pos, data) + }, } if strings.Contains(c.cfg.Addr, "/") {