From e6ab6cba12291ecaf9d65b22a8566dd602e0ffda Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 25 Aug 2020 09:49:12 -0500 Subject: [PATCH 1/3] CI: Mark s3 tests as single Closes https://github.com/pandas-dev/pandas/issues/35856 I think we need to update the pytest pattern though, so this should fail. --- pandas/tests/io/excel/test_readers.py | 1 + pandas/tests/io/json/test_pandas.py | 2 ++ pandas/tests/io/parser/test_network.py | 1 + pandas/tests/io/test_fsspec.py | 3 +++ pandas/tests/io/test_parquet.py | 4 ++++ 5 files changed, 11 insertions(+) diff --git a/pandas/tests/io/excel/test_readers.py b/pandas/tests/io/excel/test_readers.py index 431a50477fccc..05d8d5e43dd98 100644 --- a/pandas/tests/io/excel/test_readers.py +++ b/pandas/tests/io/excel/test_readers.py @@ -606,6 +606,7 @@ def test_read_from_http_url(self, read_ext): tm.assert_frame_equal(url_table, local_table) @td.skip_if_not_us_locale + @pytest.mark.single def test_read_from_s3_url(self, read_ext, s3_resource, s3so): # Bucket "pandas-test" created in tests/io/conftest.py with open("test1" + read_ext, "rb") as f: diff --git a/pandas/tests/io/json/test_pandas.py b/pandas/tests/io/json/test_pandas.py index 64a666079876f..c7e70e9768888 100644 --- a/pandas/tests/io/json/test_pandas.py +++ b/pandas/tests/io/json/test_pandas.py @@ -1213,6 +1213,7 @@ def test_read_inline_jsonl(self): tm.assert_frame_equal(result, expected) @td.skip_if_not_us_locale + @pytest.mark.single def test_read_s3_jsonl(self, s3_resource, s3so): # GH17200 @@ -1702,6 +1703,7 @@ def test_json_multiindex(self, dataframe, expected): result = series.to_json(orient="index") assert result == expected + @pytest.mark.single def test_to_s3(self, s3_resource): import time diff --git a/pandas/tests/io/parser/test_network.py b/pandas/tests/io/parser/test_network.py index b8b03cbd14a1d..878c67e393334 100644 --- a/pandas/tests/io/parser/test_network.py +++ b/pandas/tests/io/parser/test_network.py @@ -69,6 +69,7 @@ def 
tips_df(datapath): @pytest.mark.usefixtures("s3_resource") @td.skip_if_not_us_locale() +@pytest.mark.single class TestS3: @td.skip_if_no("s3fs") def test_parse_public_s3_bucket(self, tips_df, s3so): diff --git a/pandas/tests/io/test_fsspec.py b/pandas/tests/io/test_fsspec.py index 666da677d702e..2c5b4f6cc874e 100644 --- a/pandas/tests/io/test_fsspec.py +++ b/pandas/tests/io/test_fsspec.py @@ -131,6 +131,7 @@ def test_fastparquet_options(fsspectest): @td.skip_if_no("s3fs") +@pytest.mark.single def test_from_s3_csv(s3_resource, tips_file, s3so): tm.assert_equal( read_csv("s3://pandas-test/tips.csv", storage_options=s3so), read_csv(tips_file) @@ -148,6 +149,7 @@ def test_from_s3_csv(s3_resource, tips_file, s3so): @pytest.mark.parametrize("protocol", ["s3", "s3a", "s3n"]) @td.skip_if_no("s3fs") +@pytest.mark.single def test_s3_protocols(s3_resource, tips_file, protocol, s3so): tm.assert_equal( read_csv("%s://pandas-test/tips.csv" % protocol, storage_options=s3so), @@ -157,6 +159,7 @@ def test_s3_protocols(s3_resource, tips_file, protocol, s3so): @td.skip_if_no("s3fs") @td.skip_if_no("fastparquet") +@pytest.mark.single def test_s3_parquet(s3_resource, s3so): fn = "s3://pandas-test/test.parquet" df1.to_parquet( diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 4e0c16c71a6a8..9edf141c91eae 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -541,6 +541,7 @@ def test_categorical(self, pa): expected = df.astype(object) check_round_trip(df, pa, expected=expected) + @pytest.mark.single def test_s3_roundtrip_explicit_fs(self, df_compat, s3_resource, pa, s3so): s3fs = pytest.importorskip("s3fs") if LooseVersion(pyarrow.__version__) <= LooseVersion("0.17.0"): @@ -555,6 +556,7 @@ def test_s3_roundtrip_explicit_fs(self, df_compat, s3_resource, pa, s3so): write_kwargs=kw, ) + @pytest.mark.single def test_s3_roundtrip(self, df_compat, s3_resource, pa): if LooseVersion(pyarrow.__version__) <= LooseVersion("0.17.0"): 
pytest.skip() @@ -563,6 +565,7 @@ def test_s3_roundtrip(self, df_compat, s3_resource, pa): @td.skip_if_no("s3fs") @pytest.mark.parametrize("partition_col", [["A"], []]) + @pytest.mark.single def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col): # GH #26388 expected_df = df_compat.copy() @@ -761,6 +764,7 @@ def test_filter_row_groups(self, fp): result = read_parquet(path, fp, filters=[("a", "==", 0)]) assert len(result) == 1 + @pytest.mark.single def test_s3_roundtrip(self, df_compat, s3_resource, fp): # GH #19134 check_round_trip(df_compat, fp, path="s3://pandas-test/fastparquet.parquet") From fcb55e9c1aff0c730ec7a7ec05c25905d42dc94d Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 25 Aug 2020 12:53:13 -0500 Subject: [PATCH 2/3] parallel --- pandas/tests/io/conftest.py | 24 ++++++++------- pandas/tests/io/excel/test_readers.py | 1 - pandas/tests/io/json/test_compression.py | 6 ++-- pandas/tests/io/json/test_pandas.py | 9 ++---- pandas/tests/io/parser/test_network.py | 1 - pandas/tests/io/test_fsspec.py | 3 -- pandas/tests/io/test_parquet.py | 39 +++++++++++++++--------- 7 files changed, 42 insertions(+), 41 deletions(-) diff --git a/pandas/tests/io/conftest.py b/pandas/tests/io/conftest.py index 518f31d73efa9..193baa8c3ed74 100644 --- a/pandas/tests/io/conftest.py +++ b/pandas/tests/io/conftest.py @@ -34,12 +34,13 @@ def feather_file(datapath): @pytest.fixture -def s3so(): - return dict(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"}) +def s3so(worker_id): + worker_id = "5" if worker_id == "master" else worker_id.lstrip("gw") + return dict(client_kwargs={"endpoint_url": f"http://127.0.0.1:555{worker_id}/"}) -@pytest.fixture(scope="module") -def s3_base(): +@pytest.fixture(scope="session") +def s3_base(worker_id): """ Fixture for mocking S3 interaction. 
@@ -61,11 +62,13 @@ def s3_base(): # Launching moto in server mode, i.e., as a separate process # with an S3 endpoint on localhost - endpoint_uri = "http://127.0.0.1:5555/" + worker_id = "5" if worker_id == "master" else worker_id.lstrip("gw") + endpoint_port = f"555{worker_id}" + endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" # pipe to null to avoid logging in terminal proc = subprocess.Popen( - shlex.split("moto_server s3 -p 5555"), stdout=subprocess.DEVNULL + shlex.split(f"moto_server s3 -p {endpoint_port}"), stdout=subprocess.DEVNULL ) timeout = 5 @@ -79,7 +82,7 @@ def s3_base(): pass timeout -= 0.1 time.sleep(0.1) - yield + yield endpoint_uri proc.terminate() proc.wait() @@ -119,9 +122,8 @@ def add_tips_files(bucket_name): cli.put_object(Bucket=bucket_name, Key=s3_key, Body=f) bucket = "pandas-test" - endpoint_uri = "http://127.0.0.1:5555/" - conn = boto3.resource("s3", endpoint_url=endpoint_uri) - cli = boto3.client("s3", endpoint_url=endpoint_uri) + conn = boto3.resource("s3", endpoint_url=s3_base) + cli = boto3.client("s3", endpoint_url=s3_base) try: cli.create_bucket(Bucket=bucket) @@ -143,7 +145,7 @@ def add_tips_files(bucket_name): s3fs.S3FileSystem.clear_instance_cache() yield conn - s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"}) + s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": s3_base}) try: s3.rm(bucket, recursive=True) diff --git a/pandas/tests/io/excel/test_readers.py b/pandas/tests/io/excel/test_readers.py index 05d8d5e43dd98..431a50477fccc 100644 --- a/pandas/tests/io/excel/test_readers.py +++ b/pandas/tests/io/excel/test_readers.py @@ -606,7 +606,6 @@ def test_read_from_http_url(self, read_ext): tm.assert_frame_equal(url_table, local_table) @td.skip_if_not_us_locale - @pytest.mark.single def test_read_from_s3_url(self, read_ext, s3_resource, s3so): # Bucket "pandas-test" created in tests/io/conftest.py with open("test1" + read_ext, "rb") as f: diff --git a/pandas/tests/io/json/test_compression.py 
b/pandas/tests/io/json/test_compression.py index 5bb205842269e..c0e3220454bf1 100644 --- a/pandas/tests/io/json/test_compression.py +++ b/pandas/tests/io/json/test_compression.py @@ -34,7 +34,7 @@ def test_read_zipped_json(datapath): @td.skip_if_not_us_locale -def test_with_s3_url(compression, s3_resource): +def test_with_s3_url(compression, s3_resource, s3so): # Bucket "pandas-test" created in tests/io/conftest.py df = pd.read_json('{"a": [1, 2, 3], "b": [4, 5, 6]}') @@ -45,9 +45,7 @@ def test_with_s3_url(compression, s3_resource): s3_resource.Bucket("pandas-test").put_object(Key="test-1", Body=f) roundtripped_df = pd.read_json( - "s3://pandas-test/test-1", - compression=compression, - storage_options=dict(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"}), + "s3://pandas-test/test-1", compression=compression, storage_options=s3so, ) tm.assert_frame_equal(df, roundtripped_df) diff --git a/pandas/tests/io/json/test_pandas.py b/pandas/tests/io/json/test_pandas.py index c7e70e9768888..2022abbaee323 100644 --- a/pandas/tests/io/json/test_pandas.py +++ b/pandas/tests/io/json/test_pandas.py @@ -1213,7 +1213,6 @@ def test_read_inline_jsonl(self): tm.assert_frame_equal(result, expected) @td.skip_if_not_us_locale - @pytest.mark.single def test_read_s3_jsonl(self, s3_resource, s3so): # GH17200 @@ -1703,18 +1702,14 @@ def test_json_multiindex(self, dataframe, expected): result = series.to_json(orient="index") assert result == expected - @pytest.mark.single - def test_to_s3(self, s3_resource): + def test_to_s3(self, s3_resource, s3so): import time # GH 28375 mock_bucket_name, target_file = "pandas-test", "test.json" df = DataFrame({"x": [1, 2, 3], "y": [2, 4, 6]}) df.to_json( - f"s3://{mock_bucket_name}/{target_file}", - storage_options=dict( - client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"} - ), + f"s3://{mock_bucket_name}/{target_file}", storage_options=s3so, ) timeout = 5 while True: diff --git a/pandas/tests/io/parser/test_network.py 
b/pandas/tests/io/parser/test_network.py index 878c67e393334..b8b03cbd14a1d 100644 --- a/pandas/tests/io/parser/test_network.py +++ b/pandas/tests/io/parser/test_network.py @@ -69,7 +69,6 @@ def tips_df(datapath): @pytest.mark.usefixtures("s3_resource") @td.skip_if_not_us_locale() -@pytest.mark.single class TestS3: @td.skip_if_no("s3fs") def test_parse_public_s3_bucket(self, tips_df, s3so): diff --git a/pandas/tests/io/test_fsspec.py b/pandas/tests/io/test_fsspec.py index 2c5b4f6cc874e..666da677d702e 100644 --- a/pandas/tests/io/test_fsspec.py +++ b/pandas/tests/io/test_fsspec.py @@ -131,7 +131,6 @@ def test_fastparquet_options(fsspectest): @td.skip_if_no("s3fs") -@pytest.mark.single def test_from_s3_csv(s3_resource, tips_file, s3so): tm.assert_equal( read_csv("s3://pandas-test/tips.csv", storage_options=s3so), read_csv(tips_file) @@ -149,7 +148,6 @@ def test_from_s3_csv(s3_resource, tips_file, s3so): @pytest.mark.parametrize("protocol", ["s3", "s3a", "s3n"]) @td.skip_if_no("s3fs") -@pytest.mark.single def test_s3_protocols(s3_resource, tips_file, protocol, s3so): tm.assert_equal( read_csv("%s://pandas-test/tips.csv" % protocol, storage_options=s3so), @@ -159,7 +157,6 @@ def test_s3_protocols(s3_resource, tips_file, protocol, s3so): @td.skip_if_no("s3fs") @td.skip_if_no("fastparquet") -@pytest.mark.single def test_s3_parquet(s3_resource, s3so): fn = "s3://pandas-test/test.parquet" df1.to_parquet( diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 9edf141c91eae..ffda5fd90b871 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -158,10 +158,6 @@ def check_round_trip( """ write_kwargs = write_kwargs or {"compression": None} read_kwargs = read_kwargs or {} - if isinstance(path, str) and "s3://" in path: - s3so = dict(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"}) - read_kwargs["storage_options"] = s3so - write_kwargs["storage_options"] = s3so if expected is None: expected = df @@ -541,7 
+537,6 @@ def test_categorical(self, pa): expected = df.astype(object) check_round_trip(df, pa, expected=expected) - @pytest.mark.single def test_s3_roundtrip_explicit_fs(self, df_compat, s3_resource, pa, s3so): s3fs = pytest.importorskip("s3fs") if LooseVersion(pyarrow.__version__) <= LooseVersion("0.17.0"): @@ -556,17 +551,24 @@ def test_s3_roundtrip_explicit_fs(self, df_compat, s3_resource, pa, s3so): write_kwargs=kw, ) - @pytest.mark.single - def test_s3_roundtrip(self, df_compat, s3_resource, pa): + def test_s3_roundtrip(self, df_compat, s3_resource, pa, s3so): if LooseVersion(pyarrow.__version__) <= LooseVersion("0.17.0"): pytest.skip() # GH #19134 - check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet") + s3so = dict(storage_options=s3so) + check_round_trip( + df_compat, + pa, + path="s3://pandas-test/pyarrow.parquet", + read_kwargs=s3so, + write_kwargs=s3so, + ) @td.skip_if_no("s3fs") @pytest.mark.parametrize("partition_col", [["A"], []]) - @pytest.mark.single - def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col): + def test_s3_roundtrip_for_dir( + self, df_compat, s3_resource, pa, partition_col, s3so + ): # GH #26388 expected_df = df_compat.copy() @@ -590,7 +592,10 @@ def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col): pa, expected=expected_df, path="s3://pandas-test/parquet_dir", - write_kwargs={"partition_cols": partition_col, "compression": None}, + read_kwargs=dict(storage_options=s3so), + write_kwargs=dict( + partition_cols=partition_col, compression=None, storage_options=s3so + ), check_like=True, repeat=1, ) @@ -764,10 +769,16 @@ def test_filter_row_groups(self, fp): result = read_parquet(path, fp, filters=[("a", "==", 0)]) assert len(result) == 1 - @pytest.mark.single - def test_s3_roundtrip(self, df_compat, s3_resource, fp): + def test_s3_roundtrip(self, df_compat, s3_resource, fp, s3so): # GH #19134 - check_round_trip(df_compat, fp, 
path="s3://pandas-test/fastparquet.parquet") + s3so = dict(storage_options=s3so) + check_round_trip( + df_compat, + fp, + path="s3://pandas-test/fastparquet.parquet", + read_kwargs=s3so, + write_kwargs=s3so, + ) def test_partition_cols_supported(self, fp, df_full): # GH #23283 From 74aa98204c0d8a8f58f21c51f5f26c726d6b2493 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 25 Aug 2020 13:49:52 -0500 Subject: [PATCH 3/3] compression --- pandas/tests/io/test_parquet.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index ffda5fd90b871..15f9837176315 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -771,13 +771,12 @@ def test_filter_row_groups(self, fp): def test_s3_roundtrip(self, df_compat, s3_resource, fp, s3so): # GH #19134 - s3so = dict(storage_options=s3so) check_round_trip( df_compat, fp, path="s3://pandas-test/fastparquet.parquet", - read_kwargs=s3so, - write_kwargs=s3so, + read_kwargs=dict(storage_options=s3so), + write_kwargs=dict(compression=None, storage_options=s3so), ) def test_partition_cols_supported(self, fp, df_full):