PERF: Cythonize Groupby Rank #19481
Changes from all commits
@@ -444,8 +444,173 @@ def group_nth_{{name}}(ndarray[{{dest_type2}}, ndim=2] out,
             else:
                 out[i, j] = resx[i, j]

+
+@cython.boundscheck(False)
+@cython.wraparound(False)
+def group_rank_{{name}}(ndarray[float64_t, ndim=2] out,
+                        ndarray[{{c_type}}, ndim=2] values,
+                        ndarray[int64_t] labels,
+                        bint is_datetimelike, object ties_method,
+                        bint ascending, bint pct, object na_option):
+    """Provides the rank of values within each group
+
+    Parameters
+    ----------
+    out : array of float64_t values which this method will write its results to
+    values : array of {{c_type}} values to be ranked
+    labels : array containing unique label for each group, with its ordering
+        matching up to the corresponding record in `values`
+    is_datetimelike : bool
+        unused in this method but provided for call compatibility with other
+        Cython transformations
+    ties_method : {'average', 'min', 'max', 'first', 'dense'}
+        * average: average rank of group
+        * min: lowest rank in group
+        * max: highest rank in group
+        * first: ranks assigned in order they appear in the array
+        * dense: like 'min', but rank always increases by 1 between groups
+    ascending : boolean
+        False for ranks by high (1) to low (N)
+    pct : boolean
+        Compute percentage rank of data within each group
+    na_option : {'keep', 'top', 'bottom'}
+        * keep: leave NA values where they are
+        * top: smallest rank if ascending
+        * bottom: smallest rank if descending
+
+    Notes
+    -----
+    This method modifies the `out` parameter rather than returning an object
+    """
+    cdef:
+        TiebreakEnumType tiebreak
+        Py_ssize_t i, j, N, K, val_start=0, grp_start=0, dups=0, sum_ranks=0
+        Py_ssize_t grp_vals_seen=1, grp_na_count=0
+        ndarray[int64_t] _as
+        ndarray[float64_t, ndim=2] grp_sizes
+        ndarray[{{c_type}}] masked_vals
+        ndarray[uint8_t] mask
+        bint keep_na
+        {{c_type}} nan_fill_val
+
+    tiebreak = tiebreakers[ties_method]
+    keep_na = na_option == 'keep'
+    N, K = (<object> values).shape
+    grp_sizes = np.ones_like(out)
+
+    # Copy values into new array in order to fill missing data
+    # with mask, without obfuscating location of missing data
+    # in values array
+    masked_vals = np.array(values[:, 0], copy=True)
+    {{if name == 'int64'}}
+    mask = (masked_vals == {{nan_val}}).astype(np.uint8)
+    {{else}}
+    mask = np.isnan(masked_vals).astype(np.uint8)
+    {{endif}}
+
+    if ascending ^ (na_option == 'top'):
+        {{if name == 'int64'}}
+        nan_fill_val = np.iinfo(np.int64).max
+        {{else}}
+        nan_fill_val = np.inf
+        {{endif}}
+        order = (masked_vals, mask, labels)
+    else:
+        {{if name == 'int64'}}
+        nan_fill_val = np.iinfo(np.int64).min
+        {{else}}
+        nan_fill_val = -np.inf
+        {{endif}}
+        order = (masked_vals, ~mask, labels)
+    np.putmask(masked_vals, mask, nan_fill_val)
+
+    # lexsort using labels, then mask, then actual values
+    # each label corresponds to a different group value,
+    # the mask helps you differentiate missing values before
+    # performing sort on the actual values
+    _as = np.lexsort(order)
+
+    if not ascending:
+        _as = _as[::-1]
+
+    with nogil:
+        # Loop over the length of the value array
+        # each incremental i value can be looked up in the _as array
+        # that we sorted previously, which gives us the location of
+        # that sorted value for retrieval back from the original
+        # values / masked_vals arrays
+        for i in range(N):
+            # dups and sum_ranks will be incremented each loop where
+            # the value / group remains the same, and should be reset
+            # when either of those change
+            # Used to calculate tiebreakers
+            dups += 1
+            sum_ranks += i - grp_start + 1
+
+            # if keep_na, check for missing values and assign back
+            # to the result where appropriate
+            if keep_na and masked_vals[_as[i]] == nan_fill_val:
+                grp_na_count += 1
+                out[_as[i], 0] = nan
+            else:
+                # this implementation is inefficient because it will
+                # continue overwriting previously encountered dups
+                # i.e. if 5 duplicated values are encountered it will
+                # write to the result as follows (assumes avg tiebreaker):
+                # 1
+                # 1.5 1.5
+                # 2 2 2
+                # 2.5 2.5 2.5 2.5
+                # 3 3 3 3 3
+                #
+                # could potentially be optimized to only write to the
+                # result once the last duplicate value is encountered
+                if tiebreak == TIEBREAK_AVERAGE:
+                    for j in range(i - dups + 1, i + 1):
+                        out[_as[j], 0] = sum_ranks / <float64_t>dups
+                elif tiebreak == TIEBREAK_MIN:
+                    for j in range(i - dups + 1, i + 1):
+                        out[_as[j], 0] = i - grp_start - dups + 2
+                elif tiebreak == TIEBREAK_MAX:
+                    for j in range(i - dups + 1, i + 1):
+                        out[_as[j], 0] = i - grp_start + 1
+                elif tiebreak == TIEBREAK_FIRST:
+                    for j in range(i - dups + 1, i + 1):
+                        if ascending:
+                            out[_as[j], 0] = j + 1 - grp_start
+                        else:
+                            out[_as[j], 0] = 2 * i - j - dups + 2 - grp_start
+                elif tiebreak == TIEBREAK_DENSE:
+                    for j in range(i - dups + 1, i + 1):
+                        out[_as[j], 0] = grp_vals_seen
+
+            # look forward to the next value (using the sorting in _as)
+            # if the value does not equal the current value then we need to
+            # reset the dups and sum_ranks, knowing that a new value is
+            # coming up. the conditional also needs to handle nan equality
+            # and the end of iteration
+            if (i == N - 1 or (
+                    (masked_vals[_as[i]] != masked_vals[_as[i+1]]) and not
+                    (mask[_as[i]] and mask[_as[i+1]]))):
+                dups = sum_ranks = 0
+                val_start = i
+                grp_vals_seen += 1
+
+            # Similar to the previous conditional, check now if we are
+            # moving to a new group. If so, keep track of the index where
+            # the new group occurs, so the tiebreaker calculations can
+            # decrement that from their position. fill in the size of each
+            # group encountered (used by pct calculations later). also be
+            # sure to reset any of the items helping to calculate dups
+            if i == N - 1 or labels[_as[i]] != labels[_as[i+1]]:
+                for j in range(grp_start, i + 1):
+                    grp_sizes[_as[j], 0] = i - grp_start + 1 - grp_na_count
+                dups = sum_ranks = 0
+                grp_na_count = 0
+                val_start = i + 1
+                grp_start = i + 1
+                grp_vals_seen = 1
+
+    if pct:
+        for i in range(N):
+            out[i, 0] = out[i, 0] / grp_sizes[i, 0]
+
 {{endfor}}

Review comment (on the tiebreak loop above): any comments on the impl would be helpful (to future readers)

Review comment: nice comments!
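The mask-and-lexsort setup is the core of the implementation: missing values are replaced by a sentinel so ordinary sorting machinery can be used, while the mask key keeps them distinguishable from real values. A standalone NumPy sketch of the same ordering trick, using toy data rather than the templated Cython:

import numpy as np

# Toy data: two groups (labels 0 and 1); group 0 contains a NaN
values = np.array([3.0, np.nan, 1.0, 2.0, 2.0])
labels = np.array([0, 0, 0, 1, 1], dtype=np.int64)

# Mark missing values, then overwrite them with a sentinel (+inf for
# an ascending sort) so they fall to the end of their group
mask = np.isnan(values)
masked_vals = values.copy()
masked_vals[mask] = np.inf

# np.lexsort treats the LAST key as primary, so this sorts by group
# label first, then non-missing before missing, then by value
order = np.lexsort((masked_vals, mask, labels))
print(order)  # [2 0 1 3 4]: group 0 -> 1.0, 3.0, NaN; group 1 -> 2.0, 2.0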
#----------------------------------------------------------------------
# group_min, group_max
#----------------------------------------------------------------------
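The progressive overwriting described in the kernel's comments is easy to see in a plain-Python rendering of the average tiebreak for a single group of five equal values; every pass rewrites all duplicates seen so far, and only the final pass leaves the correct average rank of 3:

# Plain-Python illustration of the running average tiebreak
dups = sum_ranks = 0
grp_start = 0
for i in range(5):          # positions 0..4 within one group
    dups += 1
    sum_ranks += i - grp_start + 1   # ranks are 1-based
    print([sum_ranks / dups] * dups)
# [1.0]
# [1.5, 1.5]
# [2.0, 2.0, 2.0]
# [2.5, 2.5, 2.5, 2.5]
# [3.0, 3.0, 3.0, 3.0, 3.0]   <- only the last write is correct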
@@ -994,20 +994,24 @@ def _transform_should_cast(self, func_nm):
         return (self.size().fillna(0) > 0).any() and (func_nm not in
                                                       _cython_cast_blacklist)

-    def _cython_transform(self, how, numeric_only=True):
+    def _cython_transform(self, how, numeric_only=True, **kwargs):
         output = collections.OrderedDict()
         for name, obj in self._iterate_slices():
             is_numeric = is_numeric_dtype(obj.dtype)
             if numeric_only and not is_numeric:
                 continue

             try:
-                result, names = self.grouper.transform(obj.values, how)
+                result, names = self.grouper.transform(obj.values, how,
+                                                       **kwargs)
             except NotImplementedError:
                 continue
             except AssertionError as e:
                 raise GroupByError(str(e))
-            output[name] = self._try_cast(result, obj)
+            if self._transform_should_cast(how):
+                output[name] = self._try_cast(result, obj)
+            else:
+                output[name] = result

         if len(output) == 0:
             raise DataError('No numeric types to aggregate')
@@ -1768,6 +1772,37 @@ def cumcount(self, ascending=True):
         cumcounts = self._cumcount_array(ascending=ascending)
         return Series(cumcounts, index)

+    @Substitution(name='groupby')
+    @Appender(_doc_template)
+    def rank(self, method='average', ascending=True, na_option='keep',
+             pct=False, axis=0):
+        """Provides the rank of values within each group
+
+        Parameters
+        ----------
+        method : {'average', 'min', 'max', 'first', 'dense'}, default 'average'
+            * average: average rank of group
+            * min: lowest rank in group
+            * max: highest rank in group
+            * first: ranks assigned in order they appear in the array
+            * dense: like 'min', but rank always increases by 1 between groups
+        na_option : {'keep', 'top', 'bottom'}, default 'keep'
+            * keep: leave NA values where they are
+            * top: smallest rank if ascending
+            * bottom: smallest rank if descending
+        ascending : boolean, default True
+            False for ranks by high (1) to low (N)
+        pct : boolean, default False
+            Compute percentage rank of data within each group
+
+        Returns
+        -------
+        DataFrame with ranking of values within each group
+        """
+        return self._cython_transform('rank', numeric_only=False,
+                                      ties_method=method, ascending=ascending,
+                                      na_option=na_option, pct=pct, axis=axis)
+
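For reference, a brief example of the API this method adds, with output as expected under this PR (illustrative only):

import numpy as np
import pandas as pd

df = pd.DataFrame({'key': ['a', 'a', 'a', 'b', 'b'],
                   'val': [2.0, 2.0, np.nan, 1.0, 3.0]})

# Ties share the lowest rank under method='min'; the NaN stays NaN
# under the default na_option='keep'
print(df.groupby('key')['val'].rank(method='min').tolist())
# [1.0, 1.0, nan, 1.0, 2.0]

# pct=True divides each rank by the group's non-NA size
print(df.groupby('key')['val'].rank(method='min', pct=True).tolist())
# [0.5, 0.5, nan, 0.5, 1.0]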
     @Substitution(name='groupby')
     @Appender(_doc_template)
     def cumprod(self, axis=0, *args, **kwargs):
@@ -2183,6 +2218,16 @@ def get_group_levels(self):
             'cumsum': 'group_cumsum',
             'cummin': 'group_cummin',
             'cummax': 'group_cummax',
+            'rank': {
+                'name': 'group_rank',
+                'f': lambda func, a, b, c, d, **kwargs: func(
+                    a, b, c, d,
+                    kwargs.get('ties_method', 'average'),
+                    kwargs.get('ascending', True),
+                    kwargs.get('pct', False),
+                    kwargs.get('na_option', 'keep')
+                )
+            }
         }
     }
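The 'f' adapter maps the grouper's fixed four-argument transform call onto group_rank's longer signature, filling in defaults for any keywords the caller omitted. A self-contained toy demonstration of the pattern (the function and argument names here are illustrative, not pandas internals):

# Toy demonstration of the dispatch pattern used in the registry
def fake_group_rank(out, values, labels, is_datetimelike,
                    ties_method, ascending, pct, na_option):
    print(ties_method, ascending, pct, na_option)

f = lambda func, a, b, c, d, **kwargs: func(
    a, b, c, d,
    kwargs.get('ties_method', 'average'),
    kwargs.get('ascending', True),
    kwargs.get('pct', False),
    kwargs.get('na_option', 'keep'))

# The grouper always supplies four positional arguments plus whatever
# keywords the user passed; the defaults cover the rest
f(fake_group_rank, 'out', 'values', 'labels', False, ties_method='min')
# prints: min True False keep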
@@ -2242,7 +2287,8 @@ def wrapper(*args, **kwargs):
                                       (how, dtype_str))
         return func

-    def _cython_operation(self, kind, values, how, axis, min_count=-1):
+    def _cython_operation(self, kind, values, how, axis, min_count=-1,
+                          **kwargs):
         assert kind in ['transform', 'aggregate']

         # can we do this operation with our cython functions
@@ -2314,10 +2360,13 @@ def _cython_operation(self, kind, values, how, axis, min_count=-1):
             else:
                 raise

-        if is_numeric:
-            out_dtype = '%s%d' % (values.dtype.kind, values.dtype.itemsize)
+        if how == 'rank':
+            out_dtype = 'float'
         else:
-            out_dtype = 'object'
+            if is_numeric:
+                out_dtype = '%s%d' % (values.dtype.kind, values.dtype.itemsize)
+            else:
+                out_dtype = 'object'

         labels, _, _ = self.group_info

Review comment (on the out_dtype branch): we should prob have a better way of doing this :<

Reply: The catch I see here is that unlike other groupby transformations (where the dtype of the […]). The generic calls to […]. I'd argue the effort and maintainability of doing that would far outweigh any benefit from cleaning up this conditional, but curious if you have other thoughts.

Reply: this is similar in a way to the pre-transformations we do for datetimelikes. What I meant is that the registry of _cython_functions should have this logic itself (which is already based on the function name). e.g. maybe in _cython_functions you get back a tuple rather than a scalar / lambda function, which includes the required dtype (or None). Or _cython_functions actually should be a class which is dispatched to, to call the functions themselves. IOW something like the sketch below. It really should have more logic pushed to this class, e.g. pre- and post-convert dtype things, and should have a Transformation and an Aggregation version. This would allow all of the specific logic to be pushed here, rather than looked up in dicts and such. Further, we could probably move this code out of the main groupby.py class into a sub-module. Don't have to do this here, but groupby for sure needs cleanup like this.
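The code sketch that originally followed "IOW something like" did not survive extraction. As a loose, hypothetical reconstruction of the kind of registry class being suggested (every name here is invented for illustration, not pandas code):

# Hypothetical sketch only -- not pandas code. A registry entry as a
# class lets dtype logic live next to the kernel it belongs to.
class GroupTransformation:
    def __init__(self, name, kernel, out_dtype=None):
        self.name = name            # e.g. 'group_rank'
        self.kernel = kernel        # the Cython function to invoke
        self.out_dtype = out_dtype  # fixed output dtype, or None

    def result_dtype(self, values_dtype):
        # rank would register out_dtype='float64'; everything else
        # falls back to the dtype derived from the input values
        return self.out_dtype or values_dtype

    def __call__(self, out, values, labels, is_datetimelike, **kwargs):
        return self.kernel(out, values, labels, is_datetimelike, **kwargs)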
@@ -2334,7 +2383,8 @@ def _cython_operation(self, kind, values, how, axis, min_count=-1):

             # TODO: min_count
             result = self._transform(
-                result, values, labels, func, is_numeric, is_datetimelike)
+                result, values, labels, func, is_numeric, is_datetimelike,
+                **kwargs)

             if is_integer_dtype(result) and not is_datetimelike:
                 mask = result == iNaT
@@ -2373,8 +2423,8 @@ def aggregate(self, values, how, axis=0, min_count=-1):
         return self._cython_operation('aggregate', values, how, axis,
                                       min_count=min_count)

-    def transform(self, values, how, axis=0):
-        return self._cython_operation('transform', values, how, axis)
+    def transform(self, values, how, axis=0, **kwargs):
+        return self._cython_operation('transform', values, how, axis, **kwargs)

     def _aggregate(self, result, counts, values, comp_ids, agg_func,
                    is_numeric, is_datetimelike, min_count=-1):
@@ -2394,7 +2444,7 @@ def _aggregate(self, result, counts, values, comp_ids, agg_func,
         return result

     def _transform(self, result, values, comp_ids, transform_func,
-                   is_numeric, is_datetimelike):
+                   is_numeric, is_datetimelike, **kwargs):

         comp_ids, _, ngroups = self.group_info
         if values.ndim > 3:
@@ -2406,9 +2456,9 @@ def _transform(self, result, values, comp_ids, transform_func,

                 chunk = chunk.squeeze()
                 transform_func(result[:, :, i], values,
-                               comp_ids, is_datetimelike)
+                               comp_ids, is_datetimelike, **kwargs)
         else:
-            transform_func(result, values, comp_ids, is_datetimelike)
+            transform_func(result, values, comp_ids, is_datetimelike,
+                           **kwargs)

         return result
Review comment: maybe import directly as the name of the enum? can you also share this name with the non-grouping rank functions?

Reply: I tried this back in d09268b but all of the Py27 builds failed as a result with the error below (py3 was fine). Tried digging up info but couldn't find anything - any chance you've seen this before?

Reply: that's annoying. haven't seen that