|
9 | 9 | from unittest import TestCase
|
10 | 10 | import tempfile
|
11 | 11 | import shutil
|
| 12 | +import random |
| 13 | +from array import array |
12 | 14 | import cStringIO
|
13 | 15 |
|
14 | 16 | GIT_REPO = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
|
15 | 17 |
|
| 18 | +#{ Routines |
| 19 | + |
16 | 20 | def fixture_path(name):
|
17 |
| - test_dir = os.path.dirname(os.path.dirname(__file__)) |
18 |
| - return os.path.join(test_dir, "fixtures", name) |
| 21 | + test_dir = os.path.dirname(os.path.dirname(__file__)) |
| 22 | + return os.path.join(test_dir, "fixtures", name) |
19 | 23 |
|
20 | 24 | def fixture(name):
|
21 |
| - return open(fixture_path(name), 'rb').read() |
| 25 | + return open(fixture_path(name), 'rb').read() |
22 | 26 |
|
23 | 27 | def absolute_project_path():
|
24 |
| - return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) |
25 |
| - |
26 |
| - |
| 28 | + return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) |
| 29 | + |
| 30 | +def make_bytes(size_in_bytes, randomize=False): |
| 31 | + """:return: string with given size in bytes |
| 32 | + :param randomize: try to produce a very random stream""" |
| 33 | + actual_size = size_in_bytes / 4 |
| 34 | + producer = xrange(actual_size) |
| 35 | + if randomize: |
| 36 | + producer = list(producer) |
| 37 | + random.shuffle(producer) |
| 38 | + # END randomize |
| 39 | + a = array('i', producer) |
| 40 | + return a.tostring() |
| 41 | + |
| 42 | + |
| 43 | +def make_object(type, data): |
| 44 | + """:return: bytes resembling an uncompressed object""" |
| 45 | + odata = "blob %i\0" % len(data) |
| 46 | + return odata + data |
| 47 | + |
| 48 | +#} END routines |
| 49 | + |
| 50 | +#{ Adapters |
| 51 | + |
27 | 52 | class StringProcessAdapter(object):
|
28 |
| - """Allows to use strings as Process object as returned by SubProcess.Popen. |
29 |
| - Its tailored to work with the test system only""" |
30 |
| - |
31 |
| - def __init__(self, input_string): |
32 |
| - self.stdout = cStringIO.StringIO(input_string) |
33 |
| - self.stderr = cStringIO.StringIO() |
34 |
| - |
35 |
| - def wait(self): |
36 |
| - return 0 |
37 |
| - |
38 |
| - poll = wait |
39 |
| - |
| 53 | + """Allows to use strings as Process object as returned by SubProcess.Popen. |
| 54 | + Its tailored to work with the test system only""" |
| 55 | + |
| 56 | + def __init__(self, input_string): |
| 57 | + self.stdout = cStringIO.StringIO(input_string) |
| 58 | + self.stderr = cStringIO.StringIO() |
| 59 | + |
| 60 | + def wait(self): |
| 61 | + return 0 |
| 62 | + |
| 63 | + poll = wait |
| 64 | + |
| 65 | +#} END adapters |
| 66 | + |
| 67 | +#{ Decorators |
40 | 68 |
|
41 | 69 | def _rmtree_onerror(osremove, fullpath, exec_info):
|
42 |
| - """ |
43 |
| - Handle the case on windows that read-only files cannot be deleted by |
44 |
| - os.remove by setting it to mode 777, then retry deletion. |
45 |
| - """ |
46 |
| - if os.name != 'nt' or osremove is not os.remove: |
47 |
| - raise |
48 |
| - |
49 |
| - os.chmod(fullpath, 0777) |
50 |
| - os.remove(fullpath) |
| 70 | + """ |
| 71 | + Handle the case on windows that read-only files cannot be deleted by |
| 72 | + os.remove by setting it to mode 777, then retry deletion. |
| 73 | + """ |
| 74 | + if os.name != 'nt' or osremove is not os.remove: |
| 75 | + raise |
| 76 | + |
| 77 | + os.chmod(fullpath, 0777) |
| 78 | + os.remove(fullpath) |
51 | 79 |
|
52 | 80 | def with_bare_rw_repo(func):
|
53 |
| - """ |
54 |
| - Decorator providing a specially made read-write repository to the test case |
55 |
| - decorated with it. The test case requires the following signature:: |
56 |
| - def case(self, rw_repo) |
57 |
| - |
58 |
| - The rwrepo will be a bare clone or the types rorepo. Once the method finishes, |
59 |
| - it will be removed completely. |
60 |
| - |
61 |
| - Use this if you want to make purely index based adjustments, change refs, create |
62 |
| - heads, generally operations that do not need a working tree. |
63 |
| - """ |
64 |
| - def bare_repo_creator(self): |
65 |
| - repo_dir = tempfile.mktemp("bare_repo") |
66 |
| - rw_repo = self.rorepo.clone(repo_dir, shared=True, bare=True) |
67 |
| - prev_cwd = os.getcwd() |
68 |
| - try: |
69 |
| - return func(self, rw_repo) |
70 |
| - finally: |
71 |
| - rw_repo.git.clear_cache() |
72 |
| - shutil.rmtree(repo_dir, onerror=_rmtree_onerror) |
73 |
| - # END cleanup |
74 |
| - # END bare repo creator |
75 |
| - bare_repo_creator.__name__ = func.__name__ |
76 |
| - return bare_repo_creator |
77 |
| - |
| 81 | + """ |
| 82 | + Decorator providing a specially made read-write repository to the test case |
| 83 | + decorated with it. The test case requires the following signature:: |
| 84 | + def case(self, rw_repo) |
| 85 | + |
| 86 | + The rwrepo will be a bare clone or the types rorepo. Once the method finishes, |
| 87 | + it will be removed completely. |
| 88 | + |
| 89 | + Use this if you want to make purely index based adjustments, change refs, create |
| 90 | + heads, generally operations that do not need a working tree. |
| 91 | + """ |
| 92 | + def bare_repo_creator(self): |
| 93 | + repo_dir = tempfile.mktemp("bare_repo") |
| 94 | + rw_repo = self.rorepo.clone(repo_dir, shared=True, bare=True) |
| 95 | + prev_cwd = os.getcwd() |
| 96 | + try: |
| 97 | + return func(self, rw_repo) |
| 98 | + finally: |
| 99 | + rw_repo.git.clear_cache() |
| 100 | + shutil.rmtree(repo_dir, onerror=_rmtree_onerror) |
| 101 | + # END cleanup |
| 102 | + # END bare repo creator |
| 103 | + bare_repo_creator.__name__ = func.__name__ |
| 104 | + return bare_repo_creator |
| 105 | + |
78 | 106 | def with_rw_repo(working_tree_ref):
|
79 |
| - """ |
80 |
| - Same as with_bare_repo, but clones the rorepo as non-bare repository, checking |
81 |
| - out the working tree at the given working_tree_ref. |
82 |
| - |
83 |
| - This repository type is more costly due to the working copy checkout. |
84 |
| - |
85 |
| - To make working with relative paths easier, the cwd will be set to the working |
86 |
| - dir of the repository. |
87 |
| - """ |
88 |
| - assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout" |
89 |
| - def argument_passer(func): |
90 |
| - def repo_creator(self): |
91 |
| - repo_dir = tempfile.mktemp("non_bare_repo") |
92 |
| - rw_repo = self.rorepo.clone(repo_dir, shared=True, bare=False, n=True) |
93 |
| - |
94 |
| - rw_repo.head.commit = working_tree_ref |
95 |
| - rw_repo.head.reference.checkout() |
96 |
| - |
97 |
| - prev_cwd = os.getcwd() |
98 |
| - os.chdir(rw_repo.working_dir) |
99 |
| - try: |
100 |
| - return func(self, rw_repo) |
101 |
| - finally: |
102 |
| - os.chdir(prev_cwd) |
103 |
| - rw_repo.git.clear_cache() |
104 |
| - shutil.rmtree(repo_dir, onerror=_rmtree_onerror) |
105 |
| - # END cleanup |
106 |
| - # END rw repo creator |
107 |
| - repo_creator.__name__ = func.__name__ |
108 |
| - return repo_creator |
109 |
| - # END argument passer |
110 |
| - return argument_passer |
111 |
| - |
| 107 | + """ |
| 108 | + Same as with_bare_repo, but clones the rorepo as non-bare repository, checking |
| 109 | + out the working tree at the given working_tree_ref. |
| 110 | + |
| 111 | + This repository type is more costly due to the working copy checkout. |
| 112 | + |
| 113 | + To make working with relative paths easier, the cwd will be set to the working |
| 114 | + dir of the repository. |
| 115 | + """ |
| 116 | + assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout" |
| 117 | + def argument_passer(func): |
| 118 | + def repo_creator(self): |
| 119 | + repo_dir = tempfile.mktemp("non_bare_repo") |
| 120 | + rw_repo = self.rorepo.clone(repo_dir, shared=True, bare=False, n=True) |
| 121 | + |
| 122 | + rw_repo.head.commit = working_tree_ref |
| 123 | + rw_repo.head.reference.checkout() |
| 124 | + |
| 125 | + prev_cwd = os.getcwd() |
| 126 | + os.chdir(rw_repo.working_dir) |
| 127 | + try: |
| 128 | + return func(self, rw_repo) |
| 129 | + finally: |
| 130 | + os.chdir(prev_cwd) |
| 131 | + rw_repo.git.clear_cache() |
| 132 | + shutil.rmtree(repo_dir, onerror=_rmtree_onerror) |
| 133 | + # END cleanup |
| 134 | + # END rw repo creator |
| 135 | + repo_creator.__name__ = func.__name__ |
| 136 | + return repo_creator |
| 137 | + # END argument passer |
| 138 | + return argument_passer |
| 139 | + |
112 | 140 | def with_rw_and_rw_remote_repo(working_tree_ref):
|
113 |
| - """ |
114 |
| - Same as with_rw_repo, but also provides a writable remote repository from which the |
115 |
| - rw_repo has been forked as well as a handle for a git-daemon that may be started to |
116 |
| - run the remote_repo. |
117 |
| - The remote repository was cloned as bare repository from the rorepo, wheras |
118 |
| - the rw repo has a working tree and was cloned from the remote repository. |
119 |
| - |
120 |
| - remote_repo has two remotes: origin and daemon_origin. One uses a local url, |
121 |
| - the other uses a server url. The daemon setup must be done on system level |
122 |
| - and should be an inetd service that serves tempdir.gettempdir() and all |
123 |
| - directories in it. |
124 |
| - |
125 |
| - The following scetch demonstrates this:: |
126 |
| - rorepo ---<bare clone>---> rw_remote_repo ---<clone>---> rw_repo |
127 |
| - |
128 |
| - The test case needs to support the following signature:: |
129 |
| - def case(self, rw_repo, rw_remote_repo) |
130 |
| - |
131 |
| - This setup allows you to test push and pull scenarios and hooks nicely. |
132 |
| - |
133 |
| - See working dir info in with_rw_repo |
134 |
| - """ |
135 |
| - assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout" |
136 |
| - def argument_passer(func): |
137 |
| - def remote_repo_creator(self): |
138 |
| - remote_repo_dir = tempfile.mktemp("remote_repo") |
139 |
| - repo_dir = tempfile.mktemp("remote_clone_non_bare_repo") |
140 |
| - |
141 |
| - rw_remote_repo = self.rorepo.clone(remote_repo_dir, shared=True, bare=True) |
142 |
| - rw_repo = rw_remote_repo.clone(repo_dir, shared=True, bare=False, n=True) # recursive alternates info ? |
143 |
| - rw_repo.head.commit = working_tree_ref |
144 |
| - rw_repo.head.reference.checkout() |
145 |
| - |
146 |
| - # prepare for git-daemon |
147 |
| - rw_remote_repo.daemon_export = True |
148 |
| - |
149 |
| - # this thing is just annoying ! |
150 |
| - crw = rw_remote_repo.config_writer() |
151 |
| - section = "daemon" |
152 |
| - try: |
153 |
| - crw.add_section(section) |
154 |
| - except Exception: |
155 |
| - pass |
156 |
| - crw.set(section, "receivepack", True) |
157 |
| - # release lock |
158 |
| - del(crw) |
159 |
| - |
160 |
| - # initialize the remote - first do it as local remote and pull, then |
161 |
| - # we change the url to point to the daemon. The daemon should be started |
162 |
| - # by the user, not by us |
163 |
| - d_remote = Remote.create(rw_repo, "daemon_origin", remote_repo_dir) |
164 |
| - d_remote.fetch() |
165 |
| - remote_repo_url = "git://localhost%s" % remote_repo_dir |
166 |
| - d_remote.config_writer.set('url', remote_repo_url) |
167 |
| - |
168 |
| - # try to list remotes to diagnoes whether the server is up |
169 |
| - try: |
170 |
| - rw_repo.git.ls_remote(d_remote) |
171 |
| - except GitCommandError,e: |
172 |
| - print str(e) |
173 |
| - if os.name == 'nt': |
174 |
| - raise AssertionError('git-daemon needs to run this test, but windows does not have one. Otherwise, run: git-daemon "%s"'%tempfile.gettempdir()) |
175 |
| - else: |
176 |
| - raise AssertionError('Please start a git-daemon to run this test, execute: git-daemon "%s"'%tempfile.gettempdir()) |
177 |
| - |
178 |
| - # adjust working dir |
179 |
| - prev_cwd = os.getcwd() |
180 |
| - os.chdir(rw_repo.working_dir) |
181 |
| - try: |
182 |
| - return func(self, rw_repo, rw_remote_repo) |
183 |
| - finally: |
184 |
| - os.chdir(prev_cwd) |
185 |
| - rw_repo.git.clear_cache() |
186 |
| - rw_remote_repo.git.clear_cache() |
187 |
| - shutil.rmtree(repo_dir, onerror=_rmtree_onerror) |
188 |
| - shutil.rmtree(remote_repo_dir, onerror=_rmtree_onerror) |
189 |
| - # END cleanup |
190 |
| - # END bare repo creator |
191 |
| - remote_repo_creator.__name__ = func.__name__ |
192 |
| - return remote_repo_creator |
193 |
| - # END remote repo creator |
194 |
| - # END argument parsser |
195 |
| - |
196 |
| - return argument_passer |
197 |
| - |
198 |
| - |
| 141 | + """ |
| 142 | + Same as with_rw_repo, but also provides a writable remote repository from which the |
| 143 | + rw_repo has been forked as well as a handle for a git-daemon that may be started to |
| 144 | + run the remote_repo. |
| 145 | + The remote repository was cloned as bare repository from the rorepo, wheras |
| 146 | + the rw repo has a working tree and was cloned from the remote repository. |
| 147 | + |
| 148 | + remote_repo has two remotes: origin and daemon_origin. One uses a local url, |
| 149 | + the other uses a server url. The daemon setup must be done on system level |
| 150 | + and should be an inetd service that serves tempdir.gettempdir() and all |
| 151 | + directories in it. |
| 152 | + |
| 153 | + The following scetch demonstrates this:: |
| 154 | + rorepo ---<bare clone>---> rw_remote_repo ---<clone>---> rw_repo |
| 155 | + |
| 156 | + The test case needs to support the following signature:: |
| 157 | + def case(self, rw_repo, rw_remote_repo) |
| 158 | + |
| 159 | + This setup allows you to test push and pull scenarios and hooks nicely. |
| 160 | + |
| 161 | + See working dir info in with_rw_repo |
| 162 | + """ |
| 163 | + assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout" |
| 164 | + def argument_passer(func): |
| 165 | + def remote_repo_creator(self): |
| 166 | + remote_repo_dir = tempfile.mktemp("remote_repo") |
| 167 | + repo_dir = tempfile.mktemp("remote_clone_non_bare_repo") |
| 168 | + |
| 169 | + rw_remote_repo = self.rorepo.clone(remote_repo_dir, shared=True, bare=True) |
| 170 | + rw_repo = rw_remote_repo.clone(repo_dir, shared=True, bare=False, n=True) # recursive alternates info ? |
| 171 | + rw_repo.head.commit = working_tree_ref |
| 172 | + rw_repo.head.reference.checkout() |
| 173 | + |
| 174 | + # prepare for git-daemon |
| 175 | + rw_remote_repo.daemon_export = True |
| 176 | + |
| 177 | + # this thing is just annoying ! |
| 178 | + crw = rw_remote_repo.config_writer() |
| 179 | + section = "daemon" |
| 180 | + try: |
| 181 | + crw.add_section(section) |
| 182 | + except Exception: |
| 183 | + pass |
| 184 | + crw.set(section, "receivepack", True) |
| 185 | + # release lock |
| 186 | + del(crw) |
| 187 | + |
| 188 | + # initialize the remote - first do it as local remote and pull, then |
| 189 | + # we change the url to point to the daemon. The daemon should be started |
| 190 | + # by the user, not by us |
| 191 | + d_remote = Remote.create(rw_repo, "daemon_origin", remote_repo_dir) |
| 192 | + d_remote.fetch() |
| 193 | + remote_repo_url = "git://localhost%s" % remote_repo_dir |
| 194 | + d_remote.config_writer.set('url', remote_repo_url) |
| 195 | + |
| 196 | + # try to list remotes to diagnoes whether the server is up |
| 197 | + try: |
| 198 | + rw_repo.git.ls_remote(d_remote) |
| 199 | + except GitCommandError,e: |
| 200 | + print str(e) |
| 201 | + if os.name == 'nt': |
| 202 | + raise AssertionError('git-daemon needs to run this test, but windows does not have one. Otherwise, run: git-daemon "%s"'%tempfile.gettempdir()) |
| 203 | + else: |
| 204 | + raise AssertionError('Please start a git-daemon to run this test, execute: git-daemon "%s"'%tempfile.gettempdir()) |
| 205 | + |
| 206 | + # adjust working dir |
| 207 | + prev_cwd = os.getcwd() |
| 208 | + os.chdir(rw_repo.working_dir) |
| 209 | + try: |
| 210 | + return func(self, rw_repo, rw_remote_repo) |
| 211 | + finally: |
| 212 | + os.chdir(prev_cwd) |
| 213 | + rw_repo.git.clear_cache() |
| 214 | + rw_remote_repo.git.clear_cache() |
| 215 | + shutil.rmtree(repo_dir, onerror=_rmtree_onerror) |
| 216 | + shutil.rmtree(remote_repo_dir, onerror=_rmtree_onerror) |
| 217 | + # END cleanup |
| 218 | + # END bare repo creator |
| 219 | + remote_repo_creator.__name__ = func.__name__ |
| 220 | + return remote_repo_creator |
| 221 | + # END remote repo creator |
| 222 | + # END argument parsser |
| 223 | + |
| 224 | + return argument_passer |
| 225 | + |
| 226 | +#} END decorators |
| 227 | + |
199 | 228 | class TestBase(TestCase):
|
200 |
| - """ |
201 |
| - Base Class providing default functionality to all tests such as: |
202 |
| - |
203 |
| - - Utility functions provided by the TestCase base of the unittest method such as:: |
204 |
| - self.fail("todo") |
205 |
| - self.failUnlessRaises(...) |
206 |
| - |
207 |
| - - Class level repository which is considered read-only as it is shared among |
208 |
| - all test cases in your type. |
209 |
| - Access it using:: |
210 |
| - self.rorepo # 'ro' stands for read-only |
211 |
| - |
212 |
| - The rorepo is in fact your current project's git repo. If you refer to specific |
213 |
| - shas for your objects, be sure you choose some that are part of the immutable portion |
214 |
| - of the project history ( to assure tests don't fail for others ). |
215 |
| - """ |
216 |
| - |
217 |
| - @classmethod |
218 |
| - def setUpAll(cls): |
219 |
| - """ |
220 |
| - Dynamically add a read-only repository to our actual type. This way |
221 |
| - each test type has its own repository |
222 |
| - """ |
223 |
| - cls.rorepo = Repo(GIT_REPO) |
224 |
| - |
225 |
| - def _make_file(self, rela_path, data, repo=None): |
226 |
| - """ |
227 |
| - Create a file at the given path relative to our repository, filled |
228 |
| - with the given data. Returns absolute path to created file. |
229 |
| - """ |
230 |
| - repo = repo or self.rorepo |
231 |
| - abs_path = os.path.join(repo.working_tree_dir, rela_path) |
232 |
| - fp = open(abs_path, "w") |
233 |
| - fp.write(data) |
234 |
| - fp.close() |
235 |
| - return abs_path |
| 229 | + """ |
| 230 | + Base Class providing default functionality to all tests such as: |
| 231 | + |
| 232 | + - Utility functions provided by the TestCase base of the unittest method such as:: |
| 233 | + self.fail("todo") |
| 234 | + self.failUnlessRaises(...) |
| 235 | + |
| 236 | + - Class level repository which is considered read-only as it is shared among |
| 237 | + all test cases in your type. |
| 238 | + Access it using:: |
| 239 | + self.rorepo # 'ro' stands for read-only |
| 240 | + |
| 241 | + The rorepo is in fact your current project's git repo. If you refer to specific |
| 242 | + shas for your objects, be sure you choose some that are part of the immutable portion |
| 243 | + of the project history ( to assure tests don't fail for others ). |
| 244 | + """ |
| 245 | + |
| 246 | + @classmethod |
| 247 | + def setUpAll(cls): |
| 248 | + """ |
| 249 | + Dynamically add a read-only repository to our actual type. This way |
| 250 | + each test type has its own repository |
| 251 | + """ |
| 252 | + cls.rorepo = Repo(GIT_REPO) |
| 253 | + |
| 254 | + def _make_file(self, rela_path, data, repo=None): |
| 255 | + """ |
| 256 | + Create a file at the given path relative to our repository, filled |
| 257 | + with the given data. Returns absolute path to created file. |
| 258 | + """ |
| 259 | + repo = repo or self.rorepo |
| 260 | + abs_path = os.path.join(repo.working_tree_dir, rela_path) |
| 261 | + fp = open(abs_path, "w") |
| 262 | + fp.write(data) |
| 263 | + fp.close() |
| 264 | + return abs_path |
0 commit comments