From bc9d04d753d0906003eb5c08570f75613ebbd5ad Mon Sep 17 00:00:00 2001
From: Geonui
Date: Tue, 17 Oct 2023 00:27:14 +0900
Subject: [PATCH 1/5] Add: Example for MySQL to Kafka

---
 examples/mysql_to_kafka.py | 86 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 86 insertions(+)
 create mode 100644 examples/mysql_to_kafka.py

diff --git a/examples/mysql_to_kafka.py b/examples/mysql_to_kafka.py
new file mode 100644
index 00000000..68e57692
--- /dev/null
+++ b/examples/mysql_to_kafka.py
@@ -0,0 +1,86 @@
+#!/usr/bin/env python
+
+#
+# Output Kafka events to the console from MySQL replication stream
+#
+
+from pymysqlreplication import BinLogStreamReader
+from pymysqlreplication.row_event import (
+    DeleteRowsEvent,
+    UpdateRowsEvent,
+    WriteRowsEvent,
+)
+
+from kafka.admin import KafkaAdminClient, NewTopic
+from kafka import KafkaConsumer, KafkaProducer
+
+MYSQL_SETTINGS = {"host": "127.0.0.1", "port": 3306, "user": "root", "passwd": ""}
+
+
+def create_kafka_topic(topic_name, num_partitions=1, replication_factor=1):
+    admin_client = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092")
+
+    topic = NewTopic(
+        name=topic_name,
+        num_partitions=num_partitions,
+        replication_factor=replication_factor
+    )
+
+    admin_client.create_topics(new_topics=[topic])
+
+
+def main():
+    global message_body
+    producer = KafkaProducer(
+        bootstrap_servers='127.0.0.1:9092',
+        value_serializer=lambda v: str(v).encode('utf-8')
+    )
+
+    stream = BinLogStreamReader(
+        connection_settings=MYSQL_SETTINGS,
+        server_id=3,
+        only_events=[DeleteRowsEvent,UpdateRowsEvent,WriteRowsEvent]
+    )
+
+    for binlogevent in stream:
+
+        for row in binlogevent.rows:
+            if isinstance(binlogevent, DeleteRowsEvent):
+                topic = "deleted"
+                message_body = row["values"].items()
+
+            elif isinstance(binlogevent, UpdateRowsEvent):
+                topic = "updated"
+                message_body = row["after_values"].items()
+
+            elif isinstance(binlogevent, WriteRowsEvent):
+                topic = "created"
+                message_body = row["values"].items()
+
+            producer.send(topic, key=None, value=message_body)
+
+
+
+    consumer = KafkaConsumer(
+        'deleted',
+        'updated',
+        'created',
+        bootstrap_servers='127.0.0.1:9092',
+        value_deserializer=lambda x: x.decode('utf-8'),
+        auto_offset_reset='earliest',
+        group_id = '1'
+    )
+
+    for message in consumer:
+        print(f"Topic: {message.topic}, Value: {message.value}")
+
+    stream.close()
+    producer.close()
+    consumer.close()
+
+
+if __name__ == "__main__":
+    create_kafka_topic("deleted", num_partitions=3, replication_factor=1)
+    create_kafka_topic("created", num_partitions=3, replication_factor=1)
+    create_kafka_topic("updated", num_partitions=3, replication_factor=1)
+    main()

From 006e2b1afa63b2f059c135eb1c926b56c5b4059d Mon Sep 17 00:00:00 2001
From: Geonui
Date: Tue, 17 Oct 2023 00:29:45 +0900
Subject: [PATCH 2/5] Fix: Remove extra blank lines

---
 examples/mysql_to_kafka.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/examples/mysql_to_kafka.py b/examples/mysql_to_kafka.py
index 68e57692..e29e1c9a 100644
--- a/examples/mysql_to_kafka.py
+++ b/examples/mysql_to_kafka.py
@@ -59,8 +59,6 @@ def main():
 
             producer.send(topic, key=None, value=message_body)
 
-
-
     consumer = KafkaConsumer(
         'deleted',
         'updated',

From 8576b843b2531c59f547d3d946b718f031959791 Mon Sep 17 00:00:00 2001
From: Geonui
Date: Tue, 17 Oct 2023 11:28:06 +0900
Subject: [PATCH 3/5] Fix: Wrap message_body in dict

---
 examples/mysql_to_kafka.py | 27 +++++++++++++--------------
 1 file changed, 13 insertions(+), 14 deletions(-)

diff --git a/examples/mysql_to_kafka.py b/examples/mysql_to_kafka.py
index e29e1c9a..1d8e4662 100644
--- a/examples/mysql_to_kafka.py
+++ b/examples/mysql_to_kafka.py
@@ -23,7 +23,7 @@ def create_kafka_topic(topic_name, num_partitions=1, replication_factor=1):
     topic = NewTopic(
         name=topic_name,
         num_partitions=num_partitions,
-        replication_factor=replication_factor
+        replication_factor=replication_factor,
     )
 
     admin_client.create_topics(new_topics=[topic])
@@ -32,18 +32,17 @@
 def main():
     global message_body
     producer = KafkaProducer(
-        bootstrap_servers='127.0.0.1:9092',
-        value_serializer=lambda v: str(v).encode('utf-8')
+        bootstrap_servers="127.0.0.1:9092",
+        value_serializer=lambda v: str(v).encode("utf-8"),
     )
 
     stream = BinLogStreamReader(
         connection_settings=MYSQL_SETTINGS,
         server_id=3,
-        only_events=[DeleteRowsEvent,UpdateRowsEvent,WriteRowsEvent]
+        only_events=[DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent],
     )
 
     for binlogevent in stream:
-
         for row in binlogevent.rows:
             if isinstance(binlogevent, DeleteRowsEvent):
                 topic = "deleted"
@@ -57,20 +56,20 @@ def main():
                 topic = "created"
                 message_body = row["values"].items()
 
-            producer.send(topic, key=None, value=message_body)
+            producer.send(topic, key=None, value=dict(message_body))
 
     consumer = KafkaConsumer(
-        'deleted',
-        'updated',
-        'created',
-        bootstrap_servers='127.0.0.1:9092',
-        value_deserializer=lambda x: x.decode('utf-8'),
-        auto_offset_reset='earliest',
-        group_id = '1'
+        "deleted",
+        "updated",
+        "created",
+        bootstrap_servers="127.0.0.1:9092",
+        value_deserializer=lambda x: x.decode("utf-8"),
+        auto_offset_reset="earliest",
+        group_id="1",
     )
 
     for message in consumer:
-        print(f"Topic: {message.topic}, Value: {message.value}")
+        print(f'Topic: "{message.topic}", Value: "{message.value}"')
 
     stream.close()
     producer.close()

From 25a3e7d5e9e8fa2ff04c9d78f08913ba6d5e41f0 Mon Sep 17 00:00:00 2001
From: Geonui
Date: Wed, 18 Oct 2023 14:05:04 +0900
Subject: [PATCH 4/5] Fix: Handle the loop with try/except

---
 examples/mysql_to_kafka.py | 51 +++++++++++++++++++++-----------------
 1 file changed, 28 insertions(+), 23 deletions(-)

diff --git a/examples/mysql_to_kafka.py b/examples/mysql_to_kafka.py
index 1d8e4662..e2cc31ab 100644
--- a/examples/mysql_to_kafka.py
+++ b/examples/mysql_to_kafka.py
@@ -4,6 +4,8 @@
 # Output Kafka events to the console from MySQL replication stream
 #
 
+import time
+
 from pymysqlreplication import BinLogStreamReader
 from pymysqlreplication.row_event import (
     DeleteRowsEvent,
@@ -12,6 +14,7 @@
 )
 
 from kafka.admin import KafkaAdminClient, NewTopic
+from kafka.errors import TopicAlreadyExistsError
 from kafka import KafkaConsumer, KafkaProducer
 
 MYSQL_SETTINGS = {"host": "127.0.0.1", "port": 3306, "user": "root", "passwd": ""}
@@ -25,12 +28,11 @@ def create_kafka_topic(topic_name, num_partitions=1, replication_factor=1):
         num_partitions=num_partitions,
         replication_factor=replication_factor,
     )
-
     admin_client.create_topics(new_topics=[topic])
 
 
 def main():
-    global message_body
+    global message_body, topic
    producer = KafkaProducer(
         bootstrap_servers="127.0.0.1:9092",
         value_serializer=lambda v: str(v).encode("utf-8"),
@@ -42,22 +44,6 @@ def main():
         only_events=[DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent],
     )
 
-    for binlogevent in stream:
-        for row in binlogevent.rows:
-            if isinstance(binlogevent, DeleteRowsEvent):
-                topic = "deleted"
-                message_body = row["values"].items()
-
-            elif isinstance(binlogevent, UpdateRowsEvent):
-                topic = "updated"
-                message_body = row["after_values"].items()
-
-            elif isinstance(binlogevent, WriteRowsEvent):
-                topic = "created"
-                message_body = row["values"].items()
-
-            producer.send(topic, key=None, value=dict(message_body))
-
     consumer = KafkaConsumer(
         "deleted",
         "updated",
@@ -68,12 +54,31 @@ def main():
         group_id="1",
     )
 
-    for message in consumer:
-        print(f'Topic: "{message.topic}", Value: "{message.value}"')
+    try:
+        for binlogevent in stream:
+            for row in binlogevent.rows:
+                if isinstance(binlogevent, DeleteRowsEvent):
+                    topic = "deleted"
+                    message_body = row["values"].items()
+
+                elif isinstance(binlogevent, UpdateRowsEvent):
+                    topic = "updated"
+                    message_body = row["after_values"].items()
+
+                elif isinstance(binlogevent, WriteRowsEvent):
+                    topic = "created"
+                    message_body = row["values"].items()
+
+                producer.send(topic, key=None, value=dict(message_body))
+
+        for message in consumer:
+            print(f'Topic: "{message.topic}", Value: "{message.value}"')
 
-    stream.close()
-    producer.close()
-    consumer.close()
+    except KeyboardInterrupt:
+        stream.close()
+        producer.close()
+        time.sleep(1)
+        consumer.close()
 
 
 if __name__ == "__main__":

From fd1e04132f92f074d694bd36a405726da28ed067 Mon Sep 17 00:00:00 2001
From: Geonui
Date: Wed, 18 Oct 2023 14:05:48 +0900
Subject: [PATCH 5/5] Fix: Handle TopicAlreadyExistsError

---
 examples/mysql_to_kafka.py | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/examples/mysql_to_kafka.py b/examples/mysql_to_kafka.py
index e2cc31ab..13d74d02 100644
--- a/examples/mysql_to_kafka.py
+++ b/examples/mysql_to_kafka.py
@@ -23,12 +23,20 @@
 def create_kafka_topic(topic_name, num_partitions=1, replication_factor=1):
     admin_client = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092")
 
-    topic = NewTopic(
-        name=topic_name,
-        num_partitions=num_partitions,
-        replication_factor=replication_factor,
-    )
-    admin_client.create_topics(new_topics=[topic])
+    topic_exists = False
+    try:
+        topic_listings = admin_client.list_topics()
+        topic_exists = topic_name in topic_listings
+    except TopicAlreadyExistsError:
+        topic_exists = True
+
+    if not topic_exists:
+        topic = NewTopic(
+            name=topic_name,
+            num_partitions=num_partitions,
+            replication_factor=replication_factor,
+        )
+        admin_client.create_topics(new_topics=[topic])
 
 
 def main():
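
A note on exercising the example end to end: the patches add the producer and
consumer, but nothing in the series generates row events. A minimal companion
sketch is below, assuming the same local MySQL instance as MYSQL_SETTINGS with
binlog_format=ROW; the "test" database and "users" table are hypothetical
names for illustration, not anything the patches define.

    # generate_events.py -- companion sketch, not part of the patch series.
    # Start mysql_to_kafka.py first, then run this to produce one message on
    # each topic: "created" (WriteRowsEvent), "updated" (UpdateRowsEvent),
    # and "deleted" (DeleteRowsEvent).
    import pymysql

    conn = pymysql.connect(host="127.0.0.1", port=3306, user="root", password="")
    try:
        with conn.cursor() as cursor:
            cursor.execute("CREATE DATABASE IF NOT EXISTS test")
            cursor.execute(
                "CREATE TABLE IF NOT EXISTS test.users "
                "(id INT PRIMARY KEY, name VARCHAR(50))"
            )
            cursor.execute("INSERT INTO test.users VALUES (1, 'alice')")
            cursor.execute("UPDATE test.users SET name = 'bob' WHERE id = 1")
            cursor.execute("DELETE FROM test.users WHERE id = 1")
        conn.commit()
    finally:
        conn.close()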
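
On the serializer choice: value_serializer=lambda v: str(v).encode("utf-8")
publishes the Python repr of each row dict, which non-Python consumers cannot
easily parse. A JSON-based serializer is one possible alternative (a sketch,
not something the patches adopt):

    import json

    from kafka import KafkaProducer

    # JSON serializer sketch: default=str stringifies datetime and Decimal
    # values, which binlog row images commonly contain.
    producer = KafkaProducer(
        bootstrap_servers="127.0.0.1:9092",
        value_serializer=lambda v: json.dumps(v, default=str).encode("utf-8"),
    )

The consumer's value_deserializer would then become
lambda x: json.loads(x.decode("utf-8")) so messages round-trip as dicts.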