|
15 | 15 | from lib.utils import safe_makedirs
|
16 | 16 | from lib.utils import print_tail_n
|
17 | 17 | from lib.utils import print_unidiff
|
| 18 | +from lib.utils import get_proc_stat_rss |
18 | 19 |
|
19 | 20 |
|
20 | 21 | class BaseWatcher(object):
|
@@ -178,6 +179,81 @@ def __del__(self):
|
178 | 179 | pass
|
179 | 180 |
|
180 | 181 |
|
class StatisticsMonitor(BaseWatcher):
    """Track peak RSS usage of worker processes per task and report it.

    Samples each worker's RSS (via /proc) when a task starts and on every
    timeout tick while it runs, keeping the maximum observed value, then
    prints a summary and writes '<vardir>/statistics/rss.log' at the end.
    """

    def __init__(self, get_not_done_worker_ids, worker_id_to_pid):
        # Callback returning the ids of workers that are still running.
        self.get_not_done_worker_ids = get_not_done_worker_ids
        # worker id -> OS pid of the worker process.
        self.worker_id_to_pid = worker_id_to_pid
        # worker id -> last WorkerCurrentTask object seen for that worker.
        self.worker_current_task = dict()
        # task name -> 1 for every task whose run failed (used as a set).
        self.failed_tasks = dict()
        # task name -> peak RSS (kB) observed while the task was running.
        self.rss_results = dict()

    def process_result(self, obj):
        # Called only once on task run initiated: remember which task the
        # worker runs and take the initial RSS sample of its process.
        if isinstance(obj, WorkerCurrentTask):
            self.worker_current_task[obj.worker_id] = obj
            self.rss_results[obj.task_name] = get_proc_stat_rss(
                self.worker_id_to_pid[obj.worker_id])

        # Called only once on task run finished: record the failure, if any.
        if isinstance(obj, WorkerTaskResult):
            task_name = obj.task_id[0]
            if obj.short_status == 'fail':
                self.failed_tasks[task_name] = 1

    # called on each 'delta_seconds' delay within task run
    def process_timeout(self, delta_seconds):
        worker_ids = self.get_not_done_worker_ids()

        # Tasks currently executed by workers that have not finished yet.
        running_tasks = [task for worker_id, task
                         in self.worker_current_task.items()
                         if worker_id in worker_ids]
        for task in running_tasks:
            if task.task_name in self.rss_results:
                # Keep the maximum RSS seen over the task's lifetime.
                rss = get_proc_stat_rss(self.worker_id_to_pid[task.worker_id])
                if rss > self.rss_results[task.task_name]:
                    self.rss_results[task.task_name] = rss

    # called only once after all tasks finished
    def print_statistics(self):
        rss_print_limit = 30000  # kB; threshold for the "high RSS" report
        tests_not_found = True

        # prepare standalone subpath '<vardir>/statistics' for statistics files
        stats_dir = os.path.join(Options().args.vardir, 'statistics')
        safe_makedirs(stats_dir)

        # BUGFIX: the original guard was 'if self.rss_results: return',
        # which skipped all reporting exactly when there was data to
        # report.  Bail out only when there is nothing to print.
        if not self.rss_results:
            return

        # print to stdout RSS statistics for all failed tasks
        if self.failed_tasks:
            color_stdout('RSS of the failed tasks:\n', schema='test_var')
            for task_name in self.failed_tasks:
                if task_name in self.rss_results:
                    color_stdout('* %s: %d\n' % (task_name,
                                                 self.rss_results[task_name]),
                                 schema='test_var')

        # Print to stdout RSS statistics for tasks with RSS bigger than
        # checked limit except failed tasks that already printed above.
        color_stdout('Tasks used more than {} kB of RSS:\n' . format(rss_print_limit),
                     schema='test_var')
        for task_name in self.rss_results:
            if (task_name not in self.failed_tasks
                    and self.rss_results[task_name] > rss_print_limit):
                color_stdout('* %s: %d\n' % (task_name,
                                             self.rss_results[task_name]),
                             schema='test_var')
                tests_not_found = False
        if tests_not_found:
            color_stdout('* tasks not found that used such RSS !\n', schema='test_var')

        # print RSS statistics to '<vardir>/statistics/rss.log' file;
        # 'with' guarantees the file is closed even if a write fails
        filepath = os.path.join(stats_dir, 'rss.log')
        with open(filepath, 'w') as fd:
            for task_name in self.rss_results:
                fd.write("{} {}\n" . format(task_name, self.rss_results[task_name]))
| 256 | + |
181 | 257 | class OutputWatcher(BaseWatcher):
|
182 | 258 | def __init__(self):
|
183 | 259 | self.buffer = dict()
|
|
0 commit comments