Skip to content

Commit 29ab7e7

Browse files
committed
Decoding of compressed binlog events
1 parent d525053 commit 29ab7e7

File tree

5 files changed

+115
-0
lines changed

5 files changed

+115
-0
lines changed

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/go-sql-driver/mysql v1.6.0
88
github.com/google/uuid v1.3.0
99
github.com/jmoiron/sqlx v1.3.3
10+
github.com/klauspost/compress v1.15.15 // indirect
1011
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
1112
github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63
1213
github.com/pingcap/tidb/parser v0.0.0-20221126021158-6b02a5d8ba7d

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
1515
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
1616
github.com/jmoiron/sqlx v1.3.3 h1:j82X0bf7oQ27XeqxicSZsTU5suPwKElg3oyxNn43iTk=
1717
github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
18+
github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw=
19+
github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4=
1820
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
1921
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
2022
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=

replication/const.go

+7
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ const (
9090
TRANSACTION_CONTEXT_EVENT
9191
VIEW_CHANGE_EVENT
9292
XA_PREPARE_LOG_EVENT
93+
PARTIAL_UPDATE_ROWS_EVENT
94+
TRANSACTION_PAYLOAD_EVENT
95+
HEARTBEAT_LOG_EVENT_V2
9396
)
9497

9598
const (
@@ -188,6 +191,10 @@ func (e EventType) String() string {
188191
return "ViewChangeEvent"
189192
case XA_PREPARE_LOG_EVENT:
190193
return "XAPrepareLogEvent"
194+
case PARTIAL_UPDATE_ROWS_EVENT:
195+
return "PartialUpdateRowsEvent"
196+
case TRANSACTION_PAYLOAD_EVENT:
197+
return "TransactionPlayloadEvent"
191198

192199
default:
193200
return "UnknownEvent"

replication/parser.go

+2
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,8 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
295295
e = &PreviousGTIDsEvent{}
296296
case INTVAR_EVENT:
297297
e = &IntVarEvent{}
298+
case TRANSACTION_PAYLOAD_EVENT:
299+
e = &TransactionPayloadEvent{}
298300
default:
299301
e = &GenericEvent{}
300302
}
+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package replication
2+
3+
import (
4+
"encoding/hex"
5+
"fmt"
6+
"io"
7+
8+
"github.com/klauspost/compress/zstd"
9+
10+
. "github.com/go-mysql-org/go-mysql/mysql"
11+
)
12+
13+
// On The Wire: Field Types
14+
const (
15+
OTW_PAYLOAD_HEADER_END_MARK = iota
16+
OTW_PAYLOAD_SIZE_FIELD
17+
OTW_PAYLOAD_COMPRESSION_TYPE_FIELD
18+
OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD
19+
)
20+
21+
// Compression Types
22+
const (
23+
ZSTD = 0
24+
NONE = 255
25+
)
26+
27+
func fieldTypeName(ft uint64) string {
28+
switch ft {
29+
case OTW_PAYLOAD_HEADER_END_MARK:
30+
return "HeaderEndMark"
31+
case OTW_PAYLOAD_SIZE_FIELD:
32+
return "SizeField"
33+
case OTW_PAYLOAD_COMPRESSION_TYPE_FIELD:
34+
return "CompressionType"
35+
case OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD:
36+
return "UncompressedSize"
37+
default:
38+
return "Unknown"
39+
}
40+
}
41+
42+
type TransactionPayloadEvent struct {
43+
Data []byte
44+
Size uint64
45+
UncompressedSize uint64
46+
CompressionType uint64
47+
Payload []byte
48+
}
49+
50+
func (e *TransactionPayloadEvent) compressionType() string {
51+
switch e.CompressionType {
52+
case ZSTD:
53+
return "ZSTD"
54+
case NONE:
55+
return "NONE"
56+
default:
57+
return "Unknown"
58+
}
59+
}
60+
61+
func (e *TransactionPayloadEvent) Dump(w io.Writer) {
62+
fmt.Fprintf(w, "Payload Size: %d\n", e.Size)
63+
fmt.Fprintf(w, "Payload Uncompressed Size: %d\n", e.UncompressedSize)
64+
fmt.Fprintf(w, "Payload CompressionType: %s\n", e.compressionType())
65+
fmt.Fprintf(w, "Payload Body: \n%s", hex.Dump(e.Payload))
66+
// fmt.Fprintf(w, "Transaction Payload Event data: \n%s", hex.Dump(e.Data))
67+
68+
decoder, _ := zstd.NewReader(nil, zstd.WithDecoderConcurrency(0))
69+
payloadUncompressed, _ := decoder.DecodeAll(e.Payload, nil)
70+
fmt.Fprintf(w, "Decompressed: \n%s", hex.Dump(payloadUncompressed))
71+
fmt.Fprintln(w)
72+
}
73+
74+
func (e *TransactionPayloadEvent) Decode(data []byte) error {
75+
e.Data = data
76+
offset := uint64(0)
77+
78+
for {
79+
fieldType := FixedLengthInt(data[offset : offset+1])
80+
offset++
81+
82+
if fieldType == OTW_PAYLOAD_HEADER_END_MARK {
83+
e.Payload = data[offset:]
84+
break
85+
} else {
86+
fieldLength := FixedLengthInt(data[offset : offset+1])
87+
offset++
88+
89+
switch fieldType {
90+
case OTW_PAYLOAD_SIZE_FIELD:
91+
e.Size = FixedLengthInt(data[offset : offset+fieldLength])
92+
case OTW_PAYLOAD_COMPRESSION_TYPE_FIELD:
93+
e.CompressionType = FixedLengthInt(data[offset : offset+fieldLength])
94+
case OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD:
95+
e.UncompressedSize = FixedLengthInt(data[offset : offset+fieldLength])
96+
}
97+
98+
offset += fieldLength
99+
}
100+
}
101+
102+
return nil
103+
}

0 commit comments

Comments
 (0)