Commit 38d17f8

Merge pull request pandas-dev#647 from dimosped/incremental-serialization-disabled-final
Incremental serialization complete implementation (not used, code complete)
2 parents: 9295032 + 8577d70

File tree: 9 files changed, +497 -53 lines


CHANGES.md (+5 -1)

@@ -1,6 +1,10 @@
 ## Changelog

-### 1.71 (2018-11-05)
+### 1.72
+  * Feature: #577 Added implementation for incremental serializer for numpy records
+  * Bugfix: #648 Fix issue with Timezone aware Pandas types, which don't contain hasobject attribute
+
+### 1.71 (2018-11-05)
   * Bugfix: #645 Fix write errors for Pandas DataFrame that has mixed object/string types in multi-index column

 ### 1.70 (2018-10-30)

arctic/_util.py (+7 -2)

@@ -1,11 +1,16 @@
-from pandas import DataFrame
-from pandas.util.testing import assert_frame_equal
 import logging
+
+import numpy as np
 import pymongo
+from pandas import DataFrame
+from pandas.util.testing import assert_frame_equal


 logger = logging.getLogger(__name__)

+MAX_DOCUMENT_SIZE = int(pymongo.common.MAX_BSON_SIZE * 0.8)
+NP_OBJECT_DTYPE = np.dtype('O')
+
 # Avoid import-time extra logic
 _use_new_count_api = None
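These two constants are consumed by the incremental serializer added below: MAX_DOCUMENT_SIZE caps a serialized chunk at roughly 80% of MongoDB's 16 MB BSON limit, and NP_OBJECT_DTYPE is a single dtype('O') instance to compare against instead of relying on .hasobject, which timezone-aware pandas dtypes lack (bugfix #648). A minimal sketch of their effect, not part of the diff, using made-up example data:

import numpy as np
import pandas as pd
import pymongo

# ~13.4 MB: 80% of MongoDB's BSON document limit, used as the per-chunk byte budget
MAX_DOCUMENT_SIZE = int(pymongo.common.MAX_BSON_SIZE * 0.8)
print(MAX_DOCUMENT_SIZE)  # 13421772 when MAX_BSON_SIZE is 16 * 1024 * 1024

# A single module-level dtype('O') compared with 'is', instead of touching
# .hasobject, which timezone-aware pandas extension dtypes do not provide
NP_OBJECT_DTYPE = np.dtype('O')
df = pd.DataFrame({'ts': pd.date_range('2018-01-01', periods=3, tz='UTC'),
                   'label': ['a', 'b', 'c']})
print(df.dtypes['ts'] is NP_OBJECT_DTYPE)     # False: tz-aware column keeps a datetime dtype
print(df.dtypes['label'] is NP_OBJECT_DTYPE)  # True: plain strings are stored as object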

arctic/exceptions.py (+2)

@@ -40,6 +40,8 @@ class DataIntegrityException(ArcticException):
     """
     pass

+class ArcticSerializationException(ArcticException):
+    pass

 class ConcurrentModificationException(DataIntegrityException):
     pass

arctic/serialization/incremental.py (new file, +230 lines)

@@ -0,0 +1,230 @@
import abc
import hashlib
import logging
import os
from threading import RLock

import numpy as np
import pandas as pd
from bson import Binary

from arctic.serialization.numpy_records import PandasSerializer
from .._compression import compress
from ..exceptions import ArcticSerializationException
from .._util import MAX_DOCUMENT_SIZE, NP_OBJECT_DTYPE

ARCTIC_AUTO_EXPAND_CHUNK_SIZE = bool(os.environ.get('ARCTIC_AUTO_EXPAND_CHUNK_SIZE'))

ABC = abc.ABCMeta('ABC', (object,), {})

log = logging.getLogger(__name__)


def incremental_checksum(item, curr_sha=None, is_bytes=False):
    curr_sha = hashlib.sha1() if curr_sha is None else curr_sha
    curr_sha.update(item if is_bytes else item.tostring())
    return curr_sha


class LazyIncrementalSerializer(ABC):
    def __init__(self, serializer, input_data, chunk_size):
        if chunk_size < 1:
            raise ArcticSerializationException("LazyIncrementalSerializer can't be initialized "
                                               "with chunk_size < 1 ({})".format(chunk_size))
        if not serializer:
            raise ArcticSerializationException("LazyIncrementalSerializer can't be initialized "
                                               "with a None serializer object")
        self.input_data = input_data
        self.chunk_size = chunk_size
        self._serializer = serializer
        self._initialized = False
        self._checksum = None

    @abc.abstractmethod
    def __len__(self):
        pass

    @abc.abstractproperty
    def generator(self):
        pass

    @abc.abstractproperty
    def generator_bytes(self):
        pass

    @abc.abstractproperty
    def serialize(self):
        pass


class IncrementalPandasToRecArraySerializer(LazyIncrementalSerializer):
    def __init__(self, serializer, input_data, chunk_size, string_max_len=None):
        super(IncrementalPandasToRecArraySerializer, self).__init__(serializer, input_data, chunk_size)
        if not isinstance(serializer, PandasSerializer):
            raise ArcticSerializationException("IncrementalPandasToRecArraySerializer requires a serializer of "
                                               "type PandasSerializer.")
        if not isinstance(input_data, (pd.DataFrame, pd.Series)):
            raise ArcticSerializationException("IncrementalPandasToRecArraySerializer requires a pandas DataFrame or "
                                               "Series as data source input.")
        if string_max_len and string_max_len < 1:
            raise ArcticSerializationException("IncrementalPandasToRecArraySerializer can't be initialized "
                                               "with string_max_len < 1 ({})".format(string_max_len))
        self.string_max_len = string_max_len
        # The state which needs to be lazily initialized
        self._dtype = None
        self._shape = None
        self._rows_per_chunk = 0
        self._total_chunks = 0
        self._has_string_object = False
        self._lock = RLock()

    def _dtype_convert_to_max_len_string(self, input_ndtype, fname):
        if input_ndtype.type not in (np.string_, np.unicode_):
            return input_ndtype, False
        type_sym = 'S' if input_ndtype.type == np.string_ else 'U'
        max_str_len = len(max(self.input_data[fname].astype(type_sym), key=len))
        str_field_dtype = np.dtype('{}{:d}'.format(type_sym, max_str_len)) if max_str_len > 0 else input_ndtype
        return str_field_dtype, True

    def _get_dtype(self):
        # Serializer is being called only if can_convert_to_records_without_objects() has passed,
        # which means that the resulting recarray does not contain objects but only numpy types, string, or unicode

        # Serialize the first row to obtain info about row size in bytes (cache first few rows only)
        # Also raise an Exception early, if data are not serializable
        first_chunk, serialized_dtypes = self._serializer.serialize(
            self.input_data[0:10] if len(self) > 0 else self.input_data,
            string_max_len=self.string_max_len)

        # This is the common case, where first row's dtype represents well the whole dataframe's dtype
        if serialized_dtypes is None or \
                len(self.input_data) == 0 or \
                NP_OBJECT_DTYPE not in self.input_data.dtypes.values:
            return first_chunk, serialized_dtypes, False

        # Reaching here means we have at least one column of type object
        # To correctly serialize incrementally, we need to know the final dtype (type and fixed length),
        # using length-conversion information from all values of the object columns

        dtype_arr = []
        has_string_object = False
        for field_name in serialized_dtypes.names:  # include all column names, along with the expanded multi-index
            field_dtype = serialized_dtypes[field_name]
            if field_name not in self.input_data or self.input_data.dtypes[field_name] is NP_OBJECT_DTYPE:
                # Note: .hasobject breaks for timezone-aware datetime64 pandas columns, so compare with dtype('O')
                # if column is an expanded multi index or doesn't contain objects, the serialized 1st row dtype is safe
                field_dtype, with_str_object = self._dtype_convert_to_max_len_string(field_dtype, field_name)
                has_string_object |= with_str_object
            dtype_arr.append((field_name, field_dtype))
        return first_chunk, np.dtype(dtype_arr), has_string_object

    def _lazy_init(self):
        if self._initialized:
            return

        with self._lock:
            if self._initialized:  # intentional double check here
                return
            # Get the dtype of the serialized array (takes into account object types, converted to fixed length strings)
            first_chunk, dtype, has_string_object = self._get_dtype()

            # Compute the number of rows which can fit in a chunk
            rows_per_chunk = 0
            if len(self) > 0 and self.chunk_size > 1:
                rows_per_chunk = IncrementalPandasToRecArraySerializer._calculate_rows_per_chunk(self.chunk_size, first_chunk)

            # Initialize object's state
            self._dtype = dtype
            shp = list(first_chunk.shape)
            shp[0] = len(self)
            self._shape = tuple(shp)
            self._has_string_object = has_string_object
            self._rows_per_chunk = rows_per_chunk
            self._total_chunks = int(np.ceil(float(len(self)) / self._rows_per_chunk)) if rows_per_chunk > 0 else 0
            self._initialized = True

    @staticmethod
    def _calculate_rows_per_chunk(max_chunk_size, chunk):
        sze = int(chunk.dtype.itemsize * np.prod(chunk.shape[1:]))
        sze = sze if sze < max_chunk_size else max_chunk_size
        rows_per_chunk = int(max_chunk_size / sze)
        if rows_per_chunk < 1 and ARCTIC_AUTO_EXPAND_CHUNK_SIZE:
            # If a row size is larger than chunk_size, use the maximum document size
            logging.warning('Chunk size of {} is too small to fit a row ({}). '
                            'Using maximum document size.'.format(max_chunk_size, MAX_DOCUMENT_SIZE))
            # For huge rows, fall-back to using a very large document size, less than max-allowed by MongoDB
            rows_per_chunk = int(MAX_DOCUMENT_SIZE / sze)
        if rows_per_chunk < 1:
            raise ArcticSerializationException("Serialization failed to split data into max sized chunks.")
        return rows_per_chunk

    def __len__(self):
        return len(self.input_data)

    @property
    def shape(self):
        self._lazy_init()
        return self._shape

    @property
    def dtype(self):
        self._lazy_init()
        return self._dtype

    @property
    def rows_per_chunk(self):
        self._lazy_init()
        return self._rows_per_chunk

    def checksum(self, from_idx, to_idx):
        if self._checksum is None:
            self._lazy_init()
            total_sha = None
            for chunk_bytes, dtype in self.generator_bytes(from_idx=from_idx, to_idx=to_idx):
                # TODO: what about compress_array here in batches?
                compressed_chunk = compress(chunk_bytes)
                total_sha = incremental_checksum(compressed_chunk, curr_sha=total_sha, is_bytes=True)
            self._checksum = Binary(total_sha.digest())
        return self._checksum

    def generator(self, from_idx=None, to_idx=None):
        return self._generator(from_idx=from_idx, to_idx=to_idx)

    def generator_bytes(self, from_idx=None, to_idx=None):
        return self._generator(from_idx=from_idx, to_idx=to_idx, get_bytes=True)

    def _generator(self, from_idx, to_idx, get_bytes=False):
        # Note that the range is: [from_idx, to_idx)
        self._lazy_init()

        my_lenth = len(self)

        # Take into account default arguments and negative indexing (from end offset)
        from_idx = 0 if from_idx is None else from_idx
        if from_idx < 0:
            from_idx = my_lenth + from_idx
        to_idx = my_lenth if to_idx is None else min(to_idx, my_lenth)
        if to_idx < 0:
            to_idx = my_lenth + to_idx

        # No data, finish iteration
        if my_lenth == 0 or from_idx >= my_lenth or from_idx >= to_idx:
            return

        # Perform serialization for each chunk
        while from_idx < to_idx:
            curr_stop = min(from_idx+self._rows_per_chunk, to_idx)

            chunk, _ = self._serializer.serialize(
                self.input_data[from_idx: curr_stop],
                string_max_len=self.string_max_len,
                forced_dtype=self.dtype if self._has_string_object else None)

            # Let the gc collect the intermediate serialized chunk as early as possible
            chunk = chunk.tostring() if chunk is not None and get_bytes else chunk

            yield chunk, self.dtype, from_idx, curr_stop
            from_idx = curr_stop

    def serialize(self):
        return self._serializer.serialize(self.input_data, self.string_max_len)
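The commit message notes that the serializer is code-complete but not yet used, so this diff contains no call site. The snippet below is only a rough usage sketch, assuming an in-memory DataFrame and the existing DataFrameSerializer; the frame, column names, and chunk_size value are hypothetical:

import pandas as pd

from arctic.serialization.incremental import IncrementalPandasToRecArraySerializer
from arctic.serialization.numpy_records import DataFrameSerializer

df = pd.DataFrame({'a': range(100000), 'b': ['x'] * 100000})  # hypothetical input

# chunk_size is a per-chunk byte budget; 2 MB is an arbitrary example value
inc = IncrementalPandasToRecArraySerializer(DataFrameSerializer(), df, chunk_size=2 * 1024 * 1024)

# Accessing dtype/shape/rows_per_chunk triggers the lazy, lock-protected initialisation
print(inc.dtype, inc.shape, inc.rows_per_chunk)

# Each generated item is (recarray_chunk, dtype, start_row, stop_row)
for chunk, dtype, start, stop in inc.generator():
    print(start, stop, len(chunk))

All derived state (dtype, shape, rows_per_chunk) is computed once, under the RLock, the first time any of it is needed, so constructing the object itself stays cheap.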

arctic/serialization/numpy_records.py (+28 -20)

@@ -4,6 +4,7 @@
 import numpy as np
 from pandas import DataFrame, MultiIndex, Series, DatetimeIndex, Index
 from ..exceptions import ArcticException
+from .._util import NP_OBJECT_DTYPE
 try: # 0.21+ Compatibility
     from pandas._libs.tslib import Timestamp
     from pandas._libs.tslibs.timezones import get_timezone

@@ -27,19 +28,21 @@ def set_fast_check_df_serializable(config):
     _FAST_CHECK_DF_SERIALIZABLE = bool(config)


-def _to_primitive(arr, string_max_len=None):
+def _to_primitive(arr, string_max_len=None, forced_dtype=None):
     if arr.dtype.hasobject:
         if len(arr) > 0 and isinstance(arr[0], Timestamp):
             return np.array([t.value for t in arr], dtype=DTN64_DTYPE)

-        if string_max_len:
-            str_array = np.array(arr.astype('U{:d}'.format(string_max_len)))
+        if forced_dtype is not None:
+            casted_arr = arr.astype(dtype=forced_dtype, copy=False)
+        elif string_max_len is not None:
+            casted_arr = np.array(arr.astype('U{:d}'.format(string_max_len)))
         else:
-            str_array = np.array(list(arr))
+            casted_arr = np.array(list(arr))

         # Pick any unwanted data conversions (e.g. np.NaN to 'nan')
-        if np.array_equal(arr, str_array):
-            return str_array
+        if np.array_equal(arr, casted_arr):
+            return casted_arr
     return arr


@@ -48,7 +51,7 @@ def _multi_index_to_records(index, empty_index):
     if not empty_index:
         ix_vals = list(map(np.array, [index.get_level_values(i) for i in range(index.nlevels)]))
     else:
-        # empty multi index has no size, create empty arrays for recarry..
+        # empty multi index has no size, create empty arrays for recarry.
         ix_vals = [np.array([]) for n in index.names]
     index_names = list(index.names)
     count = 0

@@ -110,7 +113,7 @@ def _index_from_records(self, recarr):
             rtn = MultiIndex.from_arrays(level_arrays, names=index)
         return rtn

-    def _to_records(self, df, string_max_len=None):
+    def _to_records(self, df, string_max_len=None, forced_dtype=None):
         """
         Similar to DataFrame.to_records()
         Differences:

@@ -134,11 +137,16 @@ def _to_records(self, df, string_max_len=None):
         names = index_names + columns

         arrays = []
-        for arr in ix_vals + column_vals:
-            arrays.append(_to_primitive(arr, string_max_len))
-
-        dtype = np.dtype([(str(x), v.dtype) if len(v.shape) == 1 else (str(x), v.dtype, v.shape[1]) for x, v in zip(names, arrays)],
-                         metadata=metadata)
+        for arr, name in zip(ix_vals + column_vals, index_names + columns):
+            arrays.append(_to_primitive(arr, string_max_len,
+                                        forced_dtype=None if forced_dtype is None else forced_dtype[name]))
+
+        if forced_dtype is None:
+            dtype = np.dtype([(str(x), v.dtype) if len(v.shape) == 1 else (str(x), v.dtype, v.shape[1])
+                              for x, v in zip(names, arrays)],
+                             metadata=metadata)
+        else:
+            dtype = forced_dtype

         # The argument names is ignored when dtype is passed
         rtn = np.rec.fromarrays(arrays, dtype=dtype, names=names)

@@ -166,8 +174,8 @@ def fast_check_serializable(self, df):
         mappings, and empty dict otherwise.
         """
         i_dtype, f_dtypes = df.index.dtype, df.dtypes
-        index_has_object = df.index.dtype.hasobject
-        fields_with_object = [f for f in df.columns if f_dtypes[f] is np.dtype('O')]
+        index_has_object = df.index.dtype is NP_OBJECT_DTYPE
+        fields_with_object = [f for f in df.columns if f_dtypes[f] is NP_OBJECT_DTYPE]
         if df.empty or (not index_has_object and not fields_with_object):
             arr, _ = self._to_records(df.iloc[:10])  # only first few rows for performance
             return arr, {}

@@ -202,7 +210,7 @@ def can_convert_to_records_without_objects(self, df, symbol):
         else:
             return True

-    def serialize(self, item):
+    def serialize(self, item, string_max_len=None, forced_dtype=None):
         raise NotImplementedError

     def deserialize(self, item):

@@ -224,8 +232,8 @@ def deserialize(self, item):
         name = item.dtype.names[-1]
         return Series.from_array(item[name], index=index, name=name)

-    def serialize(self, item, string_max_len=None):
-        return self._to_records(item, string_max_len)
+    def serialize(self, item, string_max_len=None, forced_dtype=None):
+        return self._to_records(item, string_max_len, forced_dtype)


 class DataFrameSerializer(PandasSerializer):

@@ -267,5 +275,5 @@ def deserialize(self, item):

         return df

-    def serialize(self, item, string_max_len=None):
-        return self._to_records(item, string_max_len)
+    def serialize(self, item, string_max_len=None, forced_dtype=None):
+        return self._to_records(item, string_max_len, forced_dtype)
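The forced_dtype argument threaded through serialize(), _to_records(), and _to_primitive() lets the incremental serializer above pin every chunk to one fixed-width record dtype, even when the longest string of an object column only appears in a later chunk. A small sketch of the difference, not part of the diff; the frame, field names, and widths are invented:

import numpy as np
import pandas as pd

from arctic.serialization.numpy_records import DataFrameSerializer

s = DataFrameSerializer()
df = pd.DataFrame({'name': ['a', 'bb', 'ccc']})  # hypothetical data

# Serializing only the first row lets the string field collapse to one character ...
arr_head, _ = s.serialize(df.iloc[:1])
print(arr_head.dtype)  # e.g. [('index', '<i8'), ('name', '<U1')]

# ... while a forced dtype derived from the full column keeps every chunk compatible
forced = np.dtype([('index', np.int64), ('name', 'U3')])
arr_forced, _ = s.serialize(df.iloc[:1], forced_dtype=forced)
print(arr_forced.dtype)  # [('index', '<i8'), ('name', '<U3')]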

0 commit comments
