forked from pandas-dev/pandas
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgroupby.py
82 lines (70 loc) · 3.13 KB
/
groupby.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import pytest
import pandas.util.testing as tm
import pandas as pd
from .base import BaseExtensionTests
class BaseGroupbyTests(BaseExtensionTests):
    """Groupby checks driven by the ``data_for_grouping`` fixture.

    Every test places the fixture array in column "B" of a small frame
    and exercises ``DataFrame.groupby`` with it, either as the grouping
    key or as the grouped values.
    """

    def test_grouping_grouper(self, data_for_grouping):
        """Each grouping's ``grouper`` matches the column it was built from."""
        labels = ["B", "B", None, None, "A", "A", "B", "C"]
        frame = pd.DataFrame({"A": labels, "B": data_for_grouping})

        object_grouping = frame.groupby("A").grouper.groupings[0]
        fixture_grouping = frame.groupby("B").grouper.groupings[0]

        tm.assert_numpy_array_equal(object_grouping.grouper,
                                    frame["A"].values)
        tm.assert_extension_array_equal(fixture_grouping.grouper,
                                        data_for_grouping)

    @pytest.mark.parametrize('as_index', [True, False])
    def test_groupby_extension_agg(self, as_index, data_for_grouping):
        """Mean of "A" per sorted group of "B", as a Series or a frame."""
        frame = pd.DataFrame({
            "A": [1, 1, 2, 2, 3, 3, 1, 4],
            "B": data_for_grouping,
        })
        observed = frame.groupby("B", as_index=as_index).A.mean()

        uniques = pd.factorize(data_for_grouping, sort=True)[1]
        # TODO(ExtensionIndex): remove astype
        group_index = pd.Index(uniques.astype(object), name="B")

        # Three groups are expected when the array reports it can hold NA,
        # four otherwise.
        means = [3, 1, 4] if data_for_grouping._can_hold_na else [2, 3, 1, 4]
        expected = pd.Series(means, index=group_index, name="A")

        if not as_index:
            # With as_index=False the result is compared as a frame, with
            # the group keys moved into a regular column.
            self.assert_frame_equal(observed, expected.reset_index())
            return
        self.assert_series_equal(observed, expected)

    def test_groupby_extension_no_sort(self, data_for_grouping):
        """Mean of "A" per group of "B" with ``sort=False``."""
        frame = pd.DataFrame({
            "A": [1, 1, 2, 2, 3, 3, 1, 4],
            "B": data_for_grouping,
        })
        observed = frame.groupby("B", sort=False).A.mean()

        uniques = pd.factorize(data_for_grouping, sort=False)[1]
        # TODO(ExtensionIndex): remove astype
        group_index = pd.Index(uniques.astype(object), name="B")

        means = [1, 3, 4] if data_for_grouping._can_hold_na else [1, 2, 3, 4]
        expected = pd.Series(means, index=group_index, name="A")
        self.assert_series_equal(observed, expected)

    def test_groupby_extension_transform(self, data_for_grouping):
        """``transform(len)`` gives every row the size of its group."""
        # Only the entries not flagged by isna() are used as group keys.
        non_missing = data_for_grouping[~data_for_grouping.isna()]

        if data_for_grouping._can_hold_na:
            values = [1, 1, 3, 3, 1, 4]
            group_sizes = [3, 3, 2, 2, 3, 1]
        else:
            values = [1, 1, 2, 2, 3, 3, 1, 4]
            group_sizes = [3, 3, 2, 2, 2, 2, 3, 1]

        frame = pd.DataFrame({"A": values, "B": non_missing})
        observed = frame.groupby("B").A.transform(len)
        self.assert_series_equal(observed, pd.Series(group_sizes, name="A"))

    @pytest.mark.parametrize('op', [
        lambda x: 1,
        lambda x: [1] * len(x),
        lambda x: pd.Series([1] * len(x)),
        lambda x: x,
    ], ids=['scalar', 'list', 'series', 'object'])
    def test_groupby_extension_apply(self, data_for_grouping, op):
        """Smoke test: ``apply`` runs for each op; results are not checked."""
        frame = pd.DataFrame({
            "A": [1, 1, 2, 2, 3, 3, 1, 4],
            "B": data_for_grouping,
        })
        # Group by each column in turn, applying to the whole frame and
        # then to the other column only.
        for key, column in (("B", "A"), ("A", "B")):
            frame.groupby(key).apply(op)
            frame.groupby(key)[column].apply(op)