Skip to content

Commit d4be09b

Browse files
committed
Add RSS statistics collecting
Found that some tests may fail due to lack of memory. Mostly it happens in CI on remote hosts. To be able to collect memory usage statistics, decided to add an RSS memory status collecting routine get_proc_stat_rss() which parses the file /proc/<worker pid>/status for the RSS value 'VmRSS', which is the size of resident memory portions. It consists of the three following parts (VmRSS = RssAnon + RssFile + RssShmem) [1]: RssAnon - size of resident anonymous memory RssFile - size of resident file mappings RssShmem - size of resident shmem memory (includes SysV shm, mapping of tmpfs and shared anonymous mappings) Decided that the best way for CI is not to run this RSS collecting routine for each command sent from test tasks, but to run it when the test task starts and after each 0.1 second delay, to collect the maximum RSS value. For this delay, the already existing delay of 1.0 sec used in the 'HangWatcher' listener was reused. Found that changing it from 1.0 to 0.1 didn't increase the testing times. Also found that a delay of 0.1 sec is completely enough to catch an RSS usage increase, due to the tested check: tarantool> require('clock').bench(function() local t = {} for i = 1, 1024^2 * 100 do t[i] = true end end) Which checked that 100 Mb of data is allocated in seconds: - on CI test host: 3.153877479 - on local fast host: 0.54504489 Also created a new listener 'StatisticsMonitor' which has the following routines and their use: process_result() - called when a test task starts and finishes: Using the 'WorkerCurrentTask' queue, it saves the initial RSS value of the used worker when the task starts to run. Using the 'WorkerTaskResult' queue, it collects tasks that failed. process_timeout() - called after each 0.1 sec delay of the task run. It saves/updates the worker RSS value for the current test task, keeping its maximum value. print_statistics() - statistics printing to stdout after testing. Prints RSS usage for failed tasks and for up to 5 tasks that used it most. Created a new subdirectory 'statistics' in the 'vardir' path to save statistics files. 
The current patch uses it to save the 'rss.log' file there, with RSS values per tested task in the format: <test task name> <maximum RSS value> Closes tarantool/tarantool-qa#98 [1]: https://www.kernel.org/doc/html/latest/filesystems/proc.html
1 parent e698240 commit d4be09b

File tree

4 files changed

+107
-5
lines changed

4 files changed

+107
-5
lines changed

dispatcher.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from listeners import LogOutputWatcher
4444
from listeners import OutputWatcher
4545
from listeners import StatisticsWatcher
46+
from listeners import StatisticsMonitor
4647

4748

4849
class TcpPortDispatcher:
@@ -121,19 +122,20 @@ def __init__(self, task_groups, max_workers_cnt, randomize):
121122
self.result_queues.append(task_queue_disp.result_queue)
122123
self.task_queues.append(task_queue_disp.task_queue)
123124

124-
self.report_timeout = 1.0
125+
self.report_timeout = 0.1
126+
127+
self.pid_to_worker_id = dict()
128+
self.worker_id_to_pid = dict()
125129

126130
self.statistics = None
127131
self.artifacts = None
132+
self.stat_monitor = None
128133
self.fail_watcher = None
129134
self.listeners = None
130135
self.init_listeners()
131136

132137
self.max_workers_cnt = min(max_workers_cnt, tasks_cnt)
133138

134-
self.pid_to_worker_id = dict()
135-
self.worker_id_to_pid = dict()
136-
137139
self.randomize = randomize
138140
self.tcp_port_dispatcher = TcpPortDispatcher(
139141
range_count=max_workers_cnt)
@@ -166,7 +168,10 @@ def init_listeners(self):
166168
self.statistics = StatisticsWatcher(log_output_watcher.get_logfile)
167169
self.artifacts = ArtifactsWatcher(log_output_watcher.get_logfile)
168170
output_watcher = OutputWatcher()
169-
self.listeners = [self.statistics, log_output_watcher, output_watcher, self.artifacts]
171+
self.stat_monitor = StatisticsMonitor(output_watcher.not_done_worker_ids,
172+
self.worker_id_to_pid)
173+
self.listeners = [self.statistics, log_output_watcher, output_watcher, self.artifacts,
174+
self.stat_monitor]
170175
if watch_fail:
171176
self.fail_watcher = FailWatcher(self.terminate_all_workers)
172177
self.listeners.append(self.fail_watcher)

lib/utils.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,21 @@ def format_process(pid):
233233
return 'process %d [%s; %s]' % (pid, status, cmdline)
234234

235235

236+
def get_proc_stat_rss(pid):
    """Return the resident set size (VmRSS, in kB) of process *pid*.

    Parses /proc/<pid>/status and extracts the 'VmRSS' field, which is
    the sum of RssAnon + RssFile + RssShmem.  Returns 0 when the process
    does not exist, the status file cannot be read (e.g. non-Linux
    hosts), or the field is missing or malformed.
    """
    rss = 0
    try:
        with open('/proc/%d/status' % pid, 'r') as f:
            for line in f:
                if ':' not in line:
                    continue
                key, value = line.split(':', 1)
                if key == 'VmRSS':
                    # Value looks like '  1234 kB'; take the number.
                    # Guard against a malformed line so a parse error
                    # cannot escape as ValueError/IndexError.
                    try:
                        rss = int(value.strip().split()[0])
                    except (IndexError, ValueError):
                        rss = 0
                    # 'VmRSS' occurs at most once; stop scanning.
                    break
    except (OSError, IOError):
        # Process already exited or /proc is unavailable.
        pass
    return rss
249+
250+
236251
def set_fd_cloexec(socket):
237252
flags = fcntl.fcntl(socket, fcntl.F_GETFD)
238253
fcntl.fcntl(socket, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

listeners.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from lib.utils import safe_makedirs
1616
from lib.utils import print_tail_n
1717
from lib.utils import print_unidiff
18+
from lib.utils import get_proc_stat_rss
1819

1920

2021
class BaseWatcher(object):
@@ -178,6 +179,86 @@ def __del__(self):
178179
pass
179180

180181

182+
class StatisticsMonitor(BaseWatcher):
    """Collects and reports per-task RSS memory usage statistics.

    RSS values are sampled from /proc/<worker pid>/status when a task
    starts and then on every dispatcher timeout tick, keeping the
    maximum observed value per task.  After testing finishes,
    print_statistics() reports the RSS of failed tasks and the top RSS
    consumers, and saves all values to '<vardir>/statistics/rss.log'.
    """

    def __init__(self, get_not_done_worker_ids, worker_id_to_pid):
        # Callable returning ids of workers that are still running.
        self.get_not_done_worker_ids = get_not_done_worker_ids
        # Mapping of worker id -> worker process pid; shared with the
        # dispatcher, which keeps it up to date.
        self.worker_id_to_pid = worker_id_to_pid
        # worker id -> last WorkerCurrentTask object seen for it.
        self.worker_current_task = dict()
        # task name -> 1 for every task that finished with 'fail'.
        self.failed_tasks = dict()
        # task name -> maximum observed RSS value (kB).
        self.rss_results = dict()

    # task name set as test name with running configuration
    def get_task_name_conf(self, name, conf):
        """Return '<name>:<conf>', or just '<name>' when conf is empty."""
        if not conf:
            return name
        return name + ':' + conf

    def process_result(self, obj):
        """Record task start (baseline RSS) and task failures."""
        # Sent once when a worker picks up a new task: remember the
        # task and take the worker's current RSS as the baseline.
        if isinstance(obj, WorkerCurrentTask):
            task_name = self.get_task_name_conf(obj.task_name, obj.task_param)
            self.worker_current_task[obj.worker_id] = obj
            self.rss_results[task_name] = get_proc_stat_rss(
                self.worker_id_to_pid[obj.worker_id])

        # Sent once when the task finishes: remember failures so their
        # RSS can be highlighted in the final report.
        if isinstance(obj, WorkerTaskResult):
            task_name = self.get_task_name_conf(obj.task_id[0], obj.task_id[1])
            if obj.short_status == 'fail':
                self.failed_tasks[task_name] = 1

    # called on each 'delta_seconds' delay within task run
    def process_timeout(self, delta_seconds):
        """Update the maximum RSS of every task that is still running."""
        worker_ids = self.get_not_done_worker_ids()
        for worker_id, task in self.worker_current_task.items():
            # Skip workers whose task already finished.
            if worker_id not in worker_ids:
                continue
            task_name = self.get_task_name_conf(task.task_name, task.task_param)
            if task_name not in self.rss_results:
                continue
            rss = get_proc_stat_rss(self.worker_id_to_pid[task.worker_id])
            if rss > self.rss_results[task_name]:
                self.rss_results[task_name] = rss

    # called only once after all tasks finished
    def print_statistics(self):
        """Print RSS statistics to stdout and save them to
        '<vardir>/statistics/rss.log'.
        """
        top_rss = 5

        # prepare standalone subpath '<vardir>/statistics' for statistics files
        stats_dir = os.path.join(Options().args.vardir, 'statistics')
        safe_makedirs(stats_dir)

        # RSS

        # print to stdout RSS statistics for all failed tasks
        if self.failed_tasks:
            color_stdout('RSS of the failed tasks:\n', schema='test_var')
            for task_name in self.failed_tasks:
                if task_name in self.rss_results:
                    color_stdout('* %6d %s\n' % (self.rss_results[task_name], task_name),
                                 schema='test_var')

        # print to stdout RSS statistics for the tasks that used it most
        color_stdout('Up to {} most RSS used tasks in kB:\n' . format(top_rss), schema='test_var')
        results_sorted = sorted(self.rss_results.items(), key=lambda x: x[1], reverse=True)
        # Only the first 'top_rss' entries are reported; slicing avoids
        # walking the whole sorted list with a counter.
        for task_name, rss in results_sorted[:top_rss]:
            color_stdout('* %6s %s\n' % (rss, task_name), schema='test_var')
        color_stdout('-' * 41, "\n", schema='separator')

        # print RSS statistics to '<vardir>/statistics/rss.log' file;
        # 'with' guarantees the file is closed even if a write fails
        filepath = os.path.join(stats_dir, 'rss.log')
        with open(filepath, 'w') as fd:
            for task_name, rss in self.rss_results.items():
                fd.write("{} {}\n" . format(task_name, rss))
260+
261+
181262
class OutputWatcher(BaseWatcher):
182263
def __init__(self):
183264
self.buffer = dict()

test-run.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ def main_loop_parallel():
119119
dispatcher.wait()
120120
dispatcher.wait_processes()
121121
color_stdout('-' * 81, "\n", schema='separator')
122+
dispatcher.stat_monitor.print_statistics()
122123
has_failed = dispatcher.statistics.print_statistics()
123124
has_undone = dispatcher.report_undone(
124125
verbose=bool(is_force or not has_failed))

0 commit comments

Comments
 (0)