
Commit 2b5de27

Merge pull request #21 from tinybirdco/unify-syncers
unify syncers in one datasource
2 parents: ff5ac51 + 6dd16a2

8 files changed: +38 additions, -245 deletions

clickhouse_mysql/pool/bbpool.py

Lines changed: 1 addition & 12 deletions
@@ -149,18 +149,7 @@ def rotate_belt(self, belt_index, flush=False):
             # time to flush data for specified key
             #self.writer_builder.param('csv_file_path_suffix_parts', [str(int(now)), str(self.buckets_num_total)])
             writer = self.writer_builder.new()
-            item = self.belts[belt_index].pop()
-            # process event based on its type
-            if isinstance(item[0].pymysqlreplication_event, WriteRowsEvent):
-                writer.insert(item)
-            elif isinstance(item[0].pymysqlreplication_event, DeleteRowsEvent):
-                writer.delete(item)
-            elif isinstance(item[0].pymysqlreplication_event, UpdateRowsEvent):
-                writer.update(item)
-            else:
-                # skip other unhandled events
-                pass
-            # writer.insert(self.belts[belt_index].pop())
+            writer.insert(self.belts[belt_index].pop())
             writer.close()
             writer.push()
             writer.destroy()
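
With this change the belt stops dispatching on event type: every popped batch goes through writer.insert(), and the operation is encoded downstream by the writer itself (see the generate_row() rewrite in csvwriter.py below). A minimal sketch of the mapping that replaces the removed branches, using the row-event classes from python-mysql-replication; the helper name operation_code is illustrative, not part of the codebase:

from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

def operation_code(binlog_event):
    """Collapse the old insert/delete/update dispatch into one code."""
    if isinstance(binlog_event, WriteRowsEvent):
        return 0  # insert
    if isinstance(binlog_event, DeleteRowsEvent):
        return 2  # delete
    return 1      # update (and anything else the reader lets through)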

clickhouse_mysql/reader/mysqlreader.py

Lines changed: 4 additions & 5 deletions
@@ -239,7 +239,7 @@ def stat_write_rows_event_finalyse(self):

     def process_first_event(self, event):
         if "{}.{}".format(event.schema, event.table) not in self.first_rows_passed:
-            Util.log_row(event.first_row(), "first row in replication {}.{}".format(event.schema, event.table))
+            Util.log_row(event.first_row(), "first row in replication {}.{} - binlog pos {}".format(event.schema, event.table, event.pymysqlreplication_event.packet.log_pos))
             self.first_rows_passed.append("{}.{}".format(event.schema, event.table))
             logging.info(self.first_rows_passed)

@@ -403,8 +403,7 @@ def read(self):
         self.stat_init_fetch_loop()

         try:
-            logging.debug('Pre-start binlog position: ' + self.binlog_stream.log_file + ":" + str(
-                self.binlog_stream.log_pos) if self.binlog_stream.log_pos is not None else "undef")
+            logging.debug('Pre-start binlog position: ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos) if self.binlog_stream.log_pos is not None else "undef")

             # fetch available events from MySQL
             for mysql_event in self.binlog_stream:

@@ -426,8 +425,8 @@ def read(self):
                     # skip other unhandled events
                     pass

-                # after event processed, we need to handle current binlog position
-                self.process_binlog_position(self.binlog_stream.log_file, self.binlog_stream.log_pos)
+                # after event processed, we need to handle current binlog position
+                self.process_binlog_position(self.binlog_stream.log_file, self.binlog_stream.log_pos)

         except Exception as ex:
             if self.blocking:
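
Checkpointing log_file/log_pos after each event is what makes the sync resumable: on restart the reader can re-open the binlog stream at the saved coordinates instead of replaying from scratch. A hedged sketch of what a process_binlog_position() implementation could persist; the state file path and JSON format here are illustrative assumptions, not the project's actual checkpoint format:

import json

def process_binlog_position(log_file, log_pos, state_path='binlog.pos'):
    # Skip checkpointing until the stream has a real position.
    if log_pos is None:
        return
    # Persist the coordinates; a real implementation would
    # write-and-rename or use a durable store.
    with open(state_path, 'w') as f:
        json.dump({'log_file': log_file, 'log_pos': log_pos}, f)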

clickhouse_mysql/writer/csvwriter.py

Lines changed: 27 additions & 140 deletions
@@ -7,6 +7,7 @@
 import copy
 import time
 import uuid
+import json

 from clickhouse_mysql.writer.writer import Writer
 from clickhouse_mysql.event.event import Event

@@ -125,9 +126,17 @@ def insert(self, event_or_events):

             event_converted = self.convert(event)
             rows = event_converted.pymysqlreplication_event.rows
-            headers = list(rows[0]['values'].keys())
+            if 'after_values' in rows[0].keys():
+                headers = list(rows[0]['after_values'].keys())
+            else:
+                headers = list(rows[0]['values'].keys())
             headers.insert(0, 'operation')
             headers.insert(1, 'tb_upd')
+            headers.insert(2, 'table')
+            headers.insert(3, 'schema')
+            headers.insert(4, 'log_pos')
+            headers.insert(5, 'binlog_timestamp')
+            headers.insert(6, 'payload')

             # self.fieldnames = sorted(self.convert(copy.copy(event.first_row())).keys())
             self.fieldnames = headers

@@ -136,59 +145,7 @@ def insert(self, event_or_events):
             if self.dst_table is None:
                 self.dst_table = event.table

-        self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames, quoting=csv.QUOTE_NONNUMERIC)
-        if not self.header_written:
-            self.writer.writeheader()
-
-        for event in events:
-            if not event.verify:
-                logging.warning('Event verification failed. Skip one event. Event: %s Class: %s', event.meta(), __class__)
-                continue  # for event
-            self.generate_row(event)
-
-    def delete_row(self, event_or_events):
-
-        # event_or_events = [
-        #   event: {
-        #       row: {'id': 3, 'a': 3}
-        #   },
-        #   event: {
-        #       row: {'id': 3, 'a': 3}
-        #   },
-        # ]
-
-        logging.debug("Delete CSV Writer")
-
-        events = self.listify(event_or_events)
-        if len(events) < 1:
-            logging.warning('No events to delete. class: %s', __class__)
-            return
-
-        # assume we have at least one Event
-
-        logging.debug('class:%s delete %d events', __class__, len(events))
-
-        if not self.opened():
-            self.open()
-
-        if not self.writer:
-            # pick any event from the list
-            event = events[0]
-            if not event.verify:
-                logging.warning('Event verification failed. Skip insert(). Event: %s Class: %s', event.meta(), __class__)
-                return
-
-            event_converted = self.convert(event)
-            rows = event_converted.pymysqlreplication_event.rows
-            headers = list(rows[0]['values'].keys())
-            headers.insert(0, 'operation')
-            headers.insert(1, 'tb_upd')
-
-            self.fieldnames = headers
-            if self.dst_schema is None:
-                self.dst_schema = event.schema
-            if self.dst_table is None:
-                self.dst_table = event.table
+            self.fieldnames = self.fieldnames[0:7]  # keep only the fixed columns: operation, tb_upd, table, schema, log_pos, binlog_timestamp, payload

         self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames, quoting=csv.QUOTE_NONNUMERIC)
         if not self.header_written:

@@ -200,72 +157,6 @@ def delete_row(self, event_or_events):
                 continue  # for event
             self.generate_row(event)

-
-
-    def update(self, event_or_events):
-
-        # event_or_events = [
-        #   event: {
-        #       row: {
-        #           'before_values': {'id': 3, 'a': 3},
-        #           'after_values': {'id': 3, 'a': 2}
-        #       }
-        #   },
-        #   event: {
-        #       row: {
-        #           'before_values': {'id': 2, 'a': 3},
-        #           'after_values': {'id': 2, 'a': 2}
-        #       }
-        #   },
-        # ]
-
-        logging.debug("Update CSV Writer")
-
-        events = self.listify(event_or_events)
-        if len(events) < 1:
-            logging.warning('No events to update. class: %s', __class__)
-            return
-
-        # assume we have at least one Event
-
-        logging.debug('class:%s updated %d events', __class__, len(events))
-
-        if not self.opened():
-            self.open()
-
-        if not self.writer:
-            # pick any event from the list
-            event = events[0]
-            if not event.verify:
-                logging.warning('Event verification failed. Skip insert(). Event: %s Class: %s', event.meta(), __class__)
-                return
-
-            event_converted = self.convert(event)
-            rows = event_converted.pymysqlreplication_event.rows
-            headers = list(rows[0]['after_values'].keys())
-            headers.insert(0, 'operation')
-            headers.insert(1, 'tb_upd')
-
-            # self.fieldnames = sorted(headers)
-            self.fieldnames = headers
-            if self.dst_schema is None:
-                self.dst_schema = event.schema
-            if self.dst_table is None:
-                self.dst_table = event.table
-
-        self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames, quoting=csv.QUOTE_NONNUMERIC)
-        if not self.header_written:
-            self.writer.writeheader()
-
-        for event in events:
-            if not event.verify:
-                logging.warning('Event verification failed. Skip one event. Event: %s Class: %s', event.meta(), __class__)
-                continue  # for event
-
-            event_converted = self.convert(event)
-            self.generate_row(event_converted)
-

@@ -274,26 +165,22 @@ def convert_null_values(self, row):

     def generate_row(self, event):
         """ When using mempool or csvpool events are cached so you can receive different kind of events in the same list. These events should be handled in a different way """
-
-        if isinstance(event.pymysqlreplication_event, WriteRowsEvent):
-            for row in event:
-                row['tb_upd'] = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')
-                row['operation'] = 0
-                self.convert_null_values(row)
-                self.writer.writerow(self.convert(row))
-        elif isinstance(event.pymysqlreplication_event, DeleteRowsEvent):
-            for row in event:
-                row['tb_upd'] = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')
-                row['operation'] = 2
-                self.convert_null_values(row)
-                self.writer.writerow(self.convert(row))
-        elif isinstance(event.pymysqlreplication_event, UpdateRowsEvent):
-            for row in event.pymysqlreplication_event.rows:
-                row['after_values']['tb_upd'] = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')
-                row['after_values']['operation'] = 1
-                self.convert_null_values(row['after_values'])
-                self.writer.writerow(self.convert(row['after_values']))
-
+        row_w_payload = {}
+        for row in event:
+            row_w_payload['tb_upd'] = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')
+            if isinstance(event.pymysqlreplication_event, WriteRowsEvent):
+                row_w_payload['operation'] = 0
+            elif isinstance(event.pymysqlreplication_event, DeleteRowsEvent):
+                row_w_payload['operation'] = 2
+            else:
+                row_w_payload['operation'] = 1
+            row_w_payload['table'] = event.table
+            row_w_payload['schema'] = str(event.schema).split('_')[0]
+            row_w_payload['log_pos'] = event.pymysqlreplication_event.packet.log_pos
+            row_w_payload['binlog_timestamp'] = event.pymysqlreplication_event.timestamp
+            self.convert_null_values(row)
+            row_w_payload['payload'] = json.dumps(row, default=str)
+            self.writer.writerow(self.convert(row_w_payload))

     def push(self):
         if not self.next_writer_builder or not self.fieldnames:
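
The net effect of these csvwriter.py changes is that every event, regardless of type, lands in one fixed seven-column layout (operation, tb_upd, table, schema, log_pos, binlog_timestamp, payload), with the source row folded into a JSON payload column. A self-contained sketch of one such CSV record; all values here are made up for illustration:

import csv
import json
import sys
from datetime import datetime

FIELDNAMES = ['operation', 'tb_upd', 'table', 'schema',
              'log_pos', 'binlog_timestamp', 'payload']

row = {'id': 3, 'a': None}  # the MySQL row carried by the binlog event

record = {
    'operation': 2,  # 0 = insert, 1 = update, 2 = delete
    'tb_upd': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f'),
    'table': 'orders',
    'schema': 'shop',  # generate_row() keeps only the prefix before '_'
    'log_pos': 4567,
    'binlog_timestamp': 1633036800,
    'payload': json.dumps(row, default=str),  # full row as JSON
}

writer = csv.DictWriter(sys.stdout, fieldnames=FIELDNAMES,
                        quoting=csv.QUOTE_NONNUMERIC)
writer.writeheader()
writer.writerow(record)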

clickhouse_mysql/writer/processwriter.py

Lines changed: 0 additions & 67 deletions
@@ -6,7 +6,6 @@

 from clickhouse_mysql.writer.writer import Writer

-
 class ProcessWriter(Writer):
     """Start write procedure as a separated process"""
     args = None

@@ -35,28 +34,6 @@ def process(self, event_or_events=None):
         writer.destroy()
         logging.debug('class:%s process() done', __class__)

-    def processDelete(self, event_or_events=None):
-        """Separate process body to be run"""
-
-        logging.debug('class:%s process()', __class__)
-        writer = self.next_writer_builder.get()
-        writer.delete_row(event_or_events)
-        writer.close()
-        writer.push()
-        writer.destroy()
-        logging.debug('class:%s processDelete() done', __class__)
-
-    def processUpdate(self, event_or_events=None):
-        """Separate process body to be run"""
-
-        logging.debug('class:%s process()', __class__)
-        writer = self.next_writer_builder.get()
-        writer.update(event_or_events)
-        writer.close()
-        writer.push()
-        writer.destroy()
-        logging.debug('class:%s processUpdate() done', __class__)
-
     def insert(self, event_or_events=None):
         # event_or_events = [
         #   event: {

@@ -79,50 +56,6 @@ def insert(self, event_or_events=None):
         logging.debug('class:%s insert done', __class__)
         pass

-    def delete(self, event_or_events=None):
-        # event_or_events = [
-        #   event: {
-        #       row: {'id': 3, 'a': 3}
-        #   },
-        #   event: {
-        #       row: {'id': 3, 'a': 3}
-        #   },
-        # ]
-
-        # start separated process with event_or_events to be inserted
-
-        logging.debug('class:%s delete', __class__)
-        process = mp.Process(target=self.processDelete, args=(event_or_events,))
-
-        logging.debug('class:%s delete.process.start()', __class__)
-        process.start()
-
-        #process.join()
-        logging.debug('class:%s delete done', __class__)
-        pass
-
-    def update(self, event_or_events=None):
-        # event_or_events = [
-        #   event: {
-        #       row: {'id': 3, 'a': 3}
-        #   },
-        #   event: {
-        #       row: {'id': 3, 'a': 3}
-        #   },
-        # ]
-
-        # start separated process with event_or_events to be inserted
-
-        logging.debug('class:%s update', __class__)
-        process = mp.Process(target=self.processUpdate, args=(event_or_events,))
-
-        logging.debug('class:%s update.process.start()', __class__)
-        process.start()
-
-        #process.join()
-        logging.debug('class:%s update done', __class__)
-        pass
-
     def flush(self):
         pass
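
After the deletion, ProcessWriter keeps a single fork-per-batch path: insert() spawns a child process and the child owns the whole write/close/push cycle. A condensed sketch of the surviving flow; the class name here is a stand-in, and next_writer_builder is assumed to yield the next writer in the chain as in the real class:

import multiprocessing as mp

class ProcessWriterSketch:
    def __init__(self, next_writer_builder):
        self.next_writer_builder = next_writer_builder

    def process(self, event_or_events=None):
        # Child-process body: build the downstream writer and flush it.
        writer = self.next_writer_builder.get()
        writer.insert(event_or_events)  # the one remaining entry point
        writer.close()
        writer.push()
        writer.destroy()

    def insert(self, event_or_events=None):
        # One process per batch; no join(), matching the original code.
        mp.Process(target=self.process, args=(event_or_events,)).start()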
12861

clickhouse_mysql/writer/tbcsvwriter.py

Lines changed: 3 additions & 18 deletions
@@ -52,30 +52,15 @@ def __init__(
         self.dst_distribute = dst_distribute

-    def format_null_values(self, csv_file):
-        """ We need to replace NULL values by \\N (CH null value) and do not quote this field.
-        It is the only way to distinguish between NULL and empty strings. With this we will have:
-            - xx,\\N,yy --> For null values
-            - xx,'',yy --> For empty strings """
-
-        with open(csv_file, 'r') as file:
-            data = file.read()
-            data = data.replace('"NULL"', '\\N')
-
-        with open(csv_file, 'w') as file:
-            file.write(data)
-
     def uploadCSV(self, table, filename, tries=1):
         limit_of_retries = 3
         params = {
             'name': table,
-            'mode': 'append'
+            'mode': 'append',
+            'dialect_delimiter': ','
         }

         try:
-            # Add replace NULL values by \N
-            self.format_null_values(filename)

             with open(filename, 'rb') as f:
                 m = MultipartEncoder(fields={'csv': ('csv', f, 'text/csv')})

@@ -264,6 +249,6 @@ def update(self, event_or_events=None):
         #     logging.debug('starting %s', bash)
         #     os.system(bash)

-        logging.debug("CHCSVWriter: delete row")
+        logging.debug("CHCSVWriter: update row")

         pass
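
For context, uploadCSV() streams the file to Tinybird's Datasources API; the new dialect_delimiter parameter pins the delimiter to a comma instead of relying on the API's dialect guessing, and the "NULL"-rewriting pass is no longer needed now that row values travel inside the JSON payload column. A rough sketch of the request being built; TB_HOST and TB_TOKEN are placeholders and error/retry handling is omitted:

import requests
from requests_toolbelt import MultipartEncoder

TB_HOST = 'https://api.tinybird.co'  # placeholder host
TB_TOKEN = '<token with append scope>'  # placeholder

def upload_csv(table, filename):
    params = {
        'name': table,
        'mode': 'append',
        'dialect_delimiter': ',',  # make the delimiter explicit
    }
    with open(filename, 'rb') as f:
        m = MultipartEncoder(fields={'csv': ('csv', f, 'text/csv')})
        return requests.post(
            TB_HOST + '/v0/datasources',
            data=m,
            params=params,
            headers={'Authorization': 'Bearer ' + TB_TOKEN,
                     'Content-Type': m.content_type},
        )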

pack/clickhouse-mysql.spec

Lines changed: 1 addition & 1 deletion
@@ -118,7 +118,7 @@ chown -R $CLICKHOUSE_USER:$CLICKHOUSE_GROUP /var/run/clickhouse-mysql
 chown -R $CLICKHOUSE_USER:$CLICKHOUSE_GROUP /var/log/clickhouse-mysql

 /usr/bin/pip3 install mysqlclient
-/usr/bin/pip3 install mysql-replication
+/usr/bin/pip3 install mysql-replication==0.45.1
 /usr/bin/pip3 install clickhouse-driver
 /usr/bin/pip3 install configobj

requirements.txt

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 clickhouse-driver==0.2.0
-mysql-replication==0.23
+mysql-replication==0.45.1
 mysqlclient==2.0.3
 PyMySQL==1.0.2
 pytz==2021.1
