diff --git a/asv_bench/benchmarks/io/csv.py b/asv_bench/benchmarks/io/csv.py
index 3b7fdc6e2d78c..0f5d07f9fac55 100644
--- a/asv_bench/benchmarks/io/csv.py
+++ b/asv_bench/benchmarks/io/csv.py
@@ -118,38 +118,6 @@ def time_read_uint64_na_values(self):
                  na_values=self.na_values)
 
 
-class S3(object):
-    # Make sure that we can read part of a file from S3 without
-    # needing to download the entire thing. Use the timeit.default_timer
-    # to measure wall time instead of CPU time -- we want to see
-    # how long it takes to download the data.
-    timer = timeit.default_timer
-    params = ([None, "gzip", "bz2"], ["python", "c"])
-    param_names = ["compression", "engine"]
-
-    def setup(self, compression, engine):
-        if compression == "bz2" and engine == "c" and PY2:
-            # The Python 2 C parser can't read bz2 from open files.
-            raise NotImplementedError
-        try:
-            import s3fs  # noqa
-        except ImportError:
-            # Skip these benchmarks if `boto` is not installed.
-            raise NotImplementedError
-
-        ext = ""
-        if compression == "gzip":
-            ext = ".gz"
-        elif compression == "bz2":
-            ext = ".bz2"
-        self.big_fname = "s3://pandas-test/large_random.csv" + ext
-
-    def time_read_csv_10_rows(self, compression, engine):
-        # Read a small number of rows from a huge (100,000 x 50) table.
-        read_csv(self.big_fname, nrows=10, compression=compression,
-                 engine=engine)
-
-
 class ReadCSVThousands(BaseIO):
 
     goal_time = 0.2
diff --git a/pandas/tests/io/parser/test_network.py b/pandas/tests/io/parser/test_network.py
index f16338fda6245..fdf45f307e953 100644
--- a/pandas/tests/io/parser/test_network.py
+++ b/pandas/tests/io/parser/test_network.py
@@ -4,13 +4,16 @@
 Tests parsers ability to read and parse non-local files
 and hence require a network connection to be read.
 """
+import logging
+
 import pytest
+import numpy as np
 
 import pandas.util.testing as tm
 import pandas.util._test_decorators as td
 from pandas import DataFrame
 from pandas.io.parsers import read_csv, read_table
-from pandas.compat import BytesIO
+from pandas.compat import BytesIO, StringIO
 
 
 @pytest.mark.network
@@ -45,9 +48,9 @@ def check_compressed_urls(salaries_table, compression, extension, mode,
     tm.assert_frame_equal(url_table, salaries_table)
 
 
+@pytest.mark.usefixtures("s3_resource")
 class TestS3(object):
 
-    @tm.network
     def test_parse_public_s3_bucket(self):
         pytest.importorskip('s3fs')
         # more of an integration test due to the not-public contents portion
@@ -66,7 +69,6 @@ def test_parse_public_s3_bucket(self):
             assert not df.empty
             tm.assert_frame_equal(read_csv(tm.get_data_path('tips.csv')), df)
 
-    @tm.network
     def test_parse_public_s3n_bucket(self):
 
         # Read from AWS s3 as "s3n" URL
@@ -76,7 +78,6 @@ def test_parse_public_s3n_bucket(self):
         tm.assert_frame_equal(read_csv(
             tm.get_data_path('tips.csv')).iloc[:10], df)
 
-    @tm.network
     def test_parse_public_s3a_bucket(self):
         # Read from AWS s3 as "s3a" URL
         df = read_csv('s3a://pandas-test/tips.csv', nrows=10)
@@ -85,7 +86,6 @@ def test_parse_public_s3a_bucket(self):
         tm.assert_frame_equal(read_csv(
             tm.get_data_path('tips.csv')).iloc[:10], df)
 
-    @tm.network
     def test_parse_public_s3_bucket_nrows(self):
         for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
             df = read_csv('s3://pandas-test/tips.csv' +
@@ -95,7 +95,6 @@ def test_parse_public_s3_bucket_nrows(self):
             tm.assert_frame_equal(read_csv(
                 tm.get_data_path('tips.csv')).iloc[:10], df)
 
-    @tm.network
     def test_parse_public_s3_bucket_chunked(self):
         # Read with a chunksize
         chunksize = 5
@@ -114,7 +113,6 @@ def test_parse_public_s3_bucket_chunked(self):
                     chunksize * i_chunk: chunksize * (i_chunk + 1)]
                 tm.assert_frame_equal(true_df, df)
 
-    @tm.network
     def test_parse_public_s3_bucket_chunked_python(self):
         # Read with a chunksize using the Python parser
         chunksize = 5
@@ -133,7 +131,6 @@ def test_parse_public_s3_bucket_chunked_python(self):
                     chunksize * i_chunk: chunksize * (i_chunk + 1)]
                 tm.assert_frame_equal(true_df, df)
 
-    @tm.network
     def test_parse_public_s3_bucket_python(self):
         for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
             df = read_csv('s3://pandas-test/tips.csv' + ext, engine='python',
@@ -143,7 +140,6 @@ def test_parse_public_s3_bucket_python(self):
             tm.assert_frame_equal(read_csv(
                 tm.get_data_path('tips.csv')), df)
 
-    @tm.network
     def test_infer_s3_compression(self):
         for ext in ['', '.gz', '.bz2']:
             df = read_csv('s3://pandas-test/tips.csv' + ext,
@@ -153,7 +149,6 @@ def test_infer_s3_compression(self):
             tm.assert_frame_equal(read_csv(
                 tm.get_data_path('tips.csv')), df)
 
-    @tm.network
     def test_parse_public_s3_bucket_nrows_python(self):
         for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
             df = read_csv('s3://pandas-test/tips.csv' + ext, engine='python',
@@ -163,7 +158,6 @@ def test_parse_public_s3_bucket_nrows_python(self):
             tm.assert_frame_equal(read_csv(
                 tm.get_data_path('tips.csv')).iloc[:10], df)
 
-    @tm.network
     def test_s3_fails(self):
         with pytest.raises(IOError):
             read_csv('s3://nyqpug/asdf.csv')
@@ -188,3 +182,22 @@ def test_read_csv_handles_boto_s3_object(self,
 
         expected = read_csv(tips_file)
         tm.assert_frame_equal(result, expected)
+
+    def test_read_csv_chunked_download(self, s3_resource, caplog):
+        # 8 MB, S3FS uses 5MB chunks
+        df = DataFrame(np.random.randn(100000, 4), columns=list('abcd'))
+        buf = BytesIO()
+        str_buf = StringIO()
+
+        df.to_csv(str_buf)
+
+        buf = BytesIO(str_buf.getvalue().encode('utf-8'))
+
+        s3_resource.Bucket("pandas-test").put_object(
+            Key="large-file.csv",
+            Body=buf)
+
+        with caplog.at_level(logging.DEBUG, logger='s3fs.core'):
+            read_csv("s3://pandas-test/large-file.csv", nrows=5)
+            # log of fetch_range (start, stop)
+            assert ((0, 5505024) in set(x.args[-2:] for x in caplog.records))
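
Note on the s3_resource fixture: the @pytest.mark.usefixtures("s3_resource") marker and the s3_resource arguments above refer to a pytest fixture that is not part of this diff (it lives in the test suite's conftest), and dropping the @tm.network decorators only makes sense if that fixture stands up a mocked S3 endpoint. The sketch below is a minimal, hypothetical illustration of how such a fixture could be written with moto and boto3; the bucket name "pandas-test" and the tips_file fixture are taken from the tests above, while everything else (the moto mock_s3 context manager, the region, the uploaded key) is an assumption, not the project's actual conftest code.

    # Hypothetical sketch of a moto-backed "s3_resource" fixture -- not the
    # actual conftest implementation.  Assumes moto (< 5, providing mock_s3)
    # and boto3 are installed.
    import pytest

    moto = pytest.importorskip("moto")
    boto3 = pytest.importorskip("boto3")


    @pytest.fixture
    def s3_resource(tips_file):
        """Mocked S3 resource with a pre-populated 'pandas-test' bucket."""
        with moto.mock_s3():
            conn = boto3.resource("s3", region_name="us-east-1")
            conn.create_bucket(Bucket="pandas-test")
            # Tests such as test_parse_public_s3_bucket expect tips.csv (and,
            # in the real fixture, its compressed variants) to already exist.
            conn.Bucket("pandas-test").upload_file(tips_file, "tips.csv")
            yield conn

With a mock like this active, reads through s3fs hit the in-process fake rather than AWS, which is why the tests no longer need the @tm.network decorator and why test_read_csv_chunked_download can upload its own 8 MB object before reading it back.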