Skip to content

Commit 7dc9263

Browse files
committed
to osheroff#127 Supported customizeEventDeserializer
1 parent 9d715a5 commit 7dc9263

File tree

3 files changed

+106
-5
lines changed

3 files changed

+106
-5
lines changed

src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,25 @@ private void registerDefaultEventDataDeserializers() {
130130
eventDataDeserializers.put(EventType.MARIADB_GTID_LIST,
131131
new MariadbGtidListEventDataDeserializer());
132132
eventDataDeserializers.put(EventType.TRANSACTION_PAYLOAD,
133-
new TransactionPayloadEventDataDeserializer());
133+
new TransactionPayloadEventDataDeserializer().customizeEventDeserializerSupplier(new TransactionPayloadEventDataDeserializer.Supplier<EventDeserializer>() {
134+
@Override
135+
public EventDeserializer get() {
136+
EventDeserializer eventDeserializer = new EventDeserializer(
137+
eventHeaderDeserializer,
138+
defaultEventDataDeserializer,
139+
eventDataDeserializers,
140+
tableMapEventByTableId
141+
);
142+
143+
if (!compatibilitySet.isEmpty()) {
144+
CompatibilityMode[] compatibilityModeSettings = new CompatibilityMode[compatibilitySet.size()];
145+
compatibilitySet.toArray(compatibilityModeSettings);
146+
eventDeserializer.setCompatibilityMode(compatibilityModeSettings[0], compatibilityModeSettings);
147+
}
148+
149+
return eventDeserializer;
150+
}
151+
}));
134152
}
135153

136154
public void setEventDataDeserializer(EventType eventType, EventDataDeserializer eventDataDeserializer) {

src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,18 @@ public class TransactionPayloadEventDataDeserializer implements EventDataDeseria
3434
public static final int OTW_PAYLOAD_COMPRESSION_TYPE_FIELD = 2;
3535
public static final int OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD = 3;
3636

37+
private Supplier<EventDeserializer> eventDeserializerSupplier = new Supplier<EventDeserializer>() {
38+
@Override
39+
public EventDeserializer get() {
40+
return new EventDeserializer();
41+
}
42+
};
43+
44+
public TransactionPayloadEventDataDeserializer customizeEventDeserializerSupplier(Supplier<EventDeserializer> supplier) {
45+
this.eventDeserializerSupplier = supplier;
46+
return this;
47+
}
48+
3749
@Override
3850
public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
3951
TransactionPayloadEventData eventData = new TransactionPayloadEventData();
@@ -86,7 +98,7 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream)
8698

8799
// Read and store events from decompressed byte array into input stream
88100
ArrayList<Event> decompressedEvents = new ArrayList<>();
89-
EventDeserializer transactionPayloadEventDeserializer = new EventDeserializer();
101+
EventDeserializer transactionPayloadEventDeserializer = obtainEventDeserializer();
90102
ByteArrayInputStream destinationInputStream = new ByteArrayInputStream(dst);
91103

92104
Event internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream);
@@ -99,4 +111,15 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream)
99111

100112
return eventData;
101113
}
114+
115+
protected EventDeserializer obtainEventDeserializer() {
116+
return eventDeserializerSupplier.get();
117+
}
118+
119+
public interface Supplier<V> {
120+
121+
V get();
122+
123+
}
124+
102125
}

src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializerTest.java

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@
1515
*/
1616
package com.github.shyiko.mysql.binlog.event.deserialization;
1717

18-
import com.github.shyiko.mysql.binlog.event.EventType;
19-
import com.github.shyiko.mysql.binlog.event.TransactionPayloadEventData;
20-
import com.github.shyiko.mysql.binlog.event.XAPrepareEventData;
18+
import com.github.shyiko.mysql.binlog.event.*;
2119
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
2220
import org.testng.annotations.Test;
2321

2422
import java.io.IOException;
23+
import java.io.Serializable;
2524

2625
import static org.testng.Assert.assertEquals;
26+
import static org.testng.Assert.assertTrue;
2727

2828
/**
2929
* @author <a href="mailto:[email protected]">Somesh Malviya</a>
@@ -82,6 +82,8 @@ public class TransactionPayloadEventDataDeserializerTest {
8282
.append("]}")
8383
.toString();
8484

85+
private static final byte[] UNCOMPRESSED_UPDATE_EVENT_BEFORE_ROW_0_BYTE_ARRAY = new byte[] {1, 0, 0, 0};
86+
8587
@Test
8688
public void deserialize() throws IOException {
8789
TransactionPayloadEventDataDeserializer deserializer = new TransactionPayloadEventDataDeserializer();
@@ -97,4 +99,62 @@ public void deserialize() throws IOException {
9799
assertEquals(EventType.XID, transactionPayloadEventData.getUncompressedEvents().get(3).getHeader().getEventType());
98100
assertEquals(UNCOMPRESSED_UPDATE_EVENT, transactionPayloadEventData.getUncompressedEvents().get(2).getData().toString());
99101
}
102+
103+
@Test
104+
public void deserializeUsingEventDeserializer() throws IOException {
105+
106+
ByteArrayInputStream dataStream = new ByteArrayInputStream(DATA);
107+
108+
// Mock create target TransactionPayloadEventData DATA event header
109+
final EventHeaderV4 eventHeader = new EventHeaderV4();
110+
eventHeader.setEventType(EventType.TRANSACTION_PAYLOAD);
111+
eventHeader.setEventLength(DATA.length + 19L);
112+
eventHeader.setTimestamp(1646406641000L);
113+
eventHeader.setServerId(223344);
114+
115+
116+
EventHeaderDeserializer eventHeaderDeserializer = new EventHeaderDeserializer() {
117+
118+
private long count = 0L;
119+
120+
private EventHeaderDeserializer defaultEventHeaderDeserializer = new EventHeaderV4Deserializer();
121+
122+
@Override
123+
public EventHeader deserialize(ByteArrayInputStream inputStream) throws IOException {
124+
if (count > 0) {
125+
// uncompressed event header deserialize
126+
return defaultEventHeaderDeserializer.deserialize(inputStream);
127+
}
128+
count++;
129+
// we need to return target TransactionPayloadEventData DATA event header we had mocked
130+
return eventHeader;
131+
}
132+
};
133+
134+
EventDeserializer eventDeserializer = new EventDeserializer(eventHeaderDeserializer, new NullEventDataDeserializer());
135+
eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.INTEGER_AS_BYTE_ARRAY);
136+
137+
Event event = eventDeserializer.nextEvent(dataStream);
138+
139+
assertTrue(event.getHeader().getEventType() == EventType.TRANSACTION_PAYLOAD);
140+
assertTrue(event.getData() instanceof TransactionPayloadEventData);
141+
142+
TransactionPayloadEventData transactionPayloadEventData = event.getData();
143+
assertEquals(COMPRESSION_TYPE, transactionPayloadEventData.getCompressionType());
144+
assertEquals(PAYLOAD_SIZE, transactionPayloadEventData.getPayloadSize());
145+
assertEquals(UNCOMPRESSED_SIZE, transactionPayloadEventData.getUncompressedSize());
146+
assertEquals(NUMBER_OF_UNCOMPRESSED_EVENTS, transactionPayloadEventData.getUncompressedEvents().size());
147+
assertEquals(EventType.QUERY, transactionPayloadEventData.getUncompressedEvents().get(0).getHeader().getEventType());
148+
assertEquals(EventType.TABLE_MAP, transactionPayloadEventData.getUncompressedEvents().get(1).getHeader().getEventType());
149+
assertEquals(EventType.EXT_UPDATE_ROWS, transactionPayloadEventData.getUncompressedEvents().get(2).getHeader().getEventType());
150+
assertEquals(EventType.XID, transactionPayloadEventData.getUncompressedEvents().get(3).getHeader().getEventType());
151+
// assertEquals(UNCOMPRESSED_UPDATE_EVENT, transactionPayloadEventData.getUncompressedEvents().get(2).getData().toString());
152+
assertTrue(transactionPayloadEventData.getUncompressedEvents().get(2).getData() instanceof UpdateRowsEventData);
153+
154+
UpdateRowsEventData updateRowsEventData = transactionPayloadEventData.getUncompressedEvents().get(2).getData();
155+
assertEquals(1, updateRowsEventData.getRows().size());
156+
Serializable[] updateBefore = updateRowsEventData.getRows().get(0).getKey();
157+
assertEquals(UNCOMPRESSED_UPDATE_EVENT_BEFORE_ROW_0_BYTE_ARRAY, updateBefore[0]);
158+
}
159+
100160
}

0 commit comments

Comments
 (0)