@@ -645,3 +645,41 @@ def get_geos_within(
645
645
"must be one of (state, nation), (state, hhs), (county, state)"
646
646
", (fips, state), (chng-fips, state)"
647
647
)
648
+
649
+ def aggregate_by_weighted_sum (df : pd .DataFrame , to_geo : str , sensor : str , population_column : str ) -> pd .DataFrame :
650
+ """Aggregate sensor, weighted by time-dependent population.
651
+
652
+ Note: This function generates its own population weights and adjusts the
653
+ weights based on which data is NA. This is in contrast to the
654
+ `replace_geocode` function, which assumes that the weights are already
655
+ present in the data and does not adjust for missing data (see the
656
+ docstring for the GeoMapper class).
657
+
658
+ Parameters
659
+ ---------
660
+ df: pd.DataFrame
661
+ Input dataframe, assumed to have a sensor column (e.g. "visits"), a
662
+ to_geo column (e.g. "state"), and a population column (corresponding
663
+ to a from_geo, e.g. "wastewater collection site").
664
+ to_geo: str
665
+ The column name of the geocode to aggregate to.
666
+ sensor: str
667
+ The column name of the sensor to aggregate.
668
+ population_column: str
669
+ The column name of the population to weight the sensor by.
670
+
671
+ Returns
672
+ ---------
673
+ agg_df: pd.DataFrame
674
+ A dataframe with the aggregated sensor values, weighted by population.
675
+ """
676
+ # Zero-out populations where the sensor is NA
677
+ df [f"relevant_pop_{ sensor } " ] = df [population_column ] * df [sensor ].abs ().notna ()
678
+ # Weight the sensor by the population
679
+ df [f"weighted_{ sensor } " ] = df [sensor ] * df [f"relevant_pop_{ sensor } " ]
680
+ agg_df = df .groupby (["timestamp" , to_geo ]).agg (
681
+ {f"relevant_pop_{ sensor } " : "sum" , f"weighted_{ sensor } " : lambda x : x .sum (min_count = 1 )}
682
+ )
683
+ agg_df ["val" ] = agg_df [f"weighted_{ sensor } " ] / agg_df [f"relevant_pop_{ sensor } " ]
684
+ agg_df = agg_df .reset_index ()
685
+ return agg_df
0 commit comments