This repository was archived by the owner on Apr 14, 2024. It is now read-only.

Commit fe98426

Added brief usage guide and test for the given examples

1 parent 4c6c53b commit fe98426

File tree

7 files changed: +148 -14 lines changed


channel.py

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@

 __all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter',
            'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly',
-           'IteratorReader')
+           'IteratorReader', 'CallbackReaderMixin', 'CallbackWriterMixin')

 #{ Classes
 class Channel(object):
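The newly exported ``CallbackReaderMixin`` and ``CallbackWriterMixin`` names suggest a pre/post-callback pattern wrapped around channel reads. A minimal pure-Python sketch of that idea follows; the method names and semantics here are assumptions for illustration, not the library's actual API:

```python
class CallbackReaderMixin(object):
    """Sketch: wraps read() with optional pre- and post-read callbacks.

    Hypothetical names - not the actual async.channel API.
    """
    def __init__(self):
        self._pre_cb = None
        self._post_cb = None

    def set_pre_cb(self, fun):
        self._pre_cb = fun

    def set_post_cb(self, fun):
        self._post_cb = fun

    def read(self, count=0):
        if self._pre_cb:
            self._pre_cb(count)                 # just-in-time notification
        items = self._do_read(count)            # the actual channel read
        if self._post_cb:
            items = self._post_cb(items)        # may transform the items
        return items


class ListReader(CallbackReaderMixin):
    """Toy reader backed by a list, standing in for a real ChannelReader."""
    def __init__(self, items):
        super(ListReader, self).__init__()
        self._items = list(items)

    def _do_read(self, count):
        n = len(self._items) if count == 0 else count
        out, self._items = self._items[:n], self._items[n:]
        return out


r = ListReader(range(5))
r.set_post_cb(lambda items: [i * 10 for i in items])
assert r.read() == [0, 10, 20, 30, 40]
```

The mixin only decorates ``read``; the backing storage is supplied by whatever class mixes it in, which is what makes it composable with different reader types.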

doc/source/intro.rst

Lines changed: 10 additions & 8 deletions
@@ -2,20 +2,22 @@
 Overview
 ########

-The *GitDB* project implements interfaces to allow read and write access to git repositories. In its core lies the *db* package, which contains all database types necessary to read a complete git repository. These are the ``LooseObjectDB``, the ``PackedDB`` and the ``ReferenceDB`` which are combined into the ``GitDB`` to combine every aspect of the git database.
+*Async* is one more attempt to make the definition and execution of asynchronous, interdependent operations easy. To that end, you define tasks which communicate with each other through channels. Channels transfer items, much like bytes flowing through the pipes used in inter-process communication. Items are only generated on demand, that is, when you read from the respective output channel.

-For this to work, GitDB implements pack reading, as well as loose object reading and writing. Data is always encapsulated in streams, which allows huge files to be handled as well as small ones, usually only chunks of the stream are kept in memory for processing, never the whole stream at once.
+As it turned out, the GIL is far more restrictive than initially thought, which effectively means true concurrency can only be obtained during input/output to files and sockets, as well as in specially written versions of existing CPython extensions which release the GIL before lengthy operations. Many of the currently available C extensions, such as zlib, lock everything down to just one thread at a time, even though this isn't a strict technical requirement.

-Interfaces are used to describe the API, making it easy to provide alternate implementations.
+If you want to make good use of *async*, you will have to plan the operation carefully, and you might end up writing new C extensions, or altering existing ones, to do so.
+
+If you have 10 minutes, watch a more graphical presentation `on youtube <http://www.youtube.com/watch?v=wy1yB1M-dcQ>`_.

 ================
-Installing GitDB
+Installing Async
 ================
-Its easiest to install gitdb using the *easy_install* program, which is part of the `setuptools`_::
+It's easiest to install async using the *easy_install* program, which is part of the `setuptools`_::

-    $ easy_install gitdb
+    $ easy_install async

-As the command will install gitdb in your respective python distribution, you will most likely need root permissions to authorize the required changes.
+As the command will install async in your respective Python distribution, you will most likely need root permissions to authorize the required changes.

 If you have downloaded the source archive, the package can be installed by running the ``setup.py`` script::

@@ -24,6 +26,6 @@ If you have downloaded the source archive, the package can be installed by runni
 ===============
 Getting Started
 ===============
-It is advised to have a look at the :ref:`Usage Guide <tutorial-label>` for a brief introduction on the different database implementations.
+It is advised to have a look at the :ref:`Usage Guide <tutorial-label>` for a brief introduction.

 .. _setuptools: http://peak.telecommunity.com/DevCenter/setuptools

doc/source/usage.rst

Lines changed: 84 additions & 0 deletions
@@ -0,0 +1,84 @@
+.. _tutorial-label:
+
+###########
+Usage Guide
+###########
+
+******
+Design
+******
+The central instance within *async* is the **Pool**. A pool keeps a set of 0 or more workers which can run asynchronously and process **Task**\ s. Tasks are added to the pool using the ``add_task`` method. Once added, the caller receives a **ChannelReader** instance which connects to a channel. Calling ``read`` on that instance triggers the actual computation. A ChannelReader can also serve as input for another task, which, once added to the pool, indicates a dependency between these tasks. To obtain one item from task 2, one item needs to be produced by task 1 beforehand - the pool takes care of the dependency handling as well as the scheduling.
+
+Task instances allow you to define the minimum amount of items to be processed on each request, as well as the maximum amount of items per batch. This chunking behaviour gives you fine-grained control over the memory requirements as well as the actually achieved concurrency of your chain of tasks.
+
+Task chunks are the units actually being processed by the workers; the pool assures these are processed in the right order. Chunks help to bridge the gap between items that take a long time to process and those which are generated quickly. Generally, slow tasks should have small chunks, otherwise some workers might end up waiting for input while another worker slowly processes the items of a big chunk.
+
+**************
+The ThreadPool
+**************
+A thread pool is a pool implementation which uses threads as workers. ``ChannelReader``\ s are blocking channels which serve as the means of communication between tasks currently being processed.
+
+The ``set_size`` method is essential, as it determines the amount of workers in the pool. It defaults to 0 for newly created pools, which is equivalent to a fully synchronous mode of operation - all processing is effectively done by the calling thread::
+
+    from async.pool import ThreadPool
+
+    p = ThreadPool()
+    # default size is 0, synchronous mode
+    assert p.size() == 0
+
+    # now tasks would be processed asynchronously
+    p.set_size(1)
+    assert p.size() == 1
+
+Currently this is the only implementation, but it was designed with the ``multiprocessing`` package in mind, which shouldn't make it too hard to add such an implementation in future releases.
+
+*****
+Tasks
+*****
+A task encapsulates a unit of work and defines how its items should be processed. The processing is usually performed per item, calling a function with one item to receive a processed item back, which will be written into the output channel. The reader end of that channel is either held by the client of the items, or by another task which performs additional processing.
+
+In the following example, a simple task is created which takes integers and multiplies each by itself::
+
+    from async.task import IteratorThreadTask
+
+    # A task performing processing on items from an iterator
+    t = IteratorThreadTask(iter(range(10)), "power", lambda i: i*i)
+    reader = p.add_task(t)
+
+    # read all items - they were processed by worker 1
+    items = reader.read()
+    assert len(items) == 10 and items[0] == 0 and items[-1] == 81
+
+
+*****************************
+Channels, Readers and Writers
+*****************************
+Channels are the means of communication between tasks, as well as the way for clients to finally receive the processed items. A channel has one or more write ends and one or more read ends. Readers will block if there are fewer than the requested amount of items, but will wake up once the missing items were sent through the write end.
+
+A channel's major difference from a queue is its ability to be closed, which immediately wakes up all waiting readers.
+
+Reader Callbacks
+================
+The reader returned by the pool's ``add_task`` method is a specialized version of a ``CallbackChannelReader``, which allows you to set up functions to be called before and after an item is read. This allows for just-in-time notification of asynchronous events, as well as for applying item transformations.
+
+**************
+Chaining Tasks
+**************
+When using different task types, chains between tasks can be created. These are understood by the pool, which recognizes the implicit task dependency and schedules the tasks in the right order.
+
+The following example creates two tasks which combine their results. As the pool only has one worker, and as the chunk size is maximized, we can be sure that the items are returned in order in this case::
+
+    from async.task import ChannelThreadTask
+
+    t = IteratorThreadTask(iter(range(10)), "power", lambda i: i*i)
+    reader = p.add_task(t)
+
+    # chain both by linking their readers
+    tmult = ChannelThreadTask(reader, "mult", lambda i: i*2)
+    result_reader = p.add_task(tmult)
+
+    # read all
+    items = result_reader.read()
+    assert len(items) == 10 and items[0] == 0 and items[-1] == 162
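The usage guide's claim that closing a channel immediately wakes all waiting readers can be modelled with a condition variable. The following is a minimal sketch under that assumption, not the library's actual implementation; all names are illustrative:

```python
import threading
from collections import deque

class ClosableChannel(object):
    """Sketch of a queue that can be closed; close() wakes every blocked reader."""
    def __init__(self):
        self._items = deque()
        self._closed = False
        self._cond = threading.Condition()

    def write(self, item):
        with self._cond:
            if self._closed:
                raise IOError("channel closed")
            self._items.append(item)
            self._cond.notify()

    def close(self):
        with self._cond:
            self._closed = True
            self._cond.notify_all()     # wake all waiting readers at once

    def read(self, count=1):
        """Block until `count` items are available or the channel is closed."""
        with self._cond:
            while len(self._items) < count and not self._closed:
                self._cond.wait()
            out = []
            while self._items and len(out) < count:
                out.append(self._items.popleft())
            return out                  # may be short if the channel was closed


c = ClosableChannel()
c.write(1)
c.write(2)
assert c.read(2) == [1, 2]
c.close()
assert c.read(5) == []      # returns immediately instead of blocking forever
```

Without the ``close`` escape hatch, a reader asking for more items than will ever be written would block indefinitely - which is the behaviour a plain bounded queue gives you.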

pool.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,7 @@ def pool(self):
106106

107107
def read(self, count=0, block=True, timeout=None):
108108
"""Read an item that was processed by one of our threads
109-
:note: Triggers task dependency handling needed to provide the necessary
110-
input"""
109+
:note: Triggers task dependency handling needed to provide the necessary input"""
111110
# NOTE: we always queue the operation that would give us count items
112111
# as tracking the scheduled items or testing the channels size
113112
# is in herently unsafe depending on the design of the task network
@@ -389,13 +388,15 @@ def num_tasks(self):
389388
self._taskgraph_lock.release()
390389

391390
def remove_task(self, task, _from_destructor_ = False):
392-
"""Delete the task
391+
"""
392+
Delete the task.
393393
Additionally we will remove orphaned tasks, which can be identified if their
394394
output channel is only held by themselves, so no one will ever consume
395395
its items.
396396
397397
This method blocks until all tasks to be removed have been processed, if
398398
they are currently being processed.
399+
399400
:return: self"""
400401
self._taskgraph_lock.acquire()
401402
try:
@@ -430,6 +431,7 @@ def remove_task(self, task, _from_destructor_ = False):
430431

431432
def add_task(self, task):
432433
"""Add a new task to be processed.
434+
433435
:return: a read channel to retrieve processed items. If that handle is lost,
434436
the task will be considered orphaned and will be deleted on the next
435437
occasion."""

task.py

Lines changed: 3 additions & 1 deletion
@@ -196,8 +196,10 @@ class IteratorTaskBase(Task):
     def __init__(self, iterator, *args, **kwargs):
         Task.__init__(self, *args, **kwargs)
         self._read = IteratorReader(iterator).read
+
         # defaults to returning our items unchanged
-        self.fun = lambda item: item
+        if self.fun is None:
+            self.fun = lambda item: item


 class IteratorThreadTask(IteratorTaskBase, ThreadTaskBase):
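The change above turns the identity function into a fallback rather than an unconditional overwrite, so a ``fun`` supplied by the caller is no longer clobbered by the subclass constructor. The pattern in isolation, using toy classes rather than the actual Task hierarchy:

```python
class Task(object):
    def __init__(self, fun=None):
        # fun may be provided by the caller, or left unset
        self.fun = fun

class IteratorTask(Task):
    def __init__(self, fun=None):
        Task.__init__(self, fun)
        # fallback: only install the identity function if none was given
        if self.fun is None:
            self.fun = lambda item: item

assert IteratorTask().fun(42) == 42                   # identity fallback applies
assert IteratorTask(lambda i: i * 2).fun(21) == 42    # caller's fun is preserved
```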

test/test_example.py

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
+"""Module containing examples from the documentation"""
+from lib import *
+
+from async.pool import *
+from async.task import *
+from async.thread import terminate_threads
+
+
+class TestExamples(TestBase):
+
+    @terminate_threads
+    def test_usage(self):
+        p = ThreadPool()
+        # default size is 0, synchronous mode
+        assert p.size() == 0
+
+        # now tasks would be processed asynchronously
+        p.set_size(1)
+        assert p.size() == 1
+
+        # A task performing processing on items from an iterator
+        t = IteratorThreadTask(iter(range(10)), "power", lambda i: i*i)
+        reader = p.add_task(t)
+
+        # read all items - they were processed by worker 1
+        items = reader.read()
+        assert len(items) == 10 and items[0] == 0 and items[-1] == 81
+
+        # chaining
+        t = IteratorThreadTask(iter(range(10)), "power", lambda i: i*i)
+        reader = p.add_task(t)
+
+        # chain both by linking their readers
+        tmult = ChannelThreadTask(reader, "mult", lambda i: i*2)
+        result_reader = p.add_task(tmult)
+
+        # read all
+        items = result_reader.read()
+        assert len(items) == 10 and items[0] == 0 and items[-1] == 162
test/test_pool.py

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-"""Channel testing"""
+"""Pool testing"""
 from lib import *
 from task import *
