Skip to content

Commit 8b19e18

Browse files
committed
Merge remote-tracking branch 'origin/main'
2 parents 3b95812 + dc190cb commit 8b19e18

File tree

5 files changed

+123
-68
lines changed

5 files changed

+123
-68
lines changed

Diff for: pymysqlreplication/binlogstream.py

+21-19
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
# -*- coding: utf-8 -*-
22

3-
import pymysql
43
import struct
54
from distutils.version import LooseVersion
65

6+
import pymysql
77
from pymysql.constants.COMMAND import COM_BINLOG_DUMP, COM_REGISTER_SLAVE
88
from pymysql.cursors import DictCursor
99

10-
from .packet import BinLogPacketWrapper
1110
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT, FORMAT_DESCRIPTION_EVENT
12-
from .gtid import GtidSet
1311
from .event import (
1412
QueryEvent, RotateEvent, FormatDescriptionEvent,
1513
XidEvent, GtidEvent, StopEvent, XAPrepareEvent,
1614
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
1715
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent,
18-
MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent)
16+
MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent)
1917
from .exceptions import BinLogNotEnabled
18+
from .gtid import GtidSet
19+
from .packet import BinLogPacketWrapper
2020
from .row_event import (
2121
UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent)
2222

@@ -33,7 +33,6 @@
3333

3434

3535
class ReportSlave(object):
36-
3736
"""Represent the values that you may report when connecting as a slave
3837
to a master. SHOW SLAVE HOSTS related"""
3938

@@ -68,7 +67,7 @@ def __init__(self, value):
6867
self.hostname = value
6968

7069
def __repr__(self):
71-
return '<ReportSlave hostname=%s username=%s password=%s port=%d>' %\
70+
return '<ReportSlave hostname=%s username=%s password=%s port=%d>' % \
7271
(self.hostname, self.username, self.password, self.port)
7372

7473
def encoded(self, server_id, master_id=0):
@@ -123,7 +122,6 @@ def encoded(self, server_id, master_id=0):
123122

124123

125124
class BinLogStreamReader(object):
126-
127125
"""Connect to replication stream and read event
128126
"""
129127
report_slave = None
@@ -256,6 +254,7 @@ def __connect_to_ctl(self):
256254
self._ctl_connection_settings = dict(self.__connection_settings)
257255
self._ctl_connection_settings["db"] = "information_schema"
258256
self._ctl_connection_settings["cursorclass"] = DictCursor
257+
self._ctl_connection_settings["autocommit"] = True
259258
self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings)
260259
self._ctl_connection._get_table_information = self.__get_table_information
261260
self.__connected_ctl = True
@@ -316,7 +315,7 @@ def __connect_to_stream(self):
316315
4294967))
317316
# If heartbeat is too low, the connection will disconnect before,
318317
# this is also the behavior in mysql
319-
heartbeat = float(min(net_timeout/2., self.slave_heartbeat))
318+
heartbeat = float(min(net_timeout / 2., self.slave_heartbeat))
320319
if heartbeat > 4294967:
321320
heartbeat = 4294967
322321

@@ -352,7 +351,7 @@ def __connect_to_stream(self):
352351
cur.close()
353352

354353
prelude = struct.pack('<i', len(self.log_file) + 11) \
355-
+ bytes(bytearray([COM_BINLOG_DUMP]))
354+
+ bytes(bytearray([COM_BINLOG_DUMP]))
356355

357356
if self.__resume_stream:
358357
prelude += struct.pack('<I', self.log_pos)
@@ -369,7 +368,7 @@ def __connect_to_stream(self):
369368
prelude += self.log_file.encode()
370369
else:
371370
if self.is_mariadb:
372-
prelude = self.__set_mariadb_settings()
371+
prelude = self.__set_mariadb_settings()
373372
else:
374373
# Format for mysql packet master_auto_position
375374
#
@@ -416,8 +415,8 @@ def __connect_to_stream(self):
416415
8 + # binlog_pos_info_size
417416
4) # encoded_data_size
418417

419-
prelude = b'' + struct.pack('<i', header_size + encoded_data_size)\
420-
+ bytes(bytearray([COM_BINLOG_DUMP_GTID]))
418+
prelude = b'' + struct.pack('<i', header_size + encoded_data_size) \
419+
+ bytes(bytearray([COM_BINLOG_DUMP_GTID]))
421420

422421
flags = 0
423422
if not self.__blocking:
@@ -454,7 +453,7 @@ def __connect_to_stream(self):
454453
def __set_mariadb_settings(self):
455454
# https://mariadb.com/kb/en/5-slave-registration/
456455
cur = self._stream_connection.cursor()
457-
if self.auto_position != None :
456+
if self.auto_position != None:
458457
cur.execute("SET @slave_connect_state='%s'" % self.auto_position)
459458
cur.execute("SET @slave_gtid_strict_mode=1")
460459
cur.execute("SET @slave_gtid_ignore_duplicates=0")
@@ -465,7 +464,7 @@ def __set_mariadb_settings(self):
465464
4 + # binlog pos
466465
2 + # binlog flags
467466
4 + # slave server_id,
468-
4 # requested binlog file name , set it to empty
467+
4 # requested binlog file name , set it to empty
469468
)
470469

471470
prelude = struct.pack('<i', header_size) + bytes(bytearray([COM_BINLOG_DUMP]))
@@ -477,11 +476,11 @@ def __set_mariadb_settings(self):
477476

478477
# Enable annotate rows event
479478
if self.__annotate_rows_event:
480-
flags |= 0x02 # BINLOG_SEND_ANNOTATE_ROWS_EVENT
479+
flags |= 0x02 # BINLOG_SEND_ANNOTATE_ROWS_EVENT
481480

482481
if not self.__blocking:
483482
flags |= 0x01 # BINLOG_DUMP_NON_BLOCK
484-
483+
485484
# binlog flags
486485
prelude += struct.pack('<H', flags)
487486

@@ -621,10 +620,11 @@ def _allowed_event_list(self, only_events, ignored_events,
621620
HeartbeatLogEvent,
622621
NotImplementedEvent,
623622
MariadbGtidEvent,
623+
RowsQueryLogEvent,
624624
MariadbAnnotateRowsEvent,
625625
RandEvent,
626-
MariadbStartEncryptionEvent
627-
))
626+
MariadbStartEncryptionEvent,
627+
))
628628
if ignored_events is not None:
629629
for e in ignored_events:
630630
events.remove(e)
@@ -653,8 +653,10 @@ def __get_table_information(self, schema, table):
653653
table_schema = %s AND table_name = %s
654654
ORDER BY ORDINAL_POSITION
655655
""", (schema, table))
656+
result = cur.fetchall()
657+
cur.close()
656658

657-
return cur.fetchall()
659+
return result
658660
except pymysql.OperationalError as error:
659661
code, message = error.args
660662
if code in MYSQL_EXPECTED_ERROR_CODES:

Diff for: pymysqlreplication/event.py

+46-23
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class GtidEvent(BinLogEvent):
5656
"""GTID change in binlog event
5757
"""
5858
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
59-
super(GtidEvent, self).__init__(from_packet, event_size, table_map,
59+
super().__init__(from_packet, event_size, table_map,
6060
ctl_connection, **kwargs)
6161

6262
self.commit_flag = struct.unpack("!B", self.packet.read(1))[0] == 1
@@ -97,7 +97,7 @@ class MariadbGtidEvent(BinLogEvent):
9797
"""
9898
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
9999

100-
super(MariadbGtidEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
100+
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
101101

102102
self.server_id = self.packet.server_id
103103
self.gtid_seq_no = self.packet.read_uint64()
@@ -106,7 +106,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
106106
self.gtid = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no)
107107

108108
def _dump(self):
109-
super(MariadbGtidEvent, self)._dump()
109+
super()._dump()
110110
print("Flags:", self.flags)
111111
print('GTID:', self.gtid)
112112

@@ -121,11 +121,11 @@ class MariadbAnnotateRowsEvent(BinLogEvent):
121121
sql_statement: The SQL statement
122122
"""
123123
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
124-
super(MariadbAnnotateRowsEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
124+
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
125125
self.sql_statement = self.packet.read(event_size)
126126

127127
def _dump(self):
128-
super(MariadbAnnotateRowsEvent, self)._dump()
128+
super()._dump()
129129
print("SQL statement :", self.sql_statement)
130130

131131

@@ -137,7 +137,7 @@ class RotateEvent(BinLogEvent):
137137
next_binlog: Name of next binlog file
138138
"""
139139
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
140-
super(RotateEvent, self).__init__(from_packet, event_size, table_map,
140+
super().__init__(from_packet, event_size, table_map,
141141
ctl_connection, **kwargs)
142142
self.position = struct.unpack('<Q', self.packet.read(8))[0]
143143
self.next_binlog = self.packet.read(event_size - 8).decode()
@@ -158,7 +158,7 @@ class XAPrepareEvent(BinLogEvent):
158158
xid: serialized XID representation of XA transaction
159159
"""
160160
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
161-
super(XAPrepareEvent, self).__init__(from_packet, event_size, table_map,
161+
super().__init__(from_packet, event_size, table_map,
162162
ctl_connection, **kwargs)
163163

164164
# one_phase is True: XA COMMIT ... ONE PHASE
@@ -182,7 +182,7 @@ def _dump(self):
182182

183183
class FormatDescriptionEvent(BinLogEvent):
184184
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
185-
super(FormatDescriptionEvent, self).__init__(from_packet, event_size, table_map,
185+
super().__init__(from_packet, event_size, table_map,
186186
ctl_connection, **kwargs)
187187
self.binlog_version = struct.unpack('<H', self.packet.read(2))
188188
self.mysql_version_str = self.packet.read(50).rstrip(b'\0').decode()
@@ -206,12 +206,12 @@ class XidEvent(BinLogEvent):
206206
"""
207207

208208
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
209-
super(XidEvent, self).__init__(from_packet, event_size, table_map,
209+
super().__init__(from_packet, event_size, table_map,
210210
ctl_connection, **kwargs)
211211
self.xid = struct.unpack('<Q', self.packet.read(8))[0]
212212

213213
def _dump(self):
214-
super(XidEvent, self)._dump()
214+
super()._dump()
215215
print("Transaction ID: %d" % (self.xid))
216216

217217

@@ -236,21 +236,21 @@ class HeartbeatLogEvent(BinLogEvent):
236236
"""
237237

238238
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
239-
super(HeartbeatLogEvent, self).__init__(from_packet, event_size,
239+
super().__init__(from_packet, event_size,
240240
table_map, ctl_connection,
241241
**kwargs)
242242
self.ident = self.packet.read(event_size).decode()
243243

244244
def _dump(self):
245-
super(HeartbeatLogEvent, self)._dump()
245+
super()._dump()
246246
print("Current binlog: %s" % (self.ident))
247247

248248

249249
class QueryEvent(BinLogEvent):
250250
'''This event is trigger when a query is run of the database.
251251
Only replicated queries are logged.'''
252252
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
253-
super(QueryEvent, self).__init__(from_packet, event_size, table_map,
253+
super().__init__(from_packet, event_size, table_map,
254254
ctl_connection, **kwargs)
255255

256256
# Post-header
@@ -276,7 +276,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
276276
#string[EOF] query
277277

278278
def _dump(self):
279-
super(QueryEvent, self)._dump()
279+
super()._dump()
280280
print("Schema: %s" % (self.schema))
281281
print("Execution time: %d" % (self.execution_time))
282282
print("Query: %s" % (self.query))
@@ -376,15 +376,15 @@ class BeginLoadQueryEvent(BinLogEvent):
376376
block-data
377377
"""
378378
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
379-
super(BeginLoadQueryEvent, self).__init__(from_packet, event_size, table_map,
379+
super().__init__(from_packet, event_size, table_map,
380380
ctl_connection, **kwargs)
381381

382382
# Payload
383383
self.file_id = self.packet.read_uint32()
384384
self.block_data = self.packet.read(event_size - 4)
385385

386386
def _dump(self):
387-
super(BeginLoadQueryEvent, self)._dump()
387+
super()._dump()
388388
print("File id: %d" % (self.file_id))
389389
print("Block data: %s" % (self.block_data))
390390

@@ -405,7 +405,7 @@ class ExecuteLoadQueryEvent(BinLogEvent):
405405
dup_handling_flags
406406
"""
407407
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
408-
super(ExecuteLoadQueryEvent, self).__init__(from_packet, event_size, table_map,
408+
super().__init__(from_packet, event_size, table_map,
409409
ctl_connection, **kwargs)
410410

411411
# Post-header
@@ -442,18 +442,19 @@ class IntvarEvent(BinLogEvent):
442442
value
443443
"""
444444
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
445-
super(IntvarEvent, self).__init__(from_packet, event_size, table_map,
445+
super().__init__(from_packet, event_size, table_map,
446446
ctl_connection, **kwargs)
447447

448448
# Payload
449449
self.type = self.packet.read_uint8()
450450
self.value = self.packet.read_uint32()
451451

452452
def _dump(self):
453-
super(IntvarEvent, self)._dump()
453+
super()._dump()
454454
print("type: %d" % (self.type))
455455
print("Value: %d" % (self.value))
456456

457+
457458
class RandEvent(BinLogEvent):
458459
"""
459460
RandEvent is generated every time a statement uses the RAND() function.
@@ -467,7 +468,7 @@ class RandEvent(BinLogEvent):
467468
"""
468469

469470
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
470-
super(RandEvent, self).__init__(from_packet, event_size, table_map,
471+
super().__init__(from_packet, event_size, table_map,
471472
ctl_connection, **kwargs)
472473
# Payload
473474
self._seed1 = self.packet.read_uint64()
@@ -484,10 +485,11 @@ def seed2(self):
484485
return self._seed2
485486

486487
def _dump(self):
487-
super(RandEvent, self)._dump()
488+
super()._dump()
488489
print("seed1: %d" % (self.seed1))
489490
print("seed2: %d" % (self.seed2))
490491

492+
491493
class MariadbStartEncryptionEvent(BinLogEvent):
492494
"""
493495
Since MariaDB 10.1.7,
@@ -505,7 +507,7 @@ class MariadbStartEncryptionEvent(BinLogEvent):
505507
"""
506508

507509
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
508-
super(MariadbStartEncryptionEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
510+
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
509511

510512
self.schema = self.packet.read_uint8()
511513
self.key_version = self.packet.read_uint32()
@@ -517,8 +519,29 @@ def _dump(self):
517519
print(f"Nonce: {self.nonce}")
518520

519521

522+
class RowsQueryLogEvent(BinLogEvent):
523+
"""
524+
Record original query for the row events in Row-Based Replication
525+
526+
More details are available in the MySQL Knowledge Base:
527+
https://dev.mysql.com/doc/dev/mysql-server/latest/classRows__query__log__event.html
528+
529+
:ivar query_length: uint - Length of the SQL statement
530+
:ivar query: str - The executed SQL statement
531+
"""
532+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
533+
super(RowsQueryLogEvent, self).__init__(from_packet, event_size, table_map,
534+
ctl_connection, **kwargs)
535+
self.query_length = self.packet.read_uint8()
536+
self.query = self.packet.read(self.query_length).decode('utf-8')
537+
def dump(self):
538+
print("=== %s ===" % (self.__class__.__name__))
539+
print("Query length: %d" % self.query_length)
540+
print("Query: %s" % self.query)
541+
542+
520543
class NotImplementedEvent(BinLogEvent):
521544
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
522-
super(NotImplementedEvent, self).__init__(
545+
super().__init__(
523546
from_packet, event_size, table_map, ctl_connection, **kwargs)
524547
self.packet.advance(event_size)

Diff for: pymysqlreplication/packet.py

+2
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class BinLogPacketWrapper(object):
7070
constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent,
7171
constants.HEARTBEAT_LOG_EVENT: event.HeartbeatLogEvent,
7272
constants.XA_PREPARE_EVENT: event.XAPrepareEvent,
73+
constants.ROWS_QUERY_LOG_EVENT: event.RowsQueryLogEvent,
7374
constants.RAND_EVENT: event.RandEvent,
7475
# row_event
7576
constants.UPDATE_ROWS_EVENT_V1: row_event.UpdateRowsEvent,
@@ -82,6 +83,7 @@ class BinLogPacketWrapper(object):
8283

8384
#5.6 GTID enabled replication events
8485
constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent,
86+
constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent,
8587
constants.PREVIOUS_GTIDS_LOG_EVENT: event.NotImplementedEvent,
8688
# MariaDB GTID
8789
constants.MARIADB_ANNOTATE_ROWS_EVENT: event.MariadbAnnotateRowsEvent,

0 commit comments

Comments
 (0)