Commit 101c5e9

willdealtry authored and dimosped committed
Initial implementation of default handling for pickled frames (pandas-dev#622)
* initial implementation of default handling for pickled frames
* MDP-3767 throw exceptions instead of falling back to default pickle behaviour
* updated the strict handler check mechanism to be at the library level, and then use os.environ (if set), else disabled by default
* sanitized the tests for the strict handler checks
* clarified the decision of having the handler_supports_read_option option in the do_read of version store instead of inside individual handlers
1 parent eb23f47 commit 101c5e9
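
Usage sketch (not part of the diff): based on the changes below, strict handler matching can be enabled either process-wide via the STRICT_WRITE_HANDLER_MATCH environment variable, which is read when version_store is imported, or per library at initialisation time, where it is persisted as library metadata and takes precedence over the environment default. The connection string and library names here are placeholders, and a running MongoDB is assumed.

import os

# Process-wide default: must be set before arctic is imported, because
# version_store.STRICT_WRITE_HANDLER_MATCH is evaluated at import time.
os.environ['STRICT_WRITE_HANDLER_MATCH'] = '1'

from arctic import Arctic, VERSION_STORE

store = Arctic('localhost')  # placeholder MongoDB host

# Per-library override: stored as library metadata at initialisation time and
# consulted ahead of the environment default when the library resolves
# _with_strict_handler_match.
store.initialize_library('user.strict_lib', VERSION_STORE, STRICT_WRITE_HANDLER_MATCH=True)
store.initialize_library('user.lenient_lib', VERSION_STORE, STRICT_WRITE_HANDLER_MATCH=False)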

File tree (7 files changed, +154 -9 lines changed)

  arctic/serialization/numpy_records.py
  arctic/store/_ndarray_store.py
  arctic/store/_pandas_ndarray_store.py
  arctic/store/_pickle_store.py
  arctic/store/version_store.py
  tests/integration/store/test_version_store.py
  tests/unit/serialization/test_numpy_records.py

arctic/serialization/numpy_records.py (+2 -3)

@@ -17,7 +17,6 @@
 
 DTN64_DTYPE = 'datetime64[ns]'
 
-
 def _to_primitive(arr, string_max_len=None):
     if arr.dtype.hasobject:
         if len(arr) > 0:
@@ -145,8 +144,8 @@ def can_convert_to_records_without_objects(self, df, symbol):
             return False
         else:
             if arr.dtype.hasobject:
-                log.info('Pandas dataframe %s contains Objects, saving as Blob' % symbol)
-                # Will fall-back to saving using Pickle
+                log.warning('Pandas dataframe %s contains Objects, saving as Blob' % symbol)
+                # Fall-back to saving using Pickle
                 return False
             elif any([len(x[0].shape) for x in arr.dtype.fields.values()]):
                 log.info('Pandas dataframe %s contains >1 dimensional arrays, saving as Blob' % symbol)

arctic/store/_ndarray_store.py (+9 -1)

@@ -210,8 +210,12 @@ def can_delete(self, version, symbol):
     def can_read(self, version, symbol):
         return version['type'] == self.TYPE
 
+    @staticmethod
+    def can_write_type(data):
+        return isinstance(data, np.ndarray)
+
     def can_write(self, version, symbol, data):
-        return isinstance(data, np.ndarray) and not data.dtype.hasobject
+        return self.can_write_type(data) and not data.dtype.hasobject
 
     def _dtype(self, string, metadata=None):
         if metadata is None:
@@ -241,6 +245,10 @@ def get_info(self, version):
         ret['rows'] = int(version['up_to'])
         return ret
 
+    @staticmethod
+    def read_options():
+        return ['from_version']
+
     def read(self, arctic_lib, version, symbol, read_preference=None, **kwargs):
         index_range = self._index_range(version, symbol, **kwargs)
         collection = arctic_lib.get_top_level_collection()
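
For context, these two static hooks are what the version store interrogates in the version_store.py changes below: can_write_type answers "is this the right Python type for this handler?" even when the concrete data could not be stored natively, while read_options lists the read kwargs the handler understands. A minimal sketch of a hypothetical handler following the same contract (the class and its chosen type are illustrative only, not part of this commit):

import numpy as np


class ExampleRecarrayStore(object):
    """Hypothetical handler sketch, mirroring the contract used in this commit."""
    TYPE = 'example_recarray'

    @staticmethod
    def can_write_type(data):
        # Type check only: True even if the payload would have to be pickled.
        return isinstance(data, np.recarray)

    def can_write(self, version, symbol, data):
        # Stricter: the type matches AND the data can be stored without pickling.
        return self.can_write_type(data) and not data.dtype.hasobject

    @staticmethod
    def read_options():
        # Read kwargs this handler honours; under strict matching the version
        # store consults this list before forwarding e.g. date_range.
        return ['from_version']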

arctic/store/_pandas_ndarray_store.py (+27 -3)

@@ -70,6 +70,9 @@ def _datetime64_index(self, recarr):
                 return name
         return None
 
+    def read_options(self):
+        return ['date_range']
+
     def _index_range(self, version, symbol, date_range=None, **kwargs):
         """ Given a version, read the segment_index and return the chunks associated
         with the date_range. As the segment index is (id -> last datetime)
@@ -143,8 +146,12 @@ class PandasSeriesStore(PandasStore):
     TYPE = 'pandasseries'
     SERIALIZER = SeriesSerializer()
 
+    @staticmethod
+    def can_write_type(data):
+        return isinstance(data, Series)
+
     def can_write(self, version, symbol, data):
-        if isinstance(data, Series):
+        if self.can_write_type(data):
             if data.dtype == np.object_ or data.index.dtype == np.object_:
                 return self.SERIALIZER.can_convert_to_records_without_objects(data, symbol)
             return True
@@ -158,6 +165,9 @@ def append(self, arctic_lib, version, symbol, item, previous_version, **kwargs):
         item, md = self.SERIALIZER.serialize(item)
         super(PandasSeriesStore, self).append(arctic_lib, version, symbol, item, previous_version, dtype=md, **kwargs)
 
+    def read_options(self):
+        return super(PandasSeriesStore, self).read_options()
+
     def read(self, arctic_lib, version, symbol, **kwargs):
         item = super(PandasSeriesStore, self).read(arctic_lib, version, symbol, **kwargs)
         return self.SERIALIZER.deserialize(item)
@@ -167,8 +177,12 @@ class PandasDataFrameStore(PandasStore):
     TYPE = 'pandasdf'
     SERIALIZER = DataFrameSerializer()
 
+    @staticmethod
+    def can_write_type(data):
+        return isinstance(data, DataFrame)
+
     def can_write(self, version, symbol, data):
-        if isinstance(data, DataFrame):
+        if self.can_write_type(data):
             if np.any(data.dtypes.values == 'object'):
                 return self.SERIALIZER.can_convert_to_records_without_objects(data, symbol)
             return True
@@ -186,12 +200,19 @@ def read(self, arctic_lib, version, symbol, **kwargs):
         item = super(PandasDataFrameStore, self).read(arctic_lib, version, symbol, **kwargs)
         return self.SERIALIZER.deserialize(item)
 
+    def read_options(self):
+        return super(PandasDataFrameStore, self).read_options()
+
 
 class PandasPanelStore(PandasDataFrameStore):
     TYPE = 'pandaspan'
 
+    @staticmethod
+    def can_write_type(data):
+        return isinstance(data, Panel)
+
     def can_write(self, version, symbol, data):
-        if isinstance(data, Panel):
+        if self.can_write_type(data):
             frame = data.to_frame(filter_observations=False)
             if np.any(frame.dtypes.values == 'object'):
                 return self.SERIALIZER.can_convert_to_records_without_objects(frame, symbol)
@@ -220,5 +241,8 @@ def read(self, arctic_lib, version, symbol, **kwargs):
             return item.iloc[:, 0].unstack().to_panel()
         return item.to_panel()
 
+    def read_options(self):
+        return super(PandasPanelStore, self).read_options()
+
     def append(self, arctic_lib, version, symbol, item, previous_version, **kwargs):
         raise ValueError('Appending not supported for pandas.Panel')

arctic/store/_pickle_store.py (+4)

@@ -58,6 +58,10 @@ def read(self, mongoose_lib, version, symbol, **kwargs):
             return pickle_compat_load(io.BytesIO(data))
         return version['data']
 
+    @staticmethod
+    def read_options():
+        return []
+
     def write(self, arctic_lib, version, symbol, item, previous_version):
         try:
             # If it's encodeable, then ship it
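
The empty read_options list is what the read-side check in version_store.py below keys off: pickled blobs cannot honour a date_range, so with strict handler matching enabled a ranged read of such a symbol raises instead of silently ignoring the range. A small illustrative check, assuming only the classes shown in this commit:

from arctic.store._pickle_store import PickleStore
from arctic.store.version_store import VersionStore

handler = PickleStore()

# Pickled data exposes no read options...
assert PickleStore.read_options() == []
# ...so the version store reports that date_range is not supported, and
# _do_read raises ArcticException for ranged reads under strict matching.
assert VersionStore.handler_supports_read_option(handler, 'date_range') is False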

arctic/store/version_store.py (+40)

@@ -1,5 +1,6 @@
 from datetime import datetime as dt, timedelta
 import logging
+import os
 
 import bson
 from pymongo import ReadPreference
@@ -21,6 +22,7 @@
 
 VERSION_STORE_TYPE = 'VersionStore'
 _TYPE_HANDLERS = []
+STRICT_WRITE_HANDLER_MATCH = bool(os.environ.get('STRICT_WRITE_HANDLER_MATCH'))
 
 
 def register_versioned_storage(storageClass):
@@ -45,6 +47,10 @@ def initialize_library(cls, arctic_lib, hashed=True, **kwargs):
         # 32MB buffer for change notifications
         mongo_retry(c.database.create_collection)('%s.changes' % c.name, capped=True, size=32 * 1024 * 1024)
 
+        if 'STRICT_WRITE_HANDLER_MATCH' in kwargs:
+            arctic_lib.set_library_metadata('STRICT_WRITE_HANDLER_MATCH',
+                                            bool(kwargs.pop('STRICT_WRITE_HANDLER_MATCH')))
+
         for th in _TYPE_HANDLERS:
             th.initialize_library(arctic_lib, **kwargs)
         VersionStore._bson_handler.initialize_library(arctic_lib, **kwargs)
@@ -79,6 +85,14 @@ def __init__(self, arctic_lib):
         # Do we allow reading from secondaries
         self._allow_secondary = self._arctic_lib.arctic._allow_secondary
         self._reset()
+        self._with_strict_handler = None
+
+    @property
+    def _with_strict_handler_match(self):
+        if self._with_strict_handler is None:
+            strict_meta = self._arctic_lib.get_library_metadata('STRICT_WRITE_HANDLER_MATCH')
+            self._with_strict_handler = STRICT_WRITE_HANDLER_MATCH if strict_meta is None else strict_meta
+        return self._with_strict_handler
 
     @mongo_retry
     def _reset(self):
@@ -301,12 +315,21 @@ def _read_handler(self, version, symbol):
             handler = self._bson_handler
         return handler
 
+    @staticmethod
+    def handler_can_write_type(handler, data):
+        type_method = getattr(handler, "can_write_type", None)
+        if callable(type_method):
+            return type_method(data)
+        return False
+
     def _write_handler(self, version, symbol, data, **kwargs):
         handler = None
         for h in _TYPE_HANDLERS:
             if h.can_write(version, symbol, data, **kwargs):
                 handler = h
                 break
+            if self._with_strict_handler_match and self.handler_can_write_type(h, data):
+                raise ArcticException("Not falling back to default handler for %s" % symbol)
         if handler is None:
             version['type'] = 'default'
             handler = self._bson_handler
@@ -384,10 +407,27 @@ def get_info(self, symbol, as_of=None):
             return handler.get_info(version)
         return {}
 
+    @staticmethod
+    def handler_supports_read_option(handler, option):
+        options_method = getattr(handler, "read_options", None)
+        if callable(options_method):
+            return option in options_method()
+
+        # If the handler doesn't support interrogation of its read options assume
+        # that it does support this option (i.e. fail-open)
+        return True
+
     def _do_read(self, symbol, version, from_version=None, **kwargs):
         if version.get('deleted'):
             raise NoDataFoundException("No data found for %s in library %s" % (symbol, self._arctic_lib.get_name()))
         handler = self._read_handler(version, symbol)
+        # We don't push the date_range check in the handler's code, since the "_with_strict_handler_match"
+        # value is configured on a per-library basis, and is part of the VersionStore instance.
+        if self._with_strict_handler_match and \
+                kwargs.get('date_range') and \
+                not self.handler_supports_read_option(handler, 'date_range'):
+            raise ArcticException("Date range arguments not supported by handler in %s" % symbol)
+
         data = handler.read(self._arctic_lib, version, symbol, from_version=from_version, **kwargs)
         return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=version['version'],
                              metadata=version.pop('metadata', None), data=data,
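
A short illustration of the two helpers added here, assuming the handlers behave as in the diffs above; LegacyHandler is a made-up stand-in for a handler that predates the new hooks:

import pandas as pd

from arctic.store.version_store import VersionStore


class LegacyHandler(object):
    """Hypothetical handler with no read_options/can_write_type methods."""

    def can_write(self, version, symbol, data):
        return False


legacy = LegacyHandler()
df = pd.DataFrame({'a': [1, 2]})

# Read side: no read_options() method, so the check fails open and the
# option is assumed to be supported (no exception is raised for it).
assert VersionStore.handler_supports_read_option(legacy, 'date_range') is True

# Write side: no can_write_type() method, so strict matching never fires for
# this handler and writes keep falling through to the default behaviour.
assert VersionStore.handler_can_write_type(legacy, df) is False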

tests/integration/store/test_version_store.py (+71 -1)

@@ -3,6 +3,7 @@
 import struct
 from datetime import datetime as dt, timedelta as dtd
 import pandas as pd
+from arctic import VERSION_STORE
 from pandas.util.testing import assert_frame_equal
 from pymongo.errors import OperationFailure
 from pymongo.server_type import SERVER_TYPE
@@ -18,6 +19,7 @@
 from arctic.exceptions import NoDataFoundException, DuplicateSnapshotException, ArcticException
 from arctic.date import DateRange
 from arctic.store import _version_store_utils
+from arctic.store import version_store
 
 from ...util import read_str_as_pandas
 from arctic.date._mktz import mktz
@@ -1492,4 +1494,72 @@ def test_snapshot_list_versions_after_delete(library, library_name):
 
     library.delete('symC')
 
-    assert {v['symbol'] for v in library.list_versions(snapshot='snapA')} == {'symA', 'symB', 'symC'}
+    assert {v['symbol'] for v in library.list_versions(snapshot='snapA')} == {'symA', 'symB', 'symC'}
+
+
+def test_write_non_serializable_throws(arctic):
+    lib_name = 'write_hanlder_test'
+    arctic.initialize_library(lib_name, VERSION_STORE)
+    with patch('arctic.store.version_store.STRICT_WRITE_HANDLER_MATCH', True):
+        library = arctic[lib_name]
+
+        # Check that falling back to a pickle from a dataframe throws
+        df = pd.DataFrame({'a': [dict(a=1)]})
+
+        with pytest.raises(ArcticException):
+            library.write('ns1', df)
+
+        # Check that saving a regular dataframe succeeds with this option set
+        library.write('ns2', ts1)
+        assert_frame_equal(ts1, library.read('ns2').data)
+
+
+def test_write_non_serializable_pickling_default(arctic):
+    lib_name = 'write_hanlder_test'
+    arctic.initialize_library(lib_name, VERSION_STORE)
+    library = arctic[lib_name]
+    df = pd.DataFrame({'a': [dict(a=1)]})
+    library.write('ns3', df)
+    assert_frame_equal(df, library.read('ns3').data)
+
+
+def test_write_strict_no_daterange(arctic):
+    lib_name = 'write_hanlder_test'
+    arctic.initialize_library(lib_name, VERSION_STORE)
+
+    # Write with pickling
+    with patch('arctic.store.version_store.STRICT_WRITE_HANDLER_MATCH', True):
+        library = arctic[lib_name]
+        data = [dict(a=1)]
+        library.write('ns4', data)
+
+        # When the option is set, we should now be unable to read this item when we specify a
+        # date range, even though it was written successfully
+        with pytest.raises(ArcticException):
+            library.read('ns4', date_range=DateRange(dt(2017, 1, 1), dt(2017, 1, 2)))
+
+        assert data == library.read('ns4').data
+
+
+def test_handler_check_default_false(arctic):
+    lib_name = 'write_hanlder_test1'
+    arctic.initialize_library(lib_name, VERSION_STORE)
+    assert arctic[lib_name]._with_strict_handler_match is False
+
+
+def test_handler_check_default_osenviron(arctic):
+    with patch('arctic.store.version_store.STRICT_WRITE_HANDLER_MATCH', True):
+        lib_name = 'write_hanlder_test2'
+        arctic.initialize_library(lib_name, VERSION_STORE)
+        assert arctic[lib_name]._with_strict_handler_match is True
+
+def test_handler_check_set_false(arctic):
+    lib_name = 'write_hanlder_test3'
+    arctic.initialize_library(lib_name, VERSION_STORE, STRICT_WRITE_HANDLER_MATCH=False)
+    assert arctic[lib_name]._with_strict_handler_match is False
+
+
+def test_handler_check_set_true(arctic):
+    lib_name = 'write_hanlder_test4'
+    arctic.initialize_library(lib_name, VERSION_STORE, STRICT_WRITE_HANDLER_MATCH=True)
+    assert arctic[lib_name]._with_strict_handler_match is True

tests/unit/serialization/test_numpy_records.py (+1 -1)

@@ -29,7 +29,7 @@ def test_can_convert_to_records_without_objects_returns_false_when_records_have_
     with patch('arctic.serialization.numpy_records.log') as mock_log:
         assert store.can_convert_to_records_without_objects(sentinel.df, 'my_symbol') is False
 
-    mock_log.info.assert_called_once_with('Pandas dataframe my_symbol contains Objects, saving as Blob')
+    mock_log.warning.assert_called_once_with('Pandas dataframe my_symbol contains Objects, saving as Blob')
     store._to_records.assert_called_once_with(sentinel.df)
