Skip to content

Commit e10c715

Browse files
committed
Decode the uncompressed payload from TransactionPayloadEvent
1 parent 29ab7e7 commit e10c715

File tree

2 files changed

+35
-2
lines changed

2 files changed

+35
-2
lines changed

replication/parser.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
296296
case INTVAR_EVENT:
297297
e = &IntVarEvent{}
298298
case TRANSACTION_PAYLOAD_EVENT:
299-
e = &TransactionPayloadEvent{}
299+
e = p.newTransactionPayloadEvent()
300300
default:
301301
e = &GenericEvent{}
302302
}
@@ -419,3 +419,10 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
419419

420420
return e
421421
}
422+
423+
func (p *BinlogParser) newTransactionPayloadEvent() *TransactionPayloadEvent {
424+
e := &TransactionPayloadEvent{}
425+
e.parser = p
426+
427+
return e
428+
}

replication/transaction_payload_event.go

+27-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package replication
22

33
import (
4+
"encoding/binary"
45
"encoding/hex"
56
"fmt"
67
"io"
@@ -40,6 +41,7 @@ func fieldTypeName(ft uint64) string {
4041
}
4142

4243
type TransactionPayloadEvent struct {
44+
parser *BinlogParser
4345
Data []byte
4446
Size uint64
4547
UncompressedSize uint64
@@ -63,11 +65,35 @@ func (e *TransactionPayloadEvent) Dump(w io.Writer) {
6365
fmt.Fprintf(w, "Payload Uncompressed Size: %d\n", e.UncompressedSize)
6466
fmt.Fprintf(w, "Payload CompressionType: %s\n", e.compressionType())
6567
fmt.Fprintf(w, "Payload Body: \n%s", hex.Dump(e.Payload))
66-
// fmt.Fprintf(w, "Transaction Payload Event data: \n%s", hex.Dump(e.Data))
6768

6869
decoder, _ := zstd.NewReader(nil, zstd.WithDecoderConcurrency(0))
6970
payloadUncompressed, _ := decoder.DecodeAll(e.Payload, nil)
7071
fmt.Fprintf(w, "Decompressed: \n%s", hex.Dump(payloadUncompressed))
72+
73+
// The uncompressed data needs to be split up into individual events for Parsse()
74+
// to work on them. We can't use a NewBinlogParser() as we need the initialization
75+
// from the FormatDescriptionEvent. And we need to disable the binlog checksum
76+
// algorithm as otherwise the XidEvent's get truncated and fail to parse.
77+
offset := uint32(0)
78+
for {
79+
if offset >= uint32(len(payloadUncompressed)) {
80+
break
81+
}
82+
eventLength := binary.LittleEndian.Uint32(payloadUncompressed[offset+9 : offset+13])
83+
data := payloadUncompressed[offset : offset+eventLength]
84+
fmt.Fprintf(w, "Event Data: \n%s", hex.Dump(data))
85+
86+
e.parser.format.ChecksumAlgorithm = BINLOG_CHECKSUM_ALG_OFF
87+
pe, err := e.parser.Parse(data)
88+
if err != nil {
89+
fmt.Fprintf(w, "Failed to parse payload: %s\n", err)
90+
} else {
91+
pe.Dump(w)
92+
}
93+
94+
offset += eventLength
95+
}
96+
7197
fmt.Fprintln(w)
7298
}
7399

0 commit comments

Comments
 (0)