Skip to content

Commit 8028058

Browse files
committed
Add RSS memory status collecting routine
Added RSS memory status collecting routine which parses file: /sys/fs/cgroup/memory/memory.stat Closes tarantool/tarantool-qa#98
1 parent e698240 commit 8028058

File tree

3 files changed

+58
-2
lines changed

3 files changed

+58
-2
lines changed

dispatcher.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from listeners import FailWatcher
4242
from listeners import HangWatcher
4343
from listeners import LogOutputWatcher
44+
from listeners import RSSMonitor
4445
from listeners import OutputWatcher
4546
from listeners import StatisticsWatcher
4647

@@ -121,10 +122,11 @@ def __init__(self, task_groups, max_workers_cnt, randomize):
121122
self.result_queues.append(task_queue_disp.result_queue)
122123
self.task_queues.append(task_queue_disp.task_queue)
123124

124-
self.report_timeout = 1.0
125+
self.report_timeout = 0.1
125126

126127
self.statistics = None
127128
self.artifacts = None
129+
self.rss_monitor = None
128130
self.fail_watcher = None
129131
self.listeners = None
130132
self.init_listeners()
@@ -166,7 +168,8 @@ def init_listeners(self):
166168
self.statistics = StatisticsWatcher(log_output_watcher.get_logfile)
167169
self.artifacts = ArtifactsWatcher(log_output_watcher.get_logfile)
168170
output_watcher = OutputWatcher()
169-
self.listeners = [self.statistics, log_output_watcher, output_watcher, self.artifacts]
171+
self.rss_monitor = RSSMonitor(output_watcher.not_done_worker_ids)
172+
self.listeners = [self.statistics, log_output_watcher, output_watcher, self.artifacts, self.rss_monitor]
170173
if watch_fail:
171174
self.fail_watcher = FailWatcher(self.terminate_all_workers)
172175
self.listeners.append(self.fail_watcher)

lib/utils.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
import re
23
import sys
34
import collections
45
import signal
@@ -233,6 +234,19 @@ def format_process(pid):
233234
return 'process %d [%s; %s]' % (pid, status, cmdline)
234235

235236

237+
def get_proc_stat_rss():
238+
rss = 'unknown'
239+
pat = re.compile(r"^total_rss .*")
240+
try:
241+
with open('/sys/fs/cgroup/memory/memory.stat', 'r') as f:
242+
for line in f:
243+
if pat.match(line) is not None:
244+
rss = int(line.split(' ')[1])
245+
except (OSError, IOError):
246+
pass
247+
return rss
248+
249+
236250
def set_fd_cloexec(socket):
237251
flags = fcntl.fcntl(socket, fcntl.F_GETFD)
238252
fcntl.fcntl(socket, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

listeners.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import sys
33
import yaml
44
import shutil
5+
import time
56

67
from lib import Options
78
from lib.colorer import color_stdout
@@ -15,6 +16,7 @@
1516
from lib.utils import safe_makedirs
1617
from lib.utils import print_tail_n
1718
from lib.utils import print_unidiff
19+
from lib.utils import get_proc_stat_rss
1820

1921

2022
class BaseWatcher(object):
@@ -178,6 +180,43 @@ def __del__(self):
178180
pass
179181

180182

183+
class RSSMonitor(BaseWatcher):
184+
def __init__(self, get_not_done_worker_ids):
185+
self.get_not_done_worker_ids = get_not_done_worker_ids
186+
self.rss_results = []
187+
self.activity = 0.0
188+
self.worker_current_task = dict()
189+
190+
def process_result(self, obj):
191+
if isinstance(obj, WorkerCurrentTask):
192+
self.worker_current_task[obj.worker_id] = obj
193+
194+
if not isinstance(obj, WorkerCurrentTask):
195+
return
196+
197+
print("ALX process_result ================ {}\n" . format(obj.task_name))
198+
self.rss_results.append((obj.task_name,
199+
get_proc_stat_rss()))
200+
201+
def process_timeout(self, delta_seconds):
202+
rss_new = []
203+
self.activity += delta_seconds
204+
worker_ids = self.get_not_done_worker_ids()
205+
206+
running_tasks = [task for worker_id, task
207+
in self.worker_current_task.items()
208+
if worker_id in worker_ids]
209+
rss = get_proc_stat_rss()
210+
for task_name, rss_old in self.rss_results:
211+
for task in running_tasks:
212+
if task_name == task.task_name:
213+
if rss_old > rss:
214+
rss = rss_old
215+
print("ALX process_timeout ============ {}: '{}' {}\n" . format(round(self.activity, 1), task_name, rss))
216+
rss_new.append((task_name, rss))
217+
self.rss_results = rss_new
218+
219+
181220
class OutputWatcher(BaseWatcher):
182221
def __init__(self):
183222
self.buffer = dict()

0 commit comments

Comments
 (0)