
Commit 1d2c8ab

Merge pull request #561 from HongGeonUi/example/kafka
Add: Example for MySQL to Kafka events
2 parents b542936 + fd1e041 commit 1d2c8ab

1 file changed: +96 −0 lines


examples/mysql_to_kafka.py

#!/usr/bin/env python

#
# Output Kafka events to the console from MySQL replication stream
#

import time

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError
from kafka import KafkaConsumer, KafkaProducer

MYSQL_SETTINGS = {"host": "127.0.0.1", "port": 3306, "user": "root", "passwd": ""}


def create_kafka_topic(topic_name, num_partitions=1, replication_factor=1):
    """Create the topic on the local broker if it does not already exist."""
    admin_client = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092")

    topic_exists = False
    try:
        topic_listings = admin_client.list_topics()
        topic_exists = topic_name in topic_listings
    except TopicAlreadyExistsError:
        topic_exists = True

    if not topic_exists:
        topic = NewTopic(
            name=topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
        )
        admin_client.create_topics(new_topics=[topic])


def main():
    producer = KafkaProducer(
        bootstrap_servers="127.0.0.1:9092",
        value_serializer=lambda v: str(v).encode("utf-8"),
    )

    # server_id must be unique among all clients/replicas of this MySQL server.
    stream = BinLogStreamReader(
        connection_settings=MYSQL_SETTINGS,
        server_id=3,
        only_events=[DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent],
    )

    consumer = KafkaConsumer(
        "deleted",
        "updated",
        "created",
        bootstrap_servers="127.0.0.1:9092",
        value_deserializer=lambda x: x.decode("utf-8"),
        auto_offset_reset="earliest",
        group_id="1",
    )

    try:
        # Route each row-level binlog event to the topic matching its type.
        for binlogevent in stream:
            for row in binlogevent.rows:
                if isinstance(binlogevent, DeleteRowsEvent):
                    topic = "deleted"
                    message_body = row["values"]
                elif isinstance(binlogevent, UpdateRowsEvent):
                    topic = "updated"
                    message_body = row["after_values"]
                elif isinstance(binlogevent, WriteRowsEvent):
                    topic = "created"
                    message_body = row["values"]
                else:
                    continue  # only_events should make this unreachable

                producer.send(topic, key=None, value=message_body)

        # Drain the three topics and echo every message to the console.
        for message in consumer:
            print(f'Topic: "{message.topic}", Value: "{message.value}"')

    except KeyboardInterrupt:
        stream.close()
        producer.close()
        time.sleep(1)
        consumer.close()


if __name__ == "__main__":
    create_kafka_topic("deleted", num_partitions=3, replication_factor=1)
    create_kafka_topic("created", num_partitions=3, replication_factor=1)
    create_kafka_topic("updated", num_partitions=3, replication_factor=1)
    main()
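
Not part of the commit, but a quick way to exercise it: pymysqlreplication only sees row events when the MySQL server has the binary log enabled with binlog_format=ROW, and the script expects a Kafka broker on 127.0.0.1:9092. The sketch below generates one insert, one update, and one delete so each topic receives a message. It assumes PyMySQL is installed and the same credentials as MYSQL_SETTINGS; the test database and users table are hypothetical names invented for the demo. Run it in a second shell while the example above is streaming.

#!/usr/bin/env python
#
# Hypothetical driver for the example above: produce one row event of
# each kind. The `test` database and `users` table are invented names.
#

import pymysql

conn = pymysql.connect(host="127.0.0.1", port=3306, user="root", password="")
try:
    with conn.cursor() as cursor:
        # DDL is filtered out by only_events; only the row changes below matter.
        cursor.execute("CREATE DATABASE IF NOT EXISTS test")
        cursor.execute(
            "CREATE TABLE IF NOT EXISTS test.users ("
            "id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(64))"
        )
        # WriteRowsEvent -> "created"
        cursor.execute("INSERT INTO test.users (name) VALUES (%s)", ("alice",))
        # UpdateRowsEvent -> "updated"
        cursor.execute(
            "UPDATE test.users SET name = %s WHERE name = %s", ("bob", "alice")
        )
        # DeleteRowsEvent -> "deleted"
        cursor.execute("DELETE FROM test.users WHERE name = %s", ("bob",))
    conn.commit()
finally:
    conn.close()

With the example running, each statement should surface on the console as a message on the created, updated, and deleted topics respectively.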
