diff --git a/_delphi_utils_python/delphi_utils/smooth.py b/_delphi_utils_python/delphi_utils/smooth.py index 9949b1262..b2233f056 100644 --- a/_delphi_utils_python/delphi_utils/smooth.py +++ b/_delphi_utils_python/delphi_utils/smooth.py @@ -138,15 +138,19 @@ def __init__( raise ValueError("Invalid impute_method given.") if self.boundary_method not in valid_boundary_methods: raise ValueError("Invalid boundary_method given.") + if self.window_length <= 1: + raise ValueError("Window length is too short.") if smoother_name == "savgol": # The polynomial fitting is done on a past window of size window_length # including the current day value. - self.coeffs = self.savgol_coeffs(-self.window_length + 1, 0) + self.coeffs = self.savgol_coeffs( + -self.window_length + 1, 0, self.poly_fit_degree + ) else: self.coeffs = None - def smooth(self, signal: Union[np.ndarray, pd.Series]) -> Union[np.ndarray, pd.Series]: + def smooth(self, signal: Union[np.ndarray, pd.Series], impute_order=2) -> Union[np.ndarray, pd.Series]: """Apply a smoother to a signal. The major workhorse smoothing function. Imputes the nans and then applies @@ -156,6 +160,9 @@ def smooth(self, signal: Union[np.ndarray, pd.Series]) -> Union[np.ndarray, pd.S ---------- signal: np.ndarray or pd.Series A 1D signal to be smoothed. + impute_order: int + The polynomial order of the fit used for imputation. By default, this is set to + 2. Returns ---------- @@ -163,24 +170,44 @@ def smooth(self, signal: Union[np.ndarray, pd.Series]) -> Union[np.ndarray, pd.S A smoothed 1D signal. Returns an array of the same type and length as the input. """ + # If all nans, pass through + if np.all(np.isnan(signal)): + return signal + is_pandas_series = isinstance(signal, pd.Series) + pandas_index = signal.index if is_pandas_series else None signal = signal.to_numpy() if is_pandas_series else signal - signal = self.impute(signal) + # Find where the first non-nan value is located and truncate the initial nans + ix = np.where(~np.isnan(signal))[0][0] + signal = signal[ix:] - if self.smoother_name == "savgol": - signal_smoothed = self.savgol_smoother(signal) - elif self.smoother_name == "left_gauss_linear": - signal_smoothed = self.left_gauss_linear_smoother(signal) - elif self.smoother_name == "moving_average": - signal_smoothed = self.moving_average_smoother(signal) - else: + # Don't smooth in certain edge cases + if len(signal) < self.poly_fit_degree or len(signal) == 1: signal_smoothed = signal.copy() - - signal_smoothed = signal_smoothed if not is_pandas_series else pd.Series(signal_smoothed) + else: + # Impute + signal = self.impute(signal, impute_order=impute_order) + + # Smooth + if self.smoother_name == "savgol": + signal_smoothed = self.savgol_smoother(signal) + elif self.smoother_name == "left_gauss_linear": + signal_smoothed = self.left_gauss_linear_smoother(signal) + elif self.smoother_name == "moving_average": + signal_smoothed = self.moving_average_smoother(signal) + elif self.smoother_name == "identity": + signal_smoothed = signal + + # Append the nans back, since we want to preserve length + signal_smoothed = np.hstack([np.nan*np.ones(ix), signal_smoothed]) + # Convert back to pandas if necessary + if is_pandas_series: + signal_smoothed = pd.Series(signal_smoothed) + signal_smoothed.index = pandas_index return signal_smoothed - def impute(self, signal): + def impute(self, signal, impute_order=2): """Impute the nan values in the signal. See the class docstring for an explanation of the impute methods. @@ -189,6 +216,8 @@ def impute(self, signal): ---------- signal: np.ndarray 1D signal to be imputed. + impute_order: int + The polynomial order of the fit used for imputation. Returns ------- @@ -200,7 +229,7 @@ def impute(self, signal): # To preserve input-output array lengths, this util will not drop NaNs for you. if np.isnan(signal[0]): raise ValueError("The signal should not begin with a nan value.") - imputed_signal = self.savgol_impute(signal) + imputed_signal = self.savgol_impute(signal, impute_order) elif self.impute_method == "zeros": imputed_signal = np.nan_to_num(signal) elif self.impute_method is None: @@ -279,33 +308,39 @@ def left_gauss_linear_smoother(self, signal): signal_smoothed[signal_smoothed <= self.minval] = self.minval return signal_smoothed - def savgol_predict(self, signal): + def savgol_predict(self, signal, poly_fit_degree, nr): """Predict a single value using the savgol method. - + Fits a polynomial through the values given by the signal and returns the value - of the polynomial at the right-most signal-value. More precisely, fits a polynomial - f(t) of degree poly_fit_degree through the points signal[-n], signal[-n+1] ..., signal[-1], - and returns the evaluation of the polynomial at the location of signal[0]. + of the polynomial at the right-most signal-value. More precisely, for a signal of length + n, fits a poly_fit_degree polynomial through the points signal[-n+1+nr], signal[-n+2+nr], + ..., signal[nr], and returns the evaluation of the polynomial at signal[0]. Hence, if + nr=0, then the last value of the signal is smoothed, and if nr=-1, then the value after + the last signal value is anticipated. Parameters ---------- signal: np.ndarray A 1D signal to smooth. + poly_fit_degree: int + The degree of the polynomial fit. + nr: int + An integer that determines the position of the predicted value relative to the signal. Returns ---------- predicted_value: float The anticipated value that comes after the end of the signal based on a polynomial fit. """ - # Add one - coeffs = self.savgol_coeffs(-len(signal) + 1, 0) + coeffs = self.savgol_coeffs(-len(signal) + 1 + nr, nr, poly_fit_degree) predicted_value = signal @ coeffs return predicted_value - def savgol_coeffs(self, nl, nr): + def savgol_coeffs(self, nl, nr, poly_fit_degree): """Solve for the Savitzky-Golay coefficients. - The coefficients c_i give a filter so that + Solves for the Savitzky-Golay coefficients. The coefficients c_i + give a filter so that y = sum_{i=-{n_l}}^{n_r} c_i x_i is the value at 0 (thus the constant term) of the polynomial fit through the points {x_i}. The coefficients are c_i are calculated as @@ -322,23 +357,20 @@ def savgol_coeffs(self, nl, nr): The right window bound for the polynomial fit, inclusive. poly_fit_degree: int The degree of the polynomial to be fit. - gaussian_bandwidth: float or None - If float, performs regression with Gaussian weights whose variance is - the gaussian_bandwidth. If None, performs unweighted regression. Returns ---------- coeffs: np.ndarray - A vector of coefficients of length nl that determines the savgol + A vector of coefficients of length nr - nl + 1 that determines the savgol convolution filter. """ if nl >= nr: raise ValueError("The left window bound should be less than the right.") if nr > 0: - raise warnings.warn("The filter is no longer causal.") + warnings.warn("The filter is no longer causal.") - A = np.vstack( # pylint: disable=invalid-name - [np.arange(nl, nr + 1) ** j for j in range(self.poly_fit_degree + 1)] + A = np.vstack( + [np.arange(nl, nr + 1) ** j for j in range(poly_fit_degree + 1)] ).T if self.gaussian_bandwidth is None: @@ -382,15 +414,17 @@ def savgol_smoother(self, signal): # - identity keeps the original signal (doesn't smooth) # - nan writes nans if self.boundary_method == "shortened_window": # pylint: disable=no-else-return - for ix in range(len(self.coeffs)): + for ix in range(min(len(self.coeffs), len(signal))): if ix == 0: signal_smoothed[ix] = signal[ix] else: # At the very edge, the design matrix is often singular, in which case # we just fall back to the raw signal try: - signal_smoothed[ix] = self.savgol_predict(signal[:ix+1]) - except np.linalg.LinAlgError: + signal_smoothed[ix] = self.savgol_predict( + signal[: ix + 1], self.poly_fit_degree, 0 + ) + except np.linalg.LinAlgError: # for small ix, the design matrix is singular signal_smoothed[ix] = signal[ix] return signal_smoothed elif self.boundary_method == "identity": @@ -400,48 +434,51 @@ def savgol_smoother(self, signal): elif self.boundary_method == "nan": return signal_smoothed - def savgol_impute(self, signal): + def savgol_impute(self, signal, impute_order): """Impute the nan values in signal using savgol. - This method fills the nan values in the signal with an M-degree polynomial fit + This method fills the nan values in the signal with polynomial interpolation on a rolling window of the immediate past up to window_length data points. - In the case of a single data point in the past, the single data point is - continued. In the case of no data points in the past (i.e. the signal starts - with nan), an error is raised. + A number of boundary cases are handled involving nan filling close to the boundary. Note that in the case of many adjacent nans, the method will use previously - imputed values to do the fitting for later values. E.g. for - >>> x = np.array([1.0, 2.0, np.nan, 1.0, np.nan]) - the last np.nan will be fit on np.array([1.0, 2.0, *, 1.0]), where * is the - result of imputing based on np.array([1.0, 2.0]) (depends on the savgol - settings). + imputed values to do the fitting for later values. Parameters ---------- signal: np.ndarray A 1D signal to be imputed. + impute_order: int + The polynomial order of the fit used for imputation. Returns ---------- signal_imputed: np.ndarray An imputed 1D signal. """ + if impute_order > self.window_length: + raise ValueError("Impute order must be smaller than window length.") + signal_imputed = np.copy(signal) - for ix in np.where(np.isnan(signal))[0]: + for ix in np.where(np.isnan(signal_imputed))[0]: # Boundary cases if ix < self.window_length: - # A nan following a single value should just be extended + # At the boundary, a single value should just be extended if ix == 1: - signal_imputed[ix] = signal_imputed[0] - # Otherwise, use savgol fitting + signal_imputed[ix] = signal_imputed[ix - 1] + # Otherwise, use savgol fitting on the largest window prior, + # reduce the polynomial degree if needed (can't fit if the + # imputation order is larger than the available data) else: - coeffs = self.savgol_coeffs(-ix, -1) - signal_imputed[ix] = signal_imputed[:ix] @ coeffs - # Use a polynomial fit on the past window length to impute + signal_imputed[ix] = self.savgol_predict( + signal_imputed[:ix], min(ix-1, impute_order), -1 + ) + # Away from the boundary, use savgol fitting on a fixed window else: - coeffs = self.savgol_coeffs(-self.window_length, -1) - signal_imputed[ix] = ( - signal_imputed[ix - self.window_length : ix] @ coeffs + signal_imputed[ix] = self.savgol_predict( + signal_imputed[ix - self.window_length : ix], + impute_order, + -1, ) return signal_imputed diff --git a/_delphi_utils_python/tests/test_smooth.py b/_delphi_utils_python/tests/test_smooth.py index 00423dfbb..56bdc4349 100644 --- a/_delphi_utils_python/tests/test_smooth.py +++ b/_delphi_utils_python/tests/test_smooth.py @@ -2,6 +2,7 @@ Tests for the smoothing utility. Authors: Dmitry Shemetov, Addison Hu, Maria Jahja """ +from numpy.lib.polynomial import poly import pytest import numpy as np @@ -10,11 +11,26 @@ class TestSmoothers: + def test_bad_inputs(self): + with pytest.raises(ValueError): + Smoother(smoother_name="hamburger") + with pytest.raises(ValueError): + Smoother(impute_method="hamburger") + with pytest.raises(ValueError): + Smoother(boundary_method="hamburger") + with pytest.raises(ValueError): + Smoother(window_length=1) + def test_identity_smoother(self): signal = np.arange(30) + np.random.rand(30) assert np.allclose(signal, Smoother(smoother_name="identity").smooth(signal)) def test_moving_average_smoother(self): + # Test non-integer window-length + with pytest.raises(ValueError): + signal = np.array([1, 1, 1]) + Smoother(smoother_name="window_average", window_length=5.5).smooth(signal) + # The raw and smoothed lengths should match signal = np.ones(30) smoother = Smoother(smoother_name="moving_average") @@ -109,10 +125,50 @@ def test_causal_savgol_smoother(self): smoother_name="savgol", window_length=window_length, poly_fit_degree=1, ) smoothed_signal2 = smoother.smooth(signal) - assert np.allclose(smoothed_signal1, smoothed_signal2) + # Test the all nans case + signal = np.nan * np.ones(10) + smoother = Smoother(window_length=9) + smoothed_signal = smoother.smooth(signal) + assert np.all(np.isnan(smoothed_signal)) + + # Test the case where the signal is length 1 + signal = np.ones(1) + smoother = Smoother() + smoothed_signal = smoother.smooth(signal) + assert np.allclose(smoothed_signal, signal) + + # Test the case where the signal length is less than polynomial_fit_degree + signal = np.ones(2) + smoother = Smoother(poly_fit_degree=3) + smoothed_signal = smoother.smooth(signal) + assert np.allclose(smoothed_signal, signal) + + # Test an edge fitting case + signal = np.array([np.nan, 1, np.nan]) + smoother = Smoother(poly_fit_degree=1, window_length=2) + smoothed_signal = smoother.smooth(signal) + assert np.allclose(smoothed_signal, np.array([np.nan, 1, 1]), equal_nan=True) + + # Test a range of cases where the signal size following a sequence of nans is returned + for i in range(10): + signal = np.hstack([[np.nan, np.nan, np.nan], np.ones(i)]) + smoother = Smoother(poly_fit_degree=0, window_length=5) + smoothed_signal = smoother.smooth(signal) + assert np.allclose(smoothed_signal, signal, equal_nan=True) + + # test window_length > len(signal) and boundary_method="identity" + signal = np.arange(20) + smoother = Smoother(boundary_method="identity", window_length=30) + smoothed_signal = smoother.smooth(signal) + assert np.allclose(signal, smoothed_signal) + def test_impute(self): + # test front nan error + with pytest.raises(ValueError): + Smoother().impute(signal=np.array([np.nan, 1, 1])) + # test the nan imputer signal = np.array([i if i % 3 else np.nan for i in range(1, 40)]) assert np.allclose(Smoother(impute_method=None).impute(signal), signal, equal_nan=True) @@ -128,7 +184,7 @@ def test_impute(self): signal = np.array([i if i % 3 else np.nan for i in range(1, 40)]) # test that the non-nan values are unchanged not_nans_ixs = np.bitwise_xor(np.isnan(signal, where=True), np.full(len(signal), True)) - smoothed_signal = Smoother().savgol_impute(signal) + smoothed_signal = Smoother().impute(signal) assert np.allclose(signal[not_nans_ixs], smoothed_signal[not_nans_ixs]) # test that the imputer is close to the true line assert np.allclose(range(1, 40), smoothed_signal, atol=0.5) @@ -137,14 +193,14 @@ def test_impute(self): signal = np.hstack([np.arange(10), [np.nan], np.arange(10)]) window_length = 10 smoother = Smoother( - smoother_name="savgol", window_length=window_length, poly_fit_degree=1 + window_length=window_length, poly_fit_degree=1 ) - imputed_signal = smoother.savgol_impute(signal) + imputed_signal = smoother.impute(signal) assert np.allclose(imputed_signal, np.hstack([np.arange(11), np.arange(10)])) smoother = Smoother( - smoother_name="savgol", window_length=window_length, poly_fit_degree=2 + window_length=window_length, poly_fit_degree=2 ) - imputed_signal = smoother.savgol_impute(signal) + imputed_signal = smoother.impute(signal) assert np.allclose(imputed_signal, np.hstack([np.arange(11), np.arange(10)])) # if there are nans on the boundary, should dynamically change window @@ -152,9 +208,9 @@ def test_impute(self): [np.arange(5), [np.nan], np.arange(20), [np.nan], np.arange(5)] ) smoother = Smoother( - smoother_name="savgol", window_length=window_length, poly_fit_degree=2 + window_length=window_length, poly_fit_degree=2 ) - imputed_signal = smoother.savgol_impute(signal) + imputed_signal = smoother.impute(signal) assert np.allclose( imputed_signal, np.hstack([np.arange(6), np.arange(21), np.arange(5)]), ) @@ -162,24 +218,33 @@ def test_impute(self): # if the array begins with np.nan, we should tell the user to peel it off before sending signal = np.hstack([[np.nan], np.arange(20), [np.nan], np.arange(5)]) smoother = Smoother( - smoother_name="savgol", window_length=window_length, poly_fit_degree=2 + window_length=window_length, poly_fit_degree=2 ) with pytest.raises(ValueError): - imputed_signal = smoother.savgol_impute(signal) - - # test window_length > len(signal) and boundary_method="identity" - signal = np.arange(20) - smoother = Smoother(smoother_name="savgol", boundary_method="identity", window_length=30) - smoothed_signal = smoother.smooth(signal) - assert np.allclose(signal, smoothed_signal) + imputed_signal = smoother.impute(signal) # test the boundary methods signal = np.arange(20) - smoother = Smoother(smoother_name="savgol", poly_fit_degree=0, + smoother = Smoother(poly_fit_degree=0, boundary_method="identity", window_length=10) - smoothed_signal = smoother.savgol_impute(signal) + smoothed_signal = smoother.impute(signal) assert np.allclose(smoothed_signal, signal) + # test that we don't hit a matrix inversion error when there are + # nans on less than window_length away from the boundary + signal = np.hstack([[1], np.nan*np.ones(12), np.arange(5)]) + smoother = Smoother(smoother_name="savgol", poly_fit_degree=2, + boundary_method="identity", window_length=10) + smoothed_signal = smoother.impute(signal) + assert np.allclose(smoothed_signal, np.hstack([[1], np.ones(12), np.arange(5)])) + + # test the impute_order argument + signal = np.hstack([[1, np.nan, np.nan, 2], np.arange(5)]) + smoother = Smoother() + smoothed_signal = smoother.impute(signal, impute_order=1) + assert np.allclose(smoothed_signal, np.hstack([[1, 1, 1, 2], np.arange(5)])) + + def test_pandas_series_input(self): # The savgol method should match the linear regression method on the first # window_length-many values of the signal, if the savgol_weighting is set to true, @@ -223,3 +288,11 @@ def test_pandas_series_input(self): assert np.allclose( signal[window_length - 1 :], smoothed_signal[window_length - 1 :] ) + + # Test that the index of the series gets preserved + signal = pd.Series(np.ones(30), index=np.arange(50, 80)) + smoother = Smoother(smoother_name="moving_average", window_length=10) + smoothed_signal = signal.transform(smoother.smooth) + ix1 = signal.index + ix2 = smoothed_signal.index + assert ix1.equals(ix2)