1
1
package replication
2
2
3
3
import (
4
+ "encoding/binary"
4
5
"encoding/hex"
5
6
"fmt"
6
7
"io"
@@ -40,6 +41,7 @@ func fieldTypeName(ft uint64) string {
40
41
}
41
42
42
43
type TransactionPayloadEvent struct {
44
+ parser * BinlogParser
43
45
Data []byte
44
46
Size uint64
45
47
UncompressedSize uint64
@@ -63,11 +65,34 @@ func (e *TransactionPayloadEvent) Dump(w io.Writer) {
63
65
fmt .Fprintf (w , "Payload Uncompressed Size: %d\n " , e .UncompressedSize )
64
66
fmt .Fprintf (w , "Payload CompressionType: %s\n " , e .compressionType ())
65
67
fmt .Fprintf (w , "Payload Body: \n %s" , hex .Dump (e .Payload ))
66
- // fmt.Fprintf(w, "Transaction Payload Event data: \n%s", hex.Dump(e.Data))
67
68
68
69
decoder , _ := zstd .NewReader (nil , zstd .WithDecoderConcurrency (0 ))
69
70
payloadUncompressed , _ := decoder .DecodeAll (e .Payload , nil )
70
71
fmt .Fprintf (w , "Decompressed: \n %s" , hex .Dump (payloadUncompressed ))
72
+
73
+ // The uncompressed data needs to be split up into individual events for Parsse()
74
+ // to work on them. We can't use a NewBinlogParser() as we need the initialization
75
+ // from the FormatDescriptionEvent. And we need to disable the binlog checksum
76
+ // algorithm as otherwise the XidEvent's get truncated and fail to parse.
77
+ offset := uint32 (0 )
78
+ for {
79
+ if offset >= uint32 (len (payloadUncompressed )) {
80
+ break
81
+ }
82
+ eventLength := binary .LittleEndian .Uint32 (payloadUncompressed [offset + 9 : offset + 13 ])
83
+ data := payloadUncompressed [offset : offset + eventLength ]
84
+
85
+ e .parser .format .ChecksumAlgorithm = BINLOG_CHECKSUM_ALG_OFF
86
+ pe , err := e .parser .Parse (data )
87
+ if err != nil {
88
+ fmt .Fprintf (w , "Failed to parse payload: %s\n " , err )
89
+ } else {
90
+ pe .Dump (w )
91
+ }
92
+
93
+ offset += eventLength
94
+ }
95
+
71
96
fmt .Fprintln (w )
72
97
}
73
98
0 commit comments