
Commit 9f3e429

CI: Mark s3 tests parallel safe (pandas-dev#35895)
Closes pandas-dev#35856
1 parent: c74ed38


4 files changed: +41 -30 lines
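To make these S3 tests safe under pytest-xdist, each worker now runs its own moto server on its own port. The sketch below is a minimal illustration of the port-selection rule the updated fixtures share; it assumes pytest-xdist's worker_id fixture, and the helper name moto_port is illustrative rather than part of the commit.

# Minimal sketch of the per-worker port rule used by the s3so and s3_base
# fixtures. pytest-xdist's worker_id fixture is "master" for a plain run
# (no -n) and "gw0", "gw1", ... for parallel workers.
def moto_port(worker_id: str) -> str:
    suffix = "5" if worker_id == "master" else worker_id.lstrip("gw")
    return f"555{suffix}"


assert moto_port("master") == "5555"  # single-process run keeps the old port
assert moto_port("gw0") == "5550"     # each worker gets a distinct port
assert moto_port("gw3") == "5553"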

pandas/tests/io/conftest.py (+13 -11)

@@ -34,12 +34,13 @@ def feather_file(datapath):


 @pytest.fixture
-def s3so():
-    return dict(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"})
+def s3so(worker_id):
+    worker_id = "5" if worker_id == "master" else worker_id.lstrip("gw")
+    return dict(client_kwargs={"endpoint_url": f"http://127.0.0.1:555{worker_id}/"})


-@pytest.fixture(scope="module")
-def s3_base():
+@pytest.fixture(scope="session")
+def s3_base(worker_id):
     """
     Fixture for mocking S3 interaction.

@@ -61,11 +62,13 @@ def s3_base():
     # Launching moto in server mode, i.e., as a separate process
     # with an S3 endpoint on localhost

-    endpoint_uri = "http://127.0.0.1:5555/"
+    worker_id = "5" if worker_id == "master" else worker_id.lstrip("gw")
+    endpoint_port = f"555{worker_id}"
+    endpoint_uri = f"http://127.0.0.1:{endpoint_port}/"

     # pipe to null to avoid logging in terminal
     proc = subprocess.Popen(
-        shlex.split("moto_server s3 -p 5555"), stdout=subprocess.DEVNULL
+        shlex.split(f"moto_server s3 -p {endpoint_port}"), stdout=subprocess.DEVNULL
     )

     timeout = 5
@@ -79,7 +82,7 @@ def s3_base():
             pass
         timeout -= 0.1
         time.sleep(0.1)
-    yield
+    yield endpoint_uri

     proc.terminate()
     proc.wait()
@@ -119,9 +122,8 @@ def add_tips_files(bucket_name):
                 cli.put_object(Bucket=bucket_name, Key=s3_key, Body=f)

     bucket = "pandas-test"
-    endpoint_uri = "http://127.0.0.1:5555/"
-    conn = boto3.resource("s3", endpoint_url=endpoint_uri)
-    cli = boto3.client("s3", endpoint_url=endpoint_uri)
+    conn = boto3.resource("s3", endpoint_url=s3_base)
+    cli = boto3.client("s3", endpoint_url=s3_base)

     try:
         cli.create_bucket(Bucket=bucket)
@@ -143,7 +145,7 @@ def add_tips_files(bucket_name):
     s3fs.S3FileSystem.clear_instance_cache()
     yield conn

-    s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"})
+    s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": s3_base})

     try:
         s3.rm(bucket, recursive=True)
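The three test files below all apply the same pattern: accept the s3so fixture and forward it through storage_options instead of hard-coding the endpoint URL. A hypothetical example of that usage (the test name and the items.json key are illustrative, not from the commit):

import pandas as pd


def test_reads_from_mock_s3(s3_resource, s3so):
    # s3_resource provides the pre-populated "pandas-test" bucket;
    # s3so carries the per-worker endpoint via client_kwargs.
    df = pd.read_json("s3://pandas-test/items.json", storage_options=s3so)
    assert not df.empty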

pandas/tests/io/json/test_compression.py (+2 -4)

@@ -34,7 +34,7 @@ def test_read_zipped_json(datapath):


 @td.skip_if_not_us_locale
-def test_with_s3_url(compression, s3_resource):
+def test_with_s3_url(compression, s3_resource, s3so):
     # Bucket "pandas-test" created in tests/io/conftest.py

     df = pd.read_json('{"a": [1, 2, 3], "b": [4, 5, 6]}')
@@ -45,9 +45,7 @@ def test_with_s3_url(compression, s3_resource):
             s3_resource.Bucket("pandas-test").put_object(Key="test-1", Body=f)

     roundtripped_df = pd.read_json(
-        "s3://pandas-test/test-1",
-        compression=compression,
-        storage_options=dict(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"}),
+        "s3://pandas-test/test-1", compression=compression, storage_options=s3so,
     )
     tm.assert_frame_equal(df, roundtripped_df)

pandas/tests/io/json/test_pandas.py (+2 -5)

@@ -1702,17 +1702,14 @@ def test_json_multiindex(self, dataframe, expected):
         result = series.to_json(orient="index")
         assert result == expected

-    def test_to_s3(self, s3_resource):
+    def test_to_s3(self, s3_resource, s3so):
         import time

         # GH 28375
         mock_bucket_name, target_file = "pandas-test", "test.json"
         df = DataFrame({"x": [1, 2, 3], "y": [2, 4, 6]})
         df.to_json(
-            f"s3://{mock_bucket_name}/{target_file}",
-            storage_options=dict(
-                client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"}
-            ),
+            f"s3://{mock_bucket_name}/{target_file}", storage_options=s3so,
         )
         timeout = 5
         while True:
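The hunk above is cut off right at the polling step (timeout = 5, while True:). Below is a hedged sketch of what such a loop does, namely waiting for the freshly written object to become visible in the mock bucket before asserting on it; the details here are assumptions, not the verbatim pandas test body:

import time

timeout = 5
while True:
    # target_file, mock_bucket_name and s3_resource come from the test above.
    if target_file in (
        obj.key for obj in s3_resource.Bucket(mock_bucket_name).objects.all()
    ):
        break
    time.sleep(0.1)
    timeout -= 0.1
    assert timeout > 0, "Timed out waiting for the object to appear"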

pandas/tests/io/test_parquet.py (+24 -10)

@@ -158,10 +158,6 @@ def check_round_trip(
     """
     write_kwargs = write_kwargs or {"compression": None}
     read_kwargs = read_kwargs or {}
-    if isinstance(path, str) and "s3://" in path:
-        s3so = dict(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"})
-        read_kwargs["storage_options"] = s3so
-        write_kwargs["storage_options"] = s3so

     if expected is None:
         expected = df
@@ -555,15 +551,24 @@ def test_s3_roundtrip_explicit_fs(self, df_compat, s3_resource, pa, s3so):
             write_kwargs=kw,
         )

-    def test_s3_roundtrip(self, df_compat, s3_resource, pa):
+    def test_s3_roundtrip(self, df_compat, s3_resource, pa, s3so):
         if LooseVersion(pyarrow.__version__) <= LooseVersion("0.17.0"):
             pytest.skip()
         # GH #19134
-        check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet")
+        s3so = dict(storage_options=s3so)
+        check_round_trip(
+            df_compat,
+            pa,
+            path="s3://pandas-test/pyarrow.parquet",
+            read_kwargs=s3so,
+            write_kwargs=s3so,
+        )

     @td.skip_if_no("s3fs")
     @pytest.mark.parametrize("partition_col", [["A"], []])
-    def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
+    def test_s3_roundtrip_for_dir(
+        self, df_compat, s3_resource, pa, partition_col, s3so
+    ):
         # GH #26388
         expected_df = df_compat.copy()

@@ -587,7 +592,10 @@ def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
             pa,
             expected=expected_df,
             path="s3://pandas-test/parquet_dir",
-            write_kwargs={"partition_cols": partition_col, "compression": None},
+            read_kwargs=dict(storage_options=s3so),
+            write_kwargs=dict(
+                partition_cols=partition_col, compression=None, storage_options=s3so
+            ),
             check_like=True,
             repeat=1,
         )
@@ -761,9 +769,15 @@ def test_filter_row_groups(self, fp):
         result = read_parquet(path, fp, filters=[("a", "==", 0)])
         assert len(result) == 1

-    def test_s3_roundtrip(self, df_compat, s3_resource, fp):
+    def test_s3_roundtrip(self, df_compat, s3_resource, fp, s3so):
         # GH #19134
-        check_round_trip(df_compat, fp, path="s3://pandas-test/fastparquet.parquet")
+        check_round_trip(
+            df_compat,
+            fp,
+            path="s3://pandas-test/fastparquet.parquet",
+            read_kwargs=dict(storage_options=s3so),
+            write_kwargs=dict(compression=None, storage_options=s3so),
+        )

     def test_partition_cols_supported(self, fp, df_full):
         # GH #23283
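With the hard-coded endpoint injection removed from check_round_trip, the parquet S3 tests now pass the per-worker endpoint explicitly via read_kwargs and write_kwargs, and it ends up as storage_options on the underlying pandas calls. Roughly, and only as an illustrative sketch (not the helper's actual body):

import pandas as pd

kw = dict(storage_options=s3so)  # s3so is the per-worker fixture from conftest.py
df_compat.to_parquet("s3://pandas-test/pyarrow.parquet", compression=None, **kw)
result = pd.read_parquet("s3://pandas-test/pyarrow.parquet", **kw)
pd.testing.assert_frame_equal(result, df_compat)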
