Skip to content

Commit 725f035

Browse files
committed
Merge remote-tracking branch 'origin/feature/optional-meta-data' into feature/optional-meta-data
2 parents b882122 + 3bc4e08 commit 725f035

File tree

12 files changed

+351
-162
lines changed

12 files changed

+351
-162
lines changed

README.md

+5-1
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,14 @@ Limitations
5656

5757
https://python-mysql-replication.readthedocs.org/en/latest/limitations.html
5858

59-
Featured Books
59+
Featured
6060
=============
6161

6262
[Data Pipelines Pocket Reference](https://www.oreilly.com/library/view/data-pipelines-pocket/9781492087823/) (by James Densmore, O'Reilly): Introduced and exemplified in Chapter 4: Data Ingestion: Extracting Data.
6363

64+
[Streaming Changes in a Database with Amazon Kinesis](https://aws.amazon.com/blogs/database/streaming-changes-in-a-database-with-amazon-kinesis/) (by Emmanuel Espina, Amazon Web Services)
65+
66+
6467
Projects using this library
6568
===========================
6669

@@ -84,6 +87,7 @@ Projects using this library
8487
* MySQL to Kafka (https://github.com/scottpersinger/mysql-to-kafka/)
8588
* Aventri MySQL Monitor (https://github.com/aventri/mysql-monitor)
8689
* BitSwanPump: A real-time stream processor (https://github.com/LibertyAces/BitSwanPump)
90+
* clickhouse-mysql-data-reader: https://github.com/Altinity/clickhouse-mysql-data-reader
8791

8892
MySQL server settings
8993
=========================

docker-compose-test.yml

+51-17
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,56 @@
1-
version: '3.2'
1+
version: '3.4'
2+
3+
x-mysql: &mysql
4+
environment:
5+
MYSQL_ALLOW_EMPTY_PASSWORD: true
6+
command: >
7+
mysqld
8+
--log-bin=mysql-bin.log
9+
--server-id 1
10+
--binlog-format=row
11+
--gtid_mode=on
12+
--enforce-gtid-consistency=on
13+
14+
x-mariadb: &mariadb
15+
environment:
16+
MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1
17+
command: >
18+
--server-id=1
19+
--default-authentication-plugin=mysql_native_password
20+
--log-bin=master-bin
21+
--binlog-format=row
22+
223
services:
3-
percona-5.7:
4-
platform: linux/amd64
24+
percona-5.7-ctl:
25+
<<: *mysql
526
image: percona:5.7
6-
environment:
7-
MYSQL_ALLOW_EMPTY_PASSWORD: true
8-
MYSQL_DATABASE: pymysqlreplication_test
927
ports:
10-
- 3306:3306
11-
command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates
12-
restart: always
28+
- "3307:3306"
1329
networks:
1430
- default
1531

16-
percona-5.7-ctl:
32+
percona-5.7:
33+
<<: *mysql
1734
image: percona:5.7
18-
environment:
19-
MYSQL_ALLOW_EMPTY_PASSWORD: true
20-
MYSQL_DATABASE: pymysqlreplication_test
2135
ports:
22-
- 3307:3307
23-
command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3307
36+
- "3306:3306"
37+
networks:
38+
- default
39+
40+
mariadb-10.6:
41+
<<: *mariadb
42+
image: mariadb:10.6
43+
ports:
44+
- "3308:3306"
45+
volumes:
46+
- type: bind
47+
source: ./.mariadb
48+
target: /opt/key_file
49+
- type: bind
50+
source: ./.mariadb/my.cnf
51+
target: /etc/mysql/my.cnf
52+
networks:
53+
- default
2454

2555
pymysqlreplication:
2656
build:
@@ -30,6 +60,9 @@ services:
3060
BASE_IMAGE: python:3.11-alpine
3161
MYSQL_5_7: percona-5.7
3262
MYSQL_5_7_CTL: percona-5.7-ctl
63+
MYSQL_5_7_CTL_PORT: 3306
64+
MARIADB_10_6: mariadb-10.6
65+
MARIADB_10_6_PORT: 3306
3366

3467
command:
3568
- /bin/sh
@@ -39,7 +72,7 @@ services:
3972
4073
while :
4174
do
42-
if mysql -h percona-5.7 --user=root --execute "USE pymysqlreplication_test;" 2>&1 >/dev/null && mysql -h percona-5.7-ctl --port=3307 --user=root --execute "USE pymysqlreplication_test;" 2>&1 >/dev/null; then
75+
if mysql -h percona-5.7 --user=root --execute "SELECT version();" 2>&1 >/dev/null && mysql -h percona-5.7-ctl --user=root --execute "SELECT version();" 2>&1 >/dev/null; then
4376
break
4477
fi
4578
sleep 1
@@ -56,4 +89,5 @@ services:
5689
- percona-5.7-ctl
5790

5891
networks:
59-
default: {}
92+
default:
93+
driver: bridge

docker-compose.yml

+28-18
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,47 @@
1-
version: '3.2'
1+
version: '3.4'
2+
3+
x-mysql: &mysql
4+
environment:
5+
MYSQL_ALLOW_EMPTY_PASSWORD: true
6+
command: >
7+
mysqld
8+
--log-bin=mysql-bin.log
9+
--server-id 1
10+
--binlog-format=row
11+
--gtid_mode=on
12+
--enforce-gtid-consistency=on
13+
14+
x-mariadb: &mariadb
15+
environment:
16+
MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1
17+
command: >
18+
--log-bin=master-bin
19+
--server-id=1
20+
--default-authentication-plugin=mysql_native_password
21+
--binlog-format=row
22+
223
services:
324
percona-5.7:
25+
<<: *mysql
426
image: percona:5.7
5-
environment:
6-
MYSQL_ALLOW_EMPTY_PASSWORD: true
727
ports:
8-
- 3306:3306
9-
command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates
28+
- "3306:3306"
1029

1130
percona-5.7-ctl:
31+
<<: *mysql
1232
image: percona:5.7
13-
environment:
14-
MYSQL_ALLOW_EMPTY_PASSWORD: true
1533
ports:
16-
- 3307:3307
17-
command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3307
34+
- "3307:3306"
1835

1936
mariadb-10.6:
37+
<<: *mariadb
2038
image: mariadb:10.6
21-
environment:
22-
MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1
2339
ports:
2440
- "3308:3306"
25-
command: |
26-
--server-id=1
27-
--default-authentication-plugin=mysql_native_password
28-
--log-bin=master-bin
29-
--binlog-format=row
30-
--log-slave-updates=on
3141
volumes:
3242
- type: bind
3343
source: ./.mariadb
3444
target: /opt/key_file
3545
- type: bind
3646
source: ./.mariadb/my.cnf
37-
target: /etc/mysql/my.cnf
47+
target: /etc/mysql/my.cnf

examples/mariadb_gtid/read_event.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import pymysql
22

33
from pymysqlreplication import BinLogStreamReader, gtid
4-
from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent,MariadbAnnotateRowsEvent
4+
from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent,MariadbAnnotateRowsEvent, MariadbBinLogCheckPointEvent
55
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
66

77
MARIADB_SETTINGS = {
@@ -62,6 +62,7 @@ def query_server_id(self):
6262
blocking=False,
6363
only_events=[
6464
MariadbGtidEvent,
65+
MariadbBinLogCheckPointEvent,
6566
RotateEvent,
6667
WriteRowsEvent,
6768
UpdateRowsEvent,

pymysqlreplication/binlogstream.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
XidEvent, GtidEvent, StopEvent, XAPrepareEvent,
1414
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
1515
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent,
16-
MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent)
16+
MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent,
17+
MariadbGtidListEvent, MariadbBinLogCheckPointEvent)
1718
from .exceptions import BinLogNotEnabled
1819
from .gtid import GtidSet
1920
from .packet import BinLogPacketWrapper
@@ -627,6 +628,8 @@ def _allowed_event_list(self, only_events, ignored_events,
627628
MariadbAnnotateRowsEvent,
628629
RandEvent,
629630
MariadbStartEncryptionEvent,
631+
MariadbGtidListEvent,
632+
MariadbBinLogCheckPointEvent
630633
))
631634
if ignored_events is not None:
632635
for e in ignored_events:
@@ -654,9 +657,8 @@ def __get_table_information(self, schema, table):
654657
information_schema.columns
655658
WHERE
656659
table_schema = %s AND table_name = %s
657-
ORDER BY ORDINAL_POSITION
658660
""", (schema, table))
659-
result = cur.fetchall()
661+
result = sorted(cur.fetchall(), key=lambda x: x['ORDINAL_POSITION'])
660662
cur.close()
661663

662664
return result

pymysqlreplication/constants/CHARSET.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def by_name(self, name, dbms='mysql'):
5454
charset_by_id = charsets.by_id
5555

5656
with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'charset_list.csv'), 'r') as f:
57-
f.read() # pass header
57+
f.readline() # pass header
5858
for line in f:
5959
lines = line.split(',')
6060
if len(lines) != 5:

pymysqlreplication/event.py

+55-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def _read_table_id(self):
4040

4141
def dump(self):
4242
print("=== %s ===" % (self.__class__.__name__))
43-
print("Date: %s" % (datetime.datetime.fromtimestamp(self.timestamp)
43+
print("Date: %s" % (datetime.datetime.utcfromtimestamp(self.timestamp)
4444
.isoformat()))
4545
print("Log position: %d" % self.packet.log_pos)
4646
print("Event size: %d" % (self.event_size))
@@ -111,6 +111,25 @@ def _dump(self):
111111
print("Flags:", self.flags)
112112
print('GTID:', self.gtid)
113113

114+
class MariadbBinLogCheckPointEvent(BinLogEvent):
115+
"""
116+
Represents a checkpoint in a binlog event in MariaDB.
117+
118+
More details are available in the MariaDB Knowledge Base:
119+
https://mariadb.com/kb/en/binlog_checkpoint_event/
120+
121+
:ivar filename_length: int - The length of the filename.
122+
:ivar filename: str - The name of the file saved at the checkpoint.
123+
"""
124+
125+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
126+
super(MariadbBinLogCheckPointEvent, self).__init__(from_packet, event_size, table_map, ctl_connection,
127+
**kwargs)
128+
filename_length = self.packet.read_uint32()
129+
self.filename = self.packet.read(filename_length).decode()
130+
131+
def _dump(self):
132+
print('Filename:', self.filename)
114133

115134
class MariadbAnnotateRowsEvent(BinLogEvent):
116135
"""
@@ -127,7 +146,41 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
127146

128147
def _dump(self):
129148
super()._dump()
130-
print("SQL statement :", self.sql_statement)
149+
print("SQL statement :", self.sql_statement)
150+
151+
class MariadbGtidListEvent(BinLogEvent):
152+
"""
153+
GTID List event
154+
https://mariadb.com/kb/en/gtid_list_event/
155+
156+
Attributes:
157+
gtid_length: Number of GTIDs
158+
gtid_list: list of 'MariadbGtidObejct'
159+
160+
'MariadbGtidObejct' Attributes:
161+
domain_id: Replication Domain ID
162+
server_id: Server_ID
163+
gtid_seq_no: GTID sequence
164+
gtid: 'domain_id'+ 'server_id' + 'gtid_seq_no'
165+
"""
166+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
167+
168+
super(MariadbGtidListEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
169+
170+
class MariadbGtidObejct(BinLogEvent):
171+
"""
172+
Information class of elements in GTID list
173+
"""
174+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
175+
super(MariadbGtidObejct, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
176+
self.domain_id = self.packet.read_uint32()
177+
self.server_id = self.packet.read_uint32()
178+
self.gtid_seq_no = self.packet.read_uint64()
179+
self.gtid = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no)
180+
181+
182+
self.gtid_length = self.packet.read_uint32()
183+
self.gtid_list = [MariadbGtidObejct(from_packet, event_size, table_map, ctl_connection, **kwargs) for i in range(self.gtid_length)]
131184

132185

133186
class RotateEvent(BinLogEvent):

pymysqlreplication/packet.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,9 @@ class BinLogPacketWrapper(object):
8888
# MariaDB GTID
8989
constants.MARIADB_ANNOTATE_ROWS_EVENT: event.MariadbAnnotateRowsEvent,
9090
constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.NotImplementedEvent,
91+
constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.MariadbBinLogCheckPointEvent,
9192
constants.MARIADB_GTID_EVENT: event.MariadbGtidEvent,
92-
constants.MARIADB_GTID_GTID_LIST_EVENT: event.NotImplementedEvent,
93+
constants.MARIADB_GTID_GTID_LIST_EVENT: event.MariadbGtidListEvent,
9394
constants.MARIADB_START_ENCRYPTION_EVENT: event.MariadbStartEncryptionEvent
9495
}
9596

0 commit comments

Comments
 (0)