diff --git a/asv_bench/benchmarks/io/sas.py b/asv_bench/benchmarks/io/sas.py
index 369b79641dbc4..411e5b6099f76 100644
--- a/asv_bench/benchmarks/io/sas.py
+++ b/asv_bench/benchmarks/io/sas.py
@@ -1,30 +1,23 @@
-import os
+from pathlib import Path
 
 from pandas import read_sas
 
+ROOT = Path(__file__).parents[3] / "pandas" / "tests" / "io" / "sas" / "data"
+
 
 class SAS:
+    def time_read_sas7bdat(self):
+        read_sas(ROOT / "test1.sas7bdat")
 
-    params = ["sas7bdat", "xport"]
-    param_names = ["format"]
+    def time_read_xpt(self):
+        read_sas(ROOT / "paxraw_d_short.xpt")
 
-    def setup(self, format):
-        # Read files that are located in 'pandas/tests/io/sas/data'
-        files = {"sas7bdat": "test1.sas7bdat", "xport": "paxraw_d_short.xpt"}
-        file = files[format]
-        paths = [
-            os.path.dirname(__file__),
-            "..",
-            "..",
-            "..",
-            "pandas",
-            "tests",
-            "io",
-            "sas",
-            "data",
-            file,
-        ]
-        self.f = os.path.join(*paths)
+    def time_read_sas7bdat_2(self):
+        next(read_sas(ROOT / "0x00controlbyte.sas7bdat.bz2", chunksize=11000))
 
-    def time_read_sas(self, format):
-        read_sas(self.f, format=format)
+    def time_read_sas7bdat_2_chunked(self):
+        for i, _ in enumerate(
+            read_sas(ROOT / "0x00controlbyte.sas7bdat.bz2", chunksize=1000)
+        ):
+            if i == 10:
+                break
diff --git a/pandas/io/sas/sas.pyx b/pandas/io/sas/sas.pyx
index 273f6f84a484d..8065859844b30 100644
--- a/pandas/io/sas/sas.pyx
+++ b/pandas/io/sas/sas.pyx
@@ -1,13 +1,58 @@
-# cython: profile=False
-# cython: boundscheck=False, initializedcheck=False
+# cython: language_level=3, initializedcheck=False
+# cython: warn.undeclared=True, warn.maybe_uninitialized=True, warn.unused=True
 from cython cimport Py_ssize_t
+from libc.stddef cimport size_t
+from libc.stdint cimport (
+    int64_t,
+    uint8_t,
+    uint16_t,
+)
+from libc.stdlib cimport (
+    calloc,
+    free,
+)
 
 import numpy as np
 
 import pandas.io.sas.sas_constants as const
 
-ctypedef signed long long int64_t
-ctypedef unsigned char uint8_t
-ctypedef unsigned short uint16_t
+
+cdef struct Buffer:
+    # Convenience wrapper for uint8_t data to allow fast and safe reads and writes.
+    # We use this as a replacement for np.array(..., dtype=np.uint8) because it's
+    # much slower to create NumPy arrays and we create Buffer instances many times
+    # when reading a SAS7BDAT file (roughly once per row that is being read).
+    uint8_t *data
+    size_t length
+
+
+cdef inline uint8_t buf_get(Buffer buf, size_t offset) except? 255:
+    assert offset < buf.length, "Out of bounds read"
+    return buf.data[offset]
+
+
+cdef inline bint buf_set(Buffer buf, size_t offset, uint8_t value) except 0:
+    assert offset < buf.length, "Out of bounds write"
+    buf.data[offset] = value
+    return True
+
+
+cdef inline bytes buf_as_bytes(Buffer buf, size_t offset, size_t length):
+    assert offset + length <= buf.length, "Out of bounds read"
+    return buf.data[offset:offset + length]
+
+
+cdef inline Buffer buf_new(size_t length) except *:
+    cdef uint8_t *data = <uint8_t *>calloc(length, sizeof(uint8_t))
+    if data == NULL:
+        raise MemoryError(f"Failed to allocate {length} bytes")
+    return Buffer(data, length)
+
+
+cdef inline buf_free(Buffer buf):
+    if buf.data != NULL:
+        free(buf.data)
+
 
 cdef object np_nan = np.nan
@@ -15,180 +60,170 @@ cdef object np_nan = np.nan
 # algorithm. It is partially documented here:
 #
 # https://cran.r-project.org/package=sas7bdat/vignettes/sas7bdat.pdf
-cdef const uint8_t[:] rle_decompress(int result_length, const uint8_t[:] inbuff) except *:
+cdef int rle_decompress(Buffer inbuff, Buffer outbuff) except? 0:
     cdef:
         uint8_t control_byte, x
-        uint8_t[:] result = np.zeros(result_length, np.uint8)
         int rpos = 0
         int i, nbytes, end_of_first_byte
-        Py_ssize_t ipos = 0, length = len(inbuff)
+        size_t ipos = 0
 
-    while ipos < length:
-        control_byte = inbuff[ipos] & 0xF0
-        end_of_first_byte = (inbuff[ipos] & 0x0F)
+    while ipos < inbuff.length:
+        control_byte = buf_get(inbuff, ipos) & 0xF0
+        end_of_first_byte = (buf_get(inbuff, ipos) & 0x0F)
         ipos += 1
 
         if control_byte == 0x00:
-            nbytes = (inbuff[ipos]) + 64 + end_of_first_byte * 256
+            nbytes = (buf_get(inbuff, ipos)) + 64 + end_of_first_byte * 256
             ipos += 1
             for _ in range(nbytes):
-                result[rpos] = inbuff[ipos]
+                buf_set(outbuff, rpos, buf_get(inbuff, ipos))
                 rpos += 1
                 ipos += 1
         elif control_byte == 0x40:
             # not documented
-            nbytes = (inbuff[ipos] & 0xFF) + 18 + end_of_first_byte * 256
+            nbytes = (buf_get(inbuff, ipos)) + 18 + end_of_first_byte * 256
             ipos += 1
             for _ in range(nbytes):
-                result[rpos] = inbuff[ipos]
+                buf_set(outbuff, rpos, buf_get(inbuff, ipos))
                 rpos += 1
                 ipos += 1
         elif control_byte == 0x60:
-            nbytes = end_of_first_byte * 256 + (inbuff[ipos]) + 17
+            nbytes = end_of_first_byte * 256 + (buf_get(inbuff, ipos)) + 17
             ipos += 1
             for _ in range(nbytes):
-                result[rpos] = 0x20
+                buf_set(outbuff, rpos, 0x20)
                 rpos += 1
         elif control_byte == 0x70:
-            nbytes = end_of_first_byte * 256 + (inbuff[ipos]) + 17
+            nbytes = end_of_first_byte * 256 + (buf_get(inbuff, ipos)) + 17
             ipos += 1
             for _ in range(nbytes):
-                result[rpos] = 0x00
+                buf_set(outbuff, rpos, 0x00)
                 rpos += 1
         elif control_byte == 0x80:
             nbytes = end_of_first_byte + 1
             for i in range(nbytes):
-                result[rpos] = inbuff[ipos + i]
+                buf_set(outbuff, rpos, buf_get(inbuff, ipos + i))
                 rpos += 1
             ipos += nbytes
         elif control_byte == 0x90:
             nbytes = end_of_first_byte + 17
             for i in range(nbytes):
-                result[rpos] = inbuff[ipos + i]
+                buf_set(outbuff, rpos, buf_get(inbuff, ipos + i))
                 rpos += 1
             ipos += nbytes
         elif control_byte == 0xA0:
             nbytes = end_of_first_byte + 33
             for i in range(nbytes):
-                result[rpos] = inbuff[ipos + i]
+                buf_set(outbuff, rpos, buf_get(inbuff, ipos + i))
                 rpos += 1
             ipos += nbytes
         elif control_byte == 0xB0:
             nbytes = end_of_first_byte + 49
             for i in range(nbytes):
-                result[rpos] = inbuff[ipos + i]
+                buf_set(outbuff, rpos, buf_get(inbuff, ipos + i))
                 rpos += 1
             ipos += nbytes
        elif control_byte == 0xC0:
             nbytes = end_of_first_byte + 3
-            x = inbuff[ipos]
+            x = buf_get(inbuff, ipos)
             ipos += 1
             for _ in range(nbytes):
-                result[rpos] = x
+                buf_set(outbuff, rpos, x)
                 rpos += 1
         elif control_byte == 0xD0:
             nbytes = end_of_first_byte + 2
             for _ in range(nbytes):
-                result[rpos] = 0x40
+                buf_set(outbuff, rpos, 0x40)
                 rpos += 1
         elif control_byte == 0xE0:
             nbytes = end_of_first_byte + 2
             for _ in range(nbytes):
-                result[rpos] = 0x20
+                buf_set(outbuff, rpos, 0x20)
                 rpos += 1
         elif control_byte == 0xF0:
             nbytes = end_of_first_byte + 2
             for _ in range(nbytes):
-                result[rpos] = 0x00
+                buf_set(outbuff, rpos, 0x00)
                 rpos += 1
         else:
             raise ValueError(f"unknown control byte: {control_byte}")
 
-    # In py37 cython/clang sees `len(outbuff)` as size_t and not Py_ssize_t
-    if len(result) != result_length:
-        raise ValueError(f"RLE: {len(result)} != {result_length}")
-
-    return np.asarray(result)
+    return rpos
 
 
 # rdc_decompress decompresses data using the Ross Data Compression algorithm:
 #
 # http://collaboration.cmc.ec.gc.ca/science/rpn/biblio/ddj/Website/articles/CUJ/1992/9210/ross/ross.htm
-cdef const uint8_t[:] rdc_decompress(int result_length, const uint8_t[:] inbuff) except *:
+cdef int rdc_decompress(Buffer inbuff, Buffer outbuff) except? 0:
     cdef:
         uint8_t cmd
         uint16_t ctrl_bits = 0, ctrl_mask = 0, ofs, cnt
-        int rpos = 0, k
-        uint8_t[:] outbuff = np.zeros(result_length, dtype=np.uint8)
-        Py_ssize_t ipos = 0, length = len(inbuff)
+        int rpos = 0, k, ii
+        size_t ipos = 0
 
     ii = -1
 
-    while ipos < length:
+    while ipos < inbuff.length:
         ii += 1
         ctrl_mask = ctrl_mask >> 1
         if ctrl_mask == 0:
-            ctrl_bits = ((inbuff[ipos] << 8) +
-                         inbuff[ipos + 1])
+            ctrl_bits = ((buf_get(inbuff, ipos) << 8) +
+                         buf_get(inbuff, ipos + 1))
             ipos += 2
             ctrl_mask = 0x8000
 
         if ctrl_bits & ctrl_mask == 0:
-            outbuff[rpos] = inbuff[ipos]
+            buf_set(outbuff, rpos, buf_get(inbuff, ipos))
             ipos += 1
             rpos += 1
             continue
 
-        cmd = (inbuff[ipos] >> 4) & 0x0F
-        cnt = (inbuff[ipos] & 0x0F)
+        cmd = (buf_get(inbuff, ipos) >> 4) & 0x0F
+        cnt = (buf_get(inbuff, ipos) & 0x0F)
         ipos += 1
 
         # short RLE
         if cmd == 0:
             cnt += 3
             for k in range(cnt):
-                outbuff[rpos + k] = inbuff[ipos]
+                buf_set(outbuff, rpos + k, buf_get(inbuff, ipos))
             rpos += cnt
             ipos += 1
 
         # long RLE
         elif cmd == 1:
-            cnt += inbuff[ipos] << 4
+            cnt += buf_get(inbuff, ipos) << 4
             cnt += 19
             ipos += 1
             for k in range(cnt):
-                outbuff[rpos + k] = inbuff[ipos]
+                buf_set(outbuff, rpos + k, buf_get(inbuff, ipos))
             rpos += cnt
             ipos += 1
 
         # long pattern
         elif cmd == 2:
             ofs = cnt + 3
-            ofs += inbuff[ipos] << 4
+            ofs += buf_get(inbuff, ipos) << 4
             ipos += 1
-            cnt = inbuff[ipos]
+            cnt = buf_get(inbuff, ipos)
             ipos += 1
             cnt += 16
             for k in range(cnt):
-                outbuff[rpos + k] = outbuff[rpos - ofs + k]
+                buf_set(outbuff, rpos + k, buf_get(outbuff, rpos - ofs + k))
             rpos += cnt
 
         # short pattern
         else:
             ofs = cnt + 3
-            ofs += inbuff[ipos] << 4
+            ofs += buf_get(inbuff, ipos) << 4
             ipos += 1
             for k in range(cmd):
-                outbuff[rpos + k] = outbuff[rpos - ofs + k]
+                buf_set(outbuff, rpos + k, buf_get(outbuff, rpos - ofs + k))
             rpos += cmd
 
-    # In py37 cython/clang sees `len(outbuff)` as size_t and not Py_ssize_t
-    if len(outbuff) != result_length:
-        raise ValueError(f"RDC: {len(outbuff)} != {result_length}\n")
-
-    return np.asarray(outbuff)
+    return rpos
 
 
 cdef enum ColumnTypes:
@@ -215,7 +250,8 @@ cdef class Parser:
         int64_t[:] column_types
         uint8_t[:, :] byte_chunk
         object[:, :] string_chunk
-        char *cached_page
+        uint8_t *cached_page
+        int cached_page_len
         int current_row_on_page_index
         int current_page_block_count
         int current_page_data_subheader_pointers_len
@@ -229,7 +265,7 @@ cdef class Parser:
         int subheader_pointer_length
         int current_page_type
         bint is_little_endian
-        const uint8_t[:] (*decompress)(int result_length, const uint8_t[:] inbuff) except *
+        int (*decompress)(Buffer, Buffer) except? 0
         object parser
 
     def __init__(self, object parser):
@@ -306,7 +342,8 @@ cdef class Parser:
 
     cdef update_next_page(self):
         # update data for the current page
-        self.cached_page = self.parser._cached_page
+        self.cached_page = <uint8_t *>self.parser._cached_page
+        self.cached_page_len = len(self.parser._cached_page)
         self.current_row_on_page_index = 0
         self.current_page_type = self.parser._current_page_type
         self.current_page_block_count = self.parser._current_page_block_count
@@ -387,20 +424,28 @@ cdef class Parser:
 
         cdef:
             Py_ssize_t j
-            int s, k, m, jb, js, current_row
+            int s, k, m, jb, js, current_row, rpos
             int64_t lngt, start, ct
-            const uint8_t[:] source
+            Buffer source, decompressed_source
             int64_t[:] column_types
             int64_t[:] lengths
             int64_t[:] offsets
             uint8_t[:, :] byte_chunk
             object[:, :] string_chunk
-
-        source = np.frombuffer(
-            self.cached_page[offset:offset + length], dtype=np.uint8)
-
-        if self.decompress != NULL and (length < self.row_length):
-            source = self.decompress(self.row_length, source)
+            bint compressed
+
+        assert offset + length <= self.cached_page_len, "Out of bounds read"
+        source = Buffer(&self.cached_page[offset], length)
+
+        compressed = self.decompress != NULL and length < self.row_length
+        if compressed:
+            decompressed_source = buf_new(self.row_length)
+            rpos = self.decompress(source, decompressed_source)
+            if rpos != self.row_length:
+                raise ValueError(
+                    f"Expected decompressed line of length {self.row_length} bytes but decompressed {rpos} bytes"
+                )
+            source = decompressed_source
 
         current_row = self.current_row_in_chunk_index
         column_types = self.column_types
@@ -424,20 +469,23 @@ cdef class Parser:
             else:
                 m = s
             for k in range(lngt):
-                byte_chunk[jb, m + k] = source[start + k]
+                byte_chunk[jb, m + k] = buf_get(source, start + k)
             jb += 1
         elif column_types[j] == column_type_string:
             # string
             # Skip trailing whitespace. This is equivalent to calling
             # .rstrip(b"\x00 ") but without Python call overhead.
-                while lngt > 0 and source[start+lngt-1] in b"\x00 ":
+                while lngt > 0 and buf_get(source, start + lngt - 1) in b"\x00 ":
                     lngt -= 1
                 if lngt == 0 and self.blank_missing:
                     string_chunk[js, current_row] = np_nan
                 else:
-                    string_chunk[js, current_row] = (&source[start])[:lngt]
+                    string_chunk[js, current_row] = buf_as_bytes(source, start, lngt)
                 js += 1
 
         self.current_row_on_page_index += 1
         self.current_row_in_chunk_index += 1
         self.current_row_in_file_index += 1
+
+        if compressed:
+            buf_free(decompressed_source)
diff --git a/pandas/tests/io/sas/test_sas7bdat.py b/pandas/tests/io/sas/test_sas7bdat.py
index 2b7ecbcdf9f80..ce4d960e3a9b0 100644
--- a/pandas/tests/io/sas/test_sas7bdat.py
+++ b/pandas/tests/io/sas/test_sas7bdat.py
@@ -350,37 +350,23 @@ def test_meta2_page(datapath):
     assert len(df) == 1000
 
 
-@pytest.mark.parametrize("test_file", ["test2.sas7bdat", "test3.sas7bdat"])
-def test_exception_propagation_rdc_rle_decompress(datapath, monkeypatch, test_file):
-    """Errors in RLE/RDC decompression should propagate the same error."""
-    orig_np_zeros = np.zeros
-
-    def _patched_zeros(size, dtype):
-        if isinstance(size, int):
-            # np.zeros() call in {rdc,rle}_decompress
-            raise Exception("Test exception")
-        else:
-            # Other calls to np.zeros
-            return orig_np_zeros(size, dtype)
-
-    monkeypatch.setattr(np, "zeros", _patched_zeros)
-
-    with pytest.raises(Exception, match="^Test exception$"):
-        pd.read_sas(datapath("io", "sas", "data", test_file))
-
-
-def test_exception_propagation_rle_decompress(tmp_path, datapath):
-    """Illegal control byte in RLE decompressor should raise the correct ValueError."""
-    with open(datapath("io", "sas", "data", "test2.sas7bdat"), "rb") as f:
-        data = bytearray(f.read())
-    invalid_control_byte = 0x10
-    page_offset = 0x10000
-    control_byte_pos = 55229
-    data[page_offset + control_byte_pos] = invalid_control_byte
-    tmp_file = tmp_path / "test2.sas7bdat"
-    tmp_file.write_bytes(data)
-    with pytest.raises(ValueError, match="unknown control byte"):
-        pd.read_sas(tmp_file)
+@pytest.mark.parametrize(
+    "test_file, override_offset, override_value, expected_msg",
+    [
+        ("test2.sas7bdat", 0x10000 + 55229, 0x80 | 0x0F, "Out of bounds"),
+        ("test2.sas7bdat", 0x10000 + 55229, 0x10, "unknown control byte"),
+        ("test3.sas7bdat", 118170, 184, "Out of bounds"),
+    ],
+)
+def test_rle_rdc_exceptions(
+    datapath, test_file, override_offset, override_value, expected_msg
+):
+    """Errors in RLE/RDC decompression should propagate."""
+    with open(datapath("io", "sas", "data", test_file), "rb") as fd:
+        data = bytearray(fd.read())
+    data[override_offset] = override_value
+    with pytest.raises(Exception, match=expected_msg):
+        pd.read_sas(io.BytesIO(data), format="sas7bdat")
 
 
 def test_0x40_control_byte(datapath):
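
A few illustrative sketches follow; none of them are part of the patch.

The Buffer helpers in sas.pyx form a matched set: buf_new() calloc's the
backing array, every accessor asserts its offset, and buf_free() must be
called exactly once. A minimal sketch of how they compose, assuming it lives
in the same .pyx module as the helpers (the function name is made up for
illustration):

    # Round-trips a Python bytes object through a Buffer.
    def buffer_roundtrip(bytes payload):
        cdef Buffer buf = buf_new(len(payload))
        cdef size_t i
        try:
            for i in range(len(payload)):
                buf_set(buf, i, payload[i])          # asserts i < buf.length
            return buf_as_bytes(buf, 0, buf.length)  # copies back out as bytes
        finally:
            buf_free(buf)                            # manual memory: free exactly once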
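The `except? 0` / `except? 255` annotations are what keep Python exceptions
flowing out of these C-typed functions: Cython treats the sentinel return
value as "possibly an error" and only then checks PyErr_Occurred(), so the
sentinel stays a legal ordinary return value at the cost of one extra check.
A minimal sketch of the same pattern (hypothetical function, not from the
patch):

    cdef int checked_div(int numerator, int denominator) except? 0:
        # 0 is the sentinel: callers re-check for a live exception only
        # when 0 comes back, so a legitimate 0 result still works.
        if denominator == 0:
            raise ZeroDivisionError("denominator must be non-zero")
        return numerator // denominator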
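For readers following the rle_decompress() branches: each command byte packs
an opcode in its high nibble and a length argument in its low nibble. A short
pure-Python sketch of two branches, 0xC0 ("repeat the next byte") and 0x70
("emit a run of NUL bytes"), mirroring the constants used above (illustrative
only, not the shipped decompressor):

    def rle_sketch(inbuff: bytes) -> bytes:
        out = bytearray()
        ipos = 0
        while ipos < len(inbuff):
            control_byte = inbuff[ipos] & 0xF0
            end_of_first_byte = inbuff[ipos] & 0x0F
            ipos += 1
            if control_byte == 0xC0:    # repeat the next byte (count + 3) times
                out += bytes([inbuff[ipos]]) * (end_of_first_byte + 3)
                ipos += 1
            elif control_byte == 0x70:  # NUL run, length from nibble * 256 + byte + 17
                out += b"\x00" * (end_of_first_byte * 256 + inbuff[ipos] + 17)
                ipos += 1
            else:
                raise ValueError(f"unknown control byte: {control_byte}")
        return bytes(out)

    assert rle_sketch(b"\xc3A") == b"AAAAAA"  # 0xC3: repeat "A" 3 + 3 times

This is also why the corrupted byte 0x80 | 0x0F in the new test is expected to
trip the bounds checks: 0x8F decodes as "copy the next 16 bytes", and near the
end of a compressed block that can push reads past inbuff.length (or writes
past outbuff.length), which buf_get()/buf_set() now catch with an assert
instead of silently reading out of bounds.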
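Finally, the two *_2 benchmarks time the chunked code path. The public pattern
they exercise looks roughly like this (the file name is a placeholder;
read_sas returns an iterator of DataFrames when chunksize is passed, and the
reader supports the context-manager protocol):

    import pandas as pd

    with pd.read_sas("example.sas7bdat", chunksize=1000) as reader:
        for chunk in reader:
            ...  # each chunk is a DataFrame of up to 1000 rows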