
pd.read_parquet can't handle an S3 directory #28490


Closed
rjurney opened this issue Sep 18, 2019 · 14 comments
Labels: Duplicate Report, IO Parquet

Comments


rjurney commented Sep 18, 2019

Summary

pyarrow can load parquet files directly from S3. So can Dask. pandas doesn't seem to be able to. This would be really cool, and since pandas uses pyarrow underneath it should be easy.

For example, in pyarrow, even with push-down filters:

from pyarrow.parquet import ParquetDataset

stocks_close_ds = ParquetDataset(
    'data/v4.parquet',
    filters=[('Ticker', '=', ticker)]
)
table = stocks_close_ds.read()
stocks_close_df = table.to_pandas()

Code

This code loads field names from an S3 file, then loads an S3 directory containing many parquet files. Note that I have repeatedly verified that the files are there and are publicly accessible; boto3 can access them individually.

# Tags and their indexes were precomputed using PySpark
sorted_all_tags = json.loads(read_from_s3('sorted_all_tags.2000.json', overwrite=True))
max_index = sorted_all_tags[-1][0] + 1

# Pandas can load Parquet data using pyarrow or fastparquet
s3_path = f's3://{BUCKET}/{PATH}/Questions.Stratified.Final.2000.parquet'
posts_df = pd.read_parquet(
    s3_path,
    columns=['_Body'] + ['label_{}'.format(i) for i in range(0, max_index)],
    engine='pyarrow'
)
posts_df.head(2)

It throws this exception if I use s3://bucket/path/file_dir:

ArrowIOError                              Traceback (most recent call last)
<ipython-input-6-f37a90c29660> in <module>
      5     s3_path,
      6     columns=['_Body'] + ['label_{}'.format(i) for i in range(0, max_index)],
----> 7     engine='pyarrow'
      8 )
      9 posts_df.head(2)

~/anaconda3/envs/deep/lib/python3.6/site-packages/pandas/io/parquet.py in read_parquet(path, engine, columns, **kwargs)
    294
    295     impl = get_engine(engine)
--> 296     return impl.read(path, columns=columns, **kwargs)

~/anaconda3/envs/deep/lib/python3.6/site-packages/pandas/io/parquet.py in read(self, path, columns, **kwargs)
    123         kwargs["use_pandas_metadata"] = True
    124         result = self.api.parquet.read_table(
--> 125             path, columns=columns, **kwargs
    126         ).to_pandas()
    127         if should_close:

~/anaconda3/envs/deep/lib/python3.6/site-packages/pyarrow/parquet.py in read_table(source, columns, use_threads, metadata, use_pandas_metadata, memory_map, filesystem, filters)
   1212                             filesystem=filesystem, filters=filters)
   1213     else:
-> 1214         pf = ParquetFile(source, metadata=metadata, memory_map=memory_map)
   1215     return pf.read(columns=columns, use_threads=use_threads,
   1216                    use_pandas_metadata=use_pandas_metadata)

~/anaconda3/envs/deep/lib/python3.6/site-packages/pyarrow/parquet.py in __init__(self, source, metadata, common_metadata, memory_map)
    130                  memory_map=True):
    131         self.reader = ParquetReader()
--> 132         self.reader.open(source, use_memory_map=memory_map, metadata=metadata)
    133         self.common_metadata = common_metadata
    134         self._nested_paths_by_prefix = self._build_nested_paths()

~/anaconda3/envs/deep/lib/python3.6/site-packages/pyarrow/_parquet.pyx in pyarrow._parquet.ParquetReader.open()

~/anaconda3/envs/deep/lib/python3.6/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()

ArrowIOError: Invalid Parquet file size is 0 bytes

It throws this exception if I use s3://bucket/path/file_dir/* or *.parquet:

FileNotFoundError                         Traceback (most recent call last)
~/anaconda3/envs/deep/lib/python3.6/site-packages/pandas/io/s3.py in get_filepath_or_buffer(filepath_or_buffer, encoding, compression, mode)
     29     try:
---> 30         filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer), mode)
     31     except (compat.FileNotFoundError, NoCredentialsError):

~/anaconda3/envs/deep/lib/python3.6/site-packages/fsspec/spec.py in open(self, path, mode, block_size, **kwargs)
    668             f = self._open(path, mode=mode, block_size=block_size,
--> 669                            autocommit=ac, **kwargs)
    670             if not ac:

~/anaconda3/envs/deep/lib/python3.6/site-packages/s3fs/core.py in _open(self, path, mode, block_size, acl, version_id, fill_cache, cache_type, autocommit, **kwargs)
    302                       s3_additional_kwargs=kw, cache_type=cache_type,
--> 303                       autocommit=autocommit)
    304

~/anaconda3/envs/deep/lib/python3.6/site-packages/s3fs/core.py in __init__(self, s3, path, mode, block_size, acl, version_id, fill_cache, s3_additional_kwargs, autocommit, cache_type)
    919         super().__init__(s3, path, mode, block_size, autocommit=autocommit,
--> 920                          cache_type=cache_type)
    921         self.s3 = self.fs  # compatibility

~/anaconda3/envs/deep/lib/python3.6/site-packages/fsspec/spec.py in __init__(self, fs, path, mode, block_size, autocommit, cache_type, **kwargs)
    863             if not hasattr(self, 'details'):
--> 864                 self.details = fs.info(path)
    865             self.size = self.details['size']

~/anaconda3/envs/deep/lib/python3.6/site-packages/s3fs/core.py in info(self, path, version_id)
    478                 raise ValueError('Failed to head path %r: %s' % (path, e))
--> 479         return super().info(path)
    480

~/anaconda3/envs/deep/lib/python3.6/site-packages/fsspec/spec.py in info(self, path, **kwargs)
    488         else:
--> 489             raise FileNotFoundError(path)
    490

FileNotFoundError: BUCKET/PATH/Questions.Stratified.Final.2000.parquet/*

During handling of the above exception, another exception occurred:

FileNotFoundError                         Traceback (most recent call last)
<ipython-input-9-5104e6b4fbad> in <module>
      9     s3_path,
     10     columns=['_Body'] + ['label_{}'.format(i) for i in range(0, max_index)],
---> 11     engine='pyarrow'
     12 )
     13 posts_df.head(2)

~/anaconda3/envs/deep/lib/python3.6/site-packages/pandas/io/parquet.py in read_parquet(path, engine, columns, **kwargs)
    280
    281     impl = get_engine(engine)
--> 282     return impl.read(path, columns=columns, **kwargs)

~/anaconda3/envs/deep/lib/python3.6/site-packages/pandas/io/parquet.py in read(self, path, columns, **kwargs)
    123
    124     def read(self, path, columns=None, **kwargs):
--> 125         path, _, _, should_close = get_filepath_or_buffer(path)
    126
    127         kwargs['use_pandas_metadata'] = True

~/anaconda3/envs/deep/lib/python3.6/site-packages/pandas/io/common.py in get_filepath_or_buffer(filepath_or_buffer, encoding, compression, mode)
    214                                          encoding=encoding,
    215                                          compression=compression,
--> 216                                          mode=mode)
    217
    218     if is_gcs_url(filepath_or_buffer):

~/anaconda3/envs/deep/lib/python3.6/site-packages/pandas/io/s3.py in get_filepath_or_buffer(filepath_or_buffer, encoding, compression, mode)
     37         # for that bucket.
     38         fs = s3fs.S3FileSystem(anon=True)
---> 39         filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer), mode)
     40     return filepath_or_buffer, None, compression, True

~/anaconda3/envs/deep/lib/python3.6/site-packages/fsspec/spec.py in open(self, path, mode, block_size, **kwargs)
    667             ac = kwargs.pop('autocommit', not self._intrans)
    668             f = self._open(path, mode=mode, block_size=block_size,
--> 669                            autocommit=ac, **kwargs)
    670             if not ac:
    671                 self.transaction.files.append(f)

~/anaconda3/envs/deep/lib/python3.6/site-packages/s3fs/core.py in _open(self, path, mode, block_size, acl, version_id, fill_cache, cache_type, autocommit, **kwargs)
    301                       version_id=version_id, fill_cache=fill_cache,
    302                       s3_additional_kwargs=kw, cache_type=cache_type,
--> 303                       autocommit=autocommit)
    304
    305     def _lsdir(self, path, refresh=False, max_items=None):

~/anaconda3/envs/deep/lib/python3.6/site-packages/s3fs/core.py in __init__(self, s3, path, mode, block_size, acl, version_id, fill_cache, s3_additional_kwargs, autocommit, cache_type)
    918         self.s3_additional_kwargs = s3_additional_kwargs or {}
    919         super().__init__(s3, path, mode, block_size, autocommit=autocommit,
--> 920                          cache_type=cache_type)
    921         self.s3 = self.fs  # compatibility
    922         if self.writable():

~/anaconda3/envs/deep/lib/python3.6/site-packages/fsspec/spec.py in __init__(self, fs, path, mode, block_size, autocommit, cache_type, **kwargs)
    862         if mode == 'rb':
    863             if not hasattr(self, 'details'):
--> 864                 self.details = fs.info(path)
    865             self.size = self.details['size']
    866             self.cache = caches[cache_type](self.blocksize, self._fetch_range,

~/anaconda3/envs/deep/lib/python3.6/site-packages/s3fs/core.py in info(self, path, version_id)
    477             except ParamValidationError as e:
    478                 raise ValueError('Failed to head path %r: %s' % (path, e))
--> 479         return super().info(path)
    480
    481     def ls(self, path, detail=False, refresh=False, **kwargs):

~/anaconda3/envs/deep/lib/python3.6/site-packages/fsspec/spec.py in info(self, path, **kwargs)
    487             return {'name': path, 'size': 0, 'type': 'directory'}
    488         else:
--> 489             raise FileNotFoundError(path)
    490
    491     def checksum(self, path):

FileNotFoundError: BUCKET/PATH/Questions.Stratified.Final.2000.parquet/*

Problem description

It seems that pd.read_parquet can't read a directory-structured Parquet dataset from Amazon S3. I've tried a wildcard and it also throws an error. This is a bummer :(

Expected Output

Load the data, return a DataFrame.

Output of pd.show_versions()

INSTALLED VERSIONS
------------------
commit           : None
python           : 3.6.8.final.0
python-bits      : 64
OS               : Linux
OS-release       : 4.15.0-62-generic
machine          : x86_64
processor        : x86_64
byteorder        : little
LC_ALL           : None
LANG             : en_US.UTF-8
LOCALE           : en_US.UTF-8

pandas           : 0.25.1
numpy            : 1.17.0
pytz             : 2019.2
dateutil         : 2.8.0
pip              : 19.2.2
setuptools       : 41.0.1
Cython           : None
pytest           : None
hypothesis       : None
sphinx           : None
blosc            : None
feather          : None
xlsxwriter       : None
lxml.etree       : 4.4.1
html5lib         : None
pymysql          : None
psycopg2         : None
jinja2           : 2.10.1
IPython          : 7.7.0
pandas_datareader: None
bs4              : 4.8.0
bottleneck       : None
fastparquet      : None
gcsfs            : None
lxml.etree       : 4.4.1
matplotlib       : 3.1.1
numexpr          : None
odfpy            : None
openpyxl         : None
pandas_gbq       : None
pyarrow          : 0.14.1
pytables         : None
s3fs             : 0.3.3
scipy            : 1.3.1
sqlalchemy       : None
tables           : None
xarray           : None
xlrd             : None
xlwt             : None
xlsxwriter       : None
@TomAugspurger (Contributor)

Can you look into our IO code and write a proposal for how we could support this? Generally, we just deal with a single file / buffer. It's possible that this is doable as long as we hand things off to the parquet engine relatively quickly, but I'm not sure.

TomAugspurger added the IO Parquet label Sep 18, 2019
@TomAugspurger (Contributor)

Is this specific to S3? Or do we see the same behavior for directories on the local file system?

TomAugspurger added this to the Contributions Welcome milestone Sep 18, 2019
rjurney (Author) commented Sep 18, 2019

@TomAugspurger this problem is specific to S3; it works fine locally. I'll look into it, thanks.
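
For reference, here's a minimal local sketch of the case that does work (the directory name and dummy columns are made up for illustration):

import os

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Write a tiny directory-structured Parquet dataset locally
os.makedirs('local_dataset.parquet', exist_ok=True)
df = pd.DataFrame({'x': range(6), 'label': list('aabbcc')})
table = pa.Table.from_pandas(df)
pq.write_table(table.slice(0, 3), 'local_dataset.parquet/part-0.parquet')
pq.write_table(table.slice(3, 3), 'local_dataset.parquet/part-1.parquet')

# Reading the directory (not a single file) works when the path is local
local_df = pd.read_parquet('local_dataset.parquet', engine='pyarrow')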

@jorisvandenbossche (Member)

I seem to recall from an earlier, similar discussion the idea to just pass through to pyarrow in the case of s3 and let them handle that (since they support it).

Currently we call get_filepath_or_buffer, and in there we do this for s3:

pandas/pandas/io/common.py

Lines 233 to 238 in 2b28454

if is_s3_url(filepath_or_buffer):
    from pandas.io import s3

    return s3.get_filepath_or_buffer(
        filepath_or_buffer, encoding=encoding, compression=compression, mode=mode
    )

which then, in our s3 code, tries to open the file:

pandas/pandas/io/s3.py

Lines 20 to 50 in 2b28454

def get_file_and_filesystem(
    filepath_or_buffer: FilePathOrBuffer, mode: Optional[str] = None
) -> Tuple[IO, Any]:
    from botocore.exceptions import NoCredentialsError

    if mode is None:
        mode = "rb"

    fs = s3fs.S3FileSystem(anon=False)
    try:
        file = fs.open(_strip_schema(filepath_or_buffer), mode)
    except (FileNotFoundError, NoCredentialsError):
        # boto3 has troubles when trying to access a public file
        # when credentialed...
        # An OSError is raised if you have credentials, but they
        # aren't valid for that bucket.
        # A NoCredentialsError is raised if you don't have creds
        # for that bucket.
        fs = s3fs.S3FileSystem(anon=True)
        file = fs.open(_strip_schema(filepath_or_buffer), mode)
    return file, fs


def get_filepath_or_buffer(
    filepath_or_buffer: FilePathOrBuffer,
    encoding: Optional[str] = None,
    compression: Optional[str] = None,
    mode: Optional[str] = None,
) -> Tuple[IO, Optional[str], Optional[str], bool]:
    file, _fs = get_file_and_filesystem(filepath_or_buffer, mode=mode)
    return file, None, compression, True

But since you can pass a directory or wildcard, it is not always a valid file.

So one solution would be, for this case, to just pass the s3 string path directly to pyarrow (and also fastparquet, I suppose)?
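
Roughly, the idea as a standalone sketch (the helper name and argument handling are made up here; this is not the actual patch):

import s3fs
from pyarrow.parquet import ParquetDataset


def read_parquet_s3_directory(path, columns=None, filters=None):
    # Skip pandas' own file opening and let pyarrow's ParquetDataset
    # resolve the S3 directory via s3fs
    fs = s3fs.S3FileSystem()
    dataset = ParquetDataset(path, filesystem=fs, filters=filters)
    return dataset.read(columns=columns, use_pandas_metadata=True).to_pandas()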

rjurney (Author) commented Sep 18, 2019

@jorisvandenbossche @TomAugspurger This is exactly what I was thinking. I'll be happy to make the changes.

Btw, pyarrow.parquet.ParquetDataset now accepts push-down filters, which we could add to the read_parquet interface.

See #26551, and also apache/arrow@d235f69, which went out in the pyarrow release in July. The ticket says pandas would add this when pyarrow shipped, and it has shipped :) I would be happy to add this as well. Do you have any thoughts?

The way this works in pyarrow is:

import s3fs
from pyarrow.parquet import ParquetDataset

s3_path = f's3://{BUCKET}/{PATH}/Questions.Stratified.Final.2000.parquet'
s3 = s3fs.S3FileSystem()
columns = ['field_1', 'field_2']
filters = [('field_1', '=', 0)]

# Have to use pyarrow to load a parquet directory into pandas
posts_df = ParquetDataset(
    s3_path,
    filesystem=s3,
    filters=filters,
).read(columns=columns).to_pandas()

jorisvandenbossche (Member) commented Sep 19, 2019

The ticket says pandas would add this when pyarrow shipped, and it has shipped :) I would be happy to add this as well.

It is "added" automatically, because additional arguments are passed through to pyarrow (so it should work already). But, a PR to add this to the documentation so it gets more visibility is certainly welcome!

TomAugspurger (Contributor) commented Sep 19, 2019 via email

rjurney (Author) commented Sep 19, 2019

Ok, I'll do it this weekend!

@russellbrooks

I've also experienced many issues with pandas reading S3-based parquet files ever since s3fs refactored the file system components into fsspec.

Many of the most recent errors appear to be resolved by forcing fsspec>=0.5.1, which was released 4 days ago. Otherwise, s3fs was resolving to fsspec 0.4.0 under conda for me without other constraints.

rjurney (Author) commented Sep 23, 2019

@jorisvandenbossche @TomAugspurger Hmmm... could this be a solution?

@jorisvandenbossche (Member)

@rjurney that's a different issue. There have been some recent failures with s3fs, but even if that is fully working, you still have the problem that we should not try to open a file if the argument is a directory.
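
A rough illustration of the kind of check that could avoid the open() call (bucket and key are placeholders, and this is not a proposed patch):

import s3fs

fs = s3fs.S3FileSystem(anon=False)
path = 'my-bucket/my-dataset.parquet'  # placeholder

if fs.isdir(path):
    # A directory-structured dataset: hand the path and filesystem to the
    # parquet engine instead of opening the path as a single file
    pass
else:
    # A single object: safe to open as a file-like buffer
    f = fs.open(path, 'rb')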

@dlindelof

On our team we would love to see this addressed; are there any plans to do so? Is this something for which you would consider accepting a pull request?

@alimcmaster1 (Member)

@dlindelof agree - let's continue the discussion over in #26388 since that's addressing the same problem

jorisvandenbossche added the Duplicate Report label Apr 21, 2020
jorisvandenbossche modified the milestones: Contributions Welcome, No action Apr 21, 2020

jtlz2 commented Apr 14, 2022

Is this still an issue for anyone else? :(
