Commit 7a0b79e

task: improved naming of task types, improved pool test to be less dependent on starting with just the main thread
1 parent 1d8a577 commit 7a0b79e

4 files changed: +42 -29 lines

lib/git/async/task.py: +16 -10
@@ -8,21 +8,27 @@
 import sys
 import new
 
-__all__ = ('OutputChannelTask', 'ThreadTaskBase', 'InputIteratorTaskBase',
-        'InputIteratorThreadTask', 'InputChannelTask')
+__all__ = ('Task', 'ThreadTaskBase', 'IteratorTaskBase',
+        'IteratorThreadTask', 'ChannelThreadTask')
 
-class OutputChannelTask(Node):
+class Task(Node):
     """Abstracts a named task, which contains
     additional information on how the task should be queued and processed.
     
-    Results of the item processing are sent to a write channel, which is to be
+    Results of the item processing are sent to a writer, which is to be
     set by the creator using the ``set_writer`` method.
     
+    Items are read using the internal ``_read`` callable, subclasses are meant to
+    set this to a callable that supports the Reader interface's read function.
+    
     * **min_count** assures that not less than min_count items will be processed per call.
     * **max_chunksize** assures that multi-threading is happening in smaller chunks. If
         someone wants all items to be processed, using read(0), the whole task would go to
         one worker, as well as dependent tasks. If you want finer granularity, you can
-        specify this here, causing chunks to be no larger than max_chunksize"""
+        specify this here, causing chunks to be no larger than max_chunksize
+    * **apply_single** if True, default True, individual items will be given to the
+        worker function. If False, a list of possibly multiple items will be passed
+        instead."""
     __slots__ = ('_read',          # method to yield items to process
                  '_out_writer',    # output write channel
                  '_exc',           # exception caught
@@ -178,32 +184,32 @@ class ThreadTaskBase(object):
     pass
 
 
-class InputIteratorTaskBase(OutputChannelTask):
+class IteratorTaskBase(Task):
     """Implements a task which processes items from an iterable in a multi-processing
     safe manner"""
     __slots__ = tuple()
 
 
     def __init__(self, iterator, *args, **kwargs):
-        OutputChannelTask.__init__(self, *args, **kwargs)
+        Task.__init__(self, *args, **kwargs)
         self._read = IteratorReader(iterator).read
         # defaults to returning our items unchanged
         self.fun = lambda item: item
 
 
-class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase):
+class IteratorThreadTask(IteratorTaskBase, ThreadTaskBase):
     """An input iterator for threaded pools"""
     lock_type = threading.Lock
 
 
-class InputChannelTask(OutputChannelTask, ThreadTaskBase):
+class ChannelThreadTask(Task, ThreadTaskBase):
     """Uses an input channel as source for reading items
     For instantiation, it takes all arguments of its base, the first one needs
     to be the input channel to read from though."""
     __slots__ = "_pool_ref"
 
     def __init__(self, in_reader, *args, **kwargs):
-        OutputChannelTask.__init__(self, *args, **kwargs)
+        Task.__init__(self, *args, **kwargs)
         self._read = in_reader.read
         self._pool_ref = None
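
To illustrate the renamed types in use, here is a minimal sketch (not part of this commit). It assumes the git.async ThreadPool/channel API as exercised by the tests below: add_task() returns a reader channel, reading the last channel drives dependent tasks, and set_size() configures the worker threads.

# Illustrative sketch only -- names outside this diff (ThreadPool, set_size,
# module paths) are assumptions based on the surrounding test suite.
from git.async.pool import ThreadPool
from git.async.task import IteratorThreadTask, ChannelThreadTask

pool = ThreadPool()
pool.set_size(2)                      # assumed API; gives the pool two worker threads

# Feeder: pulls integers from a plain iterator. IteratorTaskBase installs an
# identity fun, so the third argument can stay None.
feeder = IteratorThreadTask(iter(range(10)), "feeder", None)
feeder_channel = pool.add_task(feeder)

# Transformer: reads the feeder's output channel and doubles every item.
doubler = ChannelThreadTask(feeder_channel, "doubler", lambda item: item * 2)
result_channel = pool.add_task(doubler)

# Draining the last channel pulls items through the whole chain.
print(result_channel.read())          # [0, 2, 4, ..., 18] (order may vary with threads)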

test/git/async/task.py: +13 -13
@@ -51,18 +51,18 @@ def _assert(self, pc, fc, check_scheduled=False):
         return self
 
 
-class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask):
+class TestThreadTask(_TestTaskBase, IteratorThreadTask):
     pass
 
 
-class TestThreadFailureNode(TestThreadTaskNode):
+class TestFailureThreadTask(TestThreadTask):
     """Fails after X items"""
     def __init__(self, *args, **kwargs):
         self.fail_after = kwargs.pop('fail_after')
-        super(TestThreadFailureNode, self).__init__(*args, **kwargs)
+        super(TestFailureThreadTask, self).__init__(*args, **kwargs)
 
     def do_fun(self, item):
-        item = TestThreadTaskNode.do_fun(self, item)
+        item = TestThreadTask.do_fun(self, item)
 
         self.lock.acquire()
         try:
@@ -74,15 +74,15 @@ def do_fun(self, item):
         return item
 
 
-class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask):
+class TestChannelThreadTask(_TestTaskBase, ChannelThreadTask):
     """Apply a transformation on items read from an input channel"""
     def __init__(self, *args, **kwargs):
         self.fail_after = kwargs.pop('fail_after', 0)
-        super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs)
+        super(TestChannelThreadTask, self).__init__(*args, **kwargs)
 
     def do_fun(self, item):
         """return tuple(i, i*2)"""
-        item = super(TestThreadInputChannelTaskNode, self).do_fun(item)
+        item = super(TestChannelThreadTask, self).do_fun(item)
 
         # fail after support
         if self.fail_after:
@@ -102,22 +102,22 @@ def do_fun(self, item):
         # END handle tuple
 
 
-class TestThreadPerformanceTaskNode(InputChannelTask):
+class TestPerformanceThreadTask(ChannelThreadTask):
     """Applies no operation to the item, and does not lock, measuring
     the actual throughput of the system"""
 
     def do_fun(self, item):
         return item
 
 
-class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask):
+class TestVerifyChannelThreadTask(_TestTaskBase, ChannelThreadTask):
     """An input channel task, which verifies the result of its input channels,
     should be last in the chain.
     Id must be int"""
 
     def do_fun(self, item):
         """return tuple(i, i*2)"""
-        item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item)
+        item = super(TestVerifyChannelThreadTask, self).do_fun(item)
 
         # make sure the computation order matches
         assert isinstance(item, tuple), "input was no tuple: %s" % item
@@ -137,7 +137,7 @@ def make_proxy_method(t):
     return lambda item: wt.do_fun(item)
 
 def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0,
-                   feedercls=TestThreadTaskNode, transformercls=TestThreadInputChannelTaskNode,
+                   feedercls=TestThreadTask, transformercls=TestChannelThreadTask,
                    include_verifier=True):
     """Create a task chain of feeder, count transformers and order verifier
     to the pool p, like t1 -> t2 -> t3
@@ -179,7 +179,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of
     # END setup failure
 
     if include_verifier:
-        verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
+        verifier = TestVerifyChannelThreadTask(inrc, 'verifier', None)
         #verifier.fun = verifier.do_fun
         verifier.fun = make_proxy_method(verifier)
         vrc = p.add_task(verifier)
@@ -190,7 +190,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of
     # END handle include verifier
     return tasks, rcs
 
-def make_iterator_task(ni, taskcls=TestThreadTaskNode, **kwargs):
+def make_iterator_task(ni, taskcls=TestThreadTask, **kwargs):
     """:return: task which yields ni items
     :param taskcls: the actual iterator type to use
     :param **kwargs: additional kwargs to be passed to the task"""
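
The renamed helpers are typically driven as in the sketch below, which mirrors the call in test_performance.py and the dependent-task assertions in test_pool.py; the pool sizing and the final read are assumptions, not part of this diff.

# Illustrative sketch only: feeder -> 2 transformers -> verifier on one pool.
pool = ThreadPool()
pool.set_size(2)                     # assumed from the surrounding pool tests
ni = 1000                            # number of items pushed through the chain

tasks, rcs = add_task_chain(pool, ni, count=2,
                            feedercls=TestThreadTask,
                            transformercls=TestChannelThreadTask,
                            include_verifier=True)

items = rcs[-1].read()               # reading the verifier's channel drives all upstream tasks
assert len(items) == ni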

test/git/async/test_performance.py: +2 -2
@@ -26,8 +26,8 @@ def test_base(self):
         for num_transformers in (1, 5, 10):
             for read_mode in range(2):
                 ts, rcs = add_task_chain(pool, ni, count=num_transformers,
-                                         feedercls=InputIteratorThreadTask,
-                                         transformercls=TestThreadPerformanceTaskNode,
+                                         feedercls=IteratorThreadTask,
+                                         transformercls=TestPerformanceThreadTask,
                                          include_verifier=False)
 
                 mode_info = "read(0)"

test/git/async/test_pool.py: +11 -4
@@ -198,7 +198,7 @@ def _assert_single_task(self, p, async=False):
         # test failure after ni / 2 items
         # This makes sure it correctly closes the channel on failure to prevent blocking
         nri = ni/2
-        task = make_task(TestThreadFailureNode, fail_after=ni/2)
+        task = make_task(TestFailureThreadTask, fail_after=ni/2)
         rc = p.add_task(task)
         assert len(rc.read()) == nri
         assert task.is_done()
@@ -374,7 +374,14 @@ def _assert_async_dependent_tasks(self, pool):
 
     @terminate_threads
     def test_base(self):
-        assert len(threading.enumerate()) == 1
+        max_wait_attempts = 3
+        sleep_time = 0.1
+        for mc in range(max_wait_attempts):
+            # wait for threads to die
+            if len(threading.enumerate()) != 1:
+                time.sleep(sleep_time)
+        # END for each attempt
+        assert len(threading.enumerate()) == 1, "Waited %f s for threads to die, they are still alive" % (max_wait_attempts * sleep_time)
 
         p = ThreadPool()
 
@@ -401,7 +408,7 @@ def test_base(self):
         # SINGLE TASK SERIAL SYNC MODE
         ##############################
         # put a few unrelated tasks that we forget about - check ref counts and cleanup
-        t1, t2 = TestThreadTaskNode(iter(list()), "nothing1", None), TestThreadTaskNode(iter(list()), "nothing2", None)
+        t1, t2 = TestThreadTask(iter(list()), "nothing1", None), TestThreadTask(iter(list()), "nothing2", None)
         urc1 = p.add_task(t1)
         urc2 = p.add_task(t2)
         assert p.num_tasks() == 2
@@ -416,7 +423,7 @@ def test_base(self):
         assert p.num_tasks() == 0
         assert sys.getrefcount(t2) == 2
 
-        t3 = TestThreadInputChannelTaskNode(urc2, "channel", None)
+        t3 = TestChannelThreadTask(urc2, "channel", None)
         urc3 = p.add_task(t3)
         assert p.num_tasks() == 1
         del(urc3)
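
The new wait loop in test_base sleeps a bounded number of times before asserting that only the main thread remains, giving worker threads from a previous test time to die. A small hypothetical helper expressing the same idea with an early exit:

# Hypothetical refactoring of the wait loop above; not part of this commit.
import threading
import time

def only_main_thread_left(max_wait_attempts=3, sleep_time=0.1):
    """:return: True if only the main thread remains, retrying for at most
    max_wait_attempts * sleep_time seconds so worker threads can shut down"""
    for _ in range(max_wait_attempts):
        if len(threading.enumerate()) == 1:
            return True
        time.sleep(sleep_time)
    return len(threading.enumerate()) == 1

# usage in the test:
# assert only_main_thread_left(), "Waited for threads to die, some are still alive"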
