Skip to content

Commit 31da057

Browse files
A sample rethinkDB
1 parent 27a3bd8 commit 31da057

File tree

1 file changed

+51
-0
lines changed

1 file changed

+51
-0
lines changed

examples/rethinkdb_sync.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#!/usr/bin/env python
2+
#
3+
# Insert a new element in a RethinkDB database
4+
# when an evenement is trigger in MySQL replication log
5+
#
6+
# Please test with MySQL employees DB available here: https://launchpad.net/test-db/
7+
#
8+
9+
import datetime
10+
import rethinkdb as r
11+
from pymysqlreplication import BinLogStreamReader
12+
from pymysqlreplication.row_event import *
13+
14+
15+
#RethinkDB
16+
r.connect('localhost', 28015, 'mysql')
17+
try:
18+
r.db_drop('mysql').run()
19+
except:
20+
pass
21+
r.db_create('mysql').run()
22+
23+
tables = ['dept_emp', 'dept_manager', 'titles', 'salaries', 'employees', 'departments']
24+
for table in tables:
25+
r.db('mysql').table_create(table).run()
26+
27+
28+
#MySQL
29+
mysql_settings = {'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': ''}
30+
stream = BinLogStreamReader(connection_settings = mysql_settings,
31+
only_events = [DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent], blocking = True)
32+
33+
#Process Feed
34+
for binlogevent in stream:
35+
prefix = "%s:%s:" % (binlogevent.schema, binlogevent.table)
36+
37+
for row in binlogevent.rows:
38+
if binlogevent.schema == 'employees':
39+
if isinstance(binlogevent, WriteRowsEvent):
40+
vals = {}
41+
for (k, v) in row["values"].items():
42+
if isinstance(v, datetime.date):
43+
vals[str(k)] = str(v)
44+
else:
45+
vals[str(k)] = v
46+
r.table(binlogevent.table).insert(vals).run()
47+
48+
stream.close()
49+
50+
51+

0 commit comments

Comments
 (0)