diff --git a/_delphi_utils_python/delphi_utils/smooth.py b/_delphi_utils_python/delphi_utils/smooth.py index 44c7ad032..98d7f9723 100644 --- a/_delphi_utils_python/delphi_utils/smooth.py +++ b/_delphi_utils_python/delphi_utils/smooth.py @@ -1,4 +1,5 @@ -""" +"""Smoother utility. + This file contains the smoothing utility functions. We have a number of possible smoothers to choose from: windowed average, local weighted regression, and a causal Savitzky-Golay filter. @@ -10,6 +11,7 @@ docstrings for details. """ +from typing import Union import warnings import numpy as np @@ -17,7 +19,8 @@ class Smoother: - """ + """Smoother class. + This is the smoothing utility class. This class holds the parameter settings for its smoother methods and provides reasonable defaults. Basic usage can be found in the examples below. @@ -36,9 +39,13 @@ class Smoother: Descriptions of the smoothers are available in the doc strings. Full mathematical details are in: https://github.com/cmu-delphi/covidcast-modeling/ in the folder 'indicator_smoother'. + poly_fit_degree: int + A parameter for the 'savgol' smoother which sets the degree of the polynomial fit. window_length: int - The length of the averaging window for 'savgol' and 'moving_average'. - This value is in the units of the data, which tends to be days. + The length of the fitting window for 'savgol' and the averaging window 'moving_average'. + This value is in the units provided by the data, which are likely to be days for Delphi. + Note that if window_length is smaller than the length of the signal, then only the + imputation method is run on the signal. gaussian_bandwidth: float or None If float, all regression is done with Gaussian weights whose variance is half the gaussian_bandwidth. If None, performs unweighted regression. (Applies @@ -60,14 +67,19 @@ class Smoother: The smallest value to allow in a signal. If None, there is no smallest value. Currently only implemented for 'left_gauss_linear'. This should probably not be in the scope of the smoothing utility. - poly_fit_degree: int - A parameter for the 'savgol' smoother which sets the degree of the polynomial fit. + boundary_method: {'shortened_window', 'identity', 'nan'} + Determines how the 'savgol' method handles smoothing at the (left) boundary, where the past + data length is shorter than the window_length parameter. If 'shortened_window', it uses the + maximum window available; at the very edge (generally up to poly_fit_degree) it keeps the + same value as the raw signal. If 'identity', it just keeps the raw signal. If 'nan', it + writes nans. For the other smoothing methods, 'moving_average' writes nans and + 'left_gauss_linear' uses a shortened window. Methods ---------- smooth: np.ndarray or pd.Series - Takes a 1D signal and returns a smoothed version. The input and the output have the same length - and type. + Takes a 1D signal and returns a smoothed version. The input and the output have the same + length and type. Example Usage ------------- @@ -108,6 +120,7 @@ def __init__( minval=None, boundary_method="shortened_window", ): + """See class docstring.""" self.smoother_name = smoother_name self.poly_fit_degree = poly_fit_degree self.window_length = window_length @@ -118,35 +131,38 @@ def __init__( valid_smoothers = {"savgol", "left_gauss_linear", "moving_average", "identity"} valid_impute_methods = {"savgol", "zeros", None} + valid_boundary_methods = {"shortened_window", "identity", "nan"} if self.smoother_name not in valid_smoothers: - raise ValueError("Invalid smoother name given.") + raise ValueError("Invalid smoother_name given.") if self.impute_method not in valid_impute_methods: - raise ValueError("Invalid impute method given.") + raise ValueError("Invalid impute_method given.") + if self.boundary_method not in valid_boundary_methods: + raise ValueError("Invalid boundary_method given.") if smoother_name == "savgol": + # The polynomial fitting is done on a past window of size window_length + # including the current day value. self.coeffs = self.savgol_coeffs(-self.window_length + 1, 0) else: self.coeffs = None - def smooth(self, signal): - """ - The major workhorse smoothing function. Can use one of three smoothing - methods, as specified by the class variable smoother_name. + def smooth(self, signal: Union[np.ndarray, pd.Series]) -> Union[np.ndarray, pd.Series]: + """Apply a smoother to a signal. + + The major workhorse smoothing function. Imputes the nans and then applies + a smoother to the signal. Parameters ---------- signal: np.ndarray or pd.Series A 1D signal to be smoothed. + Returns + ---------- signal_smoothed: np.ndarray or pd.Series A smoothed 1D signal. Returns an array of the same type and length as the input. """ - if len(signal) < self.window_length: - raise ValueError( - "The window_length must be smaller than the length of the signal." - ) - is_pandas_series = isinstance(signal, pd.Series) signal = signal.to_numpy() if is_pandas_series else signal @@ -158,14 +174,16 @@ def smooth(self, signal): signal_smoothed = self.left_gauss_linear_smoother(signal) elif self.smoother_name == "moving_average": signal_smoothed = self.moving_average_smoother(signal) - elif self.smoother_name == "identity": - signal_smoothed = signal + else: + signal_smoothed = signal.copy() - return signal_smoothed if not is_pandas_series else pd.Series(signal_smoothed) + signal_smoothed = signal_smoothed if not is_pandas_series else pd.Series(signal_smoothed) + return signal_smoothed def impute(self, signal): - """ - Imputes the nan values in the signal. + """Impute the nan values in the signal. + + See the class docstring for an explanation of the impute methods. Parameters ---------- @@ -191,8 +209,7 @@ def impute(self, signal): return imputed_signal def moving_average_smoother(self, signal): - """ - Computes a moving average on the signal. + """Compute a moving average on the signal. Parameters ---------- @@ -219,11 +236,10 @@ def moving_average_smoother(self, signal): return signal_smoothed def left_gauss_linear_smoother(self, signal): - """ - DEPRECATED: This method is available to help sanity check the 'savgol' method. - It is a little slow, so use 'savgol' with poly_fit_degree=1 instead. + """Smooth the y-values using a local linear regression with Gaussian weights. - Smooth the y-values using a local linear regression with Gaussian weights. + DEPRECATED: This method is available to help sanity check the 'savgol' method. + Use 'savgol' with poly_fit_degree=1 and the appropriate gaussian_bandwidth instead. At each time t, we use the data from times 1, ..., t-dt, weighted using the Gaussian kernel, to produce the estimate at time t. @@ -263,7 +279,8 @@ def left_gauss_linear_smoother(self, signal): return signal_smoothed def savgol_predict(self, signal): - """ + """Predict a single value using the savgol method. + Fits a polynomial through the values given by the signal and returns the value of the polynomial at the right-most signal-value. More precisely, fits a polynomial f(t) of degree poly_fit_degree through the points signal[-n], signal[-n+1] ..., signal[-1], @@ -279,14 +296,15 @@ def savgol_predict(self, signal): predicted_value: float The anticipated value that comes after the end of the signal based on a polynomial fit. """ + # Add one coeffs = self.savgol_coeffs(-len(signal) + 1, 0) predicted_value = signal @ coeffs return predicted_value def savgol_coeffs(self, nl, nr): - """ - Solves for the Savitzky-Golay coefficients. The coefficients c_i - give a filter so that + """Solve for the Savitzky-Golay coefficients. + + The coefficients c_i give a filter so that y = sum_{i=-{n_l}}^{n_r} c_i x_i is the value at 0 (thus the constant term) of the polynomial fit through the points {x_i}. The coefficients are c_i are calculated as @@ -298,9 +316,9 @@ def savgol_coeffs(self, nl, nr): Parameters ---------- nl: int - The left window bound for the polynomial fit. + The left window bound for the polynomial fit, inclusive. nr: int - The right window bound for the polynomial fit. + The right window bound for the polynomial fit, inclusive. poly_fit_degree: int The degree of the polynomial to be fit. gaussian_bandwidth: float or None @@ -336,14 +354,10 @@ def savgol_coeffs(self, nl, nr): return coeffs def savgol_smoother(self, signal): - """ - Returns a specific type of convolution of the 1D signal with the 1D signal - coeffs, respecting boundary effects. That is, the output y is - signal_smoothed_i = signal_i - signal_smoothed_i = sum_{j=0}^n coeffs_j signal_{i+j}, if i >= len(coeffs) - 1 - In words, entries close to the left boundary are not smoothed, the window does - not proceed over the right boundary, and the rest of the values are regular - convolution. + """Smooth signal with the savgol smoother. + + Returns a convolution of the 1D signal with the Savitzky-Golay coefficients, respecting + boundary effects. For an explanation of boundary effects methods, see the class docstring. Parameters ---------- @@ -355,15 +369,14 @@ def savgol_smoother(self, signal): signal_smoothed: np.ndarray A smoothed 1D signal of same length as signal. """ - - # reverse because np.convolve reverses the second argument + # Reverse because np.convolve reverses the second argument temp_reversed_coeffs = np.array(list(reversed(self.coeffs))) - # does the majority of the smoothing, with the calculated coefficients + # Smooth the part of the signal away from the boundary first signal_padded = np.append(np.nan * np.ones(len(self.coeffs) - 1), signal) signal_smoothed = np.convolve(signal_padded, temp_reversed_coeffs, mode="valid") - # this section handles the smoothing behavior at the (left) boundary: + # This section handles the smoothing behavior at the (left) boundary: # - shortened_window (default) applies savgol with a smaller window to do the fit # - identity keeps the original signal (doesn't smooth) # - nan writes nans @@ -372,22 +385,23 @@ def savgol_smoother(self, signal): if ix == 0: signal_smoothed[ix] = signal[ix] else: + # At the very edge, the design matrix is often singular, in which case + # we just fall back to the raw signal try: - signal_smoothed[ix] = self.savgol_predict(signal[: (ix + 1)]) - except np.linalg.LinAlgError: # for small ix, the design matrix is singular + signal_smoothed[ix] = self.savgol_predict(signal[:ix+1]) + except np.linalg.LinAlgError: signal_smoothed[ix] = signal[ix] return signal_smoothed elif self.boundary_method == "identity": - for ix in range(len(self.coeffs)): + for ix in range(min(len(self.coeffs), len(signal))): signal_smoothed[ix] = signal[ix] return signal_smoothed elif self.boundary_method == "nan": return signal_smoothed - else: - raise ValueError("Unknown boundary method.") def savgol_impute(self, signal): - """ + """Impute the nan values in signal using savgol. + This method fills the nan values in the signal with an M-degree polynomial fit on a rolling window of the immediate past up to window_length data points. diff --git a/_delphi_utils_python/tests/test_smooth.py b/_delphi_utils_python/tests/test_smooth.py index 8992f0b2e..00423dfbb 100644 --- a/_delphi_utils_python/tests/test_smooth.py +++ b/_delphi_utils_python/tests/test_smooth.py @@ -167,11 +167,11 @@ def test_impute(self): with pytest.raises(ValueError): imputed_signal = smoother.savgol_impute(signal) - # test window_length > len(signal) + # test window_length > len(signal) and boundary_method="identity" signal = np.arange(20) smoother = Smoother(smoother_name="savgol", boundary_method="identity", window_length=30) - with pytest.raises(ValueError): - smoothed_signal = smoother.smooth(signal) + smoothed_signal = smoother.smooth(signal) + assert np.allclose(signal, smoothed_signal) # test the boundary methods signal = np.arange(20)