@@ -5,6 +5,8 @@ import cython
5
5
from libc.math cimport round
6
6
from libcpp.deque cimport deque
7
7
8
+ from pandas._libs.algos cimport TiebreakEnumType
9
+
8
10
import numpy as np
9
11
10
12
cimport numpy as cnp
@@ -50,6 +52,8 @@ cdef extern from "../src/skiplist.h":
50
52
double skiplist_get(skiplist_t* , int , int * ) nogil
51
53
int skiplist_insert(skiplist_t* , double ) nogil
52
54
int skiplist_remove(skiplist_t* , double ) nogil
55
+ int skiplist_rank(skiplist_t* , double ) nogil
56
+ int skiplist_min_rank(skiplist_t* , double ) nogil
53
57
54
58
cdef:
55
59
float32_t MINfloat32 = np.NINF
@@ -795,7 +799,7 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start,
795
799
val = values[j]
796
800
if notnan(val):
797
801
nobs += 1
798
- err = skiplist_insert(sl, val) ! = 1
802
+ err = skiplist_insert(sl, val) == - 1
799
803
if err:
800
804
break
801
805
@@ -806,7 +810,7 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start,
806
810
val = values[j]
807
811
if notnan(val):
808
812
nobs += 1
809
- err = skiplist_insert(sl, val) ! = 1
813
+ err = skiplist_insert(sl, val) == - 1
810
814
if err:
811
815
break
812
816
@@ -1139,6 +1143,122 @@ def roll_quantile(const float64_t[:] values, ndarray[int64_t] start,
1139
1143
return output
1140
1144
1141
1145
1146
# Maps the ``method`` string accepted by ``roll_rank`` to the tie-breaking
# strategy applied when the ranked value occurs more than once in a window.
# Keys must match the caller-supplied strings exactly (no surrounding
# whitespace); otherwise the lookup in ``roll_rank`` raises ValueError.
rolling_rank_tiebreakers = {
    "average": TiebreakEnumType.TIEBREAK_AVERAGE,
    "min": TiebreakEnumType.TIEBREAK_MIN,
    "max": TiebreakEnumType.TIEBREAK_MAX,
}
1151
+
1152
+
1153
def roll_rank(const float64_t[:] values, ndarray[int64_t] start,
              ndarray[int64_t] end, int64_t minp, bint percentile,
              str method, bint ascending) -> np.ndarray:
    """
    Rolling rank of the newest value in each window.

    O(N log(window)) implementation using skip list

    derived from roll_quantile

    Parameters
    ----------
    values : const float64_t[:]
        Data to rank. NaN entries are never inserted into the skip list and
        do not count as observations.
    start, end : ndarray[int64_t]
        Per-row window bounds; row ``i`` covers ``values[start[i]:end[i]]``.
    minp : int64_t
        Minimum number of non-NaN observations in the window required to
        emit a rank; otherwise the output is NaN.
    percentile : bint
        If true, the rank is divided by the number of non-NaN observations
        in the window.
    method : str
        Key of ``rolling_rank_tiebreakers`` selecting how ties are ranked.
    ascending : bint
        If false, values are negated before insertion so that larger values
        receive smaller ranks.

    Returns
    -------
    np.ndarray
        float64 array of length ``len(values)``.

    Raises
    ------
    ValueError
        If ``method`` is not a key of ``rolling_rank_tiebreakers``.
    MemoryError
        If the skip list cannot be allocated or an insertion fails.
    """
    cdef:
        Py_ssize_t i, j, s, e, N = len(values)
        float64_t rank_min = 0, rank = 0
        int64_t nobs = 0, win
        float64_t val
        skiplist_t *skiplist
        float64_t[::1] output
        TiebreakEnumType rank_type
        # typed explicitly because it is read inside the nogil section
        bint is_monotonic_increasing_bounds

    try:
        rank_type = rolling_rank_tiebreakers[method]
    except KeyError:
        raise ValueError(f"Method '{method}' is not supported")

    is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
        start, end
    )
    # we use the Fixed/Variable Indexer here as the
    # actual skiplist ops outweigh any window computation costs
    output = np.empty(N, dtype=np.float64)

    win = (end - start).max()
    if win == 0:
        # every window is empty: nothing to rank
        output[:] = NaN
        return np.asarray(output)
    skiplist = skiplist_init(<int>win)
    if skiplist == NULL:
        raise MemoryError("skiplist_init failed")

    # try/finally guarantees the skip list is released even when a
    # MemoryError is raised from inside the loop.
    try:
        with nogil:
            for i in range(N):
                s = start[i]
                e = end[i]

                if i == 0 or not is_monotonic_increasing_bounds:
                    if not is_monotonic_increasing_bounds:
                        # windows cannot be updated incrementally:
                        # rebuild the skip list from scratch for this row
                        nobs = 0
                        skiplist_destroy(skiplist)
                        skiplist = skiplist_init(<int>win)
                        if skiplist == NULL:
                            raise MemoryError("skiplist_init failed")

                    # setup
                    for j in range(s, e):
                        val = values[j] if ascending else -values[j]
                        if notnan(val):
                            nobs += 1
                            # the insert result is used as the highest rank
                            # among ties ("max" method); -1 signals failure
                            rank = skiplist_insert(skiplist, val)
                            if rank == -1:
                                raise MemoryError("skiplist_insert failed")
                            if rank_type == TiebreakEnumType.TIEBREAK_AVERAGE:
                                # The average rank of `val` is the sum of the
                                # ranks of all instances of `val` in the skip
                                # list divided by the number of instances.
                                # The sum of consecutive integers from 1 to N
                                # is N * (N + 1) / 2.
                                # The sum of the ranks is the sum of integers
                                # from the lowest rank to the highest rank,
                                # which is the sum of integers from 1 to the
                                # highest rank minus the sum of integers from
                                # 1 to one less than the lowest rank.
                                rank_min = skiplist_min_rank(skiplist, val)
                                rank = (((rank * (rank + 1) / 2)
                                        - ((rank_min - 1) * rank_min / 2))
                                        / (rank - rank_min + 1))
                            elif rank_type == TiebreakEnumType.TIEBREAK_MIN:
                                rank = skiplist_min_rank(skiplist, val)
                        else:
                            rank = NaN

                else:
                    # calculate deletes
                    for j in range(start[i - 1], s):
                        val = values[j] if ascending else -values[j]
                        if notnan(val):
                            skiplist_remove(skiplist, val)
                            nobs -= 1

                    # calculate adds
                    # NOTE(review): if no value enters the window in this
                    # step, `rank` keeps the value from the previous row.
                    for j in range(end[i - 1], e):
                        val = values[j] if ascending else -values[j]
                        if notnan(val):
                            nobs += 1
                            rank = skiplist_insert(skiplist, val)
                            if rank == -1:
                                raise MemoryError("skiplist_insert failed")
                            if rank_type == TiebreakEnumType.TIEBREAK_AVERAGE:
                                # same closed-form average as in the setup
                                # branch above
                                rank_min = skiplist_min_rank(skiplist, val)
                                rank = (((rank * (rank + 1) / 2)
                                        - ((rank_min - 1) * rank_min / 2))
                                        / (rank - rank_min + 1))
                            elif rank_type == TiebreakEnumType.TIEBREAK_MIN:
                                rank = skiplist_min_rank(skiplist, val)
                        else:
                            rank = NaN
                if nobs >= minp:
                    output[i] = rank / nobs if percentile else rank
                else:
                    output[i] = NaN
    finally:
        # skiplist is NULL only if re-initialisation failed above
        if skiplist != NULL:
            skiplist_destroy(skiplist)

    return np.asarray(output)
1260
+
1261
+
1142
1262
def roll_apply (object obj ,
1143
1263
ndarray[int64_t] start , ndarray[int64_t] end ,
1144
1264
int64_t minp ,
0 commit comments