diff --git a/_delphi_utils_python/delphi_utils/smooth.py b/_delphi_utils_python/delphi_utils/smooth.py index 9949b1262..005123527 100644 --- a/_delphi_utils_python/delphi_utils/smooth.py +++ b/_delphi_utils_python/delphi_utils/smooth.py @@ -142,7 +142,9 @@ def __init__( if smoother_name == "savgol": # The polynomial fitting is done on a past window of size window_length # including the current day value. - self.coeffs = self.savgol_coeffs(-self.window_length + 1, 0) + self.coeffs = self.savgol_coeffs( + -self.window_length + 1, 0, self.poly_fit_degree + ) else: self.coeffs = None @@ -279,30 +281,35 @@ def left_gauss_linear_smoother(self, signal): signal_smoothed[signal_smoothed <= self.minval] = self.minval return signal_smoothed - def savgol_predict(self, signal): + def savgol_predict(self, signal, poly_fit_degree, nr): """Predict a single value using the savgol method. Fits a polynomial through the values given by the signal and returns the value - of the polynomial at the right-most signal-value. More precisely, fits a polynomial - f(t) of degree poly_fit_degree through the points signal[-n], signal[-n+1] ..., signal[-1], - and returns the evaluation of the polynomial at the location of signal[0]. + of the polynomial at the right-most signal-value. More precisely, for a signal of length + n, fits a poly_fit_degree polynomial through the points signal[-n+1+nr], signal[-n+2+nr], + ..., signal[nr], and returns the evaluation of the polynomial at signal[0]. Hence, if + nr=0, then the last value of the signal is smoothed, and if nr=-1, then the value after + the last signal value is anticipated. Parameters ---------- signal: np.ndarray A 1D signal to smooth. + poly_fit_degree: int + The degree of the polynomial fit. + nr: int + An integer that determines the position of the predicted value relative to the signal. Returns ---------- predicted_value: float The anticipated value that comes after the end of the signal based on a polynomial fit. """ - # Add one - coeffs = self.savgol_coeffs(-len(signal) + 1, 0) + coeffs = self.savgol_coeffs(-len(signal) + 1 + nr, nr, poly_fit_degree) predicted_value = signal @ coeffs return predicted_value - def savgol_coeffs(self, nl, nr): + def savgol_coeffs(self, nl, nr, poly_fit_degree): """Solve for the Savitzky-Golay coefficients. The coefficients c_i give a filter so that @@ -322,23 +329,20 @@ def savgol_coeffs(self, nl, nr): The right window bound for the polynomial fit, inclusive. poly_fit_degree: int The degree of the polynomial to be fit. - gaussian_bandwidth: float or None - If float, performs regression with Gaussian weights whose variance is - the gaussian_bandwidth. If None, performs unweighted regression. Returns ---------- coeffs: np.ndarray - A vector of coefficients of length nl that determines the savgol + A vector of coefficients of length nr - nl + 1 that determines the savgol convolution filter. """ if nl >= nr: raise ValueError("The left window bound should be less than the right.") if nr > 0: - raise warnings.warn("The filter is no longer causal.") + warnings.warn("The filter is no longer causal.") - A = np.vstack( # pylint: disable=invalid-name - [np.arange(nl, nr + 1) ** j for j in range(self.poly_fit_degree + 1)] + A = np.vstack( + [np.arange(nl, nr + 1) ** j for j in range(poly_fit_degree + 1)] ).T if self.gaussian_bandwidth is None: @@ -389,8 +393,10 @@ def savgol_smoother(self, signal): # At the very edge, the design matrix is often singular, in which case # we just fall back to the raw signal try: - signal_smoothed[ix] = self.savgol_predict(signal[:ix+1]) - except np.linalg.LinAlgError: + signal_smoothed[ix] = self.savgol_predict( + signal[: ix + 1], self.poly_fit_degree, 0 + ) + except np.linalg.LinAlgError: # for small ix, the design matrix is singular signal_smoothed[ix] = signal[ix] return signal_smoothed elif self.boundary_method == "identity": @@ -403,19 +409,13 @@ def savgol_smoother(self, signal): def savgol_impute(self, signal): """Impute the nan values in signal using savgol. - This method fills the nan values in the signal with an M-degree polynomial fit + This method fills the nan values in the signal with a quadratic polynomial fit on a rolling window of the immediate past up to window_length data points. - In the case of a single data point in the past, the single data point is - continued. In the case of no data points in the past (i.e. the signal starts - with nan), an error is raised. + A number of boundary cases are handled involving nan filling close to the boundary. Note that in the case of many adjacent nans, the method will use previously - imputed values to do the fitting for later values. E.g. for - >>> x = np.array([1.0, 2.0, np.nan, 1.0, np.nan]) - the last np.nan will be fit on np.array([1.0, 2.0, *, 1.0]), where * is the - result of imputing based on np.array([1.0, 2.0]) (depends on the savgol - settings). + imputed values to do the fitting for later values. Parameters ---------- @@ -428,20 +428,27 @@ def savgol_impute(self, signal): An imputed 1D signal. """ signal_imputed = np.copy(signal) - for ix in np.where(np.isnan(signal))[0]: + for ix in np.where(np.isnan(signal_imputed))[0]: # Boundary cases if ix < self.window_length: - # A nan following a single value should just be extended + # At the boundary, a single value should just be extended if ix == 1: - signal_imputed[ix] = signal_imputed[0] - # Otherwise, use savgol fitting + signal_imputed[ix] = signal_imputed[ix - 1] + # Reduce the polynomial degree if needed + elif ix == 2: + signal_imputed[ix] = self.savgol_predict( + signal_imputed[:ix], 1, -1 + ) + # Otherwise, use savgol fitting on the largest window prior else: - coeffs = self.savgol_coeffs(-ix, -1) - signal_imputed[ix] = signal_imputed[:ix] @ coeffs - # Use a polynomial fit on the past window length to impute + signal_imputed[ix] = self.savgol_predict( + signal_imputed[:ix], self.poly_fit_degree, -1 + ) + # Away from the boundary, use savgol fitting on a fixed window else: - coeffs = self.savgol_coeffs(-self.window_length, -1) - signal_imputed[ix] = ( - signal_imputed[ix - self.window_length : ix] @ coeffs + signal_imputed[ix] = self.savgol_predict( + signal_imputed[ix - self.window_length : ix], + self.poly_fit_degree, + -1, ) return signal_imputed diff --git a/_delphi_utils_python/tests/test_smooth.py b/_delphi_utils_python/tests/test_smooth.py index 00423dfbb..692b0b10e 100644 --- a/_delphi_utils_python/tests/test_smooth.py +++ b/_delphi_utils_python/tests/test_smooth.py @@ -10,11 +10,24 @@ class TestSmoothers: + def test_bad_inputs(self): + with pytest.raises(ValueError): + Smoother(smoother_name="hamburger") + with pytest.raises(ValueError): + Smoother(impute_method="hamburger") + with pytest.raises(ValueError): + Smoother(boundary_method="hamburger") + def test_identity_smoother(self): signal = np.arange(30) + np.random.rand(30) assert np.allclose(signal, Smoother(smoother_name="identity").smooth(signal)) def test_moving_average_smoother(self): + # Test non-integer window-length + with pytest.raises(ValueError): + signal = np.array([1, 1, 1]) + Smoother(smoother_name="window_average", window_length=5.5).smooth(signal) + # The raw and smoothed lengths should match signal = np.ones(30) smoother = Smoother(smoother_name="moving_average") @@ -109,10 +122,13 @@ def test_causal_savgol_smoother(self): smoother_name="savgol", window_length=window_length, poly_fit_degree=1, ) smoothed_signal2 = smoother.smooth(signal) - assert np.allclose(smoothed_signal1, smoothed_signal2) def test_impute(self): + # test front nan error + with pytest.raises(ValueError): + Smoother().impute(signal=np.array([np.nan, 1, 1])) + # test the nan imputer signal = np.array([i if i % 3 else np.nan for i in range(1, 40)]) assert np.allclose(Smoother(impute_method=None).impute(signal), signal, equal_nan=True) @@ -180,6 +196,14 @@ def test_impute(self): smoothed_signal = smoother.savgol_impute(signal) assert np.allclose(smoothed_signal, signal) + # test that we don't hit a matrix inversion error when there are + # nans on less than window_length away from the boundary + signal = np.hstack([[1], np.nan*np.ones(12), np.arange(5)]) + smoother = Smoother(smoother_name="savgol", poly_fit_degree=2, + boundary_method="identity", window_length=10) + smoothed_signal = smoother.impute(signal) + assert np.allclose(smoothed_signal, np.hstack([[1], np.ones(12), np.arange(5)])) + def test_pandas_series_input(self): # The savgol method should match the linear regression method on the first # window_length-many values of the signal, if the savgol_weighting is set to true,