Skip to content

Commit 70e8278

Browse files
authored
1 parent 76ae30c commit 70e8278

File tree

3 files changed

+67
-31
lines changed

3 files changed

+67
-31
lines changed

arctic/store/_pickle_store.py

+22-14
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
from bson.errors import InvalidDocument
44
from six.moves import cPickle, xrange
55
import io
6-
import lz4
6+
from .._compression import compress, decompress, compress_array
77
import pymongo
88

99
from arctic.store._version_store_utils import checksum, pickle_compat_load
1010

1111
_MAGIC_CHUNKED = '__chunked__'
12+
_MAGIC_CHUNKEDV2 = '__chunked__V2'
1213
_CHUNK_SIZE = 15 * 1024 * 1024 # 15MB
1314
_MAX_BSON_ENCODE = 256 * 1024 # 256K - don't fill up the version document with encoded bson
1415

@@ -28,15 +29,19 @@ def get_info(self, version):
2829
def read(self, mongoose_lib, version, symbol, **kwargs):
2930
blob = version.get("blob")
3031
if blob is not None:
31-
if blob == _MAGIC_CHUNKED:
32+
if blob == _MAGIC_CHUNKEDV2:
33+
collection = mongoose_lib.get_top_level_collection()
34+
data = b''.join(decompress(x['data']) for x in collection.find({'symbol': symbol,
35+
'parent': version['_id']},
36+
sort=[('segment', pymongo.ASCENDING)]))
37+
elif blob == _MAGIC_CHUNKED:
3238
collection = mongoose_lib.get_top_level_collection()
3339
data = b''.join(x['data'] for x in collection.find({'symbol': symbol,
34-
'parent': version['_id']},
40+
'parent': version['_id']},
3541
sort=[('segment', pymongo.ASCENDING)]))
42+
data = decompress(data)
3643
else:
37-
data = blob
38-
# Backwards compatibility
39-
data = lz4.decompress(data)
44+
data = decompress(blob)
4045
return pickle_compat_load(io.BytesIO(data))
4146
return version['data']
4247

@@ -53,13 +58,16 @@ def write(self, arctic_lib, version, symbol, item, previous_version):
5358
# Pickle, chunk and store the data
5459
collection = arctic_lib.get_top_level_collection()
5560
# Try to pickle it. This is best effort
56-
version['blob'] = _MAGIC_CHUNKED
57-
pickled = lz4.compressHC(cPickle.dumps(item, protocol=cPickle.HIGHEST_PROTOCOL))
61+
version['blob'] = _MAGIC_CHUNKEDV2
62+
pickled = cPickle.dumps(item, protocol=cPickle.HIGHEST_PROTOCOL)
63+
64+
data = compress_array([pickled[i * _CHUNK_SIZE: (i + 1) * _CHUNK_SIZE] for i in xrange(int(len(pickled) / _CHUNK_SIZE + 1))])
5865

59-
for i in xrange(int(len(pickled) / _CHUNK_SIZE + 1)):
60-
segment = {'data': Binary(pickled[i * _CHUNK_SIZE : (i + 1) * _CHUNK_SIZE])}
66+
for seg, d in enumerate(data):
67+
segment = {'data': Binary(d)}
68+
segment['segment'] = seg
69+
seg += 1
6170
sha = checksum(symbol, segment)
62-
segment['segment'] = i
63-
collection.update_one({'symbol': symbol, 'sha': sha}, {'$set': segment,
64-
'$addToSet': {'parent': version['_id']}},
65-
upsert=True)
71+
collection.update_one({'symbol': symbol, 'sha': sha},
72+
{'$set': segment, '$addToSet': {'parent': version['_id']}},
73+
upsert=True)

tests/integration/store/test_pickle_store.py

+13-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from datetime import datetime as dt, timedelta
33
from mock import patch
44
import numpy as np
5-
import re
65

76
from arctic.arctic import Arctic
87

@@ -13,13 +12,25 @@ def test_save_read_bson(library):
1312
saved_blob = library.read('BLOB').data
1413
assert blob == saved_blob
1514

15+
'''
16+
Run test at your own discretion. Takes > 60 secs
17+
def test_save_read_MASSIVE(library):
18+
import pandas as pd
19+
df = pd.DataFrame(data={'data': [1] * 150000000})
20+
data = (df, df)
21+
library.write('BLOB', data)
22+
saved_blob = library.read('BLOB').data
23+
assert(saved_blob[0].equals(df))
24+
assert(saved_blob[1].equals(df))
25+
'''
26+
1627

1728
def test_save_read_big_encodable(library):
1829
blob = {'foo': 'a' * 1024 * 1024 * 20}
1930
library.write('BLOB', blob)
2031
saved_blob = library.read('BLOB').data
2132
assert blob == saved_blob
22-
33+
2334

2435
def test_save_read_bson_object(library):
2536
blob = {'foo': dt(2015, 1, 1), 'object': Arctic}

tests/unit/store/test_pickle_store.py

+32-15
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
import pandas as pd
2-
try:
3-
import cPickle as pickle
4-
except ImportError:
5-
import pickle
2+
from six.moves import cPickle
63
import lz4
74
import pytest
85
import sys
@@ -12,6 +9,7 @@
129
from distutils.version import LooseVersion
1310
from mock import create_autospec, sentinel, Mock, call
1411

12+
from arctic._compression import compress, decompress
1513
from arctic.store._pickle_store import PickleStore
1614
from arctic.store._version_store_utils import checksum
1715

@@ -32,13 +30,12 @@ def test_write_object():
3230
PickleStore.write(self, arctic_lib, version, 'sentinel.symbol', sentinel.item, sentinel.previous_version)
3331
assert 'data' not in version
3432

35-
assert version['blob'] == '__chunked__'
33+
assert version['blob'] == '__chunked__V2'
3634
coll = arctic_lib.get_top_level_collection.return_value
37-
assert coll.update_one.call_args_list == [call({'sha': checksum('sentinel.symbol',
38-
{'data': Binary(lz4.compressHC(pickle.dumps(sentinel.item, pickle.HIGHEST_PROTOCOL)))}), 'symbol': 'sentinel.symbol'},
39-
{'$set': {'segment': 0,
40-
'data': Binary(lz4.compressHC(pickle.dumps(sentinel.item, pickle.HIGHEST_PROTOCOL)), 0)},
41-
'$addToSet': {'parent': version['_id']}}, upsert=True)]
35+
assert coll.update_one.call_args_list == [call({'sha': checksum('sentinel.symbol', {'segment':0, 'data': Binary(compress(cPickle.dumps(sentinel.item, cPickle.HIGHEST_PROTOCOL)))}),
36+
'symbol': 'sentinel.symbol'},
37+
{'$set': {'segment': 0, 'data': Binary(compress(cPickle.dumps(sentinel.item, cPickle.HIGHEST_PROTOCOL)), 0)},
38+
'$addToSet': {'parent': version['_id']}}, upsert=True)]
4239

4340

4441
def test_read():
@@ -49,7 +46,7 @@ def test_read():
4946

5047
def test_read_object_backwards_compat():
5148
self = create_autospec(PickleStore)
52-
version = {'blob': Binary(lz4.compressHC(pickle.dumps(object)))}
49+
version = {'blob': Binary(lz4.compressHC(cPickle.dumps(object)))}
5350
assert PickleStore.read(self, sentinel.arctic_lib, version, sentinel.symbol) == object
5451

5552

@@ -59,7 +56,7 @@ def test_read_object_2():
5956
'blob': '__chunked__'}
6057
coll = Mock()
6158
arctic_lib = Mock()
62-
coll.find.return_value = [{'data': Binary(lz4.compressHC(pickle.dumps(object))),
59+
coll.find.return_value = [{'data': Binary(lz4.compressHC(cPickle.dumps(object))),
6360
'symbol': 'sentinel.symbol'}
6461
]
6562
arctic_lib.get_top_level_collection.return_value = coll
@@ -79,10 +76,10 @@ def test_read_backward_compatibility():
7976
if PANDAS_VERSION >= LooseVersion("0.16.1"):
8077
if sys.version_info[0] >= 3:
8178
with pytest.raises(UnicodeDecodeError), open(fname) as fh:
82-
pickle.load(fh)
79+
cPickle.load(fh)
8380
else:
8481
with pytest.raises(TypeError), open(fname) as fh:
85-
pickle.load(fh)
82+
cPickle.load(fh)
8683

8784
# Verify that PickleStore() uses a backwards compatible unpickler.
8885
store = PickleStore()
@@ -101,7 +98,7 @@ def test_unpickle_highest_protocol():
10198
container has been pickled with HIGHEST_PROTOCOL.
10299
"""
103100
version = {
104-
'blob': lz4.compressHC(pickle.dumps(pd.Series(), protocol=pickle.HIGHEST_PROTOCOL)),
101+
'blob': lz4.compressHC(cPickle.dumps(pd.Series(), protocol=cPickle.HIGHEST_PROTOCOL)),
105102
}
106103

107104
store = PickleStore()
@@ -111,3 +108,23 @@ def test_unpickle_highest_protocol():
111108
assert (ps == expected).all()
112109

113110

111+
def test_pickle_chunk_V1_read():
112+
data = {'foo': b'abcdefghijklmnopqrstuvwxyz'}
113+
version = {'_id': sentinel._id,
114+
'blob': '__chunked__'}
115+
coll = Mock()
116+
arctic_lib = Mock()
117+
datap = lz4.compressHC(cPickle.dumps(data, protocol=cPickle.HIGHEST_PROTOCOL))
118+
data_1 = datap[0:5]
119+
data_2 = datap[5:]
120+
coll.find.return_value = [{'data': Binary(data_1),
121+
'symbol': 'sentinel.symbol',
122+
'segment': 0},
123+
{'data': Binary(data_2),
124+
'symbol': 'sentinel.symbol',
125+
'segment': 1},
126+
]
127+
arctic_lib.get_top_level_collection.return_value = coll
128+
129+
ps = PickleStore()
130+
assert(data == ps.read(arctic_lib, version, sentinel.symbol))

0 commit comments

Comments
 (0)