
Commit fe4731b

Integrate jreback's cython improvements
1 parent b7de358 commit fe4731b

2 files changed: +204 −144 lines changed


pandas/io/sas/sas7bdat.py (+3 −2)

@@ -20,7 +20,7 @@
 import numpy as np
 import struct
 import pandas.io.sas.sas_constants as const
-from pandas.io.sas.saslib import do_read
+from pandas.io.sas.saslib import Parser


 class _subheader_pointer(object):
@@ -583,7 +583,8 @@ def read(self, nrows=None):
         self._byte_chunk = np.empty((nd, 8 * nrows), dtype=np.uint8)

         self._current_row_in_chunk_index = 0
-        do_read(self, nrows)
+        p = Parser(self)
+        p.read(nrows)

         rslt = self._chunk_to_dataframe()
         if self.index is not None:
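
The module-level entry point changes shape here: instead of calling the free function do_read(parser, nrows), the reader now builds a saslib.Parser around itself and calls its read method. A minimal sketch of a backwards-compatible shim, assuming only the Parser class added in this commit (the shim itself is hypothetical, not part of the change):

    def do_read(parser, nrows):
        # Hypothetical wrapper reproducing the removed module function:
        # Parser.__init__ snapshots the reader's state into C fields,
        # read() fills the byte/string chunks and writes the row
        # counters back to the reader when it finishes.
        from pandas.io.sas.saslib import Parser
        Parser(parser).read(nrows)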

pandas/io/sas/saslib.pyx (+201 −142)

@@ -7,16 +7,12 @@ import sas_constants as const
 # algorithm. It is partially documented here:
 #
 # https://cran.r-project.org/web/packages/sas7bdat/vignettes/sas7bdat.pdf
-cdef rle_decompress(int result_length, np.ndarray[uint8_t, ndim=1] inbuff):
+cdef np.ndarray[uint8_t, ndim=1] rle_decompress(int result_length, np.ndarray[uint8_t, ndim=1] inbuff):

-    cdef uint8_t control_byte
-    cdef uint8_t [:] result = np.zeros(result_length, np.uint8)
-    cdef int rpos = 0
-    cdef int ipos = 0
-    cdef int i
-    cdef int nbytes
-    cdef uint8_t x
-    cdef length = len(inbuff)
+    cdef:
+        uint8_t control_byte, x, end_of_first_byte
+        uint8_t [:] result = np.zeros(result_length, np.uint8)
+        int rpos = 0, ipos = 0, i, nbytes, length = len(inbuff)

     while ipos < length:
         control_byte = inbuff[ipos] & 0xF0
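
rle_decompress (and rdc_decompress below) gains an explicit np.ndarray[uint8_t, ndim=1] return type and folds its one-per-line cdef statements into a single grouped block. A standalone sketch of the same two patterns, using an illustrative function that is not part of this commit:

    import numpy as np
    cimport numpy as np
    from numpy cimport uint8_t

    cdef np.ndarray[uint8_t, ndim=1] repeat_byte(uint8_t x, int n):
        # Typed return: callers declared with a matching buffer type
        # avoid boxing the result as a generic Python object.
        cdef:
            uint8_t [:] out = np.zeros(n, np.uint8)  # grouped cdef block
            int i
        for i in range(n):
            out[i] = x
        return np.asarray(out)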
@@ -111,17 +107,13 @@ cdef rle_decompress(int result_length, np.ndarray[uint8_t, ndim=1] inbuff):
 # rdc_decompress decompresses data using the Ross Data Compression algorithm:
 #
 # http://collaboration.cmc.ec.gc.ca/science/rpn/biblio/ddj/Website/articles/CUJ/1992/9210/ross/ross.htm
-cdef rdc_decompress(int result_length, np.ndarray[uint8_t, ndim=1] inbuff):
-
-    cdef uint8_t cmd
-    cdef uint16_t ctrl_bits
-    cdef uint16_t ctrl_mask = 0
-    cdef uint16_t ofs
-    cdef uint16_t cnt
-    cdef int ipos = 0
-    cdef int rpos = 0
-    cdef int k
-    cdef uint8_t [:] outbuff = np.zeros(result_length, dtype=np.uint8)
+cdef np.ndarray[uint8_t, ndim=1] rdc_decompress(int result_length, np.ndarray[uint8_t, ndim=1] inbuff):
+
+    cdef:
+        uint8_t cmd
+        uint16_t ctrl_bits, ctrl_mask = 0, ofs, cnt
+        int ipos = 0, rpos = 0, k
+        uint8_t [:] outbuff = np.zeros(result_length, dtype=np.uint8)

     ii = -1

@@ -190,136 +182,203 @@ cdef rdc_decompress(int result_length, np.ndarray[uint8_t, ndim=1] inbuff):

     return np.asarray(outbuff)

-cdef decompress(object parser, int row_length, page):
-    page = np.frombuffer(page, dtype=np.uint8)
-    if parser.compression == const.rle_compression:
-        return rle_decompress(row_length, page)
-    elif parser.compression == const.rdc_compression:
-        return rdc_decompress(row_length, page)
-    else:
-        raise ValueError("unknown SAS compression method: %s" %
-                         parser.compression)

+cdef class Parser(object):

-def do_read(object parser, int nrows):
-    cdef int i
+    cdef:
+        int column_count
+        long[:] lengths
+        long[:] offsets
+        long[:] column_types
+        uint8_t[:, :] byte_chunk
+        object[:, :] string_chunk
+        char *cached_page
+        int current_row_on_page_index
+        int current_row_in_chunk_index
+        int current_row_in_file_index
+        int row_length
+        int bit_offset
+        int subheader_pointer_length
+        bint is_little_endian
+        np.ndarray[uint8_t, ndim=1] (*decompress)(int result_length, np.ndarray[uint8_t, ndim=1] inbuff)
+        object parser
+
+    def __init__(self, object parser):
+        cdef:
+            int j
+            char[:] column_types
+
+        self.current_row_on_page_index = parser._current_row_on_page_index
+        self.current_row_in_chunk_index = parser._current_row_in_chunk_index
+        self.current_row_in_file_index = parser._current_row_in_file_index
+        self.parser = parser
+        self.column_count = parser.column_count
+        self.lengths = parser._column_data_lengths
+        self.offsets = parser._column_data_offsets
+        self.byte_chunk = parser._byte_chunk
+        self.string_chunk = parser._string_chunk
+        self.row_length = parser.row_length
+        self.cached_page = <char *>parser._cached_page
+        self.bit_offset = self.parser._page_bit_offset
+        self.subheader_pointer_length = self.parser._subheader_pointer_length
+        self.is_little_endian = parser.byte_order == "<"
+        self.column_types = np.empty(self.column_count, dtype=long)
+
+        column_types = parser.column_types
+
+        # map column types
+        for j in range(self.column_count):
+            if column_types[j] == b'd':
+                self.column_types[j] = 1
+            elif column_types[j] == b's':
+                self.column_types[j] = 2
+            else:
+                raise ValueError("unknown column type: %s" % self.parser.columns[j].ctype)

-    for i in range(nrows):
-        done = readline(parser)
-        if done:
-            break
+        # compression
+        if parser.compression == const.rle_compression:
+            self.decompress = rle_decompress
+        elif parser.compression == const.rdc_compression:
+            self.decompress = rdc_decompress
+        else:
+            self.decompress = NULL

+    def read(self, int nrows):
+        cdef:
+            bint done
+            int i

-cdef readline(object parser):
+        for i in range(nrows):
+            done = self.readline()
+            if done:
+                break

-    cdef:
-        int offset, bit_offset, align_correction, subheader_pointer_length
+        # update the parser
+        self.parser._current_row_on_page_index = self.current_row_on_page_index
+        self.parser._current_row_in_chunk_index = self.current_row_in_chunk_index
+        self.parser._current_row_in_file_index = self.current_row_in_file_index

-    bit_offset = parser._page_bit_offset
-    subheader_pointer_length = parser._subheader_pointer_length
+    cdef bint read_next_page(self):
+        cdef done

-    # If there is no page, go to the end of the header and read a page.
-    if parser._cached_page is None:
-        parser._path_or_buf.seek(parser.header_length)
-        done = parser._read_next_page()
+        done = self.parser._read_next_page()
         if done:
-            return True
-
-    # Loop until a data row is read
-    while True:
-        if parser._current_page_type == const.page_meta_type:
-            flag = (parser._current_row_on_page_index >=
-                    len(parser._current_page_data_subheader_pointers))
-            if flag:
-                done = parser._read_next_page()
-                if done:
-                    return True
-                parser._current_row_on_page_index = 0
-                continue
-            current_subheader_pointer = (
-                parser._current_page_data_subheader_pointers[
-                    parser._current_row_on_page_index])
-            process_byte_array_with_data(parser,
-                                         current_subheader_pointer.offset,
-                                         current_subheader_pointer.length)
-            return False
-        elif parser._current_page_type in const.page_mix_types:
-            align_correction = (bit_offset + const.subheader_pointers_offset +
-                                parser._current_page_subheaders_count *
-                                subheader_pointer_length)
-            align_correction = align_correction % 8
-            offset = bit_offset + align_correction
-            offset += const.subheader_pointers_offset
-            offset += (parser._current_page_subheaders_count *
-                       subheader_pointer_length)
-            offset += parser._current_row_on_page_index * parser.row_length
-            process_byte_array_with_data(parser, offset, parser.row_length)
-            mn = min(parser.row_count, parser._mix_page_row_count)
-            if parser._current_row_on_page_index == mn:
-                done = parser._read_next_page()
-                if done:
-                    return True
-                parser._current_row_on_page_index = 0
-            return False
-        elif parser._current_page_type == const.page_data_type:
-            process_byte_array_with_data(parser,
-                                         bit_offset +
-                                         const.subheader_pointers_offset +
-                                         parser._current_row_on_page_index *
-                                         parser.row_length,
-                                         parser.row_length)
-            flag = (parser._current_row_on_page_index ==
-                    parser._current_page_block_count)
-            if flag:
-                done = parser._read_next_page()
-                if done:
-                    return True
-                parser._current_row_on_page_index = 0
-            return False
+            self.cached_page = NULL
         else:
-            raise ValueError("unknown page type: %s",
-                             parser._current_page_type)
-
-
-cdef process_byte_array_with_data(object parser, int offset, int length):
-
-    cdef:
-        int s, j, k, m, start, jb, js, lngt
-        long[:] lengths = parser._column_data_lengths
-        long[:] offsets = parser._column_data_offsets
-        char[:] column_types = parser.column_types
-        uint8_t[:, :] byte_chunk = parser._byte_chunk
-        object[:, :] string_chunk = parser._string_chunk
-        np.ndarray[uint8_t, ndim=1] source
-        np.ndarray[uint8_t, ndim=1] raw_source = np.frombuffer(parser._cached_page[offset:offset+length], dtype=np.uint8)
-
-    if (parser.compression != "") and (length < parser.row_length):
-        source = decompress(parser, parser.row_length, raw_source)
-    else:
-        source = raw_source
-
-    s = 8 * parser._current_row_in_chunk_index
-    js = 0
-    jb = 0
-    for j in range(parser.column_count):
-        lngt = lengths[j]
-        if lngt == 0:
-            break
-        start = offsets[j]
-        if column_types[j] == b'd':
-            if parser.byte_order == "<":
-                m = s + 8 - lngt
+            self.cached_page = <char *>self.parser._cached_page
+            self.current_row_on_page_index = 0
+        return done
+
+    cdef bint readline(self):
+
+        cdef:
+            int offset, bit_offset, align_correction, subheader_pointer_length
+            bint done
+
+        bit_offset = self.bit_offset
+        subheader_pointer_length = self.subheader_pointer_length
+
+        # If there is no page, go to the end of the header and read a page.
+        if self.cached_page == NULL:
+            self.parser._path_or_buf.seek(self.parser.header_length)
+            done = self.read_next_page()
+            if done:
+                return True
+
+        # Loop until a data row is read
+        while True:
+            if self.parser._current_page_type == const.page_meta_type:
+                flag = (self.current_row_on_page_index >=
+                        len(self.parser._current_page_data_subheader_pointers))
+                if flag:
+                    done = self.read_next_page()
+                    if done:
+                        return True
+                    continue
+                current_subheader_pointer = (
+                    self.parser._current_page_data_subheader_pointers[
+                        self.current_row_on_page_index])
+                self.process_byte_array_with_data(current_subheader_pointer.offset,
+                                                  current_subheader_pointer.length)
+                return False
+            elif self.parser._current_page_type in const.page_mix_types:
+                align_correction = (bit_offset + const.subheader_pointers_offset +
+                                    self.parser._current_page_subheaders_count *
+                                    subheader_pointer_length)
+                align_correction = align_correction % 8
+                offset = bit_offset + align_correction
+                offset += const.subheader_pointers_offset
+                offset += (self.parser._current_page_subheaders_count *
+                           subheader_pointer_length)
+                offset += self.current_row_on_page_index * self.row_length
+                self.process_byte_array_with_data(offset,
+                                                  self.row_length)
+                mn = min(self.parser.row_count, self.parser._mix_page_row_count)
+                if self.current_row_on_page_index == mn:
+                    done = self.read_next_page()
+                    if done:
+                        return True
+                return False
+            elif self.parser._current_page_type == const.page_data_type:
+                self.process_byte_array_with_data(bit_offset +
+                                                  const.subheader_pointers_offset +
+                                                  self.current_row_on_page_index *
+                                                  self.row_length,
+                                                  self.row_length)
+                flag = (self.current_row_on_page_index ==
+                        self.parser._current_page_block_count)
+                if flag:
+                    done = self.read_next_page()
+                    if done:
+                        return True
+                return False
             else:
-                m = s
-            for k in range(lngt):
-                byte_chunk[jb, m + k] = source[start + k]
-            jb += 1
-        elif column_types[j] == b's':
-            string_chunk[js, parser._current_row_in_chunk_index] = source[start:(start+lngt)].tostring().rstrip()
-            js += 1
-        else:
-            raise ValueError("unknown column type: %s" % parser.columns[j].ctype)
-
-    parser._current_row_on_page_index += 1
-    parser._current_row_in_chunk_index += 1
-    parser._current_row_in_file_index += 1
+                raise ValueError("unknown page type: %s",
+                                 self.parser._current_page_type)
+
+    cdef void process_byte_array_with_data(self, int offset, int length):
+
+        cdef:
+            long s, j, k, m, jb, js, lngt, start
+            np.ndarray[uint8_t, ndim=1] source
+            long[:] column_types
+            long[:] lengths
+            long[:] offsets
+            uint8_t[:, :] byte_chunk
+            object[:, :] string_chunk
+
+        source = np.frombuffer(self.cached_page[offset:offset+length], dtype=np.uint8)
+
+        if self.decompress != NULL and (length < self.row_length):
+            source = self.decompress(self.row_length, source)
+
+        column_types = self.column_types
+        lengths = self.lengths
+        offsets = self.offsets
+        byte_chunk = self.byte_chunk
+        string_chunk = self.string_chunk
+        s = 8 * self.current_row_in_chunk_index
+        js = 0
+        jb = 0
+        for j in range(self.column_count):
+            lngt = lengths[j]
+            if lngt == 0:
+                break
+            start = offsets[j]
+            if column_types[j] == 1:
+                # decimal
+                if self.is_little_endian:
+                    m = s + 8 - lngt
+                else:
+                    m = s
+                for k in range(lngt):
+                    byte_chunk[jb, m + k] = source[start + k]
+                jb += 1
+            elif column_types[j] == 2:
+                # string
+                string_chunk[js, self.current_row_in_chunk_index] = source[start:(start+lngt)].tostring().rstrip()
+                js += 1
+
+        self.current_row_on_page_index += 1
+        self.current_row_in_chunk_index += 1
+        self.current_row_in_file_index += 1
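
Three things drive the speedup in the new cdef class: __init__ copies everything the hot loop touches (buffers, offsets, lengths, endianness) out of the Python-level reader into typed C attributes and writes the row counters back only when read() finishes; the b'd'/b's' column-type bytes are mapped once to the integers 1 and 2, so the per-row loop compares C longs instead of Python bytes; and the decompressor is bound once through a C function pointer, with NULL meaning uncompressed, so readline never re-tests the compression string. A rough standalone sketch of the function-pointer binding, with hypothetical names (Demo, identity_decompress, expand):

    import numpy as np
    cimport numpy as np
    from numpy cimport uint8_t

    # Hypothetical stand-in sharing the decompressors' signature;
    # real RLE/RDC logic omitted.
    cdef np.ndarray[uint8_t, ndim=1] identity_decompress(int result_length, np.ndarray[uint8_t, ndim=1] inbuff):
        return inbuff[:result_length].copy()

    cdef class Demo:
        # One C function pointer, bound once instead of per row.
        cdef np.ndarray[uint8_t, ndim=1] (*decompress)(int result_length, np.ndarray[uint8_t, ndim=1] inbuff)

        def __init__(self, bint compressed):
            if compressed:
                self.decompress = identity_decompress
            else:
                self.decompress = NULL

        cdef np.ndarray[uint8_t, ndim=1] expand(self, int n, np.ndarray[uint8_t, ndim=1] buf):
            if self.decompress != NULL:
                return self.decompress(n, buf)
            return buf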
