@@ -23,7 +23,7 @@ def create_kafka_topic(topic_name, num_partitions=1, replication_factor=1):
23
23
topic = NewTopic (
24
24
name = topic_name ,
25
25
num_partitions = num_partitions ,
26
- replication_factor = replication_factor
26
+ replication_factor = replication_factor ,
27
27
)
28
28
29
29
admin_client .create_topics (new_topics = [topic ])
@@ -32,18 +32,17 @@ def create_kafka_topic(topic_name, num_partitions=1, replication_factor=1):
32
32
def main ():
33
33
global message_body
34
34
producer = KafkaProducer (
35
- bootstrap_servers = ' 127.0.0.1:9092' ,
36
- value_serializer = lambda v : str (v ).encode (' utf-8' )
35
+ bootstrap_servers = " 127.0.0.1:9092" ,
36
+ value_serializer = lambda v : str (v ).encode (" utf-8" ),
37
37
)
38
38
39
39
stream = BinLogStreamReader (
40
40
connection_settings = MYSQL_SETTINGS ,
41
41
server_id = 3 ,
42
- only_events = [DeleteRowsEvent ,UpdateRowsEvent ,WriteRowsEvent ]
42
+ only_events = [DeleteRowsEvent , UpdateRowsEvent , WriteRowsEvent ],
43
43
)
44
44
45
45
for binlogevent in stream :
46
-
47
46
for row in binlogevent .rows :
48
47
if isinstance (binlogevent , DeleteRowsEvent ):
49
48
topic = "deleted"
@@ -57,20 +56,20 @@ def main():
57
56
topic = "created"
58
57
message_body = row ["values" ].items ()
59
58
60
- producer .send (topic , key = None , value = message_body )
59
+ producer .send (topic , key = None , value = dict ( message_body ) )
61
60
62
61
consumer = KafkaConsumer (
63
- ' deleted' ,
64
- ' updated' ,
65
- ' created' ,
66
- bootstrap_servers = ' 127.0.0.1:9092' ,
67
- value_deserializer = lambda x : x .decode (' utf-8' ),
68
- auto_offset_reset = ' earliest' ,
69
- group_id = '1'
62
+ " deleted" ,
63
+ " updated" ,
64
+ " created" ,
65
+ bootstrap_servers = " 127.0.0.1:9092" ,
66
+ value_deserializer = lambda x : x .decode (" utf-8" ),
67
+ auto_offset_reset = " earliest" ,
68
+ group_id = "1" ,
70
69
)
71
70
72
71
for message in consumer :
73
- print (f" Topic: { message .topic } , Value: { message .value } " )
72
+ print (f' Topic: " { message .topic } " , Value: " { message .value } "' )
74
73
75
74
stream .close ()
76
75
producer .close ()
0 commit comments