
read_json engine argument integration #49041


Closed · wants to merge 11 commits
3 changes: 3 additions & 0 deletions pandas/_typing.py
@@ -319,6 +319,9 @@ def closed(self) -> bool:
# read_csv engines
CSVEngine = Literal["c", "python", "pyarrow", "python-fwf"]

# read_json engines
JSONEngine = Literal["ujson", "pyarrow"]

# read_xml parsers
XMLParsers = Literal["lxml", "etree"]

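For context: the Literal alias added above is what lets static type checkers reject unsupported engine strings. A minimal sketch of that mechanism, assuming mypy or a similar checker (the stub function is illustrative, not part of this PR):

from typing import Literal

JSONEngine = Literal["ujson", "pyarrow"]

def read_json_stub(path: str, engine: JSONEngine = "ujson") -> None:
    # A type checker flags any call whose engine value is not a Literal member.
    print(f"would parse {path} with the {engine} engine")

read_json_stub("data.json", engine="pyarrow")  # accepted
# read_json_stub("data.json", engine="json")   # rejected by mypy at type-check time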
82 changes: 67 additions & 15 deletions pandas/io/json/_json.py
@@ -28,6 +28,7 @@
DtypeArg,
FilePath,
IndexLabel,
JSONEngine,
JSONSerializable,
ReadBuffer,
StorageOptions,
@@ -70,6 +71,7 @@
build_table_schema,
parse_table_schema,
)
from pandas.io.json.arrow_json_parser_wrapper import ArrowJsonParserWrapper
from pandas.io.parsers.readers import validate_integer

if TYPE_CHECKING:
@@ -394,6 +396,7 @@ def read_json(
date_unit: str | None = ...,
encoding: str | None = ...,
encoding_errors: str | None = ...,
engine: JSONEngine = ...,
lines: bool = ...,
chunksize: int,
compression: CompressionOptions = ...,
@@ -417,6 +420,7 @@ def read_json(
date_unit: str | None = ...,
encoding: str | None = ...,
encoding_errors: str | None = ...,
engine: JSONEngine = ...,
lines: bool = ...,
chunksize: int,
compression: CompressionOptions = ...,
@@ -440,6 +444,7 @@ def read_json(
date_unit: str | None = ...,
encoding: str | None = ...,
encoding_errors: str | None = ...,
engine: JSONEngine = ...,
lines: bool = ...,
chunksize: None = ...,
compression: CompressionOptions = ...,
@@ -462,6 +467,7 @@ def read_json(
date_unit: str | None = ...,
encoding: str | None = ...,
encoding_errors: str | None = ...,
engine: JSONEngine = ...,
lines: bool = ...,
chunksize: None = ...,
compression: CompressionOptions = ...,
@@ -488,6 +494,7 @@ def read_json(
date_unit: str | None = None,
encoding: str | None = None,
encoding_errors: str | None = "strict",
engine: JSONEngine = "ujson",
lines: bool = False,
chunksize: int | None = None,
compression: CompressionOptions = "infer",
@@ -609,6 +616,9 @@ def read_json(

.. versionadded:: 1.3.0

engine : {{'ujson', 'pyarrow'}}, default "ujson"
Parser engine to use. The pyarrow engine currently requires lines=True (line-delimited JSON).

lines : bool, default False
Read the file as a json object per line.

@@ -744,6 +754,7 @@ def read_json(
precise_float=precise_float,
date_unit=date_unit,
encoding=encoding,
engine=engine,
lines=lines,
chunksize=chunksize,
compression=compression,
@@ -786,6 +797,7 @@ def __init__(
nrows: int | None,
storage_options: StorageOptions = None,
encoding_errors: str | None = "strict",
engine: JSONEngine = "ujson",
) -> None:

self.orient = orient
@@ -797,6 +809,7 @@ def __init__(
self.precise_float = precise_float
self.date_unit = date_unit
self.encoding = encoding
self.engine = engine
self.compression = compression
self.storage_options = storage_options
self.lines = lines
@@ -814,9 +827,45 @@ def __init__(
self.nrows = validate_integer("nrows", self.nrows, 0)
if not self.lines:
raise ValueError("nrows can only be passed if lines=True")
if self.engine == "pyarrow":
if not self.lines:
raise ValueError(
"currently pyarrow engine only supports "
"the line-delimited JSON format"
)

if self.engine == "pyarrow":
self._engine = self._make_engine(filepath_or_buffer, self.engine)
elif self.engine == "ujson":
data = self._get_data_from_filepath(filepath_or_buffer)
self.data = self._preprocess_data(data)

def _make_engine(
self,
filepath_or_buffer: FilePath | ReadBuffer[str] | ReadBuffer[bytes],
engine: JSONEngine = "pyarrow",
) -> ArrowJsonParserWrapper:

if not isinstance(filepath_or_buffer, list):
is_text = False
mode = "rb"
self.handles = get_handle(
filepath_or_buffer,
mode=mode,
encoding=self.encoding,
is_text=is_text,
compression=self.compression,
storage_options=self.storage_options,
errors=self.encoding_errors,
)
filepath_or_buffer = self.handles.handle

try:
return ArrowJsonParserWrapper(filepath_or_buffer)
except Exception:
if self.handles is not None:
self.handles.close()
raise

def _preprocess_data(self, data):
"""
@@ -900,20 +949,23 @@ def read(self) -> DataFrame | Series:
Read the whole JSON input into a pandas object.
"""
obj: DataFrame | Series
if self.engine == "pyarrow":
obj = self._engine.read()
elif self.engine == "ujson":
if self.lines:
if self.chunksize:
obj = concat(self)
elif self.nrows:
lines = list(islice(self.data, self.nrows))
lines_json = self._combine_lines(lines)
obj = self._get_object_parser(lines_json)
else:
data = ensure_str(self.data)
data_lines = data.split("\n")
obj = self._get_object_parser(self._combine_lines(data_lines))
else:
obj = self._get_object_parser(self.data)
self.close()
return obj

def _get_object_parser(self, json) -> DataFrame | Series:
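Taken together, the _json.py changes surface the new keyword like this; a hedged usage sketch assuming a pandas build that includes this PR (the file name records.jsonl is an assumption):

import pandas as pd

# The pyarrow engine only accepts line-delimited JSON; with lines=False it
# raises the ValueError added in __init__ above.
df = pd.read_json("records.jsonl", lines=True, engine="pyarrow")

# The default engine is still "ujson", so existing call sites are unaffected.
df_default = pd.read_json("records.jsonl", lines=True)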
100 changes: 100 additions & 0 deletions pandas/io/json/arrow_json_parser_wrapper.py
@@ -0,0 +1,100 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from pandas._typing import ReadBuffer
from pandas.compat._optional import import_optional_dependency

from pandas.core.dtypes.inference import is_integer

if TYPE_CHECKING:
from pandas import DataFrame


class ArrowJsonParserWrapper:
"""
Wrapper for the pyarrow engine for read_json()
"""

def __init__(self, src: ReadBuffer[bytes]) -> None:
super().__init__()
self.src = src

def _parse_kwd(self) -> None:
"""
Validates keywords before passing them to pyarrow.
"""
# Placeholder in this revision; keyword validation is not yet implemented.
...

def _get_pyarrow_options(self) -> None:
# Placeholder in this revision; option translation is not yet implemented.
...

def read(self) -> DataFrame:
"""
Reads the contents of a JSON file into a DataFrame and
processes it according to the kwargs passed in the
constructor.

Returns
-------
DataFrame
The DataFrame created from the JSON file.
"""
pyarrow_json = import_optional_dependency("pyarrow.json")
table = pyarrow_json.read_json(self.src)

frame = table.to_pandas()
return frame

def _finalize_output(self, frame: DataFrame) -> DataFrame:
"""
Processes data read in based on kwargs.

Parameters
----------
frame : DataFrame
The DataFrame to process.

Returns
-------
DataFrame
The processed DataFrame.
"""
num_cols = len(frame.columns)
multi_index_named = True
if self.header is None:
if self.names is None:
if self.prefix is not None:
self.names = [f"{self.prefix}{i}" for i in range(num_cols)]
else:
self.names = range(num_cols)
if len(self.names) != num_cols:
# usecols is passed through to pyarrow, we only handle index col here
# The only way self.names is not the same length as number of cols is
# if we have int index_col. We should just pad the names (they will get
# removed anyway) to the expected length then.
self.names = list(range(num_cols - len(self.names))) + self.names
multi_index_named = False
frame.columns = self.names
# we only need the frame not the names
frame.columns, frame = self._do_date_conversions(frame.columns, frame)
if self.index_col is not None:
for i, item in enumerate(self.index_col):
if is_integer(item):
self.index_col[i] = frame.columns[item]
else:
# String case
if item not in frame.columns:
raise ValueError(f"Index {item} invalid")
frame.set_index(self.index_col, drop=True, inplace=True)
# Clear names if headerless and no name given
if self.header is None and not multi_index_named:
frame.index.names = [None] * len(frame.index.names)

if self.kwds.get("dtype") is not None:
try:
frame = frame.astype(self.kwds.get("dtype"))
except TypeError as e:
# GH#44901 reraise to keep api consistent
raise ValueError(e)
return frame
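Stripped of the wrapper class, the core read path is two pyarrow calls; a minimal standalone sketch, assuming pyarrow is installed and records.jsonl is a hypothetical line-delimited JSON file:

import pyarrow.json as pa_json

# pyarrow.json.read_json parses line-delimited JSON into an Arrow Table;
# Table.to_pandas() converts it to a DataFrame, mirroring ArrowJsonParserWrapper.read.
table = pa_json.read_json("records.jsonl")
frame = table.to_pandas()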
8 changes: 8 additions & 0 deletions pandas/tests/io/json/conftest.py
@@ -7,3 +7,11 @@ def orient(request):
Fixture for orients excluding the table format.
"""
return request.param


@pytest.fixture
def json_dir_path(datapath):
"""
The directory path to the data files needed for parser tests.
"""
return datapath("io", "json", "data")
32 changes: 32 additions & 0 deletions pandas/tests/io/json/test_readlines.py
@@ -1,4 +1,5 @@
from io import StringIO
import os
from pathlib import Path
from typing import Iterator

@@ -27,6 +28,37 @@ def test_read_jsonl():
tm.assert_frame_equal(result, expected)


def test_read_jsonl_engine_pyarrow(json_dir_path):
# '{"a": 1, "b": 2}\n{"a": 3, "b": 4}\n{"a": 5, "b": 6}'

result = read_json(
os.path.join(json_dir_path, "line_delimited.json"),
lines=True,
engine="pyarrow",
)
expected = DataFrame({"a": [1, 3, 5], "b": [2, 4, 6]})
tm.assert_frame_equal(result, expected)


@pytest.mark.xfail(reason="pyarrow engine currently only supports lines=True")
def test_read_jsonl_engine_pyarrow_lines_false(json_dir_path):
result = read_json(
os.path.join(json_dir_path, "line_delimited.json"),
engine="pyarrow",
)
expected = DataFrame({"a": [1, 3, 5], "b": [2, 4, 6]})
tm.assert_frame_equal(result, expected)


@pytest.mark.xfail(reason="pyarrow engine does not support JSON string input")
def test_read_jsonl_engine_pyarrow_json_string():
result = read_json(
'{"a": 1, "b": 2}, {"a": 3, "b": 4}, {"a": 5, "b": 6}', engine="pyarrow"
)
expected = DataFrame({"a": [1, 3, 5], "b": [2, 4, 6]})
tm.assert_frame_equal(result, expected)

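The two xfail tests above record current limits of the pyarrow engine rather than bugs. A hedged sketch of the failure the lines=False case hits, per the ValueError added in _json.py (the path is illustrative; the check fires before the file is opened):

import pytest
import pandas as pd

# engine="pyarrow" with the default lines=False raises immediately.
with pytest.raises(ValueError, match="line-delimited"):
    pd.read_json("line_delimited.json", engine="pyarrow")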

def test_read_datetime():
# GH33787
df = DataFrame(