From eae38a13f6aad64aa5f6504b4d699d7a5a53c61f Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Fri, 5 Apr 2024 13:14:27 -0700 Subject: [PATCH] TST: Use temp_file fixture over ensure_clean --- pandas/tests/io/test_stata.py | 975 ++++++++++----------- pandas/tests/series/methods/test_to_csv.py | 167 ++-- 2 files changed, 562 insertions(+), 580 deletions(-) diff --git a/pandas/tests/io/test_stata.py b/pandas/tests/io/test_stata.py index 0cc8018ea6213..1bd71768d226e 100644 --- a/pandas/tests/io/test_stata.py +++ b/pandas/tests/io/test_stata.py @@ -63,16 +63,16 @@ def read_csv(self, file): return read_csv(file, parse_dates=True) @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) - def test_read_empty_dta(self, version): + def test_read_empty_dta(self, version, temp_file): empty_ds = DataFrame(columns=["unit"]) # GH 7369, make sure can read a 0-obs dta file - with tm.ensure_clean() as path: - empty_ds.to_stata(path, write_index=False, version=version) - empty_ds2 = read_stata(path) - tm.assert_frame_equal(empty_ds, empty_ds2) + path = temp_file + empty_ds.to_stata(path, write_index=False, version=version) + empty_ds2 = read_stata(path) + tm.assert_frame_equal(empty_ds, empty_ds2) @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) - def test_read_empty_dta_with_dtypes(self, version): + def test_read_empty_dta_with_dtypes(self, version, temp_file): # GH 46240 # Fixing above bug revealed that types are not correctly preserved when # writing empty DataFrames @@ -91,9 +91,9 @@ def test_read_empty_dta_with_dtypes(self, version): } ) # GH 7369, make sure can read a 0-obs dta file - with tm.ensure_clean() as path: - empty_df_typed.to_stata(path, write_index=False, version=version) - empty_reread = read_stata(path) + path = temp_file + empty_df_typed.to_stata(path, write_index=False, version=version) + empty_reread = read_stata(path) expected = empty_df_typed # No uint# support. Downcast since values in range for int# @@ -108,12 +108,12 @@ def test_read_empty_dta_with_dtypes(self, version): tm.assert_series_equal(expected.dtypes, empty_reread.dtypes) @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) - def test_read_index_col_none(self, version): + def test_read_index_col_none(self, version, temp_file): df = DataFrame({"a": range(5), "b": ["b1", "b2", "b3", "b4", "b5"]}) # GH 7369, make sure can read a 0-obs dta file - with tm.ensure_clean() as path: - df.to_stata(path, write_index=False, version=version) - read_df = read_stata(path) + path = temp_file + df.to_stata(path, write_index=False, version=version) + read_df = read_stata(path) assert isinstance(read_df.index, pd.RangeIndex) expected = df @@ -324,39 +324,39 @@ def test_read_dta18(self, datapath): assert rdr.data_label == "This is a Ünicode data label" - def test_read_write_dta5(self): + def test_read_write_dta5(self, temp_file): original = DataFrame( [(np.nan, np.nan, np.nan, np.nan, np.nan)], columns=["float_miss", "double_miss", "byte_miss", "int_miss", "long_miss"], ) original.index.name = "index" - with tm.ensure_clean() as path: - original.to_stata(path, convert_dates=None) - written_and_read_again = self.read_dta(path) + path = temp_file + original.to_stata(path, convert_dates=None) + written_and_read_again = self.read_dta(path) expected = original expected.index = expected.index.astype(np.int32) tm.assert_frame_equal(written_and_read_again.set_index("index"), expected) - def test_write_dta6(self, datapath): + def test_write_dta6(self, datapath, temp_file): original = self.read_csv(datapath("io", "data", "stata", "stata3.csv")) original.index.name = "index" original.index = original.index.astype(np.int32) original["year"] = original["year"].astype(np.int32) original["quarter"] = original["quarter"].astype(np.int32) - with tm.ensure_clean() as path: - original.to_stata(path, convert_dates=None) - written_and_read_again = self.read_dta(path) - tm.assert_frame_equal( - written_and_read_again.set_index("index"), - original, - check_index_type=False, - ) + path = temp_file + original.to_stata(path, convert_dates=None) + written_and_read_again = self.read_dta(path) + tm.assert_frame_equal( + written_and_read_again.set_index("index"), + original, + check_index_type=False, + ) @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) - def test_read_write_dta10(self, version): + def test_read_write_dta10(self, version, temp_file): original = DataFrame( data=[["string", "object", 1, 1.1, np.datetime64("2003-12-25")]], columns=["string", "object", "integer", "floating", "datetime"], @@ -366,9 +366,9 @@ def test_read_write_dta10(self, version): original.index = original.index.astype(np.int32) original["integer"] = original["integer"].astype(np.int32) - with tm.ensure_clean() as path: - original.to_stata(path, convert_dates={"datetime": "tc"}, version=version) - written_and_read_again = self.read_dta(path) + path = temp_file + original.to_stata(path, convert_dates={"datetime": "tc"}, version=version) + written_and_read_again = self.read_dta(path) expected = original[:] # "tc" convert_dates means we store in ms @@ -379,14 +379,14 @@ def test_read_write_dta10(self, version): expected, ) - def test_stata_doc_examples(self): - with tm.ensure_clean() as path: - df = DataFrame( - np.random.default_rng(2).standard_normal((10, 2)), columns=list("AB") - ) - df.to_stata(path) + def test_stata_doc_examples(self, temp_file): + path = temp_file + df = DataFrame( + np.random.default_rng(2).standard_normal((10, 2)), columns=list("AB") + ) + df.to_stata(path) - def test_write_preserves_original(self): + def test_write_preserves_original(self, temp_file): # 9795 df = DataFrame( @@ -394,12 +394,12 @@ def test_write_preserves_original(self): ) df.loc[2, "a":"c"] = np.nan df_copy = df.copy() - with tm.ensure_clean() as path: - df.to_stata(path, write_index=False) + path = temp_file + df.to_stata(path, write_index=False) tm.assert_frame_equal(df, df_copy) @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) - def test_encoding(self, version, datapath): + def test_encoding(self, version, datapath, temp_file): # GH 4626, proper encoding handling raw = read_stata(datapath("io", "data", "stata", "stata1_encoding.dta")) encoded = read_stata(datapath("io", "data", "stata", "stata1_encoding.dta")) @@ -409,12 +409,12 @@ def test_encoding(self, version, datapath): assert result == expected assert isinstance(result, str) - with tm.ensure_clean() as path: - encoded.to_stata(path, write_index=False, version=version) - reread_encoded = read_stata(path) - tm.assert_frame_equal(encoded, reread_encoded) + path = temp_file + encoded.to_stata(path, write_index=False, version=version) + reread_encoded = read_stata(path) + tm.assert_frame_equal(encoded, reread_encoded) - def test_read_write_dta11(self): + def test_read_write_dta11(self, temp_file): original = DataFrame( [(1, 2, 3, 4)], columns=[ @@ -431,18 +431,18 @@ def test_read_write_dta11(self): formatted.index.name = "index" formatted = formatted.astype(np.int32) - with tm.ensure_clean() as path: - with tm.assert_produces_warning(InvalidColumnName): - original.to_stata(path, convert_dates=None) + path = temp_file + with tm.assert_produces_warning(InvalidColumnName): + original.to_stata(path, convert_dates=None) - written_and_read_again = self.read_dta(path) + written_and_read_again = self.read_dta(path) expected = formatted expected.index = expected.index.astype(np.int32) tm.assert_frame_equal(written_and_read_again.set_index("index"), expected) @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) - def test_read_write_dta12(self, version): + def test_read_write_dta12(self, version, temp_file): original = DataFrame( [(1, 2, 3, 4, 5, 6)], columns=[ @@ -468,18 +468,18 @@ def test_read_write_dta12(self, version): formatted.index.name = "index" formatted = formatted.astype(np.int32) - with tm.ensure_clean() as path: - with tm.assert_produces_warning(InvalidColumnName): - original.to_stata(path, convert_dates=None, version=version) - # should get a warning for that format. + path = temp_file + with tm.assert_produces_warning(InvalidColumnName): + original.to_stata(path, convert_dates=None, version=version) + # should get a warning for that format. - written_and_read_again = self.read_dta(path) + written_and_read_again = self.read_dta(path) expected = formatted expected.index = expected.index.astype(np.int32) tm.assert_frame_equal(written_and_read_again.set_index("index"), expected) - def test_read_write_dta13(self): + def test_read_write_dta13(self, temp_file): s1 = Series(2**9, dtype=np.int16) s2 = Series(2**17, dtype=np.int32) s3 = Series(2**33, dtype=np.int64) @@ -489,9 +489,9 @@ def test_read_write_dta13(self): formatted = original formatted["int64"] = formatted["int64"].astype(np.float64) - with tm.ensure_clean() as path: - original.to_stata(path) - written_and_read_again = self.read_dta(path) + path = temp_file + original.to_stata(path) + written_and_read_again = self.read_dta(path) expected = formatted expected.index = expected.index.astype(np.int32) @@ -501,16 +501,18 @@ def test_read_write_dta13(self): @pytest.mark.parametrize( "file", ["stata5_113", "stata5_114", "stata5_115", "stata5_117"] ) - def test_read_write_reread_dta14(self, file, parsed_114, version, datapath): + def test_read_write_reread_dta14( + self, file, parsed_114, version, datapath, temp_file + ): file = datapath("io", "data", "stata", f"{file}.dta") parsed = self.read_dta(file) parsed.index.name = "index" tm.assert_frame_equal(parsed_114, parsed) - with tm.ensure_clean() as path: - parsed_114.to_stata(path, convert_dates={"date_td": "td"}, version=version) - written_and_read_again = self.read_dta(path) + path = temp_file + parsed_114.to_stata(path, convert_dates={"date_td": "td"}, version=version) + written_and_read_again = self.read_dta(path) expected = parsed_114.copy() tm.assert_frame_equal(written_and_read_again.set_index("index"), expected) @@ -536,38 +538,38 @@ def test_read_write_reread_dta15(self, file, datapath): tm.assert_frame_equal(expected, parsed) @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) - def test_timestamp_and_label(self, version): + def test_timestamp_and_label(self, version, temp_file): original = DataFrame([(1,)], columns=["variable"]) time_stamp = datetime(2000, 2, 29, 14, 21) data_label = "This is a data file." - with tm.ensure_clean() as path: - original.to_stata( - path, time_stamp=time_stamp, data_label=data_label, version=version - ) + path = temp_file + original.to_stata( + path, time_stamp=time_stamp, data_label=data_label, version=version + ) - with StataReader(path) as reader: - assert reader.time_stamp == "29 Feb 2000 14:21" - assert reader.data_label == data_label + with StataReader(path) as reader: + assert reader.time_stamp == "29 Feb 2000 14:21" + assert reader.data_label == data_label @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) - def test_invalid_timestamp(self, version): + def test_invalid_timestamp(self, version, temp_file): original = DataFrame([(1,)], columns=["variable"]) time_stamp = "01 Jan 2000, 00:00:00" - with tm.ensure_clean() as path: - msg = "time_stamp should be datetime type" - with pytest.raises(ValueError, match=msg): - original.to_stata(path, time_stamp=time_stamp, version=version) - assert not os.path.isfile(path) + path = temp_file + msg = "time_stamp should be datetime type" + with pytest.raises(ValueError, match=msg): + original.to_stata(path, time_stamp=time_stamp, version=version) + assert not os.path.isfile(path) - def test_numeric_column_names(self): + def test_numeric_column_names(self, temp_file): original = DataFrame(np.reshape(np.arange(25.0), (5, 5))) original.index.name = "index" - with tm.ensure_clean() as path: - # should get a warning for that format. - with tm.assert_produces_warning(InvalidColumnName): - original.to_stata(path) + path = temp_file + # should get a warning for that format. + with tm.assert_produces_warning(InvalidColumnName): + original.to_stata(path) - written_and_read_again = self.read_dta(path) + written_and_read_again = self.read_dta(path) written_and_read_again = written_and_read_again.set_index("index") columns = list(written_and_read_again.columns) @@ -578,7 +580,7 @@ def test_numeric_column_names(self): tm.assert_frame_equal(expected, written_and_read_again) @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) - def test_nan_to_missing_value(self, version): + def test_nan_to_missing_value(self, version, temp_file): s1 = Series(np.arange(4.0), dtype=np.float32) s2 = Series(np.arange(4.0), dtype=np.float64) s1[::2] = np.nan @@ -586,48 +588,48 @@ def test_nan_to_missing_value(self, version): original = DataFrame({"s1": s1, "s2": s2}) original.index.name = "index" - with tm.ensure_clean() as path: - original.to_stata(path, version=version) - written_and_read_again = self.read_dta(path) + path = temp_file + original.to_stata(path, version=version) + written_and_read_again = self.read_dta(path) written_and_read_again = written_and_read_again.set_index("index") expected = original tm.assert_frame_equal(written_and_read_again, expected) - def test_no_index(self): + def test_no_index(self, temp_file): columns = ["x", "y"] original = DataFrame(np.reshape(np.arange(10.0), (5, 2)), columns=columns) original.index.name = "index_not_written" - with tm.ensure_clean() as path: - original.to_stata(path, write_index=False) - written_and_read_again = self.read_dta(path) - with pytest.raises(KeyError, match=original.index.name): - written_and_read_again["index_not_written"] + path = temp_file + original.to_stata(path, write_index=False) + written_and_read_again = self.read_dta(path) + with pytest.raises(KeyError, match=original.index.name): + written_and_read_again["index_not_written"] - def test_string_no_dates(self): + def test_string_no_dates(self, temp_file): s1 = Series(["a", "A longer string"]) s2 = Series([1.0, 2.0], dtype=np.float64) original = DataFrame({"s1": s1, "s2": s2}) original.index.name = "index" - with tm.ensure_clean() as path: - original.to_stata(path) - written_and_read_again = self.read_dta(path) + path = temp_file + original.to_stata(path) + written_and_read_again = self.read_dta(path) expected = original tm.assert_frame_equal(written_and_read_again.set_index("index"), expected) - def test_large_value_conversion(self): + def test_large_value_conversion(self, temp_file): s0 = Series([1, 99], dtype=np.int8) s1 = Series([1, 127], dtype=np.int8) s2 = Series([1, 2**15 - 1], dtype=np.int16) s3 = Series([1, 2**63 - 1], dtype=np.int64) original = DataFrame({"s0": s0, "s1": s1, "s2": s2, "s3": s3}) original.index.name = "index" - with tm.ensure_clean() as path: - with tm.assert_produces_warning(PossiblePrecisionLoss): - original.to_stata(path) + path = temp_file + with tm.assert_produces_warning(PossiblePrecisionLoss): + original.to_stata(path) - written_and_read_again = self.read_dta(path) + written_and_read_again = self.read_dta(path) modified = original modified["s1"] = Series(modified["s1"], dtype=np.int16) @@ -635,14 +637,14 @@ def test_large_value_conversion(self): modified["s3"] = Series(modified["s3"], dtype=np.float64) tm.assert_frame_equal(written_and_read_again.set_index("index"), modified) - def test_dates_invalid_column(self): + def test_dates_invalid_column(self, temp_file): original = DataFrame([datetime(2006, 11, 19, 23, 13, 20)]) original.index.name = "index" - with tm.ensure_clean() as path: - with tm.assert_produces_warning(InvalidColumnName): - original.to_stata(path, convert_dates={0: "tc"}) + path = temp_file + with tm.assert_produces_warning(InvalidColumnName): + original.to_stata(path, convert_dates={0: "tc"}) - written_and_read_again = self.read_dta(path) + written_and_read_again = self.read_dta(path) expected = original.copy() expected.columns = ["_0"] @@ -673,7 +675,7 @@ def test_value_labels_old_format(self, datapath): with StataReader(dpath) as reader: assert reader.value_labels() == {} - def test_date_export_formats(self): + def test_date_export_formats(self, temp_file): columns = ["tc", "td", "tw", "tm", "tq", "th", "ty"] conversions = {c: c for c in columns} data = [datetime(2006, 11, 20, 23, 13, 20)] * len(columns) @@ -697,13 +699,13 @@ def test_date_export_formats(self): ) expected["tc"] = expected["tc"].astype("M8[ms]") - with tm.ensure_clean() as path: - original.to_stata(path, convert_dates=conversions) - written_and_read_again = self.read_dta(path) + path = temp_file + original.to_stata(path, convert_dates=conversions) + written_and_read_again = self.read_dta(path) tm.assert_frame_equal(written_and_read_again.set_index("index"), expected) - def test_write_missing_strings(self): + def test_write_missing_strings(self, temp_file): original = DataFrame([["1"], [None]], columns=["foo"]) expected = DataFrame( @@ -712,15 +714,15 @@ def test_write_missing_strings(self): columns=["foo"], ) - with tm.ensure_clean() as path: - original.to_stata(path) - written_and_read_again = self.read_dta(path) + path = temp_file + original.to_stata(path) + written_and_read_again = self.read_dta(path) tm.assert_frame_equal(written_and_read_again.set_index("index"), expected) @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) @pytest.mark.parametrize("byteorder", [">", "<"]) - def test_bool_uint(self, byteorder, version): + def test_bool_uint(self, byteorder, version, temp_file): s0 = Series([0, 1, True], dtype=np.bool_) s1 = Series([0, 1, 100], dtype=np.uint8) s2 = Series([0, 1, 255], dtype=np.uint8) @@ -734,9 +736,9 @@ def test_bool_uint(self, byteorder, version): ) original.index.name = "index" - with tm.ensure_clean() as path: - original.to_stata(path, byteorder=byteorder, version=version) - written_and_read_again = self.read_dta(path) + path = temp_file + original.to_stata(path, byteorder=byteorder, version=version) + written_and_read_again = self.read_dta(path) written_and_read_again = written_and_read_again.set_index("index") @@ -768,7 +770,7 @@ def test_variable_labels(self, datapath): assert k in keys assert v in labels - def test_minimal_size_col(self): + def test_minimal_size_col(self, temp_file): str_lens = (1, 100, 244) s = {} for str_len in str_lens: @@ -776,16 +778,16 @@ def test_minimal_size_col(self): ["a" * str_len, "b" * str_len, "c" * str_len] ) original = DataFrame(s) - with tm.ensure_clean() as path: - original.to_stata(path, write_index=False) + path = temp_file + original.to_stata(path, write_index=False) - with StataReader(path) as sr: - sr._ensure_open() # The `_*list` variables are initialized here - for variable, fmt, typ in zip(sr._varlist, sr._fmtlist, sr._typlist): - assert int(variable[1:]) == int(fmt[1:-1]) - assert int(variable[1:]) == typ + with StataReader(path) as sr: + sr._ensure_open() # The `_*list` variables are initialized here + for variable, fmt, typ in zip(sr._varlist, sr._fmtlist, sr._typlist): + assert int(variable[1:]) == int(fmt[1:-1]) + assert int(variable[1:]) == typ - def test_excessively_long_string(self): + def test_excessively_long_string(self, temp_file): str_lens = (1, 244, 500) s = {} for str_len in str_lens: @@ -800,16 +802,16 @@ def test_excessively_long_string(self): r"the newer \(Stata 13 and later\) format\." ) with pytest.raises(ValueError, match=msg): - with tm.ensure_clean() as path: - original.to_stata(path) + path = temp_file + original.to_stata(path) - def test_missing_value_generator(self): + def test_missing_value_generator(self, temp_file): types = ("b", "h", "l") df = DataFrame([[0.0]], columns=["float_"]) - with tm.ensure_clean() as path: - df.to_stata(path) - with StataReader(path) as rdr: - valid_range = rdr.VALID_RANGE + path = temp_file + df.to_stata(path) + with StataReader(path) as rdr: + valid_range = rdr.VALID_RANGE expected_values = ["." + chr(97 + i) for i in range(26)] expected_values.insert(0, ".") for t in types: @@ -850,7 +852,7 @@ def test_missing_value_conversion(self, file, datapath): ) tm.assert_frame_equal(parsed, expected) - def test_big_dates(self, datapath): + def test_big_dates(self, datapath, temp_file): yr = [1960, 2000, 9999, 100, 2262, 1677] mo = [1, 1, 12, 1, 4, 9] dd = [1, 1, 31, 1, 22, 23] @@ -906,10 +908,10 @@ def test_big_dates(self, datapath): date_conversion = {c: c[-2:] for c in columns} # {c : c[-2:] for c in columns} - with tm.ensure_clean() as path: - expected.index.name = "index" - expected.to_stata(path, convert_dates=date_conversion) - written_and_read_again = self.read_dta(path) + path = temp_file + expected.index.name = "index" + expected.to_stata(path, convert_dates=date_conversion) + written_and_read_again = self.read_dta(path) tm.assert_frame_equal( written_and_read_again.set_index("index"), @@ -994,7 +996,7 @@ def test_drop_column(self, datapath): @pytest.mark.filterwarnings( "ignore:\\nStata value:pandas.io.stata.ValueLabelTypeMismatch" ) - def test_categorical_writing(self, version): + def test_categorical_writing(self, version, temp_file): original = DataFrame.from_records( [ ["one", "ten", "one", "one", "one", 1], @@ -1017,9 +1019,9 @@ def test_categorical_writing(self, version): "unlabeled", ], ) - with tm.ensure_clean() as path: - original.astype("category").to_stata(path, version=version) - written_and_read_again = self.read_dta(path) + path = temp_file + original.astype("category").to_stata(path, version=version) + written_and_read_again = self.read_dta(path) res = written_and_read_again.set_index("index") @@ -1042,7 +1044,7 @@ def test_categorical_writing(self, version): tm.assert_frame_equal(res, expected) - def test_categorical_warnings_and_errors(self): + def test_categorical_warnings_and_errors(self, temp_file): # Warning for non-string labels # Error for labels too long original = DataFrame.from_records( @@ -1051,13 +1053,13 @@ def test_categorical_warnings_and_errors(self): ) original = original.astype("category") - with tm.ensure_clean() as path: - msg = ( - "Stata value labels for a single variable must have " - r"a combined length less than 32,000 characters\." - ) - with pytest.raises(ValueError, match=msg): - original.to_stata(path) + path = temp_file + msg = ( + "Stata value labels for a single variable must have " + r"a combined length less than 32,000 characters\." + ) + with pytest.raises(ValueError, match=msg): + original.to_stata(path) original = DataFrame.from_records( [["a"], ["b"], ["c"], ["d"], [1]], columns=["Too_long"] @@ -1068,7 +1070,7 @@ def test_categorical_warnings_and_errors(self): # should get a warning for mixed content @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) - def test_categorical_with_stata_missing_values(self, version): + def test_categorical_with_stata_missing_values(self, version, temp_file): values = [["a" + str(i)] for i in range(120)] values.append([np.nan]) original = DataFrame.from_records(values, columns=["many_labels"]) @@ -1076,9 +1078,9 @@ def test_categorical_with_stata_missing_values(self, version): [original[col].astype("category") for col in original], axis=1 ) original.index.name = "index" - with tm.ensure_clean() as path: - original.to_stata(path, version=version) - written_and_read_again = self.read_dta(path) + path = temp_file + original.to_stata(path, version=version) + written_and_read_again = self.read_dta(path) res = written_and_read_again.set_index("index") @@ -1313,54 +1315,50 @@ def test_read_chunks_columns(self, datapath): pos += chunksize @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) - def test_write_variable_labels(self, version, mixed_frame): + def test_write_variable_labels(self, version, mixed_frame, temp_file): # GH 13631, add support for writing variable labels mixed_frame.index.name = "index" variable_labels = {"a": "City Rank", "b": "City Exponent", "c": "City"} - with tm.ensure_clean() as path: - mixed_frame.to_stata(path, variable_labels=variable_labels, version=version) - with StataReader(path) as sr: - read_labels = sr.variable_labels() - expected_labels = { - "index": "", - "a": "City Rank", - "b": "City Exponent", - "c": "City", - } - assert read_labels == expected_labels + path = temp_file + mixed_frame.to_stata(path, variable_labels=variable_labels, version=version) + with StataReader(path) as sr: + read_labels = sr.variable_labels() + expected_labels = { + "index": "", + "a": "City Rank", + "b": "City Exponent", + "c": "City", + } + assert read_labels == expected_labels variable_labels["index"] = "The Index" - with tm.ensure_clean() as path: - mixed_frame.to_stata(path, variable_labels=variable_labels, version=version) - with StataReader(path) as sr: - read_labels = sr.variable_labels() - assert read_labels == variable_labels + path = temp_file + mixed_frame.to_stata(path, variable_labels=variable_labels, version=version) + with StataReader(path) as sr: + read_labels = sr.variable_labels() + assert read_labels == variable_labels @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) - def test_invalid_variable_labels(self, version, mixed_frame): + def test_invalid_variable_labels(self, version, mixed_frame, temp_file): mixed_frame.index.name = "index" variable_labels = {"a": "very long" * 10, "b": "City Exponent", "c": "City"} - with tm.ensure_clean() as path: - msg = "Variable labels must be 80 characters or fewer" - with pytest.raises(ValueError, match=msg): - mixed_frame.to_stata( - path, variable_labels=variable_labels, version=version - ) + path = temp_file + msg = "Variable labels must be 80 characters or fewer" + with pytest.raises(ValueError, match=msg): + mixed_frame.to_stata(path, variable_labels=variable_labels, version=version) @pytest.mark.parametrize("version", [114, 117]) - def test_invalid_variable_label_encoding(self, version, mixed_frame): + def test_invalid_variable_label_encoding(self, version, mixed_frame, temp_file): mixed_frame.index.name = "index" variable_labels = {"a": "very long" * 10, "b": "City Exponent", "c": "City"} variable_labels["a"] = "invalid character Œ" - with tm.ensure_clean() as path: - with pytest.raises( - ValueError, match="Variable labels must contain only characters" - ): - mixed_frame.to_stata( - path, variable_labels=variable_labels, version=version - ) + path = temp_file + with pytest.raises( + ValueError, match="Variable labels must contain only characters" + ): + mixed_frame.to_stata(path, variable_labels=variable_labels, version=version) - def test_write_variable_label_errors(self, mixed_frame): + def test_write_variable_label_errors(self, mixed_frame, temp_file): values = ["\u03a1", "\u0391", "\u039d", "\u0394", "\u0391", "\u03a3"] variable_labels_utf8 = { @@ -1374,8 +1372,8 @@ def test_write_variable_label_errors(self, mixed_frame): "encoded in Latin-1" ) with pytest.raises(ValueError, match=msg): - with tm.ensure_clean() as path: - mixed_frame.to_stata(path, variable_labels=variable_labels_utf8) + path = temp_file + mixed_frame.to_stata(path, variable_labels=variable_labels_utf8) variable_labels_long = { "a": "City Rank", @@ -1387,10 +1385,10 @@ def test_write_variable_label_errors(self, mixed_frame): msg = "Variable labels must be 80 characters or fewer" with pytest.raises(ValueError, match=msg): - with tm.ensure_clean() as path: - mixed_frame.to_stata(path, variable_labels=variable_labels_long) + path = temp_file + mixed_frame.to_stata(path, variable_labels=variable_labels_long) - def test_default_date_conversion(self): + def test_default_date_conversion(self, temp_file): # GH 12259 dates = [ dt.datetime(1999, 12, 31, 12, 12, 12, 12000), @@ -1409,29 +1407,29 @@ def test_default_date_conversion(self): # "tc" for convert_dates below stores with "ms" resolution expected["dates"] = expected["dates"].astype("M8[ms]") - with tm.ensure_clean() as path: - original.to_stata(path, write_index=False) - reread = read_stata(path, convert_dates=True) - tm.assert_frame_equal(expected, reread) + path = temp_file + original.to_stata(path, write_index=False) + reread = read_stata(path, convert_dates=True) + tm.assert_frame_equal(expected, reread) - original.to_stata(path, write_index=False, convert_dates={"dates": "tc"}) - direct = read_stata(path, convert_dates=True) - tm.assert_frame_equal(reread, direct) + original.to_stata(path, write_index=False, convert_dates={"dates": "tc"}) + direct = read_stata(path, convert_dates=True) + tm.assert_frame_equal(reread, direct) - dates_idx = original.columns.tolist().index("dates") - original.to_stata(path, write_index=False, convert_dates={dates_idx: "tc"}) - direct = read_stata(path, convert_dates=True) - tm.assert_frame_equal(reread, direct) + dates_idx = original.columns.tolist().index("dates") + original.to_stata(path, write_index=False, convert_dates={dates_idx: "tc"}) + direct = read_stata(path, convert_dates=True) + tm.assert_frame_equal(reread, direct) - def test_unsupported_type(self): + def test_unsupported_type(self, temp_file): original = DataFrame({"a": [1 + 2j, 2 + 4j]}) msg = "Data type complex128 not supported" with pytest.raises(NotImplementedError, match=msg): - with tm.ensure_clean() as path: - original.to_stata(path) + path = temp_file + original.to_stata(path) - def test_unsupported_datetype(self): + def test_unsupported_datetype(self, temp_file): dates = [ dt.datetime(1999, 12, 31, 12, 12, 12, 12000), dt.datetime(2012, 12, 21, 12, 21, 12, 21000), @@ -1447,8 +1445,8 @@ def test_unsupported_datetype(self): msg = "Format %tC not implemented" with pytest.raises(NotImplementedError, match=msg): - with tm.ensure_clean() as path: - original.to_stata(path, convert_dates={"dates": "tC"}) + path = temp_file + original.to_stata(path, convert_dates={"dates": "tC"}) dates = pd.date_range("1-1-1990", periods=3, tz="Asia/Hong_Kong") original = DataFrame( @@ -1459,8 +1457,8 @@ def test_unsupported_datetype(self): } ) with pytest.raises(NotImplementedError, match="Data type datetime64"): - with tm.ensure_clean() as path: - original.to_stata(path) + path = temp_file + original.to_stata(path) def test_repeated_column_labels(self, datapath): # GH 13923, 25772 @@ -1496,7 +1494,7 @@ def test_stata_111(self, datapath): original = original[["y", "x", "w", "z"]] tm.assert_frame_equal(original, df) - def test_out_of_range_double(self): + def test_out_of_range_double(self, temp_file): # GH 14618 df = DataFrame( { @@ -1509,10 +1507,10 @@ def test_out_of_range_double(self): r"supported by Stata \(.+\)" ) with pytest.raises(ValueError, match=msg): - with tm.ensure_clean() as path: - df.to_stata(path) + path = temp_file + df.to_stata(path) - def test_out_of_range_float(self): + def test_out_of_range_float(self, temp_file): original = DataFrame( { "ColumnOk": [ @@ -1531,16 +1529,16 @@ def test_out_of_range_float(self): for col in original: original[col] = original[col].astype(np.float32) - with tm.ensure_clean() as path: - original.to_stata(path) - reread = read_stata(path) + path = temp_file + original.to_stata(path) + reread = read_stata(path) original["ColumnTooBig"] = original["ColumnTooBig"].astype(np.float64) expected = original tm.assert_frame_equal(reread.set_index("index"), expected) @pytest.mark.parametrize("infval", [np.inf, -np.inf]) - def test_inf(self, infval): + def test_inf(self, infval, temp_file): # GH 45350 df = DataFrame({"WithoutInf": [0.0, 1.0], "WithInf": [2.0, infval]}) msg = ( @@ -1548,8 +1546,8 @@ def test_inf(self, infval): "which is outside the range supported by Stata." ) with pytest.raises(ValueError, match=msg): - with tm.ensure_clean() as path: - df.to_stata(path) + path = temp_file + df.to_stata(path) def test_path_pathlib(self): df = DataFrame( @@ -1563,19 +1561,19 @@ def test_path_pathlib(self): tm.assert_frame_equal(df, result) @pytest.mark.parametrize("write_index", [True, False]) - def test_value_labels_iterator(self, write_index): + def test_value_labels_iterator(self, write_index, temp_file): # GH 16923 d = {"A": ["B", "E", "C", "A", "E"]} df = DataFrame(data=d) df["A"] = df["A"].astype("category") - with tm.ensure_clean() as path: - df.to_stata(path, write_index=write_index) + path = temp_file + df.to_stata(path, write_index=write_index) - with read_stata(path, iterator=True) as dta_iter: - value_labels = dta_iter.value_labels() + with read_stata(path, iterator=True) as dta_iter: + value_labels = dta_iter.value_labels() assert value_labels == {"A": {0: "A", 1: "B", 2: "C", 3: "E"}} - def test_set_index(self): + def test_set_index(self, temp_file): # GH 17328 df = DataFrame( 1.1 * np.arange(120).reshape((30, 4)), @@ -1583,9 +1581,9 @@ def test_set_index(self): index=pd.Index([f"i-{i}" for i in range(30)], dtype=object), ) df.index.name = "index" - with tm.ensure_clean() as path: - df.to_stata(path) - reread = read_stata(path, index_col="index") + path = temp_file + df.to_stata(path) + reread = read_stata(path, index_col="index") tm.assert_frame_equal(df, reread) @pytest.mark.parametrize( @@ -1608,7 +1606,7 @@ def test_date_parsing_ignores_format_details(self, column, datapath): formatted = df.loc[0, column + "_fmt"] assert unformatted == formatted - def test_writer_117(self): + def test_writer_117(self, temp_file): original = DataFrame( data=[ [ @@ -1662,14 +1660,14 @@ def test_writer_117(self): original["float32"] = Series(original["float32"], dtype=np.float32) original.index.name = "index" copy = original.copy() - with tm.ensure_clean() as path: - original.to_stata( - path, - convert_dates={"datetime": "tc"}, - convert_strl=["forced_strl"], - version=117, - ) - written_and_read_again = self.read_dta(path) + path = temp_file + original.to_stata( + path, + convert_dates={"datetime": "tc"}, + convert_strl=["forced_strl"], + version=117, + ) + written_and_read_again = self.read_dta(path) expected = original[:] # "tc" for convert_dates means we store with "ms" resolution @@ -1681,7 +1679,7 @@ def test_writer_117(self): ) tm.assert_frame_equal(original, copy) - def test_convert_strl_name_swap(self): + def test_convert_strl_name_swap(self, temp_file): original = DataFrame( [["a" * 3000, "A", "apple"], ["b" * 1000, "B", "banana"]], columns=["long1" * 10, "long", 1], @@ -1689,14 +1687,14 @@ def test_convert_strl_name_swap(self): original.index.name = "index" with tm.assert_produces_warning(InvalidColumnName): - with tm.ensure_clean() as path: - original.to_stata(path, convert_strl=["long", 1], version=117) - reread = self.read_dta(path) - reread = reread.set_index("index") - reread.columns = original.columns - tm.assert_frame_equal(reread, original, check_index_type=False) - - def test_invalid_date_conversion(self): + path = temp_file + original.to_stata(path, convert_strl=["long", 1], version=117) + reread = self.read_dta(path) + reread = reread.set_index("index") + reread.columns = original.columns + tm.assert_frame_equal(reread, original, check_index_type=False) + + def test_invalid_date_conversion(self, temp_file): # GH 12259 dates = [ dt.datetime(1999, 12, 31, 12, 12, 12, 12000), @@ -1711,13 +1709,13 @@ def test_invalid_date_conversion(self): } ) - with tm.ensure_clean() as path: - msg = "convert_dates key must be a column or an integer" - with pytest.raises(ValueError, match=msg): - original.to_stata(path, convert_dates={"wrong_name": "tc"}) + path = temp_file + msg = "convert_dates key must be a column or an integer" + with pytest.raises(ValueError, match=msg): + original.to_stata(path, convert_dates={"wrong_name": "tc"}) @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) - def test_nonfile_writing(self, version): + def test_nonfile_writing(self, version, temp_file): # GH 21041 bio = io.BytesIO() df = DataFrame( @@ -1726,15 +1724,15 @@ def test_nonfile_writing(self, version): index=pd.Index([f"i-{i}" for i in range(30)], dtype=object), ) df.index.name = "index" - with tm.ensure_clean() as path: - df.to_stata(bio, version=version) - bio.seek(0) - with open(path, "wb") as dta: - dta.write(bio.read()) - reread = read_stata(path, index_col="index") + path = temp_file + df.to_stata(bio, version=version) + bio.seek(0) + with open(path, "wb") as dta: + dta.write(bio.read()) + reread = read_stata(path, index_col="index") tm.assert_frame_equal(df, reread) - def test_gzip_writing(self): + def test_gzip_writing(self, temp_file): # writing version 117 requires seek and cannot be used with gzip df = DataFrame( 1.1 * np.arange(120).reshape((30, 4)), @@ -1742,11 +1740,11 @@ def test_gzip_writing(self): index=pd.Index([f"i-{i}" for i in range(30)], dtype=object), ) df.index.name = "index" - with tm.ensure_clean() as path: - with gzip.GzipFile(path, "wb") as gz: - df.to_stata(gz, version=114) - with gzip.GzipFile(path, "rb") as gz: - reread = read_stata(gz, index_col="index") + path = temp_file + with gzip.GzipFile(path, "wb") as gz: + df.to_stata(gz, version=114) + with gzip.GzipFile(path, "rb") as gz: + reread = read_stata(gz, index_col="index") tm.assert_frame_equal(df, reread) def test_unicode_dta_118(self, datapath): @@ -1766,70 +1764,65 @@ def test_unicode_dta_118(self, datapath): tm.assert_frame_equal(unicode_df, expected) - def test_mixed_string_strl(self): + def test_mixed_string_strl(self, temp_file): # GH 23633 output = [{"mixed": "string" * 500, "number": 0}, {"mixed": None, "number": 1}] output = DataFrame(output) output.number = output.number.astype("int32") - with tm.ensure_clean() as path: - output.to_stata(path, write_index=False, version=117) - reread = read_stata(path) - expected = output.fillna("") - tm.assert_frame_equal(reread, expected) + path = temp_file + output.to_stata(path, write_index=False, version=117) + reread = read_stata(path) + expected = output.fillna("") + tm.assert_frame_equal(reread, expected) - # Check strl supports all None (null) - output["mixed"] = None - output.to_stata( - path, write_index=False, convert_strl=["mixed"], version=117 - ) - reread = read_stata(path) - expected = output.fillna("") - tm.assert_frame_equal(reread, expected) + # Check strl supports all None (null) + output["mixed"] = None + output.to_stata(path, write_index=False, convert_strl=["mixed"], version=117) + reread = read_stata(path) + expected = output.fillna("") + tm.assert_frame_equal(reread, expected) @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) - def test_all_none_exception(self, version): + def test_all_none_exception(self, version, temp_file): output = [{"none": "none", "number": 0}, {"none": None, "number": 1}] output = DataFrame(output) output["none"] = None - with tm.ensure_clean() as path: - with pytest.raises(ValueError, match="Column `none` cannot be exported"): - output.to_stata(path, version=version) + with pytest.raises(ValueError, match="Column `none` cannot be exported"): + output.to_stata(temp_file, version=version) @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) - def test_invalid_file_not_written(self, version): + def test_invalid_file_not_written(self, version, temp_file): content = "Here is one __�__ Another one __·__ Another one __½__" df = DataFrame([content], columns=["invalid"]) - with tm.ensure_clean() as path: - msg1 = ( - r"'latin-1' codec can't encode character '\\ufffd' " - r"in position 14: ordinal not in range\(256\)" - ) - msg2 = ( - "'ascii' codec can't decode byte 0xef in position 14: " - r"ordinal not in range\(128\)" - ) - with pytest.raises(UnicodeEncodeError, match=f"{msg1}|{msg2}"): - df.to_stata(path) + msg1 = ( + r"'latin-1' codec can't encode character '\\ufffd' " + r"in position 14: ordinal not in range\(256\)" + ) + msg2 = ( + "'ascii' codec can't decode byte 0xef in position 14: " + r"ordinal not in range\(128\)" + ) + with pytest.raises(UnicodeEncodeError, match=f"{msg1}|{msg2}"): + df.to_stata(temp_file) - def test_strl_latin1(self): + def test_strl_latin1(self, temp_file): # GH 23573, correct GSO data to reflect correct size output = DataFrame( [["pandas"] * 2, ["þâÑÐŧ"] * 2], columns=["var_str", "var_strl"] ) - with tm.ensure_clean() as path: - output.to_stata(path, version=117, convert_strl=["var_strl"]) - with open(path, "rb") as reread: - content = reread.read() - expected = "þâÑÐŧ" - assert expected.encode("latin-1") in content - assert expected.encode("utf-8") in content - gsos = content.split(b"strls")[1][1:-2] - for gso in gsos.split(b"GSO")[1:]: - val = gso.split(b"\x00")[-2] - size = gso[gso.find(b"\x82") + 1] - assert len(val) == size - 1 + output.to_stata(temp_file, version=117, convert_strl=["var_strl"]) + with open(temp_file, "rb") as reread: + content = reread.read() + expected = "þâÑÐŧ" + assert expected.encode("latin-1") in content + assert expected.encode("utf-8") in content + gsos = content.split(b"strls")[1][1:-2] + for gso in gsos.split(b"GSO")[1:]: + val = gso.split(b"\x00")[-2] + size = gso[gso.find(b"\x82") + 1] + assert len(val) == size - 1 def test_encoding_latin1_118(self, datapath): # GH 25960 @@ -1864,7 +1857,7 @@ def test_stata_119(self, datapath): assert reader._nvar == 32999 @pytest.mark.parametrize("version", [118, 119, None]) - def test_utf8_writer(self, version): + def test_utf8_writer(self, version, temp_file): cat = pd.Categorical(["a", "β", "ĉ"], ordered=True) data = DataFrame( [ @@ -1885,48 +1878,45 @@ def test_utf8_writer(self, version): data_label = "ᴅaᵀa-label" value_labels = {"β": {1: "label", 2: "æøå", 3: "ŋot valid latin-1"}} data["β"] = data["β"].astype(np.int32) - with tm.ensure_clean() as path: - writer = StataWriterUTF8( - path, - data, - data_label=data_label, - convert_strl=["strls"], - variable_labels=variable_labels, - write_index=False, - version=version, - value_labels=value_labels, - ) - writer.write_file() - reread_encoded = read_stata(path) - # Missing is intentionally converted to empty strl - data["strls"] = data["strls"].fillna("") - # Variable with value labels is reread as categorical - data["β"] = ( - data["β"].replace(value_labels["β"]).astype("category").cat.as_ordered() - ) - tm.assert_frame_equal(data, reread_encoded) - with StataReader(path) as reader: - assert reader.data_label == data_label - assert reader.variable_labels() == variable_labels + writer = StataWriterUTF8( + temp_file, + data, + data_label=data_label, + convert_strl=["strls"], + variable_labels=variable_labels, + write_index=False, + version=version, + value_labels=value_labels, + ) + writer.write_file() + reread_encoded = read_stata(temp_file) + # Missing is intentionally converted to empty strl + data["strls"] = data["strls"].fillna("") + # Variable with value labels is reread as categorical + data["β"] = ( + data["β"].replace(value_labels["β"]).astype("category").cat.as_ordered() + ) + tm.assert_frame_equal(data, reread_encoded) + with StataReader(temp_file) as reader: + assert reader.data_label == data_label + assert reader.variable_labels() == variable_labels - data.to_stata(path, version=version, write_index=False) - reread_to_stata = read_stata(path) - tm.assert_frame_equal(data, reread_to_stata) + data.to_stata(temp_file, version=version, write_index=False) + reread_to_stata = read_stata(temp_file) + tm.assert_frame_equal(data, reread_to_stata) - def test_writer_118_exceptions(self): + def test_writer_118_exceptions(self, temp_file): df = DataFrame(np.zeros((1, 33000), dtype=np.int8)) - with tm.ensure_clean() as path: - with pytest.raises(ValueError, match="version must be either 118 or 119."): - StataWriterUTF8(path, df, version=117) - with tm.ensure_clean() as path: - with pytest.raises(ValueError, match="You must use version 119"): - StataWriterUTF8(path, df, version=118) + with pytest.raises(ValueError, match="version must be either 118 or 119."): + StataWriterUTF8(temp_file, df, version=117) + with pytest.raises(ValueError, match="You must use version 119"): + StataWriterUTF8(temp_file, df, version=118) @pytest.mark.parametrize( "dtype_backend", ["numpy_nullable", pytest.param("pyarrow", marks=td.skip_if_no("pyarrow"))], ) - def test_read_write_ea_dtypes(self, dtype_backend): + def test_read_write_ea_dtypes(self, dtype_backend, temp_file): df = DataFrame( { "a": [1, 2, None], @@ -1940,9 +1930,8 @@ def test_read_write_ea_dtypes(self, dtype_backend): df = df.convert_dtypes(dtype_backend=dtype_backend) df.to_stata("test_stata.dta", version=118) - with tm.ensure_clean() as path: - df.to_stata(path) - written_and_read_again = self.read_dta(path) + df.to_stata(temp_file) + written_and_read_again = self.read_dta(temp_file) expected = DataFrame( { @@ -1995,7 +1984,9 @@ def test_direct_read(datapath, monkeypatch): @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) @pytest.mark.parametrize("use_dict", [True, False]) @pytest.mark.parametrize("infer", [True, False]) -def test_compression(compression, version, use_dict, infer, compression_to_extension): +def test_compression( + compression, version, use_dict, infer, compression_to_extension, tmp_path +): file_name = "dta_inferred_compression.dta" if compression: if use_dict: @@ -2013,31 +2004,32 @@ def test_compression(compression, version, use_dict, infer, compression_to_exten np.random.default_rng(2).standard_normal((10, 2)), columns=list("AB") ) df.index.name = "index" - with tm.ensure_clean(file_name) as path: - df.to_stata(path, version=version, compression=compression_arg) - if compression == "gzip": - with gzip.open(path, "rb") as comp: - fp = io.BytesIO(comp.read()) - elif compression == "zip": - with zipfile.ZipFile(path, "r") as comp: - fp = io.BytesIO(comp.read(comp.filelist[0])) - elif compression == "tar": - with tarfile.open(path) as tar: - fp = io.BytesIO(tar.extractfile(tar.getnames()[0]).read()) - elif compression == "bz2": - with bz2.open(path, "rb") as comp: - fp = io.BytesIO(comp.read()) - elif compression == "zstd": - zstd = pytest.importorskip("zstandard") - with zstd.open(path, "rb") as comp: - fp = io.BytesIO(comp.read()) - elif compression == "xz": - lzma = pytest.importorskip("lzma") - with lzma.open(path, "rb") as comp: - fp = io.BytesIO(comp.read()) - elif compression is None: - fp = path - reread = read_stata(fp, index_col="index") + path = tmp_path / file_name + path.touch() + df.to_stata(path, version=version, compression=compression_arg) + if compression == "gzip": + with gzip.open(path, "rb") as comp: + fp = io.BytesIO(comp.read()) + elif compression == "zip": + with zipfile.ZipFile(path, "r") as comp: + fp = io.BytesIO(comp.read(comp.filelist[0])) + elif compression == "tar": + with tarfile.open(path) as tar: + fp = io.BytesIO(tar.extractfile(tar.getnames()[0]).read()) + elif compression == "bz2": + with bz2.open(path, "rb") as comp: + fp = io.BytesIO(comp.read()) + elif compression == "zstd": + zstd = pytest.importorskip("zstandard") + with zstd.open(path, "rb") as comp: + fp = io.BytesIO(comp.read()) + elif compression == "xz": + lzma = pytest.importorskip("lzma") + with lzma.open(path, "rb") as comp: + fp = io.BytesIO(comp.read()) + elif compression is None: + fp = path + reread = read_stata(fp, index_col="index") expected = df tm.assert_frame_equal(reread, expected) @@ -2045,47 +2037,47 @@ def test_compression(compression, version, use_dict, infer, compression_to_exten @pytest.mark.parametrize("method", ["zip", "infer"]) @pytest.mark.parametrize("file_ext", [None, "dta", "zip"]) -def test_compression_dict(method, file_ext): +def test_compression_dict(method, file_ext, tmp_path): file_name = f"test.{file_ext}" archive_name = "test.dta" df = DataFrame( np.random.default_rng(2).standard_normal((10, 2)), columns=list("AB") ) df.index.name = "index" - with tm.ensure_clean(file_name) as path: - compression = {"method": method, "archive_name": archive_name} - df.to_stata(path, compression=compression) - if method == "zip" or file_ext == "zip": - with zipfile.ZipFile(path, "r") as zp: - assert len(zp.filelist) == 1 - assert zp.filelist[0].filename == archive_name - fp = io.BytesIO(zp.read(zp.filelist[0])) - else: - fp = path - reread = read_stata(fp, index_col="index") + compression = {"method": method, "archive_name": archive_name} + path = tmp_path / file_name + path.touch() + df.to_stata(path, compression=compression) + if method == "zip" or file_ext == "zip": + with zipfile.ZipFile(path, "r") as zp: + assert len(zp.filelist) == 1 + assert zp.filelist[0].filename == archive_name + fp = io.BytesIO(zp.read(zp.filelist[0])) + else: + fp = path + reread = read_stata(fp, index_col="index") expected = df tm.assert_frame_equal(reread, expected) @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) -def test_chunked_categorical(version): +def test_chunked_categorical(version, temp_file): df = DataFrame({"cats": Series(["a", "b", "a", "b", "c"], dtype="category")}) df.index.name = "index" expected = df.copy() - with tm.ensure_clean() as path: - df.to_stata(path, version=version) - with StataReader(path, chunksize=2, order_categoricals=False) as reader: - for i, block in enumerate(reader): - block = block.set_index("index") - assert "cats" in block - tm.assert_series_equal( - block.cats, - expected.cats.iloc[2 * i : 2 * (i + 1)], - check_index_type=len(block) > 1, - ) + df.to_stata(temp_file, version=version) + with StataReader(temp_file, chunksize=2, order_categoricals=False) as reader: + for i, block in enumerate(reader): + block = block.set_index("index") + assert "cats" in block + tm.assert_series_equal( + block.cats, + expected.cats.iloc[2 * i : 2 * (i + 1)], + check_index_type=len(block) > 1, + ) def test_chunked_categorical_partial(datapath): @@ -2115,38 +2107,36 @@ def test_iterator_errors(datapath, chunksize): pass -def test_iterator_value_labels(): +def test_iterator_value_labels(temp_file): # GH 31544 values = ["c_label", "b_label"] + ["a_label"] * 500 df = DataFrame({f"col{k}": pd.Categorical(values, ordered=True) for k in range(2)}) - with tm.ensure_clean() as path: - df.to_stata(path, write_index=False) - expected = pd.Index(["a_label", "b_label", "c_label"], dtype="object") - with read_stata(path, chunksize=100) as reader: - for j, chunk in enumerate(reader): - for i in range(2): - tm.assert_index_equal(chunk.dtypes.iloc[i].categories, expected) - tm.assert_frame_equal(chunk, df.iloc[j * 100 : (j + 1) * 100]) + df.to_stata(temp_file, write_index=False) + expected = pd.Index(["a_label", "b_label", "c_label"], dtype="object") + with read_stata(temp_file, chunksize=100) as reader: + for j, chunk in enumerate(reader): + for i in range(2): + tm.assert_index_equal(chunk.dtypes.iloc[i].categories, expected) + tm.assert_frame_equal(chunk, df.iloc[j * 100 : (j + 1) * 100]) -def test_precision_loss(): +def test_precision_loss(temp_file): df = DataFrame( [[sum(2**i for i in range(60)), sum(2**i for i in range(52))]], columns=["big", "little"], ) - with tm.ensure_clean() as path: - with tm.assert_produces_warning( - PossiblePrecisionLoss, match="Column converted from int64 to float64" - ): - df.to_stata(path, write_index=False) - reread = read_stata(path) - expected_dt = Series([np.float64, np.float64], index=["big", "little"]) - tm.assert_series_equal(reread.dtypes, expected_dt) - assert reread.loc[0, "little"] == df.loc[0, "little"] - assert reread.loc[0, "big"] == float(df.loc[0, "big"]) + with tm.assert_produces_warning( + PossiblePrecisionLoss, match="Column converted from int64 to float64" + ): + df.to_stata(temp_file, write_index=False) + reread = read_stata(temp_file) + expected_dt = Series([np.float64, np.float64], index=["big", "little"]) + tm.assert_series_equal(reread.dtypes, expected_dt) + assert reread.loc[0, "little"] == df.loc[0, "little"] + assert reread.loc[0, "big"] == float(df.loc[0, "big"]) -def test_compression_roundtrip(compression): +def test_compression_roundtrip(compression, temp_file): df = DataFrame( [[0.123456, 0.234567, 0.567567], [12.32112, 123123.2, 321321.2]], index=["A", "B"], @@ -2154,22 +2144,21 @@ def test_compression_roundtrip(compression): ) df.index.name = "index" - with tm.ensure_clean() as path: - df.to_stata(path, compression=compression) - reread = read_stata(path, compression=compression, index_col="index") - tm.assert_frame_equal(df, reread) + df.to_stata(temp_file, compression=compression) + reread = read_stata(temp_file, compression=compression, index_col="index") + tm.assert_frame_equal(df, reread) - # explicitly ensure file was compressed. - with tm.decompress_file(path, compression) as fh: - contents = io.BytesIO(fh.read()) - reread = read_stata(contents, index_col="index") - tm.assert_frame_equal(df, reread) + # explicitly ensure file was compressed. + with tm.decompress_file(temp_file, compression) as fh: + contents = io.BytesIO(fh.read()) + reread = read_stata(contents, index_col="index") + tm.assert_frame_equal(df, reread) @pytest.mark.parametrize("to_infer", [True, False]) @pytest.mark.parametrize("read_infer", [True, False]) def test_stata_compression( - compression_only, read_infer, to_infer, compression_to_extension + compression_only, read_infer, to_infer, compression_to_extension, tmp_path ): compression = compression_only @@ -2186,13 +2175,14 @@ def test_stata_compression( to_compression = "infer" if to_infer else compression read_compression = "infer" if read_infer else compression - with tm.ensure_clean(filename) as path: - df.to_stata(path, compression=to_compression) - result = read_stata(path, compression=read_compression, index_col="index") - tm.assert_frame_equal(result, df) + path = tmp_path / filename + path.touch() + df.to_stata(path, compression=to_compression) + result = read_stata(path, compression=read_compression, index_col="index") + tm.assert_frame_equal(result, df) -def test_non_categorical_value_labels(): +def test_non_categorical_value_labels(temp_file): data = DataFrame( { "fully_labelled": [1, 2, 3, 3, 1], @@ -2202,35 +2192,35 @@ def test_non_categorical_value_labels(): } ) - with tm.ensure_clean() as path: - value_labels = { - "fully_labelled": {1: "one", 2: "two", 3: "three"}, - "partially_labelled": {1.0: "one", 2.0: "two"}, - } - expected = {**value_labels, "Z": {0: "j", 1: "k", 2: "l"}} + path = temp_file + value_labels = { + "fully_labelled": {1: "one", 2: "two", 3: "three"}, + "partially_labelled": {1.0: "one", 2.0: "two"}, + } + expected = {**value_labels, "Z": {0: "j", 1: "k", 2: "l"}} - writer = StataWriter(path, data, value_labels=value_labels) - writer.write_file() + writer = StataWriter(path, data, value_labels=value_labels) + writer.write_file() - with StataReader(path) as reader: - reader_value_labels = reader.value_labels() - assert reader_value_labels == expected + with StataReader(path) as reader: + reader_value_labels = reader.value_labels() + assert reader_value_labels == expected - msg = "Can't create value labels for notY, it wasn't found in the dataset." - value_labels = {"notY": {7: "label1", 8: "label2"}} - with pytest.raises(KeyError, match=msg): - StataWriter(path, data, value_labels=value_labels) + msg = "Can't create value labels for notY, it wasn't found in the dataset." + value_labels = {"notY": {7: "label1", 8: "label2"}} + with pytest.raises(KeyError, match=msg): + StataWriter(path, data, value_labels=value_labels) - msg = ( - "Can't create value labels for Z, value labels " - "can only be applied to numeric columns." - ) - value_labels = {"Z": {1: "a", 2: "k", 3: "j", 4: "i"}} - with pytest.raises(ValueError, match=msg): - StataWriter(path, data, value_labels=value_labels) + msg = ( + "Can't create value labels for Z, value labels " + "can only be applied to numeric columns." + ) + value_labels = {"Z": {1: "a", 2: "k", 3: "j", 4: "i"}} + with pytest.raises(ValueError, match=msg): + StataWriter(path, data, value_labels=value_labels) -def test_non_categorical_value_label_name_conversion(): +def test_non_categorical_value_label_name_conversion(temp_file): # Check conversion of invalid variable names data = DataFrame( { @@ -2258,16 +2248,15 @@ def test_non_categorical_value_label_name_conversion(): "_1__2_": {3: "three"}, } - with tm.ensure_clean() as path: - with tm.assert_produces_warning(InvalidColumnName): - data.to_stata(path, value_labels=value_labels) + with tm.assert_produces_warning(InvalidColumnName): + data.to_stata(temp_file, value_labels=value_labels) - with StataReader(path) as reader: - reader_value_labels = reader.value_labels() - assert reader_value_labels == expected + with StataReader(temp_file) as reader: + reader_value_labels = reader.value_labels() + assert reader_value_labels == expected -def test_non_categorical_value_label_convert_categoricals_error(): +def test_non_categorical_value_label_convert_categoricals_error(temp_file): # Mapping more than one value to the same label is valid for Stata # labels, but can't be read with convert_categoricals=True value_labels = { @@ -2280,17 +2269,16 @@ def test_non_categorical_value_label_convert_categoricals_error(): } ) - with tm.ensure_clean() as path: - data.to_stata(path, value_labels=value_labels) + data.to_stata(temp_file, value_labels=value_labels) - with StataReader(path, convert_categoricals=False) as reader: - reader_value_labels = reader.value_labels() - assert reader_value_labels == value_labels + with StataReader(temp_file, convert_categoricals=False) as reader: + reader_value_labels = reader.value_labels() + assert reader_value_labels == value_labels - col = "repeated_labels" - repeats = "-" * 80 + "\n" + "\n".join(["More than ten"]) + col = "repeated_labels" + repeats = "-" * 80 + "\n" + "\n".join(["More than ten"]) - msg = f""" + msg = f""" Value labels for column {col} are not unique. These cannot be converted to pandas categoricals. @@ -2301,8 +2289,8 @@ def test_non_categorical_value_label_convert_categoricals_error(): The repeated labels are: {repeats} """ - with pytest.raises(ValueError, match=msg): - read_stata(path, convert_categoricals=True) + with pytest.raises(ValueError, match=msg): + read_stata(temp_file, convert_categoricals=True) @pytest.mark.parametrize("version", [114, 117, 118, 119, None]) @@ -2320,7 +2308,7 @@ def test_non_categorical_value_label_convert_categoricals_error(): pd.UInt64Dtype, ], ) -def test_nullable_support(dtype, version): +def test_nullable_support(dtype, version, temp_file): df = DataFrame( { "a": Series([1.0, 2.0, 3.0]), @@ -2339,27 +2327,26 @@ def test_nullable_support(dtype, version): smv = StataMissingValue(value) expected_b = Series([1, smv, smv], dtype=object, name="b") expected_c = Series(["a", "b", ""], name="c") - with tm.ensure_clean() as path: - df.to_stata(path, write_index=False, version=version) - reread = read_stata(path, convert_missing=True) - tm.assert_series_equal(df.a, reread.a) - tm.assert_series_equal(reread.b, expected_b) - tm.assert_series_equal(reread.c, expected_c) + df.to_stata(temp_file, write_index=False, version=version) + reread = read_stata(temp_file, convert_missing=True) + tm.assert_series_equal(df.a, reread.a) + tm.assert_series_equal(reread.b, expected_b) + tm.assert_series_equal(reread.c, expected_c) -def test_empty_frame(): +def test_empty_frame(temp_file): # GH 46240 # create an empty DataFrame with int64 and float64 dtypes df = DataFrame(data={"a": range(3), "b": [1.0, 2.0, 3.0]}).head(0) - with tm.ensure_clean() as path: - df.to_stata(path, write_index=False, version=117) - # Read entire dataframe - df2 = read_stata(path) - assert "b" in df2 - # Dtypes don't match since no support for int32 - dtypes = Series({"a": np.dtype("int32"), "b": np.dtype("float64")}) - tm.assert_series_equal(df2.dtypes, dtypes) - # read one column of empty .dta file - df3 = read_stata(path, columns=["a"]) - assert "b" not in df3 - tm.assert_series_equal(df3.dtypes, dtypes.loc[["a"]]) + path = temp_file + df.to_stata(path, write_index=False, version=117) + # Read entire dataframe + df2 = read_stata(path) + assert "b" in df2 + # Dtypes don't match since no support for int32 + dtypes = Series({"a": np.dtype("int32"), "b": np.dtype("float64")}) + tm.assert_series_equal(df2.dtypes, dtypes) + # read one column of empty .dta file + df3 = read_stata(path, columns=["a"]) + assert "b" not in df3 + tm.assert_series_equal(df3.dtypes, dtypes.loc[["a"]]) diff --git a/pandas/tests/series/methods/test_to_csv.py b/pandas/tests/series/methods/test_to_csv.py index e292861012c8f..f7dec02ab0e5b 100644 --- a/pandas/tests/series/methods/test_to_csv.py +++ b/pandas/tests/series/methods/test_to_csv.py @@ -24,58 +24,55 @@ def read_csv(self, path, **kwargs): return out - def test_from_csv(self, datetime_series, string_series): + def test_from_csv(self, datetime_series, string_series, temp_file): # freq doesn't round-trip datetime_series.index = datetime_series.index._with_freq(None) - with tm.ensure_clean() as path: - datetime_series.to_csv(path, header=False) - ts = self.read_csv(path, parse_dates=True) - tm.assert_series_equal(datetime_series, ts, check_names=False) + path = temp_file + datetime_series.to_csv(path, header=False) + ts = self.read_csv(path, parse_dates=True) + tm.assert_series_equal(datetime_series, ts, check_names=False) - assert ts.name is None - assert ts.index.name is None + assert ts.name is None + assert ts.index.name is None - # see gh-10483 - datetime_series.to_csv(path, header=True) - ts_h = self.read_csv(path, header=0) - assert ts_h.name == "ts" + # see gh-10483 + datetime_series.to_csv(path, header=True) + ts_h = self.read_csv(path, header=0) + assert ts_h.name == "ts" - string_series.to_csv(path, header=False) - series = self.read_csv(path) - tm.assert_series_equal(string_series, series, check_names=False) + string_series.to_csv(path, header=False) + series = self.read_csv(path) + tm.assert_series_equal(string_series, series, check_names=False) - assert series.name is None - assert series.index.name is None + assert series.name is None + assert series.index.name is None - string_series.to_csv(path, header=True) - series_h = self.read_csv(path, header=0) - assert series_h.name == "series" + string_series.to_csv(path, header=True) + series_h = self.read_csv(path, header=0) + assert series_h.name == "series" - with open(path, "w", encoding="utf-8") as outfile: - outfile.write("1998-01-01|1.0\n1999-01-01|2.0") + with open(path, "w", encoding="utf-8") as outfile: + outfile.write("1998-01-01|1.0\n1999-01-01|2.0") - series = self.read_csv(path, sep="|", parse_dates=True) - check_series = Series( - {datetime(1998, 1, 1): 1.0, datetime(1999, 1, 1): 2.0} - ) - tm.assert_series_equal(check_series, series) + series = self.read_csv(path, sep="|", parse_dates=True) + check_series = Series({datetime(1998, 1, 1): 1.0, datetime(1999, 1, 1): 2.0}) + tm.assert_series_equal(check_series, series) - series = self.read_csv(path, sep="|", parse_dates=False) - check_series = Series({"1998-01-01": 1.0, "1999-01-01": 2.0}) - tm.assert_series_equal(check_series, series) + series = self.read_csv(path, sep="|", parse_dates=False) + check_series = Series({"1998-01-01": 1.0, "1999-01-01": 2.0}) + tm.assert_series_equal(check_series, series) - def test_to_csv(self, datetime_series): - with tm.ensure_clean() as path: - datetime_series.to_csv(path, header=False) + def test_to_csv(self, datetime_series, temp_file): + datetime_series.to_csv(temp_file, header=False) - with open(path, newline=None, encoding="utf-8") as f: - lines = f.readlines() - assert lines[1] != "\n" + with open(temp_file, newline=None, encoding="utf-8") as f: + lines = f.readlines() + assert lines[1] != "\n" - datetime_series.to_csv(path, index=False, header=False) - arr = np.loadtxt(path) - tm.assert_almost_equal(arr, datetime_series.values) + datetime_series.to_csv(temp_file, index=False, header=False) + arr = np.loadtxt(temp_file) + tm.assert_almost_equal(arr, datetime_series.values) def test_to_csv_unicode_index(self): buf = StringIO() @@ -87,14 +84,13 @@ def test_to_csv_unicode_index(self): s2 = self.read_csv(buf, index_col=0, encoding="UTF-8") tm.assert_series_equal(s, s2) - def test_to_csv_float_format(self): - with tm.ensure_clean() as filename: - ser = Series([0.123456, 0.234567, 0.567567]) - ser.to_csv(filename, float_format="%.2f", header=False) + def test_to_csv_float_format(self, temp_file): + ser = Series([0.123456, 0.234567, 0.567567]) + ser.to_csv(temp_file, float_format="%.2f", header=False) - rs = self.read_csv(filename) - xp = Series([0.12, 0.23, 0.57]) - tm.assert_series_equal(rs, xp) + rs = self.read_csv(temp_file) + xp = Series([0.12, 0.23, 0.57]) + tm.assert_series_equal(rs, xp) def test_to_csv_list_entries(self): s = Series(["jack and jill", "jesse and frank"]) @@ -128,50 +124,49 @@ def test_to_csv_path_is_none(self): ), ], ) - def test_to_csv_compression(self, s, encoding, compression): - with tm.ensure_clean() as filename: - s.to_csv(filename, compression=compression, encoding=encoding, header=True) - # test the round trip - to_csv -> read_csv - result = pd.read_csv( - filename, - compression=compression, - encoding=encoding, - index_col=0, - ).squeeze("columns") - tm.assert_series_equal(s, result) - - # test the round trip using file handle - to_csv -> read_csv - with get_handle( - filename, "w", compression=compression, encoding=encoding - ) as handles: - s.to_csv(handles.handle, encoding=encoding, header=True) - - result = pd.read_csv( - filename, - compression=compression, - encoding=encoding, - index_col=0, - ).squeeze("columns") - tm.assert_series_equal(s, result) - - # explicitly ensure file was compressed - with tm.decompress_file(filename, compression) as fh: - text = fh.read().decode(encoding or "utf8") - assert s.name in text - - with tm.decompress_file(filename, compression) as fh: - tm.assert_series_equal( - s, - pd.read_csv(fh, index_col=0, encoding=encoding).squeeze("columns"), - ) - - def test_to_csv_interval_index(self, using_infer_string): + def test_to_csv_compression(self, s, encoding, compression, temp_file): + filename = temp_file + s.to_csv(filename, compression=compression, encoding=encoding, header=True) + # test the round trip - to_csv -> read_csv + result = pd.read_csv( + filename, + compression=compression, + encoding=encoding, + index_col=0, + ).squeeze("columns") + tm.assert_series_equal(s, result) + + # test the round trip using file handle - to_csv -> read_csv + with get_handle( + filename, "w", compression=compression, encoding=encoding + ) as handles: + s.to_csv(handles.handle, encoding=encoding, header=True) + + result = pd.read_csv( + filename, + compression=compression, + encoding=encoding, + index_col=0, + ).squeeze("columns") + tm.assert_series_equal(s, result) + + # explicitly ensure file was compressed + with tm.decompress_file(filename, compression) as fh: + text = fh.read().decode(encoding or "utf8") + assert s.name in text + + with tm.decompress_file(filename, compression) as fh: + tm.assert_series_equal( + s, + pd.read_csv(fh, index_col=0, encoding=encoding).squeeze("columns"), + ) + + def test_to_csv_interval_index(self, using_infer_string, temp_file): # GH 28210 s = Series(["foo", "bar", "baz"], index=pd.interval_range(0, 3)) - with tm.ensure_clean("__tmp_to_csv_interval_index__.csv") as path: - s.to_csv(path, header=False) - result = self.read_csv(path, index_col=0) + s.to_csv(temp_file, header=False) + result = self.read_csv(temp_file, index_col=0) # can't roundtrip intervalindex via read_csv so check string repr (GH 23595) expected = s