@@ -646,3 +646,46 @@ def get_geos_within(
646
646
"must be one of (state, nation), (state, hhs), (county, state)"
647
647
", (fips, state), (chng-fips, state)"
648
648
)
649
+
650
def aggregate_by_weighted_sum(
    self,
    df: pd.DataFrame,
    to_geo: str,
    sensor: str,
    population_column: str,
    time_col: str = "timestamp",
) -> pd.DataFrame:
    """Aggregate sensor, weighted by time-dependent population.

    Note: This function generates its own population weights and adjusts the
    weights based on which data is NA. This is in contrast to the
    `replace_geocode` function, which assumes that the weights are already
    present in the data and does not adjust for missing data (see the
    docstring for the GeoMapper class).

    The input dataframe is not modified; all intermediate columns are added
    to an internal copy.

    Parameters
    ----------
    df: pd.DataFrame
        Input dataframe, assumed to have a sensor column (e.g. "visits"), a
        to_geo column (e.g. "state"), a time column (see `time_col`), and a
        population column (corresponding to a from_geo, e.g. "wastewater
        collection site").
    to_geo: str
        The column name of the geocode to aggregate to.
    sensor: str
        The column name of the sensor to aggregate.
    population_column: str
        The column name of the population to weight the sensor by.
    time_col: str
        The column name of the time value to group by together with
        `to_geo`. Defaults to "timestamp".

    Returns
    -------
    agg_df: pd.DataFrame
        One row per (`time_col`, `to_geo`) pair with the columns:

        - ``relevant_pop_{sensor}``: summed population of the rows whose
          sensor value is not NA,
        - ``weighted_{sensor}``: sum of sensor * population over those rows
          (NA when every sensor value in the group is NA),
        - ``val``: the population-weighted mean of the sensor, i.e. the
          ratio of the two columns above.
    """
    pop_col = f"relevant_pop_{sensor}"
    weighted_col = f"weighted_{sensor}"

    # Work on a copy so the helper columns never leak into the caller's frame.
    working = df.copy()

    # Zero-out populations where the sensor is NA, so that missing readings
    # do not contribute to the denominator of the weighted mean.
    working[pop_col] = working[population_column] * working[sensor].abs().notna()
    # Weight the sensor by the population.
    working[weighted_col] = working[sensor] * working[pop_col]

    agg_df = working.groupby([time_col, to_geo]).agg(
        {
            pop_col: "sum",
            # min_count=1 keeps an all-NA group as NA instead of collapsing
            # it to 0, which would be indistinguishable from a true zero.
            weighted_col: lambda x: x.sum(min_count=1),
        }
    )
    agg_df["val"] = agg_df[weighted_col] / agg_df[pop_col]
    return agg_df.reset_index()
0 commit comments