
Commit 2c8b877

itholic authored and ragnarok56 committed
[SPARK-43563][SPARK-43459][SPARK-43451][SPARK-43506] Remove `squeeze` from `read_csv` & enable more tests
### What changes were proposed in this pull request?

This PR proposes to remove the `squeeze` parameter from `read_csv` to follow the behavior of the latest pandas. See pandas-dev/pandas#40413 and pandas-dev/pandas#43427 for details. This PR also enables more tests for pandas 2.0.0 and above.

### Why are the changes needed?

To follow the behavior of the latest pandas, and to increase test coverage.

### Does this PR introduce _any_ user-facing change?

Yes: `squeeze` will no longer be available from `read_csv`. Otherwise, the changes are test-only.

### How was this patch tested?

Enabling and updating the existing tests.

Closes apache#42551 from itholic/pandas_remaining_tests.

Authored-by: itholic <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
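For illustration, a minimal sketch of the migration path for code that relied on `squeeze` (the file path and column name here are hypothetical):

```python
import pyspark.pandas as ps

# Before (PySpark < 4.0): squeeze=True returned a Series when the parsed
# data contained exactly one column.
# ser = ps.read_csv("data.csv", usecols=["name"], squeeze=True)

# After: read a DataFrame, then squeeze explicitly, mirroring pandas 2.x.
df = ps.read_csv("data.csv", usecols=["name"])
ser = df.squeeze("columns")  # Series if exactly one column, else DataFrame
```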
1 parent c1b01c2 · commit 2c8b877

8 files changed (+153, -135)


python/docs/source/migration_guide/pyspark_upgrade.rst (+1)

```diff
@@ -38,6 +38,7 @@ Upgrading from PySpark 3.5 to 4.0
 * In Spark 4.0, ``sort_columns`` parameter from ``DataFrame.plot`` and ``Series.plot`` has been removed from pandas API on Spark.
 * In Spark 4.0, the default value of ``regex`` parameter for ``Series.str.replace`` has been changed from ``True`` to ``False`` from pandas API on Spark. Additionally, a single character ``pat`` with ``regex=True`` is now treated as a regular expression instead of a string literal.
 * In Spark 4.0, the resulting name from ``value_counts`` for all objects is set to ``'count'`` (or ``'proportion'`` if ``normalize=True`` was passed) from pandas API on Spark, and the index will be named after the original object.
+* In Spark 4.0, ``squeeze`` parameter from ``ps.read_csv`` and ``ps.read_excel`` has been removed from pandas API on Spark.


 Upgrading from PySpark 3.3 to 3.4
```
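To illustrate the ``value_counts`` naming bullet above, a small hedged sketch (data made up; output shown as expected under the new behavior):

```python
import pyspark.pandas as ps

psser = ps.Series(["a", "b", "a"], name="letters")

psser.value_counts()
# The result is named 'count' and its index takes the original object's name:
# letters
# a    2
# b    1
# Name: count, dtype: int64

psser.value_counts(normalize=True)  # named 'proportion' instead
```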

python/pyspark/pandas/namespace.py (+5, -27)

```diff
@@ -222,7 +222,6 @@ def read_csv(
     names: Optional[Union[str, List[str]]] = None,
     index_col: Optional[Union[str, List[str]]] = None,
     usecols: Optional[Union[List[int], List[str], Callable[[str], bool]]] = None,
-    squeeze: bool = False,
     mangle_dupe_cols: bool = True,
     dtype: Optional[Union[str, Dtype, Dict[str, Union[str, Dtype]]]] = None,
     nrows: Optional[int] = None,
@@ -262,11 +261,6 @@ def read_csv(
         from the document header row(s).
         If callable, the callable function will be evaluated against the column names,
         returning names where the callable function evaluates to `True`.
-    squeeze : bool, default False
-        If the parsed data only contains one column then return a Series.
-
-        .. deprecated:: 3.4.0
-
     mangle_dupe_cols : bool, default True
         Duplicate columns will be specified as 'X0', 'X1', ... 'XN', rather
         than 'X' ... 'X'. Passing in False will cause data to be overwritten if
@@ -466,10 +460,7 @@ def read_csv(
             for col in psdf.columns:
                 psdf[col] = psdf[col].astype(dtype)

-    if squeeze and len(psdf.columns) == 1:
-        return first_series(psdf)
-    else:
-        return psdf
+    return psdf


 def read_json(
@@ -912,7 +903,6 @@ def read_excel(
     names: Optional[List] = None,
     index_col: Optional[List[int]] = None,
     usecols: Optional[Union[int, str, List[Union[int, str]], Callable[[str], bool]]] = None,
-    squeeze: bool = False,
     dtype: Optional[Dict[str, Union[str, Dtype]]] = None,
     engine: Optional[str] = None,
     converters: Optional[Dict] = None,
@@ -985,11 +975,6 @@ def read_excel(
         * If list of string, then indicates list of column names to be parsed.
         * If callable, then evaluate each column name against it and parse the
           column if the callable returns ``True``.
-    squeeze : bool, default False
-        If the parsed data only contains one column then return a Series.
-
-        .. deprecated:: 3.4.0
-
     dtype : Type name or dict of column -> type, default None
         Data type for data or columns. E.g. {'a': np.float64, 'b': np.int32}
         Use `object` to preserve data as stored in Excel and not interpret dtype.
@@ -1142,7 +1127,7 @@ def read_excel(
     """

     def pd_read_excel(
-        io_or_bin: Any, sn: Union[str, int, List[Union[str, int]], None], sq: bool
+        io_or_bin: Any, sn: Union[str, int, List[Union[str, int]], None]
     ) -> pd.DataFrame:
         return pd.read_excel(
             io=BytesIO(io_or_bin) if isinstance(io_or_bin, (bytes, bytearray)) else io_or_bin,
@@ -1151,7 +1136,6 @@ def pd_read_excel(
             names=names,
             index_col=index_col,
             usecols=usecols,
-            squeeze=sq,
             dtype=dtype,
             engine=engine,
             converters=converters,
@@ -1181,7 +1165,7 @@ def pd_read_excel(
         io_or_bin = io
         single_file = True

-    pdf_or_psers = pd_read_excel(io_or_bin, sn=sheet_name, sq=squeeze)
+    pdf_or_psers = pd_read_excel(io_or_bin, sn=sheet_name)

     if single_file:
         if isinstance(pdf_or_psers, dict):
@@ -1208,9 +1192,7 @@ def read_excel_on_spark(
         )

         def output_func(pdf: pd.DataFrame) -> pd.DataFrame:
-            pdf = pd.concat(
-                [pd_read_excel(bin, sn=sn, sq=False) for bin in pdf[pdf.columns[0]]]
-            )
+            pdf = pd.concat([pd_read_excel(bin, sn=sn) for bin in pdf[pdf.columns[0]]])

             reset_index = pdf.reset_index()
             for name, col in reset_index.items():
@@ -1231,11 +1213,7 @@ def output_func(pdf: pd.DataFrame) -> pd.DataFrame:
             .mapInPandas(lambda iterator: map(output_func, iterator), schema=return_schema)
         )

-        psdf = DataFrame(psdf._internal.with_new_sdf(sdf))
-        if squeeze and len(psdf.columns) == 1:
-            return first_series(psdf)
-        else:
-            return psdf
+        return DataFrame(psdf._internal.with_new_sdf(sdf))

     if isinstance(pdf_or_psers, dict):
         return {
```
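With the internal squeeze branch (`first_series(psdf)`) gone, callers that want the old single-column Series behavior from `ps.read_excel` can squeeze the result themselves; a minimal sketch (the workbook path and column selection are hypothetical):

```python
import pyspark.pandas as ps

# Before (PySpark < 4.0):
# ser = ps.read_excel("report.xlsx", usecols="A", squeeze=True)

# After:
psdf = ps.read_excel("report.xlsx", usecols="A")
ser = psdf.squeeze("columns")  # Series when exactly one column was parsed
```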

python/pyspark/pandas/tests/test_csv.py (-18)

```diff
@@ -255,24 +255,6 @@ def test_read_csv_with_sep(self):
             actual = ps.read_csv(fn, sep="\t")
             self.assert_eq(expected, actual, almost=True)

-    @unittest.skipIf(
-        LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
-        "TODO(SPARK-43563): Enable CsvTests.test_read_csv_with_squeeze for pandas 2.0.0.",
-    )
-    def test_read_csv_with_squeeze(self):
-        with self.csv_file(self.csv_text) as fn:
-            expected = pd.read_csv(fn, squeeze=True, usecols=["name"])
-            actual = ps.read_csv(fn, squeeze=True, usecols=["name"])
-            self.assert_eq(expected, actual, almost=True)
-
-            expected = pd.read_csv(fn, squeeze=True, usecols=["name", "amount"])
-            actual = ps.read_csv(fn, squeeze=True, usecols=["name", "amount"])
-            self.assert_eq(expected, actual, almost=True)
-
-            expected = pd.read_csv(fn, squeeze=True, usecols=["name", "amount"], index_col=["name"])
-            actual = ps.read_csv(fn, squeeze=True, usecols=["name", "amount"], index_col=["name"])
-            self.assert_eq(expected, actual, almost=True)
-
     def test_read_csv_with_mangle_dupe_cols(self):
         self.assertRaisesRegex(
             ValueError, "mangle_dupe_cols", lambda: ps.read_csv("path", mangle_dupe_cols=False)
```
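If equivalent coverage were wanted after the removal (purely hypothetical; this commit only deletes the test), the deleted assertions could be rewritten with an explicit squeeze:

```python
import pandas as pd
import pyspark.pandas as ps

fn = "people.csv"  # hypothetical path; the old test wrote a temp file

# Selecting one column and squeezing yields a Series on both sides.
expected = pd.read_csv(fn, usecols=["name"]).squeeze("columns")
actual = ps.read_csv(fn, usecols=["name"]).squeeze("columns")
```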

python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby.py (+3, -13)

```diff
@@ -37,11 +37,6 @@ def tearDownClass(cls):
         reset_option("compute.ops_on_diff_frames")
         super().tearDownClass()

-    @unittest.skipIf(
-        LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
-        "TODO(SPARK-43460): Enable OpsOnDiffFramesGroupByTests.test_groupby_different_lengths "
-        "for pandas 2.0.0.",
-    )
     def test_groupby_different_lengths(self):
         pdfs1 = [
             pd.DataFrame({"c": [4, 2, 7, 3, None, 1, 1, 1, 2], "d": list("abcdefght")}),
@@ -71,7 +66,7 @@ def sort(df):

             self.assert_eq(
                 sort(psdf1.groupby(psdf2.a, as_index=as_index).sum()),
-                sort(pdf1.groupby(pdf2.a, as_index=as_index).sum()),
+                sort(pdf1.groupby(pdf2.a, as_index=as_index).sum(numeric_only=True)),
                 almost=as_index,
             )

@@ -86,11 +81,6 @@ def sort(df):
                 almost=as_index,
             )

-    @unittest.skipIf(
-        LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
-        "TODO(SPARK-43459): Enable OpsOnDiffFramesGroupByTests.test_groupby_multiindex_columns "
-        "for pandas 2.0.0.",
-    )
     def test_groupby_multiindex_columns(self):
         pdf1 = pd.DataFrame(
             {("y", "c"): [4, 2, 7, 3, None, 1, 1, 1, 2], ("z", "d"): list("abcdefght")}
@@ -103,7 +93,7 @@ def test_groupby_multiindex_columns(self):

         self.assert_eq(
             psdf1.groupby(psdf2[("x", "a")]).sum().sort_index(),
-            pdf1.groupby(pdf2[("x", "a")]).sum().sort_index(),
+            pdf1.groupby(pdf2[("x", "a")]).sum(numeric_only=True).sort_index(),
         )

         self.assert_eq(
@@ -112,7 +102,7 @@ def test_groupby_multiindex_columns(self):
             .sort_values(("y", "c"))
             .reset_index(drop=True),
             pdf1.groupby(pdf2[("x", "a")], as_index=False)
-            .sum()
+            .sum(numeric_only=True)
             .sort_values(("y", "c"))
             .reset_index(drop=True),
         )
```
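The `numeric_only=True` added on the pandas side reflects a pandas 2.0 behavior change: `GroupBy.sum` no longer silently drops non-numeric columns (object columns are now aggregated too, e.g. strings are concatenated), while pandas-on-Spark still excludes them by default. A small sketch of the difference in plain pandas:

```python
import pandas as pd

pdf = pd.DataFrame({"c": [4, 2, 7, 3], "d": list("abcd")})
key = pd.Series([1, 2, 1, 2])

pdf.groupby(key).sum(numeric_only=True)  # sums only "c"; "d" is excluded
pdf.groupby(key).sum()  # pandas >= 2.0: "d" is kept and concatenated
                        # per group ("ac", "bd") instead of being dropped
```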

python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby_rolling.py (+28, -5)

```diff
@@ -72,12 +72,35 @@ def _test_groupby_rolling_func(self, f):
             getattr(pdf.groupby(pkey)[["b"]].rolling(2), f)().sort_index(),
         )

-    @unittest.skipIf(
-        LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
-        "TODO(SPARK-43452): Enable RollingTests.test_groupby_rolling_count for pandas 2.0.0.",
-    )
     def test_groupby_rolling_count(self):
-        self._test_groupby_rolling_func("count")
+        pser = pd.Series([1, 2, 3], name="a")
+        pkey = pd.Series([1, 2, 3], name="a")
+        psser = ps.from_pandas(pser)
+        kkey = ps.from_pandas(pkey)
+
+        # TODO(SPARK-43432): Fix `min_periods` for Rolling.count() to work same as pandas
+        self.assert_eq(
+            psser.groupby(kkey).rolling(2).count().sort_index(),
+            pser.groupby(pkey).rolling(2, min_periods=1).count().sort_index(),
+        )
+
+        pdf = pd.DataFrame({"a": [1, 2, 3, 2], "b": [4.0, 2.0, 3.0, 1.0]})
+        pkey = pd.Series([1, 2, 3, 2], name="a")
+        psdf = ps.from_pandas(pdf)
+        kkey = ps.from_pandas(pkey)
+
+        self.assert_eq(
+            psdf.groupby(kkey).rolling(2).count().sort_index(),
+            pdf.groupby(pkey).rolling(2, min_periods=1).count().sort_index(),
+        )
+        self.assert_eq(
+            psdf.groupby(kkey)["b"].rolling(2).count().sort_index(),
+            pdf.groupby(pkey)["b"].rolling(2, min_periods=1).count().sort_index(),
+        )
+        self.assert_eq(
+            psdf.groupby(kkey)[["b"]].rolling(2).count().sort_index(),
+            pdf.groupby(pkey)[["b"]].rolling(2, min_periods=1).count().sort_index(),
+        )

     def test_groupby_rolling_min(self):
         self._test_groupby_rolling_func("min")
```
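The `min_periods=1` on the pandas side encodes the mismatch tracked by SPARK-43432: as of pandas 2.0, `Rolling.count` honors the window's default `min_periods`, while pandas-on-Spark still counts from the first row. In plain pandas:

```python
import pandas as pd

s = pd.Series([1, 2, 3])

s.rolling(2).count()                 # pandas >= 2.0: [NaN, 2.0, 2.0]
s.rolling(2, min_periods=1).count()  # [1.0, 2.0, 2.0] -- the behavior the
                                     # tests compare pandas-on-Spark against
```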
