Skip to content

Commit be8955a

Browse files
committed
Cleaned up channel design, Reader and Writer bases don't require a channel anymore, but are abstract.
Added IteratorReader, implementing the reader interface from an iterator. The implementation moved from the TaskIterator to the channel
1 parent a28942b commit be8955a

File tree

4 files changed

+147
-78
lines changed

4 files changed

+147
-78
lines changed

lib/git/async/channel.py

+116-23
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
)
1212

1313
from time import time
14+
import threading
1415
import sys
1516

16-
__all__ = ('Channel', 'SerialChannel', 'Writer', 'CallbackWriter', 'Reader',
17-
'CallbackReader', 'mkchannel', 'ReadOnly')
17+
__all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter',
18+
'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly',
19+
'IteratorReader')
1820

1921
#{ Classes
2022
class Channel(object):
@@ -43,15 +45,50 @@ class SerialChannel(Channel):
4345

4446

4547
class Writer(object):
48+
"""A writer is an object providing write access to a possibly blocking reading device"""
49+
__slots__ = tuple()
50+
51+
#{ Interface
52+
53+
def __init__(self, device):
54+
"""Initialize the instance with the device to write to"""
55+
56+
def write(self, item, block=True, timeout=None):
57+
"""Write the given item into the device
58+
:param block: True if the device may block until space for the item is available
59+
:param timeout: The time in seconds to wait for the device to become ready
60+
in blocking mode"""
61+
raise NotImplementedError()
62+
63+
def size(self):
64+
""":return: number of items already in the device, they could be read with a reader"""
65+
raise NotImplementedError()
66+
67+
def close(self):
68+
"""Close the channel. Multiple close calls on a closed channel are no
69+
an error"""
70+
raise NotImplementedError()
71+
72+
def closed(self):
73+
""":return: True if the channel was closed"""
74+
raise NotImplementedError()
75+
76+
#} END interface
77+
78+
79+
class ChannelWriter(Writer):
4680
"""The write end of a channel, a file-like interface for a channel"""
47-
__slots__ = ('write', 'channel')
81+
__slots__ = ('channel', '_put')
4882

4983
def __init__(self, channel):
5084
"""Initialize the writer to use the given channel"""
5185
self.channel = channel
52-
self.write = channel.queue.put
86+
self._put = self.channel.queue.put
5387

5488
#{ Interface
89+
def write(self, item, block=False, timeout=None):
90+
return self._put(item, block, timeout)
91+
5592
def size(self):
5693
return self.channel.queue.qsize()
5794

@@ -66,15 +103,14 @@ def closed(self):
66103
#} END interface
67104

68105

69-
class CallbackWriter(Writer):
106+
class CallbackChannelWriter(ChannelWriter):
70107
"""The write end of a channel which allows you to setup a callback to be
71108
called after an item was written to the channel"""
72109
__slots__ = ('_pre_cb')
73110

74111
def __init__(self, channel):
75-
Writer.__init__(self, channel)
112+
super(CallbackChannelWriter, self).__init__(channel)
76113
self._pre_cb = None
77-
self.write = self._write
78114

79115
def set_pre_cb(self, fun = lambda item: item):
80116
"""Install a callback to be called before the given item is written.
@@ -87,25 +123,22 @@ def set_pre_cb(self, fun = lambda item: item):
87123
self._pre_cb = fun
88124
return prev
89125

90-
def _write(self, item, block=True, timeout=None):
126+
def write(self, item, block=True, timeout=None):
91127
if self._pre_cb:
92128
item = self._pre_cb(item)
93-
self.channel.queue.put(item, block, timeout)
129+
super(CallbackChannelWriter, self).write(item, block, timeout)
94130

95131

96132
class Reader(object):
97-
"""Allows reading from a channel"""
98-
__slots__ = 'channel'
133+
"""Allows reading from a device"""
134+
__slots__ = tuple()
99135

100-
def __init__(self, channel):
101-
"""Initialize this instance from its parent write channel"""
102-
self.channel = channel
103-
104-
105136
#{ Interface
106-
137+
def __init__(self, device):
138+
"""Initialize the instance with the device to read from"""
139+
107140
def read(self, count=0, block=True, timeout=None):
108-
"""read a list of items read from the channel. The list, as a sequence
141+
"""read a list of items read from the device. The list, as a sequence
109142
of items, is similar to the string of characters returned when reading from
110143
file like objects.
111144
:param count: given amount of items to read. If < 1, all items will be read
@@ -114,11 +147,25 @@ def read(self, count=0, block=True, timeout=None):
114147
given amount of seconds, returning the items it received so far.
115148
The timeout is applied to each read item, not for the whole operation.
116149
:return: single item in a list if count is 1, or a list of count items.
117-
If the channel was empty and count was 1, an empty list will be returned.
150+
If the device was empty and count was 1, an empty list will be returned.
118151
If count was greater 1, a list with less than count items will be
119152
returned.
120153
If count was < 1, a list with all items that could be read will be
121154
returned."""
155+
raise NotImplementedError()
156+
157+
158+
class ChannelReader(Reader):
159+
"""Allows reading from a channel. The reader is thread-safe if the channel is as well"""
160+
__slots__ = 'channel'
161+
162+
def __init__(self, channel):
163+
"""Initialize this instance from its parent write channel"""
164+
self.channel = channel
165+
166+
#{ Interface
167+
168+
def read(self, count=0, block=True, timeout=None):
122169
# if the channel is closed for writing, we never block
123170
# NOTE: is handled by the queue
124171
# We don't check for a closed state here has it costs time - most of
@@ -191,12 +238,12 @@ def read(self, count=0, block=True, timeout=None):
191238

192239
#} END interface
193240

194-
class CallbackReader(Reader):
241+
class CallbackChannelReader(ChannelReader):
195242
"""A channel which sends a callback before items are read from the channel"""
196243
__slots__ = "_pre_cb"
197244

198245
def __init__(self, channel):
199-
Reader.__init__(self, channel)
246+
super(CallbackChannelReader, self).__init__(channel)
200247
self._pre_cb = None
201248

202249
def set_pre_cb(self, fun = lambda count: None):
@@ -213,13 +260,59 @@ def set_pre_cb(self, fun = lambda count: None):
213260
def read(self, count=0, block=True, timeout=None):
214261
if self._pre_cb:
215262
self._pre_cb(count)
216-
return Reader.read(self, count, block, timeout)
263+
return super(CallbackChannelReader, self).read(count, block, timeout)
264+
217265

266+
class IteratorReader(Reader):
267+
"""A Reader allowing to read items from an iterator, instead of a channel.
268+
Reads will never block. Its thread-safe"""
269+
__slots__ = ("_empty", '_iter', '_lock')
270+
271+
# the type of the lock to use when reading from the iterator
272+
lock_type = threading.Lock
273+
274+
def __init__(self, iterator):
275+
self._empty = False
276+
if not hasattr(iterator, 'next'):
277+
raise ValueError("Iterator %r needs a next() function" % iterator)
278+
self._iter = iterator
279+
self._lock = self.lock_type()
280+
281+
def read(self, count=0, block=True, timeout=None):
282+
"""Non-Blocking implementation of read"""
283+
# not threadsafe, but worst thing that could happen is that
284+
# we try to get items one more time
285+
if self._empty:
286+
return list()
287+
# END early abort
288+
289+
self._lock.acquire()
290+
try:
291+
if count == 0:
292+
self._empty = True
293+
return list(self._iter)
294+
else:
295+
out = list()
296+
it = self._iter
297+
for i in xrange(count):
298+
try:
299+
out.append(it.next())
300+
except StopIteration:
301+
self._empty = True
302+
break
303+
# END handle empty iterator
304+
# END for each item to take
305+
return out
306+
# END handle count
307+
finally:
308+
self._lock.release()
309+
# END handle locking
310+
218311

219312
#} END classes
220313

221314
#{ Constructors
222-
def mkchannel(ctype = Channel, wtype = Writer, rtype = Reader):
315+
def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader):
223316
"""Create a channel, with a reader and a writer
224317
:return: tuple(reader, writer)
225318
:param ctype: Channel to instantiate

lib/git/async/pool.py

+8-7
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
from graph import Graph
1919
from channel import (
2020
mkchannel,
21-
Writer,
21+
ChannelWriter,
2222
Channel,
2323
SerialChannel,
24-
CallbackReader
24+
CallbackChannelReader
2525
)
2626

2727
import sys
@@ -32,13 +32,14 @@
3232

3333
__all__ = ('PoolReader', 'Pool', 'ThreadPool')
3434

35-
class PoolReader(CallbackReader):
35+
36+
class PoolReader(CallbackChannelReader):
3637
"""A reader designed to read from channels which take part in pools
3738
It acts like a handle to the underlying task in the pool."""
3839
__slots__ = ('_task_ref', '_pool_ref')
3940

4041
def __init__(self, channel, task, pool):
41-
CallbackReader.__init__(self, channel)
42+
CallbackChannelReader.__init__(self, channel)
4243
self._task_ref = weakref.ref(task)
4344
self._pool_ref = weakref.ref(pool)
4445

@@ -69,7 +70,7 @@ def __del__(self):
6970

7071
#{ Internal
7172
def _read(self, count=0, block=True, timeout=None):
72-
return CallbackReader.read(self, count, block, timeout)
73+
return CallbackChannelReader.read(self, count, block, timeout)
7374

7475
#} END internal
7576

@@ -115,7 +116,7 @@ def read(self, count=0, block=True, timeout=None):
115116
####### read data ########
116117
##########################
117118
# read actual items, tasks were setup to put their output into our channel ( as well )
118-
items = CallbackReader.read(self, count, block, timeout)
119+
items = CallbackChannelReader.read(self, count, block, timeout)
119120
##########################
120121

121122

@@ -446,7 +447,7 @@ def add_task(self, task):
446447
ch = None
447448
if wc is None:
448449
ch = ctype()
449-
wc = Writer(ch)
450+
wc = ChannelWriter(ch)
450451
task.set_writer(wc)
451452
else:
452453
ch = wc.channel

lib/git/async/task.py

+6-45
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from graph import Node
22
from util import ReadOnly
3+
from channel import IteratorReader
4+
35

46
import threading
57
import weakref
@@ -179,56 +181,15 @@ class ThreadTaskBase(object):
179181
class InputIteratorTaskBase(OutputChannelTask):
180182
"""Implements a task which processes items from an iterable in a multi-processing
181183
safe manner"""
182-
__slots__ = ('_iterator', '_lock', '_empty')
183-
# the type of the lock to use when reading from the iterator
184-
lock_type = None
184+
__slots__ = tuple()
185+
185186

186187
def __init__(self, iterator, *args, **kwargs):
187188
OutputChannelTask.__init__(self, *args, **kwargs)
188-
if not hasattr(iterator, 'next'):
189-
raise ValueError("Iterator %r needs a next() function" % iterator)
190-
self._iterator = iterator
191-
self._lock = self.lock_type()
192-
193-
# this is necessary to prevent a cyclic ref, preventing us from
194-
# getting deleted ( and collected )
195-
weakself = weakref.ref(self)
196-
self._read = lambda count: weakself().__read(count)
197-
self._empty = False
198-
189+
self._read = IteratorReader(iterator).read
199190
# defaults to returning our items unchanged
200191
self.fun = lambda item: item
201-
202-
def __read(self, count=0):
203-
"""Read count items from the iterator, and return them"""
204-
# not threadsafe, but worst thing that could happen is that
205-
# we try to get items one more time
206-
if self._empty:
207-
return list()
208-
# END early abort
209-
210-
self._lock.acquire()
211-
try:
212-
if count == 0:
213-
self._empty = True
214-
return list(self._iterator)
215-
else:
216-
out = list()
217-
it = self._iterator
218-
for i in xrange(count):
219-
try:
220-
out.append(it.next())
221-
except StopIteration:
222-
self._empty = True
223-
break
224-
# END handle empty iterator
225-
# END for each item to take
226-
return out
227-
# END handle count
228-
finally:
229-
self._lock.release()
230-
# END handle locking
231-
192+
232193

233194
class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase):
234195
"""An input iterator for threaded pools"""

test/git/async/test_channel.py

+17-3
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ class TestChannels(TestBase):
99
def test_base(self):
1010
# creating channel yields a write and a read channal
1111
wc, rc = mkchannel()
12-
assert isinstance(wc, Writer) # default args
13-
assert isinstance(rc, Reader)
12+
assert isinstance(wc, ChannelWriter) # default args
13+
assert isinstance(rc, ChannelReader)
1414

1515

1616
# TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO
@@ -46,7 +46,7 @@ def test_base(self):
4646

4747

4848
# test callback channels
49-
wc, rc = mkchannel(wtype = CallbackWriter, rtype = CallbackReader)
49+
wc, rc = mkchannel(wtype = CallbackChannelWriter, rtype = CallbackChannelReader)
5050

5151
cb = [0, 0] # set slots to one if called
5252
def pre_write(item):
@@ -71,3 +71,17 @@ def pre_read(count):
7171
assert rval == val + 1
7272

7373

74+
75+
# ITERATOR READER
76+
reader = IteratorReader(iter(range(10)))
77+
assert len(reader.read(2)) == 2
78+
assert len(reader.read(0)) == 8
79+
# its empty now
80+
assert len(reader.read(0)) == 0
81+
assert len(reader.read(5)) == 0
82+
83+
# doesn't work if item is not an iterator
84+
self.failUnlessRaises(ValueError, IteratorReader, list())
85+
86+
# NOTE: its thread-safety is tested by the pool
87+

0 commit comments

Comments
 (0)