diff --git a/examples/mariadb_gtid/read_event.py b/examples/mariadb_gtid/read_event.py index 49598c3f..607bfa34 100644 --- a/examples/mariadb_gtid/read_event.py +++ b/examples/mariadb_gtid/read_event.py @@ -1,7 +1,7 @@ import pymysql from pymysqlreplication import BinLogStreamReader, gtid -from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent,MariadbAnnotateRowsEvent +from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent,MariadbAnnotateRowsEvent, MariadbBinLogCheckPointEvent from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent MARIADB_SETTINGS = { @@ -62,6 +62,7 @@ def query_server_id(self): blocking=False, only_events=[ MariadbGtidEvent, + MariadbBinLogCheckPointEvent, RotateEvent, WriteRowsEvent, UpdateRowsEvent, diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index eeab352b..c153fcda 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -14,7 +14,7 @@ BeginLoadQueryEvent, ExecuteLoadQueryEvent, HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent, MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent, - MariadbGtidListEvent) + MariadbGtidListEvent, MariadbBinLogCheckPointEvent) from .exceptions import BinLogNotEnabled from .gtid import GtidSet from .packet import BinLogPacketWrapper @@ -625,7 +625,8 @@ def _allowed_event_list(self, only_events, ignored_events, MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, - MariadbGtidListEvent + MariadbGtidListEvent, + MariadbBinLogCheckPointEvent )) if ignored_events is not None: for e in ignored_events: diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 409743bc..12db2915 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -110,6 +110,25 @@ def _dump(self): print("Flags:", self.flags) print('GTID:', self.gtid) +class MariadbBinLogCheckPointEvent(BinLogEvent): + """ + Represents a checkpoint in a binlog event in MariaDB. + + More details are available in the MariaDB Knowledge Base: + https://mariadb.com/kb/en/binlog_checkpoint_event/ + + :ivar filename_length: int - The length of the filename. + :ivar filename: str - The name of the file saved at the checkpoint. + """ + + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + super(MariadbBinLogCheckPointEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, + **kwargs) + filename_length = self.packet.read_uint32() + self.filename = self.packet.read(filename_length).decode() + + def _dump(self): + print('Filename:', self.filename) class MariadbAnnotateRowsEvent(BinLogEvent): """ diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index 04d60581..1d2e408b 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -88,6 +88,7 @@ class BinLogPacketWrapper(object): # MariaDB GTID constants.MARIADB_ANNOTATE_ROWS_EVENT: event.MariadbAnnotateRowsEvent, constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.NotImplementedEvent, + constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.MariadbBinLogCheckPointEvent, constants.MARIADB_GTID_EVENT: event.MariadbGtidEvent, constants.MARIADB_GTID_GTID_LIST_EVENT: event.MariadbGtidListEvent, constants.MARIADB_START_ENCRYPTION_EVENT: event.MariadbStartEncryptionEvent diff --git a/pymysqlreplication/tests/base.py b/pymysqlreplication/tests/base.py index 6cb23094..fd18cb3d 100644 --- a/pymysqlreplication/tests/base.py +++ b/pymysqlreplication/tests/base.py @@ -144,4 +144,9 @@ def setUp(self): self.connect_conn_control(db) self.stream = None self.resetBinLog() - \ No newline at end of file + + def bin_log_basename(self): + cursor = self.execute('SELECT @@log_bin_basename') + bin_log_basename = cursor.fetchone()[0] + bin_log_basename = bin_log_basename.split("/")[-1] + return bin_log_basename diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 8a4f7cd9..cb27dada 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -27,9 +27,9 @@ def ignoredEvents(self): return [GtidEvent] def test_allowed_event_list(self): - self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 21) - self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 20) - self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 20) + self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 22) + self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 21) + self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 21) self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1) def test_read_query_event(self): @@ -1009,6 +1009,32 @@ def test_parsing(self): gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-:1") gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac::1") +class TestMariadbBinlogStreamReader(base.PyMySQLReplicationMariaDbTestCase): + def test_binlog_checkpoint_event(self): + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1023, + blocking=False, + is_mariadb=True + ) + + query = "DROP TABLE IF EXISTS test" + self.execute(query) + + query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + self.execute(query) + self.stream.close() + + event = self.stream.fetchone() + self.assertIsInstance(event, RotateEvent) + + event = self.stream.fetchone() + self.assertIsInstance(event,FormatDescriptionEvent) + + event = self.stream.fetchone() + self.assertIsInstance(event, MariadbBinLogCheckPointEvent) + self.assertEqual(event.filename, self.bin_log_basename()+".000001") class TestMariadbBinlogStreamReader(base.PyMySQLReplicationMariaDbTestCase):