Feature/add update delete support #1

Merged: 12 commits, Jun 7, 2021
4 changes: 4 additions & 0 deletions .gitignore
@@ -43,3 +43,7 @@ _build

# Pyenv
.python-version

+# Tinybird
+bl-*
+out-*
3 changes: 3 additions & 0 deletions clickhouse_mysql/event/event.py
@@ -28,6 +28,9 @@ class Event(object):
# table name
table = None

+# primary key
+primary_key = None

# /path/to/csv/file.csv
filename = None

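The new primary_key field travels with each Event alongside schema and table. The diff does not show where it is populated or consumed, but a writer needs exactly this information to address rows when applying updates and deletes downstream. A hedged sketch of the idea; build_where_clause and row_values are hypothetical names, and only Event.primary_key comes from this PR:

```python
def build_where_clause(event, row_values):
    """Build a 'pk = %s [AND ...]' condition for one affected row.

    Supports a single-column or composite key; which form
    event.primary_key actually takes is an assumption here.
    """
    keys = event.primary_key if isinstance(event.primary_key, (list, tuple)) \
        else [event.primary_key]
    condition = " AND ".join("{} = %s".format(k) for k in keys)
    params = [row_values[k] for k in keys]
    return condition, params
```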
14 changes: 13 additions & 1 deletion clickhouse_mysql/pool/bbpool.py
@@ -6,6 +6,7 @@

from clickhouse_mysql.pool.pool import Pool
from clickhouse_mysql.objectbuilder import ObjectBuilder
+from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent


# Buckets Belts' Index Generator
@@ -149,7 +150,18 @@ def rotate_belt(self, belt_index, flush=False):
# time to flush data for specified key
#self.writer_builder.param('csv_file_path_suffix_parts', [str(int(now)), str(self.buckets_num_total)])
writer = self.writer_builder.new()
-writer.insert(self.belts[belt_index].pop())
+item = self.belts[belt_index].pop()
+# process event based on its type
+if isinstance(item[0].pymysqlreplication_event, WriteRowsEvent):
+    writer.insert(item)
+elif isinstance(item[0].pymysqlreplication_event, DeleteRowsEvent):
+    writer.delete(item)
+elif isinstance(item[0].pymysqlreplication_event, UpdateRowsEvent):
+    writer.update(item)
+else:
+    # skip other unhandled events
+    pass
+# writer.insert(self.belts[belt_index].pop())
writer.close()
writer.push()
writer.destroy()
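rotate_belt() now pops a bucket off the belt once, inspects the pymysqlreplication event type of its first item, and routes the whole batch to writer.insert(), writer.delete(), or writer.update(); anything else is dropped. Note that only item[0] is checked, so this assumes every event in a bucket shares one type. A self-contained sketch of that dispatch (the writer argument is any object with those three methods):

```python
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

def dispatch(writer, item):
    """Route a popped bucket to the writer call matching its first event."""
    ev = item[0].pymysqlreplication_event
    if isinstance(ev, WriteRowsEvent):
        writer.insert(item)
    elif isinstance(ev, DeleteRowsEvent):
        writer.delete(item)
    elif isinstance(ev, UpdateRowsEvent):
        writer.update(item)
    # other event types are silently skipped, as in the PR
```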
18 changes: 17 additions & 1 deletion clickhouse_mysql/pumper.py
@@ -11,14 +11,15 @@ class Pumper(object):
writer = None

def __init__(self, reader=None, writer=None):

self.reader = reader
self.writer = writer

if self.reader:
# subscribe on reader's event notifications
self.reader.subscribe({
'WriteRowsEvent': self.write_rows_event,
+'UpdateRowsEvent': self.update_rows_event,
+'DeleteRowsEvent': self.delete_rows_event,
# 'WriteRowsEvent.EachRow': self.write_rows_event_each_row,
'ReaderIdleEvent': self.reader_idle_event,
})
@@ -46,5 +47,20 @@ def reader_idle_event(self):
"""
self.writer.flush()

+def delete_rows_event(self, event=None):
+    """
+    DeleteRowsEvent handler
+    :param event:
+    """
+    self.writer.delete_row(event)
+
+def update_rows_event(self, event=None):
+    """
+    UpdateRowsEvent handler
+    :param event:
+    """
+    self.writer.update(event)
+
+
if __name__ == '__main__':
print("pumper")
120 changes: 100 additions & 20 deletions clickhouse_mysql/reader/mysqlreader.py
@@ -12,7 +12,7 @@
from clickhouse_mysql.event.event import Event
from clickhouse_mysql.tableprocessor import TableProcessor
from clickhouse_mysql.util import Util
-#from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent
+from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent


class MySQLReader(Reader):
@@ -56,13 +56,15 @@ def __init__(
self.server_id = server_id
self.log_file = log_file
self.log_pos = log_pos
-self.schemas = None if not TableProcessor.extract_dbs(schemas, Util.join_lists(tables, tables_prefixes)) else TableProcessor.extract_dbs(schemas, Util.join_lists(tables, tables_prefixes))
+self.schemas = None if not TableProcessor.extract_dbs(schemas, Util.join_lists(tables,
+        tables_prefixes)) else TableProcessor.extract_dbs(
+    schemas, Util.join_lists(tables, tables_prefixes))
self.tables = None if tables is None else TableProcessor.extract_tables(tables)
self.tables_prefixes = None if tables_prefixes is None else TableProcessor.extract_tables(tables_prefixes)
self.blocking = blocking
self.resume_stream = resume_stream
self.nice_pause = nice_pause
-self.binlog_position_file=binlog_position_file
+self.binlog_position_file = binlog_position_file

logging.info("raw dbs list. len()=%d", 0 if schemas is None else len(schemas))
if schemas is not None:
@@ -86,7 +88,8 @@ def __init__(
if tables_prefixes is not None:
for table in tables_prefixes:
logging.info(table)
logging.info("normalised tables-prefixes list. len()=%d", 0 if self.tables_prefixes is None else len(self.tables_prefixes))
logging.info("normalised tables-prefixes list. len()=%d",
0 if self.tables_prefixes is None else len(self.tables_prefixes))
if self.tables_prefixes is not None:
for table in self.tables_prefixes:
logging.info(table)
@@ -101,21 +104,21 @@ def __init__(
# we are interested in reading CH-repeatable events only
only_events=[
# Possible events
-#BeginLoadQueryEvent,
+# BeginLoadQueryEvent,
DeleteRowsEvent,
-#ExecuteLoadQueryEvent,
-#FormatDescriptionEvent,
-#GtidEvent,
-#HeartbeatLogEvent,
-#IntvarEvent
-#NotImplementedEvent,
-#QueryEvent,
-#RotateEvent,
-#StopEvent,
-#TableMapEvent,
+# ExecuteLoadQueryEvent,
+# FormatDescriptionEvent,
+# GtidEvent,
+# HeartbeatLogEvent,
+# IntvarEvent
+# NotImplementedEvent,
+# QueryEvent,
+# RotateEvent,
+# StopEvent,
+# TableMapEvent,
UpdateRowsEvent,
WriteRowsEvent,
-#XidEvent,
+# XidEvent,
],
only_schemas=self.schemas,
# in case we have any prefixes - this means we need to listen to all tables within specified schemas
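Uncommenting DeleteRowsEvent and UpdateRowsEvent in only_events is the heart of the reader-side change: the binlog stream now yields all three row-event types instead of inserts only. For reference, a stripped-down version of the pymysql-replication setup this constructor performs; connection values and server_id are placeholders:

```python
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

stream = BinLogStreamReader(
    connection_settings={
        'host': '127.0.0.1', 'port': 3306,   # placeholder values
        'user': 'reader', 'passwd': 'secret',
    },
    server_id=911,                 # must be unique per replication client
    only_events=[DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent],
    only_schemas=None,             # or a list of databases to listen to
    resume_stream=True,            # continue from a known log_file/log_pos
    blocking=False,                # return instead of waiting for new events
)

for event in stream:
    print(type(event).__name__, len(event.rows), "row(s)")
```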
@@ -245,6 +248,9 @@ def process_write_rows_event(self, mysql_event):
:param mysql_event: WriteRowsEvent instance
:return:
"""

logging.debug("Received insert event for table: " + mysql_event.table)

if self.tables_prefixes:
# we have prefixes specified
# need to find whether current event is produced by table in 'looking-into-tables' list
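Each handler first applies the same guard: when table prefixes are configured, events from tables that match no prefix are skipped via is_table_listened(). That method is not part of this diff; a plausible prefix check looks like this (a sketch, not the project's implementation):

```python
def is_table_listened(table, tables_prefixes):
    """Return True if the table name starts with any configured prefix."""
    return any(table.startswith(prefix) for prefix in tables_prefixes)

print(is_table_listened("orders_2021", ["orders_", "users_"]))  # True
print(is_table_listened("payments", ["orders_", "users_"]))     # False
```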
@@ -294,10 +300,81 @@ def process_write_rows_event(self, mysql_event):
self.stat_write_rows_event_finalyse()

def process_update_rows_event(self, mysql_event):
logging.info("Skip update rows")

logging.debug("Received update event for table: " + mysql_event.table + " Schema: " + mysql_event.schema)

# for row in mysql_event.rows:
# for key in row['before_values']:
# logging.debug("\t *%s:%s=>%s" % (key, row["before_values"][key], row["after_values"][key]))

if self.tables_prefixes:
# we have prefixes specified
# need to find whether current event is produced by table in 'looking-into-tables' list
if not self.is_table_listened(mysql_event.table):
# this table is not listened
# processing is over - just skip event
return

# statistics
#self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))

if self.subscribers('UpdateRowsEvent'):
# dispatch event to subscribers

# statistics
#self.stat_write_rows_event_all_rows(mysql_event=mysql_event)

# dispatch Event
event = Event()
event.schema = mysql_event.schema
event.table = mysql_event.table
event.pymysqlreplication_event = mysql_event

#self.process_first_event(event=event)
self.notify('UpdateRowsEvent', event=event)

# self.stat_write_rows_event_finalyse()

# logging.info("Skip update rows")

def process_delete_rows_event(self, mysql_event):
logging.info("Skip delete rows")
logging.debug("Received delete event for table: " + mysql_event.table)

"""
for row in mysql_event.rows:
for key in row['values']:
logging.debug("\t *", key, ":", row["values"][key])
"""

if self.tables_prefixes:
# we have prefixes specified
# need to find whether current event is produced by table in 'looking-into-tables' list
if not self.is_table_listened(mysql_event.table):
# this table is not listened
# processing is over - just skip event
return

# statistics
#self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))

if self.subscribers('DeleteRowsEvent'):
# dispatch event to subscribers

# statistics
#self.stat_write_rows_event_all_rows(mysql_event=mysql_event)

# dispatch Event
event = Event()
event.schema = mysql_event.schema
event.table = mysql_event.table
event.pymysqlreplication_event = mysql_event

self.process_first_event(event=event)
self.notify('DeleteRowsEvent', event=event)

# self.stat_write_rows_event_finalyse()

# logging.info("Skip delete rows")

def process_binlog_position(self, file, pos):
if self.binlog_position_file:
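An UpdateRowsEvent row, as the commented-out loop above documents, is a dict with before_values and after_values maps; a DeleteRowsEvent row carries a single values map. A dependency-free sketch of reducing one update row to just the columns that changed:

```python
def changed_columns(row):
    """Return {column: (old, new)} for the columns an update touched."""
    before, after = row['before_values'], row['after_values']
    return {col: (before[col], after[col])
            for col in before if before[col] != after[col]}

# shape as produced by pymysqlreplication for an UpdateRowsEvent:
row = {'before_values': {'id': 1, 'name': 'alice'},
       'after_values':  {'id': 1, 'name': 'bob'}}
print(changed_columns(row))  # {'name': ('alice', 'bob')}
```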
@@ -321,14 +398,16 @@ def read(self):
self.stat_init_fetch_loop()

try:
-logging.debug('Pre-start binlog position: ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos) if self.binlog_stream.log_pos is not None else "undef")
+logging.debug('Pre-start binlog position: ' + self.binlog_stream.log_file + ":" + str(
+    self.binlog_stream.log_pos) if self.binlog_stream.log_pos is not None else "undef")

# fetch available events from MySQL
for mysql_event in self.binlog_stream:
# new event has come
# check what to do with it

-logging.debug('Got Event ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos))
+logging.debug(
+    'Got Event ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos))

# process event based on its type
if isinstance(mysql_event, WriteRowsEvent):
@@ -393,6 +472,7 @@ def read(self):
logging.info('end %d', end_timestamp)
logging.info('len %d', end_timestamp - self.start_timestamp)


if __name__ == '__main__':
connection_settings = {
'host': '127.0.0.1',
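The writer classes that consume these new events are not part of this diff. For context, one common way to apply replicated deletes on the ClickHouse side is a mutation keyed on the primary key that Event now carries; this is a hypothetical sketch of that approach, not the project's writer:

```python
def delete_mutation(schema, table, primary_key, rows):
    """Render an ALTER TABLE ... DELETE for a batch of DeleteRowsEvent rows.

    Assumes a single numeric primary key column; real code must quote and
    escape values according to their types.
    """
    keys = ", ".join(str(row['values'][primary_key]) for row in rows)
    return "ALTER TABLE {}.{} DELETE WHERE {} IN ({})".format(
        schema, table, primary_key, keys)

rows = [{'values': {'id': 7}}, {'values': {'id': 9}}]
print(delete_mutation('db', 'orders', 'id', rows))
# ALTER TABLE db.orders DELETE WHERE id IN (7, 9)
```

Updates can be handled the same way with ALTER TABLE ... UPDATE, or avoided entirely with a ReplacingMergeTree table that collapses rows by key and version.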
7 changes: 7 additions & 0 deletions clickhouse_mysql/reader/reader.py
@@ -18,6 +18,13 @@ class Reader(Observable):

# called when Reader has no data to read
'ReaderIdleEvent': [],

+# called on each DeleteRowsEvent
+'DeleteRowsEvent': [],
+
+# called on each UpdateRowsEvent
+'UpdateRowsEvent': [],

}

def __init__(self, converter=None, callbacks={}):
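Registering the two new keys in the class-level handler dict is all reader.py needs: Reader inherits subscribe() and notify() from Observable, and the process_update_rows_event()/process_delete_rows_event() methods above dispatch through those names. A self-contained stand-in for that pattern, matching the call sites visible in this PR (the project's real Observable may differ in detail):

```python
class Observable(object):
    """Simplified subscribe/notify dispatch mirroring Reader's usage."""

    def __init__(self):
        self._callbacks = {
            'WriteRowsEvent': [],
            'UpdateRowsEvent': [],
            'DeleteRowsEvent': [],
            'ReaderIdleEvent': [],
        }

    def subscribe(self, callbacks):
        # Pumper passes {'UpdateRowsEvent': handler, ...}
        for name, callback in callbacks.items():
            self._callbacks.setdefault(name, []).append(callback)

    def subscribers(self, name):
        # mysqlreader guards dispatch with: if self.subscribers('UpdateRowsEvent')
        return bool(self._callbacks.get(name))

    def notify(self, name, **kwargs):
        for callback in self._callbacks.get(name, []):
            callback(**kwargs)

reader = Observable()
reader.subscribe({'DeleteRowsEvent': lambda event=None: print("got", event)})
if reader.subscribers('DeleteRowsEvent'):
    reader.notify('DeleteRowsEvent', event='fake')  # -> got fake
```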