From 16b8a058bc1d820705cc950025942c3532a2ac87 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Mon, 8 Mar 2021 11:28:30 +0100 Subject: [PATCH 1/4] [ArrayManager] Enable read_parquet to not create 2D blocks when using ArrayManager --- pandas/core/frame.py | 7 +++++-- pandas/core/internals/construction.py | 10 +++++++--- pandas/io/parquet.py | 8 +++++++- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 38cd730efabd1..8e2a5a1d89a5a 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -668,13 +668,16 @@ def __init__( NDFrame.__init__(self, mgr) - def _as_manager(self, typ: str) -> DataFrame: + def _as_manager(self, typ: str, copy: bool = True) -> DataFrame: """ Private helper function to create a DataFrame with specific manager. Parameters ---------- typ : {"block", "array"} + copy : bool, default True + Only controls whether the conversion from Block->ArrayManager + copies the 1D arrays (to ensure proper/contiguous memory layout). Returns ------- @@ -683,7 +686,7 @@ def _as_manager(self, typ: str) -> DataFrame: to be a copy or not. """ new_mgr: Manager - new_mgr = mgr_to_mgr(self._mgr, typ=typ) + new_mgr = mgr_to_mgr(self._mgr, typ=typ, copy=copy) # fastpath of passing a manager doesn't check the option/manager class return DataFrame(new_mgr) diff --git a/pandas/core/internals/construction.py b/pandas/core/internals/construction.py index c314673f609f3..837f217b068a4 100644 --- a/pandas/core/internals/construction.py +++ b/pandas/core/internals/construction.py @@ -191,10 +191,11 @@ def fill_masked_arrays(data: MaskedRecords, arr_columns: Index) -> List[np.ndarr return new_arrays -def mgr_to_mgr(mgr, typ: str): +def mgr_to_mgr(mgr, typ: str, copy: bool = True): """ Convert to specific type of Manager. Does not copy if the type is already - correct. Does not guarantee a copy otherwise. + correct. Does not guarantee a copy otherwise. `copy` keyword only controls + whether conversion from Block->ArrayManager copies the 1D arrays. """ new_mgr: Manager @@ -209,7 +210,10 @@ def mgr_to_mgr(mgr, typ: str): if isinstance(mgr, ArrayManager): new_mgr = mgr else: - arrays = [mgr.iget_values(i).copy() for i in range(len(mgr.axes[0]))] + if copy: + arrays = [mgr.iget_values(i).copy() for i in range(len(mgr.axes[0]))] + else: + arrays = [mgr.iget_values(i) for i in range(len(mgr.axes[0]))] new_mgr = ArrayManager(arrays, [mgr.axes[1], mgr.axes[0]]) else: raise ValueError(f"'typ' needs to be one of {{'block', 'array'}}, got '{typ}'") diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index 183d753ddd60b..d2701207d4b42 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -235,6 +235,9 @@ def read( "'use_nullable_dtypes=True' is only supported for pyarrow >= 0.16 " f"({self.api.__version__} is installed" ) + manager = get_option("mode.data_manager") + if manager == "array": + to_pandas_kwargs["split_blocks"] = True path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle( path, @@ -243,9 +246,12 @@ def read( mode="rb", ) try: - return self.api.parquet.read_table( + result = self.api.parquet.read_table( path_or_handle, columns=columns, **kwargs ).to_pandas(**to_pandas_kwargs) + if manager == "array": + result = result._as_manager("array", copy=False) + return result finally: if handles is not None: handles.close() From 0fe4b1eccb7874591453083f9bbca109b906a613 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Thu, 1 Apr 2021 10:09:40 +0200 Subject: [PATCH 2/4] add test --- pandas/tests/io/test_parquet.py | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index e74c915bbaf74..dffafbc73ec44 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -9,6 +9,8 @@ import numpy as np import pytest +from pandas._config import get_option + from pandas.compat import ( PY38, is_platform_windows, @@ -41,12 +43,12 @@ _HAVE_FASTPARQUET = False -pytestmark = [ - pytest.mark.filterwarnings("ignore:RangeIndex.* is deprecated:DeprecationWarning"), - # TODO(ArrayManager) fastparquet / pyarrow rely on BlockManager internals - td.skip_array_manager_not_yet_implemented, -] +pytestmark = pytest.mark.filterwarnings( + "ignore:RangeIndex.* is deprecated:DeprecationWarning" +) + +# TODO(ArrayManager) fastparquet relies on BlockManager internals # setup engines & skips @pytest.fixture( @@ -54,7 +56,8 @@ pytest.param( "fastparquet", marks=pytest.mark.skipif( - not _HAVE_FASTPARQUET, reason="fastparquet is not installed" + not _HAVE_FASTPARQUET or get_option("mode.data_manager") == "array", + reason="fastparquet is not installed or ArrayManager is used", ), ), pytest.param( @@ -80,6 +83,8 @@ def pa(): def fp(): if not _HAVE_FASTPARQUET: pytest.skip("fastparquet is not installed") + elif get_option("mode.data_manager") == "array": + pytest.skip("ArrayManager is not supported with fastparquet") return "fastparquet" @@ -915,6 +920,18 @@ def test_filter_row_groups(self, pa): ) assert len(result) == 1 + def test_read_parquet_manager(self, pa, using_array_manager): + # ensure that read_parquet honors the pandas.options.mode.data_manager option + df = pd.DataFrame(np.random.randn(10, 3), columns=["A", "B", "C"]) + + with tm.ensure_clean() as path: + df.to_parquet(path, pa) + result = read_parquet(path, pa) + if using_array_manager: + assert isinstance(result._mgr, pd.core.internals.ArrayManager) + else: + assert isinstance(result._mgr, pd.core.internals.BlockManager) + class TestParquetFastParquet(Base): def test_basic(self, fp, df_full): From 6f085f2ce5cff5e0fe2ed765b3729ba81de1c07d Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 20 Apr 2021 10:44:11 +0200 Subject: [PATCH 3/4] bool -> bool_t --- pandas/core/generic.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pandas/core/generic.py b/pandas/core/generic.py index 1cb08dd12c4b2..94ad83b2b63b3 100644 --- a/pandas/core/generic.py +++ b/pandas/core/generic.py @@ -286,7 +286,9 @@ def _from_mgr(cls, mgr: Manager): object.__setattr__(obj, "_attrs", {}) return obj - def _as_manager(self: FrameOrSeries, typ: str, copy: bool = True) -> FrameOrSeries: + def _as_manager( + self: FrameOrSeries, typ: str, copy: bool_t = True + ) -> FrameOrSeries: """ Private helper function to create a DataFrame with specific manager. From ecf1163f7b4a9b8bb94ca732ef5249fa1d178261 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 20 Apr 2021 14:26:47 +0200 Subject: [PATCH 4/4] type fix --- pandas/io/parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index f3a7e6c665119..3801a29fec39e 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -233,7 +233,7 @@ def read( ) manager = get_option("mode.data_manager") if manager == "array": - to_pandas_kwargs["split_blocks"] = True + to_pandas_kwargs["split_blocks"] = True # type: ignore[assignment] path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle( path,