Skip to content

Commit 9cd9d80

Browse files
author
Joe Jevnik
committed
ENH: always make ndarrays from msgpack writable
Addresses the case where 'compress' was not None. The old implementation would decompress the data and then call np.frombuffer on a bytes object. Because a bytes object is not a mutable buffer, the resulting ndarray had writeable=False. The new implementation ensures that pandas is the only owner of the new buffer and then marks it as writeable without copying it. This means that we do not need to copy the incoming data AND we can still mutate it later. If we are not the only owner of the data, we instead copy it with np.fromstring.
1 parent 6e64787 commit 9cd9d80

File tree

3 files changed

+135
-18
lines changed

3 files changed

+135
-18
lines changed

doc/source/whatsnew/v0.18.1.txt

+4
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,11 @@ Enhancements
3737

3838

3939

40+
Other Enhancements
41+
^^^^^^^^^^^^^^^^^^
4042

43+
- ``read_msgpack`` now always gives writeable ndarrays even when compression is
44+
used (:issue:`12359`).
4145

4246
.. _whatsnew_0181.api:
4347

pandas/io/packers.py

+37-12
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,11 @@
3838
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
3939
"""
4040

41-
import os
4241
from datetime import datetime, date, timedelta
4342
from dateutil.parser import parse
43+
import os
44+
import sys
45+
import warnings
4446

4547
import numpy as np
4648
from pandas import compat
@@ -52,7 +54,11 @@
5254
from pandas.sparse.api import SparseSeries, SparseDataFrame, SparsePanel
5355
from pandas.sparse.array import BlockIndex, IntIndex
5456
from pandas.core.generic import NDFrame
55-
from pandas.core.common import needs_i8_conversion, pandas_dtype
57+
from pandas.core.common import (
58+
PerformanceWarning,
59+
needs_i8_conversion,
60+
pandas_dtype,
61+
)
5662
from pandas.io.common import get_filepath_or_buffer
5763
from pandas.core.internals import BlockManager, make_block
5864
import pandas.core.internals as internals
@@ -255,17 +261,36 @@ def unconvert(values, dtype, compress=None):
255261
if not as_is_ext:
256262
values = values.encode('latin1')
257263

258-
if compress == u'zlib':
259-
import zlib
260-
values = zlib.decompress(values)
261-
return np.frombuffer(values, dtype=dtype)
262-
263-
elif compress == u'blosc':
264-
import blosc
265-
values = blosc.decompress(values)
266-
return np.frombuffer(values, dtype=dtype)
264+
if compress:
265+
if compress == u'zlib':
266+
from zlib import decompress
267+
elif compress == u'blosc':
268+
from blosc import decompress
269+
else:
270+
raise ValueError("compress must be one of 'zlib' or 'blosc'")
271+
272+
values = decompress(values)
273+
if sys.getrefcount(values) == 2:
274+
arr = np.frombuffer(values, dtype=dtype)
275+
# We are setting the memory owned by a bytes object as mutable.
276+
# We can do this because we know that no one has a reference to
277+
# this object since it was just created in the call to decompress
278+
# and we have checked that we have the only reference.
279+
# the refcnt reports as 2 instead of 1 because we incref the
280+
# values object when we push it on the stack to call getrefcount.
281+
# The 2 references are then the local variable `values` and
282+
# TOS.
283+
arr.flags.writeable = True
284+
return arr
285+
else:
286+
warnings.warn(
287+
'copying data after decompressing; this may mean that'
288+
' decompress is caching its result',
289+
PerformanceWarning,
290+
)
291+
# fall through to copying `np.fromstring`
267292

268-
# from a string
293+
# Copy the string into a numpy array.
269294
return np.fromstring(values, dtype=dtype)
270295

271296

pandas/io/tests/test_packers.py

+94-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import nose
22

3+
from contextlib import contextmanager
34
import os
45
import datetime
56
import numpy as np
@@ -10,6 +11,7 @@
1011
from pandas.compat import u
1112
from pandas import (Series, DataFrame, Panel, MultiIndex, bdate_range,
1213
date_range, period_range, Index)
14+
from pandas.core.common import PerformanceWarning
1315
from pandas.io.packers import to_msgpack, read_msgpack
1416
import pandas.util.testing as tm
1517
from pandas.util.testing import (ensure_clean, assert_index_equal,
@@ -57,6 +59,20 @@ def check_arbitrary(a, b):
5759
assert(a == b)
5860

5961

62+
@contextmanager
def patch(ob, attr, value):
    """Temporarily set ``ob.attr`` to ``value`` for the duration of a block.

    On exit (normal or via an exception) the attribute is put back to the
    value it had on entry; if ``ob`` had no such attribute on entry, the
    attribute is deleted again.
    """
    # Remember whether the attribute was reachable before we touched it,
    # and what it was, so the cleanup below can undo exactly what we did.
    try:
        previous = getattr(ob, attr)
    except AttributeError:
        existed = False
    else:
        existed = True

    setattr(ob, attr, value)
    try:
        yield
    finally:
        if existed:
            setattr(ob, attr, previous)
        else:
            delattr(ob, attr)
74+
75+
6076
class TestPackers(tm.TestCase):
6177

6278
def setUp(self):
@@ -539,17 +555,89 @@ def test_plain(self):
539555
for k in self.frame.keys():
540556
assert_frame_equal(self.frame[k], i_rec[k])
541557

542-
def test_compression_zlib(self):
543-
i_rec = self.encode_decode(self.frame, compress='zlib')
558+
def _test_compression(self, compress):
559+
i_rec = self.encode_decode(self.frame, compress=compress)
544560
for k in self.frame.keys():
545-
assert_frame_equal(self.frame[k], i_rec[k])
561+
value = i_rec[k]
562+
expected = self.frame[k]
563+
assert_frame_equal(value, expected)
564+
# make sure that we can write to the new frames
565+
for block in value._data.blocks:
566+
self.assertTrue(block.values.flags.writeable)
567+
568+
def test_compression_zlib(self):
    """Run the shared compression round-trip check with zlib."""
    if _ZLIB_INSTALLED:
        self._test_compression('zlib')
    else:
        # nothing to exercise without the compressor available
        raise nose.SkipTest('no zlib')
546572

547573
def test_compression_blosc(self):
548574
if not _BLOSC_INSTALLED:
549575
raise nose.SkipTest('no blosc')
550-
i_rec = self.encode_decode(self.frame, compress='blosc')
551-
for k in self.frame.keys():
552-
assert_frame_equal(self.frame[k], i_rec[k])
576+
self._test_compression('blosc')
577+
578+
def _test_compression_warns_when_decompress_caches(self, compress):
    """Exercise the copying path used when ``decompress`` keeps its result.

    The ``decompress`` attribute of the module named by ``compress`` is
    replaced with a wrapper that holds on to every buffer it returns.
    The round trip is then expected to emit a ``PerformanceWarning``,
    still yield writeable blocks, and leave the held buffers unchanged
    when those blocks are mutated in place.
    """
    expected_message = (
        'copying data after decompressing; this may mean that'
        ' decompress is caching its result'
    )

    held_buffers = []  # keeps each decompressed bytes object referenced
    snapshots = []     # independent copies taken at decompress time

    module = globals()[compress]
    original_decompress = module.decompress

    def caching_decompress(payload):
        """Delegate to the real decompress, retaining the result and a
        copy of the result.
        """
        out = original_decompress(payload)
        held_buffers.append(out)
        snapshots.append(bytearray(out))
        return out

    # value to add in place to the first element, keyed by block dtype
    increments = {
        np.dtype('float64'): 1.0,
        np.dtype('int32'): 1,
        np.dtype('object'): 'a',
        np.dtype('datetime64[ns]'): np.timedelta64(1, 'ns'),
        np.dtype('timedelta64[ns]'): np.timedelta64(1, 'ns'),
    }

    with patch(module, 'decompress', caching_decompress), \
            tm.assert_produces_warning(PerformanceWarning) as caught:

        i_rec = self.encode_decode(self.frame, compress=compress)
        for key in self.frame.keys():
            value = i_rec[key]
            expected = self.frame[key]
            assert_frame_equal(value, expected)
            # the frames must be writeable even though the data had to
            # be copied
            for block in value._data.blocks:
                self.assertTrue(block.values.flags.writeable)
                # mutate in place so the comparison below is meaningful
                block.values[0] += increments[block.dtype]

    # every recorded warning must carry the expected message
    for warning in caught:
        self.assertEqual(str(warning.message), expected_message)

    # the in-place mutations must not have reached the retained buffers
    for retained, snapshot in zip(held_buffers, snapshots):
        self.assertEqual(retained, snapshot)
631+
632+
def test_compression_warns_when_decompress_caches_zlib(self):
    """Run the caching-decompress warning check with zlib."""
    if _ZLIB_INSTALLED:
        self._test_compression_warns_when_decompress_caches('zlib')
    else:
        # nothing to exercise without the compressor available
        raise nose.SkipTest('no zlib')
636+
637+
def test_compression_warns_when_decompress_caches_blosc(self):
    """Run the caching-decompress warning check with blosc.

    Skipped when blosc is not installed.
    """
    # Guard on the blosc flag (not the zlib one): this test needs the
    # blosc module, and the skip message says so.
    if not _BLOSC_INSTALLED:
        raise nose.SkipTest('no blosc')
    self._test_compression_warns_when_decompress_caches('blosc')
553641

554642
def test_readonly_axis_blosc(self):
555643
# GH11880

0 commit comments

Comments
 (0)