Skip to content

Commit f91495e

Browse files
committed
Merge branch 'async'
2 parents 7c1169f + 7a0b79e commit f91495e

21 files changed

+2820
-147
lines changed

lib/git/async/__init__.py

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
"""Initialize the multi-processing package"""
2+
3+
#{ Initialization
4+
def _init_atexit():
5+
"""Setup an at-exit job to be sure our workers are shutdown correctly before
6+
the interpreter quits"""
7+
import atexit
8+
import thread
9+
atexit.register(thread.do_terminate_threads)
10+
11+
def _init_signals():
12+
"""Assure we shutdown our threads correctly when being interrupted"""
13+
import signal
14+
import thread
15+
16+
prev_handler = signal.getsignal(signal.SIGINT)
17+
def thread_interrupt_handler(signum, frame):
18+
thread.do_terminate_threads()
19+
if callable(prev_handler):
20+
prev_handler(signum, frame)
21+
raise KeyboardInterrupt()
22+
# END call previous handler
23+
# END signal handler
24+
signal.signal(signal.SIGINT, thread_interrupt_handler)
25+
26+
27+
#} END init
28+
29+
_init_atexit()
30+
_init_signals()

lib/git/async/channel.py

+338
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,338 @@
1+
"""Contains a queue based channel implementation"""
2+
from Queue import (
3+
Empty,
4+
Full
5+
)
6+
7+
from util import (
8+
AsyncQueue,
9+
SyncQueue,
10+
ReadOnly
11+
)
12+
13+
from time import time
14+
import threading
15+
import sys
16+
17+
__all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter',
18+
'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly',
19+
'IteratorReader')
20+
21+
#{ Classes
22+
class Channel(object):
23+
"""A channel is similar to a file like object. It has a write end as well as one or
24+
more read ends. If Data is in the channel, it can be read, if not the read operation
25+
will block until data becomes available.
26+
If the channel is closed, any read operation will result in an exception
27+
28+
This base class is not instantiated directly, but instead serves as constructor
29+
for Rwriter pairs.
30+
31+
Create a new channel """
32+
__slots__ = 'queue'
33+
34+
# The queue to use to store the actual data
35+
QueueCls = AsyncQueue
36+
37+
def __init__(self):
38+
"""initialize this instance with a queue holding the channel contents"""
39+
self.queue = self.QueueCls()
40+
41+
42+
class SerialChannel(Channel):
43+
"""A slightly faster version of a Channel, which sacrificed thead-safety for performance"""
44+
QueueCls = SyncQueue
45+
46+
47+
class Writer(object):
48+
"""A writer is an object providing write access to a possibly blocking reading device"""
49+
__slots__ = tuple()
50+
51+
#{ Interface
52+
53+
def __init__(self, device):
54+
"""Initialize the instance with the device to write to"""
55+
56+
def write(self, item, block=True, timeout=None):
57+
"""Write the given item into the device
58+
:param block: True if the device may block until space for the item is available
59+
:param timeout: The time in seconds to wait for the device to become ready
60+
in blocking mode"""
61+
raise NotImplementedError()
62+
63+
def size(self):
64+
""":return: number of items already in the device, they could be read with a reader"""
65+
raise NotImplementedError()
66+
67+
def close(self):
68+
"""Close the channel. Multiple close calls on a closed channel are no
69+
an error"""
70+
raise NotImplementedError()
71+
72+
def closed(self):
73+
""":return: True if the channel was closed"""
74+
raise NotImplementedError()
75+
76+
#} END interface
77+
78+
79+
class ChannelWriter(Writer):
80+
"""The write end of a channel, a file-like interface for a channel"""
81+
__slots__ = ('channel', '_put')
82+
83+
def __init__(self, channel):
84+
"""Initialize the writer to use the given channel"""
85+
self.channel = channel
86+
self._put = self.channel.queue.put
87+
88+
#{ Interface
89+
def write(self, item, block=False, timeout=None):
90+
return self._put(item, block, timeout)
91+
92+
def size(self):
93+
return self.channel.queue.qsize()
94+
95+
def close(self):
96+
"""Close the channel. Multiple close calls on a closed channel are no
97+
an error"""
98+
self.channel.queue.set_writable(False)
99+
100+
def closed(self):
101+
""":return: True if the channel was closed"""
102+
return not self.channel.queue.writable()
103+
#} END interface
104+
105+
106+
class CallbackWriterMixin(object):
107+
"""The write end of a channel which allows you to setup a callback to be
108+
called after an item was written to the channel"""
109+
# slots don't work with mixin's :(
110+
# __slots__ = ('_pre_cb')
111+
112+
def __init__(self, *args):
113+
super(CallbackWriterMixin, self).__init__(*args)
114+
self._pre_cb = None
115+
116+
def set_pre_cb(self, fun = lambda item: item):
117+
"""Install a callback to be called before the given item is written.
118+
It returns a possibly altered item which will be written to the channel
119+
instead, making it useful for pre-write item conversions.
120+
Providing None uninstalls the current method.
121+
:return: the previously installed function or None
122+
:note: Must be thread-safe if the channel is used in multiple threads"""
123+
prev = self._pre_cb
124+
self._pre_cb = fun
125+
return prev
126+
127+
def write(self, item, block=True, timeout=None):
128+
if self._pre_cb:
129+
item = self._pre_cb(item)
130+
super(CallbackWriterMixin, self).write(item, block, timeout)
131+
132+
133+
class CallbackChannelWriter(CallbackWriterMixin, ChannelWriter):
134+
"""Implements a channel writer with callback functionality"""
135+
pass
136+
137+
138+
class Reader(object):
139+
"""Allows reading from a device"""
140+
__slots__ = tuple()
141+
142+
#{ Interface
143+
def __init__(self, device):
144+
"""Initialize the instance with the device to read from"""
145+
146+
def read(self, count=0, block=True, timeout=None):
147+
"""read a list of items read from the device. The list, as a sequence
148+
of items, is similar to the string of characters returned when reading from
149+
file like objects.
150+
:param count: given amount of items to read. If < 1, all items will be read
151+
:param block: if True, the call will block until an item is available
152+
:param timeout: if positive and block is True, it will block only for the
153+
given amount of seconds, returning the items it received so far.
154+
The timeout is applied to each read item, not for the whole operation.
155+
:return: single item in a list if count is 1, or a list of count items.
156+
If the device was empty and count was 1, an empty list will be returned.
157+
If count was greater 1, a list with less than count items will be
158+
returned.
159+
If count was < 1, a list with all items that could be read will be
160+
returned."""
161+
raise NotImplementedError()
162+
163+
164+
class ChannelReader(Reader):
165+
"""Allows reading from a channel. The reader is thread-safe if the channel is as well"""
166+
__slots__ = 'channel'
167+
168+
def __init__(self, channel):
169+
"""Initialize this instance from its parent write channel"""
170+
self.channel = channel
171+
172+
#{ Interface
173+
174+
def read(self, count=0, block=True, timeout=None):
175+
# if the channel is closed for writing, we never block
176+
# NOTE: is handled by the queue
177+
# We don't check for a closed state here has it costs time - most of
178+
# the time, it will not be closed, and will bail out automatically once
179+
# it gets closed
180+
181+
182+
# in non-blocking mode, its all not a problem
183+
out = list()
184+
queue = self.channel.queue
185+
if not block:
186+
# be as fast as possible in non-blocking mode, hence
187+
# its a bit 'unrolled'
188+
try:
189+
if count == 1:
190+
out.append(queue.get(False))
191+
elif count < 1:
192+
while True:
193+
out.append(queue.get(False))
194+
# END for each item
195+
else:
196+
for i in xrange(count):
197+
out.append(queue.get(False))
198+
# END for each item
199+
# END handle count
200+
except Empty:
201+
pass
202+
# END handle exceptions
203+
else:
204+
# to get everything into one loop, we set the count accordingly
205+
if count == 0:
206+
count = sys.maxint
207+
# END handle count
208+
209+
i = 0
210+
while i < count:
211+
try:
212+
out.append(queue.get(block, timeout))
213+
i += 1
214+
except Empty:
215+
# here we are only if
216+
# someone woke us up to inform us about the queue that changed
217+
# its writable state
218+
# The following branch checks for closed channels, and pulls
219+
# as many items as we need and as possible, before
220+
# leaving the loop.
221+
if not queue.writable():
222+
try:
223+
while i < count:
224+
out.append(queue.get(False, None))
225+
i += 1
226+
# END count loop
227+
except Empty:
228+
break # out of count loop
229+
# END handle absolutely empty queue
230+
# END handle closed channel
231+
232+
# if we are here, we woke up and the channel is not closed
233+
# Either the queue became writable again, which currently shouldn't
234+
# be able to happen in the channel, or someone read with a timeout
235+
# that actually timed out.
236+
# As it timed out, which is the only reason we are here,
237+
# we have to abort
238+
break
239+
# END ignore empty
240+
241+
# END for each item
242+
# END handle blocking
243+
return out
244+
245+
#} END interface
246+
247+
248+
class CallbackReaderMixin(object):
249+
"""A channel which sends a callback before items are read from the channel"""
250+
# unfortunately, slots can only use direct inheritance, have to turn it off :(
251+
# __slots__ = "_pre_cb"
252+
253+
def __init__(self, *args):
254+
super(CallbackReaderMixin, self).__init__(*args)
255+
self._pre_cb = None
256+
257+
def set_pre_cb(self, fun = lambda count: None):
258+
"""Install a callback to call with the item count to be read before any
259+
item is actually read from the channel.
260+
Exceptions will be propagated.
261+
If a function is not provided, the call is effectively uninstalled.
262+
:return: the previously installed callback or None
263+
:note: The callback must be threadsafe if the channel is used by multiple threads."""
264+
prev = self._pre_cb
265+
self._pre_cb = fun
266+
return prev
267+
268+
def read(self, count=0, block=True, timeout=None):
269+
if self._pre_cb:
270+
self._pre_cb(count)
271+
return super(CallbackReaderMixin, self).read(count, block, timeout)
272+
273+
274+
class CallbackChannelReader(CallbackReaderMixin, ChannelReader):
275+
"""Implements a channel reader with callback functionality"""
276+
pass
277+
278+
279+
class IteratorReader(Reader):
280+
"""A Reader allowing to read items from an iterator, instead of a channel.
281+
Reads will never block. Its thread-safe"""
282+
__slots__ = ("_empty", '_iter', '_lock')
283+
284+
# the type of the lock to use when reading from the iterator
285+
lock_type = threading.Lock
286+
287+
def __init__(self, iterator):
288+
self._empty = False
289+
if not hasattr(iterator, 'next'):
290+
raise ValueError("Iterator %r needs a next() function" % iterator)
291+
self._iter = iterator
292+
self._lock = self.lock_type()
293+
294+
def read(self, count=0, block=True, timeout=None):
295+
"""Non-Blocking implementation of read"""
296+
# not threadsafe, but worst thing that could happen is that
297+
# we try to get items one more time
298+
if self._empty:
299+
return list()
300+
# END early abort
301+
302+
self._lock.acquire()
303+
try:
304+
if count == 0:
305+
self._empty = True
306+
return list(self._iter)
307+
else:
308+
out = list()
309+
it = self._iter
310+
for i in xrange(count):
311+
try:
312+
out.append(it.next())
313+
except StopIteration:
314+
self._empty = True
315+
break
316+
# END handle empty iterator
317+
# END for each item to take
318+
return out
319+
# END handle count
320+
finally:
321+
self._lock.release()
322+
# END handle locking
323+
324+
325+
#} END classes
326+
327+
#{ Constructors
328+
def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader):
329+
"""Create a channel, with a reader and a writer
330+
:return: tuple(reader, writer)
331+
:param ctype: Channel to instantiate
332+
:param wctype: The type of the write channel to instantiate
333+
:param rctype: The type of the read channel to instantiate"""
334+
c = ctype()
335+
wc = wtype(c)
336+
rc = rtype(c)
337+
return wc, rc
338+
#} END constructors

0 commit comments

Comments
 (0)