From 1620956240d4a625157b5a6c0608f011fdced74f Mon Sep 17 00:00:00 2001 From: jolivares Date: Mon, 20 Jan 2025 14:21:33 +0100 Subject: [PATCH] allow to use custom EventDeserializer for TransactionPayloadEvent decompressed events --- .../TransactionPayloadEventDataDeserializer.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java index a8e84876..19d097fc 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Objects; +import java.util.function.Supplier; /** * @author Somesh Malviya @@ -34,6 +36,16 @@ public class TransactionPayloadEventDataDeserializer implements EventDataDeseria public static final int OTW_PAYLOAD_COMPRESSION_TYPE_FIELD = 2; public static final int OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD = 3; + private final Supplier transactionPayloadEventDeserializerProvider; + + public TransactionPayloadEventDataDeserializer() { + this(EventDeserializer::new); + } + + public TransactionPayloadEventDataDeserializer(Supplier transactionPayloadEventDeserializerProvider) { + this.transactionPayloadEventDeserializerProvider = Objects.requireNonNull(transactionPayloadEventDeserializerProvider); + } + @Override public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) throws IOException { TransactionPayloadEventData eventData = new TransactionPayloadEventData(); @@ -86,7 +98,7 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) // Read and store events from decompressed byte array into input stream ArrayList decompressedEvents = new ArrayList<>(); - EventDeserializer transactionPayloadEventDeserializer = new EventDeserializer(); + EventDeserializer transactionPayloadEventDeserializer = transactionPayloadEventDeserializerProvider.get(); ByteArrayInputStream destinationInputStream = new ByteArrayInputStream(dst); Event internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream);