From 9d11734f1fc143ae5be9db19e7f5ed10bc9d4dac Mon Sep 17 00:00:00 2001 From: Chris Date: Fri, 21 Aug 2015 21:49:37 -0500 Subject: [PATCH] PERF: Cythonize groupby transforms #4095 --- asv_bench/benchmarks/groupby.py | 16 +++ doc/source/whatsnew/v0.17.1.txt | 2 + pandas/algos.pyx | 38 ------ pandas/core/groupby.py | 225 ++++++++++++++++++++++++-------- pandas/src/generate_code.py | 140 ++++++++++++++++++++ pandas/src/generated.pyx | 135 +++++++++++++++++++ pandas/tests/test_groupby.py | 153 +++++++++++++++++++++- 7 files changed, 614 insertions(+), 95 deletions(-) diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index 138977a29463e..48480041ed1bd 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -772,3 +772,19 @@ def setup(self): def time_groupby_transform_series2(self): self.df.groupby('id')['val'].transform(np.mean) + +class groupby_transform_cythonized(object): + goal_time = 0.2 + + def setup(self): + np.random.seed(0) + self.df = DataFrame({'id': (np.arange(100000) / 3), 'val': np.random.randn(100000), }) + + def time_groupby_transform_cumprod(self): + self.df.groupby('id').cumprod() + + def time_groupby_transform_cumsum(self): + self.df.groupby('id').cumsum() + + def time_groupby_transform_shift(self): + self.df.groupby('id').shift() diff --git a/doc/source/whatsnew/v0.17.1.txt b/doc/source/whatsnew/v0.17.1.txt index 114665e28203f..b44c3f4f4673c 100755 --- a/doc/source/whatsnew/v0.17.1.txt +++ b/doc/source/whatsnew/v0.17.1.txt @@ -93,6 +93,8 @@ Performance Improvements - Performance bug in repr of ``Categorical`` categories, which was rendering the strings before chopping them for display (:issue:`11305`) - Improved performance of ``Series`` constructor with no data and ``DatetimeIndex`` (:issue:`11433`) +- Improved performance ``shift``, ``cumprod``, and ``cumsum`` with groupby (:issue:`4095`) + .. _whatsnew_0171.bug_fixes: diff --git a/pandas/algos.pyx b/pandas/algos.pyx index 8569209f2e946..62ee6ced84882 100644 --- a/pandas/algos.pyx +++ b/pandas/algos.pyx @@ -50,7 +50,6 @@ cdef np.float64_t MAXfloat64 = np.inf cdef double NaN = np.NaN cdef double nan = NaN - cdef inline int int_max(int a, int b): return a if a >= b else b cdef inline int int_min(int a, int b): return a if a <= b else b @@ -2266,43 +2265,6 @@ def group_last_bin_object(ndarray[object, ndim=2] out, else: out[i, j] = resx[i, j] - - -#---------------------------------------------------------------------- -# median - -def group_median(ndarray[float64_t, ndim=2] out, - ndarray[int64_t] counts, - ndarray[float64_t, ndim=2] values, - ndarray[int64_t] labels): - ''' - Only aggregates on axis=0 - ''' - cdef: - Py_ssize_t i, j, N, K, ngroups, size - ndarray[int64_t] _counts - ndarray data - float64_t* ptr - ngroups = len(counts) - N, K = ( values).shape - - indexer, _counts = groupsort_indexer(labels, ngroups) - counts[:] = _counts[1:] - - data = np.empty((K, N), dtype=np.float64) - ptr = data.data - - take_2d_axis1_float64_float64(values.T, indexer, out=data) - - for i in range(K): - # exclude NA group - ptr += _counts[0] - for j in range(ngroups): - size = _counts[j + 1] - out[j, i] = _median_linear(ptr, size) - ptr += size - - cdef inline float64_t _median_linear(float64_t* a, int n): cdef int i, j, na_count = 0 cdef float64_t result diff --git a/pandas/core/groupby.py b/pandas/core/groupby.py index 9c5a40f6e34d6..e9aa9066b75a5 100644 --- a/pandas/core/groupby.py +++ b/pandas/core/groupby.py @@ -4,6 +4,7 @@ import datetime import collections import warnings +import copy from pandas.compat import( zip, builtins, range, long, lzip, @@ -88,6 +89,7 @@ _dataframe_apply_whitelist = \ _common_apply_whitelist | frozenset(['dtypes', 'corrwith']) +_cython_transforms = frozenset(['cumprod', 'cumsum', 'shift']) class GroupByError(Exception): pass @@ -1021,6 +1023,45 @@ def cumcount(self, ascending=True): cumcounts = self._cumcount_array(ascending=ascending) return Series(cumcounts, index) + def cumprod(self, axis=0): + """ + Cumulative product for each group + + """ + if axis != 0: + return self.apply(lambda x: x.cumprod(axis=axis)) + + return self._cython_transform('cumprod') + + def cumsum(self, axis=0): + """ + Cumulative sum for each group + + """ + if axis != 0: + return self.apply(lambda x: x.cumprod(axis=axis)) + + return self._cython_transform('cumsum') + + def shift(self, periods=1, freq=None, axis=0): + """ + Shift each group by periods observations + """ + + if freq is not None or axis != 0: + return self.apply(lambda x: x.shift(periods, freq, axis)) + + labels, _, ngroups = self.grouper.group_info + # filled in by Cython + indexer = np.zeros_like(labels) + _algos.group_shift_indexer(indexer, labels, ngroups, periods) + + output = {} + for name, obj in self._iterate_slices(): + output[name] = com.take_nd(obj.values, indexer) + + return self._wrap_transformed_output(output) + def head(self, n=5): """ Returns first n rows of each group. @@ -1139,6 +1180,24 @@ def _try_cast(self, result, obj): return result + def _cython_transform(self, how, numeric_only=True): + output = {} + for name, obj in self._iterate_slices(): + is_numeric = is_numeric_dtype(obj.dtype) + if numeric_only and not is_numeric: + continue + + try: + result, names = self.grouper.transform(obj.values, how) + except AssertionError as e: + raise GroupByError(str(e)) + output[name] = self._try_cast(result, obj) + + if len(output) == 0: + raise DataError('No numeric types to aggregate') + + return self._wrap_transformed_output(output, names) + def _cython_agg_general(self, how, numeric_only=True): output = {} for name, obj in self._iterate_slices(): @@ -1468,20 +1527,27 @@ def get_group_levels(self): # Aggregation functions _cython_functions = { - 'add': 'group_add', - 'prod': 'group_prod', - 'min': 'group_min', - 'max': 'group_max', - 'mean': 'group_mean', - 'median': { - 'name': 'group_median' - }, - 'var': 'group_var', - 'first': { - 'name': 'group_nth', - 'f': lambda func, a, b, c, d: func(a, b, c, d, 1) - }, - 'last': 'group_last', + 'aggregate': { + 'add': 'group_add', + 'prod': 'group_prod', + 'min': 'group_min', + 'max': 'group_max', + 'mean': 'group_mean', + 'median': { + 'name': 'group_median' + }, + 'var': 'group_var', + 'first': { + 'name': 'group_nth', + 'f': lambda func, a, b, c, d: func(a, b, c, d, 1) + }, + 'last': 'group_last', + }, + + 'transform': { + 'cumprod' : 'group_cumprod', + 'cumsum' : 'group_cumsum', + } } _cython_arity = { @@ -1490,22 +1556,24 @@ def get_group_levels(self): _name_functions = {} - def _get_aggregate_function(self, how, values): + def _get_cython_function(self, kind, how, values, is_numeric): dtype_str = values.dtype.name def get_func(fname): - # find the function, or use the object function, or return a - # generic + # see if there is a fused-type version of function + # only valid for numeric + f = getattr(_algos, fname, None) + if f is not None and is_numeric: + return f + + # otherwise find dtype-specific version, falling back to object for dt in [dtype_str, 'object']: f = getattr(_algos, "%s_%s" % (fname, dtype_str), None) if f is not None: return f - if dtype_str == 'float64': - return getattr(_algos, fname, None) - - ftype = self._cython_functions[how] + ftype = self._cython_functions[kind][how] if isinstance(ftype, dict): func = afunc = get_func(ftype['name']) @@ -1529,7 +1597,9 @@ def wrapper(*args, **kwargs): (how, dtype_str)) return func, dtype_str - def aggregate(self, values, how, axis=0): + def _cython_operation(self, kind, values, how, axis): + assert kind in ['transform', 'aggregate'] + arity = self._cython_arity.get(how, 1) vdim = values.ndim @@ -1561,11 +1631,11 @@ def aggregate(self, values, how, axis=0): values = values.astype(object) try: - agg_func, dtype_str = self._get_aggregate_function(how, values) + func, dtype_str = self._get_cython_function(kind, how, values, is_numeric) except NotImplementedError: if is_numeric: values = _algos.ensure_float64(values) - agg_func, dtype_str = self._get_aggregate_function(how, values) + func, dtype_str = self._get_cython_function(kind, how, values, is_numeric) else: raise @@ -1574,19 +1644,26 @@ def aggregate(self, values, how, axis=0): else: out_dtype = 'object' - # will be filled in Cython function - result = np.empty(out_shape, dtype=out_dtype) - result.fill(np.nan) - counts = np.zeros(self.ngroups, dtype=np.int64) + labels, _, _ = self.group_info - result = self._aggregate(result, counts, values, agg_func, is_numeric) + if kind == 'aggregate': + result = np.empty(out_shape, dtype=out_dtype) + result.fill(np.nan) + counts = np.zeros(self.ngroups, dtype=np.int64) + result = self._aggregate(result, counts, values, labels, func, is_numeric) + elif kind == 'transform': + result = np.empty_like(values, dtype=out_dtype) + result.fill(np.nan) + # temporary storange for running-total type tranforms + accum = np.empty(out_shape, dtype=out_dtype) + result = self._transform(result, accum, values, labels, func, is_numeric) if com.is_integer_dtype(result): if len(result[result == tslib.iNaT]) > 0: result = result.astype('float64') result[result == tslib.iNaT] = np.nan - if self._filter_empty_groups and not counts.all(): + if kind == 'aggregate' and self._filter_empty_groups and not counts.all(): if result.ndim == 2: try: result = lib.row_bool_subset( @@ -1612,8 +1689,13 @@ def aggregate(self, values, how, axis=0): return result, names - def _aggregate(self, result, counts, values, agg_func, is_numeric): - comp_ids, _, ngroups = self.group_info + def aggregate(self, values, how, axis=0): + return self._cython_operation('aggregate', values, how, axis) + + def transform(self, values, how, axis=0): + return self._cython_operation('transform', values, how, axis) + + def _aggregate(self, result, counts, values, comp_ids, agg_func, is_numeric): if values.ndim > 3: # punting for now raise NotImplementedError("number of dimensions is currently " @@ -1628,6 +1710,22 @@ def _aggregate(self, result, counts, values, agg_func, is_numeric): return result + def _transform(self, result, accum, values, comp_ids, transform_func, is_numeric): + comp_ids, _, ngroups = self.group_info + if values.ndim > 3: + # punting for now + raise NotImplementedError("number of dimensions is currently " + "limited to 3") + elif values.ndim > 2: + for i, chunk in enumerate(values.transpose(2, 0, 1)): + + chunk = chunk.squeeze() + agg_func(result[:, :, i], values, comp_ids, accum) + else: + transform_func(result, values, comp_ids, accum) + + return result + def agg_series(self, obj, func): try: return self._aggregate_series_fast(obj, func) @@ -1848,9 +1946,9 @@ def groupings(self): #---------------------------------------------------------------------- # cython aggregation - _cython_functions = {'ohlc': 'group_ohlc'} - _cython_functions.update(BaseGrouper._cython_functions) - _cython_functions.pop('median') + _cython_functions = copy.deepcopy(BaseGrouper._cython_functions) + _cython_functions['aggregate']['ohlc'] = 'group_ohlc' + _cython_functions['aggregate'].pop('median') _name_functions = { 'ohlc': lambda *args: ['open', 'high', 'low', 'close'] @@ -2380,10 +2478,9 @@ def _aggregate_multiple_funcs(self, arg): return DataFrame(results, columns=columns) - def _wrap_aggregated_output(self, output, names=None): - # sort of a kludge + def _wrap_output(self, output, index, names=None): + """ common agg/transform wrapping logic """ output = output[self.name] - index = self.grouper.result_index if names is not None: return DataFrame(output, index=index, columns=names) @@ -2393,6 +2490,16 @@ def _wrap_aggregated_output(self, output, names=None): name = self._selected_obj.name return Series(output, index=index, name=name) + def _wrap_aggregated_output(self, output, names=None): + return self._wrap_output(output=output, + index=self.grouper.result_index, + names=names) + + def _wrap_transformed_output(self, output, names=None): + return self._wrap_output(output=output, + index=self.obj.index, + names=names) + def _wrap_applied_output(self, keys, values, not_indexed_same=False): if len(keys) == 0: # GH #6265 @@ -2452,14 +2559,16 @@ def transform(self, func, *args, **kwargs): transformed : Series """ + func = _intercept_cython(func) or func + # if string function if isinstance(func, compat.string_types): - return self._transform_fast(lambda : getattr(self, func)(*args, **kwargs)) - - # do we have a cython function - cyfunc = _intercept_cython(func) - if cyfunc and not args and not kwargs: - return self._transform_fast(cyfunc) + if func in _cython_transforms: + # cythonized transform + return getattr(self, func)(*args, **kwargs) + else: + # cythonized aggregation and merge + return self._transform_fast(lambda : getattr(self, func)(*args, **kwargs)) # reg transform dtype = self._selected_obj.dtype @@ -3208,24 +3317,23 @@ def transform(self, func, *args, **kwargs): >>> grouped.transform(lambda x: (x - x.mean()) / x.std()) """ - # try to do a fast transform via merge if possible - try: - obj = self._obj_with_exclusions - if isinstance(func, compat.string_types): - result = getattr(self, func)(*args, **kwargs) + # optimized transforms + func = _intercept_cython(func) or func + if isinstance(func, compat.string_types): + if func in _cython_transforms: + # cythonized transform + return getattr(self, func)(*args, **kwargs) else: - cyfunc = _intercept_cython(func) - if cyfunc and not args and not kwargs: - result = getattr(self, cyfunc)() - else: - return self._transform_general(func, *args, **kwargs) - except: + # cythonized aggregation and merge + result = getattr(self, func)(*args, **kwargs) + else: return self._transform_general(func, *args, **kwargs) # a reduction transform if not isinstance(result, DataFrame): return self._transform_general(func, *args, **kwargs) + obj = self._obj_with_exclusions # nuiscance columns if not result.columns.equals(obj.columns): return self._transform_general(func, *args, **kwargs) @@ -3437,6 +3545,9 @@ def _wrap_aggregated_output(self, output, names=None): return self._reindex_output(result)._convert(datetime=True) + def _wrap_transformed_output(self, output, names=None): + return DataFrame(output, index=self.obj.index) + def _wrap_agged_blocks(self, items, blocks): if not self.as_index: index = np.arange(blocks[0].values.shape[1]) @@ -4069,7 +4180,9 @@ def _reorder_by_uniques(uniques, labels): np.var: 'var', np.median: 'median', np.max: 'max', - np.min: 'min' + np.min: 'min', + np.cumprod: 'cumprod', + np.cumsum: 'cumsum' } diff --git a/pandas/src/generate_code.py b/pandas/src/generate_code.py index 8c5c7d709e5f1..d137ce732e005 100644 --- a/pandas/src/generate_code.py +++ b/pandas/src/generate_code.py @@ -1749,6 +1749,144 @@ def put2d_%(name)s_%(dest_type)s(ndarray[%(c_type)s, ndim=2, cast=True] values, out[i] = values[j, loc] """ +#---------------------------------------------------------------------- +# other grouping functions not needing a template +grouping_no_template = """ +def group_median_float64(ndarray[float64_t, ndim=2] out, + ndarray[int64_t] counts, + ndarray[float64_t, ndim=2] values, + ndarray[int64_t] labels): + ''' + Only aggregates on axis=0 + ''' + cdef: + Py_ssize_t i, j, N, K, ngroups, size + ndarray[int64_t] _counts + ndarray data + float64_t* ptr + ngroups = len(counts) + N, K = ( values).shape + + indexer, _counts = groupsort_indexer(labels, ngroups) + counts[:] = _counts[1:] + + data = np.empty((K, N), dtype=np.float64) + ptr = data.data + + take_2d_axis1_float64_float64(values.T, indexer, out=data) + + for i in range(K): + # exclude NA group + ptr += _counts[0] + for j in range(ngroups): + size = _counts[j + 1] + out[j, i] = _median_linear(ptr, size) + ptr += size + +@cython.boundscheck(False) +@cython.wraparound(False) +def group_cumprod_float64(float64_t[:,:] out, + float64_t[:,:] values, + int64_t[:] labels, + float64_t[:,:] accum): + ''' + Only transforms on axis=0 + ''' + cdef: + Py_ssize_t i, j, N, K, size + float64_t val + int64_t lab + + N, K = ( values).shape + accum = np.ones_like(accum) + + with nogil: + for i in range(N): + lab = labels[i] + + if lab < 0: + continue + for j in range(K): + val = values[i, j] + if val == val: + accum[lab, j] *= val + out[i, j] = accum[lab, j] + +@cython.boundscheck(False) +@cython.wraparound(False) +def group_cumsum(numeric[:,:] out, + numeric[:,:] values, + int64_t[:] labels, + numeric[:,:] accum): + ''' + Only transforms on axis=0 + ''' + cdef: + Py_ssize_t i, j, N, K, size + numeric val + int64_t lab + + N, K = ( values).shape + accum = np.zeros_like(accum) + + with nogil: + for i in range(N): + lab = labels[i] + + if lab < 0: + continue + for j in range(K): + val = values[i,j] + if val == val: + accum[lab,j] += val + out[i,j] = accum[lab,j] + +@cython.boundscheck(False) +@cython.wraparound(False) +def group_shift_indexer(int64_t[:] out, int64_t[:] labels, + int ngroups, int periods): + cdef: + Py_ssize_t N, i, j, ii + int offset, sign + int64_t lab, idxer, idxer_slot + int64_t[:] label_seen = np.zeros(ngroups, dtype=np.int64) + int64_t[:,:] label_indexer + + N, = ( labels).shape + + if periods < 0: + periods = -periods + offset = N - 1 + sign = -1 + elif periods > 0: + offset = 0 + sign = 1 + + if periods == 0: + with nogil: + for i in range(N): + out[i] = i + else: + # array of each previous indexer seen + label_indexer = np.zeros((ngroups, periods), dtype=np.int64) + with nogil: + for i in range(N): + ## reverse iterator if shifting backwards + ii = offset + sign * i + lab = labels[ii] + label_seen[lab] += 1 + + idxer_slot = label_seen[lab] % periods + idxer = label_indexer[lab, idxer_slot] + + if label_seen[lab] > periods: + out[ii] = idxer + else: + out[ii] = -1 + + label_indexer[lab, idxer_slot] = ii +""" + #------------------------------------------------------------------------- # Generators @@ -2012,6 +2150,8 @@ def generate_take_cython_file(): print(generate_put_min_max_template(template, use_ints=True), file=f) + print(grouping_no_template, file=f) + for template in nobool_1d_templates: print(generate_from_template(template, exclude=['bool']), file=f) diff --git a/pandas/src/generated.pyx b/pandas/src/generated.pyx index 767e7d6292b6d..738f695a6ce9f 100644 --- a/pandas/src/generated.pyx +++ b/pandas/src/generated.pyx @@ -7933,6 +7933,141 @@ def group_max_int64(ndarray[int64_t, ndim=2] out, out[i, j] = maxx[i, j] + +def group_median_float64(ndarray[float64_t, ndim=2] out, + ndarray[int64_t] counts, + ndarray[float64_t, ndim=2] values, + ndarray[int64_t] labels): + ''' + Only aggregates on axis=0 + ''' + cdef: + Py_ssize_t i, j, N, K, ngroups, size + ndarray[int64_t] _counts + ndarray data + float64_t* ptr + ngroups = len(counts) + N, K = ( values).shape + + indexer, _counts = groupsort_indexer(labels, ngroups) + counts[:] = _counts[1:] + + data = np.empty((K, N), dtype=np.float64) + ptr = data.data + + take_2d_axis1_float64_float64(values.T, indexer, out=data) + + for i in range(K): + # exclude NA group + ptr += _counts[0] + for j in range(ngroups): + size = _counts[j + 1] + out[j, i] = _median_linear(ptr, size) + ptr += size + +@cython.boundscheck(False) +@cython.wraparound(False) +def group_cumprod_float64(float64_t[:,:] out, + float64_t[:,:] values, + int64_t[:] labels, + float64_t[:,:] accum): + ''' + Only transforms on axis=0 + ''' + cdef: + Py_ssize_t i, j, N, K, size + float64_t val + int64_t lab + + N, K = ( values).shape + accum = np.ones_like(accum) + + with nogil: + for i in range(N): + lab = labels[i] + + if lab < 0: + continue + for j in range(K): + val = values[i, j] + if val == val: + accum[lab, j] *= val + out[i, j] = accum[lab, j] + +@cython.boundscheck(False) +@cython.wraparound(False) +def group_cumsum(numeric[:,:] out, + numeric[:,:] values, + int64_t[:] labels, + numeric[:,:] accum): + ''' + Only transforms on axis=0 + ''' + cdef: + Py_ssize_t i, j, N, K, size + numeric val + int64_t lab + + N, K = ( values).shape + accum = np.zeros_like(accum) + + with nogil: + for i in range(N): + lab = labels[i] + + if lab < 0: + continue + for j in range(K): + val = values[i,j] + if val == val: + accum[lab,j] += val + out[i,j] = accum[lab,j] + +@cython.boundscheck(False) +@cython.wraparound(False) +def group_shift_indexer(int64_t[:] out, int64_t[:] labels, + int ngroups, int periods): + cdef: + Py_ssize_t N, i, j, ii + int offset, sign + int64_t lab, idxer, idxer_slot + int64_t[:] label_seen = np.zeros(ngroups, dtype=np.int64) + int64_t[:,:] label_indexer + + N, = ( labels).shape + + if periods < 0: + periods = -periods + offset = N - 1 + sign = -1 + elif periods > 0: + offset = 0 + sign = 1 + + if periods == 0: + with nogil: + for i in range(N): + out[i] = i + else: + # array of each previous indexer seen + label_indexer = np.zeros((ngroups, periods), dtype=np.int64) + with nogil: + for i in range(N): + ## reverse iterator if shifting backwards + ii = offset + sign * i + lab = labels[ii] + label_seen[lab] += 1 + + idxer_slot = label_seen[lab] % periods + idxer = label_indexer[lab, idxer_slot] + + if label_seen[lab] > periods: + out[ii] = idxer + else: + out[ii] = -1 + + label_indexer[lab, idxer_slot] = ii + @cython.wraparound(False) @cython.boundscheck(False) def left_join_indexer_unique_float64(ndarray[float64_t] left, diff --git a/pandas/tests/test_groupby.py b/pandas/tests/test_groupby.py index 9649288ab5b6d..6a2408cbe5ce0 100644 --- a/pandas/tests/test_groupby.py +++ b/pandas/tests/test_groupby.py @@ -2636,7 +2636,7 @@ def test_cython_api2(self): result = df.groupby('A').cumsum() assert_frame_equal(result,expected) - expected = DataFrame([[1, 2, np.nan], [2, np.nan, 9], [3, 4, 9]], columns=['A', 'B', 'C']).astype('float64') + # GH 5755 - cumsum is a transformer and should ignore as_index result = df.groupby('A', as_index=False).cumsum() assert_frame_equal(result,expected) @@ -5360,6 +5360,157 @@ def test__cython_agg_general(self): exc.args += ('operation: %s' % op,) raise + def test_cython_group_transform_algos(self): + #GH 4095 + dtypes = [np.int8, np.int16, np.int32, np.int64, + np.uint8, np.uint32, np.uint64, + np.float32, np.float64] + + ops = [(pd.algos.group_cumprod_float64, np.cumproduct, [np.float64]), + (pd.algos.group_cumsum, np.cumsum, dtypes)] + + for pd_op, np_op, dtypes in ops: + for dtype in dtypes: + data = np.array([[1],[2],[3],[4]], dtype=dtype) + ans = np.zeros_like(data) + accum = np.array([[0]], dtype=dtype) + labels = np.array([0,0,0,0], dtype=np.int64) + pd_op(ans, data, labels, accum) + self.assert_numpy_array_equal(np_op(data), ans[:,0]) + + + + # with nans + labels = np.array([0,0,0,0,0], dtype=np.int64) + + data = np.array([[1],[2],[3],[np.nan],[4]], dtype='float64') + accum = np.array([[0.0]]) + actual = np.zeros_like(data) + actual.fill(np.nan) + pd.algos.group_cumprod_float64(actual, data, labels, accum) + expected = np.array([1, 2, 6, np.nan, 24], dtype='float64') + self.assert_numpy_array_equal(actual[:, 0], expected) + + accum = np.array([[0.0]]) + actual = np.zeros_like(data) + actual.fill(np.nan) + pd.algos.group_cumsum(actual, data, labels, accum) + expected = np.array([1, 3, 6, np.nan, 10], dtype='float64') + self.assert_numpy_array_equal(actual[:, 0], expected) + + # timedelta + data = np.array([np.timedelta64(1, 'ns')] * 5, dtype='m8[ns]')[:, None] + accum = np.array([[0]], dtype='int64') + actual = np.zeros_like(data, dtype='int64') + actual.fill(np.nan) + pd.algos.group_cumsum(actual, data.view('int64'), labels, accum) + expected = np.array( + [np.timedelta64(1, 'ns'), np.timedelta64(2, 'ns'), + np.timedelta64(3, 'ns'), np.timedelta64(4, 'ns'), + np.timedelta64(5, 'ns')]) + self.assert_numpy_array_equal(actual[:, 0].view('m8[ns]'), expected) + + + + def test_cython_transform(self): + # GH 4095 + ops = [(('cumprod', ()), lambda x: x.cumprod()), + (('cumsum', ()), lambda x: x.cumsum()), + (('shift', (-1,)), lambda x: x.shift(-1)), + (('shift', (1,)), lambda x: x.shift())] + + s = Series(np.random.randn(1000)) + s_missing = s.copy() + s_missing.iloc[2:10] = np.nan + labels = np.random.randint(0, 50, size=1000).astype(float) + + #series + for (op, args), targop in ops: + for data in [s, s_missing]: + # print(data.head()) + expected = data.groupby(labels).transform(targop) + + tm.assert_series_equal(expected, + data.groupby(labels).transform(op, *args)) + tm.assert_series_equal(expected, + getattr(data.groupby(labels), op)(*args)) + + strings = list('qwertyuiopasdfghjklz') + strings_missing = strings[:] + strings_missing[5] = np.nan + df = DataFrame({'float': s, + 'float_missing': s_missing, + 'int': [1,1,1,1,2] * 200, + 'datetime': pd.date_range('1990-1-1', periods=1000), + 'timedelta': pd.timedelta_range(1, freq='s', periods=1000), + 'string': strings * 50, + 'string_missing': strings_missing * 50}) + df['cat'] = df['string'].astype('category') + + df2 = df.copy() + df2.index = pd.MultiIndex.from_product([range(100), range(10)]) + + #DataFrame - Single and MultiIndex, + #group by values, index level, columns + for df in [df, df2]: + for gb_target in [dict(by=labels), dict(level=0), + dict(by='string')]: # dict(by='string_missing')]: + # dict(by=['int','string'])]: + + gb = df.groupby(**gb_target) + # whitelisted methods set the selection before applying + # bit a of hack to make sure the cythonized shift + # is equivalent to pre 0.17.1 behavior + if op == 'shift': + gb._set_selection_from_grouper() + + for (op, args), targop in ops: + print(op) + if op != 'shift' and 'int' not in gb_target: + # numeric apply fastpath promotes dtype so have + # to apply seperately and concat + i = gb[['int']].apply(targop) + f = gb[['float','float_missing']].apply(targop) + expected = pd.concat([f,i], axis=1) + else: + expected = gb.apply(targop) + + expected = expected.sort_index(axis=1) + tm.assert_frame_equal(expected, + gb.transform(op, *args).sort_index(axis=1)) + tm.assert_frame_equal(expected, + getattr(gb, op)(*args)) + # individual columns + for c in df: + if c not in ['float', 'int', 'float_missing'] and op != 'shift': + self.assertRaises(DataError, gb[c].transform, op) + self.assertRaises(DataError, getattr(gb[c], op)) + else: + expected = gb[c].apply(targop) + expected.name = c + tm.assert_series_equal(expected, + gb[c].transform(op, *args)) + tm.assert_series_equal(expected, + getattr(gb[c], op)(*args)) + def test_groupby_cumprod(self): + # GH 4095 + df = pd.DataFrame({'key': ['b'] * 10, 'value': 2}) + + actual = df.groupby('key')['value'].cumprod() + expected = df.groupby('key')['value'].apply(lambda x: x.cumprod()) + expected.name = 'value' + tm.assert_series_equal(actual, expected) + + df = pd.DataFrame({'key': ['b'] * 100, 'value': 2}) + actual = df.groupby('key')['value'].cumprod() + # if overflows, groupby product casts to float + # while numpy passes back invalid values + df['value'] = df['value'].astype(float) + expected = df.groupby('key')['value'].apply(lambda x: x.cumprod()) + expected.name = 'value' + tm.assert_series_equal(actual, expected) + + def test_ops_general(self): ops = [('mean', np.mean), ('median', np.median),