Skip to content

Commit 9fbc586

Browse files
authored
Merge pull request #2 from tinybirdco/feature/update-delete-with-inserts
Feature/update delete with inserts
2 parents a8e3c92 + 5dc884d commit 9fbc586

22 files changed

+598
-649
lines changed

.flake8

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[flake8]
2+
ignore =
3+
; except
4+
E722,
5+
; inline regex
6+
W605,
7+
; long lines
8+
E501,
9+
; too complex
10+
C901
11+
max-complexity = 10
12+
max-line-length = 120
13+
application-import-names = flake8

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,5 @@ _build
4646

4747
# Tinibird
4848
bl-*
49-
out-*
49+
out-*
50+
.e

clickhouse_mysql/clioptions.py

+23
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ class CLIOptions(Options):
9393
#
9494
# general app section
9595
#
96+
97+
'tb_host': 'https://ui.tinybird.co',
98+
'tb_token': None,
99+
96100
'config_file': '/etc/clickhouse-mysql/clickhouse-mysql.conf',
97101
'log_file': None,
98102
'log_level': None,
@@ -171,6 +175,20 @@ def options(self):
171175
#
172176
# general app section
173177
#
178+
argparser.add_argument(
179+
'--tb-host',
180+
type=str,
181+
default=self.default_options['tb_host'],
182+
help='Tinybird host'
183+
)
184+
185+
argparser.add_argument(
186+
'--tb-token',
187+
type=str,
188+
default=self.default_options['tb_token'],
189+
help='Tinybird host'
190+
)
191+
174192
argparser.add_argument(
175193
'--config-file',
176194
type=str,
@@ -508,6 +526,11 @@ def options(self):
508526
#
509527
# general app section
510528
#
529+
530+
'tb_host': args.tb_host,
531+
'tb_token': args.tb_token,
532+
533+
511534
'config_file': args.config_file,
512535
'log_file': args.log_file,
513536
'log_level': args.log_level,

clickhouse_mysql/config.py

+17-6
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33

4+
import logging
5+
import os
46
from clickhouse_mysql.reader.mysqlreader import MySQLReader
57
from clickhouse_mysql.reader.csvreader import CSVReader
68

79
from clickhouse_mysql.writer.chwriter import CHWriter
810
from clickhouse_mysql.writer.csvwriter import CSVWriter
9-
from clickhouse_mysql.writer.chcsvwriter import CHCSVWriter
11+
from clickhouse_mysql.writer.tbcsvwriter import TBCSVWriter
1012
from clickhouse_mysql.writer.poolwriter import PoolWriter
1113
from clickhouse_mysql.writer.processwriter import ProcessWriter
1214
from clickhouse_mysql.objectbuilder import ObjectBuilder
@@ -39,7 +41,7 @@ def __init__(self):
3941

4042
log_file = None
4143
log_pos = None
42-
if self.options['binlog_position_file'] and self.options.get_bool('src_resume'):
44+
if self.options['binlog_position_file'] and self.options.get_bool('src_resume') and os.path.exists(self.options['binlog_position_file']):
4345
try:
4446
with open(self.options['binlog_position_file'], 'r') as f:
4547
position = f.read()
@@ -50,17 +52,22 @@ def __init__(self):
5052
log_file,
5153
log_pos
5254
))
53-
except:
55+
except Exception as e:
5456
log_file = None
5557
log_pos = None
56-
print("can't read binlog position from file {}".format(
58+
logging.exception(e)
59+
logging.info("can't read binlog position from file {}".format(
5760
self.options['binlog_position_file'],
5861
))
5962
# build application config out of aggregated options
6063
self.config = {
6164
#
6265
#
6366
#
67+
'tinybird': {
68+
'host': self.options['tb_host'],
69+
'token': self.options['tb_token'],
70+
},
6471
'app': {
6572
'config_file': self.options['config_file'],
6673
'log_file': self.options['log_file'],
@@ -359,8 +366,12 @@ def writer_builder_csvpool(self):
359366
'dst_table': self.config['writer']['file']['dst_table'],
360367
'dst_table_prefix': self.config['writer']['file']['dst_table_prefix'],
361368
'next_writer_builder': ObjectBuilder(
362-
class_name=CHCSVWriter,
363-
constructor_params=self.config['writer']['clickhouse']
369+
class_name=TBCSVWriter,
370+
constructor_params={
371+
'tb_host': self.config['tinybird']['host'],
372+
'tb_token': self.config['tinybird']['token'],
373+
'dst_table': self.config['writer']['clickhouse']['dst_table']
374+
}
364375
),
365376
'converter_builder': self.converter_builder(CONVERTER_CSV),
366377
})

clickhouse_mysql/event/event.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,11 @@ def __next__(self):
6464

6565
if self.pymysqlreplication_event is not None:
6666
# in native replication event actual data are in row['values'] dict item
67-
return item['values']
67+
if 'after_values' in item:
68+
return item['after_values']
69+
else:
70+
return item['values']
71+
6872
else:
6973
# local-kept data
7074
return item

clickhouse_mysql/main.py

+5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33

4+
import signal
45
import sys
56
import multiprocessing as mp
67
import logging
@@ -145,6 +146,10 @@ def run(self):
145146
reader=self.config.reader(),
146147
writer=self.config.writer(),
147148
)
149+
150+
signal.signal(signal.SIGINT, pumper.exit_gracefully)
151+
signal.signal(signal.SIGTERM, pumper.exit_gracefully)
152+
148153
pumper.run()
149154

150155
except Exception as ex:

clickhouse_mysql/pumper.py

+11-3
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,18 @@
22
# -*- coding: utf-8 -*-
33

44

5+
from clickhouse_mysql.reader.reader import Reader
6+
from clickhouse_mysql.writer.writer import Writer
7+
import signal
8+
9+
510
class Pumper(object):
611
"""
712
Pump data - read data from reader and push into writer
813
"""
914

10-
reader = None
11-
writer = None
15+
reader: Reader = None
16+
writer: Writer = None
1217

1318
def __init__(self, reader=None, writer=None):
1419
self.reader = reader
@@ -21,7 +26,7 @@ def __init__(self, reader=None, writer=None):
2126
'UpdateRowsEvent': self.update_rows_event,
2227
'DeleteRowsEvent': self.delete_rows_event,
2328
# 'WriteRowsEvent.EachRow': self.write_rows_event_each_row,
24-
'ReaderIdleEvent': self.reader_idle_event,
29+
# 'ReaderIdleEvent': self.reader_idle_event,
2530
})
2631

2732
def run(self):
@@ -61,6 +66,9 @@ def update_rows_event(self, event=None):
6166
"""
6267
self.writer.update(event)
6368

69+
def exit_gracefully(self, sig, frame):
70+
self.reader.close()
71+
6472

6573
if __name__ == '__main__':
6674
print("pumper")

clickhouse_mysql/reader/mysqlreader.py

+23-23
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
from clickhouse_mysql.event.event import Event
1313
from clickhouse_mysql.tableprocessor import TableProcessor
1414
from clickhouse_mysql.util import Util
15-
from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent
1615

1716

1817
class MySQLReader(Reader):
@@ -29,9 +28,10 @@ class MySQLReader(Reader):
2928
resume_stream = None
3029
binlog_stream = None
3130
nice_pause = 0
31+
exit_gracefully = False
3232

3333
write_rows_event_num = 0
34-
write_rows_event_each_row_num = 0;
34+
write_rows_event_each_row_num = 0
3535

3636
binlog_position_file = None
3737

@@ -316,21 +316,21 @@ def process_update_rows_event(self, mysql_event):
316316
return
317317

318318
# statistics
319-
#self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))
319+
self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))
320320

321321
if self.subscribers('UpdateRowsEvent'):
322322
# dispatch event to subscribers
323323

324324
# statistics
325-
#self.stat_write_rows_event_all_rows(mysql_event=mysql_event)
325+
# self.stat_write_rows_event_all_rows(mysql_event=mysql_event)
326326

327327
# dispatch Event
328328
event = Event()
329329
event.schema = mysql_event.schema
330330
event.table = mysql_event.table
331331
event.pymysqlreplication_event = mysql_event
332332

333-
#self.process_first_event(event=event)
333+
self.process_first_event(event=event)
334334
self.notify('UpdateRowsEvent', event=event)
335335

336336
# self.stat_write_rows_event_finalyse()
@@ -355,13 +355,13 @@ def process_delete_rows_event(self, mysql_event):
355355
return
356356

357357
# statistics
358-
#self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))
358+
# self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))
359359

360360
if self.subscribers('DeleteRowsEvent'):
361361
# dispatch event to subscribers
362362

363363
# statistics
364-
#self.stat_write_rows_event_all_rows(mysql_event=mysql_event)
364+
# self.stat_write_rows_event_all_rows(mysql_event=mysql_event)
365365

366366
# dispatch Event
367367
event = Event()
@@ -389,7 +389,7 @@ def read(self):
389389

390390
# fetch events
391391
try:
392-
while True:
392+
while not self.exit_gracefully:
393393
logging.debug('Check events in binlog stream')
394394

395395
self.init_fetch_loop()
@@ -403,8 +403,9 @@ def read(self):
403403

404404
# fetch available events from MySQL
405405
for mysql_event in self.binlog_stream:
406-
# new event has come
407-
# check what to do with it
406+
407+
if self.exit_gracefully:
408+
break
408409

409410
logging.debug(
410411
'Got Event ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos))
@@ -420,23 +421,19 @@ def read(self):
420421
# skip other unhandled events
421422
pass
422423

423-
# after event processed, we need to handle current binlog position
424-
self.process_binlog_position(self.binlog_stream.log_file, self.binlog_stream.log_pos)
424+
# after event processed, we need to handle current binlog position
425+
self.process_binlog_position(self.binlog_stream.log_file, self.binlog_stream.log_pos)
425426

426-
except KeyboardInterrupt:
427-
# pass SIGINT further
428-
logging.info("SIGINT received. Pass it further.")
429-
raise
430427
except Exception as ex:
431428
if self.blocking:
432429
# we'd like to continue waiting for data
433430
# report and continue cycle
434431
logging.warning("Got an exception, skip it in blocking mode")
435-
logging.warning(ex)
432+
logging.exception(ex)
436433
else:
437434
# do not continue, report error and exit
438435
logging.critical("Got an exception, abort it in non-blocking mode")
439-
logging.critical(ex)
436+
logging.exception(ex)
440437
sys.exit(1)
441438

442439
# all events fetched (or none of them available)
@@ -453,25 +450,28 @@ def read(self):
453450
time.sleep(self.nice_pause)
454451

455452
self.notify('ReaderIdleEvent')
456-
457-
except KeyboardInterrupt:
458-
logging.info("SIGINT received. Time to exit.")
459453
except Exception as ex:
460454
logging.warning("Got an exception, handle it")
461-
logging.warning(ex)
455+
logging.exception(ex)
462456

463457
try:
464458
self.binlog_stream.close()
459+
logging.info("Stop reading from MySQL")
465460
except Exception as ex:
466461
logging.warning("Unable to close binlog stream correctly")
467-
logging.warning(ex)
462+
logging.exception(ex)
468463

469464
end_timestamp = int(time.time())
470465

471466
logging.info('start %d', self.start_timestamp)
472467
logging.info('end %d', end_timestamp)
473468
logging.info('len %d', end_timestamp - self.start_timestamp)
474469

470+
def close(self):
471+
self.exit_gracefully = True
472+
self.nice_pause = 0
473+
logging.info("MySQL should stop in the next loop")
474+
475475

476476
if __name__ == '__main__':
477477
connection_settings = {

clickhouse_mysql/reader/reader.py

+3
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,6 @@ def __init__(self, converter=None, callbacks={}):
3333

3434
def read(self):
3535
pass
36+
37+
def close(self):
38+
pass

0 commit comments

Comments
 (0)