diff --git a/_delphi_utils_python/delphi_utils/smooth.py b/_delphi_utils_python/delphi_utils/smooth.py
index 51349851b..704ed58e8 100644
--- a/_delphi_utils_python/delphi_utils/smooth.py
+++ b/_delphi_utils_python/delphi_utils/smooth.py
@@ -150,7 +150,9 @@ def __init__(
         else:
             self.coeffs = None
 
-    def smooth(self, signal: Union[np.ndarray, pd.Series], impute_order=2) -> Union[np.ndarray, pd.Series]:
+    def smooth(
+        self, signal: Union[np.ndarray, pd.Series], impute_order=2
+    ) -> Union[np.ndarray, pd.Series]:
         """Apply a smoother to a signal.
 
         The major workhorse smoothing function. Imputes the nans and then applies
@@ -200,7 +202,7 @@ def smooth(self, signal: Union[np.ndarray, pd.Series], impute_order=2) -> Union[
             signal_smoothed = signal
 
         # Append the nans back, since we want to preserve length
-        signal_smoothed = np.hstack([np.nan*np.ones(ix), signal_smoothed])
+        signal_smoothed = np.hstack([np.nan * np.ones(ix), signal_smoothed])
         # Convert back to pandas if necessary
         if is_pandas_series:
             signal_smoothed = pd.Series(signal_smoothed)
@@ -295,7 +297,9 @@ def left_gauss_linear_smoother(self, signal):
             weights = np.exp(
                 -((np.arange(idx + 1) - idx) ** 2) / self.gaussian_bandwidth
             )
-            AwA = np.dot(A[: (idx + 1), :].T * weights, A[: (idx + 1), :])  # pylint: disable=invalid-name
+            AwA = np.dot(  # pylint: disable=invalid-name
+                A[: (idx + 1), :].T * weights, A[: (idx + 1), :]
+            )
             Awy = np.dot(  # pylint: disable=invalid-name
                 A[: (idx + 1), :].T * weights, signal[: (idx + 1)].reshape(-1, 1)
             )
@@ -303,13 +307,18 @@ def left_gauss_linear_smoother(self, signal):
                 beta = np.linalg.solve(AwA, Awy)
                 signal_smoothed[idx] = np.dot(A[: (idx + 1), :], beta)[-1]
             except np.linalg.LinAlgError:
-                signal_smoothed[idx] = signal[idx] if self.impute else np.nan  # pylint: disable=using-constant-test
+                signal_smoothed[idx] = (
+                    signal[idx]  # pylint: disable=using-constant-test
+                    if self.impute
+                    else np.nan
+                )
         if self.minval is not None:
             signal_smoothed[signal_smoothed <= self.minval] = self.minval
         return signal_smoothed
 
     def savgol_predict(self, signal, poly_fit_degree, nr):
         """Predict a single value using the savgol method.
+
         Fits a polynomial through the values given by the signal and returns the value
         of the polynomial at the right-most signal-value. More precisely, for a signal of length
         n, fits a poly_fit_degree polynomial through the points signal[-n+1+nr], signal[-n+2+nr],
@@ -368,7 +377,7 @@ def savgol_coeffs(self, nl, nr, poly_fit_degree):
         if nr > 0:
             warnings.warn("The filter is no longer causal.")
 
-        A = np.vstack(
+        A = np.vstack(  # pylint: disable=invalid-name
            [np.arange(nl, nr + 1) ** j for j in range(poly_fit_degree + 1)]
         ).T
 
@@ -471,7 +480,7 @@ def savgol_impute(self, signal, impute_order):
                 # imputation order is larger than the available data)
                 else:
                     signal_imputed[ix] = self.savgol_predict(
-                        signal_imputed[:ix], min(ix-1, impute_order), -1
+                        signal_imputed[:ix], min(ix - 1, impute_order), -1
                     )
             # Away from the boundary, use savgol fitting on a fixed window
             else:
diff --git a/jhu/delphi_jhu/run.py b/jhu/delphi_jhu/run.py
index 8f8067da3..707782abb 100644
--- a/jhu/delphi_jhu/run.py
+++ b/jhu/delphi_jhu/run.py
@@ -6,26 +6,21 @@
 """
 from datetime import datetime
 from itertools import product
-from functools import partial
 
 import numpy as np
 from delphi_utils import (
     read_params,
     create_export_csv,
     S3ArchiveDiffer,
+    Smoother,
+    GeoMapper,
 )
-from delphi_utils import GeoMapper
 
 from .geo import geo_map
 from .pull import pull_jhu_data
-from .smooth import (
-    identity,
-    kday_moving_average,
-)
 
 
 # global constants
-seven_day_moving_average = partial(kday_moving_average, k=7)
 METRICS = [
     "deaths",
     "confirmed",
@@ -41,10 +36,10 @@
     "seven_day_average",
 ]
 SENSOR_NAME_MAP = {
-    "new_counts":           ("incidence_num", False),
-    "cumulative_counts":    ("cumulative_num", False),
-    "incidence":            ("incidence_prop", False),
-    "cumulative_prop":      ("cumulative_prop", False),
+    "new_counts": ("incidence_num", False),
+    "cumulative_counts": ("cumulative_num", False),
+    "incidence": ("incidence_prop", False),
+    "cumulative_prop": ("cumulative_prop", False),
 }
 # Temporarily added for wip_ signals
 # WIP_SENSOR_NAME_MAP = {
@@ -54,8 +49,8 @@
 #     "incidence": ("incid_prop", False),
 #     "cumulative_prop": ("cumul_prop", False),
 # }
 SMOOTHERS_MAP = {
-    "unsmoothed": (identity, ''),
-    "seven_day_average": (seven_day_moving_average, '7dav_'),
+    "unsmoothed": (Smoother("identity").smooth, ""),
+    "seven_day_average": (Smoother("moving_average", window_length=7).smooth, "7dav_"),
 }
 GEO_RESOLUTIONS = [
     "county",
@@ -75,9 +70,12 @@ def run_module():
 
     if len(params["bucket_name"]) > 0:
         arch_diff = S3ArchiveDiffer(
-            cache_dir, export_dir,
-            params["bucket_name"], "jhu",
-            params["aws_credentials"])
+            cache_dir,
+            export_dir,
+            params["bucket_name"],
+            "jhu",
+            params["aws_credentials"],
+        )
         arch_diff.update_cache()
     else:
         arch_diff = None
@@ -85,7 +83,8 @@ def run_module():
     gmpr = GeoMapper()
     dfs = {metric: pull_jhu_data(base_url, metric, gmpr) for metric in METRICS}
     for metric, geo_res, sensor, smoother in product(
-            METRICS, GEO_RESOLUTIONS, SENSORS, SMOOTHERS):
+        METRICS, GEO_RESOLUTIONS, SENSORS, SMOOTHERS
+    ):
         print(metric, geo_res, sensor, smoother)
         df = dfs[metric]
         # Aggregate to appropriate geographic resolution
@@ -121,7 +120,9 @@ def run_module():
         _, fails = arch_diff.archive_exports(to_archive)
 
         # Filter existing exports to exclude those that failed to archive
-        succ_common_diffs = {f: diff for f, diff in common_diffs.items() if f not in fails}
+        succ_common_diffs = {
+            f: diff for f, diff in common_diffs.items() if f not in fails
+        }
         arch_diff.filter_exports(succ_common_diffs)
 
         # Report failures: someone should probably look at them
diff --git a/jhu/delphi_jhu/smooth.py b/jhu/delphi_jhu/smooth.py
deleted file mode 100644
index 5c9e18a3f..000000000
--- a/jhu/delphi_jhu/smooth.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# -*- coding: utf-8 -*-
-'''Functions to smooth signals.'''
-
-import numpy as np
-
-def identity(x):
-    '''Trivial "smoother" that does no smoothing.
-
-    Parameters
-    ----------
-    x: np.ndarray
-        Input array
-
-    Returns
-    -------
-    np.ndarray:
-        Same as x
-    '''
-    return x
-
-def kday_moving_average(x, k):
-    '''Compute k-day moving average on x.
-
-    Parameters
-    ----------
-    x: np.ndarray
-        Input array
-
-    Returns
-    -------
-    np.ndarray:
-        k-day moving average. The first k-1 entries are np.nan.
-    '''
-    if not isinstance(k, int):
-        raise ValueError('k must be int.')
-    # temp = np.append(np.zeros(k - 1), x)
-    temp = np.append(np.nan*np.ones(k-1), x)
-    y = np.convolve(temp, np.ones(k, dtype=int), 'valid') / k
-    return y
diff --git a/jhu/tests/test_run.py b/jhu/tests/test_run.py
index 60d3e13b1..aed574200 100644
--- a/jhu/tests/test_run.py
+++ b/jhu/tests/test_run.py
@@ -4,7 +4,6 @@
 from os.path import join
 
 import pandas as pd
-from delphi_jhu.run import run_module
 
 
 class TestRun:
diff --git a/jhu/tests/test_smooth.py b/jhu/tests/test_smooth.py
index 303f7dab0..2b057a456 100644
--- a/jhu/tests/test_smooth.py
+++ b/jhu/tests/test_smooth.py
@@ -1,34 +1,31 @@
 import pytest
-from os import listdir
 from os.path import join
 
 import numpy as np
 import pandas as pd
-from delphi_jhu.run import run_module
+
 
 class TestSmooth:
     def test_output_files_smoothed(self, run_as_module):
-
         dates = [str(x) for x in range(20200303, 20200310)]
 
         smoothed = pd.read_csv(
-            join("./receiving",
-                 f"{dates[-1]}_state_confirmed_7dav_cumulative_num.csv")
+            join("./receiving", f"{dates[-1]}_state_confirmed_7dav_cumulative_num.csv")
         )
 
         # Build a dataframe out of the individual day files
-        raw = pd.concat([
-            pd.read_csv(
-                join("./receiving",
-                     f"{date}_state_confirmed_cumulative_num.csv")
-            ) for date in dates
-        ])
-        # Compute the mean across the time values; order doesn't matter
-        # this corresponds to the smoothed value on the last day
+        raw = pd.concat(
+            [
+                pd.read_csv(
+                    join("./receiving", f"{date}_state_confirmed_cumulative_num.csv")
+                )
+                for date in dates
+            ]
+        )
+        # Compute the mean across the time values; order doesn't matter
+        # this corresponds to the smoothed value on the last day
         # 2020-03-10
-        raw = raw.groupby('geo_id')['val'].mean()
-
-        df = pd.merge(smoothed, raw, on='geo_id', suffixes=('_smoothed', '_raw'))
-        assert np.allclose(df['val_smoothed'].values, df['val_raw'].values)
-
+        raw = raw.groupby("geo_id")["val"].mean()
+        df = pd.merge(smoothed, raw, on="geo_id", suffixes=("_smoothed", "_raw"))
+        assert np.allclose(df["val_smoothed"].values, df["val_raw"].values)
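
Note on the smoother swap (reviewer sketch, not part of the patch): SMOOTHERS_MAP now
builds its entries from the shared delphi_utils.Smoother instead of the deleted
jhu/delphi_jhu/smooth.py helpers. The snippet below checks the equivalence this change
relies on. It assumes Smoother("moving_average", window_length=7) computes a plain
unweighted 7-day mean; the comparison skips the first window, since the deleted
kday_moving_average emitted NaN for the first k-1 entries while Smoother's default
boundary handling may fill those entries differently.

    import numpy as np
    from delphi_utils import Smoother

    def kday_moving_average(x, k):
        """Deleted jhu-local smoother, reproduced here for comparison."""
        if not isinstance(k, int):
            raise ValueError("k must be int.")
        # NaN-pad so the output keeps the input length; first k-1 entries are NaN.
        temp = np.append(np.nan * np.ones(k - 1), x)
        return np.convolve(temp, np.ones(k, dtype=int), "valid") / k

    signal = np.cumsum(np.random.default_rng(0).poisson(5.0, size=30)).astype(float)
    old = kday_moving_average(signal, k=7)
    new = Smoother("moving_average", window_length=7).smooth(signal)
    # Away from the left boundary both reduce to the unweighted 7-day mean.
    assert np.allclose(old[6:], new[6:])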