From a55e848f33ec7c738a3d4fd3f48dea53e9d8bed1 Mon Sep 17 00:00:00 2001
From: AbdealiJK
Date: Sat, 11 Nov 2017 17:00:32 +0530
Subject: [PATCH] hdfs: Allow hdfs read/write for files

Now the following will work:

If hdfs3 is not installed, this throws:
    ImportError: The hdfs3 library is required to handle hdfs files

If hdfs3 is installed but libhdfs3 is not installed, this throws:
    ImportError: Can not find the shared library: libhdfs3.so

If hdfs3 is installed, this works:
    pd.read_csv("hdfs://localhost:9000/tmp/a.csv")

If hdfs3 is installed and HADOOP_CONF_DIR is set, this works:
    HADOOP_CONF_DIR=/usr/local/Cellar/hadoop/2.7.0/libexec/etc/hadoop/
    pd.read_csv("hdfs:///tmp/a.csv")
---
 doc/source/io.rst               | 10 ++++++++--
 doc/source/whatsnew/v0.22.0.txt |  1 +
 pandas/compat/__init__.py       |  6 ++++--
 pandas/io/common.py             | 25 +++++++++++++++++++++++--
 pandas/io/hdfs.py               | 22 ++++++++++++++++++++++
 pandas/io/s3.py                 |  7 +------
 pandas/tests/io/test_hdfs.py    |  8 ++++++++
 7 files changed, 67 insertions(+), 12 deletions(-)
 create mode 100644 pandas/io/hdfs.py
 create mode 100644 pandas/tests/io/test_hdfs.py

diff --git a/doc/source/io.rst b/doc/source/io.rst
index 1a777c3e0b15f..b010af5cee074 100644
--- a/doc/source/io.rst
+++ b/doc/source/io.rst
@@ -78,7 +78,7 @@ Basic
 
 filepath_or_buffer : various
   Either a path to a file (a :class:`python:str`, :class:`python:pathlib.Path`,
-  or :class:`py:py._path.local.LocalPath`), URL (including http, ftp, and S3
+  or :class:`py:py._path.local.LocalPath`), URL (including http, ftp, hdfs, and S3
   locations), or any object with a ``read()`` method (such as an open file or
   :class:`~python:io.StringIO`).
 sep : str, defaults to ``','`` for :func:`read_csv`, ``\t`` for :func:`read_table`
@@ -1579,6 +1579,12 @@ You can pass in a URL to a CSV file:
    df = pd.read_csv('https://download.bls.gov/pub/time.series/cu/cu.item',
                     sep='\t')
 
+Or an HDFS URL:
+
+.. code-block:: python
+
+   df = pd.read_csv('hdfs://<host>:<port>/pandas-test/tips.csv')
+
 S3 URLs are handled as well:
 
 .. code-block:: python
@@ -1849,7 +1855,7 @@
 The parser will try to parse a ``DataFrame`` if ``typ`` is not supplied or
 is ``None``. To explicitly force ``Series`` parsing, pass ``typ=series``
 
 - ``filepath_or_buffer`` : a **VALID** JSON string or file handle / StringIO. The string could be
-  a URL. Valid URL schemes include http, ftp, S3, and file. For file URLs, a host
+  a URL. Valid URL schemes include http, ftp, hdfs, S3, and file. For file URLs, a host
   is expected. For instance, a local file could be file ://localhost/path/to/table.json
 - ``typ`` : type of object to recover (series or frame), default 'frame'
diff --git a/doc/source/whatsnew/v0.22.0.txt b/doc/source/whatsnew/v0.22.0.txt
index 9dc10a09378f8..bde99eb0ab573 100644
--- a/doc/source/whatsnew/v0.22.0.txt
+++ b/doc/source/whatsnew/v0.22.0.txt
@@ -139,6 +139,7 @@ Other Enhancements
 - :func:`read_excel()` has gained the ``nrows`` parameter (:issue:`16645`)
 - :func:``DataFrame.to_json`` and ``Series.to_json`` now accept an ``index`` argument which allows the user to exclude the index from the JSON output (:issue:`17394`)
 - ``IntervalIndex.to_tuples()`` has gained the ``na_tuple`` parameter to control whether NA is returned as a tuple of NA, or NA itself (:issue:`18756`)
+- :func:`read_csv` now supports reading from HDFS via URLs such as ``"hdfs:///tmp/data.csv"``. The Hadoop configuration is discovered automatically where possible; the namenode can also be given explicitly using the form ``"hdfs://<host>:<port>/tmp/data.csv"``
 
 .. _whatsnew_0220.api_breaking:
 
diff --git a/pandas/compat/__init__.py b/pandas/compat/__init__.py
index 80a2c05d86971..8940661f4aa56 100644
--- a/pandas/compat/__init__.py
+++ b/pandas/compat/__init__.py
@@ -44,7 +44,7 @@
 PY36 = (sys.version_info >= (3, 6))
 PYPY = (platform.python_implementation() == 'PyPy')
 
-try:
+try:  # Python 2 imports
     import __builtin__ as builtins
     # not writeable when instantiated with string, doesn't handle unicode well
     from cStringIO import StringIO as cStringIO
@@ -53,12 +53,14 @@
     BytesIO = StringIO
     import cPickle
     import httplib
-except ImportError:
+    from urlparse import urlparse as parse_url
+except ImportError:  # Equivalent Python 3 imports
     import builtins
     from io import StringIO, BytesIO
     cStringIO = StringIO
     import pickle as cPickle
     import http.client as httplib
+    from urllib.parse import urlparse as parse_url
 
 from pandas.compat.chainmap import DeepChainMap
 
diff --git a/pandas/io/common.py b/pandas/io/common.py
index 534c1e0671150..87eb50d686711 100644
--- a/pandas/io/common.py
+++ b/pandas/io/common.py
@@ -99,6 +99,14 @@ def _is_s3_url(url):
         return False
 
 
+def _is_hdfs_url(url):
+    """Check for an hdfs url"""
+    try:
+        return parse_url(url).scheme == 'hdfs'
+    except:
+        return False
+
+
 def _expand_user(filepath_or_buffer):
     """Return the argument with an initial component of ~ or ~user
     replaced by that user's home directory.
@@ -201,6 +209,12 @@
                                          encoding=encoding,
                                          compression=compression)
 
+    if _is_hdfs_url(filepath_or_buffer):
+        from pandas.io import hdfs
+        return hdfs.get_filepath_or_buffer(filepath_or_buffer,
+                                           encoding=encoding,
+                                           compression=compression)
+
     if isinstance(filepath_or_buffer, (compat.string_types,
                                        compat.binary_type,
                                        mmap.mmap)):
@@ -314,12 +328,19 @@
     handles : list of file-like objects
         A list of file-like object that were openned in this function.
""" + need_text_wrapping = [BytesIO] try: from s3fs import S3File - need_text_wrapping = (BytesIO, S3File) + need_text_wrapping.append(S3File) + except ImportError: + pass + try: + from hdfs3 import HDFile + need_text_wrapping.append(HDFile) except ImportError: - need_text_wrapping = (BytesIO,) + pass + need_text_wrapping = tuple(need_text_wrapping) handles = list() f = path_or_buf diff --git a/pandas/io/hdfs.py b/pandas/io/hdfs.py new file mode 100644 index 0000000000000..af8ffa1afe976 --- /dev/null +++ b/pandas/io/hdfs.py @@ -0,0 +1,22 @@ +""" s3 support for remote file interactivity """ +from pandas import compat +try: + import hdfs3 +except: + raise ImportError("The hdfs3 library is required to handle hdfs files") + + +def get_filepath_or_buffer(filepath_or_buffer, encoding=None, + compression=None): + parsed_url = compat.parse_url(filepath_or_buffer) + if ":" in parsed_url.netloc: + host, port = parsed_url.netloc.rsplit(":", 1) + try: + port = int(port) + fs = hdfs3.HDFileSystem(host=host, port=port) + except ValueError: + pass + else: + fs = hdfs3.HDFileSystem() + filepath_or_buffer = fs.open(parsed_url.path) + return filepath_or_buffer, None, compression diff --git a/pandas/io/s3.py b/pandas/io/s3.py index 5e48de757d00e..c8f817d918065 100644 --- a/pandas/io/s3.py +++ b/pandas/io/s3.py @@ -6,15 +6,10 @@ except: raise ImportError("The s3fs library is required to handle s3 files") -if compat.PY3: - from urllib.parse import urlparse as parse_url -else: - from urlparse import urlparse as parse_url - def _strip_schema(url): """Returns the url without the s3:// part""" - result = parse_url(url) + result = compat.parse_url(url) return result.netloc + result.path diff --git a/pandas/tests/io/test_hdfs.py b/pandas/tests/io/test_hdfs.py new file mode 100644 index 0000000000000..a2658bc68a482 --- /dev/null +++ b/pandas/tests/io/test_hdfs.py @@ -0,0 +1,8 @@ +from pandas.io.common import _is_hdfs_url + + +class TestHDFSURL(object): + + def test_is_hdfs_url(self): + assert _is_hdfs_url("hdfs://pandas/somethingelse.com") + assert not _is_hdfs_url("hdf://pandas/somethingelse.com")