diff --git a/clickhouse-mysql b/clickhouse-mysql index a7432cb..9e77d5a 100755 --- a/clickhouse-mysql +++ b/clickhouse-mysql @@ -45,6 +45,7 @@ class Main(Daemon): elif self.config.is_table_migrate(): migrator = self.config.table_migrator() migrator.chwriter = self.config.writer() + migrator.pool_max_rows_num = self.config.mempool_max_rows_num() migrator.migrate() else: diff --git a/src/cliopts.py b/src/cliopts.py index 658792a..73e9572 100644 --- a/src/cliopts.py +++ b/src/cliopts.py @@ -124,6 +124,12 @@ def config(): default=100000, help='Max events number to pool - triggering pool flush' ) + argparser.add_argument( + '--mempool-max-rows-num', + type=int, + default=100000, + help='Max rows number to pool - triggering pool flush' + ) argparser.add_argument( '--mempool-max-flush-interval', type=int, @@ -290,6 +296,7 @@ def config(): 'pid_file': args.pid_file, 'mempool': args.mempool or args.csvpool, # csvpool assumes mempool to be enabled 'mempool-max-events-num': args.mempool_max_events_num, + 'mempool-max-rows-num': args.mempool_max_rows_num, 'mempool-max-flush-interval': args.mempool_max_flush_interval, 'csvpool': args.csvpool, }, diff --git a/src/config.py b/src/config.py index 926a883..62dbdc3 100644 --- a/src/config.py +++ b/src/config.py @@ -41,6 +41,12 @@ def nice_pause(self): def pid_file(self): return self.config['app-config']['pid_file'] + def mempool_max_events_num(self): + return self.config['app-config']['mempool-max-events-num'] + + def mempool_max_rows_num(self): + return self.config['app-config']['mempool-max-rows-num'] + def is_daemon(self): return self.config['app-config']['daemon'] diff --git a/src/tablemigrator.py b/src/tablemigrator.py index c1b830b..1e2450b 100644 --- a/src/tablemigrator.py +++ b/src/tablemigrator.py @@ -11,6 +11,7 @@ class TableMigrator(TableProcessor): cursorclass = SSDictCursor chwriter = None + pool_max_rows_num = 100000 def migrate(self): dbs = self.dbs_tables_lists() @@ -27,7 +28,7 @@ def migrate_table(self, db=None, table=None, ): self.cursor.execute("SELECT * FROM {0}".format(self.create_full_table_name(db=db, table=table))) cnt = 0; while True: - rows = self.cursor.fetchmany(10000) + rows = self.cursor.fetchmany(self.pool_max_rows_num) if not rows: break self.chwriter.dst_schema = db