Skip to content

Commit a28942b

Browse files
committed
Added performance test, improved iterator task which will now be usable by default. It shows that there must be the notion of a producer, which can work if there are no items read
1 parent cac6e06 commit a28942b

File tree

3 files changed

+79
-13
lines changed

3 files changed

+79
-13
lines changed

lib/git/async/task.py

+3
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,9 @@ def __init__(self, iterator, *args, **kwargs):
196196
self._read = lambda count: weakself().__read(count)
197197
self._empty = False
198198

199+
# defaults to returning our items unchanged
200+
self.fun = lambda item: item
201+
199202
def __read(self, count=0):
200203
"""Read count items from the iterator, and return them"""
201204
# not threadsafe, but worst thing that could happen is that

test/git/async/task.py

+25-13
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,14 @@ def do_fun(self, item):
102102
# END handle tuple
103103

104104

105+
class TestThreadPerformanceTaskNode(InputChannelTask):
106+
"""Applies no operation to the item, and does not lock, measuring
107+
the actual throughput of the system"""
108+
109+
def do_fun(self, item):
110+
return item
111+
112+
105113
class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask):
106114
"""An input channel task, which verifies the result of its input channels,
107115
should be last in the chain.
@@ -121,15 +129,16 @@ def do_fun(self, item):
121129

122130
return item
123131

124-
125132
#{ Utilities
126133

127134
def make_proxy_method(t):
128135
"""required to prevent binding self into the method we call"""
129136
wt = weakref.proxy(t)
130137
return lambda item: wt.do_fun(item)
131138

132-
def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0):
139+
def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0,
140+
feedercls=TestThreadTaskNode, transformercls=TestThreadInputChannelTaskNode,
141+
include_verifier=True):
133142
"""Create a task chain of feeder, count transformers and order verifcator
134143
to the pool p, like t1 -> t2 -> t3
135144
:param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would
@@ -145,7 +154,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of
145154
feeder = None
146155
frc = feeder_channel
147156
if feeder_channel is None:
148-
feeder = make_iterator_task(ni)
157+
feeder = make_iterator_task(ni, taskcls=feedercls)
149158
frc = p.add_task(feeder)
150159
# END handle specific feeder
151160

@@ -154,7 +163,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of
154163

155164
inrc = frc
156165
for tc in xrange(count):
157-
t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None)
166+
t = transformercls(inrc, tc+id_offset, None)
158167

159168
t.fun = make_proxy_method(t)
160169
#t.fun = t.do_fun
@@ -169,22 +178,25 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of
169178
tasks[1+id].fail_after = fail_after
170179
# END setup failure
171180

172-
verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
173-
#verifier.fun = verifier.do_fun
174-
verifier.fun = make_proxy_method(verifier)
175-
vrc = p.add_task(verifier)
176-
177-
178-
tasks.append(verifier)
179-
rcs.append(vrc)
181+
if include_verifier:
182+
verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
183+
#verifier.fun = verifier.do_fun
184+
verifier.fun = make_proxy_method(verifier)
185+
vrc = p.add_task(verifier)
186+
187+
188+
tasks.append(verifier)
189+
rcs.append(vrc)
190+
# END handle include verifier
180191
return tasks, rcs
181192

182193
def make_iterator_task(ni, taskcls=TestThreadTaskNode, **kwargs):
183194
""":return: task which yields ni items
184195
:param taskcls: the actual iterator type to use
185196
:param **kwargs: additional kwargs to be passed to the task"""
186197
t = taskcls(iter(range(ni)), 'iterator', None, **kwargs)
187-
t.fun = make_proxy_method(t)
198+
if isinstance(t, _TestTaskBase):
199+
t.fun = make_proxy_method(t)
188200
return t
189201

190202
#} END utilities

test/git/async/test_performance.py

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
"""Channel testing"""
2+
from test.testlib import *
3+
from task import *
4+
5+
from git.async.pool import *
6+
from git.async.thread import terminate_threads
7+
from git.async.util import cpu_count
8+
9+
import time
10+
import sys
11+
12+
13+
14+
class TestThreadPoolPerformance(TestBase):
15+
16+
max_threads = cpu_count()
17+
18+
def test_base(self):
19+
# create a dependency network, and see how the performance changes
20+
# when adjusting the amount of threads
21+
pool = ThreadPool(0)
22+
ni = 1000 # number of items to process
23+
print self.max_threads
24+
for num_threads in range(self.max_threads*2 + 1):
25+
pool.set_size(num_threads)
26+
for num_transformers in (1, 5, 10):
27+
for read_mode in range(2):
28+
ts, rcs = add_task_chain(pool, ni, count=num_transformers,
29+
feedercls=InputIteratorThreadTask,
30+
transformercls=TestThreadPerformanceTaskNode,
31+
include_verifier=False)
32+
33+
mode_info = "read(0)"
34+
if read_mode == 1:
35+
mode_info = "read(1) * %i" % ni
36+
# END mode info
37+
fmt = "Threadcount=%%i: Produced %%i items using %s in %%i transformations in %%f s (%%f items / s)" % mode_info
38+
reader = rcs[-1]
39+
st = time.time()
40+
if read_mode == 1:
41+
for i in xrange(ni):
42+
assert len(reader.read(1)) == 1
43+
# END for each item to read
44+
else:
45+
assert len(reader.read(0)) == ni
46+
# END handle read mode
47+
elapsed = time.time() - st
48+
print >> sys.stderr, fmt % (num_threads, ni, num_transformers, elapsed, ni / elapsed)
49+
# END for each read-mode
50+
# END for each amount of processors
51+
# END for each thread count

0 commit comments

Comments
 (0)