Skip to content
This repository was archived by the owner on Apr 14, 2024. It is now read-only.

Commit 8cfa254

Browse files
committed
Usability improvements: Reader's now support the iterator protocol; CallbackReader supports a post-read converter
PoolReader: made the pool and task ref functions part of the internal interface, added pool() and task() functions which return the dereferenced weakref directly
1 parent 5a13dc5 commit 8cfa254

File tree

6 files changed

+87
-11
lines changed

6 files changed

+87
-11
lines changed

__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,9 @@ def thread_interrupt_handler(signum, frame):
2828

2929
_init_atexit()
3030
_init_signals()
31+
32+
33+
# initial imports
34+
from task import *
35+
from pool import *
36+
from channel import *

channel.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,24 @@ class Reader(object):
142142
#{ Interface
143143
def __init__(self, device):
144144
"""Initialize the instance with the device to read from"""
145+
146+
#{ Iterator protocol
147+
148+
def __iter__(self):
149+
return self
150+
151+
def next(self):
152+
"""Implements the iterator protocol, iterating individual items"""
153+
items = self.read(1)
154+
if items:
155+
return items[0]
156+
raise StopIteration
157+
158+
#} END iterator protocol
145159

160+
161+
#{ Interface
162+
146163
def read(self, count=0, block=True, timeout=None):
147164
"""read a list of items read from the device. The list, as a sequence
148165
of items, is similar to the string of characters returned when reading from
@@ -160,6 +177,8 @@ def read(self, count=0, block=True, timeout=None):
160177
returned."""
161178
raise NotImplementedError()
162179

180+
#} END interface
181+
163182

164183
class ChannelReader(Reader):
165184
"""Allows reading from a channel. The reader is thread-safe if the channel is as well"""
@@ -253,6 +272,7 @@ class CallbackReaderMixin(object):
253272
def __init__(self, *args):
254273
super(CallbackReaderMixin, self).__init__(*args)
255274
self._pre_cb = None
275+
self._post_cb = None
256276

257277
def set_pre_cb(self, fun = lambda count: None):
258278
"""Install a callback to call with the item count to be read before any
@@ -264,11 +284,26 @@ def set_pre_cb(self, fun = lambda count: None):
264284
prev = self._pre_cb
265285
self._pre_cb = fun
266286
return prev
287+
288+
def set_post_cb(self, fun = lambda items: items):
289+
"""Install a callback to call after items have been read, but before
290+
they are returned to the caller. The callback may adjust the items and/or
291+
the list
292+
If no function is provided, the callback is uninstalled
293+
:return: the previously installed function"""
294+
prev = self._post_cb
295+
self._post_cb = fun
296+
return prev
267297

268298
def read(self, count=0, block=True, timeout=None):
269299
if self._pre_cb:
270300
self._pre_cb(count)
271-
return super(CallbackReaderMixin, self).read(count, block, timeout)
301+
items = super(CallbackReaderMixin, self).read(count, block, timeout)
302+
303+
if self._post_cb:
304+
items = self._post_cb(items)
305+
return items
306+
272307

273308

274309
class CallbackChannelReader(CallbackReaderMixin, ChannelReader):

pool.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,8 @@ def __del__(self):
7171
#{ Internal
7272
def _read(self, count=0, block=True, timeout=None):
7373
return CallbackChannelReader.read(self, count, block, timeout)
74-
75-
#} END internal
76-
77-
#{ Interface
78-
74+
75+
7976
def pool_ref(self):
8077
""":return: reference to the pool we belong to"""
8178
return self._pool_ref
@@ -84,6 +81,27 @@ def task_ref(self):
8481
""":return: reference to the task producing our items"""
8582
return self._task_ref
8683

84+
#} END internal
85+
86+
#{ Interface
87+
88+
def task(self):
89+
""":return: task we read from
90+
:raise ValueError: If the instance is not attached to at task"""
91+
task = self._task_ref()
92+
if task is None:
93+
raise ValueError("PoolReader is not associated with at task anymore")
94+
return task
95+
96+
def pool(self):
97+
""":return: pool our task belongs to
98+
:raise ValueError: if the instance does not belong to a pool"""
99+
pool = self._pool_ref()
100+
if pool is None:
101+
raise ValueError("PoolReader is not associated with a pool anymore")
102+
return pool
103+
104+
87105
#} END interface
88106

89107
def read(self, count=0, block=True, timeout=None):

task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0,
5252
self._wlock = threading.Lock()
5353
self.fun = fun
5454
self.min_count = None
55-
self.max_chunksize = 0 # note set
55+
self.max_chunksize = 0 # not set
5656
self.apply_single = apply_single
5757

5858
def is_done(self):

test/test_channel.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,28 +48,34 @@ def test_base(self):
4848
# test callback channels
4949
wc, rc = mkchannel(wtype = CallbackChannelWriter, rtype = CallbackChannelReader)
5050

51-
cb = [0, 0] # set slots to one if called
51+
cb = [0, 0, 0] # set slots to one if called
5252
def pre_write(item):
5353
cb[0] = 1
5454
return item + 1
5555
def pre_read(count):
5656
cb[1] = 1
57+
def post_read(items):
58+
assert isinstance(items, list)
59+
cb[2] = 1
60+
return [ i+1 for i in items]
61+
5762

5863
# set, verify it returns previous one
5964
assert wc.set_pre_cb(pre_write) is None
6065
assert rc.set_pre_cb(pre_read) is None
66+
assert rc.set_post_cb(post_read) is None
6167
assert wc.set_pre_cb(pre_write) is pre_write
6268
assert rc.set_pre_cb(pre_read) is pre_read
69+
assert rc.set_post_cb(post_read) is post_read
6370

6471
# writer transforms input
6572
val = 5
6673
wc.write(val)
6774
assert cb[0] == 1 and cb[1] == 0
6875

6976
rval = rc.read(1)[0] # read one item, must not block
70-
assert cb[0] == 1 and cb[1] == 1
71-
assert rval == val + 1
72-
77+
assert cb[0] == 1 and cb[1] == 1 and cb[2] == 1
78+
assert rval == val + 1 + 1
7379

7480

7581
# ITERATOR READER
@@ -83,5 +89,10 @@ def pre_read(count):
8389
# doesn't work if item is not an iterator
8490
self.failUnlessRaises(ValueError, IteratorReader, list())
8591

92+
93+
# test general read-iteration - its supported by all readers
94+
reader = IteratorReader(iter(range(10)))
95+
assert len(list(reader)) == 10
96+
8697
# NOTE: its thread-safety is tested by the pool
8798

test/test_pool.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,12 @@ def test_base(self):
413413
urc2 = p.add_task(t2)
414414
assert p.num_tasks() == 2
415415

416+
# test pool reader
417+
assert urc1.pool_ref()() is p
418+
assert urc1.task_ref()() is t1
419+
assert urc1.pool() == p
420+
assert urc1.task() == t1
421+
416422
## SINGLE TASK #################
417423
self._assert_single_task(p, False)
418424
assert p.num_tasks() == 2

0 commit comments

Comments
 (0)