Skip to content

Commit d87ca1c

Browse files
WillAyd authored and jreback committed Feb 25, 2018
Cythonized GroupBy Fill (#19673)
1 parent 10cc8f4 commit d87ca1c

File tree

6 files changed

+362
-185
lines changed

6 files changed

+362
-185
lines changed
 

‎asv_bench/benchmarks/groupby.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -370,11 +370,11 @@ class GroupByMethods(object):
370370

371371
param_names = ['dtype', 'method']
372372
params = [['int', 'float'],
373-
['all', 'any', 'count', 'cumcount', 'cummax', 'cummin',
374-
'cumprod', 'cumsum', 'describe', 'first', 'head', 'last', 'mad',
375-
'max', 'min', 'median', 'mean', 'nunique', 'pct_change', 'prod',
376-
'rank', 'sem', 'shift', 'size', 'skew', 'std', 'sum', 'tail',
377-
'unique', 'value_counts', 'var']]
373+
['all', 'any', 'bfill', 'count', 'cumcount', 'cummax', 'cummin',
374+
'cumprod', 'cumsum', 'describe', 'ffill', 'first', 'head',
375+
'last', 'mad', 'max', 'min', 'median', 'mean', 'nunique',
376+
'pct_change', 'prod', 'rank', 'sem', 'shift', 'size', 'skew',
377+
'std', 'sum', 'tail', 'unique', 'value_counts', 'var']]
378378

379379
def setup(self, dtype, method):
380380
ngroups = 1000

‎doc/source/whatsnew/v0.23.0.txt

+1
Original file line numberDiff line numberDiff line change
@@ -689,6 +689,7 @@ Performance Improvements
689689
- Improved performance of pairwise ``.rolling()`` and ``.expanding()`` with ``.cov()`` and ``.corr()`` operations (:issue:`17917`)
690690
- Improved performance of :func:`DataFrameGroupBy.rank` (:issue:`15779`)
691691
- Improved performance of variable ``.rolling()`` on ``.min()`` and ``.max()`` (:issue:`19521`)
692+
- Improved performance of ``GroupBy.ffill`` and ``GroupBy.bfill`` (:issue:`11296`)
692693

693694
.. _whatsnew_0230.docs:
694695

‎pandas/_libs/groupby.pyx

+216
Original file line numberDiff line numberDiff line change
@@ -94,5 +94,221 @@ cdef inline float64_t kth_smallest_c(float64_t* a,
9494
return a[k]
9595

9696

97+
@cython.boundscheck(False)
98+
@cython.wraparound(False)
99+
def group_median_float64(ndarray[float64_t, ndim=2] out,
100+
ndarray[int64_t] counts,
101+
ndarray[float64_t, ndim=2] values,
102+
ndarray[int64_t] labels,
103+
Py_ssize_t min_count=-1):
104+
"""
105+
Only aggregates on axis=0
106+
"""
107+
cdef:
108+
Py_ssize_t i, j, N, K, ngroups, size
109+
ndarray[int64_t] _counts
110+
ndarray data
111+
float64_t* ptr
112+
113+
assert min_count == -1, "'min_count' only used in add and prod"
114+
115+
ngroups = len(counts)
116+
N, K = (<object> values).shape
117+
118+
indexer, _counts = groupsort_indexer(labels, ngroups)
119+
counts[:] = _counts[1:]
120+
121+
data = np.empty((K, N), dtype=np.float64)
122+
ptr = <float64_t*> data.data
123+
124+
take_2d_axis1_float64_float64(values.T, indexer, out=data)
125+
126+
with nogil:
127+
128+
for i in range(K):
129+
# exclude NA group
130+
ptr += _counts[0]
131+
for j in range(ngroups):
132+
size = _counts[j + 1]
133+
out[j, i] = median_linear(ptr, size)
134+
ptr += size
135+
136+
137+
@cython.boundscheck(False)
138+
@cython.wraparound(False)
139+
def group_cumprod_float64(float64_t[:, :] out,
140+
float64_t[:, :] values,
141+
int64_t[:] labels,
142+
bint is_datetimelike):
143+
"""
144+
Only transforms on axis=0
145+
"""
146+
cdef:
147+
Py_ssize_t i, j, N, K, size
148+
float64_t val
149+
float64_t[:, :] accum
150+
int64_t lab
151+
152+
N, K = (<object> values).shape
153+
accum = np.ones_like(values)
154+
155+
with nogil:
156+
for i in range(N):
157+
lab = labels[i]
158+
159+
if lab < 0:
160+
continue
161+
for j in range(K):
162+
val = values[i, j]
163+
if val == val:
164+
accum[lab, j] *= val
165+
out[i, j] = accum[lab, j]
166+
167+
168+
@cython.boundscheck(False)
169+
@cython.wraparound(False)
170+
def group_cumsum(numeric[:, :] out,
171+
numeric[:, :] values,
172+
int64_t[:] labels,
173+
is_datetimelike):
174+
"""
175+
Only transforms on axis=0
176+
"""
177+
cdef:
178+
Py_ssize_t i, j, N, K, size
179+
numeric val
180+
numeric[:, :] accum
181+
int64_t lab
182+
183+
N, K = (<object> values).shape
184+
accum = np.zeros_like(values)
185+
186+
with nogil:
187+
for i in range(N):
188+
lab = labels[i]
189+
190+
if lab < 0:
191+
continue
192+
for j in range(K):
193+
val = values[i, j]
194+
195+
if numeric == float32_t or numeric == float64_t:
196+
if val == val:
197+
accum[lab, j] += val
198+
out[i, j] = accum[lab, j]
199+
else:
200+
accum[lab, j] += val
201+
out[i, j] = accum[lab, j]
202+
203+
204+
@cython.boundscheck(False)
205+
@cython.wraparound(False)
206+
def group_shift_indexer(ndarray[int64_t] out, ndarray[int64_t] labels,
207+
int ngroups, int periods):
208+
cdef:
209+
Py_ssize_t N, i, j, ii
210+
int offset, sign
211+
int64_t lab, idxer, idxer_slot
212+
int64_t[:] label_seen = np.zeros(ngroups, dtype=np.int64)
213+
int64_t[:, :] label_indexer
214+
215+
N, = (<object> labels).shape
216+
217+
if periods < 0:
218+
periods = -periods
219+
offset = N - 1
220+
sign = -1
221+
elif periods > 0:
222+
offset = 0
223+
sign = 1
224+
225+
if periods == 0:
226+
with nogil:
227+
for i in range(N):
228+
out[i] = i
229+
else:
230+
# array of each previous indexer seen
231+
label_indexer = np.zeros((ngroups, periods), dtype=np.int64)
232+
with nogil:
233+
for i in range(N):
234+
## reverse iterator if shifting backwards
235+
ii = offset + sign * i
236+
lab = labels[ii]
237+
238+
# Skip null keys
239+
if lab == -1:
240+
out[ii] = -1
241+
continue
242+
243+
label_seen[lab] += 1
244+
245+
idxer_slot = label_seen[lab] % periods
246+
idxer = label_indexer[lab, idxer_slot]
247+
248+
if label_seen[lab] > periods:
249+
out[ii] = idxer
250+
else:
251+
out[ii] = -1
252+
253+
label_indexer[lab, idxer_slot] = ii
254+
255+
256+
@cython.wraparound(False)
257+
@cython.boundscheck(False)
258+
def group_fillna_indexer(ndarray[int64_t] out, ndarray[int64_t] labels,
259+
ndarray[uint8_t] mask, object direction,
260+
int64_t limit):
261+
"""Indexes how to fill values forwards or backwards within a group
262+
263+
Parameters
264+
----------
265+
out : array of int64_t values which this method will write its results to
266+
Missing values will be written to with a value of -1
267+
labels : array containing unique label for each group, with its ordering
268+
matching up to the corresponding record in `values`
269+
mask : array of uint8_t values where a 1 indicates a missing value
270+
direction : {'ffill', 'bfill'}
271+
Direction for fill to be applied (forwards or backwards, respectively)
272+
limit : Consecutive values to fill before stopping, or -1 for no limit
273+
274+
Notes
275+
-----
276+
This method modifies the `out` parameter rather than returning an object
277+
"""
278+
cdef:
279+
Py_ssize_t i, N
280+
ndarray[int64_t] sorted_labels
281+
int64_t idx, curr_fill_idx=-1, filled_vals=0
282+
283+
N = len(out)
284+
285+
# Make sure all arrays are the same size
286+
assert N == len(labels) == len(mask)
287+
288+
sorted_labels = np.argsort(labels).astype(np.int64, copy=False)
289+
if direction == 'bfill':
290+
sorted_labels = sorted_labels[::-1]
291+
292+
with nogil:
293+
for i in range(N):
294+
idx = sorted_labels[i]
295+
if mask[idx] == 1: # is missing
296+
# Stop filling once we've hit the limit
297+
if filled_vals >= limit and limit != -1:
298+
curr_fill_idx = -1
299+
filled_vals += 1
300+
else: # reset items when not missing
301+
filled_vals = 0
302+
curr_fill_idx = idx
303+
304+
out[idx] = curr_fill_idx
305+
306+
# If we move to the next group, reset
307+
# the fill_idx and counter
308+
if i == N - 1 or labels[idx] != labels[sorted_labels[i+1]]:
309+
curr_fill_idx = -1
310+
filled_vals = 0
311+
312+
97313
# generated from template
98314
include "groupby_helper.pxi"

‎pandas/_libs/groupby_helper.pxi.in

-163
Original file line numberDiff line numberDiff line change
@@ -791,166 +791,3 @@ def group_cummax_{{name}}(ndarray[{{dest_type2}}, ndim=2] out,
791791
out[i, j] = mval
792792

793793
{{endfor}}
794-
795-
#----------------------------------------------------------------------
796-
# other grouping functions not needing a template
797-
#----------------------------------------------------------------------
798-
799-
800-
@cython.boundscheck(False)
801-
@cython.wraparound(False)
802-
def group_median_float64(ndarray[float64_t, ndim=2] out,
803-
ndarray[int64_t] counts,
804-
ndarray[float64_t, ndim=2] values,
805-
ndarray[int64_t] labels,
806-
Py_ssize_t min_count=-1):
807-
"""
808-
Only aggregates on axis=0
809-
"""
810-
cdef:
811-
Py_ssize_t i, j, N, K, ngroups, size
812-
ndarray[int64_t] _counts
813-
ndarray data
814-
float64_t* ptr
815-
816-
assert min_count == -1, "'min_count' only used in add and prod"
817-
818-
ngroups = len(counts)
819-
N, K = (<object> values).shape
820-
821-
indexer, _counts = groupsort_indexer(labels, ngroups)
822-
counts[:] = _counts[1:]
823-
824-
data = np.empty((K, N), dtype=np.float64)
825-
ptr = <float64_t*> data.data
826-
827-
take_2d_axis1_float64_float64(values.T, indexer, out=data)
828-
829-
with nogil:
830-
831-
for i in range(K):
832-
# exclude NA group
833-
ptr += _counts[0]
834-
for j in range(ngroups):
835-
size = _counts[j + 1]
836-
out[j, i] = median_linear(ptr, size)
837-
ptr += size
838-
839-
840-
@cython.boundscheck(False)
841-
@cython.wraparound(False)
842-
def group_cumprod_float64(float64_t[:, :] out,
843-
float64_t[:, :] values,
844-
int64_t[:] labels,
845-
bint is_datetimelike):
846-
"""
847-
Only transforms on axis=0
848-
"""
849-
cdef:
850-
Py_ssize_t i, j, N, K, size
851-
float64_t val
852-
float64_t[:, :] accum
853-
int64_t lab
854-
855-
N, K = (<object> values).shape
856-
accum = np.ones_like(values)
857-
858-
with nogil:
859-
for i in range(N):
860-
lab = labels[i]
861-
862-
if lab < 0:
863-
continue
864-
for j in range(K):
865-
val = values[i, j]
866-
if val == val:
867-
accum[lab, j] *= val
868-
out[i, j] = accum[lab, j]
869-
870-
871-
@cython.boundscheck(False)
872-
@cython.wraparound(False)
873-
def group_cumsum(numeric[:, :] out,
874-
numeric[:, :] values,
875-
int64_t[:] labels,
876-
is_datetimelike):
877-
"""
878-
Only transforms on axis=0
879-
"""
880-
cdef:
881-
Py_ssize_t i, j, N, K, size
882-
numeric val
883-
numeric[:, :] accum
884-
int64_t lab
885-
886-
N, K = (<object> values).shape
887-
accum = np.zeros_like(values)
888-
889-
with nogil:
890-
for i in range(N):
891-
lab = labels[i]
892-
893-
if lab < 0:
894-
continue
895-
for j in range(K):
896-
val = values[i, j]
897-
898-
if numeric == float32_t or numeric == float64_t:
899-
if val == val:
900-
accum[lab, j] += val
901-
out[i, j] = accum[lab, j]
902-
else:
903-
accum[lab, j] += val
904-
out[i, j] = accum[lab, j]
905-
906-
907-
@cython.boundscheck(False)
908-
@cython.wraparound(False)
909-
def group_shift_indexer(int64_t[:] out, int64_t[:] labels,
910-
int ngroups, int periods):
911-
cdef:
912-
Py_ssize_t N, i, j, ii
913-
int offset, sign
914-
int64_t lab, idxer, idxer_slot
915-
int64_t[:] label_seen = np.zeros(ngroups, dtype=np.int64)
916-
int64_t[:, :] label_indexer
917-
918-
N, = (<object> labels).shape
919-
920-
if periods < 0:
921-
periods = -periods
922-
offset = N - 1
923-
sign = -1
924-
elif periods > 0:
925-
offset = 0
926-
sign = 1
927-
928-
if periods == 0:
929-
with nogil:
930-
for i in range(N):
931-
out[i] = i
932-
else:
933-
# array of each previous indexer seen
934-
label_indexer = np.zeros((ngroups, periods), dtype=np.int64)
935-
with nogil:
936-
for i in range(N):
937-
## reverse iterator if shifting backwards
938-
ii = offset + sign * i
939-
lab = labels[ii]
940-
941-
# Skip null keys
942-
if lab == -1:
943-
out[ii] = -1
944-
continue
945-
946-
label_seen[lab] += 1
947-
948-
idxer_slot = label_seen[lab] % periods
949-
idxer = label_indexer[lab, idxer_slot]
950-
951-
if label_seen[lab] > periods:
952-
out[ii] = idxer
953-
else:
954-
out[ii] = -1
955-
956-
label_indexer[lab, idxer_slot] = ii

‎pandas/core/groupby.py

+85-17
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import types
2-
from functools import wraps
2+
from functools import wraps, partial
33
import numpy as np
44
import datetime
55
import collections
@@ -38,7 +38,7 @@
3838
_ensure_float)
3939
from pandas.core.dtypes.cast import maybe_downcast_to_dtype
4040
from pandas.core.dtypes.generic import ABCSeries
41-
from pandas.core.dtypes.missing import isna, notna, _maybe_fill
41+
from pandas.core.dtypes.missing import isna, isnull, notna, _maybe_fill
4242

4343
from pandas.core.base import (PandasObject, SelectionMixin, GroupByError,
4444
DataError, SpecificationError)
@@ -1457,6 +1457,36 @@ def expanding(self, *args, **kwargs):
14571457
from pandas.core.window import ExpandingGroupby
14581458
return ExpandingGroupby(self, *args, **kwargs)
14591459

1460+
def _fill(self, direction, limit=None):
1461+
"""Shared function for `pad` and `backfill` to call Cython method
1462+
1463+
Parameters
1464+
----------
1465+
direction : {'ffill', 'bfill'}
1466+
Direction passed to underlying Cython function. `bfill` will cause
1467+
values to be filled backwards. `ffill` and any other values will
1468+
default to a forward fill
1469+
limit : int, default None
1470+
Maximum number of consecutive values to fill. If `None`, this
1471+
method will convert to -1 prior to passing to Cython
1472+
1473+
Returns
1474+
-------
1475+
`Series` or `DataFrame` with filled values
1476+
1477+
See Also
1478+
--------
1479+
pad
1480+
backfill
1481+
"""
1482+
# Need int value for Cython
1483+
if limit is None:
1484+
limit = -1
1485+
1486+
return self._get_cythonized_result('group_fillna_indexer',
1487+
self.grouper, needs_mask=True,
1488+
direction=direction, limit=limit)
1489+
14601490
@Substitution(name='groupby')
14611491
def pad(self, limit=None):
14621492
"""
@@ -1474,7 +1504,7 @@ def pad(self, limit=None):
14741504
Series.fillna
14751505
DataFrame.fillna
14761506
"""
1477-
return self.apply(lambda x: x.ffill(limit=limit))
1507+
return self._fill('ffill', limit=limit)
14781508
ffill = pad
14791509

14801510
@Substitution(name='groupby')
@@ -1494,7 +1524,7 @@ def backfill(self, limit=None):
14941524
Series.fillna
14951525
DataFrame.fillna
14961526
"""
1497-
return self.apply(lambda x: x.bfill(limit=limit))
1527+
return self._fill('bfill', limit=limit)
14981528
bfill = backfill
14991529

15001530
@Substitution(name='groupby')
@@ -1843,6 +1873,45 @@ def cummax(self, axis=0, **kwargs):
18431873

18441874
return self._cython_transform('cummax', numeric_only=False)
18451875

1876+
def _get_cythonized_result(self, how, grouper, needs_mask=False,
1877+
needs_ngroups=False, **kwargs):
1878+
"""Get result for Cythonized functions
1879+
1880+
Parameters
1881+
----------
1882+
how : str, Cythonized function name to be called
1883+
grouper : Grouper object containing pertinent group info
1884+
needs_mask : bool, default False
1885+
Whether boolean mask needs to be part of the Cython call signature
1886+
needs_ngroups : bool, default False
1887+
Whether number of groups part of the Cython call signature
1888+
**kwargs : dict
1889+
Extra arguments to be passed back to Cython funcs
1890+
1891+
Returns
1892+
-------
1893+
`Series` or `DataFrame` with filled values
1894+
"""
1895+
1896+
labels, _, ngroups = grouper.group_info
1897+
output = collections.OrderedDict()
1898+
base_func = getattr(libgroupby, how)
1899+
1900+
for name, obj in self._iterate_slices():
1901+
indexer = np.zeros_like(labels, dtype=np.int64)
1902+
func = partial(base_func, indexer, labels)
1903+
if needs_mask:
1904+
mask = isnull(obj.values).view(np.uint8)
1905+
func = partial(func, mask)
1906+
1907+
if needs_ngroups:
1908+
func = partial(func, ngroups)
1909+
1910+
func(**kwargs) # Call func to modify indexer values in place
1911+
output[name] = algorithms.take_nd(obj.values, indexer)
1912+
1913+
return self._wrap_transformed_output(output)
1914+
18461915
@Substitution(name='groupby')
18471916
@Appender(_doc_template)
18481917
def shift(self, periods=1, freq=None, axis=0):
@@ -1860,17 +1929,9 @@ def shift(self, periods=1, freq=None, axis=0):
18601929
if freq is not None or axis != 0:
18611930
return self.apply(lambda x: x.shift(periods, freq, axis))
18621931

1863-
labels, _, ngroups = self.grouper.group_info
1864-
1865-
# filled in by Cython
1866-
indexer = np.zeros_like(labels)
1867-
libgroupby.group_shift_indexer(indexer, labels, ngroups, periods)
1868-
1869-
output = {}
1870-
for name, obj in self._iterate_slices():
1871-
output[name] = algorithms.take_nd(obj.values, indexer)
1872-
1873-
return self._wrap_transformed_output(output)
1932+
return self._get_cythonized_result('group_shift_indexer',
1933+
self.grouper, needs_ngroups=True,
1934+
periods=periods)
18741935

18751936
@Substitution(name='groupby')
18761937
@Appender(_doc_template)
@@ -3577,7 +3638,6 @@ def describe(self, **kwargs):
35773638
def value_counts(self, normalize=False, sort=True, ascending=False,
35783639
bins=None, dropna=True):
35793640

3580-
from functools import partial
35813641
from pandas.core.reshape.tile import cut
35823642
from pandas.core.reshape.merge import _get_join_indexers
35833643

@@ -4585,9 +4645,17 @@ def _apply_to_column_groupbys(self, func):
45854645
in self._iterate_column_groupbys()),
45864646
keys=self._selected_obj.columns, axis=1)
45874647

4648+
def _fill(self, direction, limit=None):
4649+
"""Overridden method to join grouped columns in output"""
4650+
res = super(DataFrameGroupBy, self)._fill(direction, limit=limit)
4651+
output = collections.OrderedDict(
4652+
(grp.name, grp.grouper) for grp in self.grouper.groupings)
4653+
4654+
from pandas import concat
4655+
return concat((self._wrap_transformed_output(output), res), axis=1)
4656+
45884657
def count(self):
45894658
""" Compute count of group, excluding missing values """
4590-
from functools import partial
45914659
from pandas.core.dtypes.missing import _isna_ndarraylike as isna
45924660

45934661
data, _ = self._get_data_to_aggregate()

‎pandas/tests/groupby/test_groupby.py

+55
Original file line numberDiff line numberDiff line change
@@ -2061,6 +2061,61 @@ def test_rank_object_raises(self, ties_method, ascending, na_option,
20612061
ascending=ascending,
20622062
na_option=na_option, pct=pct)
20632063

2064+
@pytest.mark.parametrize("mix_groupings", [True, False])
2065+
@pytest.mark.parametrize("as_series", [True, False])
2066+
@pytest.mark.parametrize("val1,val2", [
2067+
('foo', 'bar'), (1, 2), (1., 2.)])
2068+
@pytest.mark.parametrize("fill_method,limit,exp_vals", [
2069+
("ffill", None,
2070+
[np.nan, np.nan, 'val1', 'val1', 'val1', 'val2', 'val2', 'val2']),
2071+
("ffill", 1,
2072+
[np.nan, np.nan, 'val1', 'val1', np.nan, 'val2', 'val2', np.nan]),
2073+
("bfill", None,
2074+
['val1', 'val1', 'val1', 'val2', 'val2', 'val2', np.nan, np.nan]),
2075+
("bfill", 1,
2076+
[np.nan, 'val1', 'val1', np.nan, 'val2', 'val2', np.nan, np.nan])
2077+
])
2078+
def test_group_fill_methods(self, mix_groupings, as_series, val1, val2,
2079+
fill_method, limit, exp_vals):
2080+
vals = [np.nan, np.nan, val1, np.nan, np.nan, val2, np.nan, np.nan]
2081+
_exp_vals = list(exp_vals)
2082+
# Overwrite placeholder values
2083+
for index, exp_val in enumerate(_exp_vals):
2084+
if exp_val == 'val1':
2085+
_exp_vals[index] = val1
2086+
elif exp_val == 'val2':
2087+
_exp_vals[index] = val2
2088+
2089+
# Need to modify values and expectations depending on the
2090+
# Series / DataFrame that we ultimately want to generate
2091+
if mix_groupings: # ['a', 'b', 'a, 'b', ...]
2092+
keys = ['a', 'b'] * len(vals)
2093+
2094+
def interweave(list_obj):
2095+
temp = list()
2096+
for x in list_obj:
2097+
temp.extend([x, x])
2098+
2099+
return temp
2100+
2101+
_exp_vals = interweave(_exp_vals)
2102+
vals = interweave(vals)
2103+
else: # ['a', 'a', 'a', ... 'b', 'b', 'b']
2104+
keys = ['a'] * len(vals) + ['b'] * len(vals)
2105+
_exp_vals = _exp_vals * 2
2106+
vals = vals * 2
2107+
2108+
df = DataFrame({'key': keys, 'val': vals})
2109+
if as_series:
2110+
result = getattr(
2111+
df.groupby('key')['val'], fill_method)(limit=limit)
2112+
exp = Series(_exp_vals, name='val')
2113+
assert_series_equal(result, exp)
2114+
else:
2115+
result = getattr(df.groupby('key'), fill_method)(limit=limit)
2116+
exp = DataFrame({'key': keys, 'val': _exp_vals})
2117+
assert_frame_equal(result, exp)
2118+
20642119
def test_dont_clobber_name_column(self):
20652120
df = DataFrame({'key': ['a', 'a', 'a', 'b', 'b', 'b'],
20662121
'name': ['foo', 'bar', 'baz'] * 2})

0 commit comments

Comments
 (0)
Please sign in to comment.