Skip to content

Commit 23196b1

Browse files
committed
avg speed in buckets/items/rows
1 parent f1adf35 commit 23196b1

File tree

5 files changed

+44
-26
lines changed

5 files changed

+44
-26
lines changed

main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def __init__(self):
2929
)
3030
super().__init__(pidfile=self.config.pid_file())
3131
logging.debug(pprint.pformat(self.config.config))
32-
mp.set_start_method('forkserver')
32+
# mp.set_start_method('forkserver')
3333

3434
def run(self):
3535
pumper = Pumper(

src/pool/bbpool.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ class BBPool(Pool):
3232
}
3333

3434
buckets_count = 0
35-
buckets_content_count = 0;
35+
items_count = 0;
3636

3737
prev_time = None
3838
prev_buckets_count = 0
39-
prev_buckets_content_count = 0;
39+
prev_items_count = 0;
4040

4141
def __init__(
4242
self,
@@ -133,18 +133,18 @@ def rotate_belt(self, belt_index, flush=False):
133133
last_bucket_size = len(self.belts[belt_index][buckets_num-1])
134134

135135
self.buckets_count += 1
136-
self.buckets_content_count += last_bucket_size
136+
self.items_count += last_bucket_size
137137

138138
logging.info('rot now:%d bktcnt:%d bktcontentcnt: %d index:%s reason:%s bktsonbelt:%d bktsize:%d beltnum:%d',
139-
now,
140-
self.buckets_count,
141-
self.buckets_content_count,
142-
str(belt_index),
143-
rotate_reason,
144-
buckets_num,
145-
last_bucket_size,
146-
len(self.belts)
147-
)
139+
now,
140+
self.buckets_count,
141+
self.items_count,
142+
str(belt_index),
143+
rotate_reason,
144+
buckets_num,
145+
last_bucket_size,
146+
len(self.belts)
147+
)
148148

149149
# time to flush data for specified key
150150
self.writer_builder.param('csv_file_path_suffix_parts', [str(now), str(self.buckets_count)])
@@ -160,17 +160,17 @@ def rotate_belt(self, belt_index, flush=False):
160160
# can calculate belt speed
161161
window_size = now - self.prev_time
162162
buckets_per_sec = (self.buckets_count - self.prev_buckets_count)/window_size
163-
buckets_content_per_sec = (self.buckets_content_count - self.prev_buckets_content_count)/window_size
163+
items_per_sec = (self.items_count - self.prev_items_count) / window_size
164164
logging.info(
165-
'buckets per sec:%f buckets content per sec:%f for last %d sec',
165+
'buckets_per_sec:%f items_per_sec:%f for last %d sec',
166166
buckets_per_sec,
167-
buckets_content_per_sec,
167+
items_per_sec,
168168
window_size
169169
)
170170

171171
self.prev_time = now
172172
self.prev_buckets_count = self.buckets_count
173-
self.prev_buckets_content_count = self.buckets_content_count
173+
self.prev_items_count = self.items_count
174174

175175
# belt rotated
176176
return True

src/reader/mysqlreader.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,17 @@ def read(self):
7373
start_timestamp = int(time.time())
7474
# fetch events
7575
try:
76+
prev_stat_time = time.time()
77+
rows_num = 0
78+
7679
while True:
77-
logging.info('Check events in binlog stream')
80+
logging.debug('Check events in binlog stream')
7881
for mysql_event in self.binlog_stream:
7982
if isinstance(mysql_event, WriteRowsEvent):
8083
if self.subscribers('WriteRowsEvent'):
8184
self.write_rows_event_num += 1
82-
logging.info('WriteRowsEvent #%d rows: %d', self.write_rows_event_num, len(mysql_event.rows))
85+
logging.debug('WriteRowsEvent #%d rows: %d', self.write_rows_event_num, len(mysql_event.rows))
86+
rows_num += len(mysql_event.rows)
8387
event = Event()
8488
event.schema = mysql_event.schema
8589
event.table = mysql_event.table
@@ -90,8 +94,9 @@ def read(self):
9094

9195
if self.subscribers('WriteRowsEvent.EachRow'):
9296
self.write_rows_event_each_row_num += 1
93-
logging.info('WriteRowsEvent.EachRow #%d', self.write_rows_event_each_row_num)
97+
logging.debug('WriteRowsEvent.EachRow #%d', self.write_rows_event_each_row_num)
9498
for row in mysql_event.rows:
99+
rows_num += 1
95100
event = Event()
96101
event.schema = mysql_event.schema
97102
event.table = mysql_event.table
@@ -101,8 +106,21 @@ def read(self):
101106
# skip non-insert events
102107
pass
103108

109+
now = time.time()
110+
if now > prev_stat_time + 60:
111+
# time to calc stat
112+
window_size = now - prev_stat_time
113+
rows_per_sec = rows_num / window_size
114+
logging.info(
115+
'rows_per_sec:%f for last %f sec',
116+
rows_per_sec,
117+
window_size
118+
)
119+
prev_stat_time = now
120+
rows_num = 0
121+
104122
if not self.blocking:
105-
break
123+
break # while True
106124

107125
# blocking
108126
if self.nice_pause > 0:

src/writer/csvwriter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def insert(self, event_or_events):
7373
if len(events) < 1:
7474
return
7575

76-
logging.debug('class:%s insert %d rows', __class__, len(events))
76+
logging.debug('class:%s insert %d events', __class__, len(events))
7777

7878
if not self.opened():
7979
self.open()

src/writer/processwriter.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@ def open(self):
2424
pass
2525

2626
def process(self, event_or_events=None):
27-
logging.debug('class:%s starting process', __class__)
27+
logging.debug('class:%s process()', __class__)
2828
writer = self.next_writer_builder.get()
2929
writer.insert(event_or_events)
3030
writer.close()
3131
writer.push()
3232
writer.destroy()
33-
logging.debug('class:%s ending process', __class__)
33+
logging.debug('class:%s process() done', __class__)
3434

3535
def insert(self, event_or_events=None):
3636
# event_or_events = [
@@ -44,11 +44,11 @@ def insert(self, event_or_events=None):
4444
logging.debug('class:%s insert', __class__)
4545
process = mp.Process(target=self.process, args=(event_or_events,))
4646

47-
logging.debug('class:%s insert - starting process', __class__)
47+
logging.debug('class:%s insert.process.start()', __class__)
4848
process.start()
4949

5050
#process.join()
51-
logging.debug('class:%s insert - done process', __class__)
51+
logging.debug('class:%s insert done', __class__)
5252
pass
5353

5454
def flush(self):

0 commit comments

Comments
 (0)