@@ -467,6 +467,7 @@ def groupby(ingredient: BaseIngredient, *, result, **options) -> ProcedureResult
467
467
468
468
newdata = dict ()
469
469
470
+ # TODO: support apply function to all items?
470
471
if comp_type == 'aggregate' :
471
472
for k , func in options [comp_type ].items ():
472
473
func = mkfunc (func )
@@ -487,6 +488,60 @@ def groupby(ingredient: BaseIngredient, *, result, **options) -> ProcedureResult
487
488
return ProcedureResult (result , newkey , data = newdata )
488
489
489
490
491
+ def window (ingredient : BaseIngredient , result , ** options ) -> ProcedureResult :
492
+ """apply functions on a rolling window
493
+
494
+ available options:
495
+ window: dictionary
496
+ window definition. options are:
497
+ column: str, column which window is created from
498
+ size: int or 'expanding', if int then rolling window, if expanding then expanding window
499
+ min_periods: int, as in pandas
500
+ center: bool, as in pandas
501
+ aggregate: dictionary
502
+ aggregation functions, format should be
503
+ column: func or column: {function: func, param1: foo, param2: baz, ...}
504
+ """
505
+
506
+ logger .info ('window: ' + ingredient .ingred_id )
507
+
508
+ # reading options
509
+ window = options .pop ('window' )
510
+ aggregate = options .pop ('aggregate' )
511
+
512
+ column = read_opt (window , 'column' , required = True )
513
+ size = read_opt (window , 'size' , required = True )
514
+ min_periods = read_opt (window , 'min_periods' , default = None )
515
+ center = read_opt (window , 'center' , default = False )
516
+
517
+ data = ingredient .get_data ()
518
+ newdata = dict ()
519
+
520
+ for k , func in aggregate .items ():
521
+ f = mkfunc (func )
522
+ # keys for grouping. in multidimensional data like datapoints, we want create
523
+ # groups before rolling. Just group all key column except the column to aggregate.
524
+ keys = ingredient .key_to_list ()
525
+ keys .remove (column )
526
+ df = data [k ].set_index (ingredient .key_to_list ())
527
+ levels = [df .index .names .index (x ) for x in keys ]
528
+ if size == 'expanding' :
529
+ newdata [k ] = (df .groupby (level = levels , group_keys = False )
530
+ .expanding (on = column , min_periods = min_periods , center = center )
531
+ .agg (func ).reset_index ().dropna ())
532
+ else :
533
+ # There is a bug when running rolling on with groupby in pandas.
534
+ # see https://github.com/pandas-dev/pandas/issues/13966
535
+ # We will implement this later when we found work around or it's fixed
536
+ # for now, the we assume only 2 dimensions in the dataframe and don't
537
+ # use the `on` parameter in rolling.
538
+ if len (df .index .names ) > 2 :
539
+ raise NotImplementedError ('Not supporting more than 2 dimensions for now.' )
540
+ newdata [k ] = (df .groupby (level = levels , group_keys = False )
541
+ .rolling (window = size , min_periods = min_periods , center = center )
542
+ .agg (func ).reset_index ().dropna ())
543
+ return ProcedureResult (result , ingredient .key , newdata )
544
+
490
545
def accumulate (ingredient : BaseIngredient , * , result = None , ** options ) -> ProcedureResult :
491
546
"""run accumulate function on ingredient data.
492
547
0 commit comments