Skip to content

Commit bc9d04d

Browse files
committed
Add: example for MySQL to Kafka
1 parent 7262698 commit bc9d04d

File tree

1 file changed

+86
-0
lines changed

1 file changed

+86
-0
lines changed

examples/mysql_to_kafka.py

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
#!/usr/bin/env python
2+
3+
#
4+
# Output Kafka events to the console from MySQL replication stream
5+
#
6+
7+
from pymysqlreplication import BinLogStreamReader
8+
from pymysqlreplication.row_event import (
9+
DeleteRowsEvent,
10+
UpdateRowsEvent,
11+
WriteRowsEvent,
12+
)
13+
14+
from kafka.admin import KafkaAdminClient, NewTopic
15+
from kafka import KafkaConsumer, KafkaProducer
16+
17+
# Connection settings for the local MySQL server whose binlog is streamed.
# NOTE(review): empty root password — acceptable for a local example only.
MYSQL_SETTINGS = {"host": "127.0.0.1", "port": 3306, "user": "root", "passwd": ""}
18+
19+
20+
def create_kafka_topic(topic_name, num_partitions=1, replication_factor=1,
                       bootstrap_servers="127.0.0.1:9092"):
    """Create a single Kafka topic on the given broker.

    Args:
        topic_name: Name of the topic to create.
        num_partitions: Partition count for the new topic.
        replication_factor: Replication factor for the new topic.
        bootstrap_servers: Broker address; defaults to the local broker
            used throughout this example.

    Raises:
        kafka.errors.TopicAlreadyExistsError: if the topic already exists
            (propagated from ``create_topics``).
    """
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        topic = NewTopic(
            name=topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
        )
        admin_client.create_topics(new_topics=[topic])
    finally:
        # The original leaked the admin client's sockets; always close it.
        admin_client.close()
30+
31+
32+
def main():
    """Stream MySQL binlog row events into Kafka, then echo them back.

    Row events are routed to a topic by type:
    ``DeleteRowsEvent`` -> "deleted", ``UpdateRowsEvent`` -> "updated",
    ``WriteRowsEvent`` -> "created".  After the binlog is drained, a
    consumer reads the three topics and prints each message.
    """
    producer = KafkaProducer(
        bootstrap_servers="127.0.0.1:9092",
        # Values are dict_items views; str() preserves the original wire format.
        value_serializer=lambda v: str(v).encode("utf-8"),
    )

    stream = BinLogStreamReader(
        connection_settings=MYSQL_SETTINGS,
        server_id=3,  # must be unique among clients of this MySQL server
        only_events=[DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent],
    )

    try:
        for binlogevent in stream:
            for row in binlogevent.rows:
                # No module-level `global message_body` (a needless global in
                # the original); plain locals plus a defensive guard instead.
                if isinstance(binlogevent, DeleteRowsEvent):
                    topic = "deleted"
                    message_body = row["values"].items()
                elif isinstance(binlogevent, UpdateRowsEvent):
                    topic = "updated"
                    message_body = row["after_values"].items()
                elif isinstance(binlogevent, WriteRowsEvent):
                    topic = "created"
                    message_body = row["values"].items()
                else:
                    # only_events should make this unreachable; skip rather
                    # than send with an unbound topic/message_body.
                    continue

                producer.send(topic, key=None, value=message_body)

        # Ensure everything is delivered before reading it back.
        producer.flush()

        consumer = KafkaConsumer(
            "deleted",
            "updated",
            "created",
            bootstrap_servers="127.0.0.1:9092",
            value_deserializer=lambda x: x.decode("utf-8"),
            auto_offset_reset="earliest",
            group_id="1",
            # Without a timeout the iteration below blocks forever and the
            # close() calls are unreachable dead code.
            consumer_timeout_ms=10000,
        )
        for message in consumer:
            print(f"Topic: {message.topic}, Value: {message.value}")
        consumer.close()
    finally:
        # Release the replication connection and producer even on error.
        stream.close()
        producer.close()
80+
81+
82+
if __name__ == "__main__":
    # Provision the three destination topics, then run the bridge.
    for event_topic in ("deleted", "created", "updated"):
        create_kafka_topic(event_topic, num_partitions=3, replication_factor=1)
    main()

0 commit comments

Comments
 (0)