Skip to content

Commit 6cdaae9

Browse files
committed
Add RSS statistics collecting
Found that some tests may fail due to lack of memory. Mostly it happens in CI on remote hosts. To be able to collect memory usage statistics, decided to add an RSS memory status collecting routine get_proc_stat_rss() which parses the file /proc/<worker pid>/status for the RSS value 'VmRSS', which is the size of resident memory portions. It consists of the three following parts (VmRSS = RssAnon + RssFile + RssShmem) [1]: RssAnon - size of resident anonymous memory RssFile - size of resident file mappings RssShmem - size of resident shmem memory (includes SysV shm, mapping of tmpfs and shared anonymous mappings) Decided that the best way for CI is not to run this RSS collecting routine for each command sent from test tasks, but to run it when the test task starts and after each 0.1 second delay, to collect the maximum RSS value. For this delay reused the already existing delay of 1.0 sec used in listener 'HangWatcher'. Found that changing it from 1.0 to 0.1 didn't increase the testing times. Also found that a delay of 0.1 sec is completely enough to catch RSS use increase, due to the tested check: tarantool> require('clock').bench(function() local t = {} for i = 1, 1024^2 * 100 do t[i] = true end end) Which checked that 100 Mb of data is allocated in seconds: - on CI test host: 3.153877479 - on local fast host: 0.54504489 Also created new listener 'StatisticsMonitor' which has the following routines and their use: process_result() - called when a test task starts and finishes: Using the 'WorkerCurrentTask' queue it saves the initial RSS value of the used worker when the task starts to run. Using the 'WorkerTaskResult' queue it collects tasks that failed. process_timeout() - called after each 0.1 sec delay of the task run. It saves/updates the worker RSS value for the current test task, choosing its maximum values. print_statistics() - statistics printing to stdout after testing. Prints RSS usage for failed tasks and for up to 5 tasks that used it most. Created new subdirectory 'statistics' in the 'vardir' path to save statistics files.
The current patch uses it to save there 'rss.log' file with RSS values per tested tasks in format: <test task name> <maximum RSS value> Closes tarantool/tarantool-qa#98 [1]: https://www.kernel.org/doc/html/latest/filesystems/proc.html
1 parent a4347e8 commit 6cdaae9

File tree

5 files changed

+118
-5
lines changed

5 files changed

+118
-5
lines changed

dispatcher.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from listeners import LogOutputWatcher
4545
from listeners import OutputWatcher
4646
from listeners import StatisticsWatcher
47+
from listeners import StatisticsMonitor
4748

4849

4950
class TcpPortDispatcher:
@@ -122,19 +123,20 @@ def __init__(self, task_groups, max_workers_cnt, randomize):
122123
self.result_queues.append(task_queue_disp.result_queue)
123124
self.task_queues.append(task_queue_disp.task_queue)
124125

125-
self.report_timeout = 1.0
126+
self.report_timeout = 0.1
127+
128+
self.pid_to_worker_id = dict()
129+
self.worker_id_to_pid = dict()
126130

127131
self.statistics = None
128132
self.artifacts = None
133+
self.stat_monitor = None
129134
self.fail_watcher = None
130135
self.listeners = None
131136
self.init_listeners()
132137

133138
self.max_workers_cnt = min(max_workers_cnt, tasks_cnt)
134139

135-
self.pid_to_worker_id = dict()
136-
self.worker_id_to_pid = dict()
137-
138140
self.randomize = randomize
139141
self.tcp_port_dispatcher = TcpPortDispatcher(
140142
range_count=max_workers_cnt)
@@ -167,8 +169,10 @@ def init_listeners(self):
167169
self.statistics = StatisticsWatcher(log_output_watcher.get_logfile)
168170
self.artifacts = ArtifactsWatcher(log_output_watcher.get_logfile)
169171
output_watcher = OutputWatcher()
172+
self.stat_monitor = StatisticsMonitor(output_watcher.not_done_worker_ids,
173+
self.worker_id_to_pid)
170174
self.listeners = [self.statistics, log_output_watcher, output_watcher, self.artifacts,
171-
sampler.watcher]
175+
sampler.watcher, self.stat_monitor]
172176
if watch_fail:
173177
self.fail_watcher = FailWatcher(self.terminate_all_workers)
174178
self.listeners.append(self.fail_watcher)

lib/sampler.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from lib.colorer import color_log
66
from lib.colorer import qa_notice
77
from lib.utils import format_process
8+
from lib.utils import get_proc_stat_rss
89

910

1011
if sys.version_info[0] == 2:
@@ -31,6 +32,10 @@ def __init__(self, sampler):
3132
self._last_sample = 0
3233
self._sample_interval = 0.1 # seconds
3334
self._warn_interval = self._sample_interval * 4
35+
self.rss_procs_results = dict()
36+
37+
def get_rss_procs_results(self):
    # Expose the per-task peak RSS values (kB) accumulated by the
    # wrapped Sampler; consumed by StatisticsMonitor after testing.
    return self._sampler.rss_results
3439

3540
def process_result(self, obj):
3641
if isinstance(obj, RegisterProcessMessage):
@@ -55,6 +60,7 @@ def _wakeup(self):
5560
qa_notice(template.format(self._sample_interval, self._warn_interval,
5661
delta))
5762
if delta > self._sample_interval:
63+
self._sampler.collect_rss()
5864
self._sampler._sample()
5965
self._last_sample = now
6066

@@ -72,6 +78,8 @@ def __init__(self):
7278
self._watcher = SamplerWatcher(self)
7379

7480
self.processes = dict()
81+
self.tasks_pids = dict()
82+
self.rss_results = dict()
7583

7684
def set_queue(self, queue, worker_id, worker_name):
7785
# Called from a worker process (_run_worker()).
@@ -80,6 +88,20 @@ def set_queue(self, queue, worker_id, worker_name):
8088
self._queue = queue
8189
self._watcher = None
8290

91+
def collect_rss(self):
92+
# loop for each registerred task
93+
for task in self.tasks_pids:
94+
pids_rss = 0
95+
# loop for each registerred pid
96+
for pid in self.tasks_pids[task]:
97+
# refresh current RSS value of the pid
98+
self.tasks_pids[task][pid] = get_proc_stat_rss(pid)
99+
pids_rss = pids_rss + self.tasks_pids[task][pid]
100+
# save current overall RSS value if it is bigger that saved
101+
task_name = task[0] + ((':' + task[1]) if task[1] else '')
102+
if task_name not in self.rss_results or self.rss_results[task_name] < pids_rss:
103+
self.rss_results[task_name] = pids_rss
104+
83105
@property
84106
def watcher(self):
85107
if not self._watcher:
@@ -102,6 +124,9 @@ def register_process(self, pid, task_id, server_name, worker_id=None,
102124
'worker_id': worker_id,
103125
'worker_name': worker_name,
104126
}
127+
if task_id not in self.tasks_pids:
128+
self.tasks_pids[task_id] = dict()
129+
self.tasks_pids[task_id][pid] = get_proc_stat_rss(pid)
105130
self._log('register', pid)
106131
return
107132

@@ -119,6 +144,9 @@ def unregister_process(self, pid):
119144

120145
self._log('unregister', pid)
121146
del self.processes[pid]
147+
for task_id in self.tasks_pids:
148+
if pid in self.tasks_pids[task_id]:
149+
del self.tasks_pids[task_id][pid]
122150

123151
def _log(self, event, pid):
124152
# Those logs are not written due to gh-247.

lib/utils.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,21 @@ def format_process(pid):
233233
return 'process %d [%s; %s]' % (pid, status, cmdline)
234234

235235

236+
def get_proc_stat_rss(pid):
    """Return the resident set size (VmRSS, in kB) of process *pid*.

    Parses /proc/<pid>/status. Returns 0 when the process has already
    gone away or the file cannot be read (e.g. on non-Linux platforms).
    """
    rss = 0
    try:
        with open('/proc/%d/status' % pid, 'r') as f:
            for line in f:
                # Line format: 'VmRSS:     1234 kB'.
                if line.startswith('VmRSS:'):
                    rss = int(line.split()[1])
                    # The key occurs once; no need to scan the rest.
                    break
    except (OSError, IOError):
        pass
    return rss
249+
250+
236251
def set_fd_cloexec(socket):
237252
flags = fcntl.fcntl(socket, fcntl.F_GETFD)
238253
fcntl.fcntl(socket, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

listeners.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
from lib.utils import safe_makedirs
1616
from lib.utils import print_tail_n
1717
from lib.utils import print_unidiff
18+
from lib.utils import get_vardir_device
19+
from lib.utils import get_disk_bound_stat_busy
1820

1921

2022
class BaseWatcher(object):
@@ -178,6 +180,67 @@ def __del__(self):
178180
pass
179181

180182

183+
class StatisticsMonitor(BaseWatcher):
    """Collect per-task RSS statistics and report them after testing.

    RSS values are gathered by the sampler and handed over via
    set_procs_rss(); this listener remembers which tasks failed, prints
    a summary to stdout and dumps all values to
    '<vardir>/statistics/rss.log'.
    """

    def __init__(self, get_not_done_worker_ids, worker_id_to_pid):
        # Callable returning ids of workers that have not finished yet.
        self.get_not_done_worker_ids = get_not_done_worker_ids
        # Mapping of worker id to the worker process pid.
        self.worker_id_to_pid = worker_id_to_pid
        # Names of failed tasks (dict used as a set).
        self.failed_tasks = dict()
        # Per-task peak RSS values (kB), filled by set_procs_rss().
        self.rss_results = dict()

    # task name set as test name with running configuration
    def get_task_name_conf(self, name, conf):
        """Return '<name>:<conf>', or just '<name>' when conf is empty."""
        if not conf:
            return name
        return name + ':' + conf

    def process_result(self, obj):
        """Remember tasks that finished with the 'fail' status."""
        # Called only once per task, when its run is finished.
        if isinstance(obj, WorkerTaskResult):
            task_name = self.get_task_name_conf(obj.task_id[0], obj.task_id[1])
            if obj.short_status == 'fail':
                self.failed_tasks[task_name] = 1

    def set_procs_rss(self, rss):
        """Store the per-task peak RSS values collected by the sampler."""
        self.rss_results = rss

    # called only once after all tasks finished
    def print_statistics(self):
        """Print RSS statistics to stdout and save them to
        '<vardir>/statistics/rss.log'."""
        top_rss = 10

        # prepare standalone subpath '<vardir>/statistics' for statistics files
        stats_dir = os.path.join(Options().args.vardir, 'statistics')
        safe_makedirs(stats_dir)

        # RSS

        # print to stdout RSS statistics for all failed tasks
        if self.failed_tasks:
            color_stdout('RSS of the failed tasks:\n', schema='test_var')
            for task_name in self.failed_tasks:
                if task_name in self.rss_results:
                    color_stdout('* %6.1f %s\n' % (self.rss_results[task_name] / 1000, task_name),
                                 schema='test_var')

        # print to stdout RSS statistics for some number of the most RSS-consuming tasks
        color_stdout('Up to {} most RSS used tasks in Mb:\n' . format(top_rss), schema='test_var')
        results_sorted = sorted(self.rss_results.items(), key=lambda x: x[1], reverse=True)
        # Only the first 'top_rss' entries are of interest; slicing
        # replaces the original counter-guarded loop.
        for task_name, rss in results_sorted[:top_rss]:
            color_stdout('* %6.1f %s\n' % (int(rss) / 1000, task_name), schema='test_var')
        color_stdout('-' * 41, "\n", schema='separator')

        # print RSS statistics to '<vardir>/statistics/rss.log' file;
        # 'with' guarantees the file is closed even if a write fails
        filepath = os.path.join(stats_dir, 'rss.log')
        with open(filepath, 'w') as fd:
            for task_name in self.rss_results:
                fd.write("{} {}\n" . format(task_name, self.rss_results[task_name]))
242+
243+
181244
class OutputWatcher(BaseWatcher):
182245
def __init__(self):
183246
self.buffer = dict()

test-run.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import time
5555

5656
from lib import Options
57+
from lib.sampler import sampler
5758
from lib.colorer import color_stdout
5859
from lib.utils import print_tail_n
5960
from lib.utils import PY3
@@ -119,6 +120,8 @@ def main_loop_parallel():
119120
dispatcher.wait()
120121
dispatcher.wait_processes()
121122
color_stdout('-' * 81, "\n", schema='separator')
123+
dispatcher.stat_monitor.set_procs_rss(sampler.watcher.get_rss_procs_results())
124+
dispatcher.stat_monitor.print_statistics()
122125
has_failed = dispatcher.statistics.print_statistics()
123126
has_undone = dispatcher.report_undone(
124127
verbose=bool(is_force or not has_failed))

0 commit comments

Comments
 (0)