Skip to content

Commit 69133e9

Browse files
committed
Add RSS statistics collecting
Found that some tests may fail due to lack of memory. Mostly it happens in CI on remote hosts. To be able to collect memory usage statistics, decided to add an RSS memory status collecting routine get_proc_stat_rss() which parses the file /proc/<worker pid>/status for the 'VmRSS' value, which is the size of resident memory portions. It consists of the three following parts (VmRSS = RssAnon + RssFile + RssShmem) [1]: RssAnon - size of resident anonymous memory RssFile - size of resident file mappings RssShmem - size of resident shmem memory (includes SysV shm, mapping of tmpfs and shared anonymous mappings) Decided that the best way for CI is not to run this RSS collecting routine for each command sent from test tasks, but to run it when the test task starts and then after each 0.1 second delay, to collect the maximum RSS value. Also created a new listener 'RSSMonitor' which has the following routines and their use: process_result() - called when a test task started and finished: Using the 'WorkerCurrentTask' queue it saves the initial RSS value of the used worker when the task started to run. Using the 'WorkerTaskResult' queue it collects tasks that failed. process_timeout() - called after each 0.1 sec delay of the task run. It saves/updates the worker RSS value for the current test task, keeping its maximum value. print_statistics() - statistics printing to stdout after testing. Checked and chose an RSS memory limit of 30000 kB below which tests are not interesting to show, except failed ones. Created a new subdirectory 'statistics' in the 'vardir' path to save statistics files. The current patch uses it to save there the 'rss.log' file with RSS values per tested task in the format: <test task name> <maximum RSS value> Closes tarantool/tarantool-qa#98 [1]: https://www.kernel.org/doc/html/latest/filesystems/proc.html
1 parent e698240 commit 69133e9

File tree

4 files changed

+92
-5
lines changed

4 files changed

+92
-5
lines changed

dispatcher.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from listeners import FailWatcher
4242
from listeners import HangWatcher
4343
from listeners import LogOutputWatcher
44+
from listeners import RSSMonitor
4445
from listeners import OutputWatcher
4546
from listeners import StatisticsWatcher
4647

@@ -121,19 +122,20 @@ def __init__(self, task_groups, max_workers_cnt, randomize):
121122
self.result_queues.append(task_queue_disp.result_queue)
122123
self.task_queues.append(task_queue_disp.task_queue)
123124

124-
self.report_timeout = 1.0
125+
self.report_timeout = 0.1
126+
127+
self.pid_to_worker_id = dict()
128+
self.worker_id_to_pid = dict()
125129

126130
self.statistics = None
127131
self.artifacts = None
132+
self.rss_monitor = None
128133
self.fail_watcher = None
129134
self.listeners = None
130135
self.init_listeners()
131136

132137
self.max_workers_cnt = min(max_workers_cnt, tasks_cnt)
133138

134-
self.pid_to_worker_id = dict()
135-
self.worker_id_to_pid = dict()
136-
137139
self.randomize = randomize
138140
self.tcp_port_dispatcher = TcpPortDispatcher(
139141
range_count=max_workers_cnt)
@@ -166,7 +168,9 @@ def init_listeners(self):
166168
self.statistics = StatisticsWatcher(log_output_watcher.get_logfile)
167169
self.artifacts = ArtifactsWatcher(log_output_watcher.get_logfile)
168170
output_watcher = OutputWatcher()
169-
self.listeners = [self.statistics, log_output_watcher, output_watcher, self.artifacts]
171+
self.rss_monitor = RSSMonitor(output_watcher.not_done_worker_ids, self.worker_id_to_pid)
172+
self.listeners = [self.statistics, log_output_watcher, output_watcher, self.artifacts,
173+
self.rss_monitor]
170174
if watch_fail:
171175
self.fail_watcher = FailWatcher(self.terminate_all_workers)
172176
self.listeners.append(self.fail_watcher)

lib/utils.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,21 @@ def format_process(pid):
233233
return 'process %d [%s; %s]' % (pid, status, cmdline)
234234

235235

236+
def get_proc_stat_rss(pid):
    """Return the resident set size (RSS) of a process, in kB.

    Parses /proc/<pid>/status looking for the 'VmRSS' field, which is
    the sum of RssAnon + RssFile + RssShmem. Returns 0 when the status
    file cannot be read (process already gone, no permission, or a
    non-Linux host without procfs).
    """
    try:
        with open('/proc/%d/status' % pid, 'r') as f:
            for line in f:
                if ':' not in line:
                    continue
                key, value = line.split(':', 1)
                if key == 'VmRSS':
                    # The value looks like '  1234 kB'; the first token
                    # is the size. Stop scanning once it is found.
                    return int(value.strip().split()[0])
    except (OSError, IOError):
        pass
    return 0
249+
250+
236251
def set_fd_cloexec(socket):
237252
flags = fcntl.fcntl(socket, fcntl.F_GETFD)
238253
fcntl.fcntl(socket, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

listeners.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from lib.utils import safe_makedirs
1616
from lib.utils import print_tail_n
1717
from lib.utils import print_unidiff
18+
from lib.utils import get_proc_stat_rss
1819

1920

2021
class BaseWatcher(object):
@@ -178,6 +179,72 @@ def __del__(self):
178179
pass
179180

180181

182+
class RSSMonitor(BaseWatcher):
    """Collect the per-task maximum RSS of worker processes.

    The dispatcher feeds this listener worker events via process_result()
    and polls it on a timer via process_timeout(); after testing,
    print_statistics() reports tasks that used notable amounts of memory
    and dumps all collected values to <vardir>/statistics/rss.log in the
    format '<task name> <max RSS in kB>'.
    """

    # Tasks that stayed below this RSS (kB) are not interesting enough
    # to print, unless they failed.
    RSS_PRINT_LIMIT_KB = 30000

    def __init__(self, get_not_done_worker_ids, worker_id_to_pid):
        # Callable returning the ids of workers that are still running.
        self.get_not_done_worker_ids = get_not_done_worker_ids
        # Shared map maintained by the dispatcher: worker id -> OS pid.
        self.worker_id_to_pid = worker_id_to_pid
        self.activity = 0.0
        # worker id -> last WorkerCurrentTask event seen for the worker.
        self.worker_current_task = dict()
        # task name -> 1 for every task that reported a 'fail' status.
        self.failed_tasks = dict()
        # task name -> maximum RSS (kB) observed while the task ran.
        self.rss_results = dict()

    def process_result(self, obj):
        """Track task start/finish events coming from workers."""
        if isinstance(obj, WorkerCurrentTask):
            # A worker picked up a new task: remember the task and
            # record the worker's current RSS as the initial value.
            self.worker_current_task[obj.worker_id] = obj
            self.rss_results[obj.task_name] = get_proc_stat_rss(
                self.worker_id_to_pid[obj.worker_id])

        if isinstance(obj, WorkerTaskResult):
            if obj.short_status == 'fail':
                # task_id is a (name, conf) pair; key failures by name
                # so they can be matched against rss_results.
                self.failed_tasks[obj.task_id[0]] = 1

    def process_timeout(self, delta_seconds):
        """Refresh the maximum RSS of every task that is still running."""
        self.activity += delta_seconds
        worker_ids = self.get_not_done_worker_ids()
        for worker_id, task in self.worker_current_task.items():
            if worker_id not in worker_ids:
                continue
            if task.task_name not in self.rss_results:
                continue
            rss = get_proc_stat_rss(self.worker_id_to_pid[task.worker_id])
            if rss > self.rss_results[task.task_name]:
                self.rss_results[task.task_name] = rss

    def print_statistics(self):
        """Print collected RSS statistics and dump them to rss.log."""
        if not self.rss_results:
            return

        if self.failed_tasks:
            color_stdout('RSS of the failed tasks:\n', schema='test_var')
            for task_name in self.failed_tasks:
                if task_name in self.rss_results:
                    color_stdout('* %s: %d\n' % (task_name, self.rss_results[task_name]),
                                 schema='test_var')

        color_stdout('Tasks used more than {} kB of RSS:\n'.format(self.RSS_PRINT_LIMIT_KB),
                     schema='test_var')
        tests_not_found = True
        for task_name in self.rss_results:
            if (task_name not in self.failed_tasks and
                    self.rss_results[task_name] > self.RSS_PRINT_LIMIT_KB):
                color_stdout('* %s: %d\n' % (task_name, self.rss_results[task_name]),
                             schema='test_var')
                tests_not_found = False
        if tests_not_found:
            color_stdout('* tasks not found that used such RSS !\n', schema='test_var')

        # Dump '<task name> <max RSS>' lines for offline analysis.
        stats_dir = os.path.join(Options().args.vardir, 'statistics')
        safe_makedirs(stats_dir)
        filepath = os.path.join(stats_dir, 'rss.log')
        with open(filepath, 'w') as fd:
            for task_name in self.rss_results:
                fd.write('{} {}\n'.format(task_name, self.rss_results[task_name]))
246+
247+
181248
class OutputWatcher(BaseWatcher):
182249
def __init__(self):
183250
self.buffer = dict()

test-run.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ def main_loop_parallel():
119119
dispatcher.wait()
120120
dispatcher.wait_processes()
121121
color_stdout('-' * 81, "\n", schema='separator')
122+
dispatcher.rss_monitor.print_statistics()
122123
has_failed = dispatcher.statistics.print_statistics()
123124
has_undone = dispatcher.report_undone(
124125
verbose=bool(is_force or not has_failed))

0 commit comments

Comments
 (0)