diff --git a/_delphi_utils_python/.pylintrc b/_delphi_utils_python/.pylintrc index f30837c7e..ad0180ed7 100644 --- a/_delphi_utils_python/.pylintrc +++ b/_delphi_utils_python/.pylintrc @@ -12,9 +12,9 @@ disable=logging-format-interpolation, [BASIC] # Allow arbitrarily short-named variables. -variable-rgx=[a-z_][a-z0-9_]* -argument-rgx=[a-z_][a-z0-9_]* -attr-rgx=[a-z_][a-z0-9_]* +variable-rgx=([a-z_][a-z0-9_]*|[a-zA-Z]) +argument-rgx=([a-z_][a-z0-9_]*|[a-zA-Z]) +attr-rgx=([a-z_][a-z0-9_]*|[a-zA-Z]) [DESIGN] diff --git a/_delphi_utils_python/delphi_utils/__init__.py b/_delphi_utils_python/delphi_utils/__init__.py index 314cb8296..b5c06daa0 100644 --- a/_delphi_utils_python/delphi_utils/__init__.py +++ b/_delphi_utils_python/delphi_utils/__init__.py @@ -13,5 +13,6 @@ from .smooth import Smoother from .signal import add_prefix from .nancodes import Nans +from .weekday import Weekday __version__ = "0.2.1" diff --git a/doctor_visits/delphi_doctor_visits/weekday.py b/_delphi_utils_python/delphi_utils/weekday.py similarity index 65% rename from doctor_visits/delphi_doctor_visits/weekday.py rename to _delphi_utils_python/delphi_utils/weekday.py index 529454f80..ba5d75815 100644 --- a/doctor_visits/delphi_doctor_visits/weekday.py +++ b/_delphi_utils_python/delphi_utils/weekday.py @@ -3,23 +3,48 @@ Created: 2020-05-06 """ - - - -# third party import cvxpy as cp -from cvxpy.error import SolverError import numpy as np - -# first party -from .config import Config +from cvxpy.error import SolverError class Weekday: """Class to handle weekday effects.""" @staticmethod - def get_params(data, logger): + def get_params(data, denominator_col, numerator_cols, date_col, scales, logger): + r"""Fit weekday correction for each col in numerator_cols. + + Return a matrix of parameters: the entire vector of betas, for each time + series column in the data. + """ + tmp = data.reset_index() + denoms = tmp.groupby(date_col).sum()[denominator_col] + nums = tmp.groupby(date_col).sum()[numerator_cols] + + # Construct design matrix to have weekday indicator columns and then day + # indicators. + X = np.zeros((nums.shape[0], 6 + nums.shape[0])) + not_sunday = np.where(nums.index.dayofweek != 6)[0] + X[not_sunday, np.array(nums.index.dayofweek)[not_sunday]] = 1 + X[np.where(nums.index.dayofweek == 6)[0], :6] = -1 + X[:, 6:] = np.eye(X.shape[0]) + + npnums, npdenoms = np.array(nums), np.array(denoms) + params = np.zeros((nums.shape[1], X.shape[1])) + + # Loop over the available numerator columns and smooth each separately. + for i in range(nums.shape[1]): + result = Weekday._fit(X, scales, npnums[:, i], npdenoms) + if result is None: + logger.error("Unable to calculate weekday correction") + else: + params[i,:] = result + + return params + + @staticmethod + def _fit(X, scales, npnums, npdenoms): r"""Correct a signal estimated as numerator/denominator for weekday effects. The ordinary estimate would be numerator_t/denominator_t for each time point @@ -53,57 +78,31 @@ def get_params(data, logger): ll = (numerator * (X*b + log(denominator)) - sum(exp(X*b) + log(denominator))) / num_days - - Return a matrix of parameters: the entire vector of betas, for each time - series column in the data. """ - denoms = data.groupby(Config.DATE_COL).sum()["Denominator"] - nums = data.groupby(Config.DATE_COL).sum()[Config.CLI_COLS + Config.FLU1_COL] - - # Construct design matrix to have weekday indicator columns and then day - # indicators. - X = np.zeros((nums.shape[0], 6 + nums.shape[0])) - not_sunday = np.where(nums.index.dayofweek != 6)[0] - X[not_sunday, np.array(nums.index.dayofweek)[not_sunday]] = 1 - X[np.where(nums.index.dayofweek == 6)[0], :6] = -1 - X[:, 6:] = np.eye(X.shape[0]) - - npnums, npdenoms = np.array(nums), np.array(denoms) - params = np.zeros((nums.shape[1], X.shape[1])) - - # Loop over the available numerator columns and smooth each separately. - for i in range(nums.shape[1]): - b = cp.Variable((X.shape[1])) - - lmbda = cp.Parameter(nonneg=True) - lmbda.value = 10 # Hard-coded for now, seems robust to changes - - ll = ( - cp.matmul(npnums[:, i], cp.matmul(X, b) + np.log(npdenoms)) - - cp.sum(cp.exp(cp.matmul(X, b) + np.log(npdenoms))) - ) / X.shape[0] - penalty = ( - lmbda * cp.norm(cp.diff(b[6:], 3), 1) / (X.shape[0] - 2) - ) # L-1 Norm of third differences, rewards smoothness - scales = [1, 1e5, 1e10, 1e15] - for scale in scales: - try: - prob = cp.Problem(cp.Minimize((-ll + lmbda * penalty) / scale)) - _ = prob.solve() - params[i,:] = b.value - break - except SolverError: - # If the magnitude of the objective function is too large, an error is - # thrown; Rescale the objective function by going through loop - pass - else: - # Leaving params[i,:] = 0 is equivalent to not performing weekday correction - logger.error("Unable to calculate weekday correction") - - return params + b = cp.Variable((X.shape[1])) + + lmbda = cp.Parameter(nonneg=True) + lmbda.value = 10 # Hard-coded for now, seems robust to changes + + ll = ( + cp.matmul(npnums, cp.matmul(X, b) + np.log(npdenoms)) - + cp.sum(cp.exp(cp.matmul(X, b) + np.log(npdenoms))) + ) / X.shape[0] + # L-1 Norm of third differences, rewards smoothness + penalty = lmbda * cp.norm(cp.diff(b[6:], 3), 1) / (X.shape[0] - 2) + for scale in scales: + try: + prob = cp.Problem(cp.Minimize((-ll + lmbda * penalty) / scale)) + _ = prob.solve() + return b.value + except SolverError: + # If the magnitude of the objective function is too large, an error is + # thrown; Rescale the objective function by going through loop + continue + return None @staticmethod - def calc_adjustment(params, sub_data): + def calc_adjustment(params, sub_data, cols, date_col): """Apply the weekday adjustment to a specific time series. Extracts the weekday fixed effects from the parameters and uses these to @@ -122,14 +121,15 @@ def calc_adjustment(params, sub_data): -- this has the same effect. """ - for i, c in enumerate(Config.CLI_COLS + Config.FLU1_COL): - wd_correction = np.zeros((len(sub_data[c]))) + tmp = sub_data.copy() + + for i, c in enumerate(cols): + wd_correction = np.zeros((len(tmp[c]))) for wd in range(7): - mask = sub_data[Config.DATE_COL].dt.dayofweek == wd - wd_correction[mask] = sub_data.loc[mask, c] / ( + mask = tmp[date_col].dt.dayofweek == wd + wd_correction[mask] = tmp.loc[mask, c] / ( np.exp(params[i, wd]) if wd < 6 else np.exp(-np.sum(params[i, :6])) ) - sub_data.loc[:, c] = wd_correction - - return sub_data + tmp.loc[:, c] = wd_correction + return tmp diff --git a/_delphi_utils_python/setup.py b/_delphi_utils_python/setup.py index 8d361c0de..1ef144cb4 100644 --- a/_delphi_utils_python/setup.py +++ b/_delphi_utils_python/setup.py @@ -7,6 +7,7 @@ required = [ "boto3", "covidcast", + "cvxpy", "epiweeks", "freezegun", "gitpython", diff --git a/_delphi_utils_python/tests/test_weekday.py b/_delphi_utils_python/tests/test_weekday.py new file mode 100644 index 000000000..52e6f4f7e --- /dev/null +++ b/_delphi_utils_python/tests/test_weekday.py @@ -0,0 +1,74 @@ +import logging + +import numpy as np +import pandas as pd +from delphi_utils.weekday import Weekday + + +class TestWeekday: + + TEST_DATA = pd.DataFrame({ + "num": np.arange(1, 11, 1), + "den": np.arange(11, 21, 1), + "date": pd.date_range("2020-01-01", "2020-01-10") + }) + + def test_get_params(self): + TEST_LOGGER = logging.getLogger() + + result = Weekday.get_params(self.TEST_DATA, "den", ["num"], "date", [1], TEST_LOGGER) + print(result) + expected_result = [ + -0.05993665, + -0.0727396, + -0.05618517, + 0.0343405, + 0.12534997, + 0.04561813, + -2.27669028, + -1.89564374, + -1.5695407, + -1.29838116, + -1.08216513, + -0.92089259, + -0.81456355, + -0.76317802, + -0.76673598, + -0.82523745, + ] + assert np.allclose(result, expected_result) + + def test_calc_adjustment_with_zero_parameters(self): + params = np.array([[0, 0, 0, 0, 0, 0, 0]]) + + result = Weekday.calc_adjustment(params, self.TEST_DATA, ["num"], "date") + + # Data should be unchanged when params are 0's + assert np.allclose(result["num"].values, self.TEST_DATA["num"].values) + assert np.allclose(result["den"].values, self.TEST_DATA["den"].values) + assert np.array_equal(result["date"].values, self.TEST_DATA["date"].values) + + def test_calc_adjustment(self): + params = np.array([[1, -1, 1, -1, 1, -1, 1]]) + + result = Weekday.calc_adjustment(params, self.TEST_DATA, ["num"], "date") + + print(result["num"].values) + print(result["den"].values) + expected_nums = [ + 0.36787944, + 5.43656366, + 1.10363832, + 10.87312731, + 5, + 2.20727665, + 19.0279728, + 2.94303553, + 24.46453646, + 3.67879441, + ] + + # The date and "den" column are unchanged by this function + assert np.allclose(result["num"].values, expected_nums) + assert np.allclose(result["den"].values, self.TEST_DATA["den"].values) + assert np.array_equal(result["date"].values, self.TEST_DATA["date"].values) \ No newline at end of file diff --git a/changehc/delphi_changehc/__init__.py b/changehc/delphi_changehc/__init__.py index a8df7b6ce..2846a8de5 100644 --- a/changehc/delphi_changehc/__init__.py +++ b/changehc/delphi_changehc/__init__.py @@ -13,6 +13,5 @@ from . import run from . import sensor from . import update_sensor -from . import weekday __version__ = "0.0.0" diff --git a/changehc/delphi_changehc/run.py b/changehc/delphi_changehc/run.py index c640a00f3..73670798d 100644 --- a/changehc/delphi_changehc/run.py +++ b/changehc/delphi_changehc/run.py @@ -184,7 +184,7 @@ def run_module(params: Dict[str, Dict[str, Any]]): file_dict["flu_like"],file_dict["covid_like"],dropdate_dt,"fips") more_stats = su_inst.update_sensor( data, - params["common"]["export_dir"] + params["common"]["export_dir"], ) stats.extend(more_stats) diff --git a/changehc/delphi_changehc/update_sensor.py b/changehc/delphi_changehc/update_sensor.py index 5d9bd00bb..eb68691c9 100644 --- a/changehc/delphi_changehc/update_sensor.py +++ b/changehc/delphi_changehc/update_sensor.py @@ -11,13 +11,12 @@ # third party import numpy as np import pandas as pd -from delphi_utils import GeoMapper, add_prefix, create_export_csv +from delphi_utils import GeoMapper, add_prefix, create_export_csv, Weekday # first party from .config import Config from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI, NA from .sensor import CHCSensor -from .weekday import Weekday def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_path=".", start_date=None, end_date=None): @@ -176,8 +175,8 @@ def geo_reindex(self, data): def update_sensor(self, - data, - output_path): + data, + output_path): """Generate sensor values, and write to csv format. Args: @@ -192,14 +191,22 @@ def update_sensor(self, data.reset_index(inplace=True) data_frame = self.geo_reindex(data) # handle if we need to adjust by weekday - wd_params = Weekday.get_params(data_frame) if self.weekday else None + wd_params = Weekday.get_params( + data_frame, + "den", + ["num"], + Config.DATE_COL, + [1, 1e5], + self.logger, + ) if self.weekday else None # run sensor fitting code (maybe in parallel) if not self.parallel: dfs = [] for geo_id, sub_data in data_frame.groupby(level=0): - sub_data.reset_index(level=0,inplace=True) + sub_data.reset_index(inplace=True) if self.weekday: - sub_data = Weekday.calc_adjustment(wd_params, sub_data) + sub_data = Weekday.calc_adjustment(wd_params, sub_data, ["num"], Config.DATE_COL) + sub_data.set_index(Config.DATE_COL, inplace=True) res = CHCSensor.fit(sub_data, self.burnindate, geo_id, self.logger) res = pd.DataFrame(res).loc[final_sensor_idxs] dfs.append(res) @@ -209,9 +216,10 @@ def update_sensor(self, with Pool(n_cpu) as pool: pool_results = [] for geo_id, sub_data in data_frame.groupby(level=0,as_index=False): - sub_data.reset_index(level=0, inplace=True) + sub_data.reset_index(inplace=True) if self.weekday: - sub_data = Weekday.calc_adjustment(wd_params, sub_data) + sub_data = Weekday.calc_adjustment(wd_params, sub_data, ["num"], Config.DATE_COL) + sub_data.set_index(Config.DATE_COL, inplace=True) pool_results.append( pool.apply_async( CHCSensor.fit, args=(sub_data, self.burnindate, geo_id, self.logger), diff --git a/changehc/delphi_changehc/weekday.py b/changehc/delphi_changehc/weekday.py deleted file mode 100644 index e02997482..000000000 --- a/changehc/delphi_changehc/weekday.py +++ /dev/null @@ -1,125 +0,0 @@ -""" -Weekday effects (code from Aaron Rumack). - -Created: 2020-05-06 -""" - -# third party -import cvxpy as cp -import numpy as np -from cvxpy.error import SolverError - -# first party -from .config import Config - - -class Weekday: - """Class to handle weekday effects.""" - - @staticmethod - def get_params(data): - r"""Correct a signal estimated as numerator/denominator for weekday effects. - - The ordinary estimate would be numerator_t/denominator_t for each time point - t. Instead, model - - log(numerator_t/denominator_t) = alpha_{wd(t)} + phi_t - - where alpha is a vector of fixed effects for each weekday. For - identifiability, we constrain \sum alpha_j = 0, and to enforce this we set - Sunday's fixed effect to be the negative sum of the other weekdays. - - We estimate this as a penalized Poisson GLM problem with log link. We - rewrite the problem as - - log(numerator_t) = alpha_{wd(t)} + phi_t + log(denominator_t) - - and set a design matrix X with one row per time point. The first six columns - of X are weekday indicators; the remaining columns are the identity matrix, - so that each time point gets a unique phi. Using this X, we write - - log(numerator_t) = X beta + log(denominator_t) - - Hence the first six entries of beta correspond to alpha, and the remaining - entries to phi. - - The penalty is on the L1 norm of third differences of phi (so the third - differences of the corresponding columns of beta), to enforce smoothness. - Third differences ensure smoothness without removing peaks or valleys. - - Objective function is negative mean Poisson log likelihood plus penalty: - - ll = (numerator * (X*b + log(denominator)) - sum(exp(X*b) + log(denominator))) - / num_days - - Return a matrix of parameters: the entire vector of betas, for each time - series column in the data. - """ - tmp = data.reset_index() - denoms = tmp.groupby(Config.DATE_COL).sum()["den"] - nums = tmp.groupby(Config.DATE_COL).sum()["num"] - n_nums = 1 # only one numerator column - - # Construct design matrix to have weekday indicator columns and then day - # indicators. - X = np.zeros((nums.shape[0], 6 + nums.shape[0])) # pylint: disable=invalid-name - not_sunday = np.where(nums.index.dayofweek != 6)[0] - X[not_sunday, np.array(nums.index.dayofweek)[not_sunday]] = 1 - X[np.where(nums.index.dayofweek == 6)[0], :6] = -1 - X[:, 6:] = np.eye(X.shape[0]) - - npnums, npdenoms = np.array(nums), np.array(denoms) - params = np.zeros((n_nums, X.shape[1])) - - # fit model - b = cp.Variable((X.shape[1])) - lmbda = cp.Parameter(nonneg=True) - lmbda.value = 10 # Hard-coded for now, seems robust to changes - ll = (cp.matmul(npnums, cp.matmul(X, b) + np.log(npdenoms)) - - cp.sum(cp.exp(cp.matmul(X, b) + np.log(npdenoms))) - ) / X.shape[0] - penalty = (lmbda * cp.norm(cp.diff(b[6:], 3), 1) / (X.shape[0] - 2) - ) # L-1 Norm of third differences, rewards smoothness - try: - prob = cp.Problem(cp.Minimize(-ll + lmbda * penalty)) - _ = prob.solve() - except SolverError: - # If the magnitude of the objective function is too large, an error is - # thrown; Rescale the objective function - prob = cp.Problem(cp.Minimize((-ll + lmbda * penalty) / 1e5)) - _ = prob.solve() - params = b.value - - return params - - @staticmethod - def calc_adjustment(params, sub_data): - """Apply the weekday adjustment to a specific time series. - - Extracts the weekday fixed effects from the parameters and uses these to - adjust the time series. - - Since - - log(numerator_t / denominator_t) = alpha_{wd(t)} + phi_t, - - we have that - - numerator_t / denominator_t = exp(alpha_{wd(t)}) exp(phi_t) - - and can divide by exp(alpha_{wd(t)}) to get a weekday-corrected ratio. In - this case, we only divide the numerator, leaving the denominator unchanged - -- this has the same effect. - - """ - tmp = sub_data.reset_index() - - wd_correction = np.zeros((len(tmp["num"]))) - for wd in range(7): - mask = tmp[Config.DATE_COL].dt.dayofweek == wd - wd_correction[mask] = tmp.loc[mask, "num"] / ( - np.exp(params[wd]) if wd < 6 else np.exp(-np.sum(params[:6])) - ) - tmp.loc[:, "num"] = wd_correction - - return tmp.set_index(Config.DATE_COL) diff --git a/changehc/setup.py b/changehc/setup.py index d702874b3..e6f42e11e 100644 --- a/changehc/setup.py +++ b/changehc/setup.py @@ -4,7 +4,6 @@ required = [ "numpy", "pandas", - "cvxpy", "pydocstyle", "pytest", "pytest-cov", diff --git a/claims_hosp/delphi_claims_hosp/__init__.py b/claims_hosp/delphi_claims_hosp/__init__.py index 8a1e78f6c..194c98445 100644 --- a/claims_hosp/delphi_claims_hosp/__init__.py +++ b/claims_hosp/delphi_claims_hosp/__init__.py @@ -14,6 +14,5 @@ from . import run from . import smooth from . import update_indicator -from . import weekday __version__ = "0.1.0" diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py index 5072bdcb8..58cba1c56 100644 --- a/claims_hosp/delphi_claims_hosp/run.py +++ b/claims_hosp/delphi_claims_hosp/run.py @@ -113,8 +113,11 @@ def run_module(params): params["indicator"]["write_se"], signal_name ) - updater.update_indicator(params["indicator"]["input_file"], - params["common"]["export_dir"]) + updater.update_indicator( + params["indicator"]["input_file"], + params["common"]["export_dir"], + logger, + ) logger.info("finished updating", geo = geo) elapsed_time_in_seconds = round(time.time() - start_time, 2) diff --git a/claims_hosp/delphi_claims_hosp/update_indicator.py b/claims_hosp/delphi_claims_hosp/update_indicator.py index 61c4e318e..b4169370d 100644 --- a/claims_hosp/delphi_claims_hosp/update_indicator.py +++ b/claims_hosp/delphi_claims_hosp/update_indicator.py @@ -16,10 +16,10 @@ from delphi_utils import GeoMapper # first party +from delphi_utils import Weekday from .config import Config, GeoConstants from .load_data import load_data from .indicator import ClaimsHospIndicator -from .weekday import Weekday class ClaimsHospIndicatorUpdater: @@ -133,7 +133,7 @@ def geo_reindex(self, data): data_frame.fillna(0, inplace=True) return data_frame - def update_indicator(self, input_filepath, outpath): + def update_indicator(self, input_filepath, outpath, logger): """ Generate and output indicator values. @@ -152,7 +152,14 @@ def update_indicator(self, input_filepath, outpath): data_frame = self.geo_reindex(data) # handle if we need to adjust by weekday - wd_params = Weekday.get_params(data_frame) if self.weekday else None + wd_params = Weekday.get_params( + data_frame, + "den", + ["num"], + Config.DATE_COL, + [1, 1e5], + logger, + ) if self.weekday else None # run fitting code (maybe in parallel) rates = {} @@ -160,9 +167,11 @@ def update_indicator(self, input_filepath, outpath): valid_inds = {} if not self.parallel: for geo_id, sub_data in data_frame.groupby(level=0): - sub_data.reset_index(level=0, inplace=True) + sub_data.reset_index(inplace=True) if self.weekday: - sub_data = Weekday.calc_adjustment(wd_params, sub_data) + sub_data = Weekday.calc_adjustment( + wd_params, sub_data, ["num"], Config.DATE_COL) + sub_data.set_index(Config.DATE_COL, inplace=True) res = ClaimsHospIndicator.fit(sub_data, self.burnindate, geo_id) res = pd.DataFrame(res) rates[geo_id] = np.array(res.loc[final_output_inds, "rate"]) @@ -174,9 +183,11 @@ def update_indicator(self, input_filepath, outpath): with Pool(n_cpu) as pool: pool_results = [] for geo_id, sub_data in data_frame.groupby(level=0, as_index=False): - sub_data.reset_index(level=0, inplace=True) + sub_data.reset_index(inplace=True) if self.weekday: - sub_data = Weekday.calc_adjustment(wd_params, sub_data) + sub_data = Weekday.calc_adjustment( + wd_params, sub_data, ["num"], Config.DATE_COL) + sub_data.set_index(Config.DATE_COL, inplace=True) pool_results.append( pool.apply_async( ClaimsHospIndicator.fit, diff --git a/claims_hosp/delphi_claims_hosp/weekday.py b/claims_hosp/delphi_claims_hosp/weekday.py deleted file mode 100644 index e02997482..000000000 --- a/claims_hosp/delphi_claims_hosp/weekday.py +++ /dev/null @@ -1,125 +0,0 @@ -""" -Weekday effects (code from Aaron Rumack). - -Created: 2020-05-06 -""" - -# third party -import cvxpy as cp -import numpy as np -from cvxpy.error import SolverError - -# first party -from .config import Config - - -class Weekday: - """Class to handle weekday effects.""" - - @staticmethod - def get_params(data): - r"""Correct a signal estimated as numerator/denominator for weekday effects. - - The ordinary estimate would be numerator_t/denominator_t for each time point - t. Instead, model - - log(numerator_t/denominator_t) = alpha_{wd(t)} + phi_t - - where alpha is a vector of fixed effects for each weekday. For - identifiability, we constrain \sum alpha_j = 0, and to enforce this we set - Sunday's fixed effect to be the negative sum of the other weekdays. - - We estimate this as a penalized Poisson GLM problem with log link. We - rewrite the problem as - - log(numerator_t) = alpha_{wd(t)} + phi_t + log(denominator_t) - - and set a design matrix X with one row per time point. The first six columns - of X are weekday indicators; the remaining columns are the identity matrix, - so that each time point gets a unique phi. Using this X, we write - - log(numerator_t) = X beta + log(denominator_t) - - Hence the first six entries of beta correspond to alpha, and the remaining - entries to phi. - - The penalty is on the L1 norm of third differences of phi (so the third - differences of the corresponding columns of beta), to enforce smoothness. - Third differences ensure smoothness without removing peaks or valleys. - - Objective function is negative mean Poisson log likelihood plus penalty: - - ll = (numerator * (X*b + log(denominator)) - sum(exp(X*b) + log(denominator))) - / num_days - - Return a matrix of parameters: the entire vector of betas, for each time - series column in the data. - """ - tmp = data.reset_index() - denoms = tmp.groupby(Config.DATE_COL).sum()["den"] - nums = tmp.groupby(Config.DATE_COL).sum()["num"] - n_nums = 1 # only one numerator column - - # Construct design matrix to have weekday indicator columns and then day - # indicators. - X = np.zeros((nums.shape[0], 6 + nums.shape[0])) # pylint: disable=invalid-name - not_sunday = np.where(nums.index.dayofweek != 6)[0] - X[not_sunday, np.array(nums.index.dayofweek)[not_sunday]] = 1 - X[np.where(nums.index.dayofweek == 6)[0], :6] = -1 - X[:, 6:] = np.eye(X.shape[0]) - - npnums, npdenoms = np.array(nums), np.array(denoms) - params = np.zeros((n_nums, X.shape[1])) - - # fit model - b = cp.Variable((X.shape[1])) - lmbda = cp.Parameter(nonneg=True) - lmbda.value = 10 # Hard-coded for now, seems robust to changes - ll = (cp.matmul(npnums, cp.matmul(X, b) + np.log(npdenoms)) - - cp.sum(cp.exp(cp.matmul(X, b) + np.log(npdenoms))) - ) / X.shape[0] - penalty = (lmbda * cp.norm(cp.diff(b[6:], 3), 1) / (X.shape[0] - 2) - ) # L-1 Norm of third differences, rewards smoothness - try: - prob = cp.Problem(cp.Minimize(-ll + lmbda * penalty)) - _ = prob.solve() - except SolverError: - # If the magnitude of the objective function is too large, an error is - # thrown; Rescale the objective function - prob = cp.Problem(cp.Minimize((-ll + lmbda * penalty) / 1e5)) - _ = prob.solve() - params = b.value - - return params - - @staticmethod - def calc_adjustment(params, sub_data): - """Apply the weekday adjustment to a specific time series. - - Extracts the weekday fixed effects from the parameters and uses these to - adjust the time series. - - Since - - log(numerator_t / denominator_t) = alpha_{wd(t)} + phi_t, - - we have that - - numerator_t / denominator_t = exp(alpha_{wd(t)}) exp(phi_t) - - and can divide by exp(alpha_{wd(t)}) to get a weekday-corrected ratio. In - this case, we only divide the numerator, leaving the denominator unchanged - -- this has the same effect. - - """ - tmp = sub_data.reset_index() - - wd_correction = np.zeros((len(tmp["num"]))) - for wd in range(7): - mask = tmp[Config.DATE_COL].dt.dayofweek == wd - wd_correction[mask] = tmp.loc[mask, "num"] / ( - np.exp(params[wd]) if wd < 6 else np.exp(-np.sum(params[:6])) - ) - tmp.loc[:, "num"] = wd_correction - - return tmp.set_index(Config.DATE_COL) diff --git a/claims_hosp/setup.py b/claims_hosp/setup.py index 6c88e4383..940e1d165 100644 --- a/claims_hosp/setup.py +++ b/claims_hosp/setup.py @@ -4,7 +4,6 @@ required = [ "numpy", "pandas", - "cvxpy", "pydocstyle", "pytest", "pytest-cov", diff --git a/claims_hosp/tests/test_update_indicator.py b/claims_hosp/tests/test_update_indicator.py index 27bf1f263..23c901a49 100644 --- a/claims_hosp/tests/test_update_indicator.py +++ b/claims_hosp/tests/test_update_indicator.py @@ -7,6 +7,7 @@ # third party import numpy as np import pandas as pd +import logging import pytest # first party @@ -25,6 +26,7 @@ DATA_FILEPATH = PARAMS["indicator"]["input_file"] DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"]) OUTPATH = "test_data/" +TEST_LOGGER = logging.getLogger() class TestClaimsHospIndicatorUpdater: @@ -93,7 +95,8 @@ def test_update_indicator(self): updater.update_indicator( DATA_FILEPATH, - td.name + td.name, + TEST_LOGGER ) assert len(os.listdir(td.name)) == len( diff --git a/doctor_visits/delphi_doctor_visits/__init__.py b/doctor_visits/delphi_doctor_visits/__init__.py index 0aceb317a..bbb519448 100644 --- a/doctor_visits/delphi_doctor_visits/__init__.py +++ b/doctor_visits/delphi_doctor_visits/__init__.py @@ -15,6 +15,5 @@ from . import sensor from . import smooth from . import update_sensor -from . import weekday __version__ = "0.1.0" diff --git a/doctor_visits/delphi_doctor_visits/update_sensor.py b/doctor_visits/delphi_doctor_visits/update_sensor.py index 068d2a058..1bb068059 100644 --- a/doctor_visits/delphi_doctor_visits/update_sensor.py +++ b/doctor_visits/delphi_doctor_visits/update_sensor.py @@ -17,10 +17,10 @@ import pandas as pd # first party +from delphi_utils import Weekday from .config import Config from .geo_maps import GeoMaps from .sensor import DoctorVisitsSensor -from .weekday import Weekday def write_to_csv(output_df: pd.DataFrame, geo_level, se, out_name, logger, output_path="."): @@ -125,7 +125,14 @@ def update_sensor( (burn_in_dates >= startdate) & (burn_in_dates <= enddate))[0][:len(sensor_dates)] # handle if we need to adjust by weekday - params = Weekday.get_params(data, logger) if weekday else None + params = Weekday.get_params( + data, + "Denominator", + Config.CLI_COLS + Config.FLU1_COL, + Config.DATE_COL, + [1, 1e5, 1e10, 1e15], + logger, + ) if weekday else None if weekday and np.any(np.all(params == 0,axis=1)): # Weekday correction failed for at least one count type return None @@ -145,7 +152,10 @@ def update_sensor( for geo_id in unique_geo_ids: sub_data = data_groups.get_group(geo_id).copy() if weekday: - sub_data = Weekday.calc_adjustment(params, sub_data) + sub_data = Weekday.calc_adjustment(params, + sub_data, + Config.CLI_COLS + Config.FLU1_COL, + Config.DATE_COL) res = DoctorVisitsSensor.fit( sub_data, @@ -169,7 +179,10 @@ def update_sensor( for geo_id in unique_geo_ids: sub_data = data_groups.get_group(geo_id).copy() if weekday: - sub_data = Weekday.calc_adjustment(params, sub_data) + sub_data = Weekday.calc_adjustment(params, + sub_data, + Config.CLI_COLS + Config.FLU1_COL, + Config.DATE_COL) pool_results.append( pool.apply_async( diff --git a/doctor_visits/setup.py b/doctor_visits/setup.py index d7c0fe0a9..4a4b62bb6 100644 --- a/doctor_visits/setup.py +++ b/doctor_visits/setup.py @@ -5,7 +5,6 @@ "numpy", "pandas", "sklearn", - "cvxpy", "pytest", "pytest-cov", "pylint==2.8.3",