Skip to content

Commit 09d79ac

Browse files
author
Steven Hazel
committed
Do a better job parallelizing the initial batch of tests when the
number of nodes is more than half the number of tests. This makes it possible, for example, to run all tests in parallel, whereas previously the maximum parallelization was half of all tests.
1 parent 14f39a7 commit 09d79ac

File tree

3 files changed

+80
-20
lines changed

3 files changed

+80
-20
lines changed

testing/test_dsession.py

+55-13
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class MockNode:
2727
def __init__(self):
2828
self.sent = []
2929
self.gateway = MockGateway()
30+
self._shutdown = False
3031

3132
def send_runtest_some(self, indices):
3233
self.sent.extend(indices)
@@ -37,6 +38,10 @@ def send_runtest_all(self):
3738
def shutdown(self):
3839
self._shutdown = True
3940

41+
@property
42+
def shutting_down(self):
43+
return self._shutdown
44+
4045

4146
def dumpqueue(queue):
4247
while queue.qsize():
@@ -100,16 +105,15 @@ def test_schedule_load_simple(self):
100105
assert sched.node2collection[node2] == collection
101106
sched.init_distribute()
102107
assert not sched.pending
103-
assert not sched.tests_finished()
104-
assert len(node1.sent) == 2
105-
assert len(node2.sent) == 0
106-
assert node1.sent == [0, 1]
107-
sched.remove_item(node1, node1.sent[0])
108108
assert sched.tests_finished()
109-
sched.remove_item(node1, node1.sent[1])
109+
assert len(node1.sent) == 1
110+
assert len(node2.sent) == 1
111+
assert node1.sent == [0]
112+
assert node2.sent == [1]
113+
sched.remove_item(node1, node1.sent[0])
110114
assert sched.tests_finished()
111115

112-
def test_init_distribute_chunksize(self):
116+
def test_init_distribute_batch_size(self):
113117
sched = LoadScheduling(2)
114118
sched.addnode(MockNode())
115119
sched.addnode(MockNode())
@@ -121,18 +125,56 @@ def test_init_distribute_chunksize(self):
121125
# assert not sched.tests_finished()
122126
sent1 = node1.sent
123127
sent2 = node2.sent
124-
assert sent1 == [0, 1]
125-
assert sent2 == [2, 3]
128+
assert sent1 == [0, 2]
129+
assert sent2 == [1, 3]
126130
assert sched.pending == [4, 5]
127131
assert sched.node2pending[node1] == sent1
128132
assert sched.node2pending[node2] == sent2
129133
assert len(sched.pending) == 2
130134
sched.remove_item(node1, 0)
131-
assert node1.sent == [0, 1, 4]
135+
assert node1.sent == [0, 2, 4]
132136
assert sched.pending == [5]
133-
assert node2.sent == [2, 3]
134-
sched.remove_item(node1, 1)
135-
assert node1.sent == [0, 1, 4, 5]
137+
assert node2.sent == [1, 3]
138+
sched.remove_item(node1, 2)
139+
assert node1.sent == [0, 2, 4, 5]
140+
assert not sched.pending
141+
142+
def test_init_distribute_fewer_tests_than_nodes(self):
143+
sched = LoadScheduling(2)
144+
sched.addnode(MockNode())
145+
sched.addnode(MockNode())
146+
sched.addnode(MockNode())
147+
node1, node2, node3 = sched.nodes
148+
col = ["xyz"] * 2
149+
sched.addnode_collection(node1, col)
150+
sched.addnode_collection(node2, col)
151+
sched.init_distribute()
152+
# assert not sched.tests_finished()
153+
sent1 = node1.sent
154+
sent2 = node2.sent
155+
sent3 = node3.sent
156+
assert sent1 == [0]
157+
assert sent2 == [1]
158+
assert sent3 == []
159+
assert not sched.pending
160+
161+
def test_init_distribute_fewer_than_two_tests_per_node(self):
162+
sched = LoadScheduling(2)
163+
sched.addnode(MockNode())
164+
sched.addnode(MockNode())
165+
sched.addnode(MockNode())
166+
node1, node2, node3 = sched.nodes
167+
col = ["xyz"] * 5
168+
sched.addnode_collection(node1, col)
169+
sched.addnode_collection(node2, col)
170+
sched.init_distribute()
171+
# assert not sched.tests_finished()
172+
sent1 = node1.sent
173+
sent2 = node2.sent
174+
sent3 = node3.sent
175+
assert sent1 == [0, 3]
176+
assert sent2 == [1, 4]
177+
assert sent3 == [2]
136178
assert not sched.pending
137179

138180
def test_add_remove_node(self):

xdist/dsession.py

+19-7
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import difflib
2+
import itertools
23
from _pytest.runner import CollectReport
34

45
import pytest
@@ -289,6 +290,9 @@ def check_schedule(self, node, duration=0):
289290
``duration`` of the last test is optionally used as a
290291
heuristic to influence how many tests the node is assigned.
291292
"""
293+
if node.shutting_down:
294+
return
295+
292296
if self.pending:
293297
# how many nodes do we have?
294298
num_nodes = len(self.node2pending)
@@ -363,13 +367,21 @@ def init_distribute(self):
363367
if not self.collection:
364368
return
365369

366-
# how many items per node do we have about?
367-
items_per_node = len(self.collection) // len(self.node2pending)
368-
# take a fraction of tests for initial distribution
369-
node_chunksize = max(items_per_node // 4, 2)
370-
# and initialize each node with a chunk of tests
371-
for node in self.nodes:
372-
self._send_tests(node, node_chunksize)
370+
# Send a batch of tests to run. If we don't have at least two
371+
# tests per node, we have to send them all so that we can send
372+
# shutdown signals and get all nodes working.
373+
initial_batch = max(len(self.pending) // 4,
374+
2 * len(self.nodes))
375+
376+
# distribute tests round-robin up to the batch size (or until we run out)
377+
nodes = itertools.cycle(self.nodes)
378+
for i in xrange(initial_batch):
379+
self._send_tests(nodes.next(), 1)
380+
381+
if not self.pending:
382+
# initial distribution sent all tests, start node shutdown
383+
for node in self.nodes:
384+
node.shutdown()
373385

374386
def _send_tests(self, node, num):
375387
tests_per_node = self.pending[:num]

xdist/slavemanage.py

+6
Original file line numberDiff line numberDiff line change
@@ -207,13 +207,18 @@ def __init__(self, nodemanager, gateway, config, putevent):
207207
self.config = config
208208
self.slaveinput = {'slaveid': gateway.id}
209209
self._down = False
210+
self._shutdown_sent = False
210211
self.log = py.log.Producer("slavectl-%s" % gateway.id)
211212
if not self.config.option.debug:
212213
py.log.setconsumer(self.log._keywords, None)
213214

214215
def __repr__(self):
215216
return "<%s %s>" % (self.__class__.__name__, self.gateway.id,)
216217

218+
@property
219+
def shutting_down(self):
220+
return self._down or self._shutdown_sent
221+
217222
def setup(self):
218223
self.log("setting up slave session")
219224
spec = self.gateway.spec
@@ -256,6 +261,7 @@ def shutdown(self):
256261
self.sendcommand("shutdown")
257262
except IOError:
258263
pass
264+
self._shutdown_sent = True
259265

260266
def sendcommand(self, name, **kwargs):
261267
""" send a named parametrized command to the other side. """

0 commit comments

Comments
 (0)