@@ -34,166 +34,6 @@ cpdef check_result_array(object obj, object dtype):
34
34
raise ValueError (" Must produce aggregated value" )
35
35
36
36
37
- cdef class _BaseGrouper:
38
- cdef _check_dummy(self , object dummy):
39
- # both values and index must be an ndarray!
40
-
41
- values = dummy.values
42
- # GH 23683: datetimetz types are equivalent to datetime types here
43
- if (dummy.dtype != self .arr.dtype
44
- and values.dtype != self .arr.dtype):
45
- raise ValueError (' Dummy array must be same dtype' )
46
- if is_array(values) and not values.flags.contiguous:
47
- # e.g. Categorical has no `flags` attribute
48
- values = values.copy()
49
- index = dummy.index.values
50
- if not index.flags.contiguous:
51
- index = index.copy()
52
-
53
- return values, index
54
-
55
- cdef _init_dummy_series_and_index(self , Slider islider, Slider vslider):
56
- """
57
- Create Series and Index objects that we will alter in-place while iterating.
58
- """
59
- cached_index = self .ityp(islider.buf, dtype = self .idtype)
60
- cached_series = self .typ(
61
- vslider.buf, dtype = vslider.buf.dtype, index = cached_index, name = self .name
62
- )
63
- return cached_index, cached_series
64
-
65
- cdef inline _update_cached_objs(self , object cached_series, object cached_index,
66
- Slider islider, Slider vslider):
67
- cached_index._engine.clear_mapping()
68
- cached_index._cache.clear() # e.g. inferred_freq must go
69
- cached_series._mgr.set_values(vslider.buf)
70
-
71
- cdef inline object _apply_to_group(self ,
72
- object cached_series, object cached_index,
73
- bint initialized):
74
- """
75
- Call self.f on our new group, then update to the next group.
76
- """
77
- cdef:
78
- object res
79
-
80
- # NB: we assume that _update_cached_objs has already cleared cleared
81
- # the cache and engine mapping
82
- res = self .f(cached_series)
83
- res = extract_result(res)
84
- if not initialized:
85
- # On the first pass, we check the output shape to see
86
- # if this looks like a reduction.
87
- initialized = True
88
- check_result_array(res, cached_series.dtype)
89
-
90
- return res, initialized
91
-
92
-
93
- cdef class SeriesGrouper(_BaseGrouper):
94
- """
95
- Performs generic grouping operation while avoiding ndarray construction
96
- overhead
97
- """
98
- cdef:
99
- Py_ssize_t nresults, ngroups
100
-
101
- cdef public:
102
- ndarray arr, index, dummy_arr, dummy_index
103
- object f, labels, values, typ, ityp, name, idtype
104
-
105
- def __init__ (self , object series , object f , ndarray[intp_t] labels ,
106
- Py_ssize_t ngroups ):
107
-
108
- if len (series) == 0 :
109
- # get_result would never assign `result`
110
- raise ValueError (" SeriesGrouper requires non-empty `series`" )
111
-
112
- self .labels = labels
113
- self .f = f
114
-
115
- values = series.values
116
- if is_array(values) and not values.flags.c_contiguous:
117
- # e.g. Categorical has no `flags` attribute
118
- values = values.copy(' C' )
119
- self .arr = values
120
- self .typ = series._constructor
121
- self .ityp = series.index._constructor
122
- self .idtype = series.index.dtype
123
- self .index = series.index.values
124
- self .name = series.name
125
-
126
- dummy = series.iloc[:0 ]
127
- self .dummy_arr, self .dummy_index = self ._check_dummy(dummy)
128
- self .ngroups = ngroups
129
-
130
- def get_result (self ):
131
- cdef:
132
- # Define result to avoid UnboundLocalError
133
- ndarray arr, result = None
134
- ndarray[intp_t] labels
135
- ndarray[int64_t] counts
136
- Py_ssize_t i, n, group_size, lab, start, end
137
- object res
138
- bint initialized = 0
139
- Slider vslider, islider
140
- object cached_series = None , cached_index = None
141
-
142
- labels = self .labels
143
- counts = np.zeros(self .ngroups, dtype = np.int64)
144
- group_size = 0
145
- n = len (self .arr)
146
-
147
- vslider = Slider(self .arr, self .dummy_arr)
148
- islider = Slider(self .index, self .dummy_index)
149
-
150
- result = np.empty(self .ngroups, dtype = ' O' )
151
-
152
- cached_index, cached_series = self ._init_dummy_series_and_index(
153
- islider, vslider
154
- )
155
-
156
- start = 0
157
- try :
158
- for i in range (n):
159
- group_size += 1
160
-
161
- lab = labels[i]
162
-
163
- if i == n - 1 or lab != labels[i + 1 ]:
164
- if lab == - 1 :
165
- start += group_size
166
- group_size = 0
167
- continue
168
-
169
- end = start + group_size
170
- islider.move(start, end)
171
- vslider.move(start, end)
172
-
173
- self ._update_cached_objs(
174
- cached_series, cached_index, islider, vslider)
175
-
176
- res, initialized = self ._apply_to_group(cached_series, cached_index,
177
- initialized)
178
-
179
- start += group_size
180
-
181
- result[lab] = res
182
- counts[lab] = group_size
183
- group_size = 0
184
-
185
- finally :
186
- # so we don't free the wrong memory
187
- islider.reset()
188
- vslider.reset()
189
-
190
- # We check for empty series in the constructor, so should always
191
- # have result initialized by this point.
192
- assert initialized, " `result` has not been initialized."
193
-
194
- return result, counts
195
-
196
-
197
37
cpdef inline extract_result(object res):
198
38
""" extract the result object, it might be a 0-dim ndarray
199
39
or a len-1 0-dim, or a scalar """
@@ -208,40 +48,3 @@ cpdef inline extract_result(object res):
208
48
# see test_resampler_grouper.py::test_apply
209
49
res = res[0 ]
210
50
return res
211
-
212
-
213
- cdef class Slider:
214
- """
215
- Only handles contiguous data for now
216
- """
217
- cdef:
218
- ndarray values, buf
219
- Py_ssize_t stride
220
- char * orig_data
221
-
222
- def __init__ (self , ndarray values , ndarray buf ):
223
- assert values.ndim == 1
224
- assert values.dtype == buf.dtype
225
-
226
- if not values.flags.contiguous:
227
- values = values.copy()
228
-
229
- self .values = values
230
- self .buf = buf
231
-
232
- self .stride = values.strides[0 ]
233
- self .orig_data = self .buf.data
234
-
235
- self .buf.data = self .values.data
236
- self .buf.strides[0 ] = self .stride
237
-
238
- cdef move(self , int start, int end):
239
- """
240
- For slicing
241
- """
242
- self .buf.data = self .values.data + self .stride * start
243
- self .buf.shape[0 ] = end - start
244
-
245
- cdef reset(self ):
246
- self .buf.data = self .orig_data
247
- self .buf.shape[0 ] = 0
0 commit comments