diff --git a/go.mod b/go.mod index 07fd5c89f..b79f997e2 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.16 require ( github.com/BurntSushi/toml v0.3.1 + github.com/DataDog/zstd v1.5.2 github.com/go-sql-driver/mysql v1.6.0 github.com/google/uuid v1.3.0 github.com/jmoiron/sqlx v1.3.3 diff --git a/go.sum b/go.sum index 8d155a86f..ea24a843c 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8= +github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM= diff --git a/replication/const.go b/replication/const.go index a8821ba25..fc31763ba 100644 --- a/replication/const.go +++ b/replication/const.go @@ -90,6 +90,9 @@ const ( TRANSACTION_CONTEXT_EVENT VIEW_CHANGE_EVENT XA_PREPARE_LOG_EVENT + PARTIAL_UPDATE_ROWS_EVENT + TRANSACTION_PAYLOAD_EVENT + HEARTBEAT_LOG_EVENT_V2 ) const ( @@ -188,6 +191,12 @@ func (e EventType) String() string { return "ViewChangeEvent" case XA_PREPARE_LOG_EVENT: return "XAPrepareLogEvent" + case PARTIAL_UPDATE_ROWS_EVENT: + return "PartialUpdateRowsEvent" + case TRANSACTION_PAYLOAD_EVENT: + return "TransactionPayloadEvent" + case HEARTBEAT_LOG_EVENT_V2: + return "HeartbeatLogEventV2" default: return "UnknownEvent" diff --git a/replication/parser.go b/replication/parser.go index 329159d26..cb34707e6 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -295,6 +295,8 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( e = &PreviousGTIDsEvent{} case INTVAR_EVENT: e = &IntVarEvent{} + case TRANSACTION_PAYLOAD_EVENT: + e = p.newTransactionPayloadEvent() default: e = &GenericEvent{} } @@ -417,3 +419,10 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent { return e } + +func (p *BinlogParser) newTransactionPayloadEvent() *TransactionPayloadEvent { + e := &TransactionPayloadEvent{} + e.format = *p.format + + return e +} diff --git a/replication/transaction_payload_event.go b/replication/transaction_payload_event.go new file mode 100644 index 000000000..8b69a1441 --- /dev/null +++ b/replication/transaction_payload_event.go @@ -0,0 +1,149 @@ +package replication + +import ( + "encoding/binary" + "encoding/hex" + "fmt" + "io" + + "github.com/DataDog/zstd" + + . "github.com/go-mysql-org/go-mysql/mysql" +) + +// On The Wire: Field Types +// See also binary_log::codecs::binary::Transaction_payload::fields in MySQL +// https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1codecs_1_1binary_1_1Transaction__payload.html#a9fff7ac12ba064f40e9216565c53d07b +const ( + OTW_PAYLOAD_HEADER_END_MARK = iota + OTW_PAYLOAD_SIZE_FIELD + OTW_PAYLOAD_COMPRESSION_TYPE_FIELD + OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD +) + +// Compression Types +const ( + ZSTD = 0 + NONE = 255 +) + +type TransactionPayloadEvent struct { + format FormatDescriptionEvent + Size uint64 + UncompressedSize uint64 + CompressionType uint64 + Payload []byte + Events []*BinlogEvent +} + +func (e *TransactionPayloadEvent) compressionType() string { + switch e.CompressionType { + case ZSTD: + return "ZSTD" + case NONE: + return "NONE" + default: + return "Unknown" + } +} + +func (e *TransactionPayloadEvent) Dump(w io.Writer) { + fmt.Fprintf(w, "Payload Size: %d\n", e.Size) + fmt.Fprintf(w, "Payload Uncompressed Size: %d\n", e.UncompressedSize) + fmt.Fprintf(w, "Payload CompressionType: %s\n", e.compressionType()) + fmt.Fprintf(w, "Payload Body: \n%s", hex.Dump(e.Payload)) + fmt.Fprintln(w, "=== Start of events decoded from compressed payload ===") + for _, event := range e.Events { + event.Dump(w) + } + fmt.Fprintln(w, "=== End of events decoded from compressed payload ===") + fmt.Fprintln(w) +} + +func (e *TransactionPayloadEvent) Decode(data []byte) error { + err := e.decodeFields(data) + if err != nil { + return err + } + return e.decodePayload() +} + +func (e *TransactionPayloadEvent) decodeFields(data []byte) error { + offset := uint64(0) + + for { + fieldType := FixedLengthInt(data[offset : offset+1]) + offset++ + + if fieldType == OTW_PAYLOAD_HEADER_END_MARK { + e.Payload = data[offset:] + break + } else { + fieldLength := FixedLengthInt(data[offset : offset+1]) + offset++ + + switch fieldType { + case OTW_PAYLOAD_SIZE_FIELD: + e.Size = FixedLengthInt(data[offset : offset+fieldLength]) + case OTW_PAYLOAD_COMPRESSION_TYPE_FIELD: + e.CompressionType = FixedLengthInt(data[offset : offset+fieldLength]) + case OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD: + e.UncompressedSize = FixedLengthInt(data[offset : offset+fieldLength]) + } + + offset += fieldLength + } + } + + return nil +} + +func (e *TransactionPayloadEvent) decodePayload() error { + if e.CompressionType != ZSTD { + return fmt.Errorf("TransactionPayloadEvent has compression type %d (%s)", + e.CompressionType, e.compressionType()) + } + + payloadUncompressed, err := zstd.Decompress(nil, e.Payload) + if err != nil { + return err + } + + // The uncompressed data needs to be split up into individual events for Parse() + // to work on them. We can't use e.parser directly as we need to disable checksums + // but we still need the initialization from the FormatDescriptionEvent. We can't + // modify e.parser as it is used elsewhere. + parser := NewBinlogParser() + parser.format = &FormatDescriptionEvent{ + Version: e.format.Version, + ServerVersion: e.format.ServerVersion, + CreateTimestamp: e.format.CreateTimestamp, + EventHeaderLength: e.format.EventHeaderLength, + EventTypeHeaderLengths: e.format.EventTypeHeaderLengths, + ChecksumAlgorithm: BINLOG_CHECKSUM_ALG_OFF, + } + + offset := uint32(0) + for { + payloadUncompressedLength := uint32(len(payloadUncompressed)) + if offset+13 > payloadUncompressedLength { + break + } + eventLength := binary.LittleEndian.Uint32(payloadUncompressed[offset+9 : offset+13]) + if offset+eventLength > payloadUncompressedLength { + return fmt.Errorf("Event length of %d with offset %d in uncompressed payload exceeds payload length of %d", + eventLength, offset, payloadUncompressedLength) + } + data := payloadUncompressed[offset : offset+eventLength] + + pe, err := parser.Parse(data) + if err != nil { + return err + } + e.Events = append(e.Events, pe) + + offset += eventLength + } + + return nil +}