
Commit bf638fd

Merge branch 'smmap'
2 parents: da12df9 + f0c05ea

6 files changed, +79 -57 lines

.gitmodules
+4 -1

@@ -1,3 +1,6 @@
-[submodule "git/ext/async"]
+[submodule "async"]
 	path = git/ext/async
 	url = git://github.com/gitpython-developers/async.git
+[submodule "smmap"]
+	path = git/ext/smmap
+	url = git://github.com/Byron/smmap.git
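
Note: after pulling this merge, the new submodule still has to be fetched locally, e.g. with git submodule update --init git/ext/smmap (standard git submodule usage, not part of this diff).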

git/__init__.py
+9 -7

@@ -14,13 +14,15 @@
 #{ Initialization
 def _init_externals():
     """Initialize external projects by putting them into the path"""
-    sys.path.append(os.path.join(os.path.dirname(__file__), 'ext', 'async'))
-
-    try:
-        import async
-    except ImportError:
-        raise ImportError("'async' could not be found in your PYTHONPATH")
-    #END verify import
+    ext_base = os.path.join(os.path.dirname(__file__), 'ext')
+    for package in ('async', 'smmap'):
+        sys.path.append(os.path.join(ext_base, package))
+        try:
+            __import__(package)
+        except ImportError:
+            raise ImportError("%r could not be found in your PYTHONPATH" % package)
+        #END verify import
+    #END handle external import
 
 #} END initialization
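
One detail worth calling out in the rewrite above: inside the loop the package name only exists as a string, so a literal import statement cannot be used and the builtin __import__ does the work instead. A minimal illustration, with the name chosen purely as an example:

    name = 'smmap'
    module = __import__(name)   # imports by string and returns the top-level module,
                                # roughly equivalent to "import smmap"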

git/ext/smmap

Submodule smmap added at cf297b7

git/pack.py
+47 -45

@@ -10,10 +10,10 @@
     )
 from util import (
     zlib,
+    mman,
     LazyMixin,
     unpack_from,
     bin_to_hex,
-    file_contents_ro_filepath,
     )
 
 from fun import (
@@ -73,7 +73,7 @@
 
 #{ Utilities
 
-def pack_object_at(data, offset, as_stream):
+def pack_object_at(cursor, offset, as_stream):
     """
     :return: Tuple(abs_data_offset, PackInfo|PackStream)
         an object of the correct type according to the type_id of the object.
@@ -83,7 +83,7 @@ def pack_object_at(data, offset, as_stream):
     :parma offset: offset in to the data at which the object information is located
     :param as_stream: if True, a stream object will be returned that can read
         the data, otherwise you receive an info object only"""
-    data = buffer(data, offset)
+    data = cursor.use_region(offset).buffer()
    type_id, uncomp_size, data_rela_offset = pack_object_header_info(data)
    total_rela_offset = None                # set later, actual offset until data stream begins
    delta_info = None
@@ -247,7 +247,7 @@ class PackIndexFile(LazyMixin):
 
     # Dont use slots as we dynamically bind functions for each version, need a dict for this
     # The slots you see here are just to keep track of our instance variables
-    # __slots__ = ('_indexpath', '_fanout_table', '_data', '_version',
+    # __slots__ = ('_indexpath', '_fanout_table', '_cursor', '_version',
     #               '_sha_list_offset', '_crc_list_offset', '_pack_offset', '_pack_64_offset')
 
     # used in v2 indices
@@ -261,22 +261,27 @@ def __init__(self, indexpath):
 
     def _set_cache_(self, attr):
         if attr == "_packfile_checksum":
-            self._packfile_checksum = self._data[-40:-20]
+            self._packfile_checksum = self._cursor.map()[-40:-20]
         elif attr == "_packfile_checksum":
-            self._packfile_checksum = self._data[-20:]
-        elif attr == "_data":
+            self._packfile_checksum = self._cursor.map()[-20:]
+        elif attr == "_cursor":
             # Note: We don't lock the file when reading as we cannot be sure
             # that we can actually write to the location - it could be a read-only
             # alternate for instance
-            self._data = file_contents_ro_filepath(self._indexpath)
+            self._cursor = mman.make_cursor(self._indexpath).use_region()
+            # We will assume that the index will always fully fit into memory !
+            if mman.window_size() > 0 and self._cursor.file_size() > mman.window_size():
+                raise AssertionError("The index file at %s is too large to fit into a mapped window (%i > %i). This is a limitation of the implementation" % (self._indexpath, self._cursor.file_size(), mman.window_size()))
+            #END assert window size
         else:
             # now its time to initialize everything - if we are here, someone wants
             # to access the fanout table or related properties
 
             # CHECK VERSION
-            self._version = (self._data[:4] == self.index_v2_signature and 2) or 1
+            mmap = self._cursor.map()
+            self._version = (mmap[:4] == self.index_v2_signature and 2) or 1
             if self._version == 2:
-                version_id = unpack_from(">L", self._data, 4)[0]
+                version_id = unpack_from(">L", mmap, 4)[0]
                 assert version_id == self._version, "Unsupported index version: %i" % version_id
             # END assert version
 
@@ -297,16 +302,16 @@ def _set_cache_(self, attr):
 
     def _entry_v1(self, i):
         """:return: tuple(offset, binsha, 0)"""
-        return unpack_from(">L20s", self._data, 1024 + i*24) + (0, )
+        return unpack_from(">L20s", self._cursor.map(), 1024 + i*24) + (0, )
 
     def _offset_v1(self, i):
         """see ``_offset_v2``"""
-        return unpack_from(">L", self._data, 1024 + i*24)[0]
+        return unpack_from(">L", self._cursor.map(), 1024 + i*24)[0]
 
     def _sha_v1(self, i):
         """see ``_sha_v2``"""
         base = 1024 + (i*24)+4
-        return self._data[base:base+20]
+        return self._cursor.map()[base:base+20]
 
     def _crc_v1(self, i):
         """unsupported"""
@@ -322,25 +327,25 @@ def _entry_v2(self, i):
     def _offset_v2(self, i):
         """:return: 32 or 64 byte offset into pack files. 64 byte offsets will only
             be returned if the pack is larger than 4 GiB, or 2^32"""
-        offset = unpack_from(">L", self._data, self._pack_offset + i * 4)[0]
+        offset = unpack_from(">L", self._cursor.map(), self._pack_offset + i * 4)[0]
 
         # if the high-bit is set, this indicates that we have to lookup the offset
         # in the 64 bit region of the file. The current offset ( lower 31 bits )
         # are the index into it
         if offset & 0x80000000:
-            offset = unpack_from(">Q", self._data, self._pack_64_offset + (offset & ~0x80000000) * 8)[0]
+            offset = unpack_from(">Q", self._cursor.map(), self._pack_64_offset + (offset & ~0x80000000) * 8)[0]
         # END handle 64 bit offset
 
         return offset
 
     def _sha_v2(self, i):
         """:return: sha at the given index of this file index instance"""
         base = self._sha_list_offset + i * 20
-        return self._data[base:base+20]
+        return self._cursor.map()[base:base+20]
 
     def _crc_v2(self, i):
         """:return: 4 bytes crc for the object at index i"""
-        return unpack_from(">L", self._data, self._crc_list_offset + i * 4)[0]
+        return unpack_from(">L", self._cursor.map(), self._crc_list_offset + i * 4)[0]
 
     #} END access V2
 
@@ -358,7 +363,7 @@ def _initialize(self):
 
     def _read_fanout(self, byte_offset):
         """Generate a fanout table from our data"""
-        d = self._data
+        d = self._cursor.map()
         out = list()
         append = out.append
         for i in range(256):
@@ -382,19 +387,19 @@ def path(self):
 
     def packfile_checksum(self):
         """:return: 20 byte sha representing the sha1 hash of the pack file"""
-        return self._data[-40:-20]
+        return self._cursor.map()[-40:-20]
 
     def indexfile_checksum(self):
         """:return: 20 byte sha representing the sha1 hash of this index file"""
-        return self._data[-20:]
+        return self._cursor.map()[-20:]
 
     def offsets(self):
         """:return: sequence of all offsets in the order in which they were written
         :note: return value can be random accessed, but may be immmutable"""
         if self._version == 2:
             # read stream to array, convert to tuple
             a = array.array('I')    # 4 byte unsigned int, long are 8 byte on 64 bit it appears
-            a.fromstring(buffer(self._data, self._pack_offset, self._pack_64_offset - self._pack_offset))
+            a.fromstring(buffer(self._cursor.map(), self._pack_offset, self._pack_64_offset - self._pack_offset))
 
             # networkbyteorder to something array likes more
             if sys.byteorder == 'little':
@@ -501,7 +506,7 @@ class PackFile(LazyMixin):
         for some reason - one clearly doesn't want to read 10GB at once in that
         case"""
 
-    __slots__ = ('_packpath', '_data', '_size', '_version')
+    __slots__ = ('_packpath', '_cursor', '_size', '_version')
     pack_signature = 0x5041434b        # 'PACK'
     pack_version_default = 2
 
@@ -513,32 +518,26 @@ def __init__(self, packpath):
         self._packpath = packpath
 
     def _set_cache_(self, attr):
-        if attr == '_data':
-            self._data = file_contents_ro_filepath(self._packpath)
-
-            # read the header information
-            type_id, self._version, self._size = unpack_from(">LLL", self._data, 0)
-
-            # TODO: figure out whether we should better keep the lock, or maybe
-            # add a .keep file instead ?
-        else: # must be '_size' or '_version'
-            # read header info - we do that just with a file stream
-            type_id, self._version, self._size = unpack(">LLL", open(self._packpath).read(12))
-        # END handle header
+        # we fill the whole cache, whichever attribute gets queried first
+        self._cursor = mman.make_cursor(self._packpath).use_region()
 
+        # read the header information
+        type_id, self._version, self._size = unpack_from(">LLL", self._cursor.map(), 0)
+
+        # TODO: figure out whether we should better keep the lock, or maybe
+        # add a .keep file instead ?
         if type_id != self.pack_signature:
             raise ParseError("Invalid pack signature: %i" % type_id)
-        #END assert type id
 
     def _iter_objects(self, start_offset, as_stream=True):
         """Handle the actual iteration of objects within this pack"""
-        data = self._data
-        content_size = len(data) - self.footer_size
+        c = self._cursor
+        content_size = c.file_size() - self.footer_size
         cur_offset = start_offset or self.first_object_offset
 
         null = NullStream()
         while cur_offset < content_size:
-            data_offset, ostream = pack_object_at(data, cur_offset, True)
+            data_offset, ostream = pack_object_at(c, cur_offset, True)
             # scrub the stream to the end - this decompresses the object, but yields
             # the amount of compressed bytes we need to get to the next offset
 
@@ -567,12 +566,14 @@ def version(self):
     def data(self):
         """
         :return: read-only data of this pack. It provides random access and usually
-            is a memory map"""
-        return self._data
+            is a memory map.
+        :note: This method is unsafe as it returns a window into a file which might be larger than than the actual window size"""
+        # can use map as we are starting at offset 0. Otherwise we would have to use buffer()
+        return self._cursor.use_region().map()
 
     def checksum(self):
         """:return: 20 byte sha1 hash on all object sha's contained in this file"""
-        return self._data[-20:]
+        return self._cursor.use_region(self._cursor.file_size()-20).buffer()[:]
 
     def path(self):
         """:return: path to the packfile"""
@@ -591,8 +592,9 @@ def collect_streams(self, offset):
         If the object at offset is no delta, the size of the list is 1.
         :param offset: specifies the first byte of the object within this pack"""
         out = list()
+        c = self._cursor
         while True:
-            ostream = pack_object_at(self._data, offset, True)[1]
+            ostream = pack_object_at(c, offset, True)[1]
             out.append(ostream)
             if ostream.type_id == OFS_DELTA:
                 offset = ostream.pack_offset - ostream.delta_info
@@ -614,14 +616,14 @@ def info(self, offset):
 
         :param offset: byte offset
         :return: OPackInfo instance, the actual type differs depending on the type_id attribute"""
-        return pack_object_at(self._data, offset or self.first_object_offset, False)[1]
+        return pack_object_at(self._cursor, offset or self.first_object_offset, False)[1]
 
     def stream(self, offset):
         """Retrieve an object at the given file-relative offset as stream along with its information
 
         :param offset: byte offset
         :return: OPackStream instance, the actual type differs depending on the type_id attribute"""
-        return pack_object_at(self._data, offset or self.first_object_offset, True)[1]
+        return pack_object_at(self._cursor, offset or self.first_object_offset, True)[1]
 
     def stream_iter(self, start_offset=0):
         """
@@ -704,7 +706,7 @@ def _object(self, sha, as_stream, index=-1):
             sha = self._index.sha(index)
         # END assure sha is present ( in output )
         offset = self._index.offset(index)
-        type_id, uncomp_size, data_rela_offset = pack_object_header_info(buffer(self._pack._data, offset))
+        type_id, uncomp_size, data_rela_offset = pack_object_header_info(self._pack._cursor.use_region(offset).buffer())
         if as_stream:
             if type_id not in delta_types:
                 packstream = self._pack.stream(offset)
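
Beyond the cursor migration, the _offset_v2 hunk above also shows how v2 index offsets are encoded: a 32-bit entry with its high bit set is not a file offset but an index into the table of 64-bit offsets. A small worked example with made-up values (not taken from the diff):

    entry = 0x80000003                  # high bit set, so this is not a real offset
    assert entry & 0x80000000           # the flag tested in _offset_v2
    table_index = entry & ~0x80000000   # == 3
    # the real offset is the big-endian 8-byte value stored at
    # _pack_64_offset + table_index * 8, read with unpack_from(">Q", ...)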

git/test/performance/db/test_packedodb_pure.py
+3 -4

@@ -49,18 +49,17 @@ def test_pack_writing(self):
         count = 0
         total_size = 0
         st = time()
-        objs = list()
         for sha in rorepo.sha_iter():
             count += 1
-            objs.append(rorepo.stream(sha))
+            rorepo.stream(sha)
             if count == ni:
                 break
         #END gather objects for pack-writing
         elapsed = time() - st
-        print >> sys.stderr, "PDB Streaming: Got %i streams from %s by sha in in %f s ( %f streams/s )" % (ni, rorepo.__class__.__name__, elapsed, ni / elapsed)
+        print >> sys.stderr, "PDB Streaming: Got %i streams from %s by sha in in %f s ( %f streams/s )" % (count, rorepo.__class__.__name__, elapsed, count / elapsed)
 
         st = time()
-        PackEntity.write_pack(objs, ostream.write)
+        PackEntity.write_pack((rorepo.stream(sha) for sha in rorepo.sha_iter()), ostream.write, object_count=ni)
         elapsed = time() - st
         total_kb = ostream.bytes_written() / 1000
         print >> sys.stderr, "PDB Streaming: Wrote pack of size %i kb in %f s (%f kb/s)" % (total_kb, elapsed, total_kb/elapsed)
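
The test now builds the pack from a generator expression instead of a pre-built list, so object streams are produced lazily while the pack is written; since a generator has no length, the expected object count is passed explicitly via object_count (keyword taken from the diff). The pattern, using the names from the test above:

    streams = (rorepo.stream(sha) for sha in rorepo.sha_iter())   # nothing is read yet
    PackEntity.write_pack(streams, ostream.write, object_count=ni)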

git/util.py
+15

@@ -15,6 +15,13 @@
 import stat
 import shutil
 import tempfile
+from smmap import (
+    StaticWindowMapManager,
+    SlidingWindowMapManager,
+    SlidingWindowMapBuffer
+    )
+
+
 
 __all__ = ( "stream_copy", "join_path", "to_native_path_windows", "to_native_path_linux",
             "join_path_native", "Stats", "IndexFileSHA1Writer", "Iterable", "IterableList",
@@ -64,6 +71,14 @@ def unpack_from(fmt, data, offset=0):
 # will be handled in the main thread
 pool = ThreadPool(0)
 
+# initialize our global memory manager instance
+# Use it to free cached (and unused) resources.
+if sys.version_info[1] < 6:
+    mman = StaticWindowMapManager()
+else:
+    mman = SlidingWindowMapManager()
+#END handle mman
+
 #} END globals
 
 
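
The version check above picks the simpler StaticWindowMapManager on older Python 2 interpreters (sys.version_info[1] < 6, i.e. 2.5 and earlier in the Python 2 series this code targets) and the SlidingWindowMapManager otherwise; the pack.py hunks rely on this shared mman instance rather than mapping files themselves. As a reference, a rough sketch of the access pattern the merge introduces, using only the calls that appear in the diff (make_cursor, use_region, map, buffer, file_size, window_size); the path is a placeholder and exact smmap signatures may differ between versions:

    from git.util import mman                             # global manager chosen above

    c = mman.make_cursor('/path/to/pack-or-index').use_region()   # map a window at offset 0
    header = c.map()[:12]               # bytes of the currently mapped window
    size = c.file_size()                # total size of the underlying file
    buf = c.use_region(128).buffer()    # re-point the cursor and take a read-only view at offset 128
    limit = mman.window_size()          # 0 disables the size check seen in the pack.py hunk above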