PERF: some more perf/clean in saslib.pyx #12961

Status: Closed · wants to merge 1 commit
2 changes: 1 addition & 1 deletion doc/source/whatsnew/v0.18.1.txt
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ Deprecations
Performance Improvements
~~~~~~~~~~~~~~~~~~~~~~~~

- Improved speed of SAS reader (:issue:`12656`)
- Improved speed of SAS reader (:issue:`12656`, :issue`12961`)
- Performance improvements in ``.groupby(..).cumcount()`` (:issue:`11039`)


Expand Down
110 changes: 73 additions & 37 deletions pandas/io/sas/saslib.pyx
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
+# cython: profile=False
+# cython: boundscheck=False, initializedcheck=False
+
 import numpy as np
 cimport numpy as np
 from numpy cimport uint8_t, uint16_t, int8_t, int64_t
@@ -10,19 +13,19 @@ import sas_constants as const
 cdef np.ndarray[uint8_t, ndim=1] rle_decompress(int result_length, np.ndarray[uint8_t, ndim=1] inbuff):
 
     cdef:
-        uint8_t control_byte, x, end_of_first_byte
+        uint8_t control_byte, x
         uint8_t [:] result = np.zeros(result_length, np.uint8)
-        int rpos = 0, ipos = 0, i, nbytes, length = len(inbuff)
+        int rpos = 0, ipos = 0, i, nbytes, end_of_first_byte, length = len(inbuff)
 
     while ipos < length:
         control_byte = inbuff[ipos] & 0xF0
-        end_of_first_byte = int(inbuff[ipos] & 0x0F)
+        end_of_first_byte = <int>(inbuff[ipos] & 0x0F)
         ipos += 1
 
         if control_byte == 0x00:
             if end_of_first_byte != 0:
-                print("Unexpected non-zero end_of_first_byte")
-            nbytes = int(inbuff[ipos]) + 64
+                raise ValueError("Unexpected non-zero end_of_first_byte")
+            nbytes = <int>(inbuff[ipos]) + 64
             ipos += 1
             for i in range(nbytes):
                 result[rpos] = inbuff[ipos]
@@ -31,20 +34,20 @@ cdef np.ndarray[uint8_t, ndim=1] rle_decompress(int result_length, np.ndarray[uint8_t, ndim=1] inbuff):
         elif control_byte == 0x40:
             # not documented
             nbytes = end_of_first_byte * 16
-            nbytes += int(inbuff[ipos])
+            nbytes += <int>(inbuff[ipos])
             ipos += 1
             for i in range(nbytes):
                 result[rpos] = inbuff[ipos]
                 rpos += 1
             ipos += 1
         elif control_byte == 0x60:
-            nbytes = end_of_first_byte*256 + int(inbuff[ipos]) + 17
+            nbytes = end_of_first_byte*256 + <int>(inbuff[ipos]) + 17
             ipos += 1
             for i in range(nbytes):
                 result[rpos] = 0x20
                 rpos += 1
         elif control_byte == 0x70:
-            nbytes = end_of_first_byte*256 + int(inbuff[ipos]) + 17
+            nbytes = end_of_first_byte*256 + <int>(inbuff[ipos]) + 17
             ipos += 1
             for i in range(nbytes):
                 result[rpos] = 0x00
@@ -99,7 +102,7 @@ cdef np.ndarray[uint8_t, ndim=1] rle_decompress(int result_length, np.ndarray[uint8_t, ndim=1] inbuff):
             raise ValueError("unknown control byte: %v", control_byte)
 
     if len(result) != result_length:
-        print("RLE: %v != %v\n", (len(result), result_length))
+        raise ValueError("RLE: %v != %v", (len(result), result_length))
 
     return np.asarray(result)
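For readers unfamiliar with the SAS RLE scheme this hunk optimizes, here is a minimal pure-Python sketch of the control-byte dispatch, covering only the commands fully visible above (0x00 literal run, 0x60 space fill, 0x70 NUL fill). It is illustrative only, not the shipped Cython:

```python
# Pure-Python sketch of the RLE control-byte scheme; illustrative only.
def rle_decompress_sketch(result_length, inbuff):
    result = bytearray()
    ipos = 0
    while ipos < len(inbuff):
        control_byte = inbuff[ipos] & 0xF0       # high nibble: the command
        end_of_first_byte = inbuff[ipos] & 0x0F  # low nibble: length bits
        ipos += 1
        if control_byte == 0x00:
            # literal run: copy the next (inbuff[ipos] + 64) bytes verbatim
            if end_of_first_byte != 0:
                raise ValueError("Unexpected non-zero end_of_first_byte")
            nbytes = inbuff[ipos] + 64
            ipos += 1
            result += inbuff[ipos:ipos + nbytes]
            ipos += nbytes
        elif control_byte in (0x60, 0x70):
            # fill run: end_of_first_byte*256 + inbuff[ipos] + 17 copies of
            # a space (0x60) or a NUL byte (0x70)
            nbytes = end_of_first_byte * 256 + inbuff[ipos] + 17
            ipos += 1
            result += (b"\x20" if control_byte == 0x60 else b"\x00") * nbytes
        else:
            raise ValueError("command not covered by this sketch: %#04x"
                             % control_byte)
    if len(result) != result_length:
        raise ValueError("RLE: %d != %d" % (len(result), result_length))
    return bytes(result)

# 0x62 0x00 decodes to 2*256 + 0 + 17 = 529 spaces
assert rle_decompress_sketch(529, bytes([0x62, 0x00])) == b" " * 529
```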

Expand Down Expand Up @@ -162,7 +165,7 @@ cdef np.ndarray[uint8_t, ndim=1] rdc_decompress(int result_length, np.ndarray[ui
ipos += 1
cnt += 16
for k in range(cnt):
outbuff[rpos + k] = outbuff[rpos - int(ofs) + k]
outbuff[rpos + k] = outbuff[rpos - <int>ofs + k]
rpos += cnt

# short pattern
Expand All @@ -171,7 +174,7 @@ cdef np.ndarray[uint8_t, ndim=1] rdc_decompress(int result_length, np.ndarray[ui
ofs += <uint16_t>inbuff[ipos] << 4
ipos += 1
for k in range(cmd):
outbuff[rpos + k] = outbuff[rpos - int(ofs) + k]
outbuff[rpos + k] = outbuff[rpos - <int>ofs + k]
rpos += cmd

else:
Expand All @@ -182,6 +185,17 @@ cdef np.ndarray[uint8_t, ndim=1] rdc_decompress(int result_length, np.ndarray[ui

return np.asarray(outbuff)
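The two `<int>ofs` hunks above are RDC back-reference copies. Because the offset can be smaller than the copy length, bytes are copied one at a time so the copy can re-read what it just wrote (LZ77-style). A pure-Python sketch of that idea (illustrative, not the shipped code):

```python
# Byte-at-a-time back-reference copy, as in the RDC branches above.
def copy_back_reference(outbuff, rpos, ofs, cnt):
    for k in range(cnt):
        outbuff[rpos + k] = outbuff[rpos - ofs + k]
    return rpos + cnt

buf = bytearray(b"ab" + bytes(6))
copy_back_reference(buf, 2, 2, 6)  # source overlaps destination
assert bytes(buf) == b"abababab"
```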

+cdef enum ColumnTypes:
+    column_type_decimal = 1
+    column_type_string = 2
+
+
+# type the page_data types
+cdef int page_meta_type = const.page_meta_type
+cdef int page_mix_types_0 = const.page_mix_types[0]
+cdef int page_mix_types_1 = const.page_mix_types[1]
+cdef int page_data_type = const.page_data_type
+cdef int subheader_pointers_offset = const.subheader_pointers_offset
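The enum and the module-level `cdef int` copies of the `sas_constants` values exist so the hot loops below compare plain C ints instead of paying for a Python attribute lookup (or an `in` test against a Python list) on every row. A rough pure-Python analogue of the same hoisting trick (stand-in names, for illustration only):

```python
# Hoisting an attribute lookup out of a hot loop; stand-in namespace only.
import timeit
from types import SimpleNamespace

const = SimpleNamespace(page_meta_type=0)  # stand-in for sas_constants
page_meta_type = const.page_meta_type      # hoisted once, as above

lookup = timeit.timeit(lambda: 1 == const.page_meta_type, number=10**6)
cached = timeit.timeit(lambda: 1 == page_meta_type, number=10**6)
print("attribute lookup: %.3fs, cached: %.3fs" % (lookup, cached))
```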

 cdef class Parser(object):
 
@@ -194,11 +208,16 @@ cdef class Parser(object):
         object[:, :] string_chunk
         char *cached_page
         int current_row_on_page_index
+        int current_page_block_count
+        int current_page_data_subheader_pointers_len
+        int current_page_subheaders_count
         int current_row_in_chunk_index
         int current_row_in_file_index
+        int header_length
         int row_length
         int bit_offset
         int subheader_pointer_length
+        int current_page_type
         bint is_little_endian
         np.ndarray[uint8_t, ndim=1] (*decompress)(int result_length, np.ndarray[uint8_t, ndim=1] inbuff)
         object parser
@@ -208,30 +227,30 @@ cdef class Parser(object):
             int j
             char[:] column_types
 
-        self.current_row_on_page_index = parser._current_row_on_page_index
-        self.current_row_in_chunk_index = parser._current_row_in_chunk_index
-        self.current_row_in_file_index = parser._current_row_in_file_index
         self.parser = parser
+        self.header_length = self.parser.header_length
         self.column_count = parser.column_count
         self.lengths = parser._column_data_lengths
         self.offsets = parser._column_data_offsets
         self.byte_chunk = parser._byte_chunk
         self.string_chunk = parser._string_chunk
         self.row_length = parser.row_length
         self.cached_page = <char *>parser._cached_page
         self.bit_offset = self.parser._page_bit_offset
         self.subheader_pointer_length = self.parser._subheader_pointer_length
         self.is_little_endian = parser.byte_order == "<"
         self.column_types = np.empty(self.column_count, dtype='int64')
 
+        # page indicators
+        self.update_next_page()
 
         column_types = parser.column_types
 
         # map column types
         for j in range(self.column_count):
             if column_types[j] == b'd':
-                self.column_types[j] = 1
+                self.column_types[j] = column_type_decimal
             elif column_types[j] == b's':
-                self.column_types[j] = 2
+                self.column_types[j] = column_type_string
             else:
                 raise ValueError("unknown column type: %s" % self.parser.columns[j].ctype)
 
@@ -243,6 +262,11 @@ cdef class Parser(object):
         else:
             self.decompress = NULL
 
+        # update to current state of the parser
+        self.current_row_in_chunk_index = parser._current_row_in_chunk_index
+        self.current_row_in_file_index = parser._current_row_in_file_index
+        self.current_row_on_page_index = parser._current_row_on_page_index
+
     def read(self, int nrows):
         cdef:
             bint done
@@ -265,31 +289,39 @@ cdef class Parser(object):
         if done:
             self.cached_page = NULL
         else:
-            self.cached_page = <char *>self.parser._cached_page
-            self.current_row_on_page_index = 0
+            self.update_next_page()
         return done
 
+    cdef update_next_page(self):
+        # update data for the current page
+
+        self.cached_page = <char *>self.parser._cached_page
+        self.current_row_on_page_index = 0
+        self.current_page_type = self.parser._current_page_type
+        self.current_page_block_count = self.parser._current_page_block_count
+        self.current_page_data_subheader_pointers_len = len(self.parser._current_page_data_subheader_pointers)
+        self.current_page_subheaders_count = self.parser._current_page_subheaders_count
+
     cdef bint readline(self):
 
         cdef:
-            int offset, bit_offset, align_correction, subheader_pointer_length
+            int offset, bit_offset, align_correction, subheader_pointer_length, mn
             bint done, flag
 
         bit_offset = self.bit_offset
         subheader_pointer_length = self.subheader_pointer_length
 
         # If there is no page, go to the end of the header and read a page.
         if self.cached_page == NULL:
-            self.parser._path_or_buf.seek(self.parser.header_length)
+            self.parser._path_or_buf.seek(self.header_length)
             done = self.read_next_page()
             if done:
                 return True
 
         # Loop until a data row is read
         while True:
-            if self.parser._current_page_type == const.page_meta_type:
-                flag = (self.current_row_on_page_index >=
-                        len(self.parser._current_page_data_subheader_pointers))
+            if self.current_page_type == page_meta_type:
+                flag = self.current_row_on_page_index >= self.current_page_data_subheader_pointers_len
                 if flag:
                     done = self.read_next_page()
                     if done:
@@ -301,14 +333,14 @@ cdef class Parser(object):
                 self.process_byte_array_with_data(current_subheader_pointer.offset,
                                                   current_subheader_pointer.length)
                 return False
-            elif self.parser._current_page_type in const.page_mix_types:
-                align_correction = (bit_offset + const.subheader_pointers_offset +
-                                    self.parser._current_page_subheaders_count *
+            elif self.current_page_type == page_mix_types_0 or self.current_page_type == page_mix_types_1:
+                align_correction = (bit_offset + subheader_pointers_offset +
+                                    self.current_page_subheaders_count *
                                     subheader_pointer_length)
                 align_correction = align_correction % 8
                 offset = bit_offset + align_correction
-                offset += const.subheader_pointers_offset
-                offset += (self.parser._current_page_subheaders_count *
+                offset += subheader_pointers_offset
+                offset += (self.current_page_subheaders_count *
                            subheader_pointer_length)
                 offset += self.current_row_on_page_index * self.row_length
                 self.process_byte_array_with_data(offset,
@@ -319,27 +351,29 @@ cdef class Parser(object):
                     if done:
                         return True
                 return False
-            elif self.parser._current_page_type == const.page_data_type:
+            elif self.current_page_type == page_data_type:
                 self.process_byte_array_with_data(bit_offset +
-                                                  const.subheader_pointers_offset +
+                                                  subheader_pointers_offset +
                                                   self.current_row_on_page_index *
                                                   self.row_length,
                                                   self.row_length)
                 flag = (self.current_row_on_page_index ==
-                        self.parser._current_page_block_count)
+                        self.current_page_block_count)
                 if flag:
                     done = self.read_next_page()
                     if done:
                         return True
                 return False
             else:
                 raise ValueError("unknown page type: %s",
-                                 self.parser._current_page_type)
+                                 self.current_page_type)
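To make the mix-page arithmetic above concrete, here is the row-offset computation with made-up numbers (the real values come from the file header; these are assumptions for illustration):

```python
# Worked example of the mix-page row offset; all values are hypothetical.
bit_offset = 16                # page bit offset
subheader_pointers_offset = 8
subheaders_count = 3           # subheaders on this page
subheader_pointer_length = 12
row_length = 40

align_correction = (bit_offset + subheader_pointers_offset +
                    subheaders_count * subheader_pointer_length) % 8
offset = (bit_offset + align_correction + subheader_pointers_offset +
          subheaders_count * subheader_pointer_length)
# first data row starts at 16 + 4 + 8 + 36 = 64; row i starts at 64 + i*40
print(offset)                   # 64
print(offset + 2 * row_length)  # 144, byte offset of row 2 on the page
```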

     cdef void process_byte_array_with_data(self, int offset, int length):
 
         cdef:
-            int s, j, k, m, jb, js, lngt, start
+            Py_ssize_t j
+            int s, k, m, jb, js, current_row
+            int64_t lngt, start, ct
             np.ndarray[uint8_t, ndim=1] source
             int64_t[:] column_types
             int64_t[:] lengths
@@ -352,6 +386,7 @@ cdef class Parser(object):
         if self.decompress != NULL and (length < self.row_length):
             source = self.decompress(self.row_length, source)
 
+        current_row = self.current_row_in_chunk_index
         column_types = self.column_types
         lengths = self.lengths
         offsets = self.offsets
@@ -365,7 +400,8 @@ cdef class Parser(object):
             if lngt == 0:
                 break
             start = offsets[j]
-            if column_types[j] == 1:
+            ct = column_types[j]
+            if ct == column_type_decimal:
                 # decimal
                 if self.is_little_endian:
                     m = s + 8 - lngt
@@ -374,9 +410,9 @@ cdef class Parser(object):
                 for k in range(lngt):
                     byte_chunk[jb, m + k] = source[start + k]
                 jb += 1
-            elif column_types[j] == 2:
+            elif column_types[j] == column_type_string:
                 # string
-                string_chunk[js, self.current_row_in_chunk_index] = source[start:(start+lngt)].tostring().rstrip()
+                string_chunk[js, current_row] = source[start:(start+lngt)].tostring().rstrip()
                 js += 1
 
         self.current_row_on_page_index += 1
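The little-endian branch above (`m = s + 8 - lngt`) handles SAS's truncated doubles: a numeric column may store only the most significant `lngt` bytes of an 8-byte float, so on a little-endian host they are written to the end of the 8-byte slot and the low-order bytes stay zero. A sketch with `struct` (the column width of 4 bytes and the value are assumptions, illustrative only):

```python
# Reassembling a truncated SAS double; lngt and the value are assumptions.
import struct

lngt = 4                                    # column stores 4-byte numerics
stored = struct.pack("<d", 1.5)[8 - lngt:]  # high-order bytes SAS would keep
slot = bytearray(8)                         # zero-filled 8-byte slot
m = 8 - lngt                                # mirrors `m = s + 8 - lngt`
slot[m:m + lngt] = stored
print(struct.unpack("<d", bytes(slot))[0])  # 1.5 recovered
```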