mysql_to_kafka.py
#!/usr/bin/env python
#
# Stream MySQL replication (binlog) row events into Kafka topics and echo
# the consumed Kafka messages to the console.
#
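# Prerequisites (assumed from the settings in this script): a MySQL server on
# 127.0.0.1:3306 with binary logging enabled and binlog_format=ROW, a Kafka
# broker on 127.0.0.1:9092, and the mysql-replication and kafka-python
# packages installed.
#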
import time
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
)
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError
from kafka import KafkaConsumer, KafkaProducer
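
# The MySQL account used below needs the REPLICATION SLAVE and REPLICATION
# CLIENT privileges so that pymysqlreplication can read the binlog.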
MYSQL_SETTINGS = {"host": "127.0.0.1", "port": 3306, "user": "root", "passwd": ""}


def create_kafka_topic(topic_name, num_partitions=1, replication_factor=1):
    """Create the topic on the local broker if it does not already exist."""
    admin_client = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092")

    if topic_name not in admin_client.list_topics():
        topic = NewTopic(
            name=topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
        )
        try:
            admin_client.create_topics(new_topics=[topic])
        except TopicAlreadyExistsError:
            pass  # another client created the topic between the check and now
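

# Row events are routed to topics by operation: WriteRowsEvent -> "created",
# UpdateRowsEvent -> "updated", DeleteRowsEvent -> "deleted".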


def main():
    # The producer serializes each value with str() before encoding to UTF-8.
    producer = KafkaProducer(
        bootstrap_servers="127.0.0.1:9092",
        value_serializer=lambda v: str(v).encode("utf-8"),
    )

    # server_id must be unique among all clients replicating from this server.
    stream = BinLogStreamReader(
        connection_settings=MYSQL_SETTINGS,
        server_id=3,
        only_events=[DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent],
    )

    consumer = KafkaConsumer(
        "deleted",
        "updated",
        "created",
        bootstrap_servers="127.0.0.1:9092",
        value_deserializer=lambda x: x.decode("utf-8"),
        auto_offset_reset="earliest",
        group_id="1",
    )

    try:
        # With the default blocking=False, the stream ends once the current
        # binlog has been read, after which the consumer loop prints what was
        # produced. only_events guarantees one of the branches below matches.
        for binlogevent in stream:
            for row in binlogevent.rows:
                if isinstance(binlogevent, DeleteRowsEvent):
                    topic = "deleted"
                    message_body = row["values"]
                elif isinstance(binlogevent, UpdateRowsEvent):
                    topic = "updated"
                    message_body = row["after_values"]
                elif isinstance(binlogevent, WriteRowsEvent):
                    topic = "created"
                    message_body = row["values"]
                producer.send(topic, key=None, value=message_body)

        for message in consumer:
            print(f'Topic: "{message.topic}", Value: "{message.value}"')
    except KeyboardInterrupt:
        pass  # Ctrl-C is the expected way to stop the consumer loop.
    finally:
        stream.close()
        producer.close()  # flushes any in-flight sends
        time.sleep(1)
        consumer.close()


if __name__ == "__main__":
    create_kafka_topic("deleted", num_partitions=3, replication_factor=1)
    create_kafka_topic("created", num_partitions=3, replication_factor=1)
    create_kafka_topic("updated", num_partitions=3, replication_factor=1)
    main()
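
# Usage sketch (assumes MySQL and Kafka running as configured above; the
# sample output is illustrative, not captured from a real run):
#
#   $ python mysql_to_kafka.py
#   Topic: "created", Value: "{'id': 1, 'name': 'alice'}"
#
# Each printed line corresponds to one row-level change read back from Kafka.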