diff --git a/pandas/core/apply.py b/pandas/core/apply.py
new file mode 100644
index 0000000000000..2f43087f7dff9
--- /dev/null
+++ b/pandas/core/apply.py
@@ -0,0 +1,340 @@
+import numpy as np
+from pandas import compat
+from pandas._libs import lib
+from pandas.core.dtypes.common import (
+    is_extension_type,
+    is_sequence)
+
+from pandas.io.formats.printing import pprint_thing
+
+
+def frame_apply(obj, func, axis=0, broadcast=False,
+                raw=False, reduce=None, args=(), **kwds):
+    """
+    Construct and return a row or column based frame apply object.
+
+    ``axis`` is resolved with ``obj._get_axis_number``: 0 selects
+    ``FrameRowApply`` and 1 selects ``FrameColumnApply``.  The remaining
+    arguments are handed to that class unchanged; call ``get_result()``
+    on the returned object to run the computation.
+    """
+
+    axis = obj._get_axis_number(axis)
+    if axis == 0:
+        klass = FrameRowApply
+    elif axis == 1:
+        klass = FrameColumnApply
+    else:  # pragma: no cover
+        raise AssertionError('Axis must be 0 or 1, got %s' % str(axis))
+
+    return klass(obj, func, broadcast=broadcast,
+                 raw=raw, reduce=reduce, args=args, kwds=kwds)
+
+
+class FrameApply(object):
+    """
+    Common machinery for applying a function along one axis of a frame.
+
+    Subclasses supply ``axis``, ``apply_broadcast``, ``series_generator``,
+    ``result_index`` and ``result_columns``.
+    """
+
+    def __init__(self, obj, func, broadcast, raw, reduce, args, kwds):
+        self.obj = obj
+        self.func = func
+        self.broadcast = broadcast
+        self.raw = raw
+        self.reduce = reduce
+        self.args = args
+
+        self.ignore_failures = kwds.pop('ignore_failures', False)
+        self.kwds = kwds
+
+        # strings and list/dict specs are recognised by type in get_result,
+        # so they must stay unwrapped even when args/kwds are supplied
+        dispatched = isinstance(func, compat.string_types + (list, dict))
+
+        # curry if needed; a ufunc is wrapped only when keywords are given
+        if not dispatched and (
+                kwds or (args and not isinstance(func, np.ufunc))):
+            def f(x):
+                return func(x, *args, **kwds)
+        else:
+            f = func
+
+        self.f = f
+
+    @property
+    def columns(self):
+        return self.obj.columns
+
+    @property
+    def index(self):
+        return self.obj.index
+
+    @property
+    def values(self):
+        return self.obj.values
+
+    @property
+    def agg_axis(self):
+        return self.obj._get_agg_axis(self.axis)
+
+    def get_result(self):
+        """ compute the results """
+
+        # all empty
+        if len(self.columns) == 0 and len(self.index) == 0:
+            return self.apply_empty_result()
+
+        # string dispatch
+        if isinstance(self.f, compat.string_types):
+            if self.axis:
+                self.kwds['axis'] = self.axis
+            return getattr(self.obj, self.f)(*self.args, **self.kwds)
+
+        # ufunc
+        elif isinstance(self.f, np.ufunc):
+            with np.errstate(all='ignore'):
+                results = self.f(self.values)
+            return self.obj._constructor(data=results, index=self.index,
+                                         columns=self.columns, copy=False)
+
+        # broadcasting
+        if self.broadcast:
+            return self.apply_broadcast()
+
+        # one axis empty
+        if not all(self.obj.shape):
+            return self.apply_empty_result()
+
+        # raw
+        if self.raw and not self.obj._is_mixed_type:
+            return self.apply_raw()
+
+        return self.apply_standard()
+
+    def apply_empty_result(self):
+        """
+        Result for a frame with at least one empty axis.
+
+        When ``reduce`` was not given, it is inferred by calling the
+        function on an empty Series: anything other than a Series coming
+        back means "reduce".  A failing call leaves ``reduce`` False.
+        """
+        from pandas import Series
+        reduce = self.reduce
+
+        if reduce is None:
+            reduce = False
+
+            EMPTY_SERIES = Series([])
+            try:
+                # call the raw function: self.f may already have args/kwds
+                # bound and would reject them a second time
+                r = self.func(EMPTY_SERIES, *self.args, **self.kwds)
+                reduce = not isinstance(r, Series)
+            except Exception:
+                # best effort only, keep reduce = False
+                pass
+
+        if reduce:
+            return Series(np.nan, index=self.agg_axis)
+        else:
+            return self.obj.copy()
+
+    def apply_raw(self):
+        """ apply self.f along self.axis of self.values """
+        try:
+            result = lib.reduce(self.values, self.f, axis=self.axis)
+        except Exception:
+            result = np.apply_along_axis(self.f, self.axis, self.values)
+
+        # TODO: mixed type case
+        from pandas import DataFrame, Series
+        if result.ndim == 2:
+            return DataFrame(result, index=self.index, columns=self.columns)
+        else:
+            return Series(result, index=self.agg_axis)
+
+    def apply_standard(self):
+        """
+        Apply Series by Series, trying ``lib.reduce`` first if reducing.
+        """
+        from pandas import Series
+
+        reduce = self.reduce
+        if reduce is None:
+            reduce = True
+
+        # skip if we are mixed datelike and trying reduce across axes
+        # GH6125; checked after defaulting so reduce=None is covered too
+        if (reduce and self.axis == 1 and self.obj._is_mixed_type and
+                self.obj._is_datelike_mixed_type):
+            reduce = False
+
+        # try to reduce first (by default)
+        # this only matters if the reduction in values is of different dtype
+        # e.g. if we want to apply to a SparseFrame, then can't directly reduce
+        if reduce:
+            values = self.values
+
+            # we cannot reduce using non-numpy dtypes,
+            # as demonstrated in gh-12244
+            if not is_extension_type(values):
+
+                # Create a dummy Series from an empty array
+                index = self.obj._get_axis(self.axis)
+                empty_arr = np.empty(len(index), dtype=values.dtype)
+
+                dummy = Series(empty_arr, index=index, dtype=values.dtype)
+
+                try:
+                    labels = self.agg_axis
+                    result = lib.reduce(values, self.f,
+                                        axis=self.axis,
+                                        dummy=dummy,
+                                        labels=labels)
+                    return Series(result, index=labels)
+                except Exception:
+                    # fall through to the series generator below
+                    pass
+
+        # compute the result using the series generator
+        results, res_index, res_columns = self._apply_series_generator()
+
+        # wrap results
+        return self.wrap_results(results, res_index, res_columns)
+
+    def _apply_series_generator(self):
+        """
+        Call ``self.f`` on every Series of ``self.series_generator``.
+
+        Returns ``(results, res_index, res_columns)`` where ``results`` maps
+        the position of each Series to the value returned for it.
+        """
+        series_gen = self.series_generator
+        res_index = self.result_index
+        res_columns = self.result_columns
+
+        i = None
+        results = {}
+        if self.ignore_failures:
+            successes = []
+            for i, v in enumerate(series_gen):
+                try:
+                    results[i] = self.f(v)
+                    successes.append(i)
+                except Exception:
+                    pass
+
+            # so will work with MultiIndex
+            if len(successes) < len(res_index):
+                res_index = res_index.take(successes)
+
+        else:
+            try:
+                for i, v in enumerate(series_gen):
+                    results[i] = self.f(v)
+            except Exception as e:
+                if hasattr(e, 'args'):
+
+                    # make sure i is defined
+                    if i is not None:
+                        k = res_index[i]
+                        e.args = e.args + ('occurred at index %s' %
+                                           pprint_thing(k), )
+                raise
+
+        return results, res_index, res_columns
+
+    def wrap_results(self, results, res_index, res_columns):
+        """ build a DataFrame or Series from the per-Series results """
+        from pandas import Series
+
+        # with ignore_failures the smallest key is not necessarily 0
+        first = results[min(results)] if len(results) > 0 else None
+
+        if len(results) > 0 and is_sequence(first):
+            if not isinstance(first, Series):
+                index = res_columns
+            else:
+                index = None
+
+            result = self.obj._constructor(data=results, index=index)
+            result.columns = res_index
+
+            if self.axis == 1:
+                result = result.T
+            result = result._convert(
+                datetime=True, timedelta=True, copy=False)
+
+        else:
+
+            result = Series(results)
+            result.index = res_index
+
+        return result
+
+    def _apply_broadcast(self, target):
+        """ fill a frame shaped like target with self.f of each column """
+        result_values = np.empty_like(target.values)
+        columns = target.columns
+        for i, col in enumerate(columns):
+            result_values[:, i] = self.f(target[col])
+
+        result = self.obj._constructor(result_values, index=target.index,
+                                       columns=target.columns)
+        return result
+
+
+class FrameRowApply(FrameApply):
+    axis = 0
+
+    def get_result(self):
+
+        # dispatch to agg
+        if isinstance(self.f, (list, dict)):
+            return self.obj.aggregate(self.f, axis=self.axis,
+                                      *self.args, **self.kwds)
+
+        return super(FrameRowApply, self).get_result()
+
+    def apply_broadcast(self):
+        return self._apply_broadcast(self.obj)
+
+    @property
+    def series_generator(self):
+        return (self.obj._ixs(i, axis=1)
+                for i in range(len(self.columns)))
+
+    @property
+    def result_index(self):
+        return self.columns
+
+    @property
+    def result_columns(self):
+        return self.index
+
+
+class FrameColumnApply(FrameApply):
+    axis = 1
+
+    def apply_broadcast(self):
+        return self._apply_broadcast(self.obj.T).T
+
+    @property
+    def series_generator(self):
+        from pandas import Series
+        dtype = object if self.obj._is_mixed_type else None
+        return (Series._from_array(arr, index=self.columns, name=name,
+                                   dtype=dtype)
+                for arr, name in zip(self.values, self.index))
+
+    @property
+    def result_index(self):
+        return self.index
+
+    @property
+    def result_columns(self):
+        return self.columns
diff --git a/pandas/core/frame.py b/pandas/core/frame.py
index 5f323d0f040bc..753c623b2de4c 100644
--- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -4810,8 +4810,7 @@ def aggregate(self, func, axis=0, *args, **kwargs): def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, args=(), **kwds): - """ - Applies function along input axis of DataFrame. + """Applies function along input axis of DataFrame. Objects passed to functions are Series objects having index either the DataFrame's index (axis=0) or the columns (axis=1). @@ -4870,194 +4869,15 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, ------- applied : Series or DataFrame """ - axis = self._get_axis_number(axis) - ignore_failures = kwds.pop('ignore_failures', False) - - # dispatch to agg - if axis == 0 and isinstance(func, (list, dict)): - return self.aggregate(func, axis=axis, *args, **kwds) - - if len(self.columns) == 0 and len(self.index) == 0: - return self._apply_empty_result(func, axis, reduce, *args, **kwds) - - # if we are a string, try to dispatch - if isinstance(func, compat.string_types): - if axis: - kwds['axis'] = axis - return getattr(self, func)(*args, **kwds) - - if kwds or args and not isinstance(func, np.ufunc): - def f(x): - return func(x, *args, **kwds) - else: - f = func - - if isinstance(f, np.ufunc): - with np.errstate(all='ignore'): - results = f(self.values) - return self._constructor(data=results, index=self.index, - columns=self.columns, copy=False) - else: - if not broadcast: - if not all(self.shape): - return self._apply_empty_result(func, axis, reduce, *args, - **kwds) - - if raw and not self._is_mixed_type: - return self._apply_raw(f, axis) - else: - if reduce is None: - reduce = True - return self._apply_standard( - f, axis, - reduce=reduce, - ignore_failures=ignore_failures) - else: - return self._apply_broadcast(f, axis) - - def _apply_empty_result(self, func, axis, reduce, *args, **kwds): - if reduce is None: - reduce = False - try: - reduce = not isinstance(func(_EMPTY_SERIES, *args, **kwds), - Series) - except Exception: - 
pass - - if reduce: - return Series(np.nan, index=self._get_agg_axis(axis)) - else: - return self.copy() - - def _apply_raw(self, func, axis): - try: - result = lib.reduce(self.values, func, axis=axis) - except Exception: - result = np.apply_along_axis(func, axis, self.values) - - # TODO: mixed type case - if result.ndim == 2: - return DataFrame(result, index=self.index, columns=self.columns) - else: - return Series(result, index=self._get_agg_axis(axis)) - - def _apply_standard(self, func, axis, ignore_failures=False, reduce=True): - - # skip if we are mixed datelike and trying reduce across axes - # GH6125 - if (reduce and axis == 1 and self._is_mixed_type and - self._is_datelike_mixed_type): - reduce = False - - # try to reduce first (by default) - # this only matters if the reduction in values is of different dtype - # e.g. if we want to apply to a SparseFrame, then can't directly reduce - if reduce: - values = self.values - - # we cannot reduce using non-numpy dtypes, - # as demonstrated in gh-12244 - if not is_extension_type(values): - # Create a dummy Series from an empty array - index = self._get_axis(axis) - empty_arr = np.empty(len(index), dtype=values.dtype) - dummy = Series(empty_arr, index=self._get_axis(axis), - dtype=values.dtype) - - try: - labels = self._get_agg_axis(axis) - result = lib.reduce(values, func, axis=axis, dummy=dummy, - labels=labels) - return Series(result, index=labels) - except Exception: - pass - - dtype = object if self._is_mixed_type else None - if axis == 0: - series_gen = (self._ixs(i, axis=1) - for i in range(len(self.columns))) - res_index = self.columns - res_columns = self.index - elif axis == 1: - res_index = self.index - res_columns = self.columns - values = self.values - series_gen = (Series._from_array(arr, index=res_columns, name=name, - dtype=dtype) - for i, (arr, name) in enumerate(zip(values, - res_index))) - else: # pragma : no cover - raise AssertionError('Axis must be 0 or 1, got %s' % str(axis)) - - i = None - 
keys = [] - results = {} - if ignore_failures: - successes = [] - for i, v in enumerate(series_gen): - try: - results[i] = func(v) - keys.append(v.name) - successes.append(i) - except Exception: - pass - # so will work with MultiIndex - if len(successes) < len(res_index): - res_index = res_index.take(successes) - else: - try: - for i, v in enumerate(series_gen): - results[i] = func(v) - keys.append(v.name) - except Exception as e: - if hasattr(e, 'args'): - # make sure i is defined - if i is not None: - k = res_index[i] - e.args = e.args + ('occurred at index %s' % - pprint_thing(k), ) - raise - - if len(results) > 0 and is_sequence(results[0]): - if not isinstance(results[0], Series): - index = res_columns - else: - index = None - - result = self._constructor(data=results, index=index) - result.columns = res_index - - if axis == 1: - result = result.T - result = result._convert(datetime=True, timedelta=True, copy=False) - - else: - - result = Series(results) - result.index = res_index - - return result - - def _apply_broadcast(self, func, axis): - if axis == 0: - target = self - elif axis == 1: - target = self.T - else: # pragma: no cover - raise AssertionError('Axis must be 0 or 1, got %s' % axis) - - result_values = np.empty_like(target.values) - columns = target.columns - for i, col in enumerate(columns): - result_values[:, i] = func(target[col]) - - result = self._constructor(result_values, index=target.index, - columns=target.columns) - - if axis == 1: - result = result.T - - return result + from pandas.core.apply import frame_apply + op = frame_apply(self, + func=func, + axis=axis, + broadcast=broadcast, + raw=raw, + reduce=reduce, + args=args, **kwds) + return op.get_result() def applymap(self, func): """ @@ -6189,8 +6009,6 @@ def isin(self, values): ops.add_flex_arithmetic_methods(DataFrame, **ops.frame_flex_funcs) ops.add_special_arithmetic_methods(DataFrame, **ops.frame_special_funcs) -_EMPTY_SERIES = Series([]) - def _arrays_to_mgr(arrays, arr_names, 
index, columns, dtype=None): """ diff --git a/pandas/core/sparse/frame.py b/pandas/core/sparse/frame.py index 36a18d8f8b4a0..05f39a8caa6f6 100644 --- a/pandas/core/sparse/frame.py +++ b/pandas/core/sparse/frame.py @@ -861,11 +861,17 @@ def apply(self, func, axis=0, broadcast=False, reduce=False): new_series, index=self.index, columns=self.columns, default_fill_value=self._default_fill_value, default_kind=self._default_kind).__finalize__(self) - else: - if not broadcast: - return self._apply_standard(func, axis, reduce=reduce) - else: - return self._apply_broadcast(func, axis) + + from pandas.core.apply import frame_apply + op = frame_apply(self, + func=func, + axis=axis, + reduce=reduce) + + if broadcast: + return op.apply_broadcast() + + return op.apply_standard() def applymap(self, func): """ diff --git a/pandas/tests/frame/test_apply.py b/pandas/tests/frame/test_apply.py index ab2e810d77634..65dd166e1f6a8 100644 --- a/pandas/tests/frame/test_apply.py +++ b/pandas/tests/frame/test_apply.py @@ -13,6 +13,7 @@ Timestamp, compat) import pandas as pd from pandas.core.dtypes.dtypes import CategoricalDtype +from pandas.core.apply import frame_apply from pandas.util.testing import (assert_series_equal, assert_frame_equal) import pandas.util.testing as tm @@ -153,8 +154,9 @@ def test_apply_axis1(self): assert tapplied[d] == np.mean(self.frame.xs(d)) def test_apply_ignore_failures(self): - result = self.mixed_frame._apply_standard(np.mean, 0, - ignore_failures=True) + result = frame_apply(self.mixed_frame, + np.mean, 0, + ignore_failures=True).apply_standard() expected = self.mixed_frame._get_numeric_data().apply(np.mean) assert_series_equal(result, expected)