From 505f6a6d14d12e5e8df0bd104b5187ce98d67f9b Mon Sep 17 00:00:00 2001 From: Jean-Mathieu Deschenes Date: Mon, 7 Dec 2015 13:35:59 -0500 Subject: [PATCH] BUG: Fixed an issue with thread safety when calling read_csv with a StringIO object, #11786 The issue was caused by a misplaced PyGILState_Ensure() --- doc/source/whatsnew/v0.18.0.txt | 3 +- pandas/io/tests/test_parsers.py | 83 +++++++++++++++++++++++++++++++++ pandas/src/parser/io.c | 2 +- 3 files changed, 85 insertions(+), 3 deletions(-) diff --git a/doc/source/whatsnew/v0.18.0.txt b/doc/source/whatsnew/v0.18.0.txt index 58cc0fd647511..66fa5e7e381ce 100644 --- a/doc/source/whatsnew/v0.18.0.txt +++ b/doc/source/whatsnew/v0.18.0.txt @@ -480,8 +480,7 @@ Bug Fixes - Bug in value label reading for ``StataReader`` when reading incrementally (:issue:`12014`) - Bug in vectorized ``DateOffset`` when ``n`` parameter is ``0`` (:issue:`11370`) - Compat for numpy 1.11 w.r.t. ``NaT`` comparison changes (:issue:`12049`) - - +- Bug in ``read_csv`` when reading from a ``StringIO`` in threads (:issue:`11790`) diff --git a/pandas/io/tests/test_parsers.py b/pandas/io/tests/test_parsers.py index e36fcdf34a707..063c300147d51 100755 --- a/pandas/io/tests/test_parsers.py +++ b/pandas/io/tests/test_parsers.py @@ -9,6 +9,8 @@ import nose import platform +from multiprocessing.pool import ThreadPool + from numpy import nan import numpy as np from pandas.io.common import DtypeWarning @@ -4128,6 +4130,87 @@ def test_bool_header_arg(self): with tm.assertRaises(TypeError): pd.read_fwf(StringIO(data), header=arg) + def test_multithread_stringio_read_csv(self): + # GH 11786 + max_row_range = 10000 + num_files = 100 + + bytes_to_df = [ + '\n'.join( + ['%d,%d,%d' % (i, i, i) for i in range(max_row_range)] + ).encode() for j in range(num_files)] + files = [BytesIO(b) for b in bytes_to_df] + + # Read all files in many threads + pool = ThreadPool(8) + results = pool.map(pd.read_csv, files) + first_result = results[0] + + for result in results: 
tm.assert_frame_equal(first_result, result) + + def construct_dataframe(self, num_rows): + + df = DataFrame(np.random.rand(num_rows, 5), columns=list('abcde')) + df['foo'] = 'foo' + df['bar'] = 'bar' + df['baz'] = 'baz' + df['date'] = pd.date_range('20000101 09:00:00', + periods=num_rows, + freq='s') + df['int'] = np.arange(num_rows) + return df + + def generate_multithread_dataframe(self, path, num_rows, num_tasks): + + def reader(arg): + start, nrows = arg + + if not start: + return pd.read_csv(path, index_col=0, header=0, nrows=nrows, + parse_dates=['date']) + + return pd.read_csv(path, + index_col=0, + header=None, + skiprows=int(start) + 1, + nrows=nrows, + parse_dates=[9]) + + tasks = [ + (num_rows * i / num_tasks, + num_rows / num_tasks) for i in range(num_tasks) + ] + + pool = ThreadPool(processes=num_tasks) + + results = pool.map(reader, tasks) + + header = results[0].columns + for r in results[1:]: + r.columns = header + + final_dataframe = pd.concat(results) + + return final_dataframe + + def test_multithread_path_multipart_read_csv(self): + # GH 11786 + num_tasks = 4 + file_name = '__threadpool_reader__.csv' + num_rows = 100000 + + df = self.construct_dataframe(num_rows) + + with tm.ensure_clean(file_name) as path: + df.to_csv(path) + + final_dataframe = self.generate_multithread_dataframe(path, + num_rows, + num_tasks) + tm.assert_frame_equal(df, final_dataframe) + + class TestMiscellaneous(tm.TestCase): # for tests that don't fit into any of the other classes, e.g. 
those that diff --git a/pandas/src/parser/io.c b/pandas/src/parser/io.c index e6d54bd59d6fd..0297d1ba49527 100644 --- a/pandas/src/parser/io.c +++ b/pandas/src/parser/io.c @@ -117,12 +117,12 @@ void* buffer_rd_bytes(void *source, size_t nbytes, size_t length; rd_source *src = RDS(source); + state = PyGILState_Ensure(); /* delete old object */ Py_XDECREF(src->buffer); args = Py_BuildValue("(i)", nbytes); - state = PyGILState_Ensure(); func = PyObject_GetAttrString(src->obj, "read"); /* printf("%s\n", PyBytes_AsString(PyObject_Repr(func))); */