Skip to content

Commit d1dfad3

Browse files
authored
Implement work-stealing scheduler (#862)
Closes #858
1 parent 9b0b5b1 commit d1dfad3

File tree

10 files changed

+638
-26
lines changed

10 files changed

+638
-26
lines changed

Diff for: changelog/858.feature

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
New ``worksteal`` scheduler, based on the idea of `work stealing <https://en.wikipedia.org/wiki/Work_stealing>`_. It's similar to the ``load`` scheduler, but it should handle tests with significantly differing durations better and, at the same time, provide similar or better reuse of fixtures.

Diff for: docs/distribution.rst

+9
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,13 @@ The test distribution algorithm is configured with the ``--dist`` command-line o
8282
This will make sure ``test1`` and ``TestA::test2`` will run in the same worker.
8383
Tests without the ``xdist_group`` mark are distributed normally as in the ``--dist=load`` mode.
8484

85+
* ``--dist worksteal``: Initially, tests are distributed evenly among all
86+
available workers. When a worker completes most of its assigned tests and
87+
doesn't have enough tests to continue (currently, every worker needs at least
88+
two tests in its queue), an attempt is made to reassign ("steal") a portion
89+
of tests from some other worker's queue. The results should be similar to
90+
the ``load`` method, but ``worksteal`` should handle tests with significantly
91+
differing durations better, and, at the same time, it should provide similar
92+
or better reuse of fixtures.
93+
8594
* ``--dist no``: The normal pytest execution mode, runs one test at a time (no distribution at all).

Diff for: src/xdist/dsession.py

+13
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
LoadScopeScheduling,
99
LoadFileScheduling,
1010
LoadGroupScheduling,
11+
WorkStealingScheduling,
1112
)
1213

1314

@@ -100,6 +101,7 @@ def pytest_xdist_make_scheduler(self, config, log):
100101
"loadscope": LoadScopeScheduling,
101102
"loadfile": LoadFileScheduling,
102103
"loadgroup": LoadGroupScheduling,
104+
"worksteal": WorkStealingScheduling,
103105
}
104106
return schedulers[dist](config, log)
105107

@@ -282,6 +284,17 @@ def worker_runtest_protocol_complete(self, node, item_index, duration):
282284
"""
283285
self.sched.mark_test_complete(node, item_index, duration)
284286

287+
def worker_unscheduled(self, node, indices):
288+
"""
289+
Emitted when a node fires the 'unscheduled' event, signalling that
290+
some tests have been removed from the worker's queue and should be
291+
sent to some worker again.
292+
293+
This should happen only in response to 'steal' command, so schedulers
294+
not using 'steal' command don't have to implement it.
295+
"""
296+
self.sched.remove_pending_tests_from_node(node, indices)
297+
285298
def worker_collectreport(self, node, rep):
286299
"""Emitted when a node calls the pytest_collectreport hook.
287300

Diff for: src/xdist/plugin.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,15 @@ def pytest_addoption(parser):
9494
"--dist",
9595
metavar="distmode",
9696
action="store",
97-
choices=["each", "load", "loadscope", "loadfile", "loadgroup", "no"],
97+
choices=[
98+
"each",
99+
"load",
100+
"loadscope",
101+
"loadfile",
102+
"loadgroup",
103+
"worksteal",
104+
"no",
105+
],
98106
dest="dist",
99107
default="no",
100108
help=(
@@ -107,6 +115,8 @@ def pytest_addoption(parser):
107115
"loadfile: load balance by sending test grouped by file"
108116
" to any available environment.\n\n"
109117
"loadgroup: like load, but sends tests marked with 'xdist_group' to the same worker.\n\n"
118+
"worksteal: split the test suite between available environments,"
119+
" then rebalance when any worker runs out of tests.\n\n"
110120
"(default) no: run tests inprocess, don't distribute."
111121
),
112122
)

Diff for: src/xdist/remote.py

+54-24
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
needs not to be installed in remote environments.
77
"""
88

9+
import contextlib
910
import sys
1011
import os
1112
import time
@@ -56,14 +57,21 @@ def worker_title(title):
5657

5758

5859
class WorkerInteractor:
60+
SHUTDOWN_MARK = object()
61+
5962
def __init__(self, config, channel):
6063
self.config = config
6164
self.workerid = config.workerinput.get("workerid", "?")
6265
self.testrunuid = config.workerinput["testrunuid"]
6366
self.log = Producer(f"worker-{self.workerid}", enabled=config.option.debug)
6467
self.channel = channel
68+
self.torun = self._make_queue()
69+
self.nextitem_index = None
6570
config.pluginmanager.register(self)
6671

72+
def _make_queue(self):
73+
return self.channel.gateway.execmodel.queue.Queue()
74+
6775
def sendevent(self, name, **kwargs):
6876
self.log("sending", name, kwargs)
6977
self.channel.send((name, kwargs))
@@ -92,38 +100,60 @@ def pytest_sessionfinish(self, exitstatus):
92100
def pytest_collection(self, session):
93101
self.sendevent("collectionstart")
94102

103+
def handle_command(self, command):
104+
if command is self.SHUTDOWN_MARK:
105+
self.torun.put(self.SHUTDOWN_MARK)
106+
return
107+
108+
name, kwargs = command
109+
110+
self.log("received command", name, kwargs)
111+
if name == "runtests":
112+
for i in kwargs["indices"]:
113+
self.torun.put(i)
114+
elif name == "runtests_all":
115+
for i in range(len(self.session.items)):
116+
self.torun.put(i)
117+
elif name == "shutdown":
118+
self.torun.put(self.SHUTDOWN_MARK)
119+
elif name == "steal":
120+
self.steal(kwargs["indices"])
121+
122+
def steal(self, indices):
123+
indices = set(indices)
124+
stolen = []
125+
126+
old_queue, self.torun = self.torun, self._make_queue()
127+
128+
def old_queue_get_nowait_noraise():
129+
with contextlib.suppress(self.channel.gateway.execmodel.queue.Empty):
130+
return old_queue.get_nowait()
131+
132+
for i in iter(old_queue_get_nowait_noraise, None):
133+
if i in indices:
134+
stolen.append(i)
135+
else:
136+
self.torun.put(i)
137+
138+
self.sendevent("unscheduled", indices=stolen)
139+
95140
@pytest.hookimpl
96141
def pytest_runtestloop(self, session):
97142
self.log("entering main loop")
98-
torun = []
99-
while 1:
100-
try:
101-
name, kwargs = self.channel.receive()
102-
except EOFError:
103-
return True
104-
self.log("received command", name, kwargs)
105-
if name == "runtests":
106-
torun.extend(kwargs["indices"])
107-
elif name == "runtests_all":
108-
torun.extend(range(len(session.items)))
109-
self.log("items to run:", torun)
110-
# only run if we have an item and a next item
111-
while len(torun) >= 2:
112-
self.run_one_test(torun)
113-
if name == "shutdown":
114-
if torun:
115-
self.run_one_test(torun)
116-
break
143+
self.channel.setcallback(self.handle_command, endmarker=self.SHUTDOWN_MARK)
144+
self.nextitem_index = self.torun.get()
145+
while self.nextitem_index is not self.SHUTDOWN_MARK:
146+
self.run_one_test()
117147
return True
118148

119-
def run_one_test(self, torun):
149+
def run_one_test(self):
120150
items = self.session.items
121-
self.item_index = torun.pop(0)
151+
self.item_index, self.nextitem_index = self.nextitem_index, self.torun.get()
122152
item = items[self.item_index]
123-
if torun:
124-
nextitem = items[torun[0]]
125-
else:
153+
if self.nextitem_index is self.SHUTDOWN_MARK:
126154
nextitem = None
155+
else:
156+
nextitem = items[self.nextitem_index]
127157

128158
worker_title("[pytest-xdist running] %s" % item.nodeid)
129159

Diff for: src/xdist/scheduler/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@
33
from xdist.scheduler.loadfile import LoadFileScheduling # noqa
44
from xdist.scheduler.loadscope import LoadScopeScheduling # noqa
55
from xdist.scheduler.loadgroup import LoadGroupScheduling # noqa
6+
from xdist.scheduler.worksteal import WorkStealingScheduling # noqa

0 commit comments

Comments
 (0)