Skip to content

Commit 0dd083f

Browse files
authored
Pandas 1.5 support (#23973)
* [WIP] Pandas 1.5 support * Partial progress * Exclude pandas 1.5.0 and 1.5.1 because of pandas-dev/pandas#45725 * Plumb group_keys more places * Fix bad argument * Fix bad argument * Debug * Debug * Debug * Debug * allow list tests * Update changes * fmt * Lint * Lint * Fix pd_version * Fix pd_version * Remove CHANGES.md since this needs to go in 2.45 section
1 parent b9f2055 commit 0dd083f

File tree

5 files changed

+54
-20
lines changed

5 files changed

+54
-20
lines changed

sdks/python/apache_beam/dataframe/frames.py

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -372,24 +372,24 @@ def last(self, offset):
372372
@frame_base.args_to_kwargs(pd.DataFrame)
373373
@frame_base.populate_defaults(pd.DataFrame)
374374
def groupby(self, by, level, axis, as_index, group_keys, **kwargs):
375-
"""``as_index`` and ``group_keys`` must both be ``True``.
375+
"""``as_index`` must be ``True``.
376376
377377
Aggregations grouping by a categorical column with ``observed=False`` set
378378
are not currently parallelizable
379379
(`Issue 21827 <https://github.com/apache/beam/issues/21827>`_).
380380
"""
381381
if not as_index:
382382
raise NotImplementedError('groupby(as_index=False)')
383-
if not group_keys:
384-
raise NotImplementedError('groupby(group_keys=False)')
385383

386384
if axis in (1, 'columns'):
387385
return _DeferredGroupByCols(
388386
expressions.ComputedExpression(
389387
'groupbycols',
390-
lambda df: df.groupby(by, axis=axis, **kwargs), [self._expr],
388+
lambda df: df.groupby(
389+
by, axis=axis, group_keys=group_keys, **kwargs), [self._expr],
391390
requires_partition_by=partitionings.Arbitrary(),
392-
preserves_partition_by=partitionings.Arbitrary()))
391+
preserves_partition_by=partitionings.Arbitrary()),
392+
group_keys=group_keys)
393393

394394
if level is None and by is None:
395395
raise TypeError("You have to supply one of 'by' and 'level'")
@@ -559,14 +559,17 @@ def prepend_index(df, by): # type: ignore
559559
expressions.ComputedExpression(
560560
'groupbyindex',
561561
lambda df: df.groupby(
562-
level=list(range(df.index.nlevels)), **kwargs), [to_group],
562+
level=list(range(df.index.nlevels)),
563+
group_keys=group_keys,
564+
**kwargs), [to_group],
563565
requires_partition_by=partitionings.Index(),
564566
preserves_partition_by=partitionings.Arbitrary()),
565567
kwargs,
566568
to_group,
567569
to_group_with_index,
568570
grouping_columns=grouping_columns,
569-
grouping_indexes=grouping_indexes)
571+
grouping_indexes=grouping_indexes,
572+
group_keys=group_keys)
570573

571574
@property # type: ignore
572575
@frame_base.with_docs_from(pd.DataFrame)
@@ -676,6 +679,7 @@ def replace(self, to_replace, value, limit, method, **kwargs):
676679
order-sensitive. It cannot be specified.
677680
678681
If ``limit`` is specified this operation is not parallelizable."""
682+
# pylint: disable-next=c-extension-no-member
679683
value_compare = None if PD_VERSION < (1, 4) else lib.no_default
680684
if method is not None and not isinstance(to_replace,
681685
dict) and value is value_compare:
@@ -4123,6 +4127,7 @@ def __init__(self, expr, kwargs,
41234127
ungrouped_with_index: expressions.Expression[pd.core.generic.NDFrame], # pylint: disable=line-too-long
41244128
grouping_columns,
41254129
grouping_indexes,
4130+
group_keys,
41264131
projection=None):
41274132
"""This object represents the result of::
41284133
@@ -4149,6 +4154,7 @@ def __init__(self, expr, kwargs,
41494154
self._projection = projection
41504155
self._grouping_columns = grouping_columns
41514156
self._grouping_indexes = grouping_indexes
4157+
self._group_keys = group_keys
41524158
self._kwargs = kwargs
41534159

41544160
if (self._kwargs.get('dropna', True) is False and
@@ -4170,6 +4176,7 @@ def __getattr__(self, name):
41704176
self._ungrouped_with_index,
41714177
self._grouping_columns,
41724178
self._grouping_indexes,
4179+
self._group_keys,
41734180
projection=name)
41744181

41754182
def __getitem__(self, name):
@@ -4184,6 +4191,7 @@ def __getitem__(self, name):
41844191
self._ungrouped_with_index,
41854192
self._grouping_columns,
41864193
self._grouping_indexes,
4194+
self._group_keys,
41874195
projection=name)
41884196

41894197
@frame_base.with_docs_from(DataFrameGroupBy)
@@ -4233,6 +4241,7 @@ def apply(self, func, *args, **kwargs):
42334241
project = _maybe_project_func(self._projection)
42344242
grouping_indexes = self._grouping_indexes
42354243
grouping_columns = self._grouping_columns
4244+
group_keys = self._group_keys
42364245

42374246
# Unfortunately pandas does not execute func to determine the right proxy.
42384247
# We run user func on a proxy here to detect the return type and generate
@@ -4321,7 +4330,8 @@ def do_partition_apply(df):
43214330
df = df.reset_index(grouping_columns, drop=True)
43224331

43234332
gb = df.groupby(level=grouping_indexes or None,
4324-
by=grouping_columns or None)
4333+
by=grouping_columns or None,
4334+
group_keys=group_keys)
43254335

43264336
gb = project(gb)
43274337

@@ -4361,6 +4371,7 @@ def fn_wrapper(x, *args, **kwargs):
43614371
fn_wrapper = fn
43624372

43634373
project = _maybe_project_func(self._projection)
4374+
group_keys = self._group_keys
43644375

43654376
# pandas cannot execute fn to determine the right proxy.
43664377
# We run user fn on a proxy here to detect the return type and generate the
@@ -4387,10 +4398,12 @@ def fn_wrapper(x, *args, **kwargs):
43874398
return DeferredDataFrame(
43884399
expressions.ComputedExpression(
43894400
'transform',
4390-
lambda df: project(df.groupby(level=levels)).transform(
4391-
fn_wrapper,
4392-
*args,
4393-
**kwargs).droplevel(self._grouping_columns),
4401+
lambda df: project(
4402+
df.groupby(level=levels, group_keys=group_keys)
4403+
).transform(
4404+
fn_wrapper,
4405+
*args,
4406+
**kwargs).droplevel(self._grouping_columns),
43944407
[self._ungrouped_with_index],
43954408
proxy=proxy,
43964409
requires_partition_by=partitionings.Index(levels),
@@ -4551,6 +4564,7 @@ def wrapper(self, *args, **kwargs):
45514564
is_categorical_grouping = any(to_group.get_level_values(i).is_categorical()
45524565
for i in self._grouping_indexes)
45534566
groupby_kwargs = self._kwargs
4567+
group_keys = self._group_keys
45544568

45554569
# Don't include un-observed categorical values in the preagg
45564570
preagg_groupby_kwargs = groupby_kwargs.copy()
@@ -4562,6 +4576,7 @@ def wrapper(self, *args, **kwargs):
45624576
lambda df: getattr(
45634577
project(
45644578
df.groupby(level=list(range(df.index.nlevels)),
4579+
group_keys=group_keys,
45654580
**preagg_groupby_kwargs)
45664581
),
45674582
agg_name)(**kwargs),
@@ -4574,6 +4589,7 @@ def wrapper(self, *args, **kwargs):
45744589
'post_combine_' + post_agg_name,
45754590
lambda df: getattr(
45764591
df.groupby(level=list(range(df.index.nlevels)),
4592+
group_keys=group_keys,
45774593
**groupby_kwargs),
45784594
post_agg_name)(**kwargs),
45794595
[pre_agg],
@@ -4597,6 +4613,7 @@ def wrapper(self, *args, **kwargs):
45974613
assert isinstance(self, DeferredGroupBy)
45984614

45994615
to_group = self._ungrouped.proxy().index
4616+
group_keys = self._group_keys
46004617
is_categorical_grouping = any(to_group.get_level_values(i).is_categorical()
46014618
for i in self._grouping_indexes)
46024619

@@ -4606,6 +4623,7 @@ def wrapper(self, *args, **kwargs):
46064623
agg_name,
46074624
lambda df: getattr(project(
46084625
df.groupby(level=list(range(df.index.nlevels)),
4626+
group_keys=group_keys,
46094627
**groupby_kwargs),
46104628
), agg_name)(**kwargs),
46114629
[self._ungrouped],

sdks/python/apache_beam/dataframe/pandas_doctests_test.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,7 @@ def test_ndframe_tests(self):
127127
'pandas.core.generic.NDFrame.copy': ['*'],
128128
'pandas.core.generic.NDFrame.droplevel': ['*'],
129129
'pandas.core.generic.NDFrame.get': ['*'],
130-
'pandas.core.generic.NDFrame.rank': [
131-
# Modified dataframe
132-
'df'
133-
],
130+
'pandas.core.generic.NDFrame.rank': ['*'],
134131
'pandas.core.generic.NDFrame.rename': [
135132
# Seems to be an upstream bug. The actual error has a different
136133
# message:
@@ -704,14 +701,19 @@ def test_groupby_tests(self):
704701
result = doctests.testmod(
705702
pd.core.groupby.groupby,
706703
use_beam=False,
704+
verbose=True,
707705
wont_implement_ok={
706+
'pandas.core.groupby.groupby.GroupBy.first': ['*'],
708707
'pandas.core.groupby.groupby.GroupBy.head': ['*'],
708+
'pandas.core.groupby.groupby.GroupBy.last': ['*'],
709709
'pandas.core.groupby.groupby.GroupBy.tail': ['*'],
710710
'pandas.core.groupby.groupby.GroupBy.nth': ['*'],
711711
'pandas.core.groupby.groupby.GroupBy.cumcount': ['*'],
712712
'pandas.core.groupby.groupby.GroupBy.resample': ['*'],
713713
},
714714
not_implemented_ok={
715+
'pandas.core.groupby.groupby.GroupBy.first': ['*'],
716+
'pandas.core.groupby.groupby.GroupBy.last': ['*'],
715717
'pandas.core.groupby.groupby.GroupBy.ngroup': ['*'],
716718
'pandas.core.groupby.groupby.GroupBy.sample': ['*'],
717719
'pandas.core.groupby.groupby.GroupBy.rank': ['*'],
@@ -831,6 +833,7 @@ def test_top_level(self):
831833
'crosstab': ['*'],
832834
'cut': ['*'],
833835
'eval': ['*'],
836+
'from_dummies': ['*'],
834837
'get_dummies': ['*'],
835838
'infer_freq': ['*'],
836839
'lreshape': ['*'],
@@ -863,7 +866,10 @@ def test_top_level(self):
863866
},
864867
skip={
865868
# error formatting
866-
'concat': ['pd.concat([df5, df6], verify_integrity=True)'],
869+
'concat': [
870+
'pd.concat([df5, df6], verify_integrity=True)',
871+
'pd.concat([df7, new_row.to_frame().T], ignore_index=True)'
872+
],
867873
# doctest DeprecationWarning
868874
'melt': ['df'],
869875
# Order-sensitive re-indexing.

sdks/python/setup.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,8 +349,12 @@ def get_portability_package_data():
349349
# with python 3.10 leading to incorrect stacktrace.
350350
# This can be removed once dill is updated to version > 0.3.5.1
351351
# Issue: https://github.com/apache/beam/issues/23566
352-
'dataframe': ['pandas>=1.0,<1.5;python_version<"3.10"',
353-
'pandas>=1.4.3,<1.5;python_version>="3.10"'],
352+
# Exclude 1.5.0 and 1.5.1 because of
353+
# https://github.com/pandas-dev/pandas/issues/45725
354+
'dataframe': [
355+
'pandas>=1.0,<1.6,!=1.5.0,!=1.5.1;python_version<"3.10"',
356+
'pandas>=1.4.3,<1.6,!=1.5.0,!=1.5.1;python_version>="3.10"'
357+
],
354358
'dask': [
355359
'dask >= 2022.6',
356360
'distributed >= 2022.6',

sdks/python/test-suites/tox/py38/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ toxTask "testPy38pandas-14", "py38-pandas-14"
9393
test.dependsOn "testPy38pandas-14"
9494
preCommitPy38.dependsOn "testPy38pandas-14"
9595

96+
toxTask "testPy38pandas-15", "py38-pandas-15"
97+
test.dependsOn "testPy38pandas-15"
98+
preCommitPy38.dependsOn "testPy38pandas-15"
99+
96100
// Create a test task for each minor version of pytorch
97101
toxTask "testPy38pytorch-19", "py38-pytorch-19"
98102
test.dependsOn "testPy38pytorch-19"

sdks/python/tox.ini

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,12 +299,14 @@ commands =
299299
# selecting tests with -m (BEAM-12985)
300300
pytest -o junit_suite_name={envname} --junitxml=pytest_{envname}.xml -n 6 -m uses_pyarrow {posargs}
301301

302-
[testenv:py{37,38,39,310}-pandas-{11,12,13,14}]
302+
[testenv:py{37,38,39,310}-pandas-{11,12,13,14,15}]
303303
deps =
304304
11: pandas>=1.1.0,<1.2.0
305305
12: pandas>=1.2.0,<1.3.0
306306
13: pandas>=1.3.0,<1.4.0
307307
14: pandas>=1.4.0,<1.5.0
308+
# Exclude 1.5.0 and 1.5.1 because of https://github.com/pandas-dev/pandas/issues/45725
309+
15: pandas>=1.5.2,<1.6.0
308310
commands =
309311
# Log pandas and numpy version for debugging
310312
/bin/sh -c "pip freeze | grep -E '(pandas|numpy)'"

0 commit comments

Comments (0)