Skip to content

Commit 10e9c15

Browse files
authored
Merge pull request #1 from tinybirdco/feature/add-update-delete-support
Feature/add update delete support
2 parents 3b1b708 + a9c0cfd commit 10e9c15

18 files changed

+914
-25
lines changed

.gitignore

+4
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,7 @@ _build
4343

4444
# Pyenv
4545
.python-version
46+
47+
# Tinibird
48+
bl-*
49+
out-*

clickhouse_mysql/event/event.py

+3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ class Event(object):
2828
# table name
2929
table = None
3030

31+
# primary key
32+
primary_key = None
33+
3134
# /path/to/csv/file.csv
3235
filename = None
3336

clickhouse_mysql/pool/bbpool.py

+13-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from clickhouse_mysql.pool.pool import Pool
88
from clickhouse_mysql.objectbuilder import ObjectBuilder
9+
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
910

1011

1112
# Buckets Belts' Index Generator
@@ -149,7 +150,18 @@ def rotate_belt(self, belt_index, flush=False):
149150
# time to flush data for specified key
150151
#self.writer_builder.param('csv_file_path_suffix_parts', [str(int(now)), str(self.buckets_num_total)])
151152
writer = self.writer_builder.new()
152-
writer.insert(self.belts[belt_index].pop())
153+
item = self.belts[belt_index].pop()
154+
# process event based on its type
155+
if isinstance(item[0].pymysqlreplication_event, WriteRowsEvent):
156+
writer.insert(item)
157+
elif isinstance(item[0].pymysqlreplication_event, DeleteRowsEvent):
158+
writer.delete(item)
159+
elif isinstance(item[0].pymysqlreplication_event, UpdateRowsEvent):
160+
writer.update(item)
161+
else:
162+
# skip other unhandled events
163+
pass
164+
# writer.insert(self.belts[belt_index].pop())
153165
writer.close()
154166
writer.push()
155167
writer.destroy()

clickhouse_mysql/pumper.py

+17-1
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,15 @@ class Pumper(object):
1111
writer = None
1212

1313
def __init__(self, reader=None, writer=None):
14-
1514
self.reader = reader
1615
self.writer = writer
1716

1817
if self.reader:
1918
# subscribe on reader's event notifications
2019
self.reader.subscribe({
2120
'WriteRowsEvent': self.write_rows_event,
21+
'UpdateRowsEvent': self.update_rows_event,
22+
'DeleteRowsEvent': self.delete_rows_event,
2223
# 'WriteRowsEvent.EachRow': self.write_rows_event_each_row,
2324
'ReaderIdleEvent': self.reader_idle_event,
2425
})
@@ -46,5 +47,20 @@ def reader_idle_event(self):
4647
"""
4748
self.writer.flush()
4849

50+
def delete_rows_event(self, event=None):
51+
"""
52+
DeleteRowsEvent handler
53+
:param event:
54+
"""
55+
self.writer.delete_row(event)
56+
57+
def update_rows_event(self, event=None):
58+
"""
59+
UpdateRowsEvent handler
60+
:param event:
61+
"""
62+
self.writer.update(event)
63+
64+
4965
if __name__ == '__main__':
5066
print("pumper")

clickhouse_mysql/reader/mysqlreader.py

+100-20
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from clickhouse_mysql.event.event import Event
1313
from clickhouse_mysql.tableprocessor import TableProcessor
1414
from clickhouse_mysql.util import Util
15-
#from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent
15+
from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent
1616

1717

1818
class MySQLReader(Reader):
@@ -56,13 +56,15 @@ def __init__(
5656
self.server_id = server_id
5757
self.log_file = log_file
5858
self.log_pos = log_pos
59-
self.schemas = None if not TableProcessor.extract_dbs(schemas, Util.join_lists(tables, tables_prefixes)) else TableProcessor.extract_dbs(schemas, Util.join_lists(tables, tables_prefixes))
59+
self.schemas = None if not TableProcessor.extract_dbs(schemas, Util.join_lists(tables,
60+
tables_prefixes)) else TableProcessor.extract_dbs(
61+
schemas, Util.join_lists(tables, tables_prefixes))
6062
self.tables = None if tables is None else TableProcessor.extract_tables(tables)
6163
self.tables_prefixes = None if tables_prefixes is None else TableProcessor.extract_tables(tables_prefixes)
6264
self.blocking = blocking
6365
self.resume_stream = resume_stream
6466
self.nice_pause = nice_pause
65-
self.binlog_position_file=binlog_position_file
67+
self.binlog_position_file = binlog_position_file
6668

6769
logging.info("raw dbs list. len()=%d", 0 if schemas is None else len(schemas))
6870
if schemas is not None:
@@ -86,7 +88,8 @@ def __init__(
8688
if tables_prefixes is not None:
8789
for table in tables_prefixes:
8890
logging.info(table)
89-
logging.info("normalised tables-prefixes list. len()=%d", 0 if self.tables_prefixes is None else len(self.tables_prefixes))
91+
logging.info("normalised tables-prefixes list. len()=%d",
92+
0 if self.tables_prefixes is None else len(self.tables_prefixes))
9093
if self.tables_prefixes is not None:
9194
for table in self.tables_prefixes:
9295
logging.info(table)
@@ -101,21 +104,21 @@ def __init__(
101104
# we are interested in reading CH-repeatable events only
102105
only_events=[
103106
# Possible events
104-
#BeginLoadQueryEvent,
107+
# BeginLoadQueryEvent,
105108
DeleteRowsEvent,
106-
#ExecuteLoadQueryEvent,
107-
#FormatDescriptionEvent,
108-
#GtidEvent,
109-
#HeartbeatLogEvent,
110-
#IntvarEvent
111-
#NotImplementedEvent,
112-
#QueryEvent,
113-
#RotateEvent,
114-
#StopEvent,
115-
#TableMapEvent,
109+
# ExecuteLoadQueryEvent,
110+
# FormatDescriptionEvent,
111+
# GtidEvent,
112+
# HeartbeatLogEvent,
113+
# IntvarEvent
114+
# NotImplementedEvent,
115+
# QueryEvent,
116+
# RotateEvent,
117+
# StopEvent,
118+
# TableMapEvent,
116119
UpdateRowsEvent,
117120
WriteRowsEvent,
118-
#XidEvent,
121+
# XidEvent,
119122
],
120123
only_schemas=self.schemas,
121124
# in case we have any prefixes - this means we need to listen to all tables within specified schemas
@@ -245,6 +248,9 @@ def process_write_rows_event(self, mysql_event):
245248
:param mysql_event: WriteRowsEvent instance
246249
:return:
247250
"""
251+
252+
logging.debug("Received insert event for table: " + mysql_event.table)
253+
248254
if self.tables_prefixes:
249255
# we have prefixes specified
250256
# need to find whether current event is produced by table in 'looking-into-tables' list
@@ -294,10 +300,81 @@ def process_write_rows_event(self, mysql_event):
294300
self.stat_write_rows_event_finalyse()
295301

296302
def process_update_rows_event(self, mysql_event):
297-
logging.info("Skip update rows")
303+
304+
logging.debug("Received update event for table: " + mysql_event.table + " Schema: " + mysql_event.schema)
305+
306+
# for row in mysql_event.rows:
307+
# for key in row['before_values']:
308+
# logging.debug("\t *%s:%s=>%s" % (key, row["before_values"][key], row["after_values"][key]))
309+
310+
if self.tables_prefixes:
311+
# we have prefixes specified
312+
# need to find whether current event is produced by table in 'looking-into-tables' list
313+
if not self.is_table_listened(mysql_event.table):
314+
# this table is not listened
315+
# processing is over - just skip event
316+
return
317+
318+
# statistics
319+
#self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))
320+
321+
if self.subscribers('UpdateRowsEvent'):
322+
# dispatch event to subscribers
323+
324+
# statistics
325+
#self.stat_write_rows_event_all_rows(mysql_event=mysql_event)
326+
327+
# dispatch Event
328+
event = Event()
329+
event.schema = mysql_event.schema
330+
event.table = mysql_event.table
331+
event.pymysqlreplication_event = mysql_event
332+
333+
#self.process_first_event(event=event)
334+
self.notify('UpdateRowsEvent', event=event)
335+
336+
# self.stat_write_rows_event_finalyse()
337+
338+
# logging.info("Skip update rows")
298339

299340
def process_delete_rows_event(self, mysql_event):
300-
logging.info("Skip delete rows")
341+
logging.debug("Received delete event for table: " + mysql_event.table)
342+
343+
"""
344+
for row in mysql_event.rows:
345+
for key in row['values']:
346+
logging.debug("\t *", key, ":", row["values"][key])
347+
"""
348+
349+
if self.tables_prefixes:
350+
# we have prefixes specified
351+
# need to find whether current event is produced by table in 'looking-into-tables' list
352+
if not self.is_table_listened(mysql_event.table):
353+
# this table is not listened
354+
# processing is over - just skip event
355+
return
356+
357+
# statistics
358+
#self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))
359+
360+
if self.subscribers('DeleteRowsEvent'):
361+
# dispatch event to subscribers
362+
363+
# statistics
364+
#self.stat_write_rows_event_all_rows(mysql_event=mysql_event)
365+
366+
# dispatch Event
367+
event = Event()
368+
event.schema = mysql_event.schema
369+
event.table = mysql_event.table
370+
event.pymysqlreplication_event = mysql_event
371+
372+
self.process_first_event(event=event)
373+
self.notify('DeleteRowsEvent', event=event)
374+
375+
# self.stat_write_rows_event_finalyse()
376+
377+
# logging.info("Skip delete rows")
301378

302379
def process_binlog_position(self, file, pos):
303380
if self.binlog_position_file:
@@ -321,14 +398,16 @@ def read(self):
321398
self.stat_init_fetch_loop()
322399

323400
try:
324-
logging.debug('Pre-start binlog position: ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos) if self.binlog_stream.log_pos is not None else "undef")
401+
logging.debug('Pre-start binlog position: ' + self.binlog_stream.log_file + ":" + str(
402+
self.binlog_stream.log_pos) if self.binlog_stream.log_pos is not None else "undef")
325403

326404
# fetch available events from MySQL
327405
for mysql_event in self.binlog_stream:
328406
# new event has come
329407
# check what to do with it
330408

331-
logging.debug('Got Event ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos))
409+
logging.debug(
410+
'Got Event ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos))
332411

333412
# process event based on its type
334413
if isinstance(mysql_event, WriteRowsEvent):
@@ -393,6 +472,7 @@ def read(self):
393472
logging.info('end %d', end_timestamp)
394473
logging.info('len %d', end_timestamp - self.start_timestamp)
395474

475+
396476
if __name__ == '__main__':
397477
connection_settings = {
398478
'host': '127.0.0.1',

clickhouse_mysql/reader/reader.py

+7
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@ class Reader(Observable):
1818

1919
# called when Reader has no data to read
2020
'ReaderIdleEvent': [],
21+
22+
# called on each DeleteRowsEvent
23+
'DeleteRowsEvent': [],
24+
25+
# called on each UpdateRowsEvent
26+
'UpdateRowsEvent': [],
27+
2128
}
2229

2330
def __init__(self, converter=None, callbacks={}):

0 commit comments

Comments
 (0)