From 9574f768ca434b17b5e039699ab36d15ec1a6dc9 Mon Sep 17 00:00:00 2001 From: Jingjing Tang Date: Tue, 7 Jul 2020 11:07:37 -0400 Subject: [PATCH] add new wip signal --- google_health/delphi_google_health/export.py | 13 +-- google_health/delphi_google_health/run.py | 30 +++---- google_health/delphi_google_health/smooth.py | 86 ++++++++++++++++++++ 3 files changed, 110 insertions(+), 19 deletions(-) diff --git a/google_health/delphi_google_health/export.py b/google_health/delphi_google_health/export.py index 730ed738b..bbd1416d8 100644 --- a/google_health/delphi_google_health/export.py +++ b/google_health/delphi_google_health/export.py @@ -4,13 +4,13 @@ import numpy as np import pandas as pd -from .smooth import smoothed_values_by_geo_id +from .smooth import smoothed_values_by_geo_id, wip_smoothed_values_by_geo_id RESCALE_VAL = 4000 / 100 def export_csv( - df: pd.DataFrame, geo_name: str, sensor: str, smooth: bool, receiving_dir: str + df: pd.DataFrame, geo_name: str, sensor: str, smooth: str, receiving_dir: str ) -> None: """Export data set in format expected for injestion by the API @@ -25,16 +25,19 @@ def export_csv( name of the geographic region, such as "state" or "hrr" sensor: str name of the sensor; only used for naming the output file - smooth: bool - should the signal in "val" be smoothed? + smooth: str + choose from: "raw", "smooth", "wip" + should the signal in "val" be smoothed? which smoothing method? 
receiving_dir: str path to location where the output CSV files to be uploaded should be stored """ df = df.copy() - if smooth: + if smooth == "smooth": df["val"] = smoothed_values_by_geo_id(df) + elif smooth == "wip": + df["val"] = wip_smoothed_values_by_geo_id(df) df["val"] /= RESCALE_VAL df["se"] = np.nan diff --git a/google_health/delphi_google_health/run.py b/google_health/delphi_google_health/run.py index d7b9ec42d..7e79a9bcb 100644 --- a/google_health/delphi_google_health/run.py +++ b/google_health/delphi_google_health/run.py @@ -14,7 +14,6 @@ from .map_values import derived_counts_from_dma from .export import export_csv - def run_module(): """Main function run when calling the module. @@ -23,7 +22,7 @@ def run_module(): testing purposes). """ - #  read parameters + # read parameters params = read_params() ght_key = params["ght_key"] start_date = params["start_date"] @@ -44,7 +43,7 @@ def run_module(): # setup class to handle API calls ght = GoogleHealthTrends(ght_key=ght_key) - #  read data frame version of the data + # read data frame version of the data df_state = get_counts_states( ght, start_date, end_date, static_dir=static_dir, cache_dir=cache_dir ) @@ -53,17 +52,20 @@ def run_module(): ) df_hrr, df_msa = derived_counts_from_dma(df_dma, static_dir=static_dir) - #  export each geographic region, with both smoothed and unsmoothed data - export_csv(df_state, "state", "raw_search", smooth=False, receiving_dir=export_dir) - export_csv( - df_state, "state", "smoothed_search", smooth=True, receiving_dir=export_dir - ) + # export each geographic region, with both smoothed and unsmoothed data + + export_csv(df_state, "state", "raw_search", smooth="raw", receiving_dir=export_dir) + export_csv(df_state, "state", "smoothed_search", smooth="smooth", receiving_dir=export_dir) + export_csv(df_state, "state", "wip_smoothed_search", smooth="wip", receiving_dir=export_dir) - export_csv(df_dma, "dma", "raw_search", smooth=False, receiving_dir=export_dir) - 
export_csv(df_dma, "dma", "smoothed_search", smooth=True, receiving_dir=export_dir) + export_csv(df_dma, "dma", "raw_search", smooth="raw", receiving_dir=export_dir) + export_csv(df_dma, "dma", "smoothed_search", smooth="smooth", receiving_dir=export_dir) + export_csv(df_dma, "dma", "wip_smoothed_search", smooth="wip", receiving_dir=export_dir) - export_csv(df_hrr, "hrr", "raw_search", smooth=False, receiving_dir=export_dir) - export_csv(df_hrr, "hrr", "smoothed_search", smooth=True, receiving_dir=export_dir) + export_csv(df_hrr, "hrr", "raw_search", smooth="raw", receiving_dir=export_dir) + export_csv(df_hrr, "hrr", "smoothed_search", smooth="smooth", receiving_dir=export_dir) + export_csv(df_hrr, "hrr", "wip_smoothed_search", smooth="wip", receiving_dir=export_dir) - export_csv(df_msa, "msa", "raw_search", smooth=False, receiving_dir=export_dir) - export_csv(df_msa, "msa", "smoothed_search", smooth=True, receiving_dir=export_dir) + export_csv(df_msa, "msa", "raw_search", smooth="raw", receiving_dir=export_dir) + export_csv(df_msa, "msa", "smoothed_search", smooth="smooth", receiving_dir=export_dir) + export_csv(df_msa, "msa", "wip_smoothed_search", smooth="wip", receiving_dir=export_dir) diff --git a/google_health/delphi_google_health/smooth.py b/google_health/delphi_google_health/smooth.py index e34484ecf..defb7c0f7 100644 --- a/google_health/delphi_google_health/smooth.py +++ b/google_health/delphi_google_health/smooth.py @@ -74,3 +74,89 @@ def _left_gauss_linear(s: np.ndarray, h=10, impute=False, minval=None) -> np.nda if minval is not None: t[t <= minval] = minval return t + +def wip_smoothed_values_by_geo_id(df: pd.DataFrame, p = 2) -> np.ndarray: + """Computes a smoothed version of the variable 'val' within unique values of 'geo_id' + + Currently uses a local weighted least squares, where the weights are given + by a Gaussian kernel. 
+
+    Parameters
+    ----------
+    df: pd.DataFrame
+        a data frame with columns "geo_id", "timestamp", and "val"
+    p: float
+        Smoothing window size = 2p + 1
+
+    Returns
+    -------
+    np.ndarray
+        A one-dimensional numpy array containing the smoothed values.
+    """
+    df = df.copy()
+    df["val_smooth"] = 0
+    for geo_id in df["geo_id"].unique():
+        df.loc[df["geo_id"] == geo_id, "val_smooth"] = _centre_gauss_linear(
+            s=df[df["geo_id"] == geo_id]["val"].values, p = p, h=10, impute=True, minval=0
+        )
+    return df["val_smooth"].values
+
+def gaussian_kernel(x, h):
+    """
+    x: np.ndarray
+        An increasing sequence with symmetric absolute values.
+    h: float
+        Bandwidth of the Gaussian kernel.
+    """
+
+    return np.exp(-(x ** 2) / (h ** 2))
+
+def _centre_gauss_linear(s: np.ndarray, p = 2, h=10, impute=False, minval=None) -> np.ndarray:
+    """Local weighted least squares, where the weights are given by a Gaussian kernel.
+
+    At each time t, we use the data from times t-p, ..., t+p, weighted
+    using the Gaussian kernel, to produce the estimate at time t.
+
+    Parameters
+    ----------
+    s: np.ndarray
+        Input data. Assumed to be ordered and on an equally spaced grid.
+    p: float
+        Smoothing window size = 2p + 1
+    h: float
+        Bandwidth
+    impute: bool
+        Whether to fall back to s[idx] when the local fit is rank-deficient.
+        (The local linear estimate is ill-defined for a single data point).
+    minval: int
+        Enforce a minimum value; for example, used for sensors which are
+        nonnegative by definition. 
+
+    Returns
+    -------
+    np.ndarray
+        the fitted values
+    """
+
+    assert h > 0, "Bandwidth must be positive"
+
+    n = len(s)
+    t = np.zeros_like(s, dtype=np.float64)
+    X = np.vstack([np.ones(n), np.arange(n)]).T
+    for idx in range(n):
+        kernel_key = np.arange(n) - idx
+        wts = gaussian_kernel(kernel_key, h)
+        left = max(0, idx-p)
+        right = min(n, idx + p+1)
+        XwX = np.dot(X[left:right, :].T * wts[left:right], X[left:right, :])
+        Xwy = np.dot(X[left:right, :].T * wts[left:right], s[left:right].reshape(-1, 1))
+        try:
+            beta = np.linalg.solve(XwX, Xwy)
+            t[idx] = np.dot(X[left:right, :], beta)[idx - left]
+        except np.linalg.LinAlgError:
+            # solve() fails when the window is rank-deficient (e.g. a single point).
+            t[idx] = s[idx] if impute else np.nan
+    if minval is not None:
+        t[t <= minval] = minval
+    return t
+