Skip to content

Commit a4347e8

Browse files
Totktonadaavtikhon
authored andcommitted
sampler: add simple sampling infrastructure
Track tarantool and unit test executables that are run by test-run, together with their meta-information: worker, test, test configuration and server name. Add a function that is called every 0.1 seconds for each tracked process. The implementation tracks non-default servers and re-registers default servers that execute several tests (the 'core = tarantool' case). Part of #277
1 parent e698240 commit a4347e8

File tree

5 files changed

+180
-3
lines changed

5 files changed

+180
-3
lines changed

dispatcher.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
from multiprocessing.queues import SimpleQueue
3535

3636
from lib import Options
37+
from lib.sampler import sampler
3738
from lib.utils import set_fd_cloexec
3839
from lib.worker import WorkerTaskResult, WorkerDone
3940
from lib.colorer import color_stdout
@@ -166,7 +167,8 @@ def init_listeners(self):
166167
self.statistics = StatisticsWatcher(log_output_watcher.get_logfile)
167168
self.artifacts = ArtifactsWatcher(log_output_watcher.get_logfile)
168169
output_watcher = OutputWatcher()
169-
self.listeners = [self.statistics, log_output_watcher, output_watcher, self.artifacts]
170+
self.listeners = [self.statistics, log_output_watcher, output_watcher, self.artifacts,
171+
sampler.watcher]
170172
if watch_fail:
171173
self.fail_watcher = FailWatcher(self.terminate_all_workers)
172174
self.listeners.append(self.fail_watcher)
@@ -416,6 +418,7 @@ def _run_worker(self, worker_id, tcp_port_range):
416418
os.environ['TEST_RUN_TCP_PORT_END'] = str(tcp_port_range[1])
417419
color_stdout.queue = self.result_queue
418420
worker = self.gen_worker(worker_id)
421+
sampler.set_queue(self.result_queue, worker_id, worker.name)
419422
worker.run_all(self.task_queue, self.result_queue)
420423

421424
def add_worker(self, worker_id, tcp_port_range):

lib/app_server.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from lib.colorer import qa_notice
1313
from lib.options import Options
1414
from lib.preprocessor import TestState
15+
from lib.sampler import sampler
1516
from lib.server import Server
1617
from lib.server import DEFAULT_SNAPSHOT_NAME
1718
from lib.tarantool_server import Test
@@ -30,9 +31,10 @@ def timeout_handler(server_process, test_timeout):
3031
server_process.kill()
3132

3233

33-
def run_server(execs, cwd, server, logfile, retval):
34+
def run_server(execs, cwd, server, logfile, retval, test_id):
3435
os.putenv("LISTEN", server.iproto)
3536
server.process = Popen(execs, stdout=PIPE, stderr=PIPE, cwd=cwd)
37+
sampler.register_process(server.process.pid, test_id, server.name)
3638
test_timeout = Options().args.test_timeout
3739
timer = Timer(test_timeout, timeout_handler, (server.process, test_timeout))
3840
timer.start()
@@ -56,7 +58,7 @@ def execute(self, server):
5658
execs = server.prepare_args()
5759
retval = dict()
5860
tarantool = TestRunGreenlet(run_server, execs, server.vardir, server,
59-
server.logfile, retval)
61+
server.logfile, retval, self.id)
6062
self.current_test_greenlet = tarantool
6163

6264
# Copy the snapshot right before starting the server.

lib/sampler.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
import os
2+
import sys
3+
import time
4+
5+
from lib.colorer import color_log
6+
from lib.colorer import qa_notice
7+
from lib.utils import format_process
8+
9+
10+
if sys.version_info[0] == 2:
    # Python 2 has no ProcessLookupError builtin: os.kill() reports a
    # missing process with a plain OSError there, so alias the name to
    # let the same 'except ProcessLookupError' clause work on both
    # major versions.
    ProcessLookupError = OSError
12+
13+
14+
# Don't inherit BaseWorkerMessage to bypass cyclic import.
15+
class RegisterProcessMessage(object):
    """Ask the sampler in the main test-run process to register
    given process.

    A plain value object: a worker-side sampler puts it into the
    result queue and the sampler watcher in the main process unpacks
    it back into a register_process() call.

    Attributes:
        worker_id: identifier of the worker that started the process.
        worker_name: name of that worker.
        pid: PID of the process to track.
        task_id: identifier of the test the process belongs to (the
            sampler's logging indexes it as task_id[0] and
            task_id[1]).
        server_name: name of the server the process runs.
    """

    def __init__(self, worker_id, worker_name, pid, task_id, server_name):
        self.worker_id = worker_id
        self.worker_name = worker_name
        self.pid = pid
        self.task_id = task_id
        self.server_name = server_name
26+
27+
# Don't inherit BaseWatcher to bypass cyclic import.
28+
class SamplerWatcher(object):
    """Listener that drives a Sampler from the main test-run process.

    It exposes process_result() and process_timeout() hooks. Both
    give the sampler a chance to collect a sample, rate-limited to
    one sample per self._sample_interval seconds.
    """

    def __init__(self, sampler):
        """Bind the watcher to `sampler` (the object whose
        register_process() and _sample() are invoked).
        """
        self._sampler = sampler
        # time.time() of the last collected sample; zero means that
        # nothing was sampled yet.
        self._last_sample = 0
        self._sample_interval = 0.1  # seconds
        # A gap longer than this between two samples is reported.
        self._warn_interval = self._sample_interval * 4

    def process_result(self, obj):
        """Handle a message from the result queue.

        Registration requests are forwarded to the sampler; any
        message is also used as an opportunity to sample.
        """
        if isinstance(obj, RegisterProcessMessage):
            self._sampler.register_process(
                obj.pid, obj.task_id, obj.server_name, obj.worker_id,
                obj.worker_name)
        self._wakeup()

    def process_timeout(self, delta_seconds):
        """Handle a period without messages; `delta_seconds` is not
        used, the elapsed time is measured by the watcher itself.
        """
        self._wakeup()

    def _wakeup(self):
        """Invoke Sampler._sample() if enough time elapsed since
        the previous call.

        Emits a notice when the gap since the previous sample exceeds
        the warning interval (never for the very first sample).
        """
        now = time.time()
        delta = now - self._last_sample
        if self._last_sample > 0 and delta > self._warn_interval:
            template = (
                'Low sampling resolution. The expected interval\n'
                'is {:.2f} seconds ({:.2f} seconds without warnings),\n'
                'but the last sample was collected {:.2f} seconds ago.')
            qa_notice(template.format(self._sample_interval,
                                      self._warn_interval, delta))
        if delta > self._sample_interval:
            self._sampler._sample()
            # Advance the timestamp only when a sample was really
            # taken, otherwise frequent wakeups would postpone the
            # next sample forever.
            self._last_sample = now
60+
61+
62+
class Sampler(object):
    """Registry of processes to be sampled.

    The same class serves two roles. In the main test-run process it
    keeps the `processes` dict and owns a SamplerWatcher. In a worker
    (after set_queue() was called) it only forwards registration
    requests to the main process through the given queue.
    """

    def __init__(self):
        # The instance is created in the test-run main process.

        # Fields for an instance in a worker.
        self._worker_id = None
        self._worker_name = None
        self._queue = None

        # Field for an instance in the main process.
        self._watcher = SamplerWatcher(self)

        # pid -> dict with 'task_id', 'server_name', 'worker_id' and
        # 'worker_name' keys.
        self.processes = dict()

    def set_queue(self, queue, worker_id, worker_name):
        """Switch the instance to the worker role.

        Called from a worker process (_run_worker()). After that
        register_process() sends messages into `queue` and the
        watcher is not available anymore.
        """
        self._worker_id = worker_id
        self._worker_name = worker_name
        self._queue = queue
        self._watcher = None

    @property
    def watcher(self):
        """The SamplerWatcher of this sampler.

        Raises RuntimeError when accessed after set_queue(), i.e. in
        a worker.
        """
        if self._watcher is None:
            raise RuntimeError('sampler: watcher is available only in the ' +
                               'main test-run process')
        return self._watcher

    def register_process(self, pid, task_id, server_name, worker_id=None,
                         worker_name=None):
        """Register a process to sampling.

        Call it without worker_* arguments from a worker
        process.

        In the main process the record is stored (replacing a
        previous record for the same pid, if any). In a worker a
        RegisterProcessMessage carrying the worker's own id and name
        is put into the queue instead.
        """
        if self._queue is None:
            # In main test-run process.
            self.processes[pid] = {
                'task_id': task_id,
                'server_name': server_name,
                'worker_id': worker_id,
                'worker_name': worker_name,
            }
            self._log('register', pid)
            return

        # Pass to the main test-run process.
        self._queue.put(RegisterProcessMessage(
            self._worker_id, self._worker_name, pid, task_id, server_name))

    def unregister_process(self, pid):
        """Stop tracking `pid`; unknown pids are ignored.

        Raises NotImplementedError when called in a worker.
        """
        if self._queue is not None:
            raise NotImplementedError('sampler: a process unregistration ' +
                                      'from a test-run worker is not ' +
                                      'implemented yet')
        if pid not in self.processes:
            return

        # Log first: _log() reads the record that is deleted below.
        self._log('unregister', pid)
        del self.processes[pid]

    def _log(self, event, pid):
        """Write a debug record about `event` for a registered pid."""
        # Those logs are not written due to gh-247.
        process_def = self.processes[pid]
        task_id = process_def['task_id']
        test_name = task_id[0] + ((':' + task_id[1]) if task_id[1] else '')
        worker_name = process_def['worker_name']
        server_name = process_def['server_name']
        color_log('DEBUG: sampler: {} {}\n'.format(
                  event, format_process(pid)), schema='info')
        color_log(' | worker: {}\n'.format(worker_name))
        color_log(' | test: {}\n'.format(test_name))
        color_log(' | server: {}\n'.format(str(server_name)))

    def _sample(self):
        """Sample every live registered process and drop dead ones."""
        # Imported here to keep the method self-contained.
        import errno

        # Iterate over a copy: unregister_process() mutates the dict.
        for pid in list(self.processes):
            # Unregister processes that're gone.
            # Assume that PIDs are rarely reused.
            try:
                # Signal 0 only checks that the process exists.
                os.kill(pid, 0)
            except OSError as exc:
                if exc.errno == errno.ESRCH:
                    self.unregister_process(pid)
                    continue
                if exc.errno != errno.EPERM:
                    raise
                # EPERM: the process exists, we are just not allowed
                # to signal it. Treat it as alive.
            self._sample_process(pid)

    def _sample_process(self, pid):
        """Collect metrics of one process (a placeholder for now)."""
        # Your sampling code here.
        pass
150+
151+
152+
# The 'singleton' sampler instance: created in the main test-run
153+
# process, but then works differently in the main process and
154+
# workers.
155+
sampler = Sampler()

lib/tarantool_server.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from lib.colorer import qa_notice
3333
from lib.options import Options
3434
from lib.preprocessor import TestState
35+
from lib.sampler import sampler
3536
from lib.server import Server
3637
from lib.server import DEFAULT_SNAPSHOT_NAME
3738
from lib.test import Test
@@ -332,6 +333,10 @@ def exec_loop(self, ts):
332333

333334
def execute(self, server):
334335
super(LuaTest, self).execute(server)
336+
337+
# Track the same process metrics as part of another test.
338+
sampler.register_process(server.process.pid, self.id, server.name)
339+
335340
cls_name = server.__class__.__name__.lower()
336341
if 'gdb' in cls_name or 'lldb' in cls_name or 'strace' in cls_name:
337342
# don't propagate gdb/lldb/strace mixin to non-default servers,
@@ -399,6 +404,10 @@ class PythonTest(Test):
399404

400405
def execute(self, server):
401406
super(PythonTest, self).execute(server)
407+
408+
# Track the same process metrics as part of another test.
409+
sampler.register_process(server.process.pid, self.id, server.name)
410+
402411
new_globals = dict(locals(), test_run_current_test=self, **server.__dict__)
403412
with open(self.name) as f:
404413
code = compile(f.read(), self.name, 'exec')
@@ -873,6 +882,12 @@ def start(self, silent=True, wait=True, wait_load=True, rais=True, args=[],
873882
# Restore the actual PWD value.
874883
os.environ['PWD'] = os.getcwd()
875884

885+
# Track non-default server metrics as part of current
886+
# test.
887+
if self.current_test:
888+
sampler.register_process(self.process.pid, self.current_test.id,
889+
self.name)
890+
876891
# gh-19 crash detection
877892
self.crash_detector = TestRunGreenlet(self.crash_detect)
878893
self.crash_detector.info = "Crash detector: %s" % self.process

lib/unittest_server.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import glob
44
from subprocess import Popen, PIPE, STDOUT
55

6+
from lib.sampler import sampler
67
from lib.server import Server
78
from lib.tarantool_server import Test
89

@@ -16,6 +17,7 @@ def execute(self, server):
1617
server.current_test = self
1718
execs = server.prepare_args()
1819
proc = Popen(execs, cwd=server.vardir, stdout=PIPE, stderr=STDOUT)
20+
sampler.register_process(proc.pid, self.id, server.name)
1921
sys.stdout.write_bytes(proc.communicate()[0])
2022

2123

0 commit comments

Comments
 (0)