forked from Altinity/clickhouse-mysql-data-reader
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpumper.py
74 lines (58 loc) · 1.76 KB
/
pumper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from clickhouse_mysql.reader.reader import Reader
from clickhouse_mysql.writer.writer import Writer
import signal
class Pumper(object):
    """
    Pump data - read data from reader and push into writer.

    On construction the pumper subscribes its handler methods to the
    reader's event notifications. Each handler forwards the event it
    receives to the matching writer method.
    """

    # Quoted so the annotations are not evaluated when the class is created;
    # both attributes default to None until __init__ assigns them.
    reader: "Reader | None" = None
    writer: "Writer | None" = None

    def __init__(self, reader=None, writer=None):
        """
        Store reader and writer and subscribe to the reader's events.

        :param reader: event source; this class calls its subscribe(),
            read() and close() methods
        :param writer: event sink; this class calls its insert(), update(),
            delete_row() and flush() methods
        """
        self.reader = reader
        self.writer = writer

        # Compare with None explicitly: a reader object that happens to
        # evaluate as falsy (e.g. it defines __len__ or __bool__) must
        # still be subscribed.
        if self.reader is not None:
            # subscribe on reader's event notifications
            self.reader.subscribe({
                'WriteRowsEvent': self.write_rows_event,
                'UpdateRowsEvent': self.update_rows_event,
                'DeleteRowsEvent': self.delete_rows_event,
                # Handlers for the two events below exist on this class
                # but are currently not subscribed:
                # 'WriteRowsEvent.EachRow': self.write_rows_event_each_row,
                # 'ReaderIdleEvent': self.reader_idle_event,
            })

    def run(self):
        """
        Start pumping by calling the reader's read().

        Presumably the reader invokes the handlers subscribed in __init__
        while reading - confirm against the Reader implementation.
        """
        self.reader.read()

    def write_rows_event(self, event=None):
        """
        WriteRowsEvent handler - passes the event to writer.insert()

        :param event: event received from the reader
        """
        self.writer.insert(event)

    def write_rows_event_each_row(self, event=None):
        """
        WriteRowsEvent.EachRow handler - passes the event to writer.insert()

        :param event: event received from the reader
        """
        self.writer.insert(event)

    def reader_idle_event(self, event=None):
        """
        ReaderIdleEvent handler - flushes the writer

        :param event: accepted for signature parity with the other handlers
            and ignored; the handler can still be called with no arguments
        """
        self.writer.flush()

    def delete_rows_event(self, event=None):
        """
        DeleteRowsEvent handler - passes the event to writer.delete_row()

        :param event: event received from the reader
        """
        self.writer.delete_row(event)

    def update_rows_event(self, event=None):
        """
        UpdateRowsEvent handler - passes the event to writer.update()

        :param event: event received from the reader
        """
        self.writer.update(event)

    def exit_gracefully(self, sig, frame):
        """
        Close the reader.

        The (sig, frame) signature matches what the signal module passes to
        a handler; this class does not register the handler itself.

        :param sig: signal number (unused)
        :param frame: current stack frame (unused)
        """
        # Nothing to close when the pumper was built without a reader;
        # a shutdown handler should not raise AttributeError in that case.
        if self.reader is not None:
            self.reader.close()
# Script entry point: running this file directly only prints the module's
# name; nothing is printed when the module is imported.
if __name__ == "__main__":
    print("pumper")