diff --git a/doc/source/whatsnew/v0.18.1.txt b/doc/source/whatsnew/v0.18.1.txt index 5bb7dfe87562c..037639b51cd26 100644 --- a/doc/source/whatsnew/v0.18.1.txt +++ b/doc/source/whatsnew/v0.18.1.txt @@ -37,7 +37,11 @@ Enhancements +Other Enhancements +^^^^^^^^^^^^^^^^^^ +- ``from_msgpack`` now always gives writeable ndarrays even when compression is + used (:issue:`12359`). .. _whatsnew_0181.api: diff --git a/pandas/io/packers.py b/pandas/io/packers.py index 701b78d2771fb..f3f3c26b88993 100644 --- a/pandas/io/packers.py +++ b/pandas/io/packers.py @@ -38,9 +38,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ -import os from datetime import datetime, date, timedelta from dateutil.parser import parse +import os +from textwrap import dedent +import warnings import numpy as np from pandas import compat @@ -52,12 +54,61 @@ from pandas.sparse.api import SparseSeries, SparseDataFrame, SparsePanel from pandas.sparse.array import BlockIndex, IntIndex from pandas.core.generic import NDFrame -from pandas.core.common import needs_i8_conversion, pandas_dtype +from pandas.core.common import ( + PerformanceWarning, + needs_i8_conversion, + pandas_dtype, +) from pandas.io.common import get_filepath_or_buffer from pandas.core.internals import BlockManager, make_block import pandas.core.internals as internals from pandas.msgpack import Unpacker as _Unpacker, Packer as _Packer, ExtType +from pandas.util._move import ( + BadMove as _BadMove, + move_into_mutable_buffer as _move_into_mutable_buffer, +) + +# check which compression libs we have installed +try: + import zlib + + def _check_zlib(): + pass +except ImportError: + def _check_zlib(): + raise ValueError('zlib is not installed') + +_check_zlib.__doc__ = dedent( + """\ + Check if zlib is installed. + + Raises + ------ + ValueError + Raised when zlib is not installed. 
+ """, +) + +try: + import blosc + + def _check_blosc(): + pass +except ImportError: + def _check_blosc(): + raise ValueError('blosc is not installed') + +_check_blosc.__doc__ = dedent( + """\ + Check if blosc is installed. + + Raises + ------ + ValueError + Raised when blosc is not installed. + """, +) # until we can pass this into our conversion functions, # this is pretty hacky @@ -215,6 +266,7 @@ def convert(values): return v.tolist() if compressor == 'zlib': + _check_zlib() # return string arrays like they are if dtype == np.object_: @@ -222,10 +274,10 @@ def convert(values): # convert to a bytes array v = v.tostring() - import zlib return ExtType(0, zlib.compress(v)) elif compressor == 'blosc': + _check_blosc() # return string arrays like they are if dtype == np.object_: @@ -233,7 +285,6 @@ def convert(values): # convert to a bytes array v = v.tostring() - import blosc return ExtType(0, blosc.compress(v, typesize=dtype.itemsize)) # ndarray (on original dtype) @@ -255,17 +306,40 @@ def unconvert(values, dtype, compress=None): if not as_is_ext: values = values.encode('latin1') - if compress == u'zlib': - import zlib - values = zlib.decompress(values) - return np.frombuffer(values, dtype=dtype) - - elif compress == u'blosc': - import blosc - values = blosc.decompress(values) - return np.frombuffer(values, dtype=dtype) + if compress: + if compress == u'zlib': + _check_zlib() + decompress = zlib.decompress + elif compress == u'blosc': + _check_blosc() + decompress = blosc.decompress + else: + raise ValueError("compress must be one of 'zlib' or 'blosc'") - # from a string + try: + return np.frombuffer( + _move_into_mutable_buffer(decompress(values)), + dtype=dtype, + ) + except _BadMove as e: + # Pull the decompressed data off of the `_BadMove` exception. + # We don't just store this in the locals because we want to + # minimize the risk of giving users access to a `bytes` object + # whose data is also given to a mutable buffer. 
+ values = e.args[0] + if len(values) > 1: + # The empty string and single characters are memoized in many + # string creating functions in the capi. This case should not + # warn even though we need to make a copy because we are only + # copying at most 1 byte. + warnings.warn( + 'copying data after decompressing; this may mean that' + ' decompress is caching its result', + PerformanceWarning, + ) + # fall through to copying `np.fromstring` + + # Copy the string into a numpy array. return np.fromstring(values, dtype=dtype) diff --git a/pandas/io/tests/test_packers.py b/pandas/io/tests/test_packers.py index 9387a6069d974..276763989d7cf 100644 --- a/pandas/io/tests/test_packers.py +++ b/pandas/io/tests/test_packers.py @@ -10,11 +10,13 @@ from pandas.compat import u from pandas import (Series, DataFrame, Panel, MultiIndex, bdate_range, date_range, period_range, Index) +from pandas.core.common import PerformanceWarning from pandas.io.packers import to_msgpack, read_msgpack import pandas.util.testing as tm from pandas.util.testing import (ensure_clean, assert_index_equal, assert_series_equal, - assert_frame_equal) + assert_frame_equal, + patch) from pandas.tests.test_panel import assert_panel_equal import pandas @@ -539,17 +541,126 @@ def test_plain(self): for k in self.frame.keys(): assert_frame_equal(self.frame[k], i_rec[k]) - def test_compression_zlib(self): - i_rec = self.encode_decode(self.frame, compress='zlib') + def _test_compression(self, compress): + i_rec = self.encode_decode(self.frame, compress=compress) for k in self.frame.keys(): - assert_frame_equal(self.frame[k], i_rec[k]) + value = i_rec[k] + expected = self.frame[k] + assert_frame_equal(value, expected) + # make sure that we can write to the new frames + for block in value._data.blocks: + self.assertTrue(block.values.flags.writeable) + + def test_compression_zlib(self): + if not _ZLIB_INSTALLED: + raise nose.SkipTest('no zlib') + self._test_compression('zlib') def test_compression_blosc(self): if not 
_BLOSC_INSTALLED: raise nose.SkipTest('no blosc') - i_rec = self.encode_decode(self.frame, compress='blosc') - for k in self.frame.keys(): - assert_frame_equal(self.frame[k], i_rec[k]) + self._test_compression('blosc') + + def _test_compression_warns_when_decompress_caches(self, compress): + not_garbage = [] + control = [] # copied data + + compress_module = globals()[compress] + real_decompress = compress_module.decompress + + def decompress(ob): + """mock decompress function that delegates to the real + decompress but caches the result and a copy of the result. + """ + res = real_decompress(ob) + not_garbage.append(res) # hold a reference to this bytes object + control.append(bytearray(res)) # copy the data here to check later + return res + + # types mapped to values to add in place. + rhs = { + np.dtype('float64'): 1.0, + np.dtype('int32'): 1, + np.dtype('object'): 'a', + np.dtype('datetime64[ns]'): np.timedelta64(1, 'ns'), + np.dtype('timedelta64[ns]'): np.timedelta64(1, 'ns'), + } + + with patch(compress_module, 'decompress', decompress), \ + tm.assert_produces_warning(PerformanceWarning) as ws: + + i_rec = self.encode_decode(self.frame, compress=compress) + for k in self.frame.keys(): + + value = i_rec[k] + expected = self.frame[k] + assert_frame_equal(value, expected) + # make sure that we can write to the new frames even though + # we needed to copy the data + for block in value._data.blocks: + self.assertTrue(block.values.flags.writeable) + # mutate the data in some way + block.values[0] += rhs[block.dtype] + + for w in ws: + # check the messages from our warnings + self.assertEqual( + str(w.message), + 'copying data after decompressing; this may mean that' + ' decompress is caching its result', + ) + + for buf, control_buf in zip(not_garbage, control): + # make sure none of our mutations above affected the + # original buffers + self.assertEqual(buf, control_buf) + + def test_compression_warns_when_decompress_caches_zlib(self): + if not _ZLIB_INSTALLED: 
+ raise nose.SkipTest('no zlib') + self._test_compression_warns_when_decompress_caches('zlib') + + def test_compression_warns_when_decompress_caches_blosc(self): + if not _BLOSC_INSTALLED: + raise nose.SkipTest('no blosc') + self._test_compression_warns_when_decompress_caches('blosc') + + def _test_small_strings_no_warn(self, compress): + empty = np.array([], dtype='uint8') + with tm.assert_produces_warning(None): + empty_unpacked = self.encode_decode(empty, compress=compress) + + np.testing.assert_array_equal(empty_unpacked, empty) + self.assertTrue(empty_unpacked.flags.writeable) + + char = np.array([ord(b'a')], dtype='uint8') + with tm.assert_produces_warning(None): + char_unpacked = self.encode_decode(char, compress=compress) + + np.testing.assert_array_equal(char_unpacked, char) + self.assertTrue(char_unpacked.flags.writeable) + # if this test fails I am sorry because the interpreter is now in a + # bad state where b'a' points to 98 == ord(b'b'). + char_unpacked[0] = ord(b'b') + + # we compare the ord of bytes b'a' with unicode u'a' because they should + # always be the same (unless we were able to mutate the shared + # character singleton in which case ord(b'a') == ord(b'b')). 
+ self.assertEqual(ord(b'a'), ord(u'a')) + np.testing.assert_array_equal( + char_unpacked, + np.array([ord(b'b')], dtype='uint8'), + ) + + def test_small_strings_no_warn_zlib(self): + if not _ZLIB_INSTALLED: + raise nose.SkipTest('no zlib') + self._test_small_strings_no_warn('zlib') + + def test_small_strings_no_warn_blosc(self): + if not _BLOSC_INSTALLED: + raise nose.SkipTest('no blosc') + self._test_small_strings_no_warn('blosc') def test_readonly_axis_blosc(self): # GH11880 diff --git a/pandas/tests/test_util.py b/pandas/tests/test_util.py index 367b8d21f95d0..e87e9770b770a 100644 --- a/pandas/tests/test_util.py +++ b/pandas/tests/test_util.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import nose +from pandas.util._move import move_into_mutable_buffer, BadMove from pandas.util.decorators import deprecate_kwarg from pandas.util.validators import validate_args, validate_kwargs @@ -150,6 +151,38 @@ def test_validation(self): kwargs = {'f': 'foo', 'b': 'bar'} validate_kwargs('func', kwargs, *compat_args) + +class TestMove(tm.TestCase): + def test_more_than_one_ref(self): + """Test case for when we try to use ``move_into_mutable_buffer`` when + the object being moved has other references. + """ + b = b'testing' + + with tm.assertRaises(BadMove) as e: + def handle_success(type_, value, tb): + self.assertIs(value.args[0], b) + return type(e).handle_success(e, type_, value, tb) # super + + e.handle_success = handle_success + move_into_mutable_buffer(b) + + def test_exactly_one_ref(self): + """Test case for when the object being moved has exactly one reference. + """ + b = b'testing' + + # We need to pass an expression on the stack to ensure that there are + # not extra references hanging around. We cannot rewrite this test as + # buf = b[:-3] + # as_stolen_buf = move_into_mutable_buffer(buf) + # because then we would have more than one reference to buf. 
+ as_stolen_buf = move_into_mutable_buffer(b[:-3]) + + # materialize as bytearray to show that it is mutable + self.assertEqual(bytearray(as_stolen_buf), b'test') + + if __name__ == '__main__': nose.runmodule(argv=[__file__, '-vvs', '-x', '--pdb', '--pdb-failure'], exit=False) diff --git a/pandas/util/_move.c b/pandas/util/_move.c new file mode 100644 index 0000000000000..68fcad793e16c --- /dev/null +++ b/pandas/util/_move.c @@ -0,0 +1,274 @@ +#include <Python.h> + +#define COMPILING_IN_PY2 (PY_VERSION_HEX <= 0x03000000) + +#if !COMPILING_IN_PY2 +/* alias this because it is not aliased in Python 3 */ +#define PyString_CheckExact PyBytes_CheckExact +#define PyString_AS_STRING PyBytes_AS_STRING +#define PyString_GET_SIZE PyBytes_GET_SIZE +#endif /* !COMPILING_IN_PY2 */ + +#ifndef Py_TPFLAGS_HAVE_GETCHARBUFFER +#define Py_TPFLAGS_HAVE_GETCHARBUFFER 0 +#endif + +#ifndef Py_TPFLAGS_HAVE_NEWBUFFER +#define Py_TPFLAGS_HAVE_NEWBUFFER 0 +#endif + +PyObject *badmove; /* bad move exception class */ + +typedef struct { + PyObject_HEAD + /* the bytes that own the buffer we are mutating */ + PyObject *invalid_bytes; +} stolenbufobject; + +PyTypeObject stolenbuf_type; /* forward declare type */ + +static void +stolenbuf_dealloc(stolenbufobject *self) +{ + Py_DECREF(self->invalid_bytes); + PyObject_Del(self); +} + +static int +stolenbuf_getbuffer(stolenbufobject *self, Py_buffer *view, int flags) +{ + return PyBuffer_FillInfo(view, + (PyObject*) self, + (void*) PyString_AS_STRING(self->invalid_bytes), + PyString_GET_SIZE(self->invalid_bytes), + 0, /* not readonly */ + flags); +} + +#if COMPILING_IN_PY2 + +static Py_ssize_t +stolenbuf_getreadwritebuf(stolenbufobject *self, Py_ssize_t segment, void **out) +{ + if (segment != 0) { + PyErr_SetString(PyExc_SystemError, + "accessing non-existent string segment"); + return -1; + } + *out = PyString_AS_STRING(self->invalid_bytes); + return PyString_GET_SIZE(self->invalid_bytes); +} + +static Py_ssize_t +stolenbuf_getsegcount(stolenbufobject *self, 
Py_ssize_t *len) +{ + if (len) { + *len = PyString_GET_SIZE(self->invalid_bytes); + } + return 1; +} + +PyBufferProcs stolenbuf_as_buffer = { + (readbufferproc) stolenbuf_getreadwritebuf, + (writebufferproc) stolenbuf_getreadwritebuf, + (segcountproc) stolenbuf_getsegcount, + (charbufferproc) stolenbuf_getreadwritebuf, + (getbufferproc) stolenbuf_getbuffer, +}; + +#else /* Python 3 */ + +PyBufferProcs stolenbuf_as_buffer = { + (getbufferproc) stolenbuf_getbuffer, + NULL, +}; + +#endif /* COMPILING_IN_PY2 */ + +static PyObject * +stolenbuf_new(PyObject *self, PyObject *args, PyObject *kwargs) +{ + stolenbufobject *ret; + PyObject *bytes_rvalue; + + if (kwargs && PyDict_Size(kwargs)) { + PyErr_SetString(PyExc_TypeError, + "stolenbuf does not accept keyword arguments"); + return NULL; + } + + if (PyTuple_GET_SIZE(args) != 1) { + PyErr_SetString(PyExc_TypeError, + "stolenbuf requires exactly 1 positional argument"); + return NULL; + + } + + /* pull out the single, positional argument */ + bytes_rvalue = PyTuple_GET_ITEM(args, 0); + + if (!PyString_CheckExact(bytes_rvalue)) { + PyErr_SetString(PyExc_TypeError, + "stolenbuf can only steal from bytes objects"); + return NULL; + } + + if (Py_REFCNT(bytes_rvalue) != 1) { + /* there is a reference other than the caller's stack */ + PyErr_SetObject(badmove, bytes_rvalue); + return NULL; + } + + if (!(ret = PyObject_New(stolenbufobject, &stolenbuf_type))) { + return NULL; + } + + /* store the original bytes object in a field that is not + exposed to python */ + Py_INCREF(bytes_rvalue); + ret->invalid_bytes = bytes_rvalue; + return (PyObject*) ret; +} + +PyDoc_STRVAR( + stolenbuf_doc, + "Moves a bytes object that is about to be destroyed into a mutable buffer\n" + "without copying the data.\n" + "\n" + "Parameters\n" + "----------\n" + "bytes_rvalue : bytes with 1 refcount.\n" + " The bytes object that you want to move into a mutable buffer. This\n" + " cannot be a named object. 
It must only have a single reference.\n" + "\n" + "Returns\n" + "-------\n" + "buf : stolenbuf\n" + " An object that supports the buffer protocol which can give a mutable\n" + " view of the data that was previously owned by ``bytes_rvalue``.\n" + "\n" + "Raises\n" + "------\n" + "BadMove\n" + " Raised when a move is attempted on an object with more than one\n" + " reference.\n" + "\n" + "Notes\n" + "-----\n" + "If you want to use this function you are probably wrong.\n"); + +PyTypeObject stolenbuf_type = { + PyVarObject_HEAD_INIT(NULL, 0) + "pandas.util._move.stolenbuf", /* tp_name */ + sizeof(stolenbufobject), /* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor) stolenbuf_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_reserved */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + &stolenbuf_as_buffer, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT | + Py_TPFLAGS_HAVE_NEWBUFFER | + Py_TPFLAGS_HAVE_GETCHARBUFFER, /* tp_flags */ + stolenbuf_doc, /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + 0, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0, /* tp_alloc */ + (newfunc) stolenbuf_new, /* tp_new */ +}; + +#define MODULE_NAME "pandas.util._move" + +#if !COMPILING_IN_PY2 +PyModuleDef _move_module = { + PyModuleDef_HEAD_INIT, + MODULE_NAME, + NULL, + -1, +}; +#endif /* !COMPILING_IN_PY2 */ + +PyDoc_STRVAR( + badmove_doc, + "Exception used to indicate that a move was attempted on a value with\n" + "more than a single reference.\n" + "\n" + "Parameters\n" + "----------\n" + "data : any\n" + " The data which was passed to 
``_move_into_mutable_buffer``.\n" + "\n" + "See Also\n" + "--------\n" + "pandas.util._move.move_into_mutable_buffer\n"); + +PyMODINIT_FUNC +#if !COMPILING_IN_PY2 +#define ERROR_RETURN NULL +PyInit__move(void) +#else +#define ERROR_RETURN +init_move(void) +#endif /* !COMPILING_IN_PY2 */ +{ + PyObject *m; + + if (!(badmove = PyErr_NewExceptionWithDoc("pandas.util._move.BadMove", + badmove_doc, + NULL, + NULL))) { + return ERROR_RETURN; + } + + if (PyType_Ready(&stolenbuf_type)) { + return ERROR_RETURN; + } + +#if !COMPILING_IN_PY2 + if (!(m = PyModule_Create(&_move_module))) +#else + if (!(m = Py_InitModule(MODULE_NAME, NULL))) +#endif /* !COMPILING_IN_PY2 */ + { + return ERROR_RETURN; + } + + if (PyModule_AddObject(m, + "move_into_mutable_buffer", + (PyObject*) &stolenbuf_type)) { + Py_DECREF(m); + return ERROR_RETURN; + } + + if (PyModule_AddObject(m, "BadMove", badmove)) { + Py_DECREF(m); + return ERROR_RETURN; + } + +#if !COMPILING_IN_PY2 + return m; +#endif /* !COMPILING_IN_PY2 */ +} diff --git a/pandas/util/testing.py b/pandas/util/testing.py index ebd1f7d2c17f8..3aebbf527bcd2 100644 --- a/pandas/util/testing.py +++ b/pandas/util/testing.py @@ -2313,3 +2313,55 @@ class SubclassedDataFrame(DataFrame): @property def _constructor(self): return SubclassedDataFrame + + +@contextmanager +def patch(ob, attr, value): + """Temporarily patch an attribute of an object. + + Parameters + ---------- + ob : any + The object to patch. This must support attribute assignment for `attr`. + attr : str + The name of the attribute to patch. + value : any + The temporary attribute to assign. + + Examples + -------- + >>> class C(object): + ... attribute = 'original' + ... + >>> C.attribute + 'original' + >>> with patch(C, 'attribute', 'patched'): + ... in_context = C.attribute + ... + >>> in_context + 'patched' + >>> C.attribute # the value is reset when the context manager exits + 'original' + + Correctly replaces attribute when the manager exits with an exception. 
+ >>> with patch(C, 'attribute', 'patched'): + ... in_context = C.attribute + ... raise ValueError() + Traceback (most recent call last): + ... + ValueError + >>> in_context + 'patched' + >>> C.attribute + 'original' + """ + noattr = object() # mark that the attribute never existed + old = getattr(ob, attr, noattr) + setattr(ob, attr, value) + try: + yield + finally: + if old is noattr: + delattr(ob, attr) + else: + setattr(ob, attr, old) diff --git a/setup.py b/setup.py index e3fb5a007aad3..848e8b724edad 100755 --- a/setup.py +++ b/setup.py @@ -532,6 +532,13 @@ def pxd(name): extensions.append(ujson_ext) +# extension for pseudo-safely moving bytes into mutable buffers +_move_ext = Extension('pandas.util._move', + depends=[], + sources=['pandas/util/_move.c']) +extensions.append(_move_ext) + + if _have_setuptools: setuptools_kwargs["test_suite"] = "nose.collector"