Skip to content

Commit b72d476

Browse files
Merge pull request julien-duponchelle#547 from myminju/add-rabbitmq-example
add: send mysql_to_rabbitmq.py
2 parents d180d65 + fb6b97f commit b72d476

File tree

1 file changed

+70
-0
lines changed

1 file changed

+70
-0
lines changed

examples/mysql_to_rabbitmq.py

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#!/usr/bin/env python
2+
3+
#
4+
# Update a RabbitMQ when an event is triggered
5+
# in MySQL replication log
6+
#
7+
8+
import json
9+
import pika
10+
11+
from pika import DeliveryMode
12+
from pymysqlreplication import BinLogStreamReader
13+
from pymysqlreplication.row_event import (
14+
DeleteRowsEvent,
15+
UpdateRowsEvent,
16+
WriteRowsEvent,
17+
)
18+
19+
MYSQL_SETTINGS = {"host": "127.0.0.1", "port": 3306, "user": "root",
20+
"passwd": "password"}
21+
22+
def main():
23+
stream = BinLogStreamReader(
24+
connection_settings=MYSQL_SETTINGS,
25+
server_id=3,
26+
only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
27+
)
28+
29+
credentials = pika.PlainCredentials(
30+
username='username',
31+
password='password'
32+
)
33+
params = pika.ConnectionParameters('rabbitmq_host', credentials=credentials)
34+
35+
# RabbitMQ Connection Settings
36+
conn = pika.BlockingConnection(params)
37+
channel = conn.channel()
38+
channel.queue_declare(queue='order')
39+
channel.exchange_declare(durable=True, exchange_type='direct', exchange='direct')
40+
channel.queue_bind(queue='order', exchange='direct', routing_key='order')
41+
42+
for binlogevent in stream:
43+
for row in binlogevent.rows:
44+
if isinstance(binlogevent, DeleteRowsEvent):
45+
routing_key = "order"
46+
message_body = row["values"].items()
47+
48+
elif isinstance(binlogevent, UpdateRowsEvent):
49+
routing_key = "order"
50+
message_body = row["after_values"].items()
51+
52+
elif isinstance(binlogevent, WriteRowsEvent):
53+
routing_key = "order"
54+
message_body = row["values"].items()
55+
56+
properties = pika.BasicProperties(content_type='application/json',
57+
delivery_mode=DeliveryMode.Transient)
58+
channel.basic_publish(
59+
exchange='direct',
60+
routing_key=routing_key,
61+
body=json.dumps(message_body, default=lambda x: str(x)),
62+
properties=properties
63+
)
64+
65+
stream.close()
66+
conn.close()
67+
68+
69+
if __name__ == '__main__':
70+
main()

0 commit comments

Comments
 (0)