ENH: context-manager for chunksize/iterator-reader #38225

Merged · 2 commits · Dec 4, 2020
40 changes: 21 additions & 19 deletions doc/source/user_guide/io.rst
@@ -1577,19 +1577,21 @@ value will be an iterable object of type ``TextFileReader``:

.. ipython:: python

reader = pd.read_csv("tmp.sv", sep="|", chunksize=4)
reader
with pd.read_csv("tmp.sv", sep="|", chunksize=4) as reader:
reader
for chunk in reader:
print(chunk)

for chunk in reader:
print(chunk)
.. versionchanged:: 1.2

``read_csv/json/sas`` return a context-manager when iterating through a file.

Specifying ``iterator=True`` will also return the ``TextFileReader`` object:

.. ipython:: python

reader = pd.read_csv("tmp.sv", sep="|", iterator=True)
reader.get_chunk(5)
with pd.read_csv("tmp.sv", sep="|", iterator=True) as reader:
reader.get_chunk(5)

.. ipython:: python
:suppress:
@@ -2238,10 +2240,10 @@ For line-delimited json files, pandas can also return an iterator which reads in
df.to_json(orient="records", lines=True)

# reader is an iterator that returns ``chunksize`` lines each iteration
reader = pd.read_json(StringIO(jsonl), lines=True, chunksize=1)
reader
for chunk in reader:
print(chunk)
with pd.read_json(StringIO(jsonl), lines=True, chunksize=1) as reader:
reader
for chunk in reader:
print(chunk)

.. _io.table_schema:

@@ -5471,19 +5473,19 @@ object can be used as an iterator.

.. ipython:: python

reader = pd.read_stata("stata.dta", chunksize=3)
for df in reader:
print(df.shape)
with pd.read_stata("stata.dta", chunksize=3) as reader:
for df in reader:
print(df.shape)

For more fine-grained control, use ``iterator=True`` and specify
``chunksize`` with each call to
:func:`~pandas.io.stata.StataReader.read`.

.. ipython:: python

reader = pd.read_stata("stata.dta", iterator=True)
chunk1 = reader.read(5)
chunk2 = reader.read(5)
with pd.read_stata("stata.dta", iterator=True) as reader:
chunk1 = reader.read(5)
chunk2 = reader.read(5)

Currently the ``index`` is retrieved as a column.

@@ -5595,9 +5597,9 @@ Obtain an iterator and read an XPORT file 100,000 lines at a time:
pass


rdr = pd.read_sas("sas_xport.xpt", chunk=100000)
for chunk in rdr:
do_something(chunk)
with pd.read_sas("sas_xport.xpt", chunk=100000) as rdr:
for chunk in rdr:
do_something(chunk)

The specification_ for the xport file format is available from the SAS
web site.
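
A small self-contained sketch of what the new ``with`` form in these docs guarantees for the CSV reader; the inline data and variable names below are illustrative, not taken from the docs:

    from io import StringIO
    import pandas as pd

    data = "a|b|c\n1|2|3\n4|5|6\n7|8|9\n10|11|12\n"

    with pd.read_csv(StringIO(data), sep="|", chunksize=2) as reader:
        shapes = [chunk.shape for chunk in reader]

    print(shapes)  # [(2, 3), (2, 3)]
    # on exit the reader's underlying handles are closed, so no explicit reader.close() is needed
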
1 change: 1 addition & 0 deletions doc/source/whatsnew/v1.2.0.rst
@@ -291,6 +291,7 @@ Other enhancements
- Improve numerical stability for :meth:`.Rolling.skew`, :meth:`.Rolling.kurt`, :meth:`Expanding.skew` and :meth:`Expanding.kurt` through implementation of Kahan summation (:issue:`6929`)
- Improved error reporting for subsetting columns of a :class:`.DataFrameGroupBy` with ``axis=1`` (:issue:`37725`)
- Implement method ``cross`` for :meth:`DataFrame.merge` and :meth:`DataFrame.join` (:issue:`5401`)
- When :func:`read_csv/sas/json` are called with ``chunksize``/``iterator`` they can be used in a ``with`` statement as they return context-managers (:issue:`38225`)

.. ---------------------------------------------------------------------------

5 changes: 2 additions & 3 deletions pandas/io/html.py
@@ -794,9 +794,8 @@ def _data_to_frame(**kwargs):

# fill out elements of body that are "ragged"
_expand_elements(body)
tp = TextParser(body, header=header, **kwargs)
df = tp.read()
return df
with TextParser(body, header=header, **kwargs) as tp:
return tp.read()


_valid_parsers = {
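
``TextParser`` is an internal parsing helper that returns a ``TextFileReader``, so the ``with TextParser(...)`` refactor above also makes it usable directly as a context manager. A rough sketch, assuming the helper keeps accepting list-of-lists input as it does today:

    from pandas.io.parsers import TextParser

    # list-of-lists input; row 0 is treated as the header
    rows = [["a", "b"], [1, 2], [3, 4]]

    with TextParser(rows, header=0) as parser:
        df = parser.read()

    print(df)  # two rows with columns "a" and "b"; the parser is closed on block exit
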
13 changes: 12 additions & 1 deletion pandas/io/json/_json.py
@@ -437,6 +437,10 @@ def read_json(
This can only be passed if `lines=True`.
If this is None, the file will be read into memory all at once.

.. versionchanged:: 1.2

``JsonReader`` is a context manager.

compression : {{'infer', 'gzip', 'bz2', 'zip', 'xz', None}}, default 'infer'
For on-the-fly decompression of on-disk data. If 'infer', then use
gzip, bz2, zip or xz if path_or_buf is a string ending in
@@ -555,7 +559,8 @@ def read_json(
if chunksize:
return json_reader

return json_reader.read()
with json_reader:
return json_reader.read()


class JsonReader(abc.Iterator):
@@ -747,6 +752,12 @@ def __next__(self):
self.close()
raise StopIteration

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.close()


class Parser:
_split_keys: Tuple[str, ...]
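
For the JSON side, a minimal sketch of ``JsonReader`` used as a context manager, mirroring the doc example earlier in the diff with an inline line-delimited string (the data is illustrative):

    from io import StringIO
    import pandas as pd

    jsonl = '{"a": 1, "b": 2}\n{"a": 3, "b": 4}\n'

    with pd.read_json(StringIO(jsonl), lines=True, chunksize=1) as reader:
        chunks = list(reader)

    combined = pd.concat(chunks)  # two one-row frames stitched back together
    print(combined)
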
22 changes: 16 additions & 6 deletions pandas/io/parsers.py
@@ -276,11 +276,19 @@
iterator : bool, default False
Return TextFileReader object for iteration or getting chunks with
``get_chunk()``.

.. versionchanged:: 1.2

``TextFileReader`` is a context manager.
chunksize : int, optional
Return TextFileReader object for iteration.
See the `IO Tools docs
<https://pandas.pydata.org/pandas-docs/stable/io.html#io-chunking>`_
for more information on ``iterator`` and ``chunksize``.

.. versionchanged:: 1.2

``TextFileReader`` is a context manager.
compression : {{'infer', 'gzip', 'bz2', 'zip', 'xz', None}}, default 'infer'
For on-the-fly decompression of on-disk data. If 'infer' and
`filepath_or_buffer` is path-like, then detect compression from the
@@ -451,12 +459,8 @@ def _read(filepath_or_buffer: FilePathOrBuffer, kwds):
if chunksize or iterator:
return parser

try:
data = parser.read(nrows)
finally:
parser.close()

return data
with parser:
return parser.read(nrows)


_parser_defaults = {
@@ -1074,6 +1078,12 @@ def get_chunk(self, size=None):
size = min(size, self.nrows - self._currow)
return self.read(nrows=size)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.close()


def _is_index_col(col):
return col is not None and col is not False
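
The same two-method protocol is what ``JsonReader`` gained above, what ``TextFileReader`` gains here, and what ``ReaderBase`` gains below. A tiny self-contained sketch of what ``__enter__``/``__exit__`` buy (``DummyReader`` is hypothetical, not pandas code):

    class DummyReader:
        """Hypothetical reader mimicking the __enter__/__exit__ pattern added in this PR."""

        def __init__(self):
            self.closed = False

        def read(self):
            return "chunk"

        def close(self):
            self.closed = True

        def __enter__(self):
            # returning self lets ``with DummyReader() as reader`` bind the reader
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            # close unconditionally; a falsy return value lets any in-block exception propagate
            self.close()


    with DummyReader() as reader:
        data = reader.read()

    assert reader.closed  # the resource is released as soon as the block exits
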
20 changes: 16 additions & 4 deletions pandas/io/sas/sasreader.py
@@ -6,7 +6,7 @@

from pandas._typing import FilePathOrBuffer, Label

from pandas.io.common import IOHandles, stringify_path
from pandas.io.common import stringify_path

if TYPE_CHECKING:
from pandas import DataFrame
@@ -18,8 +18,6 @@ class ReaderBase(metaclass=ABCMeta):
Protocol for XportReader and SAS7BDATReader classes.
"""

handles: IOHandles

@abstractmethod
def read(self, nrows=None):
pass
@@ -28,6 +26,12 @@ def read(self, nrows=None):
def close(self):
pass

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.close()


@overload
def read_sas(
@@ -87,9 +91,17 @@ def read_sas(
Encoding for text data. If None, text data are stored as raw bytes.
chunksize : int
Read file `chunksize` lines at a time, returns iterator.

.. versionchanged:: 1.2

``TextFileReader`` is a context manager.
iterator : bool, defaults to False
If True, returns an iterator for reading the file incrementally.

.. versionchanged:: 1.2

``TextFileReader`` is a context manager.

Returns
-------
DataFrame if iterator=False and chunksize=None, else SAS7BDATReader
@@ -136,5 +148,5 @@ def read_sas(
if iterator or chunksize:
return reader

with reader.handles:
with reader:
return reader.read()
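
A hedged sketch of the calling pattern ``read_sas`` now supports; ``example.sas7bdat`` and ``handle_chunk`` are placeholders rather than anything shipped with pandas:

    import pandas as pd


    def handle_chunk(df):
        # stand-in for whatever per-chunk processing the caller needs
        print(df.shape)


    # ``example.sas7bdat`` is an illustrative path to a SAS dataset on disk
    with pd.read_sas("example.sas7bdat", chunksize=10_000) as reader:
        for chunk in reader:
            handle_chunk(chunk)
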
6 changes: 4 additions & 2 deletions pandas/tests/io/json/test_compression.py
@@ -65,8 +65,10 @@ def test_chunksize_with_compression(compression):
df = pd.read_json('{"a": ["foo", "bar", "baz"], "b": [4, 5, 6]}')
df.to_json(path, orient="records", lines=True, compression=compression)

res = pd.read_json(path, lines=True, chunksize=1, compression=compression)
roundtripped_df = pd.concat(res)
with pd.read_json(
path, lines=True, chunksize=1, compression=compression
) as res:
roundtripped_df = pd.concat(res)
tm.assert_frame_equal(df, roundtripped_df)


37 changes: 23 additions & 14 deletions pandas/tests/io/json/test_readlines.py
@@ -77,16 +77,17 @@ def test_readjson_chunks(lines_json_df, chunksize):
# GH17048: memory usage when lines=True

unchunked = read_json(StringIO(lines_json_df), lines=True)
reader = read_json(StringIO(lines_json_df), lines=True, chunksize=chunksize)
chunked = pd.concat(reader)
with read_json(StringIO(lines_json_df), lines=True, chunksize=chunksize) as reader:
chunked = pd.concat(reader)

tm.assert_frame_equal(chunked, unchunked)


def test_readjson_chunksize_requires_lines(lines_json_df):
msg = "chunksize can only be passed if lines=True"
with pytest.raises(ValueError, match=msg):
pd.read_json(StringIO(lines_json_df), lines=False, chunksize=2)
with pd.read_json(StringIO(lines_json_df), lines=False, chunksize=2) as _:
pass


def test_readjson_chunks_series():
@@ -97,15 +98,17 @@ def test_readjson_chunks_series():
unchunked = pd.read_json(strio, lines=True, typ="Series")

strio = StringIO(s.to_json(lines=True, orient="records"))
chunked = pd.concat(pd.read_json(strio, lines=True, typ="Series", chunksize=1))
with pd.read_json(strio, lines=True, typ="Series", chunksize=1) as reader:
chunked = pd.concat(reader)

tm.assert_series_equal(chunked, unchunked)


def test_readjson_each_chunk(lines_json_df):
# Other tests check that the final result of read_json(chunksize=True)
# is correct. This checks the intermediate chunks.
chunks = list(pd.read_json(StringIO(lines_json_df), lines=True, chunksize=2))
with pd.read_json(StringIO(lines_json_df), lines=True, chunksize=2) as reader:
chunks = list(reader)
assert chunks[0].shape == (2, 2)
assert chunks[1].shape == (1, 2)

@@ -114,7 +117,8 @@ def test_readjson_chunks_from_file():
with tm.ensure_clean("test.json") as path:
df = DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
df.to_json(path, lines=True, orient="records")
chunked = pd.concat(pd.read_json(path, lines=True, chunksize=1))
with pd.read_json(path, lines=True, chunksize=1) as reader:
chunked = pd.concat(reader)
unchunked = pd.read_json(path, lines=True)
tm.assert_frame_equal(unchunked, chunked)

@@ -141,7 +145,8 @@ def test_readjson_chunks_closes(chunksize):
compression=None,
nrows=None,
)
reader.read()
with reader:
reader.read()
assert (
reader.handles.handle.closed
), f"didn't close stream with chunksize = {chunksize}"
@@ -152,7 +157,10 @@ def test_readjson_invalid_chunksize(lines_json_df, chunksize):
msg = r"'chunksize' must be an integer >=1"

with pytest.raises(ValueError, match=msg):
pd.read_json(StringIO(lines_json_df), lines=True, chunksize=chunksize)
with pd.read_json(
StringIO(lines_json_df), lines=True, chunksize=chunksize
) as _:
pass


@pytest.mark.parametrize("chunksize", [None, 1, 2])
@@ -176,7 +184,8 @@ def test_readjson_chunks_multiple_empty_lines(chunksize):
orig = DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
test = pd.read_json(j, lines=True, chunksize=chunksize)
if chunksize is not None:
test = pd.concat(test)
with test:
test = pd.concat(test)
tm.assert_frame_equal(orig, test, obj=f"chunksize: {chunksize}")


@@ -212,8 +221,8 @@ def test_readjson_nrows_chunks(nrows, chunksize):
{"a": 3, "b": 4}
{"a": 5, "b": 6}
{"a": 7, "b": 8}"""
reader = read_json(jsonl, lines=True, nrows=nrows, chunksize=chunksize)
chunked = pd.concat(reader)
with read_json(jsonl, lines=True, nrows=nrows, chunksize=chunksize) as reader:
chunked = pd.concat(reader)
expected = DataFrame({"a": [1, 3, 5, 7], "b": [2, 4, 6, 8]}).iloc[:nrows]
tm.assert_frame_equal(chunked, expected)

@@ -240,6 +249,6 @@ def test_readjson_lines_chunks_fileurl(datapath):
]
os_path = datapath("io", "json", "data", "line_delimited.json")
file_url = Path(os_path).as_uri()
url_reader = pd.read_json(file_url, lines=True, chunksize=1)
for index, chuck in enumerate(url_reader):
tm.assert_frame_equal(chuck, df_list_expected[index])
with pd.read_json(file_url, lines=True, chunksize=1) as url_reader:
for index, chuck in enumerate(url_reader):
tm.assert_frame_equal(chuck, df_list_expected[index])
12 changes: 6 additions & 6 deletions pandas/tests/io/parser/test_c_parser_only.py
@@ -376,25 +376,25 @@ def test_parse_trim_buffers(c_parser_only):
)

# Iterate over the CSV file in chunks of `chunksize` lines
chunks_ = parser.read_csv(
with parser.read_csv(
StringIO(csv_data), header=None, dtype=object, chunksize=chunksize
)
result = concat(chunks_, axis=0, ignore_index=True)
) as chunks_:
result = concat(chunks_, axis=0, ignore_index=True)

# Check for data corruption if there was no segfault
tm.assert_frame_equal(result, expected)

# This extra test was added to replicate the fault in gh-5291.
# Force 'utf-8' encoding, so that `_string_convert` would take
# a different execution branch.
chunks_ = parser.read_csv(
with parser.read_csv(
StringIO(csv_data),
header=None,
dtype=object,
chunksize=chunksize,
encoding="utf_8",
)
result = concat(chunks_, axis=0, ignore_index=True)
) as chunks_:
result = concat(chunks_, axis=0, ignore_index=True)
tm.assert_frame_equal(result, expected)

