
Commit 0359f17

ENH: use native filesystem (if available) for read_parquet with pyarrow engine (pandas-dev#51609)
* ENH: Add filesystem to read_parquet/to_parquet
* Add to to_parquet
* Bump fsspec
* fix import
* Mock gcs to local for parquet
* Fix condition, add whatsnew
* address tests, bump gcsfs
* bump s3fs
* Fix doc issues
* Try without fsspec wrapper
* Revert "Try without fsspec wrapper" (this reverts commit 7ec7d75)
* Returns a tuple
* Don't wrap in fsspec, undo deps bump
* Fix whatsnew
* Add validations for filesystem
* Validate that mock filesystem is used
* Undo install.rst
* Try this
* Make global again?
* Try this
* Address review
* Fix test
* Use localfilesystem correctly
* use absolute
1 parent fb754d7 commit 0359f17
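
The commit's effect, as a minimal usage sketch (the bucket name and local paths below are hypothetical, not from the commit):

import pandas as pd
import pyarrow.fs

# With engine="pyarrow" and no storage_options, remote URIs are now resolved
# through pyarrow.fs first, falling back to fsspec only if pyarrow cannot
# handle the scheme.
df = pd.read_parquet("s3://my-bucket/data.parquet", engine="pyarrow")

# The new filesystem keyword also accepts an instantiated filesystem:
fs = pyarrow.fs.LocalFileSystem()
df.to_parquet("/tmp/out.parquet", engine="pyarrow", filesystem=fs)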

File tree: 4 files changed, +168 -16 lines changed

- doc/source/whatsnew/v2.1.0.rst
- pandas/io/parquet.py
- pandas/tests/io/test_gcs.py
- pandas/tests/io/test_parquet.py

doc/source/whatsnew/v2.1.0.rst (+2 -1)
@@ -113,7 +113,8 @@ Performance improvements
 - Performance improvement in :meth:`DataFrame.clip` and :meth:`Series.clip` (:issue:`51472`)
 - Performance improvement in :meth:`DataFrame.first_valid_index` and :meth:`DataFrame.last_valid_index` for extension array dtypes (:issue:`51549`)
 - Performance improvement in :meth:`DataFrame.where` when ``cond`` is backed by an extension dtype (:issue:`51574`)
-- Performance improvement in :meth:`read_orc` when reading a remote URI file path. (:issue:`51609`)
+- Performance improvement in :func:`read_orc` when reading a remote URI file path. (:issue:`51609`)
+- Performance improvement in :func:`read_parquet` and :meth:`DataFrame.to_parquet` when reading a remote file with ``engine="pyarrow"`` (:issue:`51609`)
 - Performance improvement in :meth:`MultiIndex.sortlevel` when ``ascending`` is a list (:issue:`51612`)
 - Performance improvement in :meth:`~arrays.ArrowExtensionArray.isna` when array has zero nulls or is all nulls (:issue:`51630`)
 - Performance improvement when parsing strings to ``boolean[pyarrow]`` dtype (:issue:`51730`)

pandas/io/parquet.py (+88 -11)
@@ -90,12 +90,39 @@ def _get_path_or_handle(
 ]:
     """File handling for PyArrow."""
     path_or_handle = stringify_path(path)
+    if fs is not None:
+        pa_fs = import_optional_dependency("pyarrow.fs", errors="ignore")
+        fsspec = import_optional_dependency("fsspec", errors="ignore")
+        if pa_fs is None and fsspec is None:
+            raise ValueError(
+                f"filesystem must be a pyarrow or fsspec FileSystem, "
+                f"not a {type(fs).__name__}"
+            )
+        elif (pa_fs is not None and not isinstance(fs, pa_fs.FileSystem)) and (
+            fsspec is not None and not isinstance(fs, fsspec.spec.AbstractFileSystem)
+        ):
+            raise ValueError(
+                f"filesystem must be a pyarrow or fsspec FileSystem, "
+                f"not a {type(fs).__name__}"
+            )
+        elif pa_fs is not None and isinstance(fs, pa_fs.FileSystem) and storage_options:
+            raise NotImplementedError(
+                "storage_options not supported with a pyarrow FileSystem."
+            )
     if is_fsspec_url(path_or_handle) and fs is None:
-        fsspec = import_optional_dependency("fsspec")
+        if storage_options is None:
+            pa = import_optional_dependency("pyarrow")
+            pa_fs = import_optional_dependency("pyarrow.fs")
 
-        fs, path_or_handle = fsspec.core.url_to_fs(
-            path_or_handle, **(storage_options or {})
-        )
+            try:
+                fs, path_or_handle = pa_fs.FileSystem.from_uri(path)
+            except (TypeError, pa.ArrowInvalid):
+                pass
+        if fs is None:
+            fsspec = import_optional_dependency("fsspec")
+            fs, path_or_handle = fsspec.core.url_to_fs(
+                path_or_handle, **(storage_options or {})
+            )
     elif storage_options and (not is_url(path_or_handle) or mode != "rb"):
         # can't write to a remote url
         # without making use of fsspec at the moment
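
For context on the hunk above, a sketch of the resolution order it implements (the URI is hypothetical): pyarrow.fs.FileSystem.from_uri is tried first whenever no storage_options are given, and fsspec remains the fallback for schemes pyarrow does not recognize.

import pyarrow.fs as pa_fs

# from_uri returns a (filesystem, path) tuple for schemes pyarrow supports,
# which is why _get_path_or_handle unpacks two values from it.
fs, path = pa_fs.FileSystem.from_uri("s3://my-bucket/data.parquet")
# fs   -> an S3FileSystem instance
# path -> "my-bucket/data.parquet"

# Unsupported schemes raise pyarrow's ArrowInvalid, which the code above
# catches before falling back to fsspec.core.url_to_fs.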
@@ -173,6 +200,7 @@ def write(
         index: bool | None = None,
         storage_options: StorageOptions = None,
         partition_cols: list[str] | None = None,
+        filesystem=None,
         **kwargs,
     ) -> None:
         self.validate_dataframe(df)
@@ -183,9 +211,9 @@ def write(
 
         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
 
-        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
+        path_or_handle, handles, filesystem = _get_path_or_handle(
             path,
-            kwargs.pop("filesystem", None),
+            filesystem,
             storage_options=storage_options,
             mode="wb",
             is_dir=partition_cols is not None,
@@ -207,12 +235,17 @@ def write(
                     path_or_handle,
                     compression=compression,
                     partition_cols=partition_cols,
+                    filesystem=filesystem,
                     **kwargs,
                 )
             else:
                 # write to single output file
                 self.api.parquet.write_table(
-                    table, path_or_handle, compression=compression, **kwargs
+                    table,
+                    path_or_handle,
+                    compression=compression,
+                    filesystem=filesystem,
+                    **kwargs,
                 )
         finally:
             if handles is not None:
@@ -225,6 +258,7 @@ def read(
         use_nullable_dtypes: bool = False,
         dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
         storage_options: StorageOptions = None,
+        filesystem=None,
         **kwargs,
     ) -> DataFrame:
         kwargs["use_pandas_metadata"] = True
@@ -242,15 +276,15 @@ def read(
         if manager == "array":
             to_pandas_kwargs["split_blocks"] = True  # type: ignore[assignment]
 
-        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
+        path_or_handle, handles, filesystem = _get_path_or_handle(
             path,
-            kwargs.pop("filesystem", None),
+            filesystem,
             storage_options=storage_options,
             mode="rb",
         )
         try:
             pa_table = self.api.parquet.read_table(
-                path_or_handle, columns=columns, **kwargs
+                path_or_handle, columns=columns, filesystem=filesystem, **kwargs
             )
             result = pa_table.to_pandas(**to_pandas_kwargs)
 
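Since the resolved filesystem is now forwarded directly to pyarrow, the read path is roughly equivalent to the following sketch (the path and column names are hypothetical):

import pyarrow.fs as pa_fs
import pyarrow.parquet as pq

# What pandas delegates to under engine="pyarrow": read_table accepts the
# filesystem keyword directly, together with use_pandas_metadata.
table = pq.read_table(
    "/tmp/data.parquet",
    columns=["A", "B"],
    filesystem=pa_fs.LocalFileSystem(),
    use_pandas_metadata=True,
)
df = table.to_pandas()
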
@@ -279,6 +313,7 @@ def write(
         index=None,
         partition_cols=None,
         storage_options: StorageOptions = None,
+        filesystem=None,
         **kwargs,
     ) -> None:
         self.validate_dataframe(df)
@@ -294,6 +329,11 @@ def write(
         if partition_cols is not None:
             kwargs["file_scheme"] = "hive"
 
+        if filesystem is not None:
+            raise NotImplementedError(
+                "filesystem is not implemented for the fastparquet engine."
+            )
+
         # cannot use get_handle as write() does not accept file buffers
         path = stringify_path(path)
         if is_fsspec_url(path):
@@ -319,7 +359,12 @@ def write(
             )
 
     def read(
-        self, path, columns=None, storage_options: StorageOptions = None, **kwargs
+        self,
+        path,
+        columns=None,
+        storage_options: StorageOptions = None,
+        filesystem=None,
+        **kwargs,
     ) -> DataFrame:
         parquet_kwargs: dict[str, Any] = {}
         use_nullable_dtypes = kwargs.pop("use_nullable_dtypes", False)
@@ -337,6 +382,10 @@ def read(
                 "The 'dtype_backend' argument is not supported for the "
                 "fastparquet engine"
             )
+        if filesystem is not None:
+            raise NotImplementedError(
+                "filesystem is not implemented for the fastparquet engine."
+            )
         path = stringify_path(path)
         handles = None
         if is_fsspec_url(path):
@@ -376,6 +425,7 @@ def to_parquet(
     index: bool | None = None,
     storage_options: StorageOptions = None,
     partition_cols: list[str] | None = None,
+    filesystem: Any = None,
    **kwargs,
 ) -> bytes | None:
     """
@@ -398,6 +448,12 @@ def to_parquet(
         ``io.parquet.engine`` is used. The default ``io.parquet.engine``
         behavior is to try 'pyarrow', falling back to 'fastparquet' if
         'pyarrow' is unavailable.
+
+        When using the ``'pyarrow'`` engine and no storage options are provided
+        and a filesystem is implemented by both ``pyarrow.fs`` and ``fsspec``
+        (e.g. "s3://"), then the ``pyarrow.fs`` filesystem is attempted first.
+        Use the filesystem keyword with an instantiated fsspec filesystem
+        if you wish to use its implementation.
     compression : {{'snappy', 'gzip', 'brotli', 'lz4', 'zstd', None}},
         default 'snappy'. Name of the compression to use. Use ``None``
         for no compression. The supported compression methods actually
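
A sketch of the override described in the new docstring text: passing an instantiated fsspec filesystem makes pandas use its implementation instead of pyarrow.fs (the bucket and the anon option are hypothetical, and s3fs must be installed):

import fsspec

fs = fsspec.filesystem("s3", anon=True)
df.to_parquet("my-bucket/data.parquet", engine="pyarrow", filesystem=fs)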
@@ -420,6 +476,12 @@ def to_parquet(
 
         .. versionadded:: 1.2.0
 
+    filesystem : fsspec or pyarrow filesystem, default None
+        Filesystem object to use when reading the parquet file. Only implemented
+        for ``engine="pyarrow"``.
+
+        .. versionadded:: 2.1.0
+
     kwargs
         Additional keyword arguments passed to the engine
@@ -440,6 +502,7 @@ def to_parquet(
         index=index,
         partition_cols=partition_cols,
         storage_options=storage_options,
+        filesystem=filesystem,
         **kwargs,
     )

@@ -458,6 +521,7 @@ def read_parquet(
     storage_options: StorageOptions = None,
     use_nullable_dtypes: bool | lib.NoDefault = lib.no_default,
     dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
+    filesystem: Any = None,
     **kwargs,
 ) -> DataFrame:
     """
"""
@@ -480,6 +544,12 @@ def read_parquet(
480544
``io.parquet.engine`` is used. The default ``io.parquet.engine``
481545
behavior is to try 'pyarrow', falling back to 'fastparquet' if
482546
'pyarrow' is unavailable.
547+
548+
When using the ``'pyarrow'`` engine and no storage options are provided
549+
and a filesystem is implemented by both ``pyarrow.fs`` and ``fsspec``
550+
(e.g. "s3://"), then the ``pyarrow.fs`` filesystem is attempted first.
551+
Use the filesystem keyword with an instantiated fsspec filesystem
552+
if you wish to use its implementation.
483553
columns : list, default=None
484554
If not None, only these columns will be read from the file.
485555
@@ -508,6 +578,12 @@ def read_parquet(
 
         .. versionadded:: 2.0
 
+    filesystem : fsspec or pyarrow filesystem, default None
+        Filesystem object to use when reading the parquet file. Only implemented
+        for ``engine="pyarrow"``.
+
+        .. versionadded:: 2.1.0
+
     **kwargs
         Any additional kwargs are passed to the engine.
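
Combining the two keywords is restricted by the validation added in _get_path_or_handle; a sketch (paths hypothetical):

import pandas as pd
import pyarrow.fs as pa_fs

# Works: an explicit pyarrow filesystem, no storage_options.
df = pd.read_parquet(
    "/tmp/data.parquet", engine="pyarrow", filesystem=pa_fs.LocalFileSystem()
)

# Raises NotImplementedError: "storage_options not supported with a
# pyarrow FileSystem."
df = pd.read_parquet(
    "/tmp/data.parquet",
    engine="pyarrow",
    filesystem=pa_fs.LocalFileSystem(),
    storage_options={"token": "anon"},
)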
@@ -537,5 +613,6 @@ def read_parquet(
         storage_options=storage_options,
         use_nullable_dtypes=use_nullable_dtypes,
         dtype_backend=dtype_backend,
+        filesystem=filesystem,
         **kwargs,
     )

pandas/tests/io/test_gcs.py (+18 -4)
@@ -1,5 +1,6 @@
 from io import BytesIO
 import os
+import pathlib
 import tarfile
 import zipfile
 
@@ -20,7 +21,7 @@
 
 
 @pytest.fixture
-def gcs_buffer(monkeypatch):
+def gcs_buffer():
     """Emulate GCS using a binary buffer."""
     import fsspec
 
@@ -45,7 +46,7 @@ def ls(self, path, **kwargs):
 
 @td.skip_if_no("gcsfs")
 @pytest.mark.parametrize("format", ["csv", "json", "parquet", "excel", "markdown"])
-def test_to_read_gcs(gcs_buffer, format):
+def test_to_read_gcs(gcs_buffer, format, monkeypatch, capsys):
     """
     Test that many to/read functions support GCS.
 
@@ -75,8 +76,21 @@ def test_to_read_gcs(gcs_buffer, format):
         df2 = read_json(path, convert_dates=["dt"])
     elif format == "parquet":
         pytest.importorskip("pyarrow")
-        df1.to_parquet(path)
-        df2 = read_parquet(path)
+        pa_fs = pytest.importorskip("pyarrow.fs")
+
+        class MockFileSystem(pa_fs.FileSystem):
+            @staticmethod
+            def from_uri(path):
+                print("Using pyarrow filesystem")
+                to_local = pathlib.Path(path.replace("gs://", "")).absolute().as_uri()
+                return pa_fs.LocalFileSystem(to_local)
+
+        with monkeypatch.context() as m:
+            m.setattr(pa_fs, "FileSystem", MockFileSystem)
+            df1.to_parquet(path)
+            df2 = read_parquet(path)
+        captured = capsys.readouterr()
+        assert captured.out == "Using pyarrow filesystem\nUsing pyarrow filesystem\n"
     elif format == "markdown":
         pytest.importorskip("tabulate")
         df1.to_markdown(path)

pandas/tests/io/test_parquet.py (+60)
@@ -1211,6 +1211,66 @@ def test_bytes_file_name(self, engine):
             result = read_parquet(path, engine=engine)
         tm.assert_frame_equal(result, df)
 
+    def test_filesystem_notimplemented(self):
+        pytest.importorskip("fastparquet")
+        df = pd.DataFrame(data={"A": [0, 1], "B": [1, 0]})
+        with tm.ensure_clean() as path:
+            with pytest.raises(
+                NotImplementedError, match="filesystem is not implemented"
+            ):
+                df.to_parquet(path, engine="fastparquet", filesystem="foo")
+
+        with tm.ensure_clean() as path:
+            pathlib.Path(path).write_bytes(b"foo")
+            with pytest.raises(
+                NotImplementedError, match="filesystem is not implemented"
+            ):
+                read_parquet(path, engine="fastparquet", filesystem="foo")
+
+    def test_invalid_filesystem(self):
+        pytest.importorskip("pyarrow")
+        df = pd.DataFrame(data={"A": [0, 1], "B": [1, 0]})
+        with tm.ensure_clean() as path:
+            with pytest.raises(
+                ValueError, match="filesystem must be a pyarrow or fsspec FileSystem"
+            ):
+                df.to_parquet(path, engine="pyarrow", filesystem="foo")
+
+        with tm.ensure_clean() as path:
+            pathlib.Path(path).write_bytes(b"foo")
+            with pytest.raises(
+                ValueError, match="filesystem must be a pyarrow or fsspec FileSystem"
+            ):
+                read_parquet(path, engine="pyarrow", filesystem="foo")
+
+    def test_unsupported_pa_filesystem_storage_options(self):
+        pa_fs = pytest.importorskip("pyarrow.fs")
+        df = pd.DataFrame(data={"A": [0, 1], "B": [1, 0]})
+        with tm.ensure_clean() as path:
+            with pytest.raises(
+                NotImplementedError,
+                match="storage_options not supported with a pyarrow FileSystem.",
+            ):
+                df.to_parquet(
+                    path,
+                    engine="pyarrow",
+                    filesystem=pa_fs.LocalFileSystem(),
+                    storage_options={"foo": "bar"},
+                )
+
+        with tm.ensure_clean() as path:
+            pathlib.Path(path).write_bytes(b"foo")
+            with pytest.raises(
+                NotImplementedError,
+                match="storage_options not supported with a pyarrow FileSystem.",
+            ):
+                read_parquet(
+                    path,
+                    engine="pyarrow",
+                    filesystem=pa_fs.LocalFileSystem(),
+                    storage_options={"foo": "bar"},
+                )
+
     def test_invalid_dtype_backend(self, engine):
         msg = (
             "dtype_backend numpy is invalid, only 'numpy_nullable' and "
