From 19db1c31604f1cd1c1f86e1dbf5300f9ec34089b Mon Sep 17 00:00:00 2001
From: Brock
Date: Tue, 30 Mar 2021 10:57:34 -0700
Subject: [PATCH 1/3] REF: move union_categoricals call outside of cython

---
 pandas/_libs/parsers.pyx                  | 61 ++---------------
 pandas/io/parsers/c_parser_wrapper.py     | 83 ++++++++++++++++++++++-
 pandas/tests/io/parser/test_textreader.py |  5 ++
 3 files changed, 94 insertions(+), 55 deletions(-)

diff --git a/pandas/_libs/parsers.pyx b/pandas/_libs/parsers.pyx
index 153ac4b5f0893..8e7c927b7e2d5 100644
--- a/pandas/_libs/parsers.pyx
+++ b/pandas/_libs/parsers.pyx
@@ -94,7 +94,6 @@ from pandas._libs.khash cimport (
 )
 
 from pandas.errors import (
-    DtypeWarning,
     EmptyDataError,
     ParserError,
     ParserWarning,
@@ -108,9 +107,7 @@ from pandas.core.dtypes.common import (
     is_float_dtype,
     is_integer_dtype,
     is_object_dtype,
-    pandas_dtype,
 )
-from pandas.core.dtypes.concat import union_categoricals
 
 cdef:
     float64_t INF = np.inf
@@ -492,12 +489,10 @@ cdef class TextReader:
             raise ValueError(f'Unrecognized float_precision option: '
                              f'{float_precision}')
 
-        if isinstance(dtype, dict):
-            dtype = {k: pandas_dtype(dtype[k])
-                     for k in dtype}
-        elif dtype is not None:
-            dtype = pandas_dtype(dtype)
-
+        # Caller is responsible for ensuring we have one of
+        #  - None
+        #  - DtypeObj
+        #  - dict[Any, DtypeObj]
         self.dtype = dtype
 
         # XXX
@@ -775,6 +770,8 @@ cdef class TextReader:
         """
         if self.low_memory:
             # Conserve intermediate space
+            # Caller is responsible for concatenating chunks,
+            #  see c_parser_wrapper._concatenate_chunks
             columns = self._read_low_memory(rows)
         else:
             # Don't care about memory usage
@@ -818,8 +815,7 @@ cdef class TextReader:
         if len(chunks) == 0:
             raise StopIteration
 
-        # destructive to chunks
-        return _concatenate_chunks(chunks)
+        return chunks
 
     cdef _tokenize_rows(self, size_t nrows):
         cdef:
@@ -1907,49 +1903,6 @@ cdef raise_parser_error(object base, parser_t *parser):
     raise ParserError(message)
 
 
-# chunks: list[dict[int, "ArrayLike"]]
-# -> dict[int, "ArrayLike"]
-def _concatenate_chunks(list chunks) -> dict:
-    cdef:
-        list names = list(chunks[0].keys())
-        object name
-        list warning_columns = []
-        object warning_names
-        object common_type
-
-    result = {}
-    for name in names:
-        arrs = [chunk.pop(name) for chunk in chunks]
-        # Check each arr for consistent types.
-        dtypes = {a.dtype for a in arrs}
-        numpy_dtypes = {x for x in dtypes if not is_categorical_dtype(x)}
-        if len(numpy_dtypes) > 1:
-            common_type = np.find_common_type(numpy_dtypes, [])
-            if common_type == object:
-                warning_columns.append(str(name))
-
-        dtype = dtypes.pop()
-        if is_categorical_dtype(dtype):
-            sort_categories = isinstance(dtype, str)
-            result[name] = union_categoricals(arrs,
-                                              sort_categories=sort_categories)
-        else:
-            if is_extension_array_dtype(dtype):
-                array_type = dtype.construct_array_type()
-                result[name] = array_type._concat_same_type(arrs)
-            else:
-                result[name] = np.concatenate(arrs)
-
-    if warning_columns:
-        warning_names = ','.join(warning_columns)
-        warning_message = " ".join([
-            f"Columns ({warning_names}) have mixed types."
-            f"Specify dtype option on import or set low_memory=False."
-        ])
-        warnings.warn(warning_message, DtypeWarning, stacklevel=8)
-    return result
-
-
 # ----------------------------------------------------------------------
 # NA values
 def _compute_na_values():
diff --git a/pandas/io/parsers/c_parser_wrapper.py b/pandas/io/parsers/c_parser_wrapper.py
index 8305ff64c42c6..be4a503898c03 100644
--- a/pandas/io/parsers/c_parser_wrapper.py
+++ b/pandas/io/parsers/c_parser_wrapper.py
@@ -1,5 +1,17 @@
+import warnings
+
+import numpy as np
+
 import pandas._libs.parsers as parsers
 from pandas._typing import FilePathOrBuffer
+from pandas.errors import DtypeWarning
+
+from pandas.core.dtypes.common import (
+    is_categorical_dtype,
+    is_extension_array_dtype,
+    pandas_dtype,
+)
+from pandas.core.dtypes.concat import union_categoricals
 
 from pandas.core.indexes.api import ensure_index_from_sequences
 
@@ -47,6 +59,7 @@ def __init__(self, src: FilePathOrBuffer, **kwds):
             # TextIOBase, TextIOWrapper, mmap]" has no attribute "mmap"
             self.handles.handle = self.handles.handle.mmap  # type: ignore[union-attr]
 
+        kwds["dtype"] = ensure_dtype_objs(kwds.get("dtype", None))
         try:
             self._reader = parsers.TextReader(self.handles.handle, **kwds)
         except Exception:
@@ -183,6 +196,10 @@ def read(self, nrows=None):
             else:
                 self.close()
                 raise
+        else:
+            if self._reader.low_memory:
+                # destructive to data
+                data = _concatenate_chunks(data)
 
         # Done with first read, next time raise StopIteration
         self._first_chunk = False
@@ -265,7 +282,71 @@ def _get_index_names(self):
 
         return names, idx_names
 
-    def _maybe_parse_dates(self, values, index, try_parse_dates=True):
+    def _maybe_parse_dates(self, values, index: int, try_parse_dates=True):
         if try_parse_dates and self._should_parse_dates(index):
             values = self._date_conv(values)
         return values
+
+
+def _concatenate_chunks(chunks: list[dict]) -> dict:
+    """
+    Concatenate chunks of data read with low_memory=True.
+
+    The tricky part is handling Categoricals, where different chunks
+    may have different inferred categories.
+    """
+    names = list(chunks[0].keys())
+    warning_columns = []
+
+    result = {}
+    for name in names:
+        arrs = [chunk.pop(name) for chunk in chunks]
+        # Check each arr for consistent types.
+        dtypes = {a.dtype for a in arrs}
+        # TODO: shouldn't we exclude all EA dtypes here?
+        numpy_dtypes = {x for x in dtypes if not is_categorical_dtype(x)}
+        if len(numpy_dtypes) > 1:
+            # error: Argument 1 to "find_common_type" has incompatible type
+            #  "Set[Any]"; expected "Sequence[Union[dtype[Any], None, type,
+            #  _SupportsDType, str, Union[Tuple[Any, int], Tuple[Any,
+            #  Union[int, Sequence[int]]], List[Any], _DTypeDict, Tuple[Any, Any]]]]"
+            common_type = np.find_common_type(
+                numpy_dtypes,  # type: ignore[arg-type]
+                [],
+            )
+            if common_type == object:
+                warning_columns.append(str(name))
+
+        dtype = dtypes.pop()
+        if is_categorical_dtype(dtype):
+            result[name] = union_categoricals(arrs, sort_categories=False)
+        else:
+            if is_extension_array_dtype(dtype):
+                # TODO: concat_compat?
+                array_type = dtype.construct_array_type()
+                result[name] = array_type._concat_same_type(arrs)
+            else:
+                result[name] = np.concatenate(arrs)
+
+    if warning_columns:
+        warning_names = ",".join(warning_columns)
+        warning_message = " ".join(
+            [
+                f"Columns ({warning_names}) have mixed types. "
+                f"Specify dtype option on import or set low_memory=False."
+            ]
+        )
+        warnings.warn(warning_message, DtypeWarning, stacklevel=8)
+    return result
+
+
+def ensure_dtype_objs(dtype):
+    """
+    Ensure we have either None, a dtype object, or a dictionary mapping to
+    dtype objects.
+ """ + if isinstance(dtype, dict): + dtype = {k: pandas_dtype(dtype[k]) for k in dtype} + elif dtype is not None: + dtype = pandas_dtype(dtype) + return dtype diff --git a/pandas/tests/io/parser/test_textreader.py b/pandas/tests/io/parser/test_textreader.py index 104cf56419bfd..7f84c5e378d16 100644 --- a/pandas/tests/io/parser/test_textreader.py +++ b/pandas/tests/io/parser/test_textreader.py @@ -21,6 +21,7 @@ TextFileReader, read_csv, ) +from pandas.io.parsers.c_parser_wrapper import ensure_dtype_objs class TestTextReader: @@ -206,6 +207,8 @@ def test_numpy_string_dtype(self): aaaaa,5""" def _make_reader(**kwds): + if "dtype" in kwds: + kwds["dtype"] = ensure_dtype_objs(kwds["dtype"]) return TextReader(StringIO(data), delimiter=",", header=None, **kwds) reader = _make_reader(dtype="S5,i4") @@ -233,6 +236,8 @@ def test_pass_dtype(self): 4,d""" def _make_reader(**kwds): + if "dtype" in kwds: + kwds["dtype"] = ensure_dtype_objs(kwds["dtype"]) return TextReader(StringIO(data), delimiter=",", **kwds) reader = _make_reader(dtype={"one": "u1", 1: "S1"}) From 21a3c18331df72e650b41fdb63ffc1584e1058c5 Mon Sep 17 00:00:00 2001 From: Brock Date: Wed, 14 Apr 2021 22:12:05 -0700 Subject: [PATCH 2/3] future import --- pandas/io/parsers/c_parser_wrapper.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pandas/io/parsers/c_parser_wrapper.py b/pandas/io/parsers/c_parser_wrapper.py index be4a503898c03..b88ee3d3f479e 100644 --- a/pandas/io/parsers/c_parser_wrapper.py +++ b/pandas/io/parsers/c_parser_wrapper.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import warnings import numpy as np From 04ea30f78a037188adbdd17cc1404b2cbdd32f66 Mon Sep 17 00:00:00 2001 From: Brock Date: Thu, 15 Apr 2021 10:03:41 -0700 Subject: [PATCH 3/3] mypy fixup --- pandas/_libs/parsers.pyi | 2 +- pandas/_libs/parsers.pyx | 23 ++++++++---------- pandas/io/parsers/c_parser_wrapper.py | 34 +++++++++++++++++++-------- 3 files changed, 35 insertions(+), 24 deletions(-) diff --git a/pandas/_libs/parsers.pyi b/pandas/_libs/parsers.pyi index 1051c319b769b..18ae23e7fb90d 100644 --- a/pandas/_libs/parsers.pyi +++ b/pandas/_libs/parsers.pyi @@ -58,7 +58,6 @@ class TextReader: true_values=..., false_values=..., allow_leading_cols: bool = ..., - low_memory: bool = ..., skiprows=..., skipfooter: int = ..., # int64_t verbose: bool = ..., @@ -75,3 +74,4 @@ class TextReader: def close(self) -> None: ... def read(self, rows: int | None = ...) -> dict[int, ArrayLike]: ... + def read_low_memory(self, rows: int | None) -> list[dict[int, ArrayLike]]: ... 
diff --git a/pandas/_libs/parsers.pyx b/pandas/_libs/parsers.pyx
index 8e7c927b7e2d5..fb7152f3d9e24 100644
--- a/pandas/_libs/parsers.pyx
+++ b/pandas/_libs/parsers.pyx
@@ -314,7 +314,7 @@ cdef class TextReader:
 
     cdef public:
         int64_t leading_cols, table_width, skipfooter, buffer_lines
-        bint allow_leading_cols, mangle_dupe_cols, low_memory
+        bint allow_leading_cols, mangle_dupe_cols
         bint delim_whitespace
         object delimiter  # bytes or str
         object converters
@@ -359,7 +359,6 @@ cdef class TextReader:
                   true_values=None,
                   false_values=None,
                   bint allow_leading_cols=True,
-                  bint low_memory=False,
                   skiprows=None,
                   skipfooter=0,  # int64_t
                   bint verbose=False,
@@ -476,7 +475,6 @@ cdef class TextReader:
 
         self.na_filter = na_filter
         self.verbose = verbose
-        self.low_memory = low_memory
 
         if float_precision == "round_trip":
             # see gh-15140
@@ -768,19 +766,18 @@ cdef class TextReader:
         """
         rows=None --> read all rows
         """
-        if self.low_memory:
-            # Conserve intermediate space
-            # Caller is responsible for concatenating chunks,
-            #  see c_parser_wrapper._concatenate_chunks
-            columns = self._read_low_memory(rows)
-        else:
-            # Don't care about memory usage
-            columns = self._read_rows(rows, 1)
+        # Don't care about memory usage
+        columns = self._read_rows(rows, 1)
 
         return columns
 
-    # -> dict[int, "ArrayLike"]
-    cdef _read_low_memory(self, rows):
+    def read_low_memory(self, rows: int | None) -> list[dict[int, "ArrayLike"]]:
+        """
+        rows=None --> read all rows
+        """
+        # Conserve intermediate space
+        # Caller is responsible for concatenating chunks,
+        #  see c_parser_wrapper._concatenate_chunks
         cdef:
             size_t rows_read = 0
             list chunks = []
diff --git a/pandas/io/parsers/c_parser_wrapper.py b/pandas/io/parsers/c_parser_wrapper.py
index b88ee3d3f479e..4d3cbc41487e1 100644
--- a/pandas/io/parsers/c_parser_wrapper.py
+++ b/pandas/io/parsers/c_parser_wrapper.py
@@ -5,15 +5,18 @@
 import numpy as np
 
 import pandas._libs.parsers as parsers
-from pandas._typing import FilePathOrBuffer
+from pandas._typing import (
+    ArrayLike,
+    FilePathOrBuffer,
+)
 from pandas.errors import DtypeWarning
 
 from pandas.core.dtypes.common import (
     is_categorical_dtype,
-    is_extension_array_dtype,
     pandas_dtype,
 )
 from pandas.core.dtypes.concat import union_categoricals
+from pandas.core.dtypes.dtypes import ExtensionDtype
 
 from pandas.core.indexes.api import ensure_index_from_sequences
 
@@ -24,12 +27,16 @@
 
 
 class CParserWrapper(ParserBase):
+    low_memory: bool
+
     def __init__(self, src: FilePathOrBuffer, **kwds):
         self.kwds = kwds
         kwds = kwds.copy()
 
         ParserBase.__init__(self, kwds)
 
+        self.low_memory = kwds.pop("low_memory", False)
+
         # #2442
         kwds["allow_leading_cols"] = self.index_col is not False
 
@@ -175,7 +182,13 @@ def set_error_bad_lines(self, status):
 
     def read(self, nrows=None):
         try:
-            data = self._reader.read(nrows)
+            if self.low_memory:
+                chunks = self._reader.read_low_memory(nrows)
+                # destructive to chunks
+                data = _concatenate_chunks(chunks)
+
+            else:
+                data = self._reader.read(nrows)
         except StopIteration:
             if self._first_chunk:
                 self._first_chunk = False
@@ -198,10 +211,6 @@ def read(self, nrows=None):
             else:
                 self.close()
                 raise
-        else:
-            if self._reader.low_memory:
-                # destructive to data
-                data = _concatenate_chunks(data)
 
         # Done with first read, next time raise StopIteration
         self._first_chunk = False
@@ -290,7 +299,7 @@ def _maybe_parse_dates(self, values, index: int, try_parse_dates=True):
         return values
 
 
-def _concatenate_chunks(chunks: list[dict]) -> dict:
+def _concatenate_chunks(chunks: list[dict[int, ArrayLike]]) -> dict:
     """
     Concatenate chunks of data read with low_memory=True.
@@ -323,10 +332,15 @@ def _concatenate_chunks(chunks: list[dict]) -> dict:
         if is_categorical_dtype(dtype):
             result[name] = union_categoricals(arrs, sort_categories=False)
         else:
-            if is_extension_array_dtype(dtype):
+            if isinstance(dtype, ExtensionDtype):
                 # TODO: concat_compat?
                 array_type = dtype.construct_array_type()
-                result[name] = array_type._concat_same_type(arrs)
+                # error: Argument 1 to "_concat_same_type" of "ExtensionArray"
+                #  has incompatible type "List[Union[ExtensionArray, ndarray]]";
+                #  expected "Sequence[ExtensionArray]"
+                result[name] = array_type._concat_same_type(
+                    arrs  # type: ignore[arg-type]
+                )
             else:
                 result[name] = np.concatenate(arrs)
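
A minimal sketch (illustrative only, not part of the patch) of why `_concatenate_chunks` has to route categorical columns through `union_categoricals` rather than plain concatenation: each low_memory chunk infers its own categories, so the chunks' underlying integer codes are not aligned. Only the public `pandas.api.types` entry point is used here.

    # Two chunks of the same CSV column, read separately: the inferred
    # categories differ, so their codes cannot simply be concatenated.
    import pandas as pd
    from pandas.api.types import union_categoricals

    chunk1 = pd.Categorical(["a", "b"])  # categories: ['a', 'b']
    chunk2 = pd.Categorical(["b", "c"])  # categories: ['b', 'c']

    combined = union_categoricals([chunk1, chunk2], sort_categories=False)
    print(list(combined.categories))  # ['a', 'b', 'c']
    print(list(combined))             # ['a', 'b', 'b', 'c']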
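
Likewise, a short sketch of what `ensure_dtype_objs` normalizes before `TextReader` is constructed: it applies the public `pandas_dtype` converter to a scalar spec, or to each value of a dict spec, mirroring the dtype arguments used in the tests above.

    from pandas.api.types import pandas_dtype

    # Scalar spec -> a single dtype object
    print(pandas_dtype("u1"))  # uint8

    # Dict spec (as in test_pass_dtype) -> dict of dtype objects
    spec = {"one": "u1", 1: "S1"}
    print({k: pandas_dtype(v) for k, v in spec.items()})
    # {'one': dtype('uint8'), 1: dtype('S1')}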
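
Finally, a sketch of the per-column mixed-dtype check behind the DtypeWarning: when chunks disagree on dtype and NumPy's common type falls back to object, the column is collected for the warning. `np.find_common_type` is the call the patch itself uses; it was current NumPy API in 2021 but has since been deprecated, so treat this as a historical illustration.

    import numpy as np

    # dtypes observed for one column across chunks: ints, then strings
    numpy_dtypes = {np.dtype("int64"), np.dtype(object)}
    common_type = np.find_common_type(list(numpy_dtypes), [])
    print(common_type == object)  # True -> column is reported in the warning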