forked from pandas-dev/pandas
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwriters.pyx
171 lines (141 loc) · 4.3 KB
/
writers.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import cython
import numpy as np
from cpython cimport PyBytes_GET_SIZE, PyUnicode_GET_LENGTH
from numpy cimport ndarray, uint8_t
# Fused type: lets a single function definition be specialised for either
# ``str`` or ``bytes`` elements (used as the element type of the typed
# memoryview argument of ``max_len_string_array``).
ctypedef fused pandas_string:
    str
    bytes
@cython.boundscheck(False)
@cython.wraparound(False)
def write_csv_rows(
    list data,
    ndarray data_index,
    Py_ssize_t nlevels,
    ndarray cols,
    object writer
):
    """
    Write the given data to the writer object in batches, reusing a
    pre-allocated pool of row buffers for performance.

    Nothing is written when ``data_index`` is empty.

    Parameters
    ----------
    data : list
        One sequence per column; ``data[i][j]`` is the value of column ``i``
        in row ``j``.
    data_index : ndarray
        Row labels; its length determines how many rows are written. When
        ``nlevels > 1`` each element is expanded with ``list(...)`` into the
        first ``nlevels`` cells of the row.
    nlevels : int
        Number of leading index cells in each row.
    cols : ndarray
        Column labels; only the length is used here.
    writer : object
        Any object providing ``writerows(rows)``.
    """
    # In crude testing, N>100 yields little marginal improvement
    cdef:
        Py_ssize_t i, j, k = len(data_index), N = 100, ncols = len(cols)
        Py_ssize_t remainder
        list rows

    # pre-allocate rows; the same N buffers are overwritten for every batch
    rows = [[None] * (nlevels + ncols) for _ in range(N)]

    if nlevels == 1:
        # single index level: the label occupies the first cell
        for j in range(k):
            row = rows[j % N]
            row[0] = data_index[j]
            for i in range(ncols):
                row[1 + i] = data[i][j]

            # the last buffer of the pool was just filled -> flush the batch
            if j % N == N - 1:
                writer.writerows(rows)
    elif nlevels > 1:
        # several index levels: the label is expanded over the leading cells
        for j in range(k):
            row = rows[j % N]
            row[:nlevels] = list(data_index[j])
            for i in range(ncols):
                row[nlevels + i] = data[i][j]

            if j % N == N - 1:
                writer.writerows(rows)
    else:
        # no index cells: data columns only
        for j in range(k):
            row = rows[j % N]
            for i in range(ncols):
                row[i] = data[i][j]

            if j % N == N - 1:
                writer.writerows(rows)

    # Flush the partially filled final batch. The count is derived from the
    # number of rows ``k`` rather than from the loop variable, so that an
    # empty ``data_index`` (loop never runs) does not emit a spurious row of
    # ``None`` values.
    remainder = k % N
    if remainder > 0:
        writer.writerows(rows[:remainder])
@cython.boundscheck(False)
@cython.wraparound(False)
def convert_json_to_lines(arr: object) -> str:
    """
    Replace the commas separating top-level JSON values with line feeds,
    paying special attention to quotes & brackets.

    A comma is replaced only when it is outside any double-quoted string and
    outside any ``{...}`` object. A backslash escapes the character that
    follows it, so an escaped quote does not open or close a string. Only
    curly braces are counted as nesting; square brackets are not tracked.

    Parameters
    ----------
    arr : str
        Comma separated JSON text; it is encoded to UTF-8 for processing.

    Returns
    -------
    str
        The input with each separating comma replaced by ``"\\n"``.
    """
    cdef:
        Py_ssize_t i = 0, num_open_brackets_seen = 0, length
        bint in_quotes = False, is_escaping = False
        ndarray[uint8_t, ndim=1] narr
        unsigned char val, newline, comma, left_bracket, right_bracket, quote
        unsigned char backslash

    newline = ord('\n')
    comma = ord(',')
    left_bracket = ord('{')
    right_bracket = ord('}')
    quote = ord('"')
    backslash = ord('\\')

    # Work on a writable byte copy. All characters of interest are ASCII and
    # bytes of multi-byte UTF-8 sequences are >= 0x80, so they never match.
    narr = np.frombuffer(arr.encode('utf-8'), dtype='u1').copy()
    length = narr.shape[0]
    for i in range(length):
        val = narr[i]
        # An unescaped quote toggles the in-string state. This must also hold
        # for a quote at position 0, otherwise the state is inverted for the
        # rest of an input that starts with a string.
        if val == quote and not is_escaping:
            in_quotes = not in_quotes
        # A backslash starts an escape; the escape ends after the next byte.
        if val == backslash or is_escaping:
            is_escaping = not is_escaping
        if val == comma:  # commas that should be \n
            if num_open_brackets_seen == 0 and not in_quotes:
                narr[i] = newline
        elif val == left_bracket:
            if not in_quotes:
                num_open_brackets_seen += 1
        elif val == right_bracket:
            if not in_quotes:
                num_open_brackets_seen -= 1

    return narr.tobytes().decode('utf-8')
# stata, pytables
@cython.boundscheck(False)
@cython.wraparound(False)
def max_len_string_array(pandas_string[:] arr) -> Py_ssize_t:
    """
    Return the length of the longest element of a 1-dim string array.

    Element lengths are measured with ``word_len``; an empty array gives 0.
    """
    cdef:
        Py_ssize_t pos, n_items = arr.shape[0]
        Py_ssize_t longest = 0, current = 0
        pandas_string item

    for pos in range(n_items):
        item = arr[pos]
        current = word_len(item)
        # keep the running maximum
        if current > longest:
            longest = current

    return longest
cpdef inline Py_ssize_t word_len(object val):
    """
    Return the length of a string or bytes value.

    ``str`` values are measured with ``PyUnicode_GET_LENGTH`` and ``bytes``
    values with ``PyBytes_GET_SIZE``; any other object yields 0.
    """
    if isinstance(val, str):
        return PyUnicode_GET_LENGTH(val)
    if isinstance(val, bytes):
        return PyBytes_GET_SIZE(val)
    # neither str nor bytes
    return 0
# ------------------------------------------------------------------
# PyTables Helpers
@cython.boundscheck(False)
@cython.wraparound(False)
def string_array_replace_from_nan_rep(
    ndarray[object, ndim=1] arr,
    object nan_rep,
    object replace=np.nan
):
    """
    Overwrite, in place, every element of 'arr' that compares equal (``==``)
    to 'nan_rep' with 'replace'. Return the same array.
    """
    cdef:
        Py_ssize_t pos, n_items = len(arr)

    for pos in range(n_items):
        if arr[pos] == nan_rep:
            arr[pos] = replace

    return arr