
ENH: always make ndarrays from msgpack writable #12359

Closed
4 changes: 4 additions & 0 deletions doc/source/whatsnew/v0.18.1.txt
@@ -37,7 +37,11 @@ Enhancements



Other Enhancements
^^^^^^^^^^^^^^^^^^

- ``read_msgpack`` now always gives writeable ndarrays even when compression is
  used (:issue:`12359`).

.. _whatsnew_0181.api:

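For context, a minimal sketch of the behavior this change guarantees, assuming a frame is round-tripped through `to_msgpack`/`read_msgpack` with compression (the `_data.blocks` walk mirrors the assertions added to the tests below):

    import numpy as np
    from pandas import DataFrame
    from pandas.io.packers import to_msgpack, read_msgpack

    df = DataFrame({'a': np.arange(10, dtype='int64')})
    packed = to_msgpack(None, df, compress='zlib')  # a path of None returns bytes
    unpacked = read_msgpack(packed)

    # previously the blocks backing a compressed round trip came back
    # read-only; they are now always writeable
    for block in unpacked._data.blocks:
        assert block.values.flags.writeable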
102 changes: 88 additions & 14 deletions pandas/io/packers.py
@@ -38,9 +38,11 @@
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""

import os
from datetime import datetime, date, timedelta
from dateutil.parser import parse
import os
from textwrap import dedent
import warnings

import numpy as np
from pandas import compat
@@ -52,12 +54,61 @@
from pandas.sparse.api import SparseSeries, SparseDataFrame, SparsePanel
from pandas.sparse.array import BlockIndex, IntIndex
from pandas.core.generic import NDFrame
from pandas.core.common import needs_i8_conversion, pandas_dtype
from pandas.core.common import (
PerformanceWarning,
needs_i8_conversion,
pandas_dtype,
)
from pandas.io.common import get_filepath_or_buffer
from pandas.core.internals import BlockManager, make_block
import pandas.core.internals as internals

from pandas.msgpack import Unpacker as _Unpacker, Packer as _Packer, ExtType
from pandas.util._move import (
BadMove as _BadMove,
move_into_mutable_buffer as _move_into_mutable_buffer,
)

# check which compression libs we have installed
try:
import zlib

def _check_zlib():
pass
except ImportError:
def _check_zlib():
raise ValueError('zlib is not installed')

_check_zlib.__doc__ = dedent(
"""\
Check if zlib is installed.

Raises
------
ValueError
Raised when zlib is not installed.
""",
)

try:
import blosc

def _check_blosc():
pass
except ImportError:
def _check_blosc():
raise ValueError('blosc is not installed')

_check_blosc.__doc__ = dedent(
"""\
Check if blosc is installed.

Raises
------
ValueError
Raised when blosc is not installed.
""",
)
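
# A sketch of how these guards surface at call time rather than import
# time (assuming blosc is not installed):
#
#     >>> _check_blosc()
#     Traceback (most recent call last):
#       ...
#     ValueError: blosc is not installed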

# until we can pass this into our conversion functions,
# this is pretty hacky
@@ -215,25 +266,25 @@ def convert(values):
return v.tolist()

if compressor == 'zlib':
_check_zlib()

# return string arrays like they are
if dtype == np.object_:
return v.tolist()

# convert to a bytes array
v = v.tostring()
import zlib
return ExtType(0, zlib.compress(v))

elif compressor == 'blosc':
_check_blosc()

# return string arrays like they are
if dtype == np.object_:
return v.tolist()

# convert to a bytes array
v = v.tostring()
import blosc
return ExtType(0, blosc.compress(v, typesize=dtype.itemsize))
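
# An illustrative sketch (not runnable in isolation) of the wire format
# produced above: the block values are serialized with tostring() and the
# compressed payload is wrapped in msgpack ExtType code 0:
#
#     >>> convert(np.arange(3, dtype='int64'))  # module-level compressor='zlib'
#     ExtType(code=0, data=<zlib-compressed bytes>)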

# ndarray (on original dtype)
@@ -255,17 +306,40 @@ def unconvert(values, dtype, compress=None):
if not as_is_ext:
values = values.encode('latin1')

if compress == u'zlib':
import zlib
values = zlib.decompress(values)
return np.frombuffer(values, dtype=dtype)

elif compress == u'blosc':
import blosc
values = blosc.decompress(values)
return np.frombuffer(values, dtype=dtype)
if compress:
if compress == u'zlib':
[Review comment — Contributor]
I would try to import these at the top of the file and just set variables (_ZLIB_INSTALLED, _BLOSC_INSTALLED); and you can do whatever imports you need there.

[Reply — Contributor Author]
Done; added _check_* functions that raise a ValueError here if you try to use a compressor that is not installed.

_check_zlib()
decompress = zlib.decompress
elif compress == u'blosc':
_check_blosc()
decompress = blosc.decompress
else:
raise ValueError("compress must be one of 'zlib' or 'blosc'")

# from a string
try:
return np.frombuffer(
_move_into_mutable_buffer(decompress(values)),
dtype=dtype,
)
except _BadMove as e:
# Pull the decompressed data off of the `_BadMove` exception.
# We don't just store this in the locals because we want to
# minimize the risk of giving users access to a `bytes` object
# whose data is also given to a mutable buffer.
values = e.args[0]
if len(values) > 1:
# The empty string and single characters are memoized in many
# string creating functions in the capi. This case should not
# warn even though we need to make a copy because we are only
# copying at most 1 byte.
warnings.warn(
'copying data after decompressing; this may mean that'
' decompress is caching its result',
PerformanceWarning,
)
# fall through to copying `np.fromstring`

# Copy the string into a numpy array.
return np.fromstring(values, dtype=dtype)


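The key trade-off `unconvert` navigates above: `np.frombuffer` shares memory with the source bytes object, so the resulting array must be read-only, while `np.fromstring` always copies. `_move_into_mutable_buffer` gets writability without the copy by stealing the buffer when it holds the only reference. A minimal sketch of the two NumPy behaviors (plain NumPy, nothing pandas-specific):

    import zlib
    import numpy as np

    arr = np.arange(10, dtype='int64')
    compressed = zlib.compress(arr.tostring())

    # frombuffer shares the bytes object's memory, so the array is
    # read-only -- writing to it would mutate an immutable bytes object
    shared = np.frombuffer(zlib.decompress(compressed), dtype='int64')
    assert not shared.flags.writeable

    # fromstring copies, so the array is writeable, at the cost of briefly
    # holding the data in memory twice
    copied = np.fromstring(zlib.decompress(compressed), dtype='int64')
    assert copied.flags.writeable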
125 changes: 118 additions & 7 deletions pandas/io/tests/test_packers.py
@@ -10,11 +10,13 @@
from pandas.compat import u
from pandas import (Series, DataFrame, Panel, MultiIndex, bdate_range,
date_range, period_range, Index)
from pandas.core.common import PerformanceWarning
from pandas.io.packers import to_msgpack, read_msgpack
import pandas.util.testing as tm
from pandas.util.testing import (ensure_clean, assert_index_equal,
assert_series_equal,
assert_frame_equal)
assert_frame_equal,
patch)
from pandas.tests.test_panel import assert_panel_equal

import pandas
@@ -539,17 +541,126 @@ def test_plain(self):
for k in self.frame.keys():
assert_frame_equal(self.frame[k], i_rec[k])

def test_compression_zlib(self):
i_rec = self.encode_decode(self.frame, compress='zlib')
def _test_compression(self, compress):
i_rec = self.encode_decode(self.frame, compress=compress)
for k in self.frame.keys():
assert_frame_equal(self.frame[k], i_rec[k])
value = i_rec[k]
expected = self.frame[k]
assert_frame_equal(value, expected)
# make sure that we can write to the new frames
for block in value._data.blocks:
self.assertTrue(block.values.flags.writeable)

def test_compression_zlib(self):
if not _ZLIB_INSTALLED:
raise nose.SkipTest('no zlib')
self._test_compression('zlib')

def test_compression_blosc(self):
if not _BLOSC_INSTALLED:
raise nose.SkipTest('no blosc')
i_rec = self.encode_decode(self.frame, compress='blosc')
for k in self.frame.keys():
assert_frame_equal(self.frame[k], i_rec[k])
self._test_compression('blosc')

def _test_compression_warns_when_decompress_caches(self, compress):
not_garbage = []
control = [] # copied data

compress_module = globals()[compress]
real_decompress = compress_module.decompress

def decompress(ob):
"""mock decompress function that delegates to the real
decompress but caches the result and a copy of the result.
"""
res = real_decompress(ob)
not_garbage.append(res) # hold a reference to this bytes object
control.append(bytearray(res)) # copy the data here to check later
return res

# types mapped to values to add in place.
rhs = {
np.dtype('float64'): 1.0,
np.dtype('int32'): 1,
np.dtype('object'): 'a',
np.dtype('datetime64[ns]'): np.timedelta64(1, 'ns'),
np.dtype('timedelta64[ns]'): np.timedelta64(1, 'ns'),
}

with patch(compress_module, 'decompress', decompress), \
tm.assert_produces_warning(PerformanceWarning) as ws:

i_rec = self.encode_decode(self.frame, compress=compress)
for k in self.frame.keys():

value = i_rec[k]
expected = self.frame[k]
assert_frame_equal(value, expected)
# make sure that we can write to the new frames even though
# we needed to copy the data
for block in value._data.blocks:
self.assertTrue(block.values.flags.writeable)
# mutate the data in some way
block.values[0] += rhs[block.dtype]

for w in ws:
# check the messages from our warnings
self.assertEqual(
str(w.message),
'copying data after decompressing; this may mean that'
' decompress is caching its result',
)

for buf, control_buf in zip(not_garbage, control):
# make sure none of our mutations above affected the
# original buffers
self.assertEqual(buf, control_buf)
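
# Why a caching decompress forces a copy: move_into_mutable_buffer may
# only steal the buffer of a bytes object to which it holds the sole
# reference. The mock above keeps every result alive in `not_garbage`,
# so the move raises BadMove, unconvert falls back to the copying
# np.fromstring path, and the PerformanceWarning asserted here fires.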

def test_compression_warns_when_decompress_caches_zlib(self):
if not _ZLIB_INSTALLED:
raise nose.SkipTest('no zlib')
self._test_compression_warns_when_decompress_caches('zlib')

def test_compression_warns_when_decompress_caches_blosc(self):
if not _BLOSC_INSTALLED:
raise nose.SkipTest('no blosc')
self._test_compression_warns_when_decompress_caches('blosc')

def _test_small_strings_no_warn(self, compress):
empty = np.array([], dtype='uint8')
with tm.assert_produces_warning(None):
empty_unpacked = self.encode_decode(empty, compress=compress)

np.testing.assert_array_equal(empty_unpacked, empty)
self.assertTrue(empty_unpacked.flags.writeable)

char = np.array([ord(b'a')], dtype='uint8')
with tm.assert_produces_warning(None):
char_unpacked = self.encode_decode(char, compress=compress)

np.testing.assert_array_equal(char_unpacked, char)
self.assertTrue(char_unpacked.flags.writeable)
# if this test fails I am sorry because the interpreter is now in a
# bad state where b'a' points to 98 == ord(b'b').
char_unpacked[0] = ord(b'b')

# we compare the ord of bytes b'a' with unicode u'a' because they should
# always be the same (unless we were able to mutate the shared
# character singleton, in which case ord(b'a') == ord(b'b')).
self.assertEqual(ord(b'a'), ord(u'a'))
np.testing.assert_array_equal(
char_unpacked,
np.array([ord(b'b')], dtype='uint8'),
)
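
# Background for the "memoized" caveat: CPython shares bytes objects of
# length 0 and 1 as singletons (an implementation detail; e.g.
# b'ab'[:1] is b'a' holds in CPython). Stealing such a buffer would
# corrupt unrelated code, so unconvert copies instead -- and copying at
# most one byte is too cheap to warrant a PerformanceWarning, which is
# what the assert_produces_warning(None) blocks above verify.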

def test_small_strings_no_warn_zlib(self):
if not _ZLIB_INSTALLED:
raise nose.SkipTest('no zlib')
self._test_small_strings_no_warn('zlib')

def test_small_strings_no_warn_blosc(self):
if not _BLOSC_INSTALLED:
raise nose.SkipTest('no blosc')
self._test_small_strings_no_warn('blosc')

def test_readonly_axis_blosc(self):
# GH11880
33 changes: 33 additions & 0 deletions pandas/tests/test_util.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import nose

from pandas.util._move import move_into_mutable_buffer, BadMove
from pandas.util.decorators import deprecate_kwarg
from pandas.util.validators import validate_args, validate_kwargs

@@ -150,6 +151,38 @@ def test_validation(self):
kwargs = {'f': 'foo', 'b': 'bar'}
validate_kwargs('func', kwargs, *compat_args)


class TestMove(tm.TestCase):
def test_more_than_one_ref(self):
"""Test case for when we try to use ``move_into_mutable_buffer`` when
the object being moved has other references.
[Review comment — Contributor]
It's funny: when you use a docstring, the runner names the test by it. I guess that is right.

[Reply — Contributor Author]
Should I just make this a comment instead, or do you not mind?

"""
b = b'testing'

with tm.assertRaises(BadMove) as e:
def handle_success(type_, value, tb):
self.assertIs(value.args[0], b)
return type(e).handle_success(e, type_, value, tb) # super

e.handle_success = handle_success
move_into_mutable_buffer(b)
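
# Note: the handle_success override above hooks the assertRaises context
# manager's exit path so we can assert -- by identity -- that the BadMove
# carries the very bytes object we tried to move, before the context
# manager disposes of the exception.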

def test_exactly_one_ref(self):
"""Test case for when the object being moved has exactly one reference.
"""
b = b'testing'

# We need to pass an expression on the stack to ensure that there are
# not extra references hanging around. We cannot rewrite this test as
# buf = b[:-3]
# as_stolen_buf = move_into_mutable_buffer(buf)
# because then we would have more than one reference to buf.
as_stolen_buf = move_into_mutable_buffer(b[:-3])

# materialize as bytearray to show that it is mutable
self.assertEqual(bytearray(as_stolen_buf), b'test')
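
# A sketch of the intended calling convention (mirroring its use in
# pandas/io/packers.py): pass a temporary expression so the bytes object
# has exactly one reference, and be prepared to copy on failure:
#
#     try:
#         buf = move_into_mutable_buffer(decompress(values))
#     except BadMove as e:
#         values = e.args[0]  # the original bytes survive on the exception
#         # fall back to a copying constructor such as np.fromstring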


if __name__ == '__main__':
nose.runmodule(argv=[__file__, '-vvs', '-x', '--pdb', '--pdb-failure'],
exit=False)