From 4d27faf4df6614c3dd733bef6fe88f92e7f93298 Mon Sep 17 00:00:00 2001 From: "minju.jeon" Date: Sun, 15 Oct 2023 17:25:52 +0900 Subject: [PATCH 1/2] Add: send mysql_to_rabbitmq.py closes 545 --- examples/mysql_to_rabbitmq.py | 67 +++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 examples/mysql_to_rabbitmq.py diff --git a/examples/mysql_to_rabbitmq.py b/examples/mysql_to_rabbitmq.py new file mode 100644 index 00000000..f6a57a69 --- /dev/null +++ b/examples/mysql_to_rabbitmq.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python + +# +# Update a RabbitMQ when an event is triggered +# in MySQL replication log +# + +import json +import pika + +from pika import DeliveryMode +from pymysqlreplication import BinLogStreamReader +from pymysqlreplication.row_event import ( + DeleteRowsEvent, + UpdateRowsEvent, + WriteRowsEvent, +) + +MYSQL_SETTINGS = {"host": "127.0.0.1", "port": 3306, "user": "root", + "passwd": "password"} + +def main(): + stream = BinLogStreamReader( + connection_settings=MYSQL_SETTINGS, + server_id=3, + only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent], + ) + + credentials = pika.PlainCredentials( + username='username', + password='password' + ) + params = pika.ConnectionParameters('rabbitmq_host', credentials=credentials) + + # RabbitMQ Connection Settings + conn = pika.BlockingConnection(params) + channel = conn.channel() + channel.queue_declare(queue='order') + channel.exchange_declare(durable=True, exchange_type='direct', exchange='direct') + channel.queue_bind(queue='order', exchange='direct', routing_key='order') + + for binlogevent in stream: + for row in binlogevent.rows: + if isinstance(binlogevent, DeleteRowsEvent): + routing_key = "order" + message_body = row["values"].items() + + elif isinstance(binlogevent, UpdateRowsEvent): + routing_key = "order" + message_body = row["after_values"].items() + + elif isinstance(binlogevent, WriteRowsEvent): + routing_key = "order" + message_body = row["values"].items() + + properties = pika.BasicProperties(content_type='application/json', + delivery_mode=DeliveryMode.Transient) + + channel.basic_publish(exchange='direct', routing_key=routing_key, + body=json.dumps(dict(message_body)), properties=properties) + + stream.close() + conn.close() + + +if __name__ == '__main__': + main() \ No newline at end of file From fb6b97f1db1249ee5cdc26d2bc6b0bd3a1e2613a Mon Sep 17 00:00:00 2001 From: "will.k" Date: Thu, 19 Oct 2023 22:54:34 +0900 Subject: [PATCH 2/2] Fix serialization issues where column type is temporal or JSON --- examples/mysql_to_rabbitmq.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/examples/mysql_to_rabbitmq.py b/examples/mysql_to_rabbitmq.py index f6a57a69..8e6e527d 100644 --- a/examples/mysql_to_rabbitmq.py +++ b/examples/mysql_to_rabbitmq.py @@ -55,9 +55,12 @@ def main(): properties = pika.BasicProperties(content_type='application/json', delivery_mode=DeliveryMode.Transient) - - channel.basic_publish(exchange='direct', routing_key=routing_key, - body=json.dumps(dict(message_body)), properties=properties) + channel.basic_publish( + exchange='direct', + routing_key=routing_key, + body=json.dumps(message_body, default=lambda x: str(x)), + properties=properties + ) stream.close() conn.close()