4
4
# Output Kafka events to the console from MySQL replication stream
5
5
#
6
6
7
+ import time
8
+
7
9
from pymysqlreplication import BinLogStreamReader
8
10
from pymysqlreplication .row_event import (
9
11
DeleteRowsEvent ,
12
14
)
13
15
14
16
from kafka .admin import KafkaAdminClient , NewTopic
17
+ from kafka .errors import TopicAlreadyExistsError
15
18
from kafka import KafkaConsumer , KafkaProducer
16
19
17
20
MYSQL_SETTINGS = {"host" : "127.0.0.1" , "port" : 3306 , "user" : "root" , "passwd" : "" }
@@ -25,12 +28,11 @@ def create_kafka_topic(topic_name, num_partitions=1, replication_factor=1):
25
28
num_partitions = num_partitions ,
26
29
replication_factor = replication_factor ,
27
30
)
28
-
29
31
admin_client .create_topics (new_topics = [topic ])
30
32
31
33
32
34
def main ():
33
- global message_body
35
+ global message_body , topic
34
36
producer = KafkaProducer (
35
37
bootstrap_servers = "127.0.0.1:9092" ,
36
38
value_serializer = lambda v : str (v ).encode ("utf-8" ),
@@ -42,22 +44,6 @@ def main():
42
44
only_events = [DeleteRowsEvent , UpdateRowsEvent , WriteRowsEvent ],
43
45
)
44
46
45
- for binlogevent in stream :
46
- for row in binlogevent .rows :
47
- if isinstance (binlogevent , DeleteRowsEvent ):
48
- topic = "deleted"
49
- message_body = row ["values" ].items ()
50
-
51
- elif isinstance (binlogevent , UpdateRowsEvent ):
52
- topic = "updated"
53
- message_body = row ["after_values" ].items ()
54
-
55
- elif isinstance (binlogevent , WriteRowsEvent ):
56
- topic = "created"
57
- message_body = row ["values" ].items ()
58
-
59
- producer .send (topic , key = None , value = dict (message_body ))
60
-
61
47
consumer = KafkaConsumer (
62
48
"deleted" ,
63
49
"updated" ,
@@ -68,12 +54,31 @@ def main():
68
54
group_id = "1" ,
69
55
)
70
56
71
- for message in consumer :
72
- print (f'Topic: "{ message .topic } ", Value: "{ message .value } "' )
57
+ try :
58
+ for binlogevent in stream :
59
+ for row in binlogevent .rows :
60
+ if isinstance (binlogevent , DeleteRowsEvent ):
61
+ topic = "deleted"
62
+ message_body = row ["values" ].items ()
63
+
64
+ elif isinstance (binlogevent , UpdateRowsEvent ):
65
+ topic = "updated"
66
+ message_body = row ["after_values" ].items ()
67
+
68
+ elif isinstance (binlogevent , WriteRowsEvent ):
69
+ topic = "created"
70
+ message_body = row ["values" ].items ()
71
+
72
+ producer .send (topic , key = None , value = dict (message_body ))
73
+
74
+ for message in consumer :
75
+ print (f'Topic: "{ message .topic } ", Value: "{ message .value } "' )
73
76
74
- stream .close ()
75
- producer .close ()
76
- consumer .close ()
77
+ except KeyboardInterrupt :
78
+ stream .close ()
79
+ producer .close ()
80
+ time .sleep (1 )
81
+ consumer .close ()
77
82
78
83
79
84
if __name__ == "__main__" :
0 commit comments