
Commit 28ba08c

Joe Jevnik authored and jreback committed
ENH: always make ndarrays from msgpack writable
Addresses the case where 'compress' was not None. The old implementation would decompress the data and then call np.frombuffer on a bytes object. Because a bytes object is not a mutable buffer, the resulting ndarray had writeable=False. The new implementation ensures that pandas is the only owner of the newly decompressed buffer and then marks the buffer as mutable without copying it. This means that we avoid copying the incoming data AND we can mutate the resulting array later. If we are not the only owner of the data, we fall back to copying it with np.fromstring.

Author: Joe Jevnik <[email protected]>

Closes pandas-dev#12359 from llllllllll/msgpack-owns-new-memory and squashes the following commits:

e896603 [Joe Jevnik] TST: adds `pandas.util.testing.patch`
84d7275 [Joe Jevnik] ENH: reimplement `_move_into_mutable_buffer` in C.
330dd76 [Joe Jevnik] ENH: updates unconvert no-copy code to be safer
830abba [Joe Jevnik] TST: adds test for memoized empty string and chars
9cd9d80 [Joe Jevnik] ENH: always make ndarrays from msgpack writable
1 parent 28327ce commit 28ba08c
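For context, a minimal sketch (not part of the commit) of why the old code path produced read-only arrays: np.frombuffer over an immutable bytes object yields an ndarray with writeable=False, while copying the data (np.fromstring, the fallback path) or moving it into a mutable buffer yields a writeable one.

import zlib

import numpy as np

payload = zlib.compress(np.arange(4, dtype='float64').tostring())

# Old behaviour: frombuffer over the decompressed bytes object -> read-only.
raw = zlib.decompress(payload)
old_style = np.frombuffer(raw, dtype='float64')
print(old_style.flags.writeable)   # False

# Copying gives a writeable array; the point of this commit is to get the
# same writeable result without paying for the copy.
copied = np.fromstring(raw, dtype='float64')
print(copied.flags.writeable)      # True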

File tree: 7 files changed, +576 -21 lines

doc/source/whatsnew/v0.18.1.txt (+5)

@@ -37,7 +37,12 @@ Enhancements
 
 
 
+.. _whatsnew_0181.other:
 
+Other Enhancements
+^^^^^^^^^^^^^^^^^^
+
+- ``pd.read_msgpack()`` now always gives writeable ndarrays even when compression is used (:issue:`12359`).
 
 .. _whatsnew_0181.api:
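A usage illustration of the entry above (a sketch, assuming pandas 0.18.1+ with zlib available; the ``_data.blocks`` access is internal API, mirrored from the new tests):

import numpy as np
import pandas as pd

df = pd.DataFrame({'a': np.arange(10, dtype='float64')})

packed = df.to_msgpack(compress='zlib')   # returns the packed bytes
roundtripped = pd.read_msgpack(packed)

# Before this change the arrays backing the round-tripped frame were
# read-only whenever compress was used; now they can be mutated in place.
for block in roundtripped._data.blocks:
    print(block.values.flags.writeable)   # True for every block
    block.values[0] += 1                  # would raise on a read-only array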

pandas/io/packers.py (+88, -14)
@@ -38,9 +38,11 @@
 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 """
 
-import os
 from datetime import datetime, date, timedelta
 from dateutil.parser import parse
+import os
+from textwrap import dedent
+import warnings
 
 import numpy as np
 from pandas import compat
@@ -52,12 +54,61 @@
 from pandas.sparse.api import SparseSeries, SparseDataFrame, SparsePanel
 from pandas.sparse.array import BlockIndex, IntIndex
 from pandas.core.generic import NDFrame
-from pandas.core.common import needs_i8_conversion, pandas_dtype
+from pandas.core.common import (
+    PerformanceWarning,
+    needs_i8_conversion,
+    pandas_dtype,
+)
 from pandas.io.common import get_filepath_or_buffer
 from pandas.core.internals import BlockManager, make_block
 import pandas.core.internals as internals
 
 from pandas.msgpack import Unpacker as _Unpacker, Packer as _Packer, ExtType
+from pandas.util._move import (
+    BadMove as _BadMove,
+    move_into_mutable_buffer as _move_into_mutable_buffer,
+)
+
+# check which compression libs we have installed
+try:
+    import zlib
+
+    def _check_zlib():
+        pass
+except ImportError:
+    def _check_zlib():
+        raise ValueError('zlib is not installed')
+
+_check_zlib.__doc__ = dedent(
+    """\
+    Check if zlib is installed.
+
+    Raises
+    ------
+    ValueError
+        Raised when zlib is not installed.
+    """,
+)
+
+try:
+    import blosc
+
+    def _check_blosc():
+        pass
+except ImportError:
+    def _check_blosc():
+        raise ValueError('blosc is not installed')
+
+_check_blosc.__doc__ = dedent(
+    """\
+    Check if blosc is installed.
+
+    Raises
+    ------
+    ValueError
+        Raised when blosc is not installed.
+    """,
+)
 
 # until we can pass this into our conversion functions,
 # this is pretty hacky
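The guard functions above turn a missing compression library into an immediate, user-facing ValueError instead of an ImportError raised from deep inside convert/unconvert. A small runnable sketch of the same pattern, using a deliberately nonexistent module name so the ImportError branch is exercised (the fakelib names are hypothetical):

try:
    import _not_a_real_compression_lib as fakelib  # hypothetical; never importable

    def _check_fakelib():
        pass
except ImportError:
    def _check_fakelib():
        raise ValueError('fakelib is not installed')

try:
    _check_fakelib()            # what convert()/unconvert() now call up front
except ValueError as exc:
    print(exc)                  # fakelib is not installed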
@@ -215,25 +266,25 @@ def convert(values):
         return v.tolist()
 
     if compressor == 'zlib':
+        _check_zlib()
 
         # return string arrays like they are
         if dtype == np.object_:
             return v.tolist()
 
         # convert to a bytes array
         v = v.tostring()
-        import zlib
         return ExtType(0, zlib.compress(v))
 
     elif compressor == 'blosc':
+        _check_blosc()
 
         # return string arrays like they are
         if dtype == np.object_:
             return v.tolist()
 
         # convert to a bytes array
         v = v.tostring()
-        import blosc
         return ExtType(0, blosc.compress(v, typesize=dtype.itemsize))
 
     # ndarray (on original dtype)
@@ -255,17 +306,40 @@ def unconvert(values, dtype, compress=None):
     if not as_is_ext:
         values = values.encode('latin1')
 
-    if compress == u'zlib':
-        import zlib
-        values = zlib.decompress(values)
-        return np.frombuffer(values, dtype=dtype)
-
-    elif compress == u'blosc':
-        import blosc
-        values = blosc.decompress(values)
-        return np.frombuffer(values, dtype=dtype)
+    if compress:
+        if compress == u'zlib':
+            _check_zlib()
+            decompress = zlib.decompress
+        elif compress == u'blosc':
+            _check_blosc()
+            decompress = blosc.decompress
+        else:
+            raise ValueError("compress must be one of 'zlib' or 'blosc'")
 
-    # from a string
+        try:
+            return np.frombuffer(
+                _move_into_mutable_buffer(decompress(values)),
+                dtype=dtype,
+            )
+        except _BadMove as e:
+            # Pull the decompressed data off of the `_BadMove` exception.
+            # We don't just store this in the locals because we want to
+            # minimize the risk of giving users access to a `bytes` object
+            # whose data is also given to a mutable buffer.
+            values = e.args[0]
+            if len(values) > 1:
+                # The empty string and single characters are memoized in many
+                # string creating functions in the capi. This case should not
+                # warn even though we need to make a copy because we are only
+                # copying at most 1 byte.
+                warnings.warn(
+                    'copying data after decompressing; this may mean that'
+                    ' decompress is caching its result',
+                    PerformanceWarning,
+                )
+                # fall through to copying `np.fromstring`
+
+    # Copy the string into a numpy array.
     return np.fromstring(values, dtype=dtype)
 
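A standalone sketch of the decompression path unconvert() now takes (assuming pandas >= 0.18.1, where pandas.util._move exists; unpack_sketch is an illustrative name, not library code): try to steal the freshly decompressed bytes into a mutable buffer, and fall back to a warned copy only if something else still holds a reference to them.

import warnings
import zlib

import numpy as np
from pandas.util._move import BadMove, move_into_mutable_buffer


def unpack_sketch(compressed, dtype):
    try:
        # decompress() normally returns a brand-new bytes object with a single
        # reference, so the move succeeds and no copy is made.
        return np.frombuffer(
            move_into_mutable_buffer(zlib.decompress(compressed)),
            dtype=dtype,
        )
    except BadMove as e:
        # Something else (e.g. a caching decompress) kept a reference to the
        # bytes, so mutating them in place would be unsafe; copy instead.
        values = e.args[0]
        warnings.warn('copying data after decompressing; this may mean that'
                      ' decompress is caching its result')
        return np.fromstring(values, dtype=dtype)


arr = unpack_sketch(zlib.compress(np.arange(3, dtype='float64').tostring()),
                    'float64')
print(arr, arr.flags.writeable)   # -> [0. 1. 2.] True (writeable)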

pandas/io/tests/test_packers.py (+118, -7)
@@ -10,11 +10,13 @@
 from pandas.compat import u
 from pandas import (Series, DataFrame, Panel, MultiIndex, bdate_range,
                     date_range, period_range, Index)
+from pandas.core.common import PerformanceWarning
 from pandas.io.packers import to_msgpack, read_msgpack
 import pandas.util.testing as tm
 from pandas.util.testing import (ensure_clean, assert_index_equal,
                                  assert_series_equal,
-                                 assert_frame_equal)
+                                 assert_frame_equal,
+                                 patch)
 from pandas.tests.test_panel import assert_panel_equal
 
 import pandas
@@ -539,17 +541,126 @@ def test_plain(self):
         for k in self.frame.keys():
             assert_frame_equal(self.frame[k], i_rec[k])
 
-    def test_compression_zlib(self):
-        i_rec = self.encode_decode(self.frame, compress='zlib')
+    def _test_compression(self, compress):
+        i_rec = self.encode_decode(self.frame, compress=compress)
         for k in self.frame.keys():
-            assert_frame_equal(self.frame[k], i_rec[k])
+            value = i_rec[k]
+            expected = self.frame[k]
+            assert_frame_equal(value, expected)
+            # make sure that we can write to the new frames
+            for block in value._data.blocks:
+                self.assertTrue(block.values.flags.writeable)
+
+    def test_compression_zlib(self):
+        if not _ZLIB_INSTALLED:
+            raise nose.SkipTest('no zlib')
+        self._test_compression('zlib')
 
     def test_compression_blosc(self):
         if not _BLOSC_INSTALLED:
             raise nose.SkipTest('no blosc')
-        i_rec = self.encode_decode(self.frame, compress='blosc')
-        for k in self.frame.keys():
-            assert_frame_equal(self.frame[k], i_rec[k])
+        self._test_compression('blosc')
+
+    def _test_compression_warns_when_decompress_caches(self, compress):
+        not_garbage = []
+        control = []  # copied data
+
+        compress_module = globals()[compress]
+        real_decompress = compress_module.decompress
+
+        def decompress(ob):
+            """mock decompress function that delegates to the real
+            decompress but caches the result and a copy of the result.
+            """
+            res = real_decompress(ob)
+            not_garbage.append(res)  # hold a reference to this bytes object
+            control.append(bytearray(res))  # copy the data here to check later
+            return res
+
+        # types mapped to values to add in place.
+        rhs = {
+            np.dtype('float64'): 1.0,
+            np.dtype('int32'): 1,
+            np.dtype('object'): 'a',
+            np.dtype('datetime64[ns]'): np.timedelta64(1, 'ns'),
+            np.dtype('timedelta64[ns]'): np.timedelta64(1, 'ns'),
+        }
+
+        with patch(compress_module, 'decompress', decompress), \
+                tm.assert_produces_warning(PerformanceWarning) as ws:
+
+            i_rec = self.encode_decode(self.frame, compress=compress)
+            for k in self.frame.keys():
+
+                value = i_rec[k]
+                expected = self.frame[k]
+                assert_frame_equal(value, expected)
+                # make sure that we can write to the new frames even though
+                # we needed to copy the data
+                for block in value._data.blocks:
+                    self.assertTrue(block.values.flags.writeable)
+                    # mutate the data in some way
+                    block.values[0] += rhs[block.dtype]
+
+        for w in ws:
+            # check the messages from our warnings
+            self.assertEqual(
+                str(w.message),
+                'copying data after decompressing; this may mean that'
+                ' decompress is caching its result',
+            )
+
+        for buf, control_buf in zip(not_garbage, control):
+            # make sure none of our mutations above affected the
+            # original buffers
+            self.assertEqual(buf, control_buf)
+
+    def test_compression_warns_when_decompress_caches_zlib(self):
+        if not _ZLIB_INSTALLED:
+            raise nose.SkipTest('no zlib')
+        self._test_compression_warns_when_decompress_caches('zlib')
+
+    def test_compression_warns_when_decompress_caches_blosc(self):
+        if not _BLOSC_INSTALLED:
+            raise nose.SkipTest('no blosc')
+        self._test_compression_warns_when_decompress_caches('blosc')
+
+    def _test_small_strings_no_warn(self, compress):
+        empty = np.array([], dtype='uint8')
+        with tm.assert_produces_warning(None):
+            empty_unpacked = self.encode_decode(empty, compress=compress)
+
+        np.testing.assert_array_equal(empty_unpacked, empty)
+        self.assertTrue(empty_unpacked.flags.writeable)
+
+        char = np.array([ord(b'a')], dtype='uint8')
+        with tm.assert_produces_warning(None):
+            char_unpacked = self.encode_decode(char, compress=compress)
+
+        np.testing.assert_array_equal(char_unpacked, char)
+        self.assertTrue(char_unpacked.flags.writeable)
+        # if this test fails I am sorry because the interpreter is now in a
+        # bad state where b'a' points to 98 == ord(b'b').
+        char_unpacked[0] = ord(b'b')
+
+        # we compare the ord of bytes b'a' with unicode u'a' because they
+        # should always be the same (unless we were able to mutate the shared
+        # character singleton, in which case ord(b'a') == ord(b'b')).
+        self.assertEqual(ord(b'a'), ord(u'a'))
+        np.testing.assert_array_equal(
+            char_unpacked,
+            np.array([ord(b'b')], dtype='uint8'),
+        )
+
+    def test_small_strings_no_warn_zlib(self):
+        if not _ZLIB_INSTALLED:
+            raise nose.SkipTest('no zlib')
+        self._test_small_strings_no_warn('zlib')
+
+    def test_small_strings_no_warn_blosc(self):
+        if not _BLOSC_INSTALLED:
+            raise nose.SkipTest('no blosc')
+        self._test_small_strings_no_warn('blosc')
 
     def test_readonly_axis_blosc(self):
         # GH11880
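The decompress-caching test above leans on the pandas.util.testing.patch helper added in commit e896603 of this PR. A quick sketch of how the tests use it, assuming it temporarily replaces an attribute and restores the original on exit of the with-block:

import math

from pandas.util.testing import patch

original = math.pi
with patch(math, 'pi', 3.0):
    print(math.pi)              # 3.0 while the patch is active
print(math.pi == original)      # True: the original value is restored on exit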

pandas/tests/test_util.py (+33)
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 import nose
 
+from pandas.util._move import move_into_mutable_buffer, BadMove
 from pandas.util.decorators import deprecate_kwarg
 from pandas.util.validators import validate_args, validate_kwargs
 

@@ -150,6 +151,38 @@ def test_validation(self):
         kwargs = {'f': 'foo', 'b': 'bar'}
         validate_kwargs('func', kwargs, *compat_args)
 
+
+class TestMove(tm.TestCase):
+    def test_more_than_one_ref(self):
+        """Test case for when we try to use ``move_into_mutable_buffer`` when
+        the object being moved has other references.
+        """
+        b = b'testing'
+
+        with tm.assertRaises(BadMove) as e:
+            def handle_success(type_, value, tb):
+                self.assertIs(value.args[0], b)
+                return type(e).handle_success(e, type_, value, tb)  # super
+
+            e.handle_success = handle_success
+            move_into_mutable_buffer(b)
+
+    def test_exactly_one_ref(self):
+        """Test case for when the object being moved has exactly one reference.
+        """
+        b = b'testing'
+
+        # We need to pass an expression on the stack to ensure that there are
+        # not extra references hanging around. We cannot rewrite this test as
+        #     buf = b[:-3]
+        #     as_stolen_buf = move_into_mutable_buffer(buf)
+        # because then we would have more than one reference to buf.
+        as_stolen_buf = move_into_mutable_buffer(b[:-3])
+
+        # materialize as bytearray to show that it is mutable
+        self.assertEqual(bytearray(as_stolen_buf), b'test')
+
+
 if __name__ == '__main__':
     nose.runmodule(argv=[__file__, '-vvs', '-x', '--pdb', '--pdb-failure'],
                    exit=False)
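A quick standalone sketch of the two behaviours TestMove exercises (assuming pandas >= 0.18.1, where pandas.util._move ships; the module was later removed along with msgpack support): a temporary bytes object with a single reference can be stolen into a mutable buffer, while an object with other references raises BadMove carrying the original object.

from pandas.util._move import BadMove, move_into_mutable_buffer

buf = move_into_mutable_buffer(b'testing'[:-3])   # temporary -> move succeeds
print(bytearray(buf))                             # bytearray(b'test'), mutable

b = b'more than one reference'
try:
    move_into_mutable_buffer(b)                   # `b` is also a named reference
except BadMove as e:
    print(e.args[0] is b)                         # True: the bytes ride on the exception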
