diff --git a/asv_bench/benchmarks/io_bench.py b/asv_bench/benchmarks/io_bench.py
index a171641502d3c..0f15ab6e5e142 100644
--- a/asv_bench/benchmarks/io_bench.py
+++ b/asv_bench/benchmarks/io_bench.py
@@ -1,9 +1,10 @@
 from .pandas_vb_common import *
-from pandas import concat, Timestamp
+from pandas import concat, Timestamp, compat
 try:
     from StringIO import StringIO
 except ImportError:
     from io import StringIO
+import timeit
 
 
 class frame_to_csv(object):
@@ -135,4 +136,36 @@ def setup(self):
         self.df = DataFrame({'float1': randn(10000), 'float2': randn(10000), 'string1': (['foo'] * 10000), 'bool1': ([True] * 10000), 'int1': np.random.randint(0, 100000, size=10000), }, index=self.index)
 
     def time_write_csv_standard(self):
-        self.df.to_csv('__test__.csv')
\ No newline at end of file
+        self.df.to_csv('__test__.csv')
+
+
+class read_csv_from_s3(object):
+    # Make sure that we can read part of a file from S3 without
+    # needing to download the entire thing. Use the timeit.default_timer
+    # to measure wall time instead of CPU time -- we want to see
+    # how long it takes to download the data.
+    timer = timeit.default_timer
+    params = ([None, "gzip", "bz2"], ["python", "c"])
+    param_names = ["compression", "engine"]
+
+    def setup(self, compression, engine):
+        if compression == "bz2" and engine == "c" and compat.PY2:
+            # The Python 2 C parser can't read bz2 from open files.
+            raise NotImplementedError
+        try:
+            import boto
+        except ImportError:
+            # Skip these benchmarks if `boto` is not installed.
+            raise NotImplementedError
+
+        self.big_fname = "s3://pandas-test/large_random.csv"
+
+    def time_read_nrows(self, compression, engine):
+        # Read a small number of rows from a huge (100,000 x 50) table.
+        ext = ""
+        if compression == "gzip":
+            ext = ".gz"
+        elif compression == "bz2":
+            ext = ".bz2"
+        pd.read_csv(self.big_fname + ext, nrows=10,
+                    compression=compression, engine=engine)
diff --git a/doc/source/whatsnew/v0.17.0.txt b/doc/source/whatsnew/v0.17.0.txt
index 9d8532aa3649a..5f9f7574a282b 100644
--- a/doc/source/whatsnew/v0.17.0.txt
+++ b/doc/source/whatsnew/v0.17.0.txt
@@ -479,6 +479,8 @@ Other enhancements
 - In ``pd.read_csv``, recognize "s3n://" and "s3a://" URLs as designating S3 file storage (:issue:`11070`, :issue:`11071`).
 
+- Read CSV files from AWS S3 incrementally, instead of first downloading the entire file. (Full file download still required for compressed files in Python 2.) (:issue:`11070`, :issue:`11073`)
+
 .. _whatsnew_0170.api:
 
 .. _whatsnew_0170.api_breaking:
 
diff --git a/pandas/io/common.py b/pandas/io/common.py
index 5ab5640ca12c0..7095a0fd60f2a 100644
--- a/pandas/io/common.py
+++ b/pandas/io/common.py
@@ -47,6 +47,77 @@ class DtypeWarning(Warning):
     pass
 
 
+try:
+    from boto.s3 import key
+    class BotoFileLikeReader(key.Key):
+        """boto Key modified to be more file-like
+
+        This modification of the boto Key will read through a supplied
+        S3 key once, then stop. The unmodified boto Key object will repeatedly
+        cycle through a file in S3: after reaching the end of the file,
+        boto will close the file. Then the next call to `read` or `next` will
+        re-open the file and start reading from the beginning.
+
+        Also adds a `readline` function which will split the returned
+        values by the `\n` character.
+        """
+        def __init__(self, *args, **kwargs):
+            encoding = kwargs.pop("encoding", None)  # Python 2 compat
+            super(BotoFileLikeReader, self).__init__(*args, **kwargs)
+            self.finished_read = False  # Add a flag to mark the end of the read.
+            self.buffer = ""
+            self.lines = []
+            if encoding is None and compat.PY3:
+                encoding = "utf-8"
+            self.encoding = encoding
+            self.lines = []
+
+        def next(self):
+            return self.readline()
+
+        __next__ = next
+
+        def read(self, *args, **kwargs):
+            if self.finished_read:
+                return b'' if compat.PY3 else ''
+            return super(BotoFileLikeReader, self).read(*args, **kwargs)
+
+        def close(self, *args, **kwargs):
+            self.finished_read = True
+            return super(BotoFileLikeReader, self).close(*args, **kwargs)
+
+        def seekable(self):
+            """Needed for reading by bz2"""
+            return False
+
+        def readline(self):
+            """Split the contents of the Key by '\n' characters."""
+            if self.lines:
+                retval = self.lines[0]
+                self.lines = self.lines[1:]
+                return retval
+            if self.finished_read:
+                if self.buffer:
+                    retval, self.buffer = self.buffer, ""
+                    return retval
+                else:
+                    raise StopIteration
+
+            if self.encoding:
+                self.buffer = "{}{}".format(self.buffer, self.read(8192).decode(self.encoding))
+            else:
+                self.buffer = "{}{}".format(self.buffer, self.read(8192))
+
+            split_buffer = self.buffer.split("\n")
+            self.lines.extend(split_buffer[:-1])
+            self.buffer = split_buffer[-1]
+
+            return self.readline()
+except ImportError:
+    # boto is only needed for reading from S3.
+    pass
+
+
 def _is_url(url):
     """Check to see if a URL has a valid protocol.
 
@@ -166,10 +237,14 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
             conn = boto.connect_s3(anon=True)
 
         b = conn.get_bucket(parsed_url.netloc, validate=False)
-        k = boto.s3.key.Key(b)
-        k.key = parsed_url.path
-        filepath_or_buffer = BytesIO(k.get_contents_as_string(
-            encoding=encoding))
+        if compat.PY2 and compression == 'gzip':
+            k = boto.s3.key.Key(b, parsed_url.path)
+            filepath_or_buffer = BytesIO(k.get_contents_as_string(
+                encoding=encoding))
+        else:
+            k = BotoFileLikeReader(b, parsed_url.path, encoding=encoding)
+            k.open('r')  # Expose read errors immediately
+            filepath_or_buffer = k
         return filepath_or_buffer, None, compression
 
     return _expand_user(filepath_or_buffer), None, compression
diff --git a/pandas/io/tests/test_parsers.py b/pandas/io/tests/test_parsers.py
index 205140e02a8ea..70a49e6bd6782 100755
--- a/pandas/io/tests/test_parsers.py
+++ b/pandas/io/tests/test_parsers.py
@@ -4241,16 +4241,22 @@ def setUp(self):
 
     @tm.network
     def test_parse_public_s3_bucket(self):
-        import nose.tools as nt
-        df = pd.read_csv('s3://nyqpug/tips.csv')
-        nt.assert_true(isinstance(df, pd.DataFrame))
-        nt.assert_false(df.empty)
-        tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')), df)
+        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
+            if comp == 'bz2' and compat.PY2:
+                # The Python 2 C parser can't read bz2 from S3.
+                self.assertRaises(ValueError, pd.read_csv,
+                                  's3://pandas-test/tips.csv' + ext,
+                                  compression=comp)
+            else:
+                df = pd.read_csv('s3://pandas-test/tips.csv' + ext, compression=comp)
+                self.assertTrue(isinstance(df, pd.DataFrame))
+                self.assertFalse(df.empty)
+                tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')), df)
 
         # Read public file from bucket with not-public contents
         df = pd.read_csv('s3://cant_get_it/tips.csv')
-        nt.assert_true(isinstance(df, pd.DataFrame))
-        nt.assert_false(df.empty)
+        self.assertTrue(isinstance(df, pd.DataFrame))
+        self.assertFalse(df.empty)
         tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')), df)
 
     @tm.network
@@ -4269,6 +4275,81 @@ def test_parse_public_s3a_bucket(self):
         self.assertFalse(df.empty)
         tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')).iloc[:10], df)
 
+    @tm.network
+    def test_parse_public_s3_bucket_nrows(self):
+        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
+            if comp == 'bz2' and compat.PY2:
+                # The Python 2 C parser can't read bz2 from S3.
+                self.assertRaises(ValueError, pd.read_csv,
+                                  's3://pandas-test/tips.csv' + ext,
+                                  compression=comp)
+            else:
+                df = pd.read_csv('s3://pandas-test/tips.csv' + ext, nrows=10, compression=comp)
+                self.assertTrue(isinstance(df, pd.DataFrame))
+                self.assertFalse(df.empty)
+                tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')).iloc[:10], df)
+
+    @tm.network
+    def test_parse_public_s3_bucket_chunked(self):
+        # Read with a chunksize
+        chunksize = 5
+        local_tips = pd.read_csv(tm.get_data_path('tips.csv'))
+        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
+            if comp == 'bz2' and compat.PY2:
+                # The Python 2 C parser can't read bz2 from S3.
+                self.assertRaises(ValueError, pd.read_csv,
+                                  's3://pandas-test/tips.csv' + ext,
+                                  compression=comp)
+            else:
+                df_reader = pd.read_csv('s3://pandas-test/tips.csv' + ext,
+                                        chunksize=chunksize, compression=comp)
+                self.assertEqual(df_reader.chunksize, chunksize)
+                for i_chunk in [0, 1, 2]:
+                    # Read a couple of chunks and make sure we see them properly.
+                    df = df_reader.get_chunk()
+                    self.assertTrue(isinstance(df, pd.DataFrame))
+                    self.assertFalse(df.empty)
+                    true_df = local_tips.iloc[chunksize * i_chunk: chunksize * (i_chunk + 1)]
+                    true_df = true_df.reset_index().drop('index', axis=1)  # Chunking doesn't preserve row numbering
+                    tm.assert_frame_equal(true_df, df)
+
+    @tm.network
+    def test_parse_public_s3_bucket_chunked_python(self):
+        # Read with a chunksize using the Python parser
+        chunksize = 5
+        local_tips = pd.read_csv(tm.get_data_path('tips.csv'))
+        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
+            df_reader = pd.read_csv('s3://pandas-test/tips.csv' + ext,
+                                    chunksize=chunksize, compression=comp,
+                                    engine='python')
+            self.assertEqual(df_reader.chunksize, chunksize)
+            for i_chunk in [0, 1, 2]:
+                # Read a couple of chunks and make sure we see them properly.
+                df = df_reader.get_chunk()
+                self.assertTrue(isinstance(df, pd.DataFrame))
+                self.assertFalse(df.empty)
+                true_df = local_tips.iloc[chunksize * i_chunk: chunksize * (i_chunk + 1)]
+                true_df = true_df.reset_index().drop('index', axis=1)  # Chunking doesn't preserve row numbering
+                tm.assert_frame_equal(true_df, df)
+
+    @tm.network
+    def test_parse_public_s3_bucket_python(self):
+        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
+            df = pd.read_csv('s3://pandas-test/tips.csv' + ext, engine='python',
+                             compression=comp)
+            self.assertTrue(isinstance(df, pd.DataFrame))
+            self.assertFalse(df.empty)
+            tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')), df)
+
+    @tm.network
+    def test_parse_public_s3_bucket_nrows_python(self):
+        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
+            df = pd.read_csv('s3://pandas-test/tips.csv' + ext, engine='python',
+                             nrows=10, compression=comp)
+            self.assertTrue(isinstance(df, pd.DataFrame))
+            self.assertFalse(df.empty)
+            tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')).iloc[:10], df)
+
     @tm.network
     def test_s3_fails(self):
         import boto
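
For quick reference, a minimal usage sketch of the incremental S3 reading exercised by the tests above. The bucket URL is illustrative (any CSV object readable through boto will do), and boto must be installed for pandas to resolve "s3://" paths:

    import pandas as pd

    # Illustrative URL only -- substitute any readable S3 CSV object.
    url = 's3://pandas-test/tips.csv'

    # Read just the first 10 rows; with the change above this no longer
    # requires downloading the whole object first (compressed files on
    # Python 2 remain the exception).
    head = pd.read_csv(url, nrows=10)

    # Stream the same file in 5-row chunks through the reader object
    # returned when chunksize is given.
    reader = pd.read_csv(url, chunksize=5)
    first_chunk = reader.get_chunk()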