From 29ab7e7717c7584e3208423e61400cca26e5ac26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Tue, 21 Feb 2023 22:00:28 +0100 Subject: [PATCH 01/12] Decoding of compressed binlog events --- go.mod | 1 + go.sum | 2 + replication/const.go | 7 ++ replication/parser.go | 2 + replication/transaction_payload_event.go | 103 +++++++++++++++++++++++ 5 files changed, 115 insertions(+) create mode 100644 replication/transaction_payload_event.go diff --git a/go.mod b/go.mod index 07fd5c89f..7409747d4 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/go-sql-driver/mysql v1.6.0 github.com/google/uuid v1.3.0 github.com/jmoiron/sqlx v1.3.3 + github.com/klauspost/compress v1.15.15 // indirect github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63 github.com/pingcap/tidb/parser v0.0.0-20221126021158-6b02a5d8ba7d diff --git a/go.sum b/go.sum index 8d155a86f..63dc8425f 100644 --- a/go.sum +++ b/go.sum @@ -15,6 +15,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jmoiron/sqlx v1.3.3 h1:j82X0bf7oQ27XeqxicSZsTU5suPwKElg3oyxNn43iTk= github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= +github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= +github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= diff --git a/replication/const.go b/replication/const.go index a8821ba25..96d72ddc1 100644 --- a/replication/const.go +++ b/replication/const.go @@ -90,6 +90,9 @@ const ( TRANSACTION_CONTEXT_EVENT VIEW_CHANGE_EVENT XA_PREPARE_LOG_EVENT + PARTIAL_UPDATE_ROWS_EVENT + TRANSACTION_PAYLOAD_EVENT + HEARTBEAT_LOG_EVENT_V2 ) const ( @@ -188,6 +191,10 @@ func (e EventType) String() string { return "ViewChangeEvent" case XA_PREPARE_LOG_EVENT: return "XAPrepareLogEvent" + case PARTIAL_UPDATE_ROWS_EVENT: + return "PartialUpdateRowsEvent" + case TRANSACTION_PAYLOAD_EVENT: + return "TransactionPlayloadEvent" default: return "UnknownEvent" diff --git a/replication/parser.go b/replication/parser.go index 329159d26..17fb992d6 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -295,6 +295,8 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( e = &PreviousGTIDsEvent{} case INTVAR_EVENT: e = &IntVarEvent{} + case TRANSACTION_PAYLOAD_EVENT: + e = &TransactionPayloadEvent{} default: e = &GenericEvent{} } diff --git a/replication/transaction_payload_event.go b/replication/transaction_payload_event.go new file mode 100644 index 000000000..f5d87ac60 --- /dev/null +++ b/replication/transaction_payload_event.go @@ -0,0 +1,103 @@ +package replication + +import ( + "encoding/hex" + "fmt" + "io" + + "github.com/klauspost/compress/zstd" + + . "github.com/go-mysql-org/go-mysql/mysql" +) + +// On The Wire: Field Types +const ( + OTW_PAYLOAD_HEADER_END_MARK = iota + OTW_PAYLOAD_SIZE_FIELD + OTW_PAYLOAD_COMPRESSION_TYPE_FIELD + OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD +) + +// Compression Types +const ( + ZSTD = 0 + NONE = 255 +) + +func fieldTypeName(ft uint64) string { + switch ft { + case OTW_PAYLOAD_HEADER_END_MARK: + return "HeaderEndMark" + case OTW_PAYLOAD_SIZE_FIELD: + return "SizeField" + case OTW_PAYLOAD_COMPRESSION_TYPE_FIELD: + return "CompressionType" + case OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD: + return "UncompressedSize" + default: + return "Unknown" + } +} + +type TransactionPayloadEvent struct { + Data []byte + Size uint64 + UncompressedSize uint64 + CompressionType uint64 + Payload []byte +} + +func (e *TransactionPayloadEvent) compressionType() string { + switch e.CompressionType { + case ZSTD: + return "ZSTD" + case NONE: + return "NONE" + default: + return "Unknown" + } +} + +func (e *TransactionPayloadEvent) Dump(w io.Writer) { + fmt.Fprintf(w, "Payload Size: %d\n", e.Size) + fmt.Fprintf(w, "Payload Uncompressed Size: %d\n", e.UncompressedSize) + fmt.Fprintf(w, "Payload CompressionType: %s\n", e.compressionType()) + fmt.Fprintf(w, "Payload Body: \n%s", hex.Dump(e.Payload)) + // fmt.Fprintf(w, "Transaction Payload Event data: \n%s", hex.Dump(e.Data)) + + decoder, _ := zstd.NewReader(nil, zstd.WithDecoderConcurrency(0)) + payloadUncompressed, _ := decoder.DecodeAll(e.Payload, nil) + fmt.Fprintf(w, "Decompressed: \n%s", hex.Dump(payloadUncompressed)) + fmt.Fprintln(w) +} + +func (e *TransactionPayloadEvent) Decode(data []byte) error { + e.Data = data + offset := uint64(0) + + for { + fieldType := FixedLengthInt(data[offset : offset+1]) + offset++ + + if fieldType == OTW_PAYLOAD_HEADER_END_MARK { + e.Payload = data[offset:] + break + } else { + fieldLength := FixedLengthInt(data[offset : offset+1]) + offset++ + + switch fieldType { + case OTW_PAYLOAD_SIZE_FIELD: + e.Size = FixedLengthInt(data[offset : offset+fieldLength]) + case OTW_PAYLOAD_COMPRESSION_TYPE_FIELD: + e.CompressionType = FixedLengthInt(data[offset : offset+fieldLength]) + case OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD: + e.UncompressedSize = FixedLengthInt(data[offset : offset+fieldLength]) + } + + offset += fieldLength + } + } + + return nil +} From b29c29fbdb6666348ad52dc513fec7aab912910e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Wed, 22 Feb 2023 21:44:43 +0100 Subject: [PATCH 02/12] Decode the uncompressed payload from TransactionPayloadEvent --- replication/parser.go | 9 +++++++- replication/transaction_payload_event.go | 27 +++++++++++++++++++++++- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/replication/parser.go b/replication/parser.go index 17fb992d6..d92a7e05a 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -296,7 +296,7 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( case INTVAR_EVENT: e = &IntVarEvent{} case TRANSACTION_PAYLOAD_EVENT: - e = &TransactionPayloadEvent{} + e = p.newTransactionPayloadEvent() default: e = &GenericEvent{} } @@ -419,3 +419,10 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent { return e } + +func (p *BinlogParser) newTransactionPayloadEvent() *TransactionPayloadEvent { + e := &TransactionPayloadEvent{} + e.parser = p + + return e +} diff --git a/replication/transaction_payload_event.go b/replication/transaction_payload_event.go index f5d87ac60..32e300c43 100644 --- a/replication/transaction_payload_event.go +++ b/replication/transaction_payload_event.go @@ -1,6 +1,7 @@ package replication import ( + "encoding/binary" "encoding/hex" "fmt" "io" @@ -40,6 +41,7 @@ func fieldTypeName(ft uint64) string { } type TransactionPayloadEvent struct { + parser *BinlogParser Data []byte Size uint64 UncompressedSize uint64 @@ -63,11 +65,34 @@ func (e *TransactionPayloadEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Payload Uncompressed Size: %d\n", e.UncompressedSize) fmt.Fprintf(w, "Payload CompressionType: %s\n", e.compressionType()) fmt.Fprintf(w, "Payload Body: \n%s", hex.Dump(e.Payload)) - // fmt.Fprintf(w, "Transaction Payload Event data: \n%s", hex.Dump(e.Data)) decoder, _ := zstd.NewReader(nil, zstd.WithDecoderConcurrency(0)) payloadUncompressed, _ := decoder.DecodeAll(e.Payload, nil) fmt.Fprintf(w, "Decompressed: \n%s", hex.Dump(payloadUncompressed)) + + // The uncompressed data needs to be split up into individual events for Parsse() + // to work on them. We can't use a NewBinlogParser() as we need the initialization + // from the FormatDescriptionEvent. And we need to disable the binlog checksum + // algorithm as otherwise the XidEvent's get truncated and fail to parse. + offset := uint32(0) + for { + if offset >= uint32(len(payloadUncompressed)) { + break + } + eventLength := binary.LittleEndian.Uint32(payloadUncompressed[offset+9 : offset+13]) + data := payloadUncompressed[offset : offset+eventLength] + + e.parser.format.ChecksumAlgorithm = BINLOG_CHECKSUM_ALG_OFF + pe, err := e.parser.Parse(data) + if err != nil { + fmt.Fprintf(w, "Failed to parse payload: %s\n", err) + } else { + pe.Dump(w) + } + + offset += eventLength + } + fmt.Fprintln(w) } From 7dbc07b993adc7d1f26f1a94c6149b990f57c048 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Wed, 22 Feb 2023 21:59:12 +0100 Subject: [PATCH 03/12] Fix typo --- replication/transaction_payload_event.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/replication/transaction_payload_event.go b/replication/transaction_payload_event.go index 32e300c43..5221fc59a 100644 --- a/replication/transaction_payload_event.go +++ b/replication/transaction_payload_event.go @@ -70,7 +70,7 @@ func (e *TransactionPayloadEvent) Dump(w io.Writer) { payloadUncompressed, _ := decoder.DecodeAll(e.Payload, nil) fmt.Fprintf(w, "Decompressed: \n%s", hex.Dump(payloadUncompressed)) - // The uncompressed data needs to be split up into individual events for Parsse() + // The uncompressed data needs to be split up into individual events for Parse() // to work on them. We can't use a NewBinlogParser() as we need the initialization // from the FormatDescriptionEvent. And we need to disable the binlog checksum // algorithm as otherwise the XidEvent's get truncated and fail to parse. From f9c9016b9afd49b419c46c460cb3dc11dce10945 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Wed, 22 Feb 2023 22:06:45 +0100 Subject: [PATCH 04/12] Switch from github.com/klauspost/compress/zstd to github.com/DataDog/zstd --- go.mod | 2 +- go.sum | 4 ++-- replication/transaction_payload_event.go | 8 +++++--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 7409747d4..b79f997e2 100644 --- a/go.mod +++ b/go.mod @@ -4,10 +4,10 @@ go 1.16 require ( github.com/BurntSushi/toml v0.3.1 + github.com/DataDog/zstd v1.5.2 github.com/go-sql-driver/mysql v1.6.0 github.com/google/uuid v1.3.0 github.com/jmoiron/sqlx v1.3.3 - github.com/klauspost/compress v1.15.15 // indirect github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63 github.com/pingcap/tidb/parser v0.0.0-20221126021158-6b02a5d8ba7d diff --git a/go.sum b/go.sum index 63dc8425f..ea24a843c 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8= +github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM= @@ -15,8 +17,6 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jmoiron/sqlx v1.3.3 h1:j82X0bf7oQ27XeqxicSZsTU5suPwKElg3oyxNn43iTk= github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= -github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= -github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= diff --git a/replication/transaction_payload_event.go b/replication/transaction_payload_event.go index 5221fc59a..212c73c90 100644 --- a/replication/transaction_payload_event.go +++ b/replication/transaction_payload_event.go @@ -6,7 +6,7 @@ import ( "fmt" "io" - "github.com/klauspost/compress/zstd" + "github.com/DataDog/zstd" . "github.com/go-mysql-org/go-mysql/mysql" ) @@ -66,8 +66,10 @@ func (e *TransactionPayloadEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Payload CompressionType: %s\n", e.compressionType()) fmt.Fprintf(w, "Payload Body: \n%s", hex.Dump(e.Payload)) - decoder, _ := zstd.NewReader(nil, zstd.WithDecoderConcurrency(0)) - payloadUncompressed, _ := decoder.DecodeAll(e.Payload, nil) + payloadUncompressed, err := zstd.Decompress(nil, e.Payload) + if err != nil { + fmt.Fprintf(w, "Decompressed failed: %s\n", err) + } fmt.Fprintf(w, "Decompressed: \n%s", hex.Dump(payloadUncompressed)) // The uncompressed data needs to be split up into individual events for Parse() From 846ca9ce8f869a8a0ee9d74d0a3ca84ade277ff1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Wed, 22 Feb 2023 22:17:58 +0100 Subject: [PATCH 05/12] Fix issue found by linter: Remove fieldTypeName --- replication/transaction_payload_event.go | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/replication/transaction_payload_event.go b/replication/transaction_payload_event.go index 212c73c90..fd071988c 100644 --- a/replication/transaction_payload_event.go +++ b/replication/transaction_payload_event.go @@ -25,21 +25,6 @@ const ( NONE = 255 ) -func fieldTypeName(ft uint64) string { - switch ft { - case OTW_PAYLOAD_HEADER_END_MARK: - return "HeaderEndMark" - case OTW_PAYLOAD_SIZE_FIELD: - return "SizeField" - case OTW_PAYLOAD_COMPRESSION_TYPE_FIELD: - return "CompressionType" - case OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD: - return "UncompressedSize" - default: - return "Unknown" - } -} - type TransactionPayloadEvent struct { parser *BinlogParser Data []byte From eb4c15e07044ef994dfe2236faf91116a7e4ef4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Thu, 23 Feb 2023 16:42:17 +0100 Subject: [PATCH 06/12] Move decoding from Dump() to Decode() --- replication/transaction_payload_event.go | 82 +++++++++++++++--------- 1 file changed, 51 insertions(+), 31 deletions(-) diff --git a/replication/transaction_payload_event.go b/replication/transaction_payload_event.go index fd071988c..1b8e09f8d 100644 --- a/replication/transaction_payload_event.go +++ b/replication/transaction_payload_event.go @@ -27,11 +27,11 @@ const ( type TransactionPayloadEvent struct { parser *BinlogParser - Data []byte Size uint64 UncompressedSize uint64 CompressionType uint64 Payload []byte + Events []*BinlogEvent } func (e *TransactionPayloadEvent) compressionType() string { @@ -50,41 +50,21 @@ func (e *TransactionPayloadEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Payload Uncompressed Size: %d\n", e.UncompressedSize) fmt.Fprintf(w, "Payload CompressionType: %s\n", e.compressionType()) fmt.Fprintf(w, "Payload Body: \n%s", hex.Dump(e.Payload)) - - payloadUncompressed, err := zstd.Decompress(nil, e.Payload) - if err != nil { - fmt.Fprintf(w, "Decompressed failed: %s\n", err) - } - fmt.Fprintf(w, "Decompressed: \n%s", hex.Dump(payloadUncompressed)) - - // The uncompressed data needs to be split up into individual events for Parse() - // to work on them. We can't use a NewBinlogParser() as we need the initialization - // from the FormatDescriptionEvent. And we need to disable the binlog checksum - // algorithm as otherwise the XidEvent's get truncated and fail to parse. - offset := uint32(0) - for { - if offset >= uint32(len(payloadUncompressed)) { - break - } - eventLength := binary.LittleEndian.Uint32(payloadUncompressed[offset+9 : offset+13]) - data := payloadUncompressed[offset : offset+eventLength] - - e.parser.format.ChecksumAlgorithm = BINLOG_CHECKSUM_ALG_OFF - pe, err := e.parser.Parse(data) - if err != nil { - fmt.Fprintf(w, "Failed to parse payload: %s\n", err) - } else { - pe.Dump(w) - } - - offset += eventLength + for _, event := range e.Events { + event.Dump(w) } - fmt.Fprintln(w) } func (e *TransactionPayloadEvent) Decode(data []byte) error { - e.Data = data + err := e.decodeTransactionHeader(data) + if err != nil { + return err + } + return e.decodeTransactionPayload(data) +} + +func (e *TransactionPayloadEvent) decodeTransactionHeader(data []byte) error { offset := uint64(0) for { @@ -113,3 +93,43 @@ func (e *TransactionPayloadEvent) Decode(data []byte) error { return nil } + +func (e *TransactionPayloadEvent) decodeTransactionPayload(data []byte) error { + payloadUncompressed, err := zstd.Decompress(nil, e.Payload) + if err != nil { + return err + } + + // The uncompressed data needs to be split up into individual events for Parse() + // to work on them. We can't use e.parser directly as we need to disable checksums + // but we still need the initialization from the FormatDescriptionEvent. We can't + // modify e.parser as it is used elsewhere. + parser := NewBinlogParser() + parser.format = &FormatDescriptionEvent{ + Version: e.parser.format.Version, + ServerVersion: e.parser.format.ServerVersion, + CreateTimestamp: e.parser.format.CreateTimestamp, + EventHeaderLength: e.parser.format.EventHeaderLength, + EventTypeHeaderLengths: e.parser.format.EventTypeHeaderLengths, + ChecksumAlgorithm: BINLOG_CHECKSUM_ALG_OFF, + } + + offset := uint32(0) + for { + if offset >= uint32(len(payloadUncompressed)) { + break + } + eventLength := binary.LittleEndian.Uint32(payloadUncompressed[offset+9 : offset+13]) + data := payloadUncompressed[offset : offset+eventLength] + + pe, err := parser.Parse(data) + if err != nil { + return err + } + e.Events = append(e.Events, pe) + + offset += eventLength + } + + return nil +} From f0abf29c59c33efae1f46c2d68f80f9ac7b669e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Thu, 23 Feb 2023 16:44:46 +0100 Subject: [PATCH 07/12] Slight change in naming --- replication/transaction_payload_event.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/replication/transaction_payload_event.go b/replication/transaction_payload_event.go index 1b8e09f8d..a061bdc57 100644 --- a/replication/transaction_payload_event.go +++ b/replication/transaction_payload_event.go @@ -57,14 +57,14 @@ func (e *TransactionPayloadEvent) Dump(w io.Writer) { } func (e *TransactionPayloadEvent) Decode(data []byte) error { - err := e.decodeTransactionHeader(data) + err := e.decodeFields(data) if err != nil { return err } - return e.decodeTransactionPayload(data) + return e.decodePayload(data) } -func (e *TransactionPayloadEvent) decodeTransactionHeader(data []byte) error { +func (e *TransactionPayloadEvent) decodeFields(data []byte) error { offset := uint64(0) for { @@ -94,7 +94,7 @@ func (e *TransactionPayloadEvent) decodeTransactionHeader(data []byte) error { return nil } -func (e *TransactionPayloadEvent) decodeTransactionPayload(data []byte) error { +func (e *TransactionPayloadEvent) decodePayload(data []byte) error { payloadUncompressed, err := zstd.Decompress(nil, e.Payload) if err != nil { return err From ede7ac910917f865428f8e5bbc22b2d783d27962 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Thu, 23 Feb 2023 16:47:48 +0100 Subject: [PATCH 08/12] decodePayload, doesn't use the data argument --- replication/transaction_payload_event.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/replication/transaction_payload_event.go b/replication/transaction_payload_event.go index a061bdc57..0864dcde6 100644 --- a/replication/transaction_payload_event.go +++ b/replication/transaction_payload_event.go @@ -61,7 +61,7 @@ func (e *TransactionPayloadEvent) Decode(data []byte) error { if err != nil { return err } - return e.decodePayload(data) + return e.decodePayload() } func (e *TransactionPayloadEvent) decodeFields(data []byte) error { @@ -94,7 +94,7 @@ func (e *TransactionPayloadEvent) decodeFields(data []byte) error { return nil } -func (e *TransactionPayloadEvent) decodePayload(data []byte) error { +func (e *TransactionPayloadEvent) decodePayload() error { payloadUncompressed, err := zstd.Decompress(nil, e.Payload) if err != nil { return err From c7a54762e59cd9313e840dafb69e5465e438a4b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 24 Feb 2023 08:24:00 +0100 Subject: [PATCH 09/12] Update replication/const.go Co-authored-by: lance6716 --- replication/const.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/replication/const.go b/replication/const.go index 96d72ddc1..436b02f00 100644 --- a/replication/const.go +++ b/replication/const.go @@ -194,7 +194,7 @@ func (e EventType) String() string { case PARTIAL_UPDATE_ROWS_EVENT: return "PartialUpdateRowsEvent" case TRANSACTION_PAYLOAD_EVENT: - return "TransactionPlayloadEvent" + return "TransactionPayloadEvent" default: return "UnknownEvent" From 527b681e670c69d53c1d503771d4acb2837f0597 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 24 Feb 2023 08:44:12 +0100 Subject: [PATCH 10/12] Update based on review --- replication/const.go | 2 ++ replication/transaction_payload_event.go | 6 +++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/replication/const.go b/replication/const.go index 436b02f00..fc31763ba 100644 --- a/replication/const.go +++ b/replication/const.go @@ -195,6 +195,8 @@ func (e EventType) String() string { return "PartialUpdateRowsEvent" case TRANSACTION_PAYLOAD_EVENT: return "TransactionPayloadEvent" + case HEARTBEAT_LOG_EVENT_V2: + return "HeartbeatLogEventV2" default: return "UnknownEvent" diff --git a/replication/transaction_payload_event.go b/replication/transaction_payload_event.go index 0864dcde6..7475749cc 100644 --- a/replication/transaction_payload_event.go +++ b/replication/transaction_payload_event.go @@ -12,6 +12,8 @@ import ( ) // On The Wire: Field Types +// See also binary_log::codecs::binary::Transaction_payload::fields in MySQL +// https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1codecs_1_1binary_1_1Transaction__payload.html#a9fff7ac12ba064f40e9216565c53d07b const ( OTW_PAYLOAD_HEADER_END_MARK = iota OTW_PAYLOAD_SIZE_FIELD @@ -50,9 +52,11 @@ func (e *TransactionPayloadEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Payload Uncompressed Size: %d\n", e.UncompressedSize) fmt.Fprintf(w, "Payload CompressionType: %s\n", e.compressionType()) fmt.Fprintf(w, "Payload Body: \n%s", hex.Dump(e.Payload)) + fmt.Fprintln(w, "=== Start of events decoded from compressed payload ===") for _, event := range e.Events { event.Dump(w) } + fmt.Fprintln(w, "=== End of events decoded from compressed payload ===") fmt.Fprintln(w) } @@ -116,7 +120,7 @@ func (e *TransactionPayloadEvent) decodePayload() error { offset := uint32(0) for { - if offset >= uint32(len(payloadUncompressed)) { + if offset+13 > uint32(len(payloadUncompressed)) { break } eventLength := binary.LittleEndian.Uint32(payloadUncompressed[offset+9 : offset+13]) From 1a0cfc72bc8fbe835da45081b925046b7dab9657 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 24 Feb 2023 09:02:09 +0100 Subject: [PATCH 11/12] A few more checks --- replication/transaction_payload_event.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/replication/transaction_payload_event.go b/replication/transaction_payload_event.go index 7475749cc..753f9923b 100644 --- a/replication/transaction_payload_event.go +++ b/replication/transaction_payload_event.go @@ -99,6 +99,11 @@ func (e *TransactionPayloadEvent) decodeFields(data []byte) error { } func (e *TransactionPayloadEvent) decodePayload() error { + if e.CompressionType != ZSTD { + return fmt.Errorf("TransactionPayloadEvent has compression type %d (%s)", + e.CompressionType, e.compressionType()) + } + payloadUncompressed, err := zstd.Decompress(nil, e.Payload) if err != nil { return err @@ -120,10 +125,15 @@ func (e *TransactionPayloadEvent) decodePayload() error { offset := uint32(0) for { - if offset+13 > uint32(len(payloadUncompressed)) { + payloadUncompressedLength := uint32(len(payloadUncompressed)) + if offset+13 > payloadUncompressedLength { break } eventLength := binary.LittleEndian.Uint32(payloadUncompressed[offset+9 : offset+13]) + if offset+eventLength > payloadUncompressedLength { + return fmt.Errorf("Event length of %d with offset %d in uncompressed payload exceeds payload length of %d", + eventLength, offset, payloadUncompressedLength) + } data := payloadUncompressed[offset : offset+eventLength] pe, err := parser.Parse(data) From e5c8adcf6dfe8c5c4d7e0baf0c16024860cb7a10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 24 Feb 2023 09:44:07 +0100 Subject: [PATCH 12/12] Only use format, not the whole parser --- replication/parser.go | 2 +- replication/transaction_payload_event.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/replication/parser.go b/replication/parser.go index d92a7e05a..cb34707e6 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -422,7 +422,7 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent { func (p *BinlogParser) newTransactionPayloadEvent() *TransactionPayloadEvent { e := &TransactionPayloadEvent{} - e.parser = p + e.format = *p.format return e } diff --git a/replication/transaction_payload_event.go b/replication/transaction_payload_event.go index 753f9923b..8b69a1441 100644 --- a/replication/transaction_payload_event.go +++ b/replication/transaction_payload_event.go @@ -28,7 +28,7 @@ const ( ) type TransactionPayloadEvent struct { - parser *BinlogParser + format FormatDescriptionEvent Size uint64 UncompressedSize uint64 CompressionType uint64 @@ -115,11 +115,11 @@ func (e *TransactionPayloadEvent) decodePayload() error { // modify e.parser as it is used elsewhere. parser := NewBinlogParser() parser.format = &FormatDescriptionEvent{ - Version: e.parser.format.Version, - ServerVersion: e.parser.format.ServerVersion, - CreateTimestamp: e.parser.format.CreateTimestamp, - EventHeaderLength: e.parser.format.EventHeaderLength, - EventTypeHeaderLengths: e.parser.format.EventTypeHeaderLengths, + Version: e.format.Version, + ServerVersion: e.format.ServerVersion, + CreateTimestamp: e.format.CreateTimestamp, + EventHeaderLength: e.format.EventHeaderLength, + EventTypeHeaderLengths: e.format.EventTypeHeaderLengths, ChecksumAlgorithm: BINLOG_CHECKSUM_ALG_OFF, }