Skip to content

Commit 84fd438

Browse files
author
jasonz
committed
add support for BeginLoadQueryEvent and ExecuteLoadQueryEvent, update tests
1 parent 3a5d042 commit 84fd438

File tree

4 files changed

+103
-5
lines changed

4 files changed

+103
-5
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@
88
from pymysql.util import int2byte
99

1010
from .packet import BinLogPacketWrapper
11-
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT, STOP_EVENT
11+
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT
1212
from .gtid import GtidSet
1313
from .event import (
1414
QueryEvent, RotateEvent, FormatDescriptionEvent,
15-
XidEvent, GtidEvent, StopEvent, NotImplementedEvent)
15+
XidEvent, GtidEvent, StopEvent,
16+
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
17+
NotImplementedEvent)
1618
from .row_event import (
1719
UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent)
1820

@@ -297,6 +299,8 @@ def _allowed_event_list(self, only_events, ignored_events,
297299
FormatDescriptionEvent,
298300
XidEvent,
299301
GtidEvent,
302+
BeginLoadQueryEvent,
303+
ExecuteLoadQueryEvent,
300304
UpdateRowsEvent,
301305
WriteRowsEvent,
302306
DeleteRowsEvent,

pymysqlreplication/event.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,72 @@ def _dump(self):
147147
print("Query: %s" % (self.query))
148148

149149

150+
class BeginLoadQueryEvent(BinLogEvent):
151+
"""
152+
153+
Attributes:
154+
file_id
155+
block-data
156+
"""
157+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
158+
super(BeginLoadQueryEvent, self).__init__(from_packet, event_size, table_map,
159+
ctl_connection, **kwargs)
160+
161+
# Payload
162+
self.file_id = self.packet.read_uint32()
163+
self.block_data = self.packet.read(event_size - 4)
164+
165+
def _dump(self):
166+
super(BeginLoadQueryEvent, self)._dump()
167+
print("File id: %d" % (self.file_id))
168+
print("Block data: %s" % (self.block_data))
169+
170+
171+
class ExecuteLoadQueryEvent(BinLogEvent):
172+
"""
173+
174+
Attributes:
175+
slave_proxy_id
176+
execution_time
177+
schema_length
178+
error_code
179+
status_vars_length
180+
181+
file_id
182+
start_pos
183+
end_pos
184+
dup_handling_flags
185+
"""
186+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
187+
super(ExecuteLoadQueryEvent, self).__init__(from_packet, event_size, table_map,
188+
ctl_connection, **kwargs)
189+
190+
# Post-header
191+
self.slave_proxy_id = self.packet.read_uint32()
192+
self.execution_time = self.packet.read_uint32()
193+
self.schema_length = self.packet.read_uint8()
194+
self.error_code = self.packet.read_uint16()
195+
self.status_vars_length = self.packet.read_uint16()
196+
197+
# Payload
198+
self.file_id = self.packet.read_uint32()
199+
self.start_pos = self.packet.read_uint32()
200+
self.end_pos = self.packet.read_uint32()
201+
self.dup_handling_flags = self.packet.read_uint8()
202+
203+
def _dump(self):
204+
super(ExecuteLoadQueryEvent, self)._dump()
205+
print("Slave proxy id: %d" % (self.slave_proxy_id))
206+
print("Execution time: %d" % (self.execution_time))
207+
print("Schema length: %d" % (self.schema_length))
208+
print("Error code: %d" % (self.error_code))
209+
print("Status vars length: %d" % (self.status_vars_length))
210+
print("File id: %d" % (self.file_id))
211+
print("Start pos: %d" % (self.start_pos))
212+
print("End pos: %d" % (self.end_pos))
213+
print("Dup handling flags: %d" % (self.dup_handling_flags))
214+
215+
150216
class NotImplementedEvent(BinLogEvent):
151217
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
152218
super(NotImplementedEvent, self).__init__(

pymysqlreplication/packet.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ class BinLogPacketWrapper(object):
3434
constants.INTVAR_EVENT: event.NotImplementedEvent,
3535
constants.GTID_LOG_EVENT: event.GtidEvent,
3636
constants.STOP_EVENT: event.StopEvent,
37+
constants.BEGIN_LOAD_QUERY_EVENT: event.BeginLoadQueryEvent,
38+
constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent,
3739
# row_event
3840
constants.UPDATE_ROWS_EVENT_V1: row_event.UpdateRowsEvent,
3941
constants.WRITE_ROWS_EVENT_V1: row_event.WriteRowsEvent,

pymysqlreplication/tests/test_basic.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ def ignoredEvents(self):
1414
return [GtidEvent]
1515

1616
def test_allowed_event_list(self):
17-
self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 11)
18-
self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 10)
19-
self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 10)
17+
self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 13)
18+
self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 12)
19+
self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 12)
2020
self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1)
2121

2222
def test_read_query_event(self):
@@ -63,6 +63,32 @@ def test_reading_rotate_event(self):
6363
# Rotate event
6464
self.assertIsInstance(self.stream.fetchone(), RotateEvent)
6565

66+
""" `test_load_query_event` needs statement-based binlog
67+
def test_load_query_event(self):
68+
# prepare csv
69+
with open("/tmp/test_load_query.csv", "w") as fp:
70+
fp.write("1,aaa\n2,bbb\n3,ccc\n4,ddd\n")
71+
72+
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
73+
self.execute(query)
74+
query = "LOAD DATA INFILE '/tmp/test_load_query.csv' INTO TABLE test \
75+
FIELDS TERMINATED BY ',' \
76+
ENCLOSED BY '\"' \
77+
LINES TERMINATED BY '\r\n'"
78+
self.execute(query)
79+
80+
self.assertIsInstance(self.stream.fetchone(), RotateEvent)
81+
self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
82+
# create table
83+
self.assertIsInstance(self.stream.fetchone(), QueryEvent)
84+
# begin
85+
self.assertIsInstance(self.stream.fetchone(), QueryEvent)
86+
87+
self.assertIsInstance(self.stream.fetchone(), BeginLoadQueryEvent)
88+
self.assertIsInstance(self.stream.fetchone(), ExecuteLoadQueryEvent)
89+
90+
self.assertIsInstance(self.stream.fetchone(), XidEvent)
91+
"""
6692

6793
def test_connection_stream_lost_event(self):
6894
self.stream.close()

0 commit comments

Comments
 (0)