Skip to content

Fix smoother to remove a non-invertible design matrix bug #476

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 43 additions & 36 deletions _delphi_utils_python/delphi_utils/smooth.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ def __init__(
if smoother_name == "savgol":
# The polynomial fitting is done on a past window of size window_length
# including the current day value.
self.coeffs = self.savgol_coeffs(-self.window_length + 1, 0)
self.coeffs = self.savgol_coeffs(
-self.window_length + 1, 0, self.poly_fit_degree
)
else:
self.coeffs = None

Expand Down Expand Up @@ -279,30 +281,35 @@ def left_gauss_linear_smoother(self, signal):
signal_smoothed[signal_smoothed <= self.minval] = self.minval
return signal_smoothed

def savgol_predict(self, signal):
def savgol_predict(self, signal, poly_fit_degree, nr):
"""Predict a single value using the savgol method.

Fits a polynomial through the values given by the signal and returns the value
of the polynomial at the right-most signal-value. More precisely, fits a polynomial
f(t) of degree poly_fit_degree through the points signal[-n], signal[-n+1] ..., signal[-1],
and returns the evaluation of the polynomial at the location of signal[0].
of the polynomial at the right-most signal-value. More precisely, for a signal of length
n, fits a poly_fit_degree polynomial through the points signal[-n+1+nr], signal[-n+2+nr],
..., signal[nr], and returns the evaluation of the polynomial at signal[0]. Hence, if
nr=0, then the last value of the signal is smoothed, and if nr=-1, then the value after
the last signal value is anticipated.

Parameters
----------
signal: np.ndarray
A 1D signal to smooth.
poly_fit_degree: int
The degree of the polynomial fit.
nr: int
An integer that determines the position of the predicted value relative to the signal.

Returns
----------
predicted_value: float
The anticipated value that comes after the end of the signal based on a polynomial fit.
"""
# Add one
coeffs = self.savgol_coeffs(-len(signal) + 1, 0)
coeffs = self.savgol_coeffs(-len(signal) + 1 + nr, nr, poly_fit_degree)
predicted_value = signal @ coeffs
return predicted_value

def savgol_coeffs(self, nl, nr):
def savgol_coeffs(self, nl, nr, poly_fit_degree):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add docstring for the new param.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

"""Solve for the Savitzky-Golay coefficients.

The coefficients c_i give a filter so that
Expand All @@ -322,23 +329,20 @@ def savgol_coeffs(self, nl, nr):
The right window bound for the polynomial fit, inclusive.
poly_fit_degree: int
The degree of the polynomial to be fit.
gaussian_bandwidth: float or None
If float, performs regression with Gaussian weights whose variance is
the gaussian_bandwidth. If None, performs unweighted regression.

Returns
----------
coeffs: np.ndarray
A vector of coefficients of length nl that determines the savgol
A vector of coefficients of length nr - nl + 1 that determines the savgol
convolution filter.
"""
if nl >= nr:
raise ValueError("The left window bound should be less than the right.")
if nr > 0:
raise warnings.warn("The filter is no longer causal.")
warnings.warn("The filter is no longer causal.")

A = np.vstack( # pylint: disable=invalid-name
[np.arange(nl, nr + 1) ** j for j in range(self.poly_fit_degree + 1)]
A = np.vstack(
[np.arange(nl, nr + 1) ** j for j in range(poly_fit_degree + 1)]
).T

if self.gaussian_bandwidth is None:
Expand Down Expand Up @@ -389,8 +393,10 @@ def savgol_smoother(self, signal):
# At the very edge, the design matrix is often singular, in which case
# we just fall back to the raw signal
try:
signal_smoothed[ix] = self.savgol_predict(signal[:ix+1])
except np.linalg.LinAlgError:
signal_smoothed[ix] = self.savgol_predict(
signal[: ix + 1], self.poly_fit_degree, 0
)
except np.linalg.LinAlgError: # for small ix, the design matrix is singular
signal_smoothed[ix] = signal[ix]
return signal_smoothed
elif self.boundary_method == "identity":
Expand All @@ -403,19 +409,13 @@ def savgol_smoother(self, signal):
def savgol_impute(self, signal):
"""Impute the nan values in signal using savgol.

This method fills the nan values in the signal with an M-degree polynomial fit
This method fills the nan values in the signal with a quadratic polynomial fit
on a rolling window of the immediate past up to window_length data points.

In the case of a single data point in the past, the single data point is
continued. In the case of no data points in the past (i.e. the signal starts
with nan), an error is raised.
A number of boundary cases are handled involving nan filling close to the boundary.

Note that in the case of many adjacent nans, the method will use previously
imputed values to do the fitting for later values. E.g. for
>>> x = np.array([1.0, 2.0, np.nan, 1.0, np.nan])
the last np.nan will be fit on np.array([1.0, 2.0, *, 1.0]), where * is the
result of imputing based on np.array([1.0, 2.0]) (depends on the savgol
settings).
imputed values to do the fitting for later values.

Parameters
----------
Expand All @@ -428,20 +428,27 @@ def savgol_impute(self, signal):
An imputed 1D signal.
"""
signal_imputed = np.copy(signal)
for ix in np.where(np.isnan(signal))[0]:
for ix in np.where(np.isnan(signal_imputed))[0]:
# Boundary cases
if ix < self.window_length:
# A nan following a single value should just be extended
# At the boundary, a single value should just be extended
if ix == 1:
signal_imputed[ix] = signal_imputed[0]
# Otherwise, use savgol fitting
signal_imputed[ix] = signal_imputed[ix - 1]
# Reduce the polynomial degree if needed
elif ix == 2:
signal_imputed[ix] = self.savgol_predict(
signal_imputed[:ix], 1, -1
)
# Otherwise, use savgol fitting on the largest window prior
else:
coeffs = self.savgol_coeffs(-ix, -1)
signal_imputed[ix] = signal_imputed[:ix] @ coeffs
# Use a polynomial fit on the past window length to impute
signal_imputed[ix] = self.savgol_predict(
signal_imputed[:ix], self.poly_fit_degree, -1
)
# Away from the boundary, use savgol fitting on a fixed window
else:
coeffs = self.savgol_coeffs(-self.window_length, -1)
signal_imputed[ix] = (
signal_imputed[ix - self.window_length : ix] @ coeffs
signal_imputed[ix] = self.savgol_predict(
signal_imputed[ix - self.window_length : ix],
self.poly_fit_degree,
-1,
)
return signal_imputed
26 changes: 25 additions & 1 deletion _delphi_utils_python/tests/test_smooth.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,24 @@


class TestSmoothers:
def test_bad_inputs(self):
with pytest.raises(ValueError):
Smoother(smoother_name="hamburger")
with pytest.raises(ValueError):
Smoother(impute_method="hamburger")
with pytest.raises(ValueError):
Smoother(boundary_method="hamburger")

def test_identity_smoother(self):
signal = np.arange(30) + np.random.rand(30)
assert np.allclose(signal, Smoother(smoother_name="identity").smooth(signal))

def test_moving_average_smoother(self):
# Test non-integer window-length
with pytest.raises(ValueError):
signal = np.array([1, 1, 1])
Smoother(smoother_name="window_average", window_length=5.5).smooth(signal)

# The raw and smoothed lengths should match
signal = np.ones(30)
smoother = Smoother(smoother_name="moving_average")
Expand Down Expand Up @@ -109,10 +122,13 @@ def test_causal_savgol_smoother(self):
smoother_name="savgol", window_length=window_length, poly_fit_degree=1,
)
smoothed_signal2 = smoother.smooth(signal)

assert np.allclose(smoothed_signal1, smoothed_signal2)

def test_impute(self):
# test front nan error
with pytest.raises(ValueError):
Smoother().impute(signal=np.array([np.nan, 1, 1]))

# test the nan imputer
signal = np.array([i if i % 3 else np.nan for i in range(1, 40)])
assert np.allclose(Smoother(impute_method=None).impute(signal), signal, equal_nan=True)
Expand Down Expand Up @@ -180,6 +196,14 @@ def test_impute(self):
smoothed_signal = smoother.savgol_impute(signal)
assert np.allclose(smoothed_signal, signal)

# test that we don't hit a matrix inversion error when there are
# nans on less than window_length away from the boundary
signal = np.hstack([[1], np.nan*np.ones(12), np.arange(5)])
smoother = Smoother(smoother_name="savgol", poly_fit_degree=2,
boundary_method="identity", window_length=10)
smoothed_signal = smoother.impute(signal)
assert np.allclose(smoothed_signal, np.hstack([[1], np.ones(12), np.arange(5)]))

def test_pandas_series_input(self):
# The savgol method should match the linear regression method on the first
# window_length-many values of the signal, if the savgol_weighting is set to true,
Expand Down