From bb3e8e6b63af311bf7f87f30ee7a4f0a3189ffa2 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Sat, 14 Nov 2020 18:37:22 -0500 Subject: [PATCH 01/41] storage_options as headers and tests added --- pandas/io/common.py | 17 ++++--- pandas/tests/io/test_common.py | 84 ++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 6 deletions(-) diff --git a/pandas/io/common.py b/pandas/io/common.py index 695c1671abd61..8f557c51d71db 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -288,12 +288,17 @@ def _get_filepath_or_buffer( fsspec_mode += "b" if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer): - # TODO: fsspec can also handle HTTP via requests, but leaving this unchanged - if storage_options: - raise ValueError( - "storage_options passed with file object or non-fsspec file path" - ) - req = urlopen(filepath_or_buffer) + # TODO: fsspec can also handle HTTP via requests, but leaving this + # unchanged. using fsspec appears to break the ability to infer if the + # server responded with gzipped data + storage_options = storage_options or dict() + # waiting until now for importing to match intended lazy logic of + # urlopen function defined elsewhere in this module + import urllib.request + + # assuming storage_options is to be interpretted as headers + req = urllib.request.Request(filepath_or_buffer, headers=storage_options) + req = urlopen(req) content_encoding = req.headers.get("Content-Encoding", None) if content_encoding == "gzip": # Override compression based on Content-Encoding header diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index c7a7101b5fe17..fc0106fd94469 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -1,6 +1,7 @@ """ Tests for the pandas.io.common functionalities """ +from http.server import BaseHTTPRequestHandler, HTTPServer from io import StringIO import mmap import os @@ -16,6 +17,8 @@ import pandas.io.common as icom +import threading + class 
CustomFSPath: """For testing fspath on unknown objects""" @@ -411,3 +414,84 @@ def test_is_fsspec_url(): assert not icom.is_fsspec_url("random:pandas/somethingelse.com") assert not icom.is_fsspec_url("/local/path") assert not icom.is_fsspec_url("relative/local/path") + + +class HeaderCSVResponder(BaseHTTPRequestHandler): + def do_GET(self): + requested_from_user_agent = self.headers["User-Agent"] + self.send_response(200) + self.send_header("Content-type", "text/csv") + self.end_headers() + response_df = pd.DataFrame( + { + "header": [requested_from_user_agent], + } + ) + response_bytes = response_df.to_csv(index=False).encode("utf-8") + self.wfile.write(response_bytes) + + # server = HTTPServer(("0.0.0.0", 8080), HeaderCSVResponder) + + +class HeaderJSONResponder(BaseHTTPRequestHandler): + def do_GET(self): + requested_from_user_agent = self.headers["User-Agent"] + self.send_response(200) + self.send_header("Content-type", "text/csv") + self.end_headers() + response_df = pd.DataFrame( + { + "header": [requested_from_user_agent], + } + ) + response_bytes = response_df.to_json().encode("utf-8") + self.wfile.write(response_bytes) + + +@pytest.mark.parametrize( + "responder, read_method, port", + [ + (HeaderCSVResponder, pd.read_csv, 34259), + (HeaderJSONResponder, pd.read_json, 34260), + ], +) +def test_server_and_default_headers(responder, read_method, port): + server = HTTPServer(("0.0.0.0", port), responder) + server_thread = threading.Thread(target=server.serve_forever) + server_thread.start() + try: + df_http = read_method(f"http://localhost:{port}") + server.shutdown() + except Exception as e: + df_http = pd.DataFrame({"header": [1]}) + server.shutdown() + + server_thread.join() + df_true = pd.DataFrame({"header": [1]}) + assert not (df_true == df_http).all(axis=None) + + +@pytest.mark.parametrize( + "responder, read_method, port", + [ + (HeaderCSVResponder, pd.read_csv, 34261), + (HeaderJSONResponder, pd.read_json, 34262), + ], +) +def 
test_server_and_custom_headers(responder, read_method, port): + custom_user_agent = "Super Cool One" + server = HTTPServer(("0.0.0.0", port), responder) + server_thread = threading.Thread(target=server.serve_forever) + server_thread.start() + try: + df_http = read_method( + f"http://localhost:{port}", + storage_options={"User-Agent": custom_user_agent}, + ) + server.shutdown() + except Exception as e: + df_http = pd.DataFrame({"header": [1]}) + server.shutdown() + server_thread.join() + df_true = pd.DataFrame({"header": [custom_user_agent]}) + assert (df_true == df_http).all(axis=None) From db514744d344e3e9c376d0c64ef5c203e836d019 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Sun, 15 Nov 2020 00:33:43 -0500 Subject: [PATCH 02/41] additional tests - gzip, test additional headers receipt --- pandas/tests/io/test_common.py | 121 +++++++++++++++++++++++++++++---- 1 file changed, 107 insertions(+), 14 deletions(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index fc0106fd94469..b5c3ad7cef49c 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -1,8 +1,9 @@ """ Tests for the pandas.io.common functionalities """ +import gzip from http.server import BaseHTTPRequestHandler, HTTPServer -from io import StringIO +from io import StringIO, BytesIO import mmap import os from pathlib import Path @@ -420,7 +421,7 @@ class HeaderCSVResponder(BaseHTTPRequestHandler): def do_GET(self): requested_from_user_agent = self.headers["User-Agent"] self.send_response(200) - self.send_header("Content-type", "text/csv") + self.send_header("Content-Type", "text/csv") self.end_headers() response_df = pd.DataFrame( { @@ -430,14 +431,51 @@ def do_GET(self): response_bytes = response_df.to_csv(index=False).encode("utf-8") self.wfile.write(response_bytes) - # server = HTTPServer(("0.0.0.0", 8080), HeaderCSVResponder) + +class HeaderCSVGzipResponder(BaseHTTPRequestHandler): + def do_GET(self): + requested_from_user_agent = 
self.headers["User-Agent"] + self.send_response(200) + self.send_header("Content-Type", "text/csv") + self.send_header("Content-Encoding", "gzip") + self.end_headers() + response_df = pd.DataFrame( + { + "header": [requested_from_user_agent], + } + ) + response_bytes = response_df.to_csv(index=False).encode("utf-8") + bio = BytesIO() + zipper = gzip.GzipFile(fileobj=bio, mode='w') + zipper.write(response_bytes) + zipper.close() + response_bytes = bio.getvalue() + self.wfile.write(response_bytes) class HeaderJSONResponder(BaseHTTPRequestHandler): def do_GET(self): requested_from_user_agent = self.headers["User-Agent"] self.send_response(200) - self.send_header("Content-type", "text/csv") + #self.send_header("Content-Type", "text/csv") + self.send_header("Content-Type", "application/json") + self.end_headers() + response_df = pd.DataFrame( + { + "header": [requested_from_user_agent], + } + ) + response_bytes = response_df.to_json().encode("utf-8") + self.wfile.write(response_bytes) + + +class HeaderJSONGzipResponder(BaseHTTPRequestHandler): + def do_GET(self): + requested_from_user_agent = self.headers["User-Agent"] + self.send_response(200) + #self.send_header("Content-Type", "text/csv") + self.send_header("Content-Type", "application/json") + self.send_header("Content-Encoding", "gzip") self.end_headers() response_df = pd.DataFrame( { @@ -445,42 +483,61 @@ def do_GET(self): } ) response_bytes = response_df.to_json().encode("utf-8") + bio = BytesIO() + zipper = gzip.GzipFile(fileobj=bio, mode='w') + zipper.write(response_bytes) + zipper.close() + response_bytes = bio.getvalue() + self.wfile.write(response_bytes) + + +class AllHeaderCSVResponder(BaseHTTPRequestHandler): + def do_GET(self): + response_df = pd.DataFrame(self.headers.items()) + self.send_response(200) + self.send_header("Content-Type", "text/csv") + self.end_headers() + response_bytes = response_df.to_csv(index=False).encode("utf-8") self.wfile.write(response_bytes) + @pytest.mark.parametrize( 
"responder, read_method, port", [ (HeaderCSVResponder, pd.read_csv, 34259), (HeaderJSONResponder, pd.read_json, 34260), + (HeaderCSVGzipResponder, pd.read_csv, 34261), + (HeaderJSONGzipResponder, pd.read_json, 34262), ], ) def test_server_and_default_headers(responder, read_method, port): - server = HTTPServer(("0.0.0.0", port), responder) + server = HTTPServer(("localhost", port), responder) server_thread = threading.Thread(target=server.serve_forever) server_thread.start() try: df_http = read_method(f"http://localhost:{port}") server.shutdown() - except Exception as e: - df_http = pd.DataFrame({"header": [1]}) + except Exception: + df_http = pd.DataFrame({"header": []}) server.shutdown() server_thread.join() - df_true = pd.DataFrame({"header": [1]}) - assert not (df_true == df_http).all(axis=None) + assert not df_http.empty @pytest.mark.parametrize( "responder, read_method, port", [ - (HeaderCSVResponder, pd.read_csv, 34261), - (HeaderJSONResponder, pd.read_json, 34262), + (HeaderCSVResponder, pd.read_csv, 34263), + (HeaderJSONResponder, pd.read_json, 34264), + (HeaderCSVGzipResponder, pd.read_csv, 34265), + (HeaderJSONGzipResponder, pd.read_json, 34266), ], ) def test_server_and_custom_headers(responder, read_method, port): custom_user_agent = "Super Cool One" - server = HTTPServer(("0.0.0.0", port), responder) + server = HTTPServer(("localhost", port), responder) server_thread = threading.Thread(target=server.serve_forever) server_thread.start() try: @@ -489,9 +546,45 @@ def test_server_and_custom_headers(responder, read_method, port): storage_options={"User-Agent": custom_user_agent}, ) server.shutdown() - except Exception as e: - df_http = pd.DataFrame({"header": [1]}) + except Exception: + df_http = pd.DataFrame({"header": []}) server.shutdown() server_thread.join() df_true = pd.DataFrame({"header": [custom_user_agent]}) assert (df_true == df_http).all(axis=None) + + +@pytest.mark.parametrize( + "responder, read_method, port", + [ + (AllHeaderCSVResponder, 
pd.read_csv, 34267), + ], +) +def test_server_and_custom_headers(responder, read_method, port): + custom_user_agent = "Super Cool One" + custom_auth_token = "Super Secret One" + storage_options = {"User-Agent": custom_user_agent, + 'Auth': custom_auth_token, + } + server = HTTPServer(("localhost", port), responder) + server_thread = threading.Thread(target=server.serve_forever) + server_thread.start() + try: + df_http = read_method( + f"http://localhost:{port}", + storage_options=storage_options, + ) + server.shutdown() + except Exception: + df_http = pd.DataFrame({'0': [], '1': []}) + server.shutdown() + server_thread.join() + df_http = df_http[df_http['0'].isin(storage_options.keys())] + df_http = df_http.sort_values(['0']).reset_index() + df_http = df_http[['0', '1']] + keys = list(sorted(storage_options.keys())) + df_true = pd.DataFrame({'0': [k for k in keys], + '1': [storage_options[k] for k in keys]}) + df_true = df_true.sort_values(['0']) + assert df_http.shape[0] == len(storage_options) + assert (df_true == df_http).all(axis=None) From 6f901b853692a5910a1259eb3264847a4939ff0b Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Thu, 19 Nov 2020 17:30:03 -0500 Subject: [PATCH 03/41] bailed on using threading for testing --- pandas/tests/io/test_common.py | 416 +++++++++++++++++++-------------- 1 file changed, 245 insertions(+), 171 deletions(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index b5c3ad7cef49c..771e1d5e649e1 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -417,174 +417,248 @@ def test_is_fsspec_url(): assert not icom.is_fsspec_url("relative/local/path") -class HeaderCSVResponder(BaseHTTPRequestHandler): - def do_GET(self): - requested_from_user_agent = self.headers["User-Agent"] - self.send_response(200) - self.send_header("Content-Type", "text/csv") - self.end_headers() - response_df = pd.DataFrame( - { - "header": [requested_from_user_agent], - } - ) - response_bytes = 
response_df.to_csv(index=False).encode("utf-8") - self.wfile.write(response_bytes) - - -class HeaderCSVGzipResponder(BaseHTTPRequestHandler): - def do_GET(self): - requested_from_user_agent = self.headers["User-Agent"] - self.send_response(200) - self.send_header("Content-Type", "text/csv") - self.send_header("Content-Encoding", "gzip") - self.end_headers() - response_df = pd.DataFrame( - { - "header": [requested_from_user_agent], - } - ) - response_bytes = response_df.to_csv(index=False).encode("utf-8") - bio = BytesIO() - zipper = gzip.GzipFile(fileobj=bio, mode='w') - zipper.write(response_bytes) - zipper.close() - response_bytes = bio.getvalue() - self.wfile.write(response_bytes) - - -class HeaderJSONResponder(BaseHTTPRequestHandler): - def do_GET(self): - requested_from_user_agent = self.headers["User-Agent"] - self.send_response(200) - #self.send_header("Content-Type", "text/csv") - self.send_header("Content-Type", "application/json") - self.end_headers() - response_df = pd.DataFrame( - { - "header": [requested_from_user_agent], - } - ) - response_bytes = response_df.to_json().encode("utf-8") - self.wfile.write(response_bytes) - - -class HeaderJSONGzipResponder(BaseHTTPRequestHandler): - def do_GET(self): - requested_from_user_agent = self.headers["User-Agent"] - self.send_response(200) - #self.send_header("Content-Type", "text/csv") - self.send_header("Content-Type", "application/json") - self.send_header("Content-Encoding", "gzip") - self.end_headers() - response_df = pd.DataFrame( - { - "header": [requested_from_user_agent], - } - ) - response_bytes = response_df.to_json().encode("utf-8") - bio = BytesIO() - zipper = gzip.GzipFile(fileobj=bio, mode='w') - zipper.write(response_bytes) - zipper.close() - response_bytes = bio.getvalue() - self.wfile.write(response_bytes) - - -class AllHeaderCSVResponder(BaseHTTPRequestHandler): - def do_GET(self): - response_df = pd.DataFrame(self.headers.items()) - self.send_response(200) - self.send_header("Content-Type", 
"text/csv") - self.end_headers() - response_bytes = response_df.to_csv(index=False).encode("utf-8") - self.wfile.write(response_bytes) - - - -@pytest.mark.parametrize( - "responder, read_method, port", - [ - (HeaderCSVResponder, pd.read_csv, 34259), - (HeaderJSONResponder, pd.read_json, 34260), - (HeaderCSVGzipResponder, pd.read_csv, 34261), - (HeaderJSONGzipResponder, pd.read_json, 34262), - ], -) -def test_server_and_default_headers(responder, read_method, port): - server = HTTPServer(("localhost", port), responder) - server_thread = threading.Thread(target=server.serve_forever) - server_thread.start() - try: - df_http = read_method(f"http://localhost:{port}") - server.shutdown() - except Exception: - df_http = pd.DataFrame({"header": []}) - server.shutdown() - - server_thread.join() - assert not df_http.empty - - -@pytest.mark.parametrize( - "responder, read_method, port", - [ - (HeaderCSVResponder, pd.read_csv, 34263), - (HeaderJSONResponder, pd.read_json, 34264), - (HeaderCSVGzipResponder, pd.read_csv, 34265), - (HeaderJSONGzipResponder, pd.read_json, 34266), - ], -) -def test_server_and_custom_headers(responder, read_method, port): - custom_user_agent = "Super Cool One" - server = HTTPServer(("localhost", port), responder) - server_thread = threading.Thread(target=server.serve_forever) - server_thread.start() - try: - df_http = read_method( - f"http://localhost:{port}", - storage_options={"User-Agent": custom_user_agent}, - ) - server.shutdown() - except Exception: - df_http = pd.DataFrame({"header": []}) - server.shutdown() - server_thread.join() - df_true = pd.DataFrame({"header": [custom_user_agent]}) - assert (df_true == df_http).all(axis=None) - - -@pytest.mark.parametrize( - "responder, read_method, port", - [ - (AllHeaderCSVResponder, pd.read_csv, 34267), - ], -) -def test_server_and_custom_headers(responder, read_method, port): - custom_user_agent = "Super Cool One" - custom_auth_token = "Super Secret One" - storage_options = {"User-Agent": 
custom_user_agent, - 'Auth': custom_auth_token, - } - server = HTTPServer(("localhost", port), responder) - server_thread = threading.Thread(target=server.serve_forever) - server_thread.start() - try: - df_http = read_method( - f"http://localhost:{port}", - storage_options=storage_options, - ) - server.shutdown() - except Exception: - df_http = pd.DataFrame({'0': [], '1': []}) - server.shutdown() - server_thread.join() - df_http = df_http[df_http['0'].isin(storage_options.keys())] - df_http = df_http.sort_values(['0']).reset_index() - df_http = df_http[['0', '1']] - keys = list(sorted(storage_options.keys())) - df_true = pd.DataFrame({'0': [k for k in keys], - '1': [storage_options[k] for k in keys]}) - df_true = df_true.sort_values(['0']) - assert df_http.shape[0] == len(storage_options) - assert (df_true == df_http).all(axis=None) +# class HeaderCSVResponder(BaseHTTPRequestHandler): +# def do_GET(self): +# requested_from_user_agent = self.headers["User-Agent"] +# self.send_response(200) +# self.send_header("Content-Type", "text/csv") +# self.end_headers() +# response_df = pd.DataFrame( +# { +# "header": [requested_from_user_agent], +# } +# ) +# response_bytes = response_df.to_csv(index=False).encode("utf-8") +# self.wfile.write(response_bytes) +# +# +# class HeaderCSVGzipResponder(BaseHTTPRequestHandler): +# def do_GET(self): +# requested_from_user_agent = self.headers["User-Agent"] +# self.send_response(200) +# self.send_header("Content-Type", "text/csv") +# self.send_header("Content-Encoding", "gzip") +# self.end_headers() +# response_df = pd.DataFrame( +# { +# "header": [requested_from_user_agent], +# } +# ) +# response_bytes = response_df.to_csv(index=False).encode("utf-8") +# bio = BytesIO() +# zipper = gzip.GzipFile(fileobj=bio, mode='w') +# zipper.write(response_bytes) +# zipper.close() +# response_bytes = bio.getvalue() +# self.wfile.write(response_bytes) +# +# +# class HeaderJSONResponder(BaseHTTPRequestHandler): +# def do_GET(self): +# 
requested_from_user_agent = self.headers["User-Agent"] +# self.send_response(200) +# #self.send_header("Content-Type", "text/csv") +# self.send_header("Content-Type", "application/json") +# self.end_headers() +# response_df = pd.DataFrame( +# { +# "header": [requested_from_user_agent], +# } +# ) +# response_bytes = response_df.to_json().encode("utf-8") +# self.wfile.write(response_bytes) +# +# +# class HeaderJSONGzipResponder(BaseHTTPRequestHandler): +# def do_GET(self): +# requested_from_user_agent = self.headers["User-Agent"] +# self.send_response(200) +# #self.send_header("Content-Type", "text/csv") +# self.send_header("Content-Type", "application/json") +# self.send_header("Content-Encoding", "gzip") +# self.end_headers() +# response_df = pd.DataFrame( +# { +# "header": [requested_from_user_agent], +# } +# ) +# response_bytes = response_df.to_json().encode("utf-8") +# bio = BytesIO() +# zipper = gzip.GzipFile(fileobj=bio, mode='w') +# zipper.write(response_bytes) +# zipper.close() +# response_bytes = bio.getvalue() +# self.wfile.write(response_bytes) +# +# +# class AllHeaderCSVResponder(BaseHTTPRequestHandler): +# def do_GET(self): +# response_df = pd.DataFrame(self.headers.items()) +# self.send_response(200) +# self.send_header("Content-Type", "text/csv") +# self.end_headers() +# response_bytes = response_df.to_csv(index=False).encode("utf-8") +# self.wfile.write(response_bytes) +# +# +# +# @pytest.mark.parametrize( +# "responder, read_method, port", +# [ +# (HeaderCSVResponder, pd.read_csv, 34259), +# (HeaderJSONResponder, pd.read_json, 34260), +# (HeaderCSVGzipResponder, pd.read_csv, 34261), +# (HeaderJSONGzipResponder, pd.read_json, 34262), +# ], +# ) +# def test_server_and_default_headers(responder, read_method, port): +# server = HTTPServer(("localhost", port), responder) +# server_thread = threading.Thread(target=server.serve_forever) +# server_thread.start() +# try: +# df_http = read_method(f"http://localhost:{port}") +# server.shutdown() +# except 
Exception: +# df_http = pd.DataFrame({"header": []}) +# server.shutdown() +# +# server_thread.join() +# assert not df_http.empty +# +# +# @pytest.mark.parametrize( +# "responder, read_method, port", +# [ +# (HeaderCSVResponder, pd.read_csv, 34263), +# (HeaderJSONResponder, pd.read_json, 34264), +# (HeaderCSVGzipResponder, pd.read_csv, 34265), +# (HeaderJSONGzipResponder, pd.read_json, 34266), +# ], +# ) +# def test_server_and_custom_headers(responder, read_method, port): +# custom_user_agent = "Super Cool One" +# server = HTTPServer(("localhost", port), responder) +# server_thread = threading.Thread(target=server.serve_forever) +# server_thread.start() +# try: +# df_http = read_method( +# f"http://localhost:{port}", +# storage_options={"User-Agent": custom_user_agent}, +# ) +# server.shutdown() +# except Exception: +# df_http = pd.DataFrame({"header": []}) +# server.shutdown() +# server_thread.join() +# df_true = pd.DataFrame({"header": [custom_user_agent]}) +# assert (df_true == df_http).all(axis=None) +# +# +# @pytest.mark.parametrize( +# "responder, read_method, port", +# [ +# (AllHeaderCSVResponder, pd.read_csv, 34267), +# ], +# ) +# def test_server_and_custom_headers(responder, read_method, port): +# custom_user_agent = "Super Cool One" +# custom_auth_token = "Super Secret One" +# storage_options = {"User-Agent": custom_user_agent, +# 'Auth': custom_auth_token, +# } +# server = HTTPServer(("localhost", port), responder) +# server_thread = threading.Thread(target=server.serve_forever) +# server_thread.start() +# try: +# df_http = read_method( +# f"http://localhost:{port}", +# storage_options=storage_options, +# ) +# server.shutdown() +# except Exception: +# df_http = pd.DataFrame({'0': [], '1': []}) +# server.shutdown() +# server_thread.join() +# df_http = df_http[df_http['0'].isin(storage_options.keys())] +# df_http = df_http.sort_values(['0']).reset_index() +# df_http = df_http[['0', '1']] +# keys = list(sorted(storage_options.keys())) +# df_true = 
pd.DataFrame({'0': [k for k in keys], +# '1': [storage_options[k] for k in keys]}) +# df_true = df_true.sort_values(['0']) +# assert df_http.shape[0] == len(storage_options) +# assert (df_true == df_http).all(axis=None) + + +from unittest.mock import MagicMock, patch + + +def test_plain_text_read_csv_custom_headers(): + true_df = pd.DataFrame({"column_name": ["column_value"]}) + df_csv_bytes = true_df.to_csv(index=False).encode("utf-8") + headers = { + "User-Agent": "custom", + "Auth": "other_custom", + } + + class DummyResponse: + headers = { + "Content-Type": "text/csv", + } + + @staticmethod + def read(): + return df_csv_bytes + + @staticmethod + def close(): + pass + + def dummy_response_getter(url): + return DummyResponse() + + dummy_request = MagicMock() + with patch("urllib.request.Request", new=dummy_request): + with patch("urllib.request.urlopen", new=dummy_response_getter): + received_df = pd.read_csv( + "http://localhost:80/test.csv", storage_options=headers + ) + assert dummy_request.called_with(headers=headers) + assert (received_df == true_df).all(axis=None) + + +def test_gzip_read_csv_custom_headers(): + true_df = pd.DataFrame({"column_name": ["column_value"]}) + df_csv_bytes = true_df.to_csv(index=False).encode("utf-8") + headers = { + "User-Agent": "custom", + "Auth": "other_custom", + } + + class DummyResponse: + headers = { + "Content-Type": "text/csv", + "Content-Encoding": "gzip", + } + + @staticmethod + def read(): + bio = BytesIO() + zipper = gzip.GzipFile(fileobj=bio, mode="w") + zipper.write(df_csv_bytes) + zipper.close() + response_bytes = bio.getvalue() + return response_bytes + + @staticmethod + def close(): + pass + + def dummy_response_getter(url): + return DummyResponse() + + dummy_request = MagicMock() + with patch("urllib.request.Request", new=dummy_request): + with patch("urllib.request.urlopen", new=dummy_response_getter): + df = pd.read_csv("http://localhost:80/test.csv", storage_options=headers) + assert 
dummy_request.called_with(headers=headers) From 3af6a3d9b7f9a13422b2588f474a247bbd839cc8 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Thu, 19 Nov 2020 17:46:23 -0500 Subject: [PATCH 04/41] clean up comments add json http tests --- pandas/tests/io/test_common.py | 266 ++++++++++----------------------- 1 file changed, 83 insertions(+), 183 deletions(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index 771e1d5e649e1..dfaf126f8a4ed 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -2,7 +2,6 @@ Tests for the pandas.io.common functionalities """ import gzip -from http.server import BaseHTTPRequestHandler, HTTPServer from io import StringIO, BytesIO import mmap import os @@ -18,7 +17,7 @@ import pandas.io.common as icom -import threading +from unittest.mock import MagicMock, patch class CustomFSPath: @@ -417,183 +416,7 @@ def test_is_fsspec_url(): assert not icom.is_fsspec_url("relative/local/path") -# class HeaderCSVResponder(BaseHTTPRequestHandler): -# def do_GET(self): -# requested_from_user_agent = self.headers["User-Agent"] -# self.send_response(200) -# self.send_header("Content-Type", "text/csv") -# self.end_headers() -# response_df = pd.DataFrame( -# { -# "header": [requested_from_user_agent], -# } -# ) -# response_bytes = response_df.to_csv(index=False).encode("utf-8") -# self.wfile.write(response_bytes) -# -# -# class HeaderCSVGzipResponder(BaseHTTPRequestHandler): -# def do_GET(self): -# requested_from_user_agent = self.headers["User-Agent"] -# self.send_response(200) -# self.send_header("Content-Type", "text/csv") -# self.send_header("Content-Encoding", "gzip") -# self.end_headers() -# response_df = pd.DataFrame( -# { -# "header": [requested_from_user_agent], -# } -# ) -# response_bytes = response_df.to_csv(index=False).encode("utf-8") -# bio = BytesIO() -# zipper = gzip.GzipFile(fileobj=bio, mode='w') -# zipper.write(response_bytes) -# zipper.close() -# response_bytes = bio.getvalue() -# 
self.wfile.write(response_bytes) -# -# -# class HeaderJSONResponder(BaseHTTPRequestHandler): -# def do_GET(self): -# requested_from_user_agent = self.headers["User-Agent"] -# self.send_response(200) -# #self.send_header("Content-Type", "text/csv") -# self.send_header("Content-Type", "application/json") -# self.end_headers() -# response_df = pd.DataFrame( -# { -# "header": [requested_from_user_agent], -# } -# ) -# response_bytes = response_df.to_json().encode("utf-8") -# self.wfile.write(response_bytes) -# -# -# class HeaderJSONGzipResponder(BaseHTTPRequestHandler): -# def do_GET(self): -# requested_from_user_agent = self.headers["User-Agent"] -# self.send_response(200) -# #self.send_header("Content-Type", "text/csv") -# self.send_header("Content-Type", "application/json") -# self.send_header("Content-Encoding", "gzip") -# self.end_headers() -# response_df = pd.DataFrame( -# { -# "header": [requested_from_user_agent], -# } -# ) -# response_bytes = response_df.to_json().encode("utf-8") -# bio = BytesIO() -# zipper = gzip.GzipFile(fileobj=bio, mode='w') -# zipper.write(response_bytes) -# zipper.close() -# response_bytes = bio.getvalue() -# self.wfile.write(response_bytes) -# -# -# class AllHeaderCSVResponder(BaseHTTPRequestHandler): -# def do_GET(self): -# response_df = pd.DataFrame(self.headers.items()) -# self.send_response(200) -# self.send_header("Content-Type", "text/csv") -# self.end_headers() -# response_bytes = response_df.to_csv(index=False).encode("utf-8") -# self.wfile.write(response_bytes) -# -# -# -# @pytest.mark.parametrize( -# "responder, read_method, port", -# [ -# (HeaderCSVResponder, pd.read_csv, 34259), -# (HeaderJSONResponder, pd.read_json, 34260), -# (HeaderCSVGzipResponder, pd.read_csv, 34261), -# (HeaderJSONGzipResponder, pd.read_json, 34262), -# ], -# ) -# def test_server_and_default_headers(responder, read_method, port): -# server = HTTPServer(("localhost", port), responder) -# server_thread = threading.Thread(target=server.serve_forever) -# 
server_thread.start() -# try: -# df_http = read_method(f"http://localhost:{port}") -# server.shutdown() -# except Exception: -# df_http = pd.DataFrame({"header": []}) -# server.shutdown() -# -# server_thread.join() -# assert not df_http.empty -# -# -# @pytest.mark.parametrize( -# "responder, read_method, port", -# [ -# (HeaderCSVResponder, pd.read_csv, 34263), -# (HeaderJSONResponder, pd.read_json, 34264), -# (HeaderCSVGzipResponder, pd.read_csv, 34265), -# (HeaderJSONGzipResponder, pd.read_json, 34266), -# ], -# ) -# def test_server_and_custom_headers(responder, read_method, port): -# custom_user_agent = "Super Cool One" -# server = HTTPServer(("localhost", port), responder) -# server_thread = threading.Thread(target=server.serve_forever) -# server_thread.start() -# try: -# df_http = read_method( -# f"http://localhost:{port}", -# storage_options={"User-Agent": custom_user_agent}, -# ) -# server.shutdown() -# except Exception: -# df_http = pd.DataFrame({"header": []}) -# server.shutdown() -# server_thread.join() -# df_true = pd.DataFrame({"header": [custom_user_agent]}) -# assert (df_true == df_http).all(axis=None) -# -# -# @pytest.mark.parametrize( -# "responder, read_method, port", -# [ -# (AllHeaderCSVResponder, pd.read_csv, 34267), -# ], -# ) -# def test_server_and_custom_headers(responder, read_method, port): -# custom_user_agent = "Super Cool One" -# custom_auth_token = "Super Secret One" -# storage_options = {"User-Agent": custom_user_agent, -# 'Auth': custom_auth_token, -# } -# server = HTTPServer(("localhost", port), responder) -# server_thread = threading.Thread(target=server.serve_forever) -# server_thread.start() -# try: -# df_http = read_method( -# f"http://localhost:{port}", -# storage_options=storage_options, -# ) -# server.shutdown() -# except Exception: -# df_http = pd.DataFrame({'0': [], '1': []}) -# server.shutdown() -# server_thread.join() -# df_http = df_http[df_http['0'].isin(storage_options.keys())] -# df_http = 
df_http.sort_values(['0']).reset_index() -# df_http = df_http[['0', '1']] -# keys = list(sorted(storage_options.keys())) -# df_true = pd.DataFrame({'0': [k for k in keys], -# '1': [storage_options[k] for k in keys]}) -# df_true = df_true.sort_values(['0']) -# assert df_http.shape[0] == len(storage_options) -# assert (df_true == df_http).all(axis=None) - - -from unittest.mock import MagicMock, patch - - -def test_plain_text_read_csv_custom_headers(): +def test_plain_text_read_csv_http_custom_headers(): true_df = pd.DataFrame({"column_name": ["column_value"]}) df_csv_bytes = true_df.to_csv(index=False).encode("utf-8") headers = { @@ -627,7 +450,7 @@ def dummy_response_getter(url): assert (received_df == true_df).all(axis=None) -def test_gzip_read_csv_custom_headers(): +def test_gzip_read_csv_http_custom_headers(): true_df = pd.DataFrame({"column_name": ["column_value"]}) df_csv_bytes = true_df.to_csv(index=False).encode("utf-8") headers = { @@ -647,8 +470,82 @@ def read(): zipper = gzip.GzipFile(fileobj=bio, mode="w") zipper.write(df_csv_bytes) zipper.close() - response_bytes = bio.getvalue() - return response_bytes + gzipped_response = bio.getvalue() + return gzipped_response + + @staticmethod + def close(): + pass + + def dummy_response_getter(url): + return DummyResponse() + + dummy_request = MagicMock() + with patch("urllib.request.Request", new=dummy_request): + with patch("urllib.request.urlopen", new=dummy_response_getter): + received_df = pd.read_csv( + "http://localhost:80/test.csv", storage_options=headers + ) + assert dummy_request.called_with(headers=headers) + assert (received_df == true_df).all(axis=None) + + +def test_plain_text_read_json_http_custom_headers(): + true_df = pd.DataFrame({"column_name": ["column_value"]}) + df_json_bytes = true_df.to_json().encode("utf-8") + headers = { + "User-Agent": "custom", + "Auth": "other_custom", + } + + class DummyResponse: + headers = { + "Content-Type": "application/json", + } + + @staticmethod + def read(): + 
return df_json_bytes + + @staticmethod + def close(): + pass + + def dummy_response_getter(url): + return DummyResponse() + + dummy_request = MagicMock() + with patch("urllib.request.Request", new=dummy_request): + with patch("urllib.request.urlopen", new=dummy_response_getter): + received_df = pd.read_json( + "http://localhost:80/test.json", storage_options=headers + ) + assert dummy_request.called_with(headers=headers) + assert (received_df == true_df).all(axis=None) + + +def test_gzip_read_json_http_custom_headers(): + true_df = pd.DataFrame({"column_name": ["column_value"]}) + df_json_bytes = true_df.to_json().encode("utf-8") + headers = { + "User-Agent": "custom", + "Auth": "other_custom", + } + + class DummyResponse: + headers = { + "Content-Type": "application/json", + "Content-Encoding": "gzip", + } + + @staticmethod + def read(): + bio = BytesIO() + zipper = gzip.GzipFile(fileobj=bio, mode="w") + zipper.write(df_json_bytes) + zipper.close() + gzipped_response = bio.getvalue() + return gzipped_response @staticmethod def close(): @@ -660,5 +557,8 @@ def dummy_response_getter(url): dummy_request = MagicMock() with patch("urllib.request.Request", new=dummy_request): with patch("urllib.request.urlopen", new=dummy_response_getter): - df = pd.read_csv("http://localhost:80/test.csv", storage_options=headers) + received_df = pd.read_json( + "http://localhost:80/test.json", storage_options=headers + ) assert dummy_request.called_with(headers=headers) + assert (received_df == true_df).all(axis=None) From 8f5a0f138b04360cb636c4450c98e50161ea6222 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Thu, 19 Nov 2020 18:19:51 -0500 Subject: [PATCH 05/41] added documentation on storage_options for headers --- doc/source/user_guide/io.rst | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/doc/source/user_guide/io.rst b/doc/source/user_guide/io.rst index 1bd35131622ab..79f46194d7491 100644 --- a/doc/source/user_guide/io.rst +++ b/doc/source/user_guide/io.rst @@ 
-1625,6 +1625,18 @@ functions - the following example shows reading a CSV file: df = pd.read_csv("https://download.bls.gov/pub/time.series/cu/cu.item", sep="\t") +A custom header can be sent alongside HTTP(s) requests by passing a dictionary +of header key value mappings to the ``storage_options`` keyword argument as shown below: + +.. code-block:: python + + headers = {"User-Agent": "pandas"} + df = pd.read_csv( + "https://download.bls.gov/pub/time.series/cu/cu.item", + sep="\t", + storage_options=headers + ) + All URLs which are not local files or HTTP(s) are handled by `fsspec`_, if installed, and its various filesystem implementations (including Amazon S3, Google Cloud, SSH, FTP, webHDFS...). From 9fcc72a826847ba2b96897bf3e23b0d640dd4cfc Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Thu, 19 Nov 2020 18:56:26 -0500 Subject: [PATCH 06/41] DOC:Added doc for custom HTTP headers in read_csv and read_json --- doc/source/whatsnew/v1.2.0.rst | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index cea42cbffa906..f5b8865fa7725 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -221,6 +221,23 @@ Additionally ``mean`` supports execution via `Numba ` the ``engine`` and ``engine_kwargs`` arguments. Numba must be installed as an optional dependency to use this feature. +.. _whatsnew_120.read_csv_json_http_headers: + +Custom HTTP(s) headers when reading csv or json files +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +:meth:`read_csv` and :meth:`read_json` use the dictionary passed to ``storage_options`` to create custom HTTP(s) headers. +For example: + +.. ipython:: python + + headers = {"User-Agent": "pandas"} + df = pd.read_csv( + "https://download.bls.gov/pub/time.series/cu/cu.item", + sep="\t", + storage_options=headers + ) + .. 
_whatsnew_120.enhancements.other: Other enhancements From df6e5395f3b63c87858b1dc3674984b00564031a Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Fri, 20 Nov 2020 19:53:13 -0500 Subject: [PATCH 07/41] DOC:Corrected versionadded tag and added issue number for reference --- doc/source/user_guide/io.rst | 2 ++ doc/source/whatsnew/v1.2.0.rst | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/doc/source/user_guide/io.rst b/doc/source/user_guide/io.rst index 79f46194d7491..62533f9fe2182 100644 --- a/doc/source/user_guide/io.rst +++ b/doc/source/user_guide/io.rst @@ -1625,6 +1625,8 @@ functions - the following example shows reading a CSV file: df = pd.read_csv("https://download.bls.gov/pub/time.series/cu/cu.item", sep="\t") +.. versionadded:: 1.2.0 + A custom header can be sent alongside HTTP(s) requests by passing a dictionary of header key value mappings to the ``storage_options`` keyword argument as shown below: diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index f5b8865fa7725..48c446aff7c1d 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -226,7 +226,7 @@ to use this feature. Custom HTTP(s) headers when reading csv or json files ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -:meth:`read_csv` and :meth:`read_json` use the dictionary passed to ``storage_options`` to create custom HTTP(s) headers. +:meth:`read_csv` and :meth:`read_json` use the dictionary passed to ``storage_options`` to create custom HTTP(s) headers. (:issue:`36688`) For example: .. 
ipython:: python From 98db1c4ef98868715dbd932bb5cceead12ad2dc5 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Sat, 21 Nov 2020 13:53:42 -0500 Subject: [PATCH 08/41] DOC:updated storage_options documentation --- pandas/core/shared_docs.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pandas/core/shared_docs.py b/pandas/core/shared_docs.py index 9de9d1f434a12..2f920e11bd8d4 100644 --- a/pandas/core/shared_docs.py +++ b/pandas/core/shared_docs.py @@ -383,8 +383,9 @@ "storage_options" ] = """storage_options : dict, optional Extra options that make sense for a particular storage connection, e.g. - host, port, username, password, etc., if using a URL that will - be parsed by ``fsspec``, e.g., starting "s3://", "gcs://". An error - will be raised if providing this argument with a non-fsspec URL. - See the fsspec and backend storage implementation docs for the set of - allowed keys and values.""" + host, port, username, password, etc. If using a URL that will + be parsed by ``fsspec``, e.g., starting "s3://", "gcs://" then see the + fsspec and backend storage implementation docs for the set of + allowed keys and values. 
Otherwise if the remote URL is handled via urllib + this key and value mapping will be passed to the headers argument of the + urllib request.""" From f28f36ccc9e35e8363dc463a241d0b04223a4396 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Sat, 21 Nov 2020 13:58:38 -0500 Subject: [PATCH 09/41] TST:updated with tm.assert_frame_equal --- pandas/tests/io/test_common.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index dfaf126f8a4ed..575c5a89e6269 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -447,7 +447,7 @@ def dummy_response_getter(url): "http://localhost:80/test.csv", storage_options=headers ) assert dummy_request.called_with(headers=headers) - assert (received_df == true_df).all(axis=None) + assert tm.assert_frame_equal(received_df, true_df) def test_gzip_read_csv_http_custom_headers(): @@ -487,7 +487,7 @@ def dummy_response_getter(url): "http://localhost:80/test.csv", storage_options=headers ) assert dummy_request.called_with(headers=headers) - assert (received_df == true_df).all(axis=None) + assert tm.assert_frame_equal(received_df, true_df) def test_plain_text_read_json_http_custom_headers(): @@ -521,7 +521,7 @@ def dummy_response_getter(url): "http://localhost:80/test.json", storage_options=headers ) assert dummy_request.called_with(headers=headers) - assert (received_df == true_df).all(axis=None) + assert tm.assert_frame_equal(received_df, true_df) def test_gzip_read_json_http_custom_headers(): @@ -561,4 +561,4 @@ def dummy_response_getter(url): "http://localhost:80/test.json", storage_options=headers ) assert dummy_request.called_with(headers=headers) - assert (received_df == true_df).all(axis=None) + assert tm.assert_frame_equal(received_df, true_df) From dd3265f394d70f97f789227e957066799ebe9d26 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Sat, 21 Nov 2020 14:02:57 -0500 Subject: [PATCH 10/41] TST:fixed incorrect usage of 
tm.assert_frame_equal --- pandas/tests/io/test_common.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index 575c5a89e6269..ad1da4ea12c8b 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -447,7 +447,7 @@ def dummy_response_getter(url): "http://localhost:80/test.csv", storage_options=headers ) assert dummy_request.called_with(headers=headers) - assert tm.assert_frame_equal(received_df, true_df) + tm.assert_frame_equal(received_df, true_df) def test_gzip_read_csv_http_custom_headers(): @@ -487,7 +487,7 @@ def dummy_response_getter(url): "http://localhost:80/test.csv", storage_options=headers ) assert dummy_request.called_with(headers=headers) - assert tm.assert_frame_equal(received_df, true_df) + tm.assert_frame_equal(received_df, true_df) def test_plain_text_read_json_http_custom_headers(): @@ -521,7 +521,7 @@ def dummy_response_getter(url): "http://localhost:80/test.json", storage_options=headers ) assert dummy_request.called_with(headers=headers) - assert tm.assert_frame_equal(received_df, true_df) + tm.assert_frame_equal(received_df, true_df) def test_gzip_read_json_http_custom_headers(): @@ -561,4 +561,4 @@ def dummy_response_getter(url): "http://localhost:80/test.json", storage_options=headers ) assert dummy_request.called_with(headers=headers) - assert tm.assert_frame_equal(received_df, true_df) + tm.assert_frame_equal(received_df, true_df) From 02fc8407beec129cc346ed23939e83187a353d74 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Sat, 21 Nov 2020 14:53:27 -0500 Subject: [PATCH 11/41] CLN:reordered imports to fix pre-commit error --- pandas/tests/io/test_common.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index ad1da4ea12c8b..756a0a6760f9c 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -2,10 +2,11 @@ 
Tests for the pandas.io.common functionalities """ import gzip -from io import StringIO, BytesIO +from io import BytesIO, StringIO import mmap import os from pathlib import Path +from unittest.mock import MagicMock, patch import pytest @@ -17,8 +18,6 @@ import pandas.io.common as icom -from unittest.mock import MagicMock, patch - class CustomFSPath: """For testing fspath on unknown objects""" @@ -416,7 +415,7 @@ def test_is_fsspec_url(): assert not icom.is_fsspec_url("relative/local/path") -def test_plain_text_read_csv_http_custom_headers(): +def test_plain_text_read_csv_http_custom_headers(monkeypatch): true_df = pd.DataFrame({"column_name": ["column_value"]}) df_csv_bytes = true_df.to_csv(index=False).encode("utf-8") headers = { From da97f0a178c7b366d143a7e30b405b8c92089fc8 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Sat, 21 Nov 2020 20:12:23 -0500 Subject: [PATCH 12/41] DOC:changed whatsnew and added to shared_docs.py GH36688 --- doc/source/whatsnew/v1.2.0.rst | 5 ++++- pandas/core/shared_docs.py | 8 +++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index 48c446aff7c1d..9d8c0605c4d8e 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -226,7 +226,10 @@ to use this feature. Custom HTTP(s) headers when reading csv or json files ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -:meth:`read_csv` and :meth:`read_json` use the dictionary passed to ``storage_options`` to create custom HTTP(s) headers. (:issue:`36688`) +When reading from a remote URL that is not handled by fsspec (ie. HTTP and +HTTPS) the dictionary passed to ``storage_options`` will be used to create the +headers included in the request. This can be used to control the User-Agent +header or send other custom headers (:issue:`36688`). For example: .. 
ipython:: python diff --git a/pandas/core/shared_docs.py b/pandas/core/shared_docs.py index 2f920e11bd8d4..9087298b17dfb 100644 --- a/pandas/core/shared_docs.py +++ b/pandas/core/shared_docs.py @@ -386,6 +386,8 @@ host, port, username, password, etc. If using a URL that will be parsed by ``fsspec``, e.g., starting "s3://", "gcs://" then see the fsspec and backend storage implementation docs for the set of - allowed keys and values. Otherwise if the remote URL is handled via urllib - this key and value mapping will be passed to the headers argument of the - urllib request.""" + allowed keys and values. Otherwise, in ``read_*`` functions only, the + remote URL is handled via urllib (e.g. HTTP or HTTPS). When handled by + urllib this key value mapping will be passed to the headers + argument of the urllib request thereby allowing custom headers + such as setting the User-Agent.""" From fce4b1762ad526f38425428751f120b45d8ee84a Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Sat, 21 Nov 2020 20:20:08 -0500 Subject: [PATCH 13/41] ENH: read nonfsspec URL with headers built from storage_options GH36688 --- pandas/io/parquet.py | 39 +++++++++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index a19b132a7891d..4cf4384ab5d30 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -13,7 +13,13 @@ from pandas import DataFrame, MultiIndex, get_option from pandas.core import generic -from pandas.io.common import IOHandles, get_handle, is_fsspec_url, stringify_path +from pandas.io.common import ( + IOHandles, + get_handle, + is_fsspec_url, + stringify_path, + is_url, +) def get_engine(engine: str) -> "BaseImpl": @@ -65,8 +71,10 @@ def _get_path_or_handle( fs, path_or_handle = fsspec.core.url_to_fs( path_or_handle, **(storage_options or {}) ) - elif storage_options: - raise ValueError("storage_options passed with buffer or non-fsspec filepath") + elif storage_options and (not
is_url(path_or_handle) or mode != "rb"): + # can't write to a remote url + # without making use of fsspec at the moment + raise ValueError("storage_options passed with buffer, or non-supported URL") handles = None if ( @@ -78,7 +86,9 @@ def _get_path_or_handle( # use get_handle only when we are very certain that it is not a directory # fsspec resources can also point to directories # this branch is used for example when reading from non-fsspec URLs - handles = get_handle(path_or_handle, mode, is_text=False) + handles = get_handle( + path_or_handle, mode, is_text=False, storage_options=storage_options + ) fs = None path_or_handle = handles.handle return path_or_handle, handles, fs @@ -271,7 +281,9 @@ def read( # use get_handle only when we are very certain that it is not a directory # fsspec resources can also point to directories # this branch is used for example when reading from non-fsspec URLs - handles = get_handle(path, "rb", is_text=False) + handles = get_handle( + path, "rb", is_text=False, storage_options=storage_options + ) path = handles.handle parquet_file = self.api.ParquetFile(path, **parquet_kwargs) @@ -368,7 +380,14 @@ def to_parquet( return None -def read_parquet(path, engine: str = "auto", columns=None, **kwargs): +@doc(storage_options=generic._shared_docs["storage_options"]) +def read_parquet( + path, + engine: str = "auto", + columns=None, + storage_options: StorageOptions = None, + **kwargs, +): """ Load a parquet object from the file path, returning a DataFrame. @@ -390,13 +409,17 @@ def read_parquet(path, engine: str = "auto", columns=None, **kwargs): By file-like object, we refer to objects with a ``read()`` method, such as a file handle (e.g. via builtin ``open`` function) or ``StringIO``. - engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto' + engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto' Parquet library to use. If 'auto', then the option ``io.parquet.engine`` is used. 
The default ``io.parquet.engine`` behavior is to try 'pyarrow', falling back to 'fastparquet' if 'pyarrow' is unavailable. columns : list, default=None If not None, only these columns will be read from the file. + {storage_options} + + .. versionadded:: 1.2.0 + **kwargs Any additional kwargs are passed to the engine. @@ -405,4 +428,4 @@ def read_parquet(path, engine: str = "auto", columns=None, **kwargs): DataFrame """ impl = get_engine(engine) - return impl.read(path, columns=columns, **kwargs) + return impl.read(path, columns=columns, storage_options=storage_options, **kwargs) From e0cfcb63aa9bbaf27ca83e387673de502ff8cc0b Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Sat, 21 Nov 2020 20:28:37 -0500 Subject: [PATCH 14/41] TST:Added additional tests parquet and other read methods GH36688 --- pandas/tests/io/test_common.py | 146 +++++++++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index 756a0a6760f9c..8c73aa13cfdc4 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -561,3 +561,149 @@ def dummy_response_getter(url): ) assert dummy_request.called_with(headers=headers) tm.assert_frame_equal(received_df, true_df) + + +def test_read_parquet_http_no_storage_options(): + true_df = pd.DataFrame({"column_name": ["column_value"]}) + df_parquet_bytes = true_df.to_parquet(index=False) + + class DummyResponse: + headers = { + "Content-Type": "application/octet-stream", + } + + @staticmethod + def read(): + return df_parquet_bytes + + @staticmethod + def close(): + pass + + def dummy_response_getter(url): + return DummyResponse() + + dummy_request = MagicMock() + with patch("urllib.request.Request", new=dummy_request): + with patch("urllib.request.urlopen", new=dummy_response_getter): + received_df = pd.read_parquet("http://localhost:80/test.parquet") + tm.assert_frame_equal(received_df, true_df) + + +def test_read_parquet_http_with_storage_options(): + 
true_df = pd.DataFrame({"column_name": ["column_value"]}) + df_parquet_bytes = true_df.to_parquet(index=False) + headers = { + "User-Agent": "custom", + "Auth": "other_custom", + } + + class DummyResponse: + headers = { + "Content-Type": "application/octet-stream", + } + + @staticmethod + def read(): + return df_parquet_bytes + + @staticmethod + def close(): + pass + + def dummy_response_getter(url): + return DummyResponse() + + dummy_request = MagicMock() + with patch("urllib.request.Request", new=dummy_request): + with patch("urllib.request.urlopen", new=dummy_response_getter): + received_df = pd.read_parquet( + "http://localhost:80/test.parquet", storage_options=headers + ) + assert dummy_request.called_with(headers=headers) + tm.assert_frame_equal(received_df, true_df) + + +def test_read_pickle_http_with_storage_options(): + true_df = pd.DataFrame({"column_name": ["column_value"]}) + bio = BytesIO() + true_df.to_pickle(bio) + df_pickle_bytes = bio.getvalue() + headers = { + "User-Agent": "custom", + "Auth": "other_custom", + } + + class DummyResponse: + headers = { + "Content-Type": "application/octet-stream", + } + + @staticmethod + def read(): + return df_pickle_bytes + + @staticmethod + def close(): + pass + + def dummy_response_getter(url): + return DummyResponse() + + dummy_request = MagicMock() + with patch("urllib.request.Request", new=dummy_request): + with patch("urllib.request.urlopen", new=dummy_response_getter): + received_df = pd.read_pickle( + "http://localhost:80/test.pkl", storage_options=headers + ) + assert dummy_request.called_with(headers=headers) + tm.assert_frame_equal(received_df, true_df) + + +def test_read_stata_http_with_storage_options(): + true_df = pd.DataFrame({"column_name": ["column_value"]}) + bio = BytesIO() + true_df.to_stata(bio, write_index=False) + df_pickle_bytes = bio.getvalue() + headers = { + "User-Agent": "custom", + "Auth": "other_custom", + } + + class DummyResponse: + headers = { + "Content-Type": 
"application/octet-stream", + } + + @staticmethod + def read(): + return df_pickle_bytes + + @staticmethod + def close(): + pass + + def dummy_response_getter(url): + return DummyResponse() + + dummy_request = MagicMock() + with patch("urllib.request.Request", new=dummy_request): + with patch("urllib.request.urlopen", new=dummy_response_getter): + received_df = pd.read_stata( + "http://localhost:80/test.dta", storage_options=headers + ) + assert dummy_request.called_with(headers=headers) + tm.assert_frame_equal(received_df, true_df) + + +def test_to_parquet_to_disk_with_storage_options(): + headers = { + "User-Agent": "custom", + "Auth": "other_custom", + } + + true_df = pd.DataFrame({"column_name": ["column_value"]}) + with pytest.raises(ValueError): + df_parquet_bytes = true_df.to_parquet( + "/tmp/junk.parquet", storage_options=headers + ) From 33115b739fc5e414ee3f2b61c57210ee64b4eb8c Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Wed, 2 Dec 2020 21:34:34 -0500 Subject: [PATCH 15/41] TST:removed mocking in favor of threaded http server --- pandas/tests/io/test_common.py | 479 +++++++++++++++------------------ 1 file changed, 217 insertions(+), 262 deletions(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index 8c73aa13cfdc4..5fe2bcbe64809 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -6,7 +6,11 @@ import mmap import os from pathlib import Path -from unittest.mock import MagicMock, patch + +# from unittest.mock import MagicMock, patch +import threading +import http.server + import pytest @@ -415,285 +419,236 @@ def test_is_fsspec_url(): assert not icom.is_fsspec_url("relative/local/path") -def test_plain_text_read_csv_http_custom_headers(monkeypatch): - true_df = pd.DataFrame({"column_name": ["column_value"]}) - df_csv_bytes = true_df.to_csv(index=False).encode("utf-8") - headers = { - "User-Agent": "custom", - "Auth": "other_custom", - } - - class DummyResponse: - headers = { - "Content-Type": 
"text/csv", - } - - @staticmethod - def read(): - return df_csv_bytes - - @staticmethod - def close(): - pass - - def dummy_response_getter(url): - return DummyResponse() - - dummy_request = MagicMock() - with patch("urllib.request.Request", new=dummy_request): - with patch("urllib.request.urlopen", new=dummy_response_getter): - received_df = pd.read_csv( - "http://localhost:80/test.csv", storage_options=headers - ) - assert dummy_request.called_with(headers=headers) - tm.assert_frame_equal(received_df, true_df) - - -def test_gzip_read_csv_http_custom_headers(): - true_df = pd.DataFrame({"column_name": ["column_value"]}) - df_csv_bytes = true_df.to_csv(index=False).encode("utf-8") - headers = { - "User-Agent": "custom", - "Auth": "other_custom", - } - - class DummyResponse: - headers = { - "Content-Type": "text/csv", - "Content-Encoding": "gzip", - } - - @staticmethod - def read(): - bio = BytesIO() - zipper = gzip.GzipFile(fileobj=bio, mode="w") - zipper.write(df_csv_bytes) - zipper.close() - gzipped_response = bio.getvalue() - return gzipped_response - - @staticmethod - def close(): - pass - - def dummy_response_getter(url): - return DummyResponse() - - dummy_request = MagicMock() - with patch("urllib.request.Request", new=dummy_request): - with patch("urllib.request.urlopen", new=dummy_response_getter): - received_df = pd.read_csv( - "http://localhost:80/test.csv", storage_options=headers - ) - assert dummy_request.called_with(headers=headers) - tm.assert_frame_equal(received_df, true_df) - - -def test_plain_text_read_json_http_custom_headers(): - true_df = pd.DataFrame({"column_name": ["column_value"]}) - df_json_bytes = true_df.to_json().encode("utf-8") - headers = { - "User-Agent": "custom", - "Auth": "other_custom", - } - - class DummyResponse: - headers = { - "Content-Type": "application/json", - } - - @staticmethod - def read(): - return df_json_bytes - - @staticmethod - def close(): - pass - - def dummy_response_getter(url): - return DummyResponse() - - 
dummy_request = MagicMock() - with patch("urllib.request.Request", new=dummy_request): - with patch("urllib.request.urlopen", new=dummy_response_getter): - received_df = pd.read_json( - "http://localhost:80/test.json", storage_options=headers - ) - assert dummy_request.called_with(headers=headers) - tm.assert_frame_equal(received_df, true_df) - - -def test_gzip_read_json_http_custom_headers(): - true_df = pd.DataFrame({"column_name": ["column_value"]}) - df_json_bytes = true_df.to_json().encode("utf-8") - headers = { - "User-Agent": "custom", - "Auth": "other_custom", - } - - class DummyResponse: - headers = { - "Content-Type": "application/json", - "Content-Encoding": "gzip", - } - - @staticmethod - def read(): - bio = BytesIO() - zipper = gzip.GzipFile(fileobj=bio, mode="w") - zipper.write(df_json_bytes) - zipper.close() - gzipped_response = bio.getvalue() - return gzipped_response - - @staticmethod - def close(): - pass - - def dummy_response_getter(url): - return DummyResponse() - - dummy_request = MagicMock() - with patch("urllib.request.Request", new=dummy_request): - with patch("urllib.request.urlopen", new=dummy_response_getter): - received_df = pd.read_json( - "http://localhost:80/test.json", storage_options=headers - ) - assert dummy_request.called_with(headers=headers) - tm.assert_frame_equal(received_df, true_df) - - -def test_read_parquet_http_no_storage_options(): - true_df = pd.DataFrame({"column_name": ["column_value"]}) - df_parquet_bytes = true_df.to_parquet(index=False) - - class DummyResponse: - headers = { - "Content-Type": "application/octet-stream", - } - - @staticmethod - def read(): - return df_parquet_bytes - - @staticmethod - def close(): - pass - - def dummy_response_getter(url): - return DummyResponse() +class BaseUserAgentResponder(http.server.BaseHTTPRequestHandler): + """ + Base class for setting up a server that can be set up to respond + with a particular file format with accompanying content-type headers + """ - dummy_request = 
MagicMock() - with patch("urllib.request.Request", new=dummy_request): - with patch("urllib.request.urlopen", new=dummy_response_getter): - received_df = pd.read_parquet("http://localhost:80/test.parquet") - tm.assert_frame_equal(received_df, true_df) + def start_processing_headers(self): + """ + shared logic at the start of a GET request + """ + self.send_response(200) + self.requested_from_user_agent = self.headers["User-Agent"] + response_df = pd.DataFrame( + { + "header": [self.requested_from_user_agent], + } + ) + return response_df + def gzip_bytes(self, response_bytes): + """ + some web servers will send back gzipped files to save bandwidth + """ + bio = BytesIO() + zipper = gzip.GzipFile(fileobj=bio, mode="w") + zipper.write(response_bytes) + zipper.close() + response_bytes = bio.getvalue() + return response_bytes + + def write_back_bytes(self, response_bytes): + """ + shared logic at the end of a GET request + """ + self.wfile.write(response_bytes) -def test_read_parquet_http_with_storage_options(): - true_df = pd.DataFrame({"column_name": ["column_value"]}) - df_parquet_bytes = true_df.to_parquet(index=False) - headers = { - "User-Agent": "custom", - "Auth": "other_custom", - } - class DummyResponse: - headers = { - "Content-Type": "application/octet-stream", - } +class CSVUserAgentResponder(BaseUserAgentResponder): + def do_GET(self): + response_df = self.start_processing_headers() - @staticmethod - def read(): - return df_parquet_bytes + self.send_header("Content-Type", "text/csv") + self.end_headers() - @staticmethod - def close(): - pass + response_bytes = response_df.to_csv(index=False).encode("utf-8") + self.write_back_bytes(response_bytes) - def dummy_response_getter(url): - return DummyResponse() - dummy_request = MagicMock() - with patch("urllib.request.Request", new=dummy_request): - with patch("urllib.request.urlopen", new=dummy_response_getter): - received_df = pd.read_parquet( - "http://localhost:80/test.parquet", storage_options=headers - ) 
- assert dummy_request.called_with(headers=headers) - tm.assert_frame_equal(received_df, true_df) +class GzippedCSVUserAgentResponder(BaseUserAgentResponder): + def do_GET(self): + response_df = self.start_processing_headers() + self.send_header("Content-Type", "text/csv") + self.send_header("Content-Encoding", "gzip") + self.end_headers() + response_bytes = response_df.to_csv(index=False).encode("utf-8") + response_bytes = self.gzip_bytes(response_bytes) -def test_read_pickle_http_with_storage_options(): - true_df = pd.DataFrame({"column_name": ["column_value"]}) - bio = BytesIO() - true_df.to_pickle(bio) - df_pickle_bytes = bio.getvalue() - headers = { - "User-Agent": "custom", - "Auth": "other_custom", - } + self.write_back_bytes(response_bytes) - class DummyResponse: - headers = { - "Content-Type": "application/octet-stream", - } - @staticmethod - def read(): - return df_pickle_bytes +class JSONUserAgentResponder(BaseUserAgentResponder): + def do_GET(self): + response_df = self.start_processing_headers() + self.send_header("Content-Type", "application/json") + self.end_headers() - @staticmethod - def close(): - pass + response_bytes = response_df.to_json().encode("utf-8") - def dummy_response_getter(url): - return DummyResponse() + self.write_back_bytes(response_bytes) - dummy_request = MagicMock() - with patch("urllib.request.Request", new=dummy_request): - with patch("urllib.request.urlopen", new=dummy_response_getter): - received_df = pd.read_pickle( - "http://localhost:80/test.pkl", storage_options=headers - ) - assert dummy_request.called_with(headers=headers) - tm.assert_frame_equal(received_df, true_df) +class GzippedJSONUserAgentResponder(BaseUserAgentResponder): + def do_GET(self): + response_df = self.start_processing_headers() + self.send_header("Content-Type", "application/json") + self.send_header("Content-Encoding", "gzip") + self.end_headers() -def test_read_stata_http_with_storage_options(): - true_df = pd.DataFrame({"column_name": 
["column_value"]}) - bio = BytesIO() - true_df.to_stata(bio, write_index=False) - df_pickle_bytes = bio.getvalue() - headers = { - "User-Agent": "custom", - "Auth": "other_custom", + response_bytes = response_df.to_json().encode("utf-8") + response_bytes = self.gzip_bytes(response_bytes) + + self.write_back_bytes(response_bytes) + + +class ParquetUserAgentResponder(BaseUserAgentResponder): + def do_GET(self): + response_df = self.start_processing_headers() + self.send_header("Content-Type", "application/octet-stream") + self.end_headers() + + response_bytes = response_df.to_parquet(index=False) + + self.write_back_bytes(response_bytes) + + +class PickleUserAgentResponder(BaseUserAgentResponder): + def do_GET(self): + response_df = self.start_processing_headers() + self.send_header("Content-Type", "application/octet-stream") + self.end_headers() + + bio = BytesIO() + response_df.to_pickle(bio) + response_bytes = bio.getvalue() + + self.write_back_bytes(response_bytes) + + +class StataUserAgentResponder(BaseUserAgentResponder): + def do_GET(self): + response_df = self.start_processing_headers() + self.send_header("Content-Type", "application/octet-stream") + self.end_headers() + + bio = BytesIO() + response_df.to_stata(bio) + response_bytes = bio.getvalue() + + self.write_back_bytes(response_bytes) + + +class AllHeaderCSVResponder(http.server.BaseHTTPRequestHandler): + """ + Send all request headers back for checking round trip + """ + + def do_GET(self): + response_df = pd.DataFrame(self.headers.items()) + self.send_response(200) + self.send_header("Content-Type", "text/csv") + self.end_headers() + response_bytes = response_df.to_csv(index=False).encode("utf-8") + self.wfile.write(response_bytes) + + +@pytest.mark.parametrize( + "responder, read_method, port", + [ + (CSVUserAgentResponder, pd.read_csv, 34259), + (JSONUserAgentResponder, pd.read_json, 34260), + (ParquetUserAgentResponder, pd.read_parquet, 34268), + (PickleUserAgentResponder, pd.read_pickle, 34271), + 
(StataUserAgentResponder, pd.read_stata, 34272), + (GzippedCSVUserAgentResponder, pd.read_csv, 34261), + (GzippedJSONUserAgentResponder, pd.read_json, 34262), + ], +) +def test_server_and_default_headers(responder, read_method, port): + server = http.server.HTTPServer(("localhost", port), responder) + server_thread = threading.Thread(target=server.serve_forever) + server_thread.start() + try: + df_http = read_method(f"http://localhost:{port}") + server.shutdown() + except Exception: + df_http = pd.DataFrame({"header": []}) + server.shutdown() + + server_thread.join() + assert not df_http.empty + + +@pytest.mark.parametrize( + "responder, read_method, port", + [ + (CSVUserAgentResponder, pd.read_csv, 34263), + (JSONUserAgentResponder, pd.read_json, 34264), + (ParquetUserAgentResponder, pd.read_parquet, 34270), + (PickleUserAgentResponder, pd.read_pickle, 34273), + (StataUserAgentResponder, pd.read_stata, 34274), + (GzippedCSVUserAgentResponder, pd.read_csv, 34265), + (GzippedJSONUserAgentResponder, pd.read_json, 34266), + ], +) +def test_server_and_custom_headers(responder, read_method, port): + custom_user_agent = "Super Cool One" + df_true = pd.DataFrame({"header": [custom_user_agent]}) + server = http.server.HTTPServer(("localhost", port), responder) + server_thread = threading.Thread(target=server.serve_forever) + server_thread.start() + try: + df_http = read_method( + f"http://localhost:{port}", + storage_options={"User-Agent": custom_user_agent}, + ) + server.shutdown() + except Exception: + df_http = pd.DataFrame({"header": []}) + server.shutdown() + server_thread.join() + tm.assert_frame_equal(df_true, df_http) + + +@pytest.mark.parametrize( + "responder, read_method, port", + [ + (AllHeaderCSVResponder, pd.read_csv, 34267), + ], +) +def test_server_and_custom_headers(responder, read_method, port): + custom_user_agent = "Super Cool One" + custom_auth_token = "Super Secret One" + storage_options = { + "User-Agent": custom_user_agent, + "Auth": 
custom_auth_token, } - - class DummyResponse: - headers = { - "Content-Type": "application/octet-stream", - } - - @staticmethod - def read(): - return df_pickle_bytes - - @staticmethod - def close(): - pass - - def dummy_response_getter(url): - return DummyResponse() - - dummy_request = MagicMock() - with patch("urllib.request.Request", new=dummy_request): - with patch("urllib.request.urlopen", new=dummy_response_getter): - received_df = pd.read_stata( - "http://localhost:80/test.dta", storage_options=headers - ) - assert dummy_request.called_with(headers=headers) - tm.assert_frame_equal(received_df, true_df) + server = http.server.HTTPServer(("localhost", port), responder) + server_thread = threading.Thread(target=server.serve_forever) + server_thread.start() + try: + df_http = read_method( + f"http://localhost:{port}", + storage_options=storage_options, + ) + server.shutdown() + except Exception: + df_http = pd.DataFrame({"0": [], "1": []}) + server.shutdown() + server_thread.join() + df_http = df_http[df_http["0"].isin(storage_options.keys())] + df_http = df_http.sort_values(["0"]).reset_index() + df_http = df_http[["0", "1"]] + keys = list(storage_options.keys()) + df_true = pd.DataFrame( + {"0": [k for k in keys], "1": [storage_options[k] for k in keys]} + ) + df_true = df_true.sort_values(["0"]) + df_true = df_true.reset_index().drop(["index"], axis=1) + tm.assert_frame_equal(df_true, df_http) def test_to_parquet_to_disk_with_storage_options(): From 5a1c64e0dac883309f0768ba33463d3a865ddc69 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Wed, 2 Dec 2020 21:42:52 -0500 Subject: [PATCH 16/41] DOC:refined storage_options doscstring --- pandas/core/shared_docs.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/pandas/core/shared_docs.py b/pandas/core/shared_docs.py index 9087298b17dfb..ca0b2b43b1584 100644 --- a/pandas/core/shared_docs.py +++ b/pandas/core/shared_docs.py @@ -383,11 +383,8 @@ "storage_options" ] = """storage_options : 
dict, optional Extra options that make sense for a particular storage connection, e.g. - host, port, username, password, etc. If using a URL that will - be parsed by ``fsspec``, e.g., starting "s3://", "gcs://" then see the - fsspec and backend storage implementation docs for the set of - allowed keys and values. Otherwise, in ``read_*`` funtions only, the - remote URL is handled via urllib (eg. HTTP or HTTPS). When handled by - urllib this key value mapping will be passed to the headers - argument of the urllib request thereby allowing custom headers - such as setting the User-Agent.""" + host, port, username, password, etc. For HTTP(S) URLs the key-value pairs + are forwarded to ``urllib`` as header options. For other URLs (e.g. + starting with "s3://", and "gcs://") the key-value pairs are forwarded to + ``fsspec``. Please see ``fsspec`` and ``urllib`` for more details. + """ From 87d7dc6982d1b27b1033aeb29a2ad1e8b59baf6f Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Wed, 2 Dec 2020 23:09:51 -0500 Subject: [PATCH 17/41] CLN:used the github editor and had pep8 issues --- pandas/io/parquet.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index df49a49d5fad1..c1fa214c89760 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -416,7 +416,6 @@ def to_parquet( return None - @doc(storage_options=generic._shared_docs["storage_options"]) def read_parquet( path, @@ -479,7 +478,9 @@ def read_parquet( impl = get_engine(engine) return impl.read( - path, columns=columns, storage_options=storage_options, - use_nullable_dtypes=use_nullable_dtypes, **kwargs + path, + columns=columns, + storage_options=storage_options, + use_nullable_dtypes=use_nullable_dtypes, + **kwargs, ) - From 64a0d19516d7949650d13cf37b178e2bc5692f71 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Wed, 2 Dec 2020 23:14:57 -0500 Subject: [PATCH 18/41] CLN: leftover comment removed --- pandas/tests/io/test_common.py | 3 +-- 1 
file changed, 1 insertion(+), 2 deletions(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index 5fe2bcbe64809..d723e2743ab7e 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -2,14 +2,13 @@ Tests for the pandas.io.common functionalities """ import gzip +import http.server from io import BytesIO, StringIO import mmap import os from pathlib import Path -# from unittest.mock import MagicMock, patch import threading -import http.server import pytest From 1724e9b90eaf3a0e6cc17d8e934ff4037d391666 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Thu, 3 Dec 2020 00:06:55 -0500 Subject: [PATCH 19/41] TST:attempted to address test warning of unclosed socket GH36688 --- pandas/tests/io/test_common.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index d723e2743ab7e..6b8aa8ce4b4fd 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -559,7 +559,7 @@ def do_GET(self): [ (CSVUserAgentResponder, pd.read_csv, 34259), (JSONUserAgentResponder, pd.read_json, 34260), - (ParquetUserAgentResponder, pd.read_parquet, 34268), + #(ParquetUserAgentResponder, pd.read_parquet, 34268), (PickleUserAgentResponder, pd.read_pickle, 34271), (StataUserAgentResponder, pd.read_stata, 34272), (GzippedCSVUserAgentResponder, pd.read_csv, 34261), @@ -576,6 +576,7 @@ def test_server_and_default_headers(responder, read_method, port): except Exception: df_http = pd.DataFrame({"header": []}) server.shutdown() + server.server_close() server_thread.join() assert not df_http.empty @@ -586,7 +587,7 @@ def test_server_and_default_headers(responder, read_method, port): [ (CSVUserAgentResponder, pd.read_csv, 34263), (JSONUserAgentResponder, pd.read_json, 34264), - (ParquetUserAgentResponder, pd.read_parquet, 34270), + #(ParquetUserAgentResponder, pd.read_parquet, 34270), (PickleUserAgentResponder, pd.read_pickle, 34273), 
(StataUserAgentResponder, pd.read_stata, 34274), (GzippedCSVUserAgentResponder, pd.read_csv, 34265), @@ -608,6 +609,7 @@ def test_server_and_custom_headers(responder, read_method, port): except Exception: df_http = pd.DataFrame({"header": []}) server.shutdown() + server.server_close() server_thread.join() tm.assert_frame_equal(df_true, df_http) @@ -637,6 +639,7 @@ def test_server_and_custom_headers(responder, read_method, port): except Exception: df_http = pd.DataFrame({"0": [], "1": []}) server.shutdown() + server.server_close() server_thread.join() df_http = df_http[df_http["0"].isin(storage_options.keys())] df_http = df_http.sort_values(["0"]).reset_index() From f8b8c4323ab7912940d695586d4dcbf7fa48ad76 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Thu, 3 Dec 2020 01:07:35 -0500 Subject: [PATCH 20/41] TST:added pytest.importorskip to handle the two main parquet engines GH36688 --- pandas/tests/io/test_common.py | 77 ++++++++++++++++++++++------------ 1 file changed, 51 insertions(+), 26 deletions(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index 6b8aa8ce4b4fd..0a60fe0edd7fa 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -555,56 +555,72 @@ def do_GET(self): @pytest.mark.parametrize( - "responder, read_method, port", + "responder, read_method, port, parquet_engine", [ - (CSVUserAgentResponder, pd.read_csv, 34259), - (JSONUserAgentResponder, pd.read_json, 34260), - #(ParquetUserAgentResponder, pd.read_parquet, 34268), - (PickleUserAgentResponder, pd.read_pickle, 34271), - (StataUserAgentResponder, pd.read_stata, 34272), - (GzippedCSVUserAgentResponder, pd.read_csv, 34261), - (GzippedJSONUserAgentResponder, pd.read_json, 34262), + (CSVUserAgentResponder, pd.read_csv, 34259, None), + (JSONUserAgentResponder, pd.read_json, 34260, None), + (ParquetUserAgentResponder, pd.read_parquet, 34268, "pyarrow"), + (ParquetUserAgentResponder, pd.read_parquet, 34273, "fastparquet"), + 
(PickleUserAgentResponder, pd.read_pickle, 34271, None), + (StataUserAgentResponder, pd.read_stata, 34272, None), + (GzippedCSVUserAgentResponder, pd.read_csv, 34261, None), + (GzippedJSONUserAgentResponder, pd.read_json, 34262, None), ], ) -def test_server_and_default_headers(responder, read_method, port): +def test_server_and_default_headers(responder, read_method, port, parquet_engine): + if read_method is pd.read_parquet: + pytest.importorskip(parquet_engine) server = http.server.HTTPServer(("localhost", port), responder) server_thread = threading.Thread(target=server.serve_forever) server_thread.start() try: - df_http = read_method(f"http://localhost:{port}") + if parquet_engine is None: + df_http = read_method(f"http://localhost:{port}") + else: + df_http = read_method(f"http://localhost:{port}", engine=parquet_engine) server.shutdown() except Exception: df_http = pd.DataFrame({"header": []}) server.shutdown() server.server_close() - server_thread.join() assert not df_http.empty @pytest.mark.parametrize( - "responder, read_method, port", + "responder, read_method, port, parquet_engine", [ - (CSVUserAgentResponder, pd.read_csv, 34263), - (JSONUserAgentResponder, pd.read_json, 34264), - #(ParquetUserAgentResponder, pd.read_parquet, 34270), - (PickleUserAgentResponder, pd.read_pickle, 34273), - (StataUserAgentResponder, pd.read_stata, 34274), - (GzippedCSVUserAgentResponder, pd.read_csv, 34265), - (GzippedJSONUserAgentResponder, pd.read_json, 34266), + (CSVUserAgentResponder, pd.read_csv, 34263, None), + (JSONUserAgentResponder, pd.read_json, 34264, None), + (ParquetUserAgentResponder, pd.read_parquet, 34270, "pyarrow"), + (ParquetUserAgentResponder, pd.read_parquet, 34270, "fastparquet"), + (PickleUserAgentResponder, pd.read_pickle, 34273, None), + (StataUserAgentResponder, pd.read_stata, 34274, None), + (GzippedCSVUserAgentResponder, pd.read_csv, 34265, None), + (GzippedJSONUserAgentResponder, pd.read_json, 34266, None), ], ) -def 
test_server_and_custom_headers(responder, read_method, port): +def test_server_and_custom_headers(responder, read_method, port, parquet_engine): + if read_method is pd.read_parquet: + pytest.importorskip(parquet_engine) custom_user_agent = "Super Cool One" df_true = pd.DataFrame({"header": [custom_user_agent]}) server = http.server.HTTPServer(("localhost", port), responder) server_thread = threading.Thread(target=server.serve_forever) server_thread.start() try: - df_http = read_method( - f"http://localhost:{port}", - storage_options={"User-Agent": custom_user_agent}, - ) + if parquet_engine is None: + df_http = read_method( + f"http://localhost:{port}", + storage_options={"User-Agent": custom_user_agent}, + ) + else: + df_http = read_method( + f"http://localhost:{port}", + storage_options={"User-Agent": custom_user_agent}, + engine=parquet_engine, + ) + df_http = read_method(f"http://localhost:{port}", engine=parquet_engine) server.shutdown() except Exception: df_http = pd.DataFrame({"header": []}) @@ -653,14 +669,23 @@ def test_server_and_custom_headers(responder, read_method, port): tm.assert_frame_equal(df_true, df_http) -def test_to_parquet_to_disk_with_storage_options(): +@pytest.mark.parametrize( + "engine", + [ + "pyarrow", + "fastparquet", + ], +) +def test_to_parquet_to_disk_with_storage_options(engine): headers = { "User-Agent": "custom", "Auth": "other_custom", } + pytest.importorskip(engine) + true_df = pd.DataFrame({"column_name": ["column_value"]}) with pytest.raises(ValueError): df_parquet_bytes = true_df.to_parquet( - "/tmp/junk.parquet", storage_options=headers + "/tmp/junk.parquet", storage_options=headers, engine=engine ) From a17d574d0b2a75d9d9856dd89cb2ee7d15a0ceaf Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Thu, 3 Dec 2020 01:20:45 -0500 Subject: [PATCH 21/41] CLN: imports moved to correct order GH36688 --- pandas/io/parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py 
index c1fa214c89760..d553801adff50 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -18,8 +18,8 @@ IOHandles, get_handle, is_fsspec_url, - stringify_path, is_url, + stringify_path, ) From eed89154269a266c53f54a373eae0cd8696b4d26 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Thu, 3 Dec 2020 17:44:36 -0500 Subject: [PATCH 22/41] TST:fix fastparquet tests GH36688 --- pandas/tests/io/test_common.py | 35 ++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index 0a60fe0edd7fa..a6c8a06115e5a 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -503,13 +503,36 @@ def do_GET(self): self.write_back_bytes(response_bytes) -class ParquetUserAgentResponder(BaseUserAgentResponder): +class ParquetPyArrowUserAgentResponder(BaseUserAgentResponder): def do_GET(self): response_df = self.start_processing_headers() self.send_header("Content-Type", "application/octet-stream") self.end_headers() - response_bytes = response_df.to_parquet(index=False) + response_bytes = response_df.to_parquet(index=False, engine="pyarrow") + + self.write_back_bytes(response_bytes) + + +class ParquetFastParquetUserAgentResponder(BaseUserAgentResponder): + def do_GET(self): + response_df = self.start_processing_headers() + self.send_header("Content-Type", "application/octet-stream") + self.end_headers() + + # the fastparquet engine doesn't like to write to a buffer + # it can do it via the open_with function being set appropriately + # however it automatically calls the close method and wipes the buffer + # so just overwrite that attribute on this instance to not do that + def dummy_close(): + pass + + bio = BytesIO() + bio.close = dummy_close + response_df.to_parquet( + "none", index=False, engine="fastparquet", open_with=lambda x, y: bio + ) + response_bytes = bio.getvalue() self.write_back_bytes(response_bytes) @@ -559,8 +582,8 @@ def do_GET(self): [ 
(CSVUserAgentResponder, pd.read_csv, 34259, None), (JSONUserAgentResponder, pd.read_json, 34260, None), - (ParquetUserAgentResponder, pd.read_parquet, 34268, "pyarrow"), - (ParquetUserAgentResponder, pd.read_parquet, 34273, "fastparquet"), + (ParquetPyArrowUserAgentResponder, pd.read_parquet, 34268, "pyarrow"), + (ParquetFastParquetUserAgentResponder, pd.read_parquet, 34273, "fastparquet"), (PickleUserAgentResponder, pd.read_pickle, 34271, None), (StataUserAgentResponder, pd.read_stata, 34272, None), (GzippedCSVUserAgentResponder, pd.read_csv, 34261, None), @@ -592,8 +615,8 @@ def test_server_and_default_headers(responder, read_method, port, parquet_engine [ (CSVUserAgentResponder, pd.read_csv, 34263, None), (JSONUserAgentResponder, pd.read_json, 34264, None), - (ParquetUserAgentResponder, pd.read_parquet, 34270, "pyarrow"), - (ParquetUserAgentResponder, pd.read_parquet, 34270, "fastparquet"), + (ParquetPyArrowUserAgentResponder, pd.read_parquet, 34270, "pyarrow"), + (ParquetFastParquetUserAgentResponder, pd.read_parquet, 34270, "fastparquet"), (PickleUserAgentResponder, pd.read_pickle, 34273, None), (StataUserAgentResponder, pd.read_stata, 34274, None), (GzippedCSVUserAgentResponder, pd.read_csv, 34265, None), From 75573a42873b9a54b10bd1a970f58b94a6e17bef Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Thu, 3 Dec 2020 18:11:47 -0500 Subject: [PATCH 23/41] CLN:removed blank line at end of docstring GH36688 --- pandas/core/shared_docs.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pandas/core/shared_docs.py b/pandas/core/shared_docs.py index 53f669daa95bd..6d3249802ee5e 100644 --- a/pandas/core/shared_docs.py +++ b/pandas/core/shared_docs.py @@ -386,5 +386,4 @@ host, port, username, password, etc. For HTTP(S) URLs the key-value pairs are forwarded to ``urllib`` as header options. For other URLs (e.g. starting with "s3://", and "gcs://") the key-value pairs are forwarded to - ``fsspec``. Please see ``fsspec`` and ``urllib`` for more details. 
- """ + ``fsspec``. Please see ``fsspec`` and ``urllib`` for more details.""" From dc596c6c17bb96ce125284bb5e77af1eec8dfa32 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Thu, 3 Dec 2020 18:23:40 -0500 Subject: [PATCH 24/41] CLN:removed excess newlines GH36688 --- pandas/tests/io/test_common.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index a6c8a06115e5a..16dc13b3685a4 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -7,10 +7,8 @@ import mmap import os from pathlib import Path - import threading - import pytest from pandas.compat import is_platform_windows From e27e3a9291b0636383f4d4fdc5f928e5b1baf66b Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Thu, 3 Dec 2020 19:54:01 -0500 Subject: [PATCH 25/41] CLN:fixed flake8 issues GH36688 --- pandas/tests/io/test_common.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index 16dc13b3685a4..6ab0901603316 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -657,7 +657,7 @@ def test_server_and_custom_headers(responder, read_method, port, parquet_engine) (AllHeaderCSVResponder, pd.read_csv, 34267), ], ) -def test_server_and_custom_headers(responder, read_method, port): +def test_server_and_all_custom_headers(responder, read_method, port): custom_user_agent = "Super Cool One" custom_auth_token = "Super Secret One" storage_options = { @@ -683,7 +683,7 @@ def test_server_and_custom_headers(responder, read_method, port): df_http = df_http[["0", "1"]] keys = list(storage_options.keys()) df_true = pd.DataFrame( - {"0": [k for k in keys], "1": [storage_options[k] for k in keys]} + {"0": keys, "1": [storage_options[k] for k in keys]} ) df_true = df_true.sort_values(["0"]) df_true = df_true.reset_index().drop(["index"], axis=1) @@ -707,6 +707,6 @@ def test_to_parquet_to_disk_with_storage_options(engine): 
true_df = pd.DataFrame({"column_name": ["column_value"]}) with pytest.raises(ValueError): - df_parquet_bytes = true_df.to_parquet( + true_df.to_parquet( "/tmp/junk.parquet", storage_options=headers, engine=engine ) From 734c9d377cc7b9cc88e66f53a5fbff9a8356f31c Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Thu, 3 Dec 2020 23:39:30 -0500 Subject: [PATCH 26/41] TST:renamed a test that was getting clobbered and fixed the logic GH36688 --- pandas/tests/io/test_common.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index 6ab0901603316..6294701a9b377 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -555,7 +555,7 @@ def do_GET(self): self.end_headers() bio = BytesIO() - response_df.to_stata(bio) + response_df.to_stata(bio, write_index=False) response_bytes = bio.getvalue() self.write_back_bytes(response_bytes) @@ -614,7 +614,7 @@ def test_server_and_default_headers(responder, read_method, port, parquet_engine (CSVUserAgentResponder, pd.read_csv, 34263, None), (JSONUserAgentResponder, pd.read_json, 34264, None), (ParquetPyArrowUserAgentResponder, pd.read_parquet, 34270, "pyarrow"), - (ParquetFastParquetUserAgentResponder, pd.read_parquet, 34270, "fastparquet"), + (ParquetFastParquetUserAgentResponder, pd.read_parquet, 34275, "fastparquet"), (PickleUserAgentResponder, pd.read_pickle, 34273, None), (StataUserAgentResponder, pd.read_stata, 34274, None), (GzippedCSVUserAgentResponder, pd.read_csv, 34265, None), @@ -624,6 +624,7 @@ def test_server_and_default_headers(responder, read_method, port, parquet_engine def test_server_and_custom_headers(responder, read_method, port, parquet_engine): if read_method is pd.read_parquet: pytest.importorskip(parquet_engine) + custom_user_agent = "Super Cool One" df_true = pd.DataFrame({"header": [custom_user_agent]}) server = http.server.HTTPServer(("localhost", port), responder) @@ -641,13 +642,13 @@ def 
test_server_and_custom_headers(responder, read_method, port, parquet_engine) storage_options={"User-Agent": custom_user_agent}, engine=parquet_engine, ) - df_http = read_method(f"http://localhost:{port}", engine=parquet_engine) server.shutdown() except Exception: df_http = pd.DataFrame({"header": []}) server.shutdown() server.server_close() server_thread.join() + tm.assert_frame_equal(df_true, df_http) @@ -682,9 +683,7 @@ def test_server_and_all_custom_headers(responder, read_method, port): df_http = df_http.sort_values(["0"]).reset_index() df_http = df_http[["0", "1"]] keys = list(storage_options.keys()) - df_true = pd.DataFrame( - {"0": keys, "1": [storage_options[k] for k in keys]} - ) + df_true = pd.DataFrame({"0": keys, "1": [storage_options[k] for k in keys]}) df_true = df_true.sort_values(["0"]) df_true = df_true.reset_index().drop(["index"], axis=1) tm.assert_frame_equal(df_true, df_http) @@ -707,6 +706,4 @@ def test_to_parquet_to_disk_with_storage_options(engine): true_df = pd.DataFrame({"column_name": ["column_value"]}) with pytest.raises(ValueError): - true_df.to_parquet( - "/tmp/junk.parquet", storage_options=headers, engine=engine - ) + true_df.to_parquet("/tmp/junk.parquet", storage_options=headers, engine=engine) From 8a5c5a39fa5ededec08b2cc28bdf2a44521b1309 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Fri, 4 Dec 2020 01:02:30 -0500 Subject: [PATCH 27/41] CLN:try to silence mypy error via renaming GH36688 --- pandas/io/common.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pandas/io/common.py b/pandas/io/common.py index a31c499d7129e..ccc6e7bcb3847 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -285,8 +285,8 @@ def _get_filepath_or_buffer( import urllib.request # assuming storage_options is to be interpretted as headers - req = urllib.request.Request(filepath_or_buffer, headers=storage_options) - req = urlopen(req) + req_info = urllib.request.Request(filepath_or_buffer, headers=storage_options) + req = 
urlopen(req_info) content_encoding = req.headers.get("Content-Encoding", None) if content_encoding == "gzip": # Override compression based on Content-Encoding header From 978d94a3deb253d8b686c4956ab67b60326cd7bd Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Fri, 4 Dec 2020 01:22:37 -0500 Subject: [PATCH 28/41] TST:pytest.importorfail replaced with pytest.skip GH36688 --- pandas/tests/io/test_common.py | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index 6294701a9b377..5d51e259955a1 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -589,8 +589,18 @@ def do_GET(self): ], ) def test_server_and_default_headers(responder, read_method, port, parquet_engine): - if read_method is pd.read_parquet: - pytest.importorskip(parquet_engine) + if parquet_engine is not None: + if parquet_engine == "pyarrow": + try: + import pyarrow # noqa:F401 + except ImportError: + pytest.skip("pyarrow is not installed") + if parquet_engine == "fastparquet": + try: + import fastparquet # noqa:F401 + except ImportError: + pytest.skip("fastparquet is not installed") + server = http.server.HTTPServer(("localhost", port), responder) server_thread = threading.Thread(target=server.serve_forever) server_thread.start() @@ -622,8 +632,17 @@ def test_server_and_default_headers(responder, read_method, port, parquet_engine ], ) def test_server_and_custom_headers(responder, read_method, port, parquet_engine): - if read_method is pd.read_parquet: - pytest.importorskip(parquet_engine) + if parquet_engine is not None: + if parquet_engine == "pyarrow": + try: + import pyarrow # noqa:F401 + except ImportError: + pytest.skip("pyarrow is not installed") + if parquet_engine == "fastparquet": + try: + import fastparquet # noqa:F401 + except ImportError: + pytest.skip("fastparquet is not installed") custom_user_agent = "Super Cool One" df_true = pd.DataFrame({"header": 
[custom_user_agent]}) From 807eb25825cbd0cc52313b225fbc8ffbbea702f0 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Fri, 4 Dec 2020 01:25:52 -0500 Subject: [PATCH 29/41] TST:content of dataframe on error made more useful GH36688 --- pandas/tests/io/test_common.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index 5d51e259955a1..2e7faf2e05c5b 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -662,8 +662,8 @@ def test_server_and_custom_headers(responder, read_method, port, parquet_engine) engine=parquet_engine, ) server.shutdown() - except Exception: - df_http = pd.DataFrame({"header": []}) + except Exception as e: + df_http = pd.DataFrame({"header": [str(e)]}) server.shutdown() server.server_close() server_thread.join() From 44c2869bde27cb46fae50f923bbf51e67cfb5855 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Fri, 4 Dec 2020 11:40:05 -0500 Subject: [PATCH 30/41] CLN:fixed flake8 error GH36688 --- doc/source/user_guide/io.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/doc/source/user_guide/io.rst b/doc/source/user_guide/io.rst index 62533f9fe2182..a47349774f0fb 100644 --- a/doc/source/user_guide/io.rst +++ b/doc/source/user_guide/io.rst @@ -1634,10 +1634,10 @@ of header key value mappings to the ``storage_options`` keyword argument as show headers = {"User-Agent": "pandas"} df = pd.read_csv( - "https://download.bls.gov/pub/time.series/cu/cu.item", - sep="\t", - storage_options=headers - ) + "https://download.bls.gov/pub/time.series/cu/cu.item", + sep="\t", + storage_options=headers + ) All URLs which are not local files or HTTP(s) are handled by `fsspec`_, if installed, and its various filesystem implementations From 01ce3ae4ef98b011462fb5004d654f6fe53fb614 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Fri, 4 Dec 2020 12:28:49 -0500 Subject: [PATCH 31/41] TST: windows fastparquet error needs raised for 
troubleshooting GH36688 --- pandas/tests/io/test_common.py | 46 ++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index 2e7faf2e05c5b..3cc22cf49b479 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -649,22 +649,36 @@ def test_server_and_custom_headers(responder, read_method, port, parquet_engine) server = http.server.HTTPServer(("localhost", port), responder) server_thread = threading.Thread(target=server.serve_forever) server_thread.start() - try: - if parquet_engine is None: - df_http = read_method( - f"http://localhost:{port}", - storage_options={"User-Agent": custom_user_agent}, - ) - else: - df_http = read_method( - f"http://localhost:{port}", - storage_options={"User-Agent": custom_user_agent}, - engine=parquet_engine, - ) - server.shutdown() - except Exception as e: - df_http = pd.DataFrame({"header": [str(e)]}) - server.shutdown() + # try: + # if parquet_engine is None: + # df_http = read_method( + # f"http://localhost:{port}", + # storage_options={"User-Agent": custom_user_agent}, + # ) + # else: + # df_http = read_method( + # f"http://localhost:{port}", + # storage_options={"User-Agent": custom_user_agent}, + # engine=parquet_engine, + # ) + # server.shutdown() + # except Exception as e: + # df_http = pd.DataFrame({"header": [str(e)]}) + # server.shutdown() + + if parquet_engine is None: + df_http = read_method( + f"http://localhost:{port}", + storage_options={"User-Agent": custom_user_agent}, + ) + else: + df_http = read_method( + f"http://localhost:{port}", + storage_options={"User-Agent": custom_user_agent}, + engine=parquet_engine, + ) + server.shutdown() + server.server_close() server_thread.join() From 13bc7757e8744de74a01b45b07bfeebe798fa773 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Fri, 4 Dec 2020 12:41:23 -0500 Subject: [PATCH 32/41] CLN:fix for flake8 GH36688 --- doc/source/whatsnew/v1.2.0.rst | 
8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index 934ff58dde654..f5f3752b1e211 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -266,10 +266,10 @@ For example: headers = {"User-Agent": "pandas"} df = pd.read_csv( - "https://download.bls.gov/pub/time.series/cu/cu.item", - sep="\t", - storage_options=headers - ) + "https://download.bls.gov/pub/time.series/cu/cu.item", + sep="\t", + storage_options=headers + ) .. _whatsnew_120.enhancements.other: From 69155170c2b4828240f25b1cadded062d7449269 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Fri, 4 Dec 2020 17:11:35 -0500 Subject: [PATCH 33/41] TST:changed compression used in to_parquet from 'snappy' to None GH36688 The default compression on to_parquet is not installed on one of the test environments. It should work just the same with no compression anyway. --- pandas/tests/io/test_common.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index 3cc22cf49b479..22f2044b6b28b 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -527,8 +527,13 @@ def dummy_close(): bio = BytesIO() bio.close = dummy_close + # windows py38 np18 doesn't have 'snappy' installed response_df.to_parquet( - "none", index=False, engine="fastparquet", open_with=lambda x, y: bio + "none", + index=False, + engine="fastparquet", + compression=None, + open_with=lambda x, y: bio, ) response_bytes = bio.getvalue() From 186b0a44f9fe375fa2145dceff265a8fc16cd18a Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Fri, 4 Dec 2020 17:23:31 -0500 Subject: [PATCH 34/41] TST:allowed exceptions to be raised via removing a try except block GH36688 --- pandas/tests/io/test_common.py | 52 +++++++++++----------------------- 1 file changed, 17 insertions(+), 35 deletions(-) diff --git a/pandas/tests/io/test_common.py 
b/pandas/tests/io/test_common.py index 22f2044b6b28b..4e98a168b8616 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -419,7 +419,9 @@ def test_is_fsspec_url(): class BaseUserAgentResponder(http.server.BaseHTTPRequestHandler): """ Base class for setting up a server that can be set up to respond - with a particular file format with accompanying content-type headers + with a particular file format with accompanying content-type headers. + The interfaces on the different io methods are different enough + that this seemed logical to do. """ def start_processing_headers(self): @@ -609,15 +611,11 @@ def test_server_and_default_headers(responder, read_method, port, parquet_engine server = http.server.HTTPServer(("localhost", port), responder) server_thread = threading.Thread(target=server.serve_forever) server_thread.start() - try: - if parquet_engine is None: - df_http = read_method(f"http://localhost:{port}") - else: - df_http = read_method(f"http://localhost:{port}", engine=parquet_engine) - server.shutdown() - except Exception: - df_http = pd.DataFrame({"header": []}) - server.shutdown() + if parquet_engine is None: + df_http = read_method(f"http://localhost:{port}") + else: + df_http = read_method(f"http://localhost:{port}", engine=parquet_engine) + server.shutdown() server.server_close() server_thread.join() assert not df_http.empty @@ -654,22 +652,6 @@ def test_server_and_custom_headers(responder, read_method, port, parquet_engine) server = http.server.HTTPServer(("localhost", port), responder) server_thread = threading.Thread(target=server.serve_forever) server_thread.start() - # try: - # if parquet_engine is None: - # df_http = read_method( - # f"http://localhost:{port}", - # storage_options={"User-Agent": custom_user_agent}, - # ) - # else: - # df_http = read_method( - # f"http://localhost:{port}", - # storage_options={"User-Agent": custom_user_agent}, - # engine=parquet_engine, - # ) - # server.shutdown() - # except Exception 
as e: - # df_http = pd.DataFrame({"header": [str(e)]}) - # server.shutdown() if parquet_engine is None: df_http = read_method( @@ -706,24 +688,24 @@ def test_server_and_all_custom_headers(responder, read_method, port): server = http.server.HTTPServer(("localhost", port), responder) server_thread = threading.Thread(target=server.serve_forever) server_thread.start() - try: - df_http = read_method( - f"http://localhost:{port}", - storage_options=storage_options, - ) - server.shutdown() - except Exception: - df_http = pd.DataFrame({"0": [], "1": []}) - server.shutdown() + + df_http = read_method( + f"http://localhost:{port}", + storage_options=storage_options, + ) + server.shutdown() server.server_close() server_thread.join() + df_http = df_http[df_http["0"].isin(storage_options.keys())] df_http = df_http.sort_values(["0"]).reset_index() df_http = df_http[["0", "1"]] + keys = list(storage_options.keys()) df_true = pd.DataFrame({"0": keys, "1": [storage_options[k] for k in keys]}) df_true = df_true.sort_values(["0"]) df_true = df_true.reset_index().drop(["index"], axis=1) + tm.assert_frame_equal(df_true, df_http) From 88e9600ed82272cd31aa878f169dfd45150e8ef7 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Fri, 4 Dec 2020 18:04:32 -0500 Subject: [PATCH 35/41] TST:replaced try except with pytest.importorskip GH36688 --- pandas/tests/io/test_common.py | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index 4e98a168b8616..b35c197ab77ac 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -597,16 +597,7 @@ def do_GET(self): ) def test_server_and_default_headers(responder, read_method, port, parquet_engine): if parquet_engine is not None: - if parquet_engine == "pyarrow": - try: - import pyarrow # noqa:F401 - except ImportError: - pytest.skip("pyarrow is not installed") - if parquet_engine == "fastparquet": - try: - import fastparquet # 
noqa:F401 - except ImportError: - pytest.skip("fastparquet is not installed") + pytest.importorskip(parquet_engine) server = http.server.HTTPServer(("localhost", port), responder) server_thread = threading.Thread(target=server.serve_forever) @@ -636,16 +627,7 @@ def test_server_and_default_headers(responder, read_method, port, parquet_engine ) def test_server_and_custom_headers(responder, read_method, port, parquet_engine): if parquet_engine is not None: - if parquet_engine == "pyarrow": - try: - import pyarrow # noqa:F401 - except ImportError: - pytest.skip("pyarrow is not installed") - if parquet_engine == "fastparquet": - try: - import fastparquet # noqa:F401 - except ImportError: - pytest.skip("fastparquet is not installed") + pytest.importorskip(parquet_engine) custom_user_agent = "Super Cool One" df_true = pd.DataFrame({"header": [custom_user_agent]}) From 2a05d0f4233ced60b545e22e9c136b1c93064ce9 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Sat, 12 Dec 2020 23:18:34 -0500 Subject: [PATCH 36/41] CLN:removed dict() in favor of {} GH36688 --- pandas/io/common.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pandas/io/common.py b/pandas/io/common.py index ccc6e7bcb3847..250c9422213e7 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -279,7 +279,8 @@ def _get_filepath_or_buffer( # TODO: fsspec can also handle HTTP via requests, but leaving this # unchanged. 
using fsspec appears to break the ability to infer if the # server responded with gzipped data - storage_options = storage_options or dict() + storage_options = storage_options or {} + # waiting until now for importing to match intended lazy logic of # urlopen function defined elsewhere in this module import urllib.request From 268e06af5c196ceb41a8c531f8877530758711ea Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Sun, 13 Dec 2020 12:55:32 -0500 Subject: [PATCH 37/41] DOC: changed potentially included version from 1.2.0 to 1.3.0 GH36688 --- doc/source/user_guide/io.rst | 2 +- doc/source/whatsnew/v1.2.0.rst | 20 -------------------- doc/source/whatsnew/v1.3.0.rst | 20 ++++++++++++++++++++ pandas/io/parquet.py | 2 +- 4 files changed, 22 insertions(+), 22 deletions(-) diff --git a/doc/source/user_guide/io.rst b/doc/source/user_guide/io.rst index 6de58bbc8cd99..b04abf512fbeb 100644 --- a/doc/source/user_guide/io.rst +++ b/doc/source/user_guide/io.rst @@ -1627,7 +1627,7 @@ functions - the following example shows reading a CSV file: df = pd.read_csv("https://download.bls.gov/pub/time.series/cu/cu.item", sep="\t") -.. versionadded:: 1.2.0 +.. versionadded:: 1.3.0 A custom header can be sent alongside HTTP(s) requests by passing a dictionary of header key value mappings to the ``storage_options`` keyword argument as shown below: diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index 98e950630f61e..bc7f5b8174573 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -266,26 +266,6 @@ Additionally ``mean`` supports execution via `Numba ` the ``engine`` and ``engine_kwargs`` arguments. Numba must be installed as an optional dependency to use this feature. -.. _whatsnew_120.read_csv_json_http_headers: - -Custom HTTP(s) headers when reading csv or json files -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -When reading from a remote URL that is not handled by fsspec (ie. 
HTTP and -HTTPS) the dictionary passed to ``storage_options`` will be used to create the -headers included in the request. This can be used to control the User-Agent -header or send other custom headers (:issue:`36688`). -For example: - -.. ipython:: python - - headers = {"User-Agent": "pandas"} - df = pd.read_csv( - "https://download.bls.gov/pub/time.series/cu/cu.item", - sep="\t", - storage_options=headers - ) - .. _whatsnew_120.enhancements.other: Other enhancements diff --git a/doc/source/whatsnew/v1.3.0.rst b/doc/source/whatsnew/v1.3.0.rst index 26e548f519ecd..188ef83244be8 100644 --- a/doc/source/whatsnew/v1.3.0.rst +++ b/doc/source/whatsnew/v1.3.0.rst @@ -13,6 +13,26 @@ including other versions of pandas. Enhancements ~~~~~~~~~~~~ +.. _whatsnew_130.read_csv_json_http_headers: + +Custom HTTP(s) headers when reading csv or json files +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +When reading from a remote URL that is not handled by fsspec (ie. HTTP and +HTTPS) the dictionary passed to ``storage_options`` will be used to create the +headers included in the request. This can be used to control the User-Agent +header or send other custom headers (:issue:`36688`). +For example: + +.. ipython:: python + + headers = {"User-Agent": "pandas"} + df = pd.read_csv( + "https://download.bls.gov/pub/time.series/cu/cu.item", + sep="\t", + storage_options=headers + ) + .. _whatsnew_130.enhancements.other: diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index d553801adff50..44b58f244a2ad 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -456,7 +456,7 @@ def read_parquet( {storage_options} - .. versionadded:: 1.2.0 + .. 
versionadded:: 1.3.0 use_nullable_dtypes : bool, default False If True, use dtypes that use ``pd.NA`` as missing value indicator From 565197f6cde49c91d50d453706f77db99178af81 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Sun, 13 Dec 2020 13:09:56 -0500 Subject: [PATCH 38/41] TST:user agent tests moved from test_common to their own file GH36688 --- pandas/tests/io/test_common.py | 300 +--------------------------- pandas/tests/io/test_user_agent.py | 307 +++++++++++++++++++++++++++++ 2 files changed, 308 insertions(+), 299 deletions(-) create mode 100644 pandas/tests/io/test_user_agent.py diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index 223ab8c385ce0..c3b21daa0ac04 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -1,13 +1,10 @@ """ Tests for the pandas.io.common functionalities """ -import gzip -import http.server -from io import BytesIO, StringIO +from io import StringIO import mmap import os from pathlib import Path -import threading import pytest @@ -414,298 +411,3 @@ def test_is_fsspec_url(): assert not icom.is_fsspec_url("random:pandas/somethingelse.com") assert not icom.is_fsspec_url("/local/path") assert not icom.is_fsspec_url("relative/local/path") - - -class BaseUserAgentResponder(http.server.BaseHTTPRequestHandler): - """ - Base class for setting up a server that can be set up to respond - with a particular file format with accompanying content-type headers. - The interfaces on the different io methods are different enough - that this seemed logical to do. 
- """ - - def start_processing_headers(self): - """ - shared logic at the start of a GET request - """ - self.send_response(200) - self.requested_from_user_agent = self.headers["User-Agent"] - response_df = pd.DataFrame( - { - "header": [self.requested_from_user_agent], - } - ) - return response_df - - def gzip_bytes(self, response_bytes): - """ - some web servers will send back gzipped files to save bandwidth - """ - bio = BytesIO() - zipper = gzip.GzipFile(fileobj=bio, mode="w") - zipper.write(response_bytes) - zipper.close() - response_bytes = bio.getvalue() - return response_bytes - - def write_back_bytes(self, response_bytes): - """ - shared logic at the end of a GET request - """ - self.wfile.write(response_bytes) - - -class CSVUserAgentResponder(BaseUserAgentResponder): - def do_GET(self): - response_df = self.start_processing_headers() - - self.send_header("Content-Type", "text/csv") - self.end_headers() - - response_bytes = response_df.to_csv(index=False).encode("utf-8") - self.write_back_bytes(response_bytes) - - -class GzippedCSVUserAgentResponder(BaseUserAgentResponder): - def do_GET(self): - response_df = self.start_processing_headers() - self.send_header("Content-Type", "text/csv") - self.send_header("Content-Encoding", "gzip") - self.end_headers() - - response_bytes = response_df.to_csv(index=False).encode("utf-8") - response_bytes = self.gzip_bytes(response_bytes) - - self.write_back_bytes(response_bytes) - - -class JSONUserAgentResponder(BaseUserAgentResponder): - def do_GET(self): - response_df = self.start_processing_headers() - self.send_header("Content-Type", "application/json") - self.end_headers() - - response_bytes = response_df.to_json().encode("utf-8") - - self.write_back_bytes(response_bytes) - - -class GzippedJSONUserAgentResponder(BaseUserAgentResponder): - def do_GET(self): - response_df = self.start_processing_headers() - self.send_header("Content-Type", "application/json") - self.send_header("Content-Encoding", "gzip") - 
self.end_headers() - - response_bytes = response_df.to_json().encode("utf-8") - response_bytes = self.gzip_bytes(response_bytes) - - self.write_back_bytes(response_bytes) - - -class ParquetPyArrowUserAgentResponder(BaseUserAgentResponder): - def do_GET(self): - response_df = self.start_processing_headers() - self.send_header("Content-Type", "application/octet-stream") - self.end_headers() - - response_bytes = response_df.to_parquet(index=False, engine="pyarrow") - - self.write_back_bytes(response_bytes) - - -class ParquetFastParquetUserAgentResponder(BaseUserAgentResponder): - def do_GET(self): - response_df = self.start_processing_headers() - self.send_header("Content-Type", "application/octet-stream") - self.end_headers() - - # the fastparquet engine doesn't like to write to a buffer - # it can do it via the open_with function being set appropriately - # however it automatically calls the close method and wipes the buffer - # so just overwrite that attribute on this instance to not do that - def dummy_close(): - pass - - bio = BytesIO() - bio.close = dummy_close - # windows py38 np18 doesn't have 'snappy' installed - response_df.to_parquet( - "none", - index=False, - engine="fastparquet", - compression=None, - open_with=lambda x, y: bio, - ) - response_bytes = bio.getvalue() - - self.write_back_bytes(response_bytes) - - -class PickleUserAgentResponder(BaseUserAgentResponder): - def do_GET(self): - response_df = self.start_processing_headers() - self.send_header("Content-Type", "application/octet-stream") - self.end_headers() - - bio = BytesIO() - response_df.to_pickle(bio) - response_bytes = bio.getvalue() - - self.write_back_bytes(response_bytes) - - -class StataUserAgentResponder(BaseUserAgentResponder): - def do_GET(self): - response_df = self.start_processing_headers() - self.send_header("Content-Type", "application/octet-stream") - self.end_headers() - - bio = BytesIO() - response_df.to_stata(bio, write_index=False) - response_bytes = bio.getvalue() - - 
self.write_back_bytes(response_bytes) - - -class AllHeaderCSVResponder(http.server.BaseHTTPRequestHandler): - """ - Send all request headers back for checking round trip - """ - - def do_GET(self): - response_df = pd.DataFrame(self.headers.items()) - self.send_response(200) - self.send_header("Content-Type", "text/csv") - self.end_headers() - response_bytes = response_df.to_csv(index=False).encode("utf-8") - self.wfile.write(response_bytes) - - -@pytest.mark.parametrize( - "responder, read_method, port, parquet_engine", - [ - (CSVUserAgentResponder, pd.read_csv, 34259, None), - (JSONUserAgentResponder, pd.read_json, 34260, None), - (ParquetPyArrowUserAgentResponder, pd.read_parquet, 34268, "pyarrow"), - (ParquetFastParquetUserAgentResponder, pd.read_parquet, 34273, "fastparquet"), - (PickleUserAgentResponder, pd.read_pickle, 34271, None), - (StataUserAgentResponder, pd.read_stata, 34272, None), - (GzippedCSVUserAgentResponder, pd.read_csv, 34261, None), - (GzippedJSONUserAgentResponder, pd.read_json, 34262, None), - ], -) -def test_server_and_default_headers(responder, read_method, port, parquet_engine): - if parquet_engine is not None: - pytest.importorskip(parquet_engine) - - server = http.server.HTTPServer(("localhost", port), responder) - server_thread = threading.Thread(target=server.serve_forever) - server_thread.start() - if parquet_engine is None: - df_http = read_method(f"http://localhost:{port}") - else: - df_http = read_method(f"http://localhost:{port}", engine=parquet_engine) - server.shutdown() - server.server_close() - server_thread.join() - assert not df_http.empty - - -@pytest.mark.parametrize( - "responder, read_method, port, parquet_engine", - [ - (CSVUserAgentResponder, pd.read_csv, 34263, None), - (JSONUserAgentResponder, pd.read_json, 34264, None), - (ParquetPyArrowUserAgentResponder, pd.read_parquet, 34270, "pyarrow"), - (ParquetFastParquetUserAgentResponder, pd.read_parquet, 34275, "fastparquet"), - (PickleUserAgentResponder, pd.read_pickle, 
34273, None), - (StataUserAgentResponder, pd.read_stata, 34274, None), - (GzippedCSVUserAgentResponder, pd.read_csv, 34265, None), - (GzippedJSONUserAgentResponder, pd.read_json, 34266, None), - ], -) -def test_server_and_custom_headers(responder, read_method, port, parquet_engine): - if parquet_engine is not None: - pytest.importorskip(parquet_engine) - - custom_user_agent = "Super Cool One" - df_true = pd.DataFrame({"header": [custom_user_agent]}) - server = http.server.HTTPServer(("localhost", port), responder) - server_thread = threading.Thread(target=server.serve_forever) - server_thread.start() - - if parquet_engine is None: - df_http = read_method( - f"http://localhost:{port}", - storage_options={"User-Agent": custom_user_agent}, - ) - else: - df_http = read_method( - f"http://localhost:{port}", - storage_options={"User-Agent": custom_user_agent}, - engine=parquet_engine, - ) - server.shutdown() - - server.server_close() - server_thread.join() - - tm.assert_frame_equal(df_true, df_http) - - -@pytest.mark.parametrize( - "responder, read_method, port", - [ - (AllHeaderCSVResponder, pd.read_csv, 34267), - ], -) -def test_server_and_all_custom_headers(responder, read_method, port): - custom_user_agent = "Super Cool One" - custom_auth_token = "Super Secret One" - storage_options = { - "User-Agent": custom_user_agent, - "Auth": custom_auth_token, - } - server = http.server.HTTPServer(("localhost", port), responder) - server_thread = threading.Thread(target=server.serve_forever) - server_thread.start() - - df_http = read_method( - f"http://localhost:{port}", - storage_options=storage_options, - ) - server.shutdown() - server.server_close() - server_thread.join() - - df_http = df_http[df_http["0"].isin(storage_options.keys())] - df_http = df_http.sort_values(["0"]).reset_index() - df_http = df_http[["0", "1"]] - - keys = list(storage_options.keys()) - df_true = pd.DataFrame({"0": keys, "1": [storage_options[k] for k in keys]}) - df_true = df_true.sort_values(["0"]) 
- df_true = df_true.reset_index().drop(["index"], axis=1) - - tm.assert_frame_equal(df_true, df_http) - - -@pytest.mark.parametrize( - "engine", - [ - "pyarrow", - "fastparquet", - ], -) -def test_to_parquet_to_disk_with_storage_options(engine): - headers = { - "User-Agent": "custom", - "Auth": "other_custom", - } - - pytest.importorskip(engine) - - true_df = pd.DataFrame({"column_name": ["column_value"]}) - with pytest.raises(ValueError): - true_df.to_parquet("/tmp/junk.parquet", storage_options=headers, engine=engine) diff --git a/pandas/tests/io/test_user_agent.py b/pandas/tests/io/test_user_agent.py new file mode 100644 index 0000000000000..a29d0a6ca7eed --- /dev/null +++ b/pandas/tests/io/test_user_agent.py @@ -0,0 +1,307 @@ +""" +Tests for the pandas.io.common functionalities +""" +import gzip +import http.server +from io import BytesIO +import threading + +import pytest + +import pandas as pd +import pandas._testing as tm + + +class BaseUserAgentResponder(http.server.BaseHTTPRequestHandler): + """ + Base class for setting up a server that can be set up to respond + with a particular file format with accompanying content-type headers. + The interfaces on the different io methods are different enough + that this seemed logical to do. 
+ """ + + def start_processing_headers(self): + """ + shared logic at the start of a GET request + """ + self.send_response(200) + self.requested_from_user_agent = self.headers["User-Agent"] + response_df = pd.DataFrame( + { + "header": [self.requested_from_user_agent], + } + ) + return response_df + + def gzip_bytes(self, response_bytes): + """ + some web servers will send back gzipped files to save bandwidth + """ + bio = BytesIO() + zipper = gzip.GzipFile(fileobj=bio, mode="w") + zipper.write(response_bytes) + zipper.close() + response_bytes = bio.getvalue() + return response_bytes + + def write_back_bytes(self, response_bytes): + """ + shared logic at the end of a GET request + """ + self.wfile.write(response_bytes) + + +class CSVUserAgentResponder(BaseUserAgentResponder): + def do_GET(self): + response_df = self.start_processing_headers() + + self.send_header("Content-Type", "text/csv") + self.end_headers() + + response_bytes = response_df.to_csv(index=False).encode("utf-8") + self.write_back_bytes(response_bytes) + + +class GzippedCSVUserAgentResponder(BaseUserAgentResponder): + def do_GET(self): + response_df = self.start_processing_headers() + self.send_header("Content-Type", "text/csv") + self.send_header("Content-Encoding", "gzip") + self.end_headers() + + response_bytes = response_df.to_csv(index=False).encode("utf-8") + response_bytes = self.gzip_bytes(response_bytes) + + self.write_back_bytes(response_bytes) + + +class JSONUserAgentResponder(BaseUserAgentResponder): + def do_GET(self): + response_df = self.start_processing_headers() + self.send_header("Content-Type", "application/json") + self.end_headers() + + response_bytes = response_df.to_json().encode("utf-8") + + self.write_back_bytes(response_bytes) + + +class GzippedJSONUserAgentResponder(BaseUserAgentResponder): + def do_GET(self): + response_df = self.start_processing_headers() + self.send_header("Content-Type", "application/json") + self.send_header("Content-Encoding", "gzip") + 
self.end_headers() + + response_bytes = response_df.to_json().encode("utf-8") + response_bytes = self.gzip_bytes(response_bytes) + + self.write_back_bytes(response_bytes) + + +class ParquetPyArrowUserAgentResponder(BaseUserAgentResponder): + def do_GET(self): + response_df = self.start_processing_headers() + self.send_header("Content-Type", "application/octet-stream") + self.end_headers() + + response_bytes = response_df.to_parquet(index=False, engine="pyarrow") + + self.write_back_bytes(response_bytes) + + +class ParquetFastParquetUserAgentResponder(BaseUserAgentResponder): + def do_GET(self): + response_df = self.start_processing_headers() + self.send_header("Content-Type", "application/octet-stream") + self.end_headers() + + # the fastparquet engine doesn't like to write to a buffer + # it can do it via the open_with function being set appropriately + # however it automatically calls the close method and wipes the buffer + # so just overwrite that attribute on this instance to not do that + def dummy_close(): + pass + + bio = BytesIO() + bio.close = dummy_close + # windows py38 np18 doesn't have 'snappy' installed + response_df.to_parquet( + "none", + index=False, + engine="fastparquet", + compression=None, + open_with=lambda x, y: bio, + ) + response_bytes = bio.getvalue() + + self.write_back_bytes(response_bytes) + + +class PickleUserAgentResponder(BaseUserAgentResponder): + def do_GET(self): + response_df = self.start_processing_headers() + self.send_header("Content-Type", "application/octet-stream") + self.end_headers() + + bio = BytesIO() + response_df.to_pickle(bio) + response_bytes = bio.getvalue() + + self.write_back_bytes(response_bytes) + + +class StataUserAgentResponder(BaseUserAgentResponder): + def do_GET(self): + response_df = self.start_processing_headers() + self.send_header("Content-Type", "application/octet-stream") + self.end_headers() + + bio = BytesIO() + response_df.to_stata(bio, write_index=False) + response_bytes = bio.getvalue() + + 
self.write_back_bytes(response_bytes) + + +class AllHeaderCSVResponder(http.server.BaseHTTPRequestHandler): + """ + Send all request headers back for checking round trip + """ + + def do_GET(self): + response_df = pd.DataFrame(self.headers.items()) + self.send_response(200) + self.send_header("Content-Type", "text/csv") + self.end_headers() + response_bytes = response_df.to_csv(index=False).encode("utf-8") + self.wfile.write(response_bytes) + + +@pytest.mark.parametrize( + "responder, read_method, port, parquet_engine", + [ + (CSVUserAgentResponder, pd.read_csv, 34259, None), + (JSONUserAgentResponder, pd.read_json, 34260, None), + (ParquetPyArrowUserAgentResponder, pd.read_parquet, 34268, "pyarrow"), + (ParquetFastParquetUserAgentResponder, pd.read_parquet, 34273, "fastparquet"), + (PickleUserAgentResponder, pd.read_pickle, 34271, None), + (StataUserAgentResponder, pd.read_stata, 34272, None), + (GzippedCSVUserAgentResponder, pd.read_csv, 34261, None), + (GzippedJSONUserAgentResponder, pd.read_json, 34262, None), + ], +) +def test_server_and_default_headers(responder, read_method, port, parquet_engine): + if parquet_engine is not None: + pytest.importorskip(parquet_engine) + + server = http.server.HTTPServer(("localhost", port), responder) + server_thread = threading.Thread(target=server.serve_forever) + server_thread.start() + if parquet_engine is None: + df_http = read_method(f"http://localhost:{port}") + else: + df_http = read_method(f"http://localhost:{port}", engine=parquet_engine) + server.shutdown() + server.server_close() + server_thread.join() + assert not df_http.empty + + +@pytest.mark.parametrize( + "responder, read_method, port, parquet_engine", + [ + (CSVUserAgentResponder, pd.read_csv, 34263, None), + (JSONUserAgentResponder, pd.read_json, 34264, None), + (ParquetPyArrowUserAgentResponder, pd.read_parquet, 34270, "pyarrow"), + (ParquetFastParquetUserAgentResponder, pd.read_parquet, 34275, "fastparquet"), + (PickleUserAgentResponder, pd.read_pickle, 
34273, None), + (StataUserAgentResponder, pd.read_stata, 34274, None), + (GzippedCSVUserAgentResponder, pd.read_csv, 34265, None), + (GzippedJSONUserAgentResponder, pd.read_json, 34266, None), + ], +) +def test_server_and_custom_headers(responder, read_method, port, parquet_engine): + if parquet_engine is not None: + pytest.importorskip(parquet_engine) + + custom_user_agent = "Super Cool One" + df_true = pd.DataFrame({"header": [custom_user_agent]}) + server = http.server.HTTPServer(("localhost", port), responder) + server_thread = threading.Thread(target=server.serve_forever) + server_thread.start() + + if parquet_engine is None: + df_http = read_method( + f"http://localhost:{port}", + storage_options={"User-Agent": custom_user_agent}, + ) + else: + df_http = read_method( + f"http://localhost:{port}", + storage_options={"User-Agent": custom_user_agent}, + engine=parquet_engine, + ) + server.shutdown() + + server.server_close() + server_thread.join() + + tm.assert_frame_equal(df_true, df_http) + + +@pytest.mark.parametrize( + "responder, read_method, port", + [ + (AllHeaderCSVResponder, pd.read_csv, 34267), + ], +) +def test_server_and_all_custom_headers(responder, read_method, port): + custom_user_agent = "Super Cool One" + custom_auth_token = "Super Secret One" + storage_options = { + "User-Agent": custom_user_agent, + "Auth": custom_auth_token, + } + server = http.server.HTTPServer(("localhost", port), responder) + server_thread = threading.Thread(target=server.serve_forever) + server_thread.start() + + df_http = read_method( + f"http://localhost:{port}", + storage_options=storage_options, + ) + server.shutdown() + server.server_close() + server_thread.join() + + df_http = df_http[df_http["0"].isin(storage_options.keys())] + df_http = df_http.sort_values(["0"]).reset_index() + df_http = df_http[["0", "1"]] + + keys = list(storage_options.keys()) + df_true = pd.DataFrame({"0": keys, "1": [storage_options[k] for k in keys]}) + df_true = df_true.sort_values(["0"]) 
+ df_true = df_true.reset_index().drop(["index"], axis=1) + + tm.assert_frame_equal(df_true, df_http) + + +@pytest.mark.parametrize( + "engine", + [ + "pyarrow", + "fastparquet", + ], +) +def test_to_parquet_to_disk_with_storage_options(engine): + headers = { + "User-Agent": "custom", + "Auth": "other_custom", + } + + pytest.importorskip(engine) + + true_df = pd.DataFrame({"column_name": ["column_value"]}) + with pytest.raises(ValueError): + true_df.to_parquet("/tmp/junk.parquet", storage_options=headers, engine=engine) From 842e5941e6fe97caf9c500291fe2b14984310b9a Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Sun, 13 Dec 2020 14:25:33 -0500 Subject: [PATCH 39/41] TST: used fsspec instead of patching bytesio GH36688 --- pandas/tests/io/test_user_agent.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/pandas/tests/io/test_user_agent.py b/pandas/tests/io/test_user_agent.py index a29d0a6ca7eed..c7dc0e76362f5 100644 --- a/pandas/tests/io/test_user_agent.py +++ b/pandas/tests/io/test_user_agent.py @@ -1,11 +1,12 @@ """ -Tests for the pandas.io.common functionalities +Tests for the pandas custom headers in http(s) requests """ import gzip import http.server from io import BytesIO import threading +import fsspec import pytest import pandas as pd @@ -120,20 +121,15 @@ def do_GET(self): # it can do it via the open_with function being set appropriately # however it automatically calls the close method and wipes the buffer # so just overwrite that attribute on this instance to not do that - def dummy_close(): - pass - bio = BytesIO() - bio.close = dummy_close - # windows py38 np18 doesn't have 'snappy' installed response_df.to_parquet( - "none", + "memory://fastparquet_user_agent.parquet", index=False, engine="fastparquet", compression=None, - open_with=lambda x, y: bio, ) - response_bytes = bio.getvalue() + with fsspec.open("memory://fastparquet_user_agent.parquet", "rb") as f: + response_bytes = f.read() 
self.write_back_bytes(response_bytes) From c0c3d34ca76fad87cddd231298eb077af814487c Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Sun, 13 Dec 2020 16:11:31 -0500 Subject: [PATCH 40/41] TST: added importorskip for fsspec on FastParquet test GH36688 --- pandas/tests/io/test_user_agent.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pandas/tests/io/test_user_agent.py b/pandas/tests/io/test_user_agent.py index c7dc0e76362f5..1e7c45f416bff 100644 --- a/pandas/tests/io/test_user_agent.py +++ b/pandas/tests/io/test_user_agent.py @@ -6,7 +6,6 @@ from io import BytesIO import threading -import fsspec import pytest import pandas as pd @@ -122,6 +121,9 @@ def do_GET(self): # however it automatically calls the close method and wipes the buffer # so just overwrite that attribute on this instance to not do that + # protected by an importorskip in the respective test + import fsspec + response_df.to_parquet( "memory://fastparquet_user_agent.parquet", index=False, @@ -220,6 +222,8 @@ def test_server_and_default_headers(responder, read_method, port, parquet_engine def test_server_and_custom_headers(responder, read_method, port, parquet_engine): if parquet_engine is not None: pytest.importorskip(parquet_engine) + if parquet_engine == "fastparquet": + pytest.importorskip("fsspec") custom_user_agent = "Super Cool One" df_true = pd.DataFrame({"header": [custom_user_agent]}) From 7025abbd682b17fdb51af838a9a77695526a32c8 Mon Sep 17 00:00:00 2001 From: cdknox <> Date: Sun, 13 Dec 2020 18:17:01 -0500 Subject: [PATCH 41/41] TST:added missing importorskip to fsspec in another test GH36688 --- pandas/tests/io/test_user_agent.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pandas/tests/io/test_user_agent.py b/pandas/tests/io/test_user_agent.py index 1e7c45f416bff..8894351597903 100644 --- a/pandas/tests/io/test_user_agent.py +++ b/pandas/tests/io/test_user_agent.py @@ -192,6 +192,8 @@ def do_GET(self): def test_server_and_default_headers(responder, 
read_method, port, parquet_engine): if parquet_engine is not None: pytest.importorskip(parquet_engine) + if parquet_engine == "fastparquet": + pytest.importorskip("fsspec") server = http.server.HTTPServer(("localhost", port), responder) server_thread = threading.Thread(target=server.serve_forever)