Skip to content

Commit e716758

Browse files
authored
Merge pull request pandas-dev#568 from dimosped/fix-metadata-piclestore
fix the pickle store read corruption after calling write_metadata()
2 parents 644940a + 658aa92 commit e716758

File tree

7 files changed

+70
-18
lines changed

7 files changed

+70
-18
lines changed

CHANGES.md

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
## Changelog
22

3-
### 1.66
3+
### 1.67
4+
* Bugfix: #561 Fix PickleStore read corruption after write_metadata
5+
6+
### 1.66 (2018-05-21)
47
* Bugfix: #168 Do not allow empty string as a column name
58
* Bugfix: #483 Remove potential floating point error from datetime_to_ms
69
* Bugfix: #271 Log when library doesnt exist on delete

arctic/store/_ndarray_store.py

+18-9
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
from ..decorators import mongo_retry
1010
from ..exceptions import UnhandledDtypeException, DataIntegrityException
11-
from ._version_store_utils import checksum
11+
from ._version_store_utils import checksum, version_base_or_id
1212

1313
from .._compression import compress_array, decompress
1414
from six.moves import xrange
@@ -39,20 +39,25 @@ def _promote(type1, type2):
3939
def _attempt_update_unchanged(symbol, unchanged_segment_ids, collection, version, previous_version):
4040
if not unchanged_segment_ids or not collection or not version:
4141
return
42+
43+
# Currently it is called only from _concat_and_rewrite, with "base_version_id" always empty
44+
# Use version_base_or_id() instead, to make the method safe going forward, called from anywhere
45+
parent_id = version_base_or_id(version)
46+
4247
# Update the parent set of the unchanged/compressed segments
4348
result = collection.update_many({
4449
'symbol': symbol, # hit only the right shard
4550
# update_many is a broadcast query otherwise
4651
'_id': {'$in': [x['_id'] for x in unchanged_segment_ids]}
4752
},
48-
{'$addToSet': {'parent': version['_id']}})
53+
{'$addToSet': {'parent': parent_id}})
4954
# Fast check for success without extra query
5055
if result.matched_count == len(unchanged_segment_ids):
5156
return
5257
# update_many is tricky sometimes wrt matched_count across replicas when balancer runs. Check based on _id.
5358
unchanged_ids = set([x['_id'] for x in unchanged_segment_ids])
5459
spec = {'symbol': symbol,
55-
'parent': version['_id'],
60+
'parent': parent_id,
5661
'segment': {'$lte': unchanged_segment_ids[-1]['segment']}}
5762
matched_segments_ids = set([x['_id'] for x in collection.find(spec)])
5863
if unchanged_ids != matched_segments_ids:
@@ -240,7 +245,7 @@ def _do_read(self, collection, version, symbol, index_range=None):
240245
segment_count = None
241246

242247
spec = {'symbol': symbol,
243-
'parent': version.get('base_version_id', version['_id']),
248+
'parent': version_base_or_id(version),
244249
'segment': {'$lt': to_index}
245250
}
246251
if from_index is not None:
@@ -333,7 +338,7 @@ def _do_append(self, collection, version, symbol, item, previous_version, dirty_
333338
#_CHUNK_SIZE is probably too big if we're only appending single rows of data - perhaps something smaller,
334339
#or also look at number of appended segments?
335340
if not dirty_append and version['append_count'] < _APPEND_COUNT and version['append_size'] < _APPEND_SIZE:
336-
version['base_version_id'] = previous_version.get('base_version_id', previous_version['_id'])
341+
version['base_version_id'] = version_base_or_id(previous_version)
337342

338343
if len(item) > 0:
339344

@@ -375,7 +380,7 @@ def _concat_and_rewrite(self, collection, version, symbol, item, previous_versio
375380

376381
# Figure out which is the last 'full' chunk
377382
spec = {'symbol': symbol,
378-
'parent': previous_version.get('base_version_id', previous_version['_id']),
383+
'parent': version_base_or_id(previous_version),
379384
'segment': {'$lt': previous_version['up_to']}}
380385

381386
read_index_range = [0, None]
@@ -423,17 +428,21 @@ def _concat_and_rewrite(self, collection, version, symbol, item, previous_versio
423428
self.check_written(collection, symbol, version)
424429

425430
def check_written(self, collection, symbol, version):
431+
# Currently only called from methods which guarantee 'base_version_id' is not populated.
432+
# Make it nonetheless safe for the general case.
433+
parent_id = version_base_or_id(version)
434+
426435
# Check all the chunks are in place
427-
seen_chunks = collection.find({'symbol': symbol, 'parent': version['_id']},
436+
seen_chunks = collection.find({'symbol': symbol, 'parent': parent_id},
428437
).count()
429438

430439
if seen_chunks != version['segment_count']:
431-
segments = [x['segment'] for x in collection.find({'symbol': symbol, 'parent': version['_id']},
440+
segments = [x['segment'] for x in collection.find({'symbol': symbol, 'parent': parent_id},
432441
projection={'segment': 1},
433442
)]
434443
raise pymongo.errors.OperationFailure("Failed to write all the Chunks. Saw %s expecting %s"
435444
"Parent: %s \n segments: %s" %
436-
(seen_chunks, version['segment_count'], version['_id'], segments))
445+
(seen_chunks, version['segment_count'], parent_id, segments))
437446

438447
def checksum(self, item):
439448
sha = hashlib.sha1()

arctic/store/_pickle_store.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from .._compression import decompress, compress_array
88
import pymongo
99

10-
from ._version_store_utils import checksum, pickle_compat_load
10+
from ._version_store_utils import checksum, pickle_compat_load, version_base_or_id
1111
from ..exceptions import UnsupportedPickleStoreVersion
1212

1313

@@ -38,12 +38,12 @@ def read(self, mongoose_lib, version, symbol, **kwargs):
3838
if blob == _MAGIC_CHUNKEDV2:
3939
collection = mongoose_lib.get_top_level_collection()
4040
data = b''.join(decompress(x['data']) for x in collection.find({'symbol': symbol,
41-
'parent': version['_id']},
41+
'parent': version_base_or_id(version)},
4242
sort=[('segment', pymongo.ASCENDING)]))
4343
elif blob == _MAGIC_CHUNKED:
4444
collection = mongoose_lib.get_top_level_collection()
4545
data = b''.join(x['data'] for x in collection.find({'symbol': symbol,
46-
'parent': version['_id']},
46+
'parent': version_base_or_id(version)},
4747
sort=[('segment', pymongo.ASCENDING)]))
4848
data = decompress(data)
4949
else:

arctic/store/_version_store_utils.py

+5
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ def cleanup(arctic_lib, symbol, version_ids):
6363
collection.delete_one({'symbol': symbol, 'parent': []})
6464

6565

66+
def version_base_or_id(version):
67+
return version.get('base_version_id', version['_id'])
68+
69+
6670
def _define_compat_pickle_load():
6771
"""Factory function to initialise the correct Pickle load function based on
6872
the Pandas version.
@@ -71,6 +75,7 @@ def _define_compat_pickle_load():
7175
return pickle.load
7276
return functools.partial(pickle_compat.load, compat=True)
7377

78+
7479
# Initialise the pickle load function and delete the factory function.
7580
pickle_compat_load = _define_compat_pickle_load()
7681
del _define_compat_pickle_load

tests/integration/store/test_pickle_store.py

+9
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,12 @@ def test_prune_previous_doesnt_kill_other_objects(library):
110110
library._delete_version('BLOB', 2)
111111
assert library._collection.count() == 0
112112
assert library._collection.versions.count() == 1
113+
114+
115+
def test_write_metadata(library):
116+
blob = {'foo': dt(2015, 1, 1), 'object': Arctic}
117+
library.write(symbol='symX', data=blob, metadata={'key1': 'value1'})
118+
library.write_metadata(symbol='symX', metadata={'key2': 'value2'})
119+
v = library.read('symX')
120+
assert v.data == blob
121+
assert v.metadata == {'key2': 'value2'}

tests/unit/store/test_pickle_store.py

+17-2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,23 @@ def test_read_object_2():
6565
assert PickleStore.read(self, arctic_lib, version, sentinel.symbol) == object
6666
assert coll.find.call_args_list == [call({'symbol': sentinel.symbol, 'parent': sentinel._id}, sort=[('segment', 1)])]
6767

68+
69+
def test_read_with_base_version_id():
70+
self = create_autospec(PickleStore)
71+
version = {'_id': sentinel._id,
72+
'base_version_id': sentinel.base_version_id,
73+
'blob': '__chunked__'}
74+
coll = Mock()
75+
arctic_lib = Mock()
76+
coll.find.return_value = [{'data': Binary(compressHC(cPickle.dumps(object))),
77+
'symbol': 'sentinel.symbol'}
78+
]
79+
arctic_lib.get_top_level_collection.return_value = coll
80+
81+
assert PickleStore.read(self, arctic_lib, version, sentinel.symbol) == object
82+
assert coll.find.call_args_list == [call({'symbol': sentinel.symbol, 'parent': sentinel.base_version_id}, sort=[('segment', 1)])]
83+
84+
6885
@pytest.mark.xfail(sys.version_info >= (3,),
6986
reason="lz4 data written with python2 not compatible with python3")
7087
def test_read_backward_compatibility():
@@ -153,5 +170,3 @@ def test_pickle_store_future_version():
153170
with pytest.raises(UnsupportedPickleStoreVersion) as e:
154171
ps.read(arctic_lib, version, sentinel.symbol)
155172
assert('unsupported version of pickle store' in str(e))
156-
157-

tests/unit/store/test_version_store_utils.py

+14-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
import pytest
2-
import numpy as np
31
import binascii
42

5-
from arctic.store._version_store_utils import _split_arrs, checksum
3+
import numpy as np
4+
import pytest
5+
from mock import sentinel
6+
7+
from arctic.store._version_store_utils import _split_arrs, checksum, version_base_or_id
68

79

810
def test_split_arrs_empty():
@@ -30,3 +32,12 @@ def test_checksum_handles_p3strs_and_binary():
3032
expected = b'4O11 ;<[email protected](JRB1.?D[ZEN!8'
3133
assert binascii.b2a_uu(digest).strip() == expected
3234

35+
36+
def test_version_base_or_id():
37+
with pytest.raises(KeyError):
38+
version_base_or_id({})
39+
assert version_base_or_id({'_id': sentinel._id}) == sentinel._id
40+
assert version_base_or_id({
41+
'_id': sentinel._id,
42+
'base_version_id': sentinel.base_version_id
43+
}) == sentinel.base_version_id

0 commit comments

Comments
 (0)