diff --git a/docker-compose.yml b/docker-compose.yml index d6449d2c..0ed716e5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,3 +15,17 @@ services: ports: - 3307:3307 command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3307 + + mariadb-10.6: + image: mariadb:10.6 + environment: + MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1 + ports: + - "3308:3306" + command: | + --server-id=1 + --default-authentication-plugin=mysql_native_password + --log-bin=master-bin + --binlog-format=row + --log-slave-updates=on + \ No newline at end of file diff --git a/examples/mariadb_gtid/read_event.py b/examples/mariadb_gtid/read_event.py index cc88a97f..49598c3f 100644 --- a/examples/mariadb_gtid/read_event.py +++ b/examples/mariadb_gtid/read_event.py @@ -1,7 +1,7 @@ import pymysql from pymysqlreplication import BinLogStreamReader, gtid -from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent +from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent,MariadbAnnotateRowsEvent from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent MARIADB_SETTINGS = { @@ -65,10 +65,12 @@ def query_server_id(self): RotateEvent, WriteRowsEvent, UpdateRowsEvent, - DeleteRowsEvent + DeleteRowsEvent, + MariadbAnnotateRowsEvent ], auto_position=gtid, - is_mariadb=True + is_mariadb=True, + annotate_rows_event=True ) print('Starting reading events from GTID ', gtid) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index fa65aa22..9abc0823 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -14,7 +14,8 @@ QueryEvent, RotateEvent, FormatDescriptionEvent, XidEvent, GtidEvent, StopEvent, XAPrepareEvent, BeginLoadQueryEvent, ExecuteLoadQueryEvent, - HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent) + HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent, + MariadbAnnotateRowsEvent) from .exceptions import BinLogNotEnabled from .row_event import ( UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent) @@ -141,6 +142,7 @@ def __init__(self, connection_settings, server_id, fail_on_table_metadata_unavailable=False, slave_heartbeat=None, is_mariadb=False, + annotate_rows_event=False, ignore_decode_errors=False): """ Attributes: @@ -178,6 +180,8 @@ def __init__(self, connection_settings, server_id, for semantics is_mariadb: Flag to indicate it's a MariaDB server, used with auto_position to point to Mariadb specific GTID. + annotate_rows_event: Parameter value to enable annotate rows event in mariadb, + used with 'is_mariadb' ignore_decode_errors: If true, any decode errors encountered when reading column data will be ignored. """ @@ -219,6 +223,7 @@ def __init__(self, connection_settings, server_id, self.auto_position = auto_position self.skip_to_timestamp = skip_to_timestamp self.is_mariadb = is_mariadb + self.__annotate_rows_event = annotate_rows_event if end_log_pos: self.is_past_end_log_pos = False @@ -331,67 +336,39 @@ def __connect_to_stream(self): self._register_slave() if not self.auto_position: - # only when log_file and log_pos both provided, the position info is - # valid, if not, get the current position from master - if self.log_file is None or self.log_pos is None: - cur = self._stream_connection.cursor() - cur.execute("SHOW MASTER STATUS") - master_status = cur.fetchone() - if master_status is None: - raise BinLogNotEnabled() - self.log_file, self.log_pos = master_status[:2] - cur.close() - - prelude = struct.pack('