Commit a1e8044

initial version of new odb design to facilitate a channel based multi-threading implementation of all odb functions
1 parent b01ca6a commit a1e8044

File tree

9 files changed: +476 -273 lines


lib/git/objects/base.py

Lines changed: 4 additions & 4 deletions

@@ -76,10 +76,10 @@ def _set_cache_(self, attr):
 		Retrieve object information
 		"""
 		if attr == "size":
-			typename, self.size = self.repo.odb.object_info(self.sha)
+			typename, self.size = self.repo.odb.info(self.sha)
 			assert typename == self.type, _assertion_msg_format % (self.sha, typename, self.type)
 		elif attr == "data":
-			typename, self.size, stream = self.repo.odb.object(self.sha)
+			typename, self.size, stream = self.repo.odb.stream(self.sha)
 			self.data = stream.read()	# once we have an own odb, we can delay reading
 			assert typename == self.type, _assertion_msg_format % (self.sha, typename, self.type)
 		else:
@@ -124,14 +124,14 @@ def __repr__(self):
 	def data_stream(self):
 		""" :return: File Object compatible stream to the uncompressed raw data of the object
 		:note: returned streams must be read in order"""
-		type, size, stream = self.repo.odb.object(self.sha)
+		type, size, stream = self.repo.odb.stream(self.sha)
 		return stream

 	def stream_data(self, ostream):
 		"""Writes our data directly to the given output stream
 		:param ostream: File object compatible stream object.
 		:return: self"""
-		type, size, istream = self.repo.odb.object(self.sha)
+		type, size, istream = self.repo.odb.stream(self.sha)
 		stream_copy(istream, ostream)
 		return self

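The read side of the database is renamed here: object_info becomes info and object becomes stream. Below is a minimal usage sketch (not part of the commit), assuming a Repo whose odb attribute is the GitObjectDB wired up elsewhere in the library; the return shapes are taken from the call sites above.

from git import Repo

repo = Repo(".")                                 # any repository working directory
sha = repo.head.commit.sha                       # 40-byte hex sha; assumes head.commit is available in this revision

typename, size = repo.odb.info(sha)              # was: repo.odb.object_info(sha)
typename, size, stream = repo.odb.stream(sha)    # was: repo.odb.object(sha)
data = stream.read()                             # returned streams must be read in order
assert len(data) == size
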
lib/git/objects/commit.py

Lines changed: 1 addition & 1 deletion

@@ -346,7 +346,7 @@ def create_from_tree(cls, repo, tree, message, parent_commits=None, head=False):
 		streamlen = stream.tell()
 		stream.seek(0)

-		new_commit.sha = repo.odb.to_object(cls.type, streamlen, stream, sha_as_hex=True)
+		new_commit.sha = repo.odb.store(cls.type, streamlen, stream, sha_as_hex=True)

 		if head:
 			try:

lib/git/odb/db.py

Lines changed: 72 additions & 42 deletions

@@ -6,9 +6,12 @@
 	BadObjectType
 	)

-from utils import (
+from stream import (
 		DecompressMemMapReader,
-		FDCompressedSha1Writer,
+		FDCompressedSha1Writer
+	)
+
+from utils import (
 		ENOENT,
 		to_hex_sha,
 		exists,
@@ -31,7 +34,7 @@
 import os


-class iObjectDBR(object):
+class ObjectDBR(object):
 	"""Defines an interface for object database lookup.
 	Objects are identified either by hex-sha (40 bytes) or
 	by sha (20 bytes)"""
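
ObjectDBR accepts a sha in either form. A small illustration of the relationship between the two (the library's own helper for this is to_hex_sha, imported further up; binascii is used here only to show the idea):

import binascii

hexsha = "a1e80445ad4df0cdea48e43e2373f787e1843a4c"    # hypothetical 40-byte hex sha
binsha = binascii.unhexlify(hexsha)                    # 20-byte binary sha
assert len(hexsha) == 40 and len(binsha) == 20
assert binascii.hexlify(binsha) == hexsha
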
@@ -48,62 +51,87 @@ def has_object(self, sha):
 		:raise BadObject:"""
 		raise NotImplementedError("To be implemented in subclass")

-	def object(self, sha):
-		"""
-		:return: tuple(type_string, size_in_bytes, stream) a tuple with object
-			information including its type, its size as well as a stream from which its
-			contents can be read
+	def info(self, sha):
+		""" :return: ODB_Info instance
 		:param sha: 40 bytes hexsha or 20 bytes binary sha
 		:raise BadObject:"""
 		raise NotImplementedError("To be implemented in subclass")

-	def object_info(self, sha):
-		"""
-		:return: tuple(type_string, size_in_bytes) tuple with the object's type
-			string as well as its size in bytes
+	def info_async(self, input_channel):
+		"""Retrieve information of a multitude of objects asynchronously
+		:param input_channel: Channel yielding the sha's of the objects of interest
+		:return: Channel yielding ODB_Info|InvalidODB_Info, in any order"""
+		raise NotImplementedError("To be implemented in subclass")
+
+	def stream(self, sha):
+		""":return: ODB_OStream instance
 		:param sha: 40 bytes hexsha or 20 bytes binary sha
 		:raise BadObject:"""
 		raise NotImplementedError("To be implemented in subclass")
+
+	def stream_async(self, input_channel):
+		"""Retrieve the ODB_OStream of multiple objects
+		:param input_channel: see ``info``
+		:param max_threads: see ``ObjectDBW.store``
+		:return: Channel yielding ODB_OStream|InvalidODB_OStream instances in any order"""
+		raise NotImplementedError("To be implemented in subclass")

 	#} END query interface

-class iObjectDBW(object):
+class ObjectDBW(object):
 	"""Defines an interface to create objects in the database"""
-	__slots__ = tuple()
+	__slots__ = "_ostream"
+
+	def __init__(self, *args, **kwargs):
+		self._ostream = None

 	#{ Edit Interface
+	def set_ostream(self, stream):
+		"""Adjusts the stream to which all data should be sent when storing new objects
+		:param stream: if not None, the stream to use, if None the default stream
+			will be used.
+		:return: previously installed stream, or None if there was no override
+		:raise TypeError: if the stream doesn't have the supported functionality"""
+		cstream = self._ostream
+		self._ostream = stream
+		return cstream
+
+	def ostream(self):
+		""":return: overridden output stream this instance will write to, or None
+			if it will write to the default stream"""
+		return self._ostream

-	def to_object(self, type, size, stream, dry_run=False, sha_as_hex=True):
+	def store(self, istream):
 		"""Create a new object in the database
-		:return: the sha identifying the object in the database
-		:param type: type string identifying the object
-		:param size: size of the data to read from stream
-		:param stream: stream providing the data
-		:param dry_run: if True, the object database will not actually be changed
-		:param sha_as_hex: if True, the returned sha identifying the object will be
-			hex encoded, not binary
+		:return: the input istream object with its sha set to its corresponding value
+		:param istream: ODB_IStream compatible instance. If its sha is already set
+			to a value, the object will just be stored in the our database format,
+			in which case the input stream is expected to be in object format ( header + contents ).
 		:raise IOError: if data could not be written"""
 		raise NotImplementedError("To be implemented in subclass")

-	def to_objects(self, iter_info, dry_run=False, sha_as_hex=True, max_threads=0):
-		"""Create multiple new objects in the database
-		:return: sequence of shas identifying the created objects in the order in which
-			they where given.
-		:param iter_info: iterable yielding tuples containing the type_string
-			size_in_bytes and the steam with the content data.
-		:param dry_run: see ``to_object``
-		:param sha_as_hex: see ``to_object``
-		:param max_threads: if < 1, any number of threads may be started while processing
-			the request, otherwise the given number of threads will be started.
-		:raise IOError: if data could not be written"""
+	def store_async(self, input_channel):
+		"""Create multiple new objects in the database asynchronously. The method will
+		return right away, returning an output channel which receives the results as
+		they are computed.
+
+		:return: Channel yielding your ODB_IStream which served as input, in any order.
+			The IStreams sha will be set to the sha it received during the process,
+			or its error attribute will be set to the exception informing about the error.
+		:param input_channel: Channel yielding ODB_IStream instance.
+			As the same instances will be used in the output channel, you can create a map
+			between the id(istream) -> istream
+		:note:As some ODB implementations implement this operation as atomic, they might
+			abort the whole operation if one item could not be processed. Hence check how
+			many items have actually been produced."""
 		# a trivial implementation, ignoring the threads for now
 		# TODO: add configuration to the class to determine whether we may
 		# actually use multiple threads, default False of course. If the add
 		shas = list()
 		for args in iter_info:
-			shas.append(self.to_object(dry_run=dry_run, sha_as_hex=sha_as_hex, *args))
+			shas.append(self.store(dry_run=dry_run, sha_as_hex=sha_as_hex, *args))
 		return shas
-
+
 	#} END edit interface

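These *_async methods carry the channel-based design named in the commit message: callers push work into an input channel and drain results from an output channel in whatever order they complete. Neither the Channel type nor ODB_IStream is defined in this diff, so the sketch below uses a Queue and a small stand-in class purely to illustrate the calling pattern the docstrings describe; it is not the commit's implementation.

from Queue import Queue    # Python 2, matching the codebase of this era

class FakeIStream(object):
	"""Stand-in for ODB_IStream: carries type, size and stream, receives sha or error."""
	def __init__(self, type, size, stream):
		self.type, self.size, self.stream = type, size, stream
		self.sha = None
		self.error = None

def store_many(db, istreams):
	"""Feed istreams into db.store_async and drain the output channel."""
	input_channel = Queue()
	for istream in istreams:
		input_channel.put(istream)
	output_channel = db.store_async(input_channel)         # returns right away
	# note: an atomic backend may produce fewer results than were fed in,
	# so a real caller would read until the output channel is closed
	results = [output_channel.get() for _ in istreams]     # results arrive in any order
	failed = [r for r in results if r.error is not None]
	stored = [r.sha for r in results if r.error is None]
	return stored, failed
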
@@ -118,6 +146,7 @@ def __init__(self, root_path):
 		:raise InvalidDBRoot:
 		:note: The base will perform basic checking for accessability, but the subclass
 			is required to verify that the root_path contains the database structure it needs"""
+		super(FileDBBase, self).__init__()
 		if not os.path.isdir(root_path):
 			raise InvalidDBRoot(root_path)
 		self._root_path = root_path
@@ -141,7 +170,7 @@ def db_path(self, rela_path):
 	#} END utilities


-class LooseObjectDB(FileDBBase, iObjectDBR, iObjectDBW):
+class LooseObjectDB(FileDBBase, ObjectDBR, ObjectDBW):
 	"""A database which operates on loose object files"""
 	__slots__ = ('_hexsha_to_file', '_fd_open_flags')
 	# CONFIGURATION
@@ -210,7 +239,7 @@ def _map_loose_object(self, sha):
 			os.close(fd)
 		# END assure file is closed

-	def object_info(self, sha):
+	def info(self, sha):
 		m = self._map_loose_object(sha)
 		try:
 			return loose_object_header_info(m)
@@ -233,8 +262,9 @@ def has_object(self, sha):
 			return False
 		# END check existance

-	def to_object(self, type, size, stream, dry_run=False, sha_as_hex=True):
+	def store(self, istream):
 		# open a tmp file to write the data to
+		# todo: implement ostream properly
 		fd, tmp_path = tempfile.mkstemp(prefix='obj', dir=self._root_path)
 		writer = FDCompressedSha1Writer(fd)

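For context on what the store implementation produces: a loose object is the sha1 of "<type> <size>\0<contents>", with those same bytes zlib-compressed on disk under objects/<sha[:2]>/<sha[2:]>. FDCompressedSha1Writer performs the hashing and compression while streaming into the temporary file; the sketch below shows the equivalent non-streaming steps and is an illustration of the format, not the commit's code.

import hashlib
import os
import zlib

def write_loose_object(objects_dir, type, data):
	"""Write data as a loose object of the given type and return its hex sha."""
	raw = "%s %i\0" % (type, len(data)) + data          # header + contents
	sha = hashlib.sha1(raw).hexdigest()                 # the object id
	path = os.path.join(objects_dir, sha[:2], sha[2:])
	if not os.path.isdir(os.path.dirname(path)):
		os.makedirs(os.path.dirname(path))
	open(path, "wb").write(zlib.compress(raw))          # stored compressed on disk
	return sha
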
@@ -269,19 +299,19 @@ def to_object(self, type, size, stream, dry_run=False, sha_as_hex=True):
 		return sha


-class PackedDB(FileDBBase, iObjectDBR):
+class PackedDB(FileDBBase, ObjectDBR):
 	"""A database operating on a set of object packs"""


-class CompoundDB(iObjectDBR):
+class CompoundDB(ObjectDBR):
 	"""A database which delegates calls to sub-databases"""


 class ReferenceDB(CompoundDB):
 	"""A database consisting of database referred to in a file"""


-#class GitObjectDB(CompoundDB, iObjectDBW):
+#class GitObjectDB(CompoundDB, ObjectDBW):
 class GitObjectDB(LooseObjectDB):
 	"""A database representing the default git object store, which includes loose
 		objects, pack files and an alternates file
@@ -296,7 +326,7 @@ def __init__(self, root_path, git):
 		super(GitObjectDB, self).__init__(root_path)
 		self._git = git

-	def object_info(self, sha):
+	def info(self, sha):
 		discard, type, size = self._git.get_object_header(sha)
 		return type, size

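GitObjectDB.info still answers queries through the spawned git command (self._git.get_object_header). The same header information can be read from plain git, shown here as a rough equivalent rather than the library's own code:

import subprocess

def object_header(repo_dir, hexsha):
	"""Return (type, size) of an object by shelling out to git cat-file."""
	type = subprocess.check_output(["git", "cat-file", "-t", hexsha], cwd=repo_dir).strip()
	size = int(subprocess.check_output(["git", "cat-file", "-s", hexsha], cwd=repo_dir))
	return type, size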