Commit bf0a15d

Merge pull request #11073 from stephen-hoover/stream-csv-from-s3
ENH Enable streaming from S3
2 parents 5ee3a4f + 67879f5

File tree

4 files changed: +204 −13 lines

asv_bench/benchmarks/io_bench.py (+35, −2)

@@ -1,9 +1,10 @@
 from .pandas_vb_common import *
-from pandas import concat, Timestamp
+from pandas import concat, Timestamp, compat
 try:
     from StringIO import StringIO
 except ImportError:
     from io import StringIO
+import timeit


 class frame_to_csv(object):
@@ -135,4 +136,36 @@ def setup(self):
         self.df = DataFrame({'float1': randn(10000), 'float2': randn(10000), 'string1': (['foo'] * 10000), 'bool1': ([True] * 10000), 'int1': np.random.randint(0, 100000, size=10000), }, index=self.index)

     def time_write_csv_standard(self):
-        self.df.to_csv('__test__.csv')
+        self.df.to_csv('__test__.csv')
+
+
+class read_csv_from_s3(object):
+    # Make sure that we can read part of a file from S3 without
+    # needing to download the entire thing. Use the timeit.default_timer
+    # to measure wall time instead of CPU time -- we want to see
+    # how long it takes to download the data.
+    timer = timeit.default_timer
+    params = ([None, "gzip", "bz2"], ["python", "c"])
+    param_names = ["compression", "engine"]
+
+    def setup(self, compression, engine):
+        if compression == "bz2" and engine == "c" and compat.PY2:
+            # The Python 2 C parser can't read bz2 from open files.
+            raise NotImplementedError
+        try:
+            import boto
+        except ImportError:
+            # Skip these benchmarks if `boto` is not installed.
+            raise NotImplementedError
+
+        self.big_fname = "s3://pandas-test/large_random.csv"
+
+    def time_read_nrows(self, compression, engine):
+        # Read a small number of rows from a huge (100,000 x 50) table.
+        ext = ""
+        if compression == "gzip":
+            ext = ".gz"
+        elif compression == "bz2":
+            ext = ".bz2"
+        pd.read_csv(self.big_fname + ext, nrows=10,
+                    compression=compression, engine=engine)
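Note: setting `timer = timeit.default_timer` makes asv record wall-clock time rather than CPU time, which matters here because most of the elapsed time is network I/O. A rough stand-alone equivalent of `time_read_nrows` (a sketch only; the `pandas-test` bucket and file name are taken from the benchmark above, and running it needs `boto` and network access):

import timeit

import pandas as pd

start = timeit.default_timer()
# Only enough of the object for the first 10 rows should be fetched
# from S3, not the whole 100,000 x 50 table.
df = pd.read_csv("s3://pandas-test/large_random.csv", nrows=10)
elapsed = timeit.default_timer() - start  # wall time, dominated by download
print("read 10 rows in %.2f s" % elapsed)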

doc/source/whatsnew/v0.17.0.txt (+2)

@@ -479,6 +479,8 @@ Other enhancements

 - In ``pd.read_csv``, recognize "s3n://" and "s3a://" URLs as designating S3 file storage (:issue:`11070`, :issue:`11071`).

+- Read CSV files from AWS S3 incrementally, instead of first downloading the entire file. (Full file download still required for compressed files in Python 2.) (:issue:`11070`, :issue:`11073`)
+
 .. _whatsnew_0170.api:

 .. _whatsnew_0170.api_breaking:
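Note: as a usage illustration of this enhancement (a sketch; the `pandas-test` bucket comes from this PR's tests, and reading it requires `boto` plus network access), both `nrows` and `chunksize` now avoid downloading the full object:

import pandas as pd

# Streams just enough bytes for the first ten rows.
df = pd.read_csv("s3://pandas-test/tips.csv", nrows=10)

# Chunked reads likewise pull data from S3 incrementally.
reader = pd.read_csv("s3://pandas-test/tips.csv", chunksize=5)
first_chunk = reader.get_chunk()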

pandas/io/common.py (+79, −4)

@@ -47,6 +47,77 @@ class DtypeWarning(Warning):
     pass


+try:
+    from boto.s3 import key
+    class BotoFileLikeReader(key.Key):
+        """boto Key modified to be more file-like
+
+        This modification of the boto Key will read through a supplied
+        S3 key once, then stop. The unmodified boto Key object will repeatedly
+        cycle through a file in S3: after reaching the end of the file,
+        boto will close the file. Then the next call to `read` or `next` will
+        re-open the file and start reading from the beginning.
+
+        Also adds a `readline` function which will split the returned
+        values by the `\n` character.
+        """
+        def __init__(self, *args, **kwargs):
+            encoding = kwargs.pop("encoding", None)  # Python 2 compat
+            super(BotoFileLikeReader, self).__init__(*args, **kwargs)
+            self.finished_read = False  # Add a flag to mark the end of the read.
+            self.buffer = ""
+            self.lines = []
+            if encoding is None and compat.PY3:
+                encoding = "utf-8"
+            self.encoding = encoding
+            self.lines = []
+
+        def next(self):
+            return self.readline()
+
+        __next__ = next
+
+        def read(self, *args, **kwargs):
+            if self.finished_read:
+                return b'' if compat.PY3 else ''
+            return super(BotoFileLikeReader, self).read(*args, **kwargs)
+
+        def close(self, *args, **kwargs):
+            self.finished_read = True
+            return super(BotoFileLikeReader, self).close(*args, **kwargs)
+
+        def seekable(self):
+            """Needed for reading by bz2"""
+            return False
+
+        def readline(self):
+            """Split the contents of the Key by '\n' characters."""
+            if self.lines:
+                retval = self.lines[0]
+                self.lines = self.lines[1:]
+                return retval
+            if self.finished_read:
+                if self.buffer:
+                    retval, self.buffer = self.buffer, ""
+                    return retval
+                else:
+                    raise StopIteration
+
+            if self.encoding:
+                self.buffer = "{}{}".format(self.buffer, self.read(8192).decode(self.encoding))
+            else:
+                self.buffer = "{}{}".format(self.buffer, self.read(8192))
+
+            split_buffer = self.buffer.split("\n")
+            self.lines.extend(split_buffer[:-1])
+            self.buffer = split_buffer[-1]
+
+            return self.readline()
+except ImportError:
+    # boto is only needed for reading from S3.
+    pass
+
+
 def _is_url(url):
     """Check to see if a URL has a valid protocol.
@@ -166,10 +237,14 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
             conn = boto.connect_s3(anon=True)

         b = conn.get_bucket(parsed_url.netloc, validate=False)
-        k = boto.s3.key.Key(b)
-        k.key = parsed_url.path
-        filepath_or_buffer = BytesIO(k.get_contents_as_string(
-            encoding=encoding))
+        if compat.PY2 and compression == 'gzip':
+            k = boto.s3.key.Key(b, parsed_url.path)
+            filepath_or_buffer = BytesIO(k.get_contents_as_string(
+                encoding=encoding))
+        else:
+            k = BotoFileLikeReader(b, parsed_url.path, encoding=encoding)
+            k.open('r')  # Expose read errors immediately
+            filepath_or_buffer = k
         return filepath_or_buffer, None, compression

     return _expand_user(filepath_or_buffer), None, compression
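Note: the `readline` above is a standard block-buffered line splitter. To see the technique in isolation, here is a minimal sketch (the `BufferedLineReader` name is hypothetical, not part of this PR) that wraps any object exposing a bytes-returning `read(size)`:

import io


class BufferedLineReader(object):
    """Hypothetical helper showing the block-buffered readline technique."""

    def __init__(self, raw, encoding="utf-8", blocksize=8192):
        self.raw = raw              # any object with a bytes .read(size)
        self.encoding = encoding
        self.blocksize = blocksize
        self.buffer = ""            # trailing partial line from the last block
        self.lines = []             # complete lines not yet handed out

    def readline(self):
        if self.lines:
            return self.lines.pop(0)
        chunk = self.raw.read(self.blocksize)
        if not chunk:
            # End of stream: flush any trailing partial line, then stop.
            if self.buffer:
                retval, self.buffer = self.buffer, ""
                return retval
            raise StopIteration
        self.buffer += chunk.decode(self.encoding)
        parts = self.buffer.split("\n")
        self.lines.extend(parts[:-1])  # complete lines
        self.buffer = parts[-1]        # possibly incomplete last line
        return self.readline()


reader = BufferedLineReader(io.BytesIO(b"a,b\n1,2\n3,4"))
print(reader.readline())  # -> 'a,b'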

pandas/io/tests/test_parsers.py (+88, −7)

@@ -4241,16 +4241,22 @@ def setUp(self):

     @tm.network
     def test_parse_public_s3_bucket(self):
-        import nose.tools as nt
-        df = pd.read_csv('s3://nyqpug/tips.csv')
-        nt.assert_true(isinstance(df, pd.DataFrame))
-        nt.assert_false(df.empty)
-        tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')), df)
+        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
+            if comp == 'bz2' and compat.PY2:
+                # The Python 2 C parser can't read bz2 from S3.
+                self.assertRaises(ValueError, pd.read_csv,
+                                  's3://pandas-test/tips.csv' + ext,
+                                  compression=comp)
+            else:
+                df = pd.read_csv('s3://pandas-test/tips.csv' + ext, compression=comp)
+                self.assertTrue(isinstance(df, pd.DataFrame))
+                self.assertFalse(df.empty)
+                tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')), df)

         # Read public file from bucket with not-public contents
         df = pd.read_csv('s3://cant_get_it/tips.csv')
-        nt.assert_true(isinstance(df, pd.DataFrame))
-        nt.assert_false(df.empty)
+        self.assertTrue(isinstance(df, pd.DataFrame))
+        self.assertFalse(df.empty)
         tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')), df)

     @tm.network
@@ -4269,6 +4275,81 @@ def test_parse_public_s3a_bucket(self):
         self.assertFalse(df.empty)
         tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')).iloc[:10], df)

+    @tm.network
+    def test_parse_public_s3_bucket_nrows(self):
+        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
+            if comp == 'bz2' and compat.PY2:
+                # The Python 2 C parser can't read bz2 from S3.
+                self.assertRaises(ValueError, pd.read_csv,
+                                  's3://pandas-test/tips.csv' + ext,
+                                  compression=comp)
+            else:
+                df = pd.read_csv('s3://pandas-test/tips.csv' + ext, nrows=10, compression=comp)
+                self.assertTrue(isinstance(df, pd.DataFrame))
+                self.assertFalse(df.empty)
+                tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')).iloc[:10], df)
+
+    @tm.network
+    def test_parse_public_s3_bucket_chunked(self):
+        # Read with a chunksize
+        chunksize = 5
+        local_tips = pd.read_csv(tm.get_data_path('tips.csv'))
+        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
+            if comp == 'bz2' and compat.PY2:
+                # The Python 2 C parser can't read bz2 from S3.
+                self.assertRaises(ValueError, pd.read_csv,
+                                  's3://pandas-test/tips.csv' + ext,
+                                  compression=comp)
+            else:
+                df_reader = pd.read_csv('s3://pandas-test/tips.csv' + ext,
+                                        chunksize=chunksize, compression=comp)
+                self.assertEqual(df_reader.chunksize, chunksize)
+                for i_chunk in [0, 1, 2]:
+                    # Read a couple of chunks and make sure we see them properly.
+                    df = df_reader.get_chunk()
+                    self.assertTrue(isinstance(df, pd.DataFrame))
+                    self.assertFalse(df.empty)
+                    true_df = local_tips.iloc[chunksize * i_chunk: chunksize * (i_chunk + 1)]
+                    true_df = true_df.reset_index().drop('index', axis=1)  # Chunking doesn't preserve row numbering
+                    tm.assert_frame_equal(true_df, df)
+
+    @tm.network
+    def test_parse_public_s3_bucket_chunked_python(self):
+        # Read with a chunksize using the Python parser
+        chunksize = 5
+        local_tips = pd.read_csv(tm.get_data_path('tips.csv'))
+        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
+            df_reader = pd.read_csv('s3://pandas-test/tips.csv' + ext,
+                                    chunksize=chunksize, compression=comp,
+                                    engine='python')
+            self.assertEqual(df_reader.chunksize, chunksize)
+            for i_chunk in [0, 1, 2]:
+                # Read a couple of chunks and make sure we see them properly.
+                df = df_reader.get_chunk()
+                self.assertTrue(isinstance(df, pd.DataFrame))
+                self.assertFalse(df.empty)
+                true_df = local_tips.iloc[chunksize * i_chunk: chunksize * (i_chunk + 1)]
+                true_df = true_df.reset_index().drop('index', axis=1)  # Chunking doesn't preserve row numbering
+                tm.assert_frame_equal(true_df, df)
+
+    @tm.network
+    def test_parse_public_s3_bucket_python(self):
+        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
+            df = pd.read_csv('s3://pandas-test/tips.csv' + ext, engine='python',
+                             compression=comp)
+            self.assertTrue(isinstance(df, pd.DataFrame))
+            self.assertFalse(df.empty)
+            tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')), df)
+
+    @tm.network
+    def test_parse_public_s3_bucket_nrows_python(self):
+        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
+            df = pd.read_csv('s3://pandas-test/tips.csv' + ext, engine='python',
+                             nrows=10, compression=comp)
+            self.assertTrue(isinstance(df, pd.DataFrame))
+            self.assertFalse(df.empty)
+            tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')).iloc[:10], df)
+
     @tm.network
     def test_s3_fails(self):
         import boto
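Note: the bz2 branches above encode a real restriction: under Python 2 the C parser cannot stream bz2 from an open file, so `pd.read_csv` raises `ValueError` for bz2-compressed S3 files with the default engine, while the Python engine still works (as `test_parse_public_s3_bucket_python` shows). A sketch of the fallback these tests exercise (the `pandas-test` URL comes from the tests; network and `boto` required):

import pandas as pd

try:
    # Default C engine: raises ValueError on Python 2 for bz2-from-S3.
    df = pd.read_csv("s3://pandas-test/tips.csv.bz2", compression="bz2")
except ValueError:
    # The Python engine can still stream and decompress it.
    df = pd.read_csv("s3://pandas-test/tips.csv.bz2", compression="bz2",
                     engine="python")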
