Skip to content

Commit e320144

Browse files
authored
Merge pull request #773 from dveeden/compressed_binlog_events
Decoding of compressed binlog events
2 parents 775579c + f60b982 commit e320144

File tree

5 files changed

+170
-0
lines changed

5 files changed

+170
-0
lines changed

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.16
44

55
require (
66
github.com/BurntSushi/toml v0.3.1
7+
github.com/DataDog/zstd v1.5.2
78
github.com/go-sql-driver/mysql v1.6.0
89
github.com/google/uuid v1.3.0
910
github.com/jmoiron/sqlx v1.3.3

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
22
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
3+
github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8=
4+
github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
35
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
46
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
57
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM=

replication/const.go

+9
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ const (
9090
TRANSACTION_CONTEXT_EVENT
9191
VIEW_CHANGE_EVENT
9292
XA_PREPARE_LOG_EVENT
93+
PARTIAL_UPDATE_ROWS_EVENT
94+
TRANSACTION_PAYLOAD_EVENT
95+
HEARTBEAT_LOG_EVENT_V2
9396
)
9497

9598
const (
@@ -188,6 +191,12 @@ func (e EventType) String() string {
188191
return "ViewChangeEvent"
189192
case XA_PREPARE_LOG_EVENT:
190193
return "XAPrepareLogEvent"
194+
case PARTIAL_UPDATE_ROWS_EVENT:
195+
return "PartialUpdateRowsEvent"
196+
case TRANSACTION_PAYLOAD_EVENT:
197+
return "TransactionPayloadEvent"
198+
case HEARTBEAT_LOG_EVENT_V2:
199+
return "HeartbeatLogEventV2"
191200

192201
default:
193202
return "UnknownEvent"

replication/parser.go

+9
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,8 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
295295
e = &PreviousGTIDsEvent{}
296296
case INTVAR_EVENT:
297297
e = &IntVarEvent{}
298+
case TRANSACTION_PAYLOAD_EVENT:
299+
e = p.newTransactionPayloadEvent()
298300
default:
299301
e = &GenericEvent{}
300302
}
@@ -417,3 +419,10 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
417419

418420
return e
419421
}
422+
423+
func (p *BinlogParser) newTransactionPayloadEvent() *TransactionPayloadEvent {
424+
e := &TransactionPayloadEvent{}
425+
e.format = *p.format
426+
427+
return e
428+
}
+149
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package replication
2+
3+
import (
4+
"encoding/binary"
5+
"encoding/hex"
6+
"fmt"
7+
"io"
8+
9+
"github.com/DataDog/zstd"
10+
11+
. "github.com/go-mysql-org/go-mysql/mysql"
12+
)
13+
14+
// On The Wire: Field Types
15+
// See also binary_log::codecs::binary::Transaction_payload::fields in MySQL
16+
// https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1codecs_1_1binary_1_1Transaction__payload.html#a9fff7ac12ba064f40e9216565c53d07b
17+
const (
	// OTW_PAYLOAD_HEADER_END_MARK terminates the field list; everything
	// after it is the payload body.
	OTW_PAYLOAD_HEADER_END_MARK = iota
	// OTW_PAYLOAD_SIZE_FIELD tags the field holding the payload size.
	OTW_PAYLOAD_SIZE_FIELD
	// OTW_PAYLOAD_COMPRESSION_TYPE_FIELD tags the field holding the
	// compression type.
	OTW_PAYLOAD_COMPRESSION_TYPE_FIELD
	// OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD tags the field holding the
	// uncompressed payload size.
	OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD
)
23+
24+
// Compression Types
25+
const (
	// ZSTD marks a payload compressed with Zstandard; it is the only
	// compression type the payload decoder accepts.
	ZSTD = 0
	// NONE marks a payload that is not compressed.
	NONE = 255
)
29+
30+
type TransactionPayloadEvent struct {
31+
format FormatDescriptionEvent
32+
Size uint64
33+
UncompressedSize uint64
34+
CompressionType uint64
35+
Payload []byte
36+
Events []*BinlogEvent
37+
}
38+
39+
func (e *TransactionPayloadEvent) compressionType() string {
40+
switch e.CompressionType {
41+
case ZSTD:
42+
return "ZSTD"
43+
case NONE:
44+
return "NONE"
45+
default:
46+
return "Unknown"
47+
}
48+
}
49+
50+
func (e *TransactionPayloadEvent) Dump(w io.Writer) {
51+
fmt.Fprintf(w, "Payload Size: %d\n", e.Size)
52+
fmt.Fprintf(w, "Payload Uncompressed Size: %d\n", e.UncompressedSize)
53+
fmt.Fprintf(w, "Payload CompressionType: %s\n", e.compressionType())
54+
fmt.Fprintf(w, "Payload Body: \n%s", hex.Dump(e.Payload))
55+
fmt.Fprintln(w, "=== Start of events decoded from compressed payload ===")
56+
for _, event := range e.Events {
57+
event.Dump(w)
58+
}
59+
fmt.Fprintln(w, "=== End of events decoded from compressed payload ===")
60+
fmt.Fprintln(w)
61+
}
62+
63+
func (e *TransactionPayloadEvent) Decode(data []byte) error {
64+
err := e.decodeFields(data)
65+
if err != nil {
66+
return err
67+
}
68+
return e.decodePayload()
69+
}
70+
71+
func (e *TransactionPayloadEvent) decodeFields(data []byte) error {
72+
offset := uint64(0)
73+
74+
for {
75+
fieldType := FixedLengthInt(data[offset : offset+1])
76+
offset++
77+
78+
if fieldType == OTW_PAYLOAD_HEADER_END_MARK {
79+
e.Payload = data[offset:]
80+
break
81+
} else {
82+
fieldLength := FixedLengthInt(data[offset : offset+1])
83+
offset++
84+
85+
switch fieldType {
86+
case OTW_PAYLOAD_SIZE_FIELD:
87+
e.Size = FixedLengthInt(data[offset : offset+fieldLength])
88+
case OTW_PAYLOAD_COMPRESSION_TYPE_FIELD:
89+
e.CompressionType = FixedLengthInt(data[offset : offset+fieldLength])
90+
case OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD:
91+
e.UncompressedSize = FixedLengthInt(data[offset : offset+fieldLength])
92+
}
93+
94+
offset += fieldLength
95+
}
96+
}
97+
98+
return nil
99+
}
100+
101+
func (e *TransactionPayloadEvent) decodePayload() error {
102+
if e.CompressionType != ZSTD {
103+
return fmt.Errorf("TransactionPayloadEvent has compression type %d (%s)",
104+
e.CompressionType, e.compressionType())
105+
}
106+
107+
payloadUncompressed, err := zstd.Decompress(nil, e.Payload)
108+
if err != nil {
109+
return err
110+
}
111+
112+
// The uncompressed data needs to be split up into individual events for Parse()
113+
// to work on them. We can't use e.parser directly as we need to disable checksums
114+
// but we still need the initialization from the FormatDescriptionEvent. We can't
115+
// modify e.parser as it is used elsewhere.
116+
parser := NewBinlogParser()
117+
parser.format = &FormatDescriptionEvent{
118+
Version: e.format.Version,
119+
ServerVersion: e.format.ServerVersion,
120+
CreateTimestamp: e.format.CreateTimestamp,
121+
EventHeaderLength: e.format.EventHeaderLength,
122+
EventTypeHeaderLengths: e.format.EventTypeHeaderLengths,
123+
ChecksumAlgorithm: BINLOG_CHECKSUM_ALG_OFF,
124+
}
125+
126+
offset := uint32(0)
127+
for {
128+
payloadUncompressedLength := uint32(len(payloadUncompressed))
129+
if offset+13 > payloadUncompressedLength {
130+
break
131+
}
132+
eventLength := binary.LittleEndian.Uint32(payloadUncompressed[offset+9 : offset+13])
133+
if offset+eventLength > payloadUncompressedLength {
134+
return fmt.Errorf("Event length of %d with offset %d in uncompressed payload exceeds payload length of %d",
135+
eventLength, offset, payloadUncompressedLength)
136+
}
137+
data := payloadUncompressed[offset : offset+eventLength]
138+
139+
pe, err := parser.Parse(data)
140+
if err != nil {
141+
return err
142+
}
143+
e.Events = append(e.Events, pe)
144+
145+
offset += eventLength
146+
}
147+
148+
return nil
149+
}

0 commit comments

Comments
 (0)