
ENH Enable streaming from S3 #11073

Merged 1 commit on Sep 14, 2015

37 changes: 35 additions & 2 deletions asv_bench/benchmarks/io_bench.py
@@ -1,9 +1,10 @@
from .pandas_vb_common import *
from pandas import concat, Timestamp
from pandas import concat, Timestamp, compat
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
import timeit


class frame_to_csv(object):
@@ -135,4 +136,36 @@ def setup(self):
self.df = DataFrame({'float1': randn(10000), 'float2': randn(10000), 'string1': (['foo'] * 10000), 'bool1': ([True] * 10000), 'int1': np.random.randint(0, 100000, size=10000), }, index=self.index)

def time_write_csv_standard(self):
self.df.to_csv('__test__.csv')


class read_csv_from_s3(object):
# Make sure that we can read part of a file from S3 without
# needing to download the entire thing. Use timeit.default_timer
# to measure wall time instead of CPU time -- we want to see
# how long it takes to download the data.
timer = timeit.default_timer
params = ([None, "gzip", "bz2"], ["python", "c"])
param_names = ["compression", "engine"]

def setup(self, compression, engine):
if compression == "bz2" and engine == "c" and compat.PY2:
# The Python 2 C parser can't read bz2 from open files.
raise NotImplementedError
try:
import boto
except ImportError:
# Skip these benchmarks if `boto` is not installed.
raise NotImplementedError

self.big_fname = "s3://pandas-test/large_random.csv"

def time_read_nrows(self, compression, engine):
# Read a small number of rows from a huge (100,000 x 50) table.
ext = ""
if compression == "gzip":
ext = ".gz"
elif compression == "bz2":
ext = ".bz2"
pd.read_csv(self.big_fname + ext, nrows=10,
compression=compression, engine=engine)
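
The benchmark sets the class-level `timer` to `timeit.default_timer` so that asv measures wall-clock time, which includes the network transfer. Outside of asv, the same measurement can be reproduced directly; the snippet below is a minimal sketch, not part of the PR, and assumes `boto` is installed and the public `pandas-test` bucket referenced above is reachable:

import timeit

import pandas as pd

start = timeit.default_timer()
# Only the first rows are fetched from S3, so the elapsed time should reflect
# a partial download rather than the full 100,000 x 50 table.
df = pd.read_csv("s3://pandas-test/large_random.csv", nrows=10)
print("read %d rows in %.2f seconds" % (len(df), timeit.default_timer() - start))
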
2 changes: 2 additions & 0 deletions doc/source/whatsnew/v0.17.0.txt
@@ -479,6 +479,8 @@ Other enhancements

- In ``pd.read_csv``, recognize "s3n://" and "s3a://" URLs as designating S3 file storage (:issue:`11070`, :issue:`11071`).

- Read CSV files from AWS S3 incrementally, instead of first downloading the entire file. (Full file download still required for compressed files in Python 2.) (:issue:`11070`, :issue:`11073`)

.. _whatsnew_0170.api:

.. _whatsnew_0170.api_breaking:
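
To illustrate the enhancement described in the whatsnew entry above: once this change is in place, only the bytes needed to satisfy the request are read from S3. A usage sketch (assuming `boto`, network access, and the public `pandas-test` bucket used by this PR's tests):

import pandas as pd

# Reads only the requested rows instead of downloading the whole file first.
head = pd.read_csv("s3://pandas-test/tips.csv", nrows=10)

# Chunked reading streams as well: each get_chunk() call pulls only what it needs.
reader = pd.read_csv("s3://pandas-test/tips.csv", chunksize=5)
first_chunk = reader.get_chunk()
second_chunk = reader.get_chunk()
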
83 changes: 79 additions & 4 deletions pandas/io/common.py
@@ -47,6 +47,77 @@ class DtypeWarning(Warning):
pass


try:
from boto.s3 import key
class BotoFileLikeReader(key.Key):
"""boto Key modified to be more file-like

This modification of the boto Key will read through a supplied
S3 key once, then stop. The unmodified boto Key object will repeatedly
cycle through a file in S3: after reaching the end of the file,
boto will close the file. Then the next call to `read` or `next` will
re-open the file and start reading from the beginning.

Also adds a `readline` function which will split the returned
values by the `\n` character.
"""
def __init__(self, *args, **kwargs):
encoding = kwargs.pop("encoding", None) # Python 2 compat
super(BotoFileLikeReader, self).__init__(*args, **kwargs)
self.finished_read = False # Add a flag to mark the end of the read.
self.buffer = ""
self.lines = []
if encoding is None and compat.PY3:
encoding = "utf-8"
self.encoding = encoding

def next(self):
return self.readline()

__next__ = next

def read(self, *args, **kwargs):
if self.finished_read:
return b'' if compat.PY3 else ''
return super(BotoFileLikeReader, self).read(*args, **kwargs)

def close(self, *args, **kwargs):
self.finished_read = True
return super(BotoFileLikeReader, self).close(*args, **kwargs)

def seekable(self):
"""Needed for reading by bz2"""
return False

def readline(self):
"""Split the contents of the Key by '\n' characters."""
if self.lines:
retval = self.lines[0]
self.lines = self.lines[1:]
return retval
if self.finished_read:
if self.buffer:
retval, self.buffer = self.buffer, ""
return retval
else:
raise StopIteration

if self.encoding:
self.buffer = "{}{}".format(self.buffer, self.read(8192).decode(self.encoding))
else:
self.buffer = "{}{}".format(self.buffer, self.read(8192))

split_buffer = self.buffer.split("\n")
self.lines.extend(split_buffer[:-1])
self.buffer = split_buffer[-1]

return self.readline()
except ImportError:
# boto is only needed for reading from S3.
pass


def _is_url(url):
"""Check to see if a URL has a valid protocol.

@@ -166,10 +237,14 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
conn = boto.connect_s3(anon=True)

b = conn.get_bucket(parsed_url.netloc, validate=False)
k = boto.s3.key.Key(b)
k.key = parsed_url.path
filepath_or_buffer = BytesIO(k.get_contents_as_string(
encoding=encoding))
if compat.PY2 and compression == 'gzip':
k = boto.s3.key.Key(b, parsed_url.path)
filepath_or_buffer = BytesIO(k.get_contents_as_string(
encoding=encoding))
else:
k = BotoFileLikeReader(b, parsed_url.path, encoding=encoding)
k.open('r') # Expose read errors immediately
filepath_or_buffer = k
return filepath_or_buffer, None, compression

return _expand_user(filepath_or_buffer), None, compression
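
The core of `BotoFileLikeReader.readline` is block-buffered line splitting: read a fixed-size block, split it on `'\n'`, queue the complete lines, and carry the trailing partial line over to the next read. The standalone sketch below (illustrative names only, not part of the PR) applies the same technique to any object exposing a `read(n)` method; in the PR itself the block comes from the boto `Key.read` and is decoded with the supplied encoding before splitting:

import io


class BlockLineReader(object):
    """Sketch of block-buffered line reading, mirroring BotoFileLikeReader."""

    def __init__(self, raw, block_size=8192):
        self.raw = raw                  # any object exposing read(n)
        self.block_size = block_size
        self.buffer = ""
        self.lines = []
        self.finished = False

    def readline(self):
        if self.lines:
            return self.lines.pop(0)    # serve an already-split line
        if self.finished:
            if self.buffer:
                line, self.buffer = self.buffer, ""
                return line             # final partial line (no trailing '\n')
            raise StopIteration
        block = self.raw.read(self.block_size)
        if not block:
            self.finished = True        # underlying stream is exhausted
        self.buffer += block
        parts = self.buffer.split("\n")
        self.lines.extend(parts[:-1])   # complete lines
        self.buffer = parts[-1]         # partial line carried forward
        return self.readline()


reader = BlockLineReader(io.StringIO("a,b\n1,2\n3,4"), block_size=4)
print(reader.readline())  # 'a,b'
print(reader.readline())  # '1,2'
print(reader.readline())  # '3,4'
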
95 changes: 88 additions & 7 deletions pandas/io/tests/test_parsers.py
@@ -4241,16 +4241,22 @@ def setUp(self):

@tm.network
def test_parse_public_s3_bucket(self):
import nose.tools as nt
df = pd.read_csv('s3://nyqpug/tips.csv')
nt.assert_true(isinstance(df, pd.DataFrame))
nt.assert_false(df.empty)
tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')), df)
for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
if comp == 'bz2' and compat.PY2:
# The Python 2 C parser can't read bz2 from S3.
self.assertRaises(ValueError, pd.read_csv,
's3://pandas-test/tips.csv' + ext,
compression=comp)
else:
df = pd.read_csv('s3://pandas-test/tips.csv' + ext, compression=comp)
self.assertTrue(isinstance(df, pd.DataFrame))
self.assertFalse(df.empty)
tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')), df)

# Read a public file from a bucket with non-public contents
df = pd.read_csv('s3://cant_get_it/tips.csv')
nt.assert_true(isinstance(df, pd.DataFrame))
nt.assert_false(df.empty)
self.assertTrue(isinstance(df, pd.DataFrame))
self.assertFalse(df.empty)
tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')), df)

@tm.network
@@ -4269,6 +4275,81 @@ def test_parse_public_s3a_bucket(self):
self.assertFalse(df.empty)
tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')).iloc[:10], df)

@tm.network
def test_parse_public_s3_bucket_nrows(self):
for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
if comp == 'bz2' and compat.PY2:
# The Python 2 C parser can't read bz2 from S3.
self.assertRaises(ValueError, pd.read_csv,
's3://pandas-test/tips.csv' + ext,
compression=comp)
else:
df = pd.read_csv('s3://pandas-test/tips.csv' + ext, nrows=10, compression=comp)
self.assertTrue(isinstance(df, pd.DataFrame))
self.assertFalse(df.empty)
tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')).iloc[:10], df)

@tm.network
def test_parse_public_s3_bucket_chunked(self):
# Read with a chunksize
chunksize = 5
local_tips = pd.read_csv(tm.get_data_path('tips.csv'))
for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
if comp == 'bz2' and compat.PY2:
# The Python 2 C parser can't read bz2 from S3.
self.assertRaises(ValueError, pd.read_csv,
's3://pandas-test/tips.csv' + ext,
compression=comp)
else:
df_reader = pd.read_csv('s3://pandas-test/tips.csv' + ext,
chunksize=chunksize, compression=comp)
self.assertEqual(df_reader.chunksize, chunksize)
for i_chunk in [0, 1, 2]:
# Read a couple of chunks and make sure we see them properly.
df = df_reader.get_chunk()
self.assertTrue(isinstance(df, pd.DataFrame))
self.assertFalse(df.empty)
true_df = local_tips.iloc[chunksize * i_chunk: chunksize * (i_chunk + 1)]
true_df = true_df.reset_index().drop('index', axis=1) # Chunking doesn't preserve row numbering
tm.assert_frame_equal(true_df, df)

@tm.network
def test_parse_public_s3_bucket_chunked_python(self):
# Read with a chunksize using the Python parser
chunksize = 5
local_tips = pd.read_csv(tm.get_data_path('tips.csv'))
for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
df_reader = pd.read_csv('s3://pandas-test/tips.csv' + ext,
chunksize=chunksize, compression=comp,
engine='python')
self.assertEqual(df_reader.chunksize, chunksize)
for i_chunk in [0, 1, 2]:
# Read a couple of chunks and make sure we see them properly.
df = df_reader.get_chunk()
self.assertTrue(isinstance(df, pd.DataFrame))
self.assertFalse(df.empty)
true_df = local_tips.iloc[chunksize * i_chunk: chunksize * (i_chunk + 1)]
true_df = true_df.reset_index().drop('index', axis=1) # Chunking doesn't preserve row numbering
tm.assert_frame_equal(true_df, df)

@tm.network
def test_parse_public_s3_bucket_python(self):
for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
df = pd.read_csv('s3://pandas-test/tips.csv' + ext, engine='python',
compression=comp)
self.assertTrue(isinstance(df, pd.DataFrame))
self.assertFalse(df.empty)
tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')), df)

@tm.network
def test_parse_public_s3_bucket_nrows_python(self):
for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
df = pd.read_csv('s3://pandas-test/tips.csv' + ext, engine='python',
nrows=10, compression=comp)
self.assertTrue(isinstance(df, pd.DataFrame))
self.assertFalse(df.empty)
tm.assert_frame_equal(pd.read_csv(tm.get_data_path('tips.csv')).iloc[:10], df)

@tm.network
def test_s3_fails(self):
import boto
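
Taken together, these tests document the limitation noted in the inline comments: the Python 2 C parser cannot stream bz2 data from S3, so `read_csv` raises `ValueError` there, while the Python engine (and Python 3) work. A hedged sketch, reusing the bucket and file names the tests assume exist:

import pandas as pd
from pandas import compat

url = "s3://pandas-test/tips.csv.bz2"
if compat.PY2:
    try:
        pd.read_csv(url, compression="bz2")
    except ValueError:
        pass  # expected on Python 2: streaming bz2 from S3 is unsupported
    df = pd.read_csv(url, compression="bz2", engine="python")
else:
    df = pd.read_csv(url, compression="bz2")
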