
Commit 3dcf4ac

williamma12 authored and devin-petersohn committed
Fix issue by properly handling parse_dates (modin-project#473)
* Fix issue by properly handling parse_dates
* linting
1 parent be54fcc commit 3dcf4ac

File tree

2 files changed: +131 -81 lines changed


modin/engines/ray/pandas_on_ray/io.py (+22 -1)
@@ -154,8 +154,14 @@ def _read_csv_from_file_pandas_on_ray(cls, filepath, kwargs={}):
         column_names = empty_pd_df.columns
         skipfooter = kwargs.get("skipfooter", None)
         skiprows = kwargs.pop("skiprows", None)
+        parse_dates = kwargs.pop("parse_dates", False)
         partition_kwargs = dict(
-            kwargs, header=None, names=column_names, skipfooter=0, skiprows=None
+            kwargs,
+            header=None,
+            names=column_names,
+            skipfooter=0,
+            skiprows=None,
+            parse_dates=parse_dates,
         )
         with open(filepath, "rb") as f:
             # Get the BOM if necessary
@@ -209,6 +215,21 @@ def _read_csv_from_file_pandas_on_ray(cls, filepath, kwargs={}):
         new_index_ids = get_index.remote([empty_pd_df.index.name], *index_ids)
         new_index = ray.get(new_index_ids)
 
+        # If parse_dates is present, the column names that we have might not be
+        # the same length as the returned column names. If we do need to modify
+        # the column names, we remove the old names from the column names and
+        # insert the new one at the front of the Index.
+        if parse_dates is not None:
+            # Check if is list of lists
+            if isinstance(parse_dates, list) and isinstance(parse_dates[0], list):
+                for group in parse_dates:
+                    new_col_name = "_".join(group)
+                    column_names = column_names.drop(group).insert(0, new_col_name)
+            # Check if it is a dictionary
+            elif isinstance(parse_dates, dict):
+                for new_col_name, group in parse_dates.items():
+                    column_names = column_names.drop(group).insert(0, new_col_name)
+
         new_query_compiler = PandasQueryCompiler(
             RayBlockPartitions(np.array(partition_ids)), new_index, column_names
         )
modin/pandas/test/test_io.py (+109 -80)
@@ -99,20 +99,44 @@ def teardown_parquet_file():
 
 
 @pytest.fixture
-def setup_csv_file(row_size, force=False, delimiter=",", encoding=None):
-    if os.path.exists(TEST_CSV_FILENAME) and not force:
-        pass
-    else:
-        df = pandas.DataFrame(
-            {"col1": np.arange(row_size), "col2": np.arange(row_size)}
-        )
-        df.to_csv(TEST_CSV_FILENAME, sep=delimiter, encoding=encoding)
+def make_csv_file():
+    """Pytest fixture factory that makes temp csv files for testing.
 
+    Yields:
+        Function that generates csv files
+    """
+    filenames = []
 
-@pytest.fixture
-def teardown_csv_file():
-    if os.path.exists(TEST_CSV_FILENAME):
-        os.remove(TEST_CSV_FILENAME)
+    def _make_csv_file(
+        filename=TEST_CSV_FILENAME,
+        row_size=SMALL_ROW_SIZE,
+        force=False,
+        delimiter=",",
+        encoding=None,
+    ):
+        if os.path.exists(filename) and not force:
+            pass
+        else:
+            dates = pandas.date_range("2000", freq="h", periods=row_size)
+            df = pandas.DataFrame(
+                {
+                    "col1": np.arange(row_size),
+                    "col2": [str(x.date()) for x in dates],
+                    "col3": np.arange(row_size),
+                    "col4": [str(x.time()) for x in dates],
+                }
+            )
+            df.to_csv(filename, sep=delimiter, encoding=encoding)
+        filenames.append(filename)
+        return df
+
+    # Return function that generates csv files
+    yield _make_csv_file
+
+    # Delete csv files that were created
+    for filename in filenames:
+        if os.path.exists(filename):
+            os.remove(filename)
 
 
 @pytest.fixture
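The replacement fixture is a factory: a test receives the inner `_make_csv_file` function, calls it with whatever options it needs, and every file the factory created is removed after the `yield` once the test finishes. A minimal usage sketch, assuming the module-level names (`pandas`, `TEST_CSV_FILENAME`) from `test_io.py`; the test name and assertion are illustrative only:

```python
def test_csv_roundtrip_example(make_csv_file):
    # force=True makes the factory (re)write TEST_CSV_FILENAME and return the
    # DataFrame it wrote, so the test has something to compare against.
    expected_df = make_csv_file(delimiter="|", force=True)

    read_back = pandas.read_csv(TEST_CSV_FILENAME, sep="|")
    assert len(read_back) == len(expected_df)
    # No explicit teardown needed: make_csv_file deletes the file after the test.
```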
@@ -315,53 +339,6 @@ def test_from_parquet_with_columns():
     teardown_parquet_file()
 
 
-def test_from_csv():
-    setup_csv_file(SMALL_ROW_SIZE)
-
-    pandas_df = pandas.read_csv(TEST_CSV_FILENAME)
-    modin_df = pd.read_csv(TEST_CSV_FILENAME)
-
-    assert modin_df_equals_pandas(modin_df, pandas_df)
-
-    if not PY2:
-        pandas_df = pandas.read_csv(Path(TEST_CSV_FILENAME))
-        modin_df = pd.read_csv(Path(TEST_CSV_FILENAME))
-
-        assert modin_df_equals_pandas(modin_df, pandas_df)
-
-    teardown_csv_file()
-
-
-def test_from_csv_chunksize():
-    setup_csv_file(SMALL_ROW_SIZE)
-
-    # Tests __next__ and correctness of reader as an iterator
-    # Use larger chunksize to read through file quicker
-    rdf_reader = pd.read_csv(TEST_CSV_FILENAME, chunksize=500)
-    pd_reader = pandas.read_csv(TEST_CSV_FILENAME, chunksize=500)
-
-    for modin_df, pd_df in zip(rdf_reader, pd_reader):
-        assert modin_df_equals_pandas(modin_df, pd_df)
-
-    # Tests that get_chunk works correctly
-    rdf_reader = pd.read_csv(TEST_CSV_FILENAME, chunksize=1)
-    pd_reader = pandas.read_csv(TEST_CSV_FILENAME, chunksize=1)
-
-    modin_df = rdf_reader.get_chunk(1)
-    pd_df = pd_reader.get_chunk(1)
-
-    assert modin_df_equals_pandas(modin_df, pd_df)
-
-    # Tests that read works correctly
-    rdf_reader = pd.read_csv(TEST_CSV_FILENAME, chunksize=1)
-    pd_reader = pandas.read_csv(TEST_CSV_FILENAME, chunksize=1)
-
-    modin_df = rdf_reader.read()
-    pd_df = pd_reader.read()
-
-    assert modin_df_equals_pandas(modin_df, pd_df)
-
-
 def test_from_json():
     setup_json_file(SMALL_ROW_SIZE)
 
@@ -492,8 +469,53 @@ def test_from_sas():
     assert modin_df_equals_pandas(modin_df, pandas_df)
 
 
-def test_from_csv_delimiter():
-    setup_csv_file(SMALL_ROW_SIZE, delimiter="|")
+def test_from_csv(make_csv_file):
+    make_csv_file()
+
+    pandas_df = pandas.read_csv(TEST_CSV_FILENAME)
+    modin_df = pd.read_csv(TEST_CSV_FILENAME)
+
+    assert modin_df_equals_pandas(modin_df, pandas_df)
+
+    if not PY2:
+        pandas_df = pandas.read_csv(Path(TEST_CSV_FILENAME))
+        modin_df = pd.read_csv(Path(TEST_CSV_FILENAME))
+
+        assert modin_df_equals_pandas(modin_df, pandas_df)
+
+
+def test_from_csv_chunksize(make_csv_file):
+    make_csv_file()
+
+    # Tests __next__ and correctness of reader as an iterator
+    # Use larger chunksize to read through file quicker
+    rdf_reader = pd.read_csv(TEST_CSV_FILENAME, chunksize=500)
+    pd_reader = pandas.read_csv(TEST_CSV_FILENAME, chunksize=500)
+
+    for modin_df, pd_df in zip(rdf_reader, pd_reader):
+        assert modin_df_equals_pandas(modin_df, pd_df)
+
+    # Tests that get_chunk works correctly
+    rdf_reader = pd.read_csv(TEST_CSV_FILENAME, chunksize=1)
+    pd_reader = pandas.read_csv(TEST_CSV_FILENAME, chunksize=1)
+
+    modin_df = rdf_reader.get_chunk(1)
+    pd_df = pd_reader.get_chunk(1)
+
+    assert modin_df_equals_pandas(modin_df, pd_df)
+
+    # Tests that read works correctly
+    rdf_reader = pd.read_csv(TEST_CSV_FILENAME, chunksize=1)
+    pd_reader = pandas.read_csv(TEST_CSV_FILENAME, chunksize=1)
+
+    modin_df = rdf_reader.read()
+    pd_df = pd_reader.read()
+
+    assert modin_df_equals_pandas(modin_df, pd_df)
+
+
+def test_from_csv_delimiter(make_csv_file):
+    make_csv_file(delimiter="|")
 
     pandas_df = pandas.read_csv(TEST_CSV_FILENAME, sep="|")
     modin_df = pd.read_csv(TEST_CSV_FILENAME, sep="|")
@@ -503,35 +525,32 @@ def test_from_csv_delimiter():
     modin_df = pd.DataFrame.from_csv(
         TEST_CSV_FILENAME, sep="|", parse_dates=False, header="infer", index_col=None
     )
+    pandas_df = pandas.DataFrame.from_csv(
+        TEST_CSV_FILENAME, sep="|", parse_dates=False, header="infer", index_col=None
+    )
     assert modin_df_equals_pandas(modin_df, pandas_df)
 
-    teardown_csv_file()
 
-
-def test_from_csv_skiprows():
-    setup_csv_file(SMALL_ROW_SIZE)
+def test_from_csv_skiprows(make_csv_file):
+    make_csv_file()
 
     pandas_df = pandas.read_csv(TEST_CSV_FILENAME, skiprows=2)
     modin_df = pd.read_csv(TEST_CSV_FILENAME, skiprows=2)
 
     assert modin_df_equals_pandas(modin_df, pandas_df)
 
-    teardown_csv_file()
-
 
-def test_from_csv_encoding():
-    setup_csv_file(SMALL_ROW_SIZE, encoding="latin8")
+def test_from_csv_encoding(make_csv_file):
+    make_csv_file(encoding="latin8")
 
     pandas_df = pandas.read_csv(TEST_CSV_FILENAME, encoding="latin8")
     modin_df = pd.read_csv(TEST_CSV_FILENAME, encoding="latin8")
 
     assert modin_df_equals_pandas(modin_df, pandas_df)
 
-    teardown_csv_file()
-
 
-def test_from_csv_default_to_pandas_behavior():
-    setup_csv_file(SMALL_ROW_SIZE)
+def test_from_csv_default_to_pandas_behavior(make_csv_file):
+    make_csv_file()
 
     with pytest.warns(UserWarning):
         # Test nrows
@@ -548,26 +567,36 @@ def test_from_csv_default_to_pandas_behavior():
         pd.read_csv(TEST_CSV_FILENAME, skiprows=lambda x: x in [0, 2])
 
 
-def test_from_csv_index_col():
-    setup_csv_file(SMALL_ROW_SIZE)
+def test_from_csv_index_col(make_csv_file):
+    make_csv_file()
 
     pandas_df = pandas.read_csv(TEST_CSV_FILENAME, index_col="col1")
     modin_df = pd.read_csv(TEST_CSV_FILENAME, index_col="col1")
 
     assert modin_df_equals_pandas(modin_df, pandas_df)
 
-    teardown_csv_file()
 
-
-def test_from_csv_skipfooter():
-    setup_csv_file(SMALL_ROW_SIZE)
+def test_from_csv_skipfooter(make_csv_file):
+    make_csv_file()
 
     pandas_df = pandas.read_csv(TEST_CSV_FILENAME, skipfooter=13)
     modin_df = pd.read_csv(TEST_CSV_FILENAME, skipfooter=13)
 
     assert modin_df_equals_pandas(modin_df, pandas_df)
 
-    teardown_csv_file()
+
+def test_from_csv_parse_dates(make_csv_file):
+    make_csv_file(force=True)
+
+    pandas_df = pandas.read_csv(TEST_CSV_FILENAME, parse_dates=[["col2", "col4"]])
+    modin_df = pd.read_csv(TEST_CSV_FILENAME, parse_dates=[["col2", "col4"]])
+    assert modin_df_equals_pandas(modin_df, pandas_df)
+
+    pandas_df = pandas.read_csv(
+        TEST_CSV_FILENAME, parse_dates={"time": ["col2", "col4"]}
+    )
+    modin_df = pd.read_csv(TEST_CSV_FILENAME, parse_dates={"time": ["col2", "col4"]})
+    assert modin_df_equals_pandas(modin_df, pandas_df)
 
 
 @pytest.mark.skip(reason="No clipboard on Travis")
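For reference, the two `parse_dates` forms exercised by `test_from_csv_parse_dates` produce differently named combined columns in pandas, which is exactly what the io.py change reproduces for the partition column names. A small standalone sketch of that behavior (the file name here is illustrative, not from the test suite):

```python
import pandas

# A tiny CSV with separate date and time columns, like the fixture's col2/col4.
pandas.DataFrame(
    {
        "col1": [0, 1],
        "col2": ["2000-01-01", "2000-01-01"],
        "col3": [0, 1],
        "col4": ["00:00:00", "01:00:00"],
    }
).to_csv("parse_dates_example.csv", index=False)

# List-of-lists form: the combined column is named "col2_col4" and comes first.
df = pandas.read_csv("parse_dates_example.csv", parse_dates=[["col2", "col4"]])
print(list(df.columns))  # ['col2_col4', 'col1', 'col3']

# Dict form: the key ("time") becomes the combined column's name.
df = pandas.read_csv("parse_dates_example.csv", parse_dates={"time": ["col2", "col4"]})
print(list(df.columns))  # ['time', 'col1', 'col3']
```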
