diff --git a/doc/source/whatsnew/v0.19.0.txt b/doc/source/whatsnew/v0.19.0.txt index 06625e09d70a1..326e124c2f03e 100644 --- a/doc/source/whatsnew/v0.19.0.txt +++ b/doc/source/whatsnew/v0.19.0.txt @@ -673,6 +673,7 @@ Performance Improvements Bug Fixes ~~~~~~~~~ +- Bug in ``pd.read_csv()``, which may cause a segfault or corruption when iterating in large chunks over a stream/file under rare circumstances (:issue:`13703`) - Bug in ``io.json.json_normalize()``, where non-ascii keys raised an exception (:issue:`13213`) - Bug in ``SparseSeries`` with ``MultiIndex`` ``[]`` indexing may raise ``IndexError`` (:issue:`13144`) - Bug in ``SparseSeries`` with ``MultiIndex`` ``[]`` indexing result may have normal ``Index`` (:issue:`13144`) diff --git a/pandas/io/tests/parser/c_parser_only.py b/pandas/io/tests/parser/c_parser_only.py index b6048051edc4d..103c9fa2b7ce8 100644 --- a/pandas/io/tests/parser/c_parser_only.py +++ b/pandas/io/tests/parser/c_parser_only.py @@ -381,3 +381,73 @@ def test_empty_header_read(count): for count in range(1, 101): test_empty_header_read(count) + + def test_parse_trim_buffers(self): + # This test is part of a bugfix for issue #13703. It attempts + # to stress the system memory allocator, to cause it to move the + # stream buffer and either let the OS reclaim the region, or let + # other memory requests of the parser otherwise modify the contents + # of the memory space where it was formerly located. + # This test is designed to cause a `segfault` with unpatched + # `tokenizer.c`. Sometimes the test fails on `segfault`, other + # times it fails due to memory corruption, which causes the + # loaded DataFrame to differ from the expected one. + + # Generate a large mixed-type CSV file on-the-fly (one record is + # approx 1.5KiB). 
+ record_ = \ + """9999-9,99:99,,,,ZZ,ZZ,,,ZZZ-ZZZZ,.Z-ZZZZ,-9.99,,,9.99,Z""" \ + """ZZZZ,,-99,9,ZZZ-ZZZZ,ZZ-ZZZZ,,9.99,ZZZ-ZZZZZ,ZZZ-ZZZZZ,""" \ + """ZZZ-ZZZZ,ZZZ-ZZZZ,ZZZ-ZZZZ,ZZZ-ZZZZ,ZZZ-ZZZZ,ZZZ-ZZZZ,9""" \ + """99,ZZZ-ZZZZ,,ZZ-ZZZZ,,,,,ZZZZ,ZZZ-ZZZZZ,ZZZ-ZZZZ,,,9,9,""" \ + """9,9,99,99,999,999,ZZZZZ,ZZZ-ZZZZZ,ZZZ-ZZZZ,9,ZZ-ZZZZ,9.""" \ + """99,ZZ-ZZZZ,ZZ-ZZZZ,,,,ZZZZ,,,ZZ,ZZ,,,,,,,,,,,,,9,,,999.""" \ + """99,999.99,,,ZZZZZ,,,Z9,,,,,,,ZZZ,ZZZ,,,,,,,,,,,ZZZZZ,ZZ""" \ + """ZZZ,ZZZ-ZZZZZZ,ZZZ-ZZZZZZ,ZZ-ZZZZ,ZZ-ZZZZ,ZZ-ZZZZ,ZZ-ZZ""" \ + """ZZ,,,999999,999999,ZZZ,ZZZ,,,ZZZ,ZZZ,999.99,999.99,,,,Z""" \ + """ZZ-ZZZ,ZZZ-ZZZ,-9.99,-9.99,9,9,,99,,9.99,9.99,9,9,9.99,""" \ + """9.99,,,,9.99,9.99,,99,,99,9.99,9.99,,,ZZZ,ZZZ,,999.99,,""" \ + """999.99,ZZZ,ZZZ-ZZZZ,ZZZ-ZZZZ,,,ZZZZZ,ZZZZZ,ZZZ,ZZZ,9,9,""" \ + """,,,,,ZZZ-ZZZZ,ZZZ999Z,,,999.99,,999.99,ZZZ-ZZZZ,,,9.999""" \ + """,9.999,9.999,9.999,-9.999,-9.999,-9.999,-9.999,9.999,9.""" \ + """999,9.999,9.999,9.999,9.999,9.999,9.999,99999,ZZZ-ZZZZ,""" \ + """,9.99,ZZZ,,,,,,,,ZZZ,,,,,9,,,,9,,,,,,,,,,ZZZ-ZZZZ,ZZZ-Z""" \ + """ZZZ,,ZZZZZ,ZZZZZ,ZZZZZ,ZZZZZ,,,9.99,,ZZ-ZZZZ,ZZ-ZZZZ,ZZ""" \ + """,999,,,,ZZ-ZZZZ,ZZZ,ZZZ,ZZZ-ZZZZ,ZZZ-ZZZZ,,,99.99,99.99""" \ + """,,,9.99,9.99,9.99,9.99,ZZZ-ZZZZ,,,ZZZ-ZZZZZ,,,,,-9.99,-""" \ + """9.99,-9.99,-9.99,,,,,,,,,ZZZ-ZZZZ,,9,9.99,9.99,99ZZ,,-9""" \ + """.99,-9.99,ZZZ-ZZZZ,,,,,,,ZZZ-ZZZZ,9.99,9.99,9999,,,,,,,""" \ + """,,,-9.9,Z/Z-ZZZZ,999.99,9.99,,999.99,ZZ-ZZZZ,ZZ-ZZZZ,9.""" \ + """99,9.99,9.99,9.99,9.99,9.99,,ZZZ-ZZZZZ,ZZZ-ZZZZZ,ZZZ-ZZ""" \ + """ZZZ,ZZZ-ZZZZZ,ZZZ-ZZZZZ,ZZZ,ZZZ,ZZZ,ZZZ,9.99,,,-9.99,ZZ""" \ + """-ZZZZ,-999.99,,-9999,,999.99,,,,999.99,99.99,,,ZZ-ZZZZZ""" \ + """ZZZ,ZZ-ZZZZ-ZZZZZZZ,,,,ZZ-ZZ-ZZZZZZZZ,ZZZZZZZZ,ZZZ-ZZZZ""" \ + """,9999,999.99,ZZZ-ZZZZ,-9.99,-9.99,ZZZ-ZZZZ,99:99:99,,99""" \ + """,99,,9.99,,-99.99,,,,,,9.99,ZZZ-ZZZZ,-9.99,-9.99,9.99,9""" \ + """.99,,ZZZ,,,,,,,ZZZ,ZZZ,,,,,""" + + # Set the number of lines so that a call to `parser_trim_buffers` + # is triggered: after a 
couple of full chunks are consumed a + # relatively small 'residual' chunk would cause reallocation + # within the parser. + chunksize, n_lines = 128, 2 * 128 + 15 + csv_data = "\n".join([record_] * n_lines) + "\n" + + # We will use StringIO to load the CSV from this text buffer. + # pd.read_csv() will iterate over the file in chunks and will + # finally read a residual chunk of really small size. + + # Generate the expected output: manually create the dataframe + # by splitting the record on commas and repeating it `n_lines` times. + row = tuple(val_ if val_ else float("nan") + for val_ in record_.split(",")) + expected = pd.DataFrame([row for _ in range(n_lines)], + dtype=object, columns=None, index=None) + + # Iterate over the CSV file in chunks of `chunksize` lines + chunks_ = self.read_csv(StringIO(csv_data), header=None, + dtype=object, chunksize=chunksize) + result = pd.concat(chunks_, axis=0, ignore_index=True) + + # Check for data corruption if there was no segfault + tm.assert_frame_equal(result, expected) diff --git a/pandas/src/parser/tokenizer.c b/pandas/src/parser/tokenizer.c index 6091c79e2b4fc..ac909f2c8bfdb 100644 --- a/pandas/src/parser/tokenizer.c +++ b/pandas/src/parser/tokenizer.c @@ -1221,20 +1221,7 @@ int parser_trim_buffers(parser_t *self) { size_t new_cap; void *newptr; - /* trim stream */ - new_cap = _next_pow2(self->stream_len) + 1; - TRACE(("parser_trim_buffers: new_cap = %zu, stream_cap = %zu, lines_cap = %zu\n", - new_cap, self->stream_cap, self->lines_cap)); - if (new_cap < self->stream_cap) { - TRACE(("parser_trim_buffers: new_cap < self->stream_cap, calling safe_realloc\n")); - newptr = safe_realloc((void*) self->stream, new_cap); - if (newptr == NULL) { - return PARSER_OUT_OF_MEMORY; - } else { - self->stream = newptr; - self->stream_cap = new_cap; - } - } + int i; /* trim words, word_starts */ new_cap = _next_pow2(self->words_len) + 1; @@ -1255,6 +1242,35 @@ int parser_trim_buffers(parser_t *self) { } } + /* trim stream */ + new_cap = 
_next_pow2(self->stream_len) + 1; + TRACE(("parser_trim_buffers: new_cap = %zu, stream_cap = %zu, lines_cap = %zu\n", + new_cap, self->stream_cap, self->lines_cap)); + if (new_cap < self->stream_cap) { + TRACE(("parser_trim_buffers: new_cap < self->stream_cap, calling safe_realloc\n")); + newptr = safe_realloc((void*) self->stream, new_cap); + if (newptr == NULL) { + return PARSER_OUT_OF_MEMORY; + } else { + // Update the pointers in the self->words array (char **) if `safe_realloc` + // moved the `self->stream` buffer. This block mirrors a similar block in + // `make_stream_space`. + if (self->stream != newptr) { + /* TRACE(("Moving word pointers\n")) */ + self->pword_start = newptr + self->word_start; + + for (i = 0; i < self->words_len; ++i) + { + self->words[i] = newptr + self->word_starts[i]; + } + } + + self->stream = newptr; + self->stream_cap = new_cap; + + } + } + /* trim line_start, line_fields */ new_cap = _next_pow2(self->lines) + 1; if (new_cap < self->lines_cap) {