Skip to content

Commit 0974f87

Browse files
committed
Channel: Read method revised - now it really really doesn't block anymore, and it runs faster as well, about 2/3 of the performance we have when being in serial mode
1 parent 4e6bece commit 0974f87

File tree

4 files changed

+46
-55
lines changed

4 files changed

+46
-55
lines changed

lib/git/async/channel.py

+37-48
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,11 @@ def __new__(cls, *args):
3838

3939
class WChannel(Channel):
4040
"""The write end of a channel"""
41-
__slots__ = ('_closed', '_queue')
41+
__slots__ = ('_queue')
4242

4343
def __init__(self):
4444
"""initialize this instance, able to hold max_items at once
4545
Write calls will block if the channel is full, until someone reads from it"""
46-
self._closed = False
4746
self._queue = AsyncQueue()
4847

4948

@@ -55,15 +54,10 @@ def write(self, item, block=True, timeout=None):
5554
:param block: If True, the call will block until there is free space in the
5655
channel
5756
:param timeout: timeout in seconds for blocking calls.
58-
:raise IOError: when writing into closed file
59-
:raise EOFError: when writing into a non-blocking full channel"""
57+
:raise ReadOnly: when writing into closed channel"""
6058
# let the queue handle the 'closed' attribute, we write much more often
6159
# to an open channel than to a closed one, saving a few cycles
62-
try:
63-
self._queue.put(item, block, timeout)
64-
except ReadOnly:
65-
raise IOError("Cannot write to a closed channel")
66-
# END exception handling
60+
self._queue.put(item, block, timeout)
6761

6862
def size(self):
6963
""":return: approximate number of items that could be read from the read-ends
@@ -73,15 +67,11 @@ def size(self):
7367
def close(self):
7468
"""Close the channel. Multiple close calls on a closed channel are no
7569
an error"""
76-
# yes, close it a little too early, better than having anyone put
77-
# additional items
78-
self._closed = True
7970
self._queue.set_writable(False)
8071

81-
@property
8272
def closed(self):
8373
""":return: True if the channel was closed"""
84-
return self._closed
74+
return not self._queue.writable()
8575
#} END interface
8676

8777

@@ -104,6 +94,7 @@ def read(self, count=0, block=True, timeout=None):
10494
:param block: if True, the call will block until an item is available
10595
:param timeout: if positive and block is True, it will block only for the
10696
given amount of seconds, returning the items it received so far.
97+
The timeout is applied to each read item, not for the whole operation.
10798
:return: single item in a list if count is 1, or a list of count items.
10899
If the channel was empty and count was 1, an empty list will be returned.
109100
If count was greater 1, a list with less than count items will be
@@ -112,9 +103,11 @@ def read(self, count=0, block=True, timeout=None):
112103
returned."""
113104
# if the channel is closed for writing, we never block
114105
# NOTE: is handled by the queue
115-
if self._wc.closed or timeout == 0:
116-
block = False
117-
106+
# We don't check for a closed state here has it costs time - most of
107+
# the time, it will not be closed, and will bail out automatically once
108+
# it gets closed
109+
110+
118111
# in non-blocking mode, its all not a problem
119112
out = list()
120113
queue = self._wc._queue
@@ -142,42 +135,38 @@ def read(self, count=0, block=True, timeout=None):
142135
count = sys.maxint
143136
# END handle count
144137

145-
endtime = sys.maxint # allows timeout for whole operation
146-
if timeout is not None:
147-
endtime = time() + timeout
148-
# could be improved by a separate: no-endtime branch, saving the time calls
149-
for i in xrange(count):
138+
i = 0
139+
while i < count:
150140
try:
151141
out.append(queue.get(block, timeout))
142+
i += 1
152143
except Empty:
153-
# here we are only if there is nothing on the queue,
154-
# and if we are blocking. If we are not blocking, this
155-
# indiccates that the queue was set unwritable in the meanwhile.
156-
# hence we can abort now to prevent reading (possibly) forever
157-
# Besides, this is racy as all threads will rip on the channel
158-
# without waiting until its empty
159-
if not block:
160-
break
161-
# END ignore empty
162-
163-
# if we have been unblocked because the closed state changed
164-
# in the meanwhile, stop trying
165-
# NOTE: must NOT cache _wc
166-
if self._wc.closed:
167-
# If we were closed, we drop out even if there might still
168-
# be items. Now its time to get these items, according to
169-
# our count. Just switch to unblocking mode.
170-
# If we are to read unlimited items, this would run forever,
171-
# but the EmptyException handler takes care of this
172-
block = False
144+
# here we are only if
145+
# someone woke us up to inform us about the queue that changed
146+
# its writable state
147+
# The following branch checks for closed channels, and pulls
148+
# as many items as we need and as possible, before
149+
# leaving the loop.
150+
if not queue.writable():
151+
try:
152+
while i < count:
153+
out.append(queue.get(False, None))
154+
i += 1
155+
# END count loop
156+
except Empty:
157+
break # out of count loop
158+
# END handle absolutely empty queue
159+
# END handle closed channel
173160

174-
# we don't continue, but let the timer decide whether
175-
# it wants to abort
176-
# END handle channel cloased
177-
178-
if time() >= endtime:
161+
# if we are here, we woke up and the channel is not closed
162+
# Either the queue became writable again, which currently shouldn't
163+
# be able to happen in the channel, or someone read with a timeout
164+
# that actually timed out.
165+
# As it timed out, which is the only reason we are here,
166+
# we have to abort
179167
break
180-
# END stop operation on timeout
168+
# END ignore empty
169+
181170
# END for each item
182171
# END handle blocking
183172
return out

lib/git/async/task.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def close(self):
6666

6767
def is_closed(self):
6868
""":return: True if the task's write channel is closed"""
69-
return self._out_wc.closed
69+
return self._out_wc.closed()
7070

7171
def error(self):
7272
""":return: Exception caught during last processing or None"""

test/git/async/test_channel.py

+4-6
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,16 @@ def test_base(self):
3333
assert time.time() - st >= to
3434

3535
# writing to a closed channel raises
36-
assert not wc.closed
36+
assert not wc.closed()
3737
wc.close()
38-
assert wc.closed
38+
assert wc.closed()
3939
wc.close() # fine
40-
assert wc.closed
40+
assert wc.closed()
4141

42-
self.failUnlessRaises(IOError, wc.write, 1)
42+
self.failUnlessRaises(ReadOnly, wc.write, 1)
4343

4444
# reading from a closed channel never blocks
45-
print "preblock"
4645
assert len(rc.read()) == 0
47-
print "got read(0)"
4846
assert len(rc.read(5)) == 0
4947
assert len(rc.read(1)) == 0
5048

test/git/async/test_pool.py

+4
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ def _assert(self, pc, fc, check_scheduled=False):
5757
return self
5858

5959

60+
class TestThreadFailureNode(TestThreadTaskNode):
61+
"""Fails after X items"""
62+
63+
6064
class TestThreadPool(TestBase):
6165

6266
max_threads = cpu_count()

0 commit comments

Comments
 (0)