Skip to content

Commit 10492db

Browse files
committed
Add RSS memory status collecting routine
Added an RSS memory status collecting routine, which parses the file /sys/fs/cgroup/memory/memory.stat. Closes tarantool/tarantool-qa#98. Got RSS in KB per worker. RSS with statistics.
1 parent e698240 commit 10492db

File tree

4 files changed

+91
-5
lines changed

4 files changed

+91
-5
lines changed

dispatcher.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from listeners import FailWatcher
4242
from listeners import HangWatcher
4343
from listeners import LogOutputWatcher
44+
from listeners import RSSMonitor
4445
from listeners import OutputWatcher
4546
from listeners import StatisticsWatcher
4647

@@ -121,19 +122,20 @@ def __init__(self, task_groups, max_workers_cnt, randomize):
121122
self.result_queues.append(task_queue_disp.result_queue)
122123
self.task_queues.append(task_queue_disp.task_queue)
123124

124-
self.report_timeout = 1.0
125+
self.report_timeout = 0.1
126+
127+
self.pid_to_worker_id = dict()
128+
self.worker_id_to_pid = dict()
125129

126130
self.statistics = None
127131
self.artifacts = None
132+
self.rss_monitor = None
128133
self.fail_watcher = None
129134
self.listeners = None
130135
self.init_listeners()
131136

132137
self.max_workers_cnt = min(max_workers_cnt, tasks_cnt)
133138

134-
self.pid_to_worker_id = dict()
135-
self.worker_id_to_pid = dict()
136-
137139
self.randomize = randomize
138140
self.tcp_port_dispatcher = TcpPortDispatcher(
139141
range_count=max_workers_cnt)
@@ -166,7 +168,8 @@ def init_listeners(self):
166168
self.statistics = StatisticsWatcher(log_output_watcher.get_logfile)
167169
self.artifacts = ArtifactsWatcher(log_output_watcher.get_logfile)
168170
output_watcher = OutputWatcher()
169-
self.listeners = [self.statistics, log_output_watcher, output_watcher, self.artifacts]
171+
self.rss_monitor = RSSMonitor(output_watcher.not_done_worker_ids, self.worker_id_to_pid)
172+
self.listeners = [self.statistics, log_output_watcher, output_watcher, self.artifacts, self.rss_monitor]
170173
if watch_fail:
171174
self.fail_watcher = FailWatcher(self.terminate_all_workers)
172175
self.listeners.append(self.fail_watcher)

lib/utils.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,21 @@ def format_process(pid):
233233
return 'process %d [%s; %s]' % (pid, status, cmdline)
234234

235235

236+
def get_proc_stat_rss(pid):
    """Return the ``VmRSS`` value of process `pid` from /proc/<pid>/status.

    The value is the first whitespace-separated token that follows
    ``VmRSS:`` converted to int, exactly as the kernel prints it.
    0 is returned when the status file cannot be opened or read, or when
    it contains no ``VmRSS`` line.
    """
    status_path = '/proc/%d/status' % pid
    resident = 0
    try:
        with open(status_path, 'r') as status_file:
            for entry in status_file:
                # Lines without a colon yield an empty separator.
                name, colon, rest = entry.partition(':')
                if not colon or name != 'VmRSS':
                    continue
                resident = int(rest.strip().split()[0])
    except (OSError, IOError):
        # Best effort: the process may already be gone.
        pass
    return resident
249+
250+
236251
def set_fd_cloexec(socket):
237252
flags = fcntl.fcntl(socket, fcntl.F_GETFD)
238253
fcntl.fcntl(socket, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

listeners.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import sys
33
import yaml
44
import shutil
5+
import time
56

67
from lib import Options
78
from lib.colorer import color_stdout
@@ -15,6 +16,7 @@
1516
from lib.utils import safe_makedirs
1617
from lib.utils import print_tail_n
1718
from lib.utils import print_unidiff
19+
from lib.utils import get_proc_stat_rss
1820

1921

2022
class BaseWatcher(object):
@@ -51,6 +53,7 @@ def process_result(self, obj):
5153
obj.show_reproduce_content))
5254

5355
def print_statistics(self):
56+
print("ALX ####################\n")
5457
"""Returns are there failed tasks."""
5558
if self.stats:
5659
color_stdout('Statistics:\n', schema='test_var')
@@ -178,6 +181,70 @@ def __del__(self):
178181
pass
179182

180183

184+
class RSSMonitor(BaseWatcher):
    """Record the highest RSS value sampled for each task's worker process.

    A sample is taken when a worker reports the task it is starting
    (``WorkerCurrentTask``) and again on every ``process_timeout`` tick for
    workers that are not done yet. Only the maximum per task name is kept.
    Values are whatever ``get_proc_stat_rss`` returns for the worker pid.
    """

    def __init__(self, get_not_done_worker_ids, worker_id_to_pid):
        """Create the monitor.

        :param get_not_done_worker_ids: callable returning the ids of
            workers that are still running.
        :param worker_id_to_pid: mapping from worker id to process pid; the
            object is stored by reference and looked up on every sample, so
            entries added later by the owner are seen here.
        """
        self.get_not_done_worker_ids = get_not_done_worker_ids
        self.worker_id_to_pid = worker_id_to_pid
        # Total seconds reported through process_timeout().
        self.activity = 0.0
        # worker_id -> last WorkerCurrentTask received from that worker.
        self.worker_current_task = dict()
        # task_name -> 1 for every task whose result had status 'fail'.
        self.failed_tasks = dict()
        # task_name -> maximum RSS sampled while the task was current.
        self.rss_results = dict()

    def _worker_rss(self, worker_id):
        """Return the current RSS of `worker_id`, or 0 if its pid is unknown."""
        pid = self.worker_id_to_pid.get(worker_id)
        if pid is None:
            return 0
        return get_proc_stat_rss(pid)

    def process_result(self, obj):
        """Handle a message from a worker.

        ``WorkerCurrentTask`` starts tracking of the named task with an
        initial RSS sample; a ``WorkerTaskResult`` with short status
        'fail' marks the task as failed.
        """
        if isinstance(obj, WorkerCurrentTask):
            self.worker_current_task[obj.worker_id] = obj
            self.rss_results[obj.task_name] = self._worker_rss(obj.worker_id)

        if isinstance(obj, WorkerTaskResult):
            if obj.short_status == 'fail':
                self.failed_tasks[obj.task_name] = 1

    def process_timeout(self, delta_seconds):
        """Re-sample RSS of every running worker and keep per-task maxima.

        :param delta_seconds: time elapsed since the previous tick; it is
            accumulated in ``self.activity``.
        """
        self.activity += delta_seconds
        not_done_ids = self.get_not_done_worker_ids()

        for worker_id, task in self.worker_current_task.items():
            if worker_id not in not_done_ids:
                continue
            if task.task_name not in self.rss_results:
                continue
            rss = self._worker_rss(task.worker_id)
            if rss > self.rss_results[task.task_name]:
                self.rss_results[task.task_name] = rss

    def print_statistics(self):
        """Print the collected per-task RSS maxima.

        Returns True if at least one failed task was recorded, False
        otherwise. Details of the failed tasks are not printed here: this
        class stores only their names, not log or reproduce file data.
        """
        if self.rss_results:
            color_stdout('RSS Statistics:\n', schema='test_var')
            for task_name, rss in self.rss_results.items():
                color_stdout('* %s: %d\n' % (task_name, rss),
                             schema='test_var')

        return bool(self.failed_tasks)
246+
247+
181248
class OutputWatcher(BaseWatcher):
182249
def __init__(self):
183250
self.buffer = dict()

test-run.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ def main_loop_parallel():
119119
dispatcher.wait()
120120
dispatcher.wait_processes()
121121
color_stdout('-' * 81, "\n", schema='separator')
122+
rss_monitor = dispatcher.rss_monitor.print_statistics()
122123
has_failed = dispatcher.statistics.print_statistics()
123124
has_undone = dispatcher.report_undone(
124125
verbose=bool(is_force or not has_failed))

0 commit comments

Comments
 (0)