|
9 | 9 | import nose
|
10 | 10 | import platform
|
11 | 11 |
|
| 12 | +from multiprocessing.pool import ThreadPool |
| 13 | + |
12 | 14 | from numpy import nan
|
13 | 15 | import numpy as np
|
14 | 16 | from pandas.io.common import DtypeWarning
|
@@ -4128,6 +4130,87 @@ def test_bool_header_arg(self):
|
4128 | 4130 | with tm.assertRaises(TypeError):
|
4129 | 4131 | pd.read_fwf(StringIO(data), header=arg)
|
4130 | 4132 |
|
def test_multithread_stringio_read_csv(self):
    """Parse many identical in-memory CSV buffers concurrently.

    ``pd.read_csv`` is mapped over 100 ``BytesIO`` buffers on an
    8-thread pool; every resulting frame must equal the first one.
    """
    # GH 11786
    max_row_range = 10000
    num_files = 100

    # All buffers carry the same bytes, so encode the payload once and wrap
    # it in a separate BytesIO per task (each reader needs its own stream
    # position).
    payload = '\n'.join(
        '%d,%d,%d' % (i, i, i) for i in range(max_row_range)
    ).encode()
    files = [BytesIO(payload) for _ in range(num_files)]

    # Read all files in many threads
    pool = ThreadPool(8)
    try:
        results = pool.map(pd.read_csv, files)
    finally:
        # Shut the workers down even when a read raises, so the pool's
        # threads do not outlive the test.
        pool.close()
        pool.join()

    first_result = results[0]
    for result in results:
        tm.assert_frame_equal(first_result, result)
| 4151 | + |
def construct_dataframe(self, num_rows):
    """Build a ``num_rows``-row frame with float, string, date and int columns.

    Columns, in order: ``a``..``e`` filled from ``np.random.rand``; ``foo``,
    ``bar`` and ``baz``, each holding its own name as a constant string;
    ``date``, a one-second-step range starting at 2000-01-01 09:00:00; and
    ``int``, the values ``0 .. num_rows - 1``.
    """
    random_block = np.random.rand(num_rows, 5)
    frame = DataFrame(random_block, columns=list('abcde'))

    # Constant string columns: each one is filled with its own label.
    for label in ('foo', 'bar', 'baz'):
        frame[label] = label

    timestamps = pd.date_range('20000101 09:00:00',
                               periods=num_rows,
                               freq='s')
    frame['date'] = timestamps
    frame['int'] = np.arange(num_rows)
    return frame
| 4163 | + |
def generate_multithread_dataframe(self, path, num_rows, num_tasks):
    """Read a CSV file as ``num_tasks`` row slices on a thread pool.

    Each slice is parsed by its own ``pd.read_csv`` call; the slices are
    then given the first slice's column labels and concatenated in order.

    Parameters
    ----------
    path : str
        Location of the CSV file. Its first column is used as the index.
        The headerless slices parse column position 9 as dates, which is
        expected to be the column named ``date`` -- assumption about the
        file layout, confirm against the writer.
    num_rows : int
        Number of data rows (excluding the header line) to read.
    num_tasks : int
        Number of slices, and also the number of pool threads.

    Returns
    -------
    DataFrame
        The concatenation of all slices.
    """

    def reader(arg):
        start, nrows = arg

        if not start:
            # The first slice reads the header line itself.
            return pd.read_csv(path, index_col=0, header=0, nrows=nrows,
                               parse_dates=['date'])

        # Later slices skip the header line plus ``start`` data rows and
        # are read headerless, so the date column is given by position.
        return pd.read_csv(path,
                           index_col=0,
                           header=None,
                           skiprows=start + 1,
                           nrows=nrows,
                           parse_dates=[9])

    # Integer slice boundaries: ``/`` yields floats on Python 3, which are
    # not valid row counts. Deriving each size from consecutive boundaries
    # also hands any remainder rows to a slice instead of dropping them.
    bounds = [num_rows * i // num_tasks for i in range(num_tasks + 1)]
    tasks = [(lo, hi - lo) for lo, hi in zip(bounds, bounds[1:])]

    pool = ThreadPool(processes=num_tasks)
    try:
        results = pool.map(reader, tasks)
    finally:
        # Release the worker threads even when a read fails.
        pool.close()
        pool.join()

    # Headerless slices carry positional column labels; align them with the
    # labels parsed from the header line before concatenating.
    header = results[0].columns
    for r in results[1:]:
        r.columns = header

    final_dataframe = pd.concat(results)

    return final_dataframe
| 4196 | + |
def test_multithread_path_multipart_read_csv(self):
    """Round-trip a frame through a CSV file that is read back in slices.

    A 100000-row frame from ``construct_dataframe`` is written to a
    temporary file, re-read as 4 slices by
    ``generate_multithread_dataframe``, and compared with the original.
    """
    # GH 11786
    row_count = 100000
    task_count = 4
    csv_name = '__threadpool_reader__.csv'

    expected = self.construct_dataframe(row_count)

    with tm.ensure_clean(csv_name) as path:
        expected.to_csv(path)

        combined = self.generate_multithread_dataframe(path,
                                                       row_count,
                                                       task_count)
        tm.assert_frame_equal(expected, combined)
| 4212 | + |
| 4213 | + |
4131 | 4214 | class TestMiscellaneous(tm.TestCase):
|
4132 | 4215 |
|
4133 | 4216 | # for tests that don't fit into any of the other classes, e.g. those that
|
|
0 commit comments