@@ -30,17 +30,20 @@ def test_reading_rotate_event(self):
30
30
query = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
31
31
self .execute (query )
32
32
33
- rotate_event = self .stream .fetchone ()
33
+ # Rotate event
34
+ self .stream .fetchone ()
34
35
self .stream .close ()
35
36
36
37
query = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
37
38
self .execute (query )
38
39
39
- rotate_event = self .stream .fetchone ()
40
+ # Rotate event
41
+ self .stream .fetchone ()
40
42
41
43
def test_connection_stream_lost_event (self ):
42
44
self .stream .close ()
43
- self .stream = BinLogStreamReader (connection_settings = self .database , blocking = True )
45
+ self .stream = BinLogStreamReader (connection_settings = self .database ,
46
+ blocking = True )
44
47
45
48
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
46
49
self .execute (query )
@@ -66,7 +69,8 @@ def test_connection_stream_lost_event(self):
66
69
67
70
def test_filtering_events (self ):
68
71
self .stream .close ()
69
- self .stream = BinLogStreamReader (connection_settings = self .database , only_events = [QueryEvent ])
72
+ self .stream = BinLogStreamReader (connection_settings = self .database ,
73
+ only_events = [QueryEvent ])
70
74
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
71
75
self .execute (query )
72
76
@@ -172,6 +176,48 @@ def test_update_row_event(self):
172
176
self .assertEqual (event .rows [0 ]["after_values" ]["id" ], 1 )
173
177
self .assertEqual (event .rows [0 ]["after_values" ]["data" ], "World" )
174
178
179
+ def test_log_pos (self ):
180
+ query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
181
+ self .execute (query )
182
+ query = "INSERT INTO test (data) VALUES('Hello')"
183
+ self .execute (query )
184
+ self .execute ("COMMIT" )
185
+
186
+ for i in range (6 ):
187
+ self .stream .fetchone ()
188
+ # record position after insert
189
+ log_file , log_pos = self .stream .log_file , self .stream .log_pos
190
+
191
+ query = "UPDATE test SET data = 'World' WHERE id = 1"
192
+ self .execute (query )
193
+ self .execute ("COMMIT" )
194
+
195
+ # resume stream from previous position
196
+ if self .stream is not None :
197
+ self .stream .close ()
198
+ self .stream = BinLogStreamReader (
199
+ connection_settings = self .database ,
200
+ resume_stream = True ,
201
+ log_file = log_file ,
202
+ log_pos = log_pos
203
+ )
204
+
205
+ # RotateEvent
206
+ self .stream .fetchone ()
207
+ # FormatDescription
208
+ self .stream .fetchone ()
209
+ # XvidEvent
210
+ self .stream .fetchone ()
211
+ # QueryEvent for the BEGIN
212
+ self .stream .fetchone ()
213
+
214
+ event = self .stream .fetchone ()
215
+ self .assertIsInstance (event , TableMapEvent )
216
+
217
+ event = self .stream .fetchone ()
218
+ self .assertIsInstance (event , UpdateRowsEvent )
219
+
220
+
175
221
class TestMultipleRowBinLogStreamReader (base .PyMySQLReplicationTestCase ):
176
222
def test_insert_multiple_row_event (self ):
177
223
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
@@ -234,7 +280,7 @@ def test_update_multiple_row_event(self):
234
280
235
281
event = self .stream .fetchone ()
236
282
if self .isMySQL56AndMore ():
237
- self .assertEqual (event .event_type , UPDATE_ROWS_EVENT_V2 )
283
+ self .assertEqual (event .event_type , UPDATE_ROWS_EVENT_V2 )
238
284
else :
239
285
self .assertEqual (event .event_type , UPDATE_ROWS_EVENT_V1 )
240
286
self .assertIsInstance (event , UpdateRowsEvent )
0 commit comments