
Commit d677ed1

Merge pull request pandas-dev#533 from manahl/issue-490
Issue pandas-dev#490: Make arctic compatible with numpy 1.14
2 parents cb2024b + 15bbc3b commit d677ed1

File tree

10 files changed: +162 -33 lines changed

CHANGES.md
Lines changed: 1 addition & 0 deletions

@@ -2,6 +2,7 @@
 
 ### 1.64
   * Bugfix: #531 arctic_prune_versions: clean broken snapshot references before pruning
+  * Feature: #490 add support to numpy 1.14
 
 ### 1.63 (2018-04-06)
   * Bugfix: #521 Clang 6.0 compiler support on macOS

arctic/serialization/numpy_arrays.py
Lines changed: 5 additions & 3 deletions

@@ -139,15 +139,17 @@ def objify(self, doc, columns=None):
         for col in cols:
             d = decompress(doc[DATA][doc[METADATA][LENGTHS][col][0]: doc[METADATA][LENGTHS][col][1] + 1])
-            d = np.fromstring(d, doc[METADATA][DTYPE][col])
+            # d is read-only but that's not an issue since DataFrame will copy the data anyway.
+            d = np.frombuffer(d, doc[METADATA][DTYPE][col])
 
             if MASK in doc[METADATA] and col in doc[METADATA][MASK]:
                 mask_data = decompress(doc[METADATA][MASK][col])
-                mask = np.fromstring(mask_data, 'bool')
+                mask = np.frombuffer(mask_data, 'bool')
                 d = ma.masked_array(d, mask)
             data[col] = d
 
-        return pd.DataFrame(data, columns=cols)[cols]
+        # Copy so the returned DataFrame owns writable data rather than the read-only buffers.
+        return pd.DataFrame(data, columns=cols, copy=True)[cols]
 
 
 class FrametoArraySerializer(Serializer):
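Why the frombuffer switch is safe here, as a minimal sketch with toy data (the buffer and column name below are illustrative, not from the commit):

```python
import numpy as np
import pandas as pd

raw = np.arange(3, dtype='int64').tobytes()

# np.frombuffer wraps the bytes object directly, so the resulting array is
# read-only; the removed np.fromstring returned a fresh writable copy instead.
arr = np.frombuffer(raw, dtype='int64')
assert not arr.flags.writeable

# copy=True forces the DataFrame to take its own writable copy of the data,
# which is why objify can safely hand the frame to callers that mutate it.
df = pd.DataFrame({'col': arr}, copy=True)
df['col'] = 7  # mutates the DataFrame's copy, not the original buffer
```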

arctic/store/_ndarray_store.py
Lines changed: 48 additions & 19 deletions

@@ -62,6 +62,44 @@ def _attempt_update_unchanged(symbol, unchanged_segment_ids, collection, version
             symbol, previous_version['version'], result.matched_count, len(unchanged_segment_ids)))
 
 
+def _resize_with_dtype(arr, dtype):
+    """
+    This function will transform arr into an array with the same type as dtype. It will do this by
+    filling new columns with zeros (or NaNs, if it is a float column). Also, columns that are not
+    in the new dtype will be dropped.
+    """
+    structured_arrays = dtype.names is not None and arr.dtype.names is not None
+    old_columns = set(arr.dtype.names or [])
+    new_columns = set(dtype.names or [])
+
+    # In numpy 1.9 the ndarray.astype method used to handle changes in the number of fields. The code
+    # below should replicate the behaviour the old astype used to have.
+    #
+    # One may be tempted to use np.lib.recfunctions.stack_arrays to implement both this step and the
+    # concatenate that follows, but it is 2x slower and it requires providing your own default values
+    # (instead of np.zeros).
+    #
+    # Numpy 1.14 supports doing new_arr[old_columns] = arr[old_columns], which is faster than the code
+    # below (in benchmarks it seems to be even slightly faster than using the old astype). However,
+    # that is not supported by numpy 1.9.2.
+    if structured_arrays and (old_columns != new_columns):
+        new_arr = np.zeros(arr.shape, dtype)
+        for c in old_columns & new_columns:
+            new_arr[c] = arr[c]
+
+        # missing float columns should default to nan rather than zero
+        _is_float_type = lambda _dtype: _dtype.type in (np.float32, np.float64)
+        _is_void_float_type = lambda _dtype: _dtype.type == np.void and _is_float_type(_dtype.subdtype[0])
+        _is_float_or_void_float_type = lambda _dtype: _is_float_type(_dtype) or _is_void_float_type(_dtype)
+        _is_float = lambda column: _is_float_or_void_float_type(dtype.fields[column][0])
+        for new_column in filter(_is_float, new_columns - old_columns):
+            new_arr[new_column] = np.nan
+    else:
+        new_arr = arr.astype(dtype)
+
+    return new_arr
+
+
 class NdarrayStore(object):
     """Chunked store for arbitrary ndarrays, supporting append.
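A minimal hand-rolled sketch of the transformation `_resize_with_dtype` performs on a structured array (the dtypes below are illustrative):

```python
import numpy as np

old = np.array([(1, 2.5)], dtype=[('a', '<i8'), ('b', '<f8')])
new_dtype = np.dtype([('a', '<i8'), ('c', '<f8')])

# Shared column 'a' is copied across, dropped column 'b' disappears,
# and the new float column 'c' defaults to NaN rather than zero.
resized = np.zeros(old.shape, new_dtype)
resized['a'] = old['a']
resized['c'] = np.nan
# resized == array([(1, nan)], dtype=[('a', '<i8'), ('c', '<f8')])
```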
@@ -210,23 +248,18 @@ def _do_read(self, collection, version, symbol, index_range=None):
         else:
             segment_count = version.get('segment_count', None)
 
-        segments = []
+        data = bytearray()
         i = -1
         for i, x in enumerate(collection.find(spec, sort=[('segment', pymongo.ASCENDING)],)):
-            segments.append(decompress(x['data']) if x['compressed'] else x['data'])
-
-        data = b''.join(segments)
-
-        # free up memory from initial copy of data
-        del segments
+            data.extend(decompress(x['data']) if x['compressed'] else x['data'])
 
         # Check that the correct number of segments has been returned
         if segment_count is not None and i + 1 != segment_count:
             raise OperationFailure("Incorrect number of segments returned for {}:{}. Expected: {}, but got {}. {}".format(
                 symbol, version['version'], segment_count, i + 1, collection.database.name + '.' + collection.name))
 
         dtype = self._dtype(version['dtype'], version.get('dtype_metadata', {}))
-        rtn = np.fromstring(data, dtype=dtype).reshape(version.get('shape', (-1)))
+        rtn = np.frombuffer(data, dtype=dtype).reshape(version.get('shape', (-1)))
         return rtn
 
     def _promote_types(self, dtype, dtype_str):
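A minimal sketch of the buffer-accumulation pattern the new `_do_read` relies on (toy segments, not real Mongo documents): `bytearray.extend` copies each segment into one growing buffer, replacing both the segment list and the `b''.join` copy, and `np.frombuffer` over a mutable bytearray yields a writable array:

```python
import numpy as np

segments = [b'\x01\x00', b'\x02\x00']  # two little-endian uint16 values

# One growing buffer instead of a list of segments plus b''.join:
# the segment bytes are copied only once, on extend.
data = bytearray()
for seg in segments:
    data.extend(seg)

# frombuffer over a mutable bytearray yields a writable array,
# unlike frombuffer over an immutable bytes object.
arr = np.frombuffer(data, dtype='<u2')
assert arr.flags.writeable
assert arr.tolist() == [1, 2]
```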
@@ -247,6 +280,9 @@ def append(self, arctic_lib, version, symbol, item, previous_version, dtype=None
 
         if not dtype:
             dtype = item.dtype
+
+        if (self._dtype(previous_version['dtype']).fields is None) != (dtype.fields is None):
+            raise ValueError("type changes to or from structured array not supported")
 
         if previous_version['up_to'] == 0:
             dtype = dtype

@@ -263,17 +299,10 @@ def append(self, arctic_lib, version, symbol, item, previous_version, dtype=None
         version['dtype_metadata'] = dict(dtype.metadata or {})
         version['type'] = self.TYPE
 
-        old_arr = self._do_read(collection, previous_version, symbol).astype(dtype)
-        # missing float columns should default to nan rather than zero
-        old_dtype = self._dtype(previous_version['dtype'])
-        if dtype.names is not None and old_dtype.names is not None:
-            new_columns = set(dtype.names) - set(old_dtype.names)
-            _is_float_type = lambda _dtype: _dtype.type in (np.float32, np.float64)
-            _is_void_float_type = lambda _dtype: _dtype.type == np.void and _is_float_type(_dtype.subdtype[0])
-            _is_float_or_void_float_type = lambda _dtype: _is_float_type(_dtype) or _is_void_float_type(_dtype)
-            _is_float = lambda column: _is_float_or_void_float_type(dtype.fields[column][0])
-            for new_column in filter(_is_float, new_columns):
-                old_arr[new_column] = np.nan
+        # _resize_with_dtype would drop columns read from the previous version if they were not found
+        # in the new append. However, _promote_types raises an exception in that case, so this code
+        # is not reached with dropped columns.
+        old_arr = _resize_with_dtype(self._do_read(collection, previous_version, symbol), dtype)
 
         item = np.concatenate([old_arr, item])
         version['up_to'] = len(item)
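A minimal sketch of the new guard in isolation (illustrative dtypes): `dtype.fields` is `None` for plain dtypes and a mapping for structured ones, so comparing `fields is None` on both sides detects a plain-to-structured switch in either direction:

```python
import numpy as np

previous_dtype = np.dtype('int64')        # a plain array was stored previously
append_dtype = np.dtype([('a', '<i8')])   # a structured array is being appended

# Exactly one side is structured, so the append is rejected up front
# instead of failing later inside the concatenate.
if (previous_dtype.fields is None) != (append_dtype.fields is None):
    raise ValueError("type changes to or from structured array not supported")
```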

arctic/store/_pandas_ndarray_store.py
Lines changed: 4 additions & 2 deletions

@@ -51,7 +51,8 @@ def _segment_index(self, recarr, existing_index, start, new_segments):
                                  dtype=INDEX_DTYPE)
         # append to existing index if exists
         if existing_index:
-            existing_index_arr = np.fromstring(decompress(existing_index), dtype=INDEX_DTYPE)
+            # existing_index_arr is read-only but it's never written to
+            existing_index_arr = np.frombuffer(decompress(existing_index), dtype=INDEX_DTYPE)
             if start > 0:
                 existing_index_arr = existing_index_arr[existing_index_arr['index'] < start]
             index = np.concatenate((existing_index_arr, index))

@@ -74,7 +75,8 @@ def _index_range(self, version, symbol, date_range=None, **kwargs):
            with the date_range. As the segment index is (id -> last datetime)
            we need to take care in choosing the correct chunks. """
         if date_range and 'segment_index' in version:
-            index = np.fromstring(decompress(version['segment_index']), dtype=INDEX_DTYPE)
+            # index is read-only but it's never written to
+            index = np.frombuffer(decompress(version['segment_index']), dtype=INDEX_DTYPE)
             dtcol = self._datetime64_index(index)
             if dtcol and len(index):
                 dts = index[dtcol]
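A minimal sketch (illustrative data) of why the read-only flag does not leak past these call sites: boolean masking and `np.concatenate` both allocate fresh, writable arrays:

```python
import numpy as np

ro = np.frombuffer(np.arange(4, dtype='int64').tobytes(), dtype='int64')
assert not ro.flags.writeable

# Masking copies the selected elements; concatenate allocates a new array.
# Neither operation propagates the read-only flag of the frombuffer input.
filtered = ro[ro < 2]
combined = np.concatenate((filtered, np.array([9], dtype='int64')))
assert filtered.flags.writeable and combined.flags.writeable
```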

arctic/tickstore/tickstore.py
Lines changed: 9 additions & 4 deletions

@@ -435,7 +435,8 @@ def _read_bucket(self, doc, column_set, column_dtypes, include_symbol, include_i
         rtn = {}
         if doc[VERSION] != 3:
             raise ArcticException("Unhandled document version: %s" % doc[VERSION])
-        rtn[INDEX] = np.cumsum(np.fromstring(decompress(doc[INDEX]), dtype='uint64'))
+        # np.cumsum copies the read-only array created with frombuffer
+        rtn[INDEX] = np.cumsum(np.frombuffer(decompress(doc[INDEX]), dtype='uint64'))
         doc_length = len(rtn[INDEX])
         column_set.update(doc[COLUMNS].keys())

@@ -444,7 +445,8 @@ def _read_bucket(self, doc, column_set, column_dtypes, include_symbol, include_i
         for c in column_set:
             try:
                 coldata = doc[COLUMNS][c]
-                mask = np.fromstring(decompress(coldata[ROWMASK]), dtype='uint8')
+                # the | operation below will make a copy of this read-only array
+                mask = np.frombuffer(decompress(coldata[ROWMASK]), dtype='uint8')
                 union_mask = union_mask | mask
             except KeyError:
                 rtn[c] = None

@@ -460,10 +462,13 @@ def _read_bucket(self, doc, column_set, column_dtypes, include_symbol, include_i
             try:
                 coldata = doc[COLUMNS][c]
                 dtype = np.dtype(coldata[DTYPE])
-                values = np.fromstring(decompress(coldata[DATA]), dtype=dtype)
+                # values ends up being copied by pandas before being returned to the user. However, we
+                # copy it into a bytearray here for safety.
+                values = np.frombuffer(bytearray(decompress(coldata[DATA])), dtype=dtype)
                 self._set_or_promote_dtype(column_dtypes, c, dtype)
                 rtn[c] = self._empty(rtn_length, dtype=column_dtypes[c])
-                rowmask = np.unpackbits(np.fromstring(decompress(coldata[ROWMASK]),
+                # unpackbits will make a copy of the read-only array created by frombuffer
+                rowmask = np.unpackbits(np.frombuffer(decompress(coldata[ROWMASK]),
                                                       dtype='uint8'))[:doc_length].astype('bool')
                 rowmask = rowmask[union_mask]
                 rtn[c][rowmask] = values
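A minimal sketch (illustrative bits) of the rowmask path: `np.unpackbits` returns a brand-new array, so the read-only `frombuffer` input is never mutated by the slicing and `astype` that follow:

```python
import numpy as np

packed = np.packbits(np.array([1, 0, 1, 1, 0, 0, 0, 0], dtype='uint8')).tobytes()

# frombuffer over bytes is read-only, but unpackbits allocates fresh output,
# so everything downstream operates on a writable copy.
rowmask = np.unpackbits(np.frombuffer(packed, dtype='uint8'))[:4].astype('bool')
assert rowmask.tolist() == [True, False, True, True]
```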

setup.py
Lines changed: 1 addition & 1 deletion

@@ -139,7 +139,7 @@ def extensions():
     ext_modules=defer_cythonize(extensions),
     setup_requires=["six",
                     "cython",
-                    "numpy<=1.13.3",
+                    "numpy",
                     "setuptools-git",
                     ],
     install_requires=["cython",

tests/integration/store/test_ndarray_store_append.py
Lines changed: 52 additions & 0 deletions

@@ -1,6 +1,7 @@
 import bson
 import numpy as np
 from numpy.testing import assert_equal
+import pytest
 
 from arctic.store._ndarray_store import NdarrayStore, _APPEND_COUNT
 from arctic.store.version_store import register_versioned_storage

@@ -46,6 +47,40 @@ def test_promote_types2(library):
     assert np.all(ndarr.astype([('abc', np.promote_types('float64', 'int64'))]) == saved_arr)
 
 
+def test_promote_types_smaller_sizes(library):
+    library.write('MYARR', np.ones(100, dtype='int64'))
+    library.append('MYARR', np.ones(100, dtype='int32'))
+    saved_arr = library.read('MYARR').data
+    assert np.all(np.ones(200, dtype='int64') == saved_arr)
+
+
+def test_promote_types_larger_sizes(library):
+    library.write('MYARR', np.ones(100, dtype='int32'))
+    library.append('MYARR', np.ones(100, dtype='int64'))
+    saved_arr = library.read('MYARR').data
+    assert np.all(np.ones(200, dtype='int64') == saved_arr)
+
+
+def test_promote_field_types_smaller_sizes(library):
+    arr = np.array([(3, 7)], dtype=[('a', '<i8'), ('b', '<i8')])
+    library.write('MYARR', arr)
+    arr = np.array([(9, 8)], dtype=[('a', '<i4'), ('b', '<i8')])
+    library.append('MYARR', arr)
+    saved_arr = library.read('MYARR').data
+    expected = np.array([(3, 7), (9, 8)], dtype=[('a', '<i8'), ('b', '<i8')])
+    assert np.all(saved_arr == expected)
+
+
+def test_promote_field_types_larger_sizes(library):
+    arr = np.array([(3, 7)], dtype=[('a', '<i4'), ('b', '<i8')])
+    library.write('MYARR', arr)
+    arr = np.array([(9, 8)], dtype=[('a', '<i8'), ('b', '<i8')])
+    library.append('MYARR', arr)
+    saved_arr = library.read('MYARR').data
+    expected = np.array([(3, 7), (9, 8)], dtype=[('a', '<i8'), ('b', '<i8')])
+    assert np.all(saved_arr == expected)
+
+
 def test_append_ndarray_with_field_shape(library):
     ndarr = np.empty(10, dtype=[('A', 'int64'), ('B', 'float64', (2,))])
     ndarr['A'] = 1

@@ -131,6 +166,15 @@ def test_append_too_large_ndarray(library):
     assert np.all(np.concatenate([ndarr, ndarr]) == saved_arr)
 
 
+def test_empty_field_append_keeps_all_columns(library):
+    ndarr = np.array([(3, 5)], dtype=[('a', '<i'), ('b', '<i')])
+    ndarr2 = np.array([], dtype=[('a', '<i')])
+    library.write('MYARR', ndarr)
+    library.append('MYARR', ndarr2)
+    saved_arr = library.read('MYARR').data
+    assert np.all(saved_arr == np.array([(3, 5)], dtype=[('a', '<i'), ('b', '<i')]))
+
+
 def test_empty_append_promotes_dtype(library):
     ndarr = np.array(["a", "b", "c"])
     ndarr2 = np.array([])

@@ -160,6 +204,14 @@ def test_empty_append_promotes_dtype3(library):
     assert np.all(saved_arr == np.hstack((ndarr2, ndarr2)))
 
 
+def test_convert_to_structured_array(library):
+    arr = np.ones(100, dtype='int64')
+    library.write('MYARR', arr)
+    arr = np.array([(6,)], dtype=[('a', '<i8')])
+    with pytest.raises(ValueError):
+        library.append('MYARR', arr)
+
+
 def test_empty_append_concat_and_rewrite(library):
     ndarr = np.array([])
     ndarr2 = np.array(["a", "b", "c"])

tests/integration/tickstore/test_ts_read.py
Lines changed: 29 additions & 0 deletions

@@ -45,6 +45,35 @@ def test_read(tickstore_lib):
     assert tickstore_lib._collection.find_one()['c'] == 2
 
 
+def test_read_data_is_modifiable(tickstore_lib):
+    data = [{'ASK': 1545.25,
+             'ASKSIZE': 1002.0,
+             'BID': 1545.0,
+             'BIDSIZE': 55.0,
+             'CUMVOL': 2187387.0,
+             'DELETED_TIME': 0,
+             'INSTRTYPE': 'FUT',
+             'PRICE': 1545.0,
+             'SIZE': 1.0,
+             'TICK_STATUS': 0,
+             'TRADEHIGH': 1561.75,
+             'TRADELOW': 1537.25,
+             'index': 1185076787070},
+            {'CUMVOL': 354.0,
+             'DELETED_TIME': 0,
+             'PRICE': 1543.75,
+             'SIZE': 354.0,
+             'TRADEHIGH': 1543.75,
+             'TRADELOW': 1543.75,
+             'index': 1185141600600}]
+    tickstore_lib.write('FEED::SYMBOL', data)
+
+    df = tickstore_lib.read('FEED::SYMBOL', columns=['BID', 'ASK', 'PRICE'])
+
+    df[['BID', 'ASK', 'PRICE']] = 7
+    assert np.all(df[['BID', 'ASK', 'PRICE']].values == np.array([[7, 7, 7], [7, 7, 7]]))
+
+
 def test_read_allow_secondary(tickstore_lib):
     data = [{'ASK': 1545.25,
              'ASKSIZE': 1002.0,

tests/unit/serialization/test_numpy_arrays.py
Lines changed: 9 additions & 0 deletions

@@ -92,3 +92,12 @@ def test_multi_column_fail():
     with pytest.raises(Exception) as e:
         n.deserialize(a, columns=['A', 'B'])
     assert('Duplicate' in str(e))
+
+
+def test_dataframe_writable_after_objify():
+    f = FrameConverter()
+    df = pd.DataFrame(data={'one': [5, 6, 2]})
+    df = f.objify(f.docify(df))
+    df['one'] = 7
+
+    assert np.all(df['one'].values == np.array([7, 7, 7]))

tests/unit/tickstore/test_tickstore.py
Lines changed: 4 additions & 4 deletions

@@ -94,7 +94,7 @@ def test_tickstore_to_bucket_with_image():
     assert get_coldata(bucket[COLUMNS]['B']) == ([27.2], [0, 1, 0, 0, 0, 0, 0, 0])
     assert get_coldata(bucket[COLUMNS]['D']) == ([0], [1, 0, 0, 0, 0, 0, 0, 0])
     index = [dt.fromtimestamp(int(i/1000)).replace(tzinfo=mktz(tz)) for i in
-             list(np.cumsum(np.fromstring(decompress(bucket[INDEX]), dtype='uint64')))]
+             list(np.cumsum(np.frombuffer(decompress(bucket[INDEX]), dtype='uint64')))]
     assert index == [i['index'] for i in data]
     assert bucket[COLUMNS]['A'][DTYPE] == 'int64'
     assert bucket[COLUMNS]['B'][DTYPE] == 'float64'

@@ -128,8 +128,8 @@ def test_tickstore_to_bucket_always_forwards_image():
 def get_coldata(coldata):
     """ return values and rowmask """
     dtype = np.dtype(coldata[DTYPE])
-    values = np.fromstring(decompress(coldata[DATA]), dtype=dtype)
-    rowmask = np.unpackbits(np.fromstring(decompress(coldata[ROWMASK]), dtype='uint8'))
+    values = np.frombuffer(decompress(coldata[DATA]), dtype=dtype)
+    rowmask = np.unpackbits(np.frombuffer(decompress(coldata[ROWMASK]), dtype='uint8'))
     return list(values), list(rowmask)
 
 
@@ -159,7 +159,7 @@ def test_tickstore_pandas_to_bucket_image():
     assert values[0] == 1 and values[2] == 1
     assert rowmask == [1, 1, 1, 0, 0, 0, 0, 0]
     index = [dt.fromtimestamp(int(i/1000)).replace(tzinfo=mktz(tz)) for i in
-             list(np.cumsum(np.fromstring(decompress(bucket[INDEX]), dtype='uint64')))]
+             list(np.cumsum(np.frombuffer(decompress(bucket[INDEX]), dtype='uint64')))]
     assert index == tick_index
     assert bucket[COLUMNS]['A'][DTYPE] == 'int64'
     assert bucket[COLUMNS]['B'][DTYPE] == 'float64'
