Skip to content

Fix smoother imputing polynomial order #490

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 90 additions & 53 deletions _delphi_utils_python/delphi_utils/smooth.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,19 @@ def __init__(
raise ValueError("Invalid impute_method given.")
if self.boundary_method not in valid_boundary_methods:
raise ValueError("Invalid boundary_method given.")
if self.window_length <= 1:
raise ValueError("Window length is too short.")

if smoother_name == "savgol":
# The polynomial fitting is done on a past window of size window_length
# including the current day value.
self.coeffs = self.savgol_coeffs(-self.window_length + 1, 0)
self.coeffs = self.savgol_coeffs(
-self.window_length + 1, 0, self.poly_fit_degree
)
else:
self.coeffs = None

def smooth(self, signal: Union[np.ndarray, pd.Series]) -> Union[np.ndarray, pd.Series]:
def smooth(self, signal: Union[np.ndarray, pd.Series], impute_order=2) -> Union[np.ndarray, pd.Series]:
"""Apply a smoother to a signal.

The major workhorse smoothing function. Imputes the nans and then applies
Expand All @@ -156,31 +160,54 @@ def smooth(self, signal: Union[np.ndarray, pd.Series]) -> Union[np.ndarray, pd.S
----------
signal: np.ndarray or pd.Series
A 1D signal to be smoothed.
impute_order: int
The polynomial order of the fit used for imputation. By default, this is set to
2.

Returns
----------
signal_smoothed: np.ndarray or pd.Series
A smoothed 1D signal. Returns an array of the same type and length as
the input.
"""
# If all nans, pass through
if np.all(np.isnan(signal)):
return signal

is_pandas_series = isinstance(signal, pd.Series)
pandas_index = signal.index if is_pandas_series else None
signal = signal.to_numpy() if is_pandas_series else signal

signal = self.impute(signal)
# Find where the first non-nan value is located and truncate the initial nans
ix = np.where(~np.isnan(signal))[0][0]
signal = signal[ix:]

if self.smoother_name == "savgol":
signal_smoothed = self.savgol_smoother(signal)
elif self.smoother_name == "left_gauss_linear":
signal_smoothed = self.left_gauss_linear_smoother(signal)
elif self.smoother_name == "moving_average":
signal_smoothed = self.moving_average_smoother(signal)
else:
# Don't smooth in certain edge cases
if len(signal) < self.poly_fit_degree or len(signal) == 1:
signal_smoothed = signal.copy()

signal_smoothed = signal_smoothed if not is_pandas_series else pd.Series(signal_smoothed)
else:
# Impute
signal = self.impute(signal, impute_order=impute_order)

# Smooth
if self.smoother_name == "savgol":
signal_smoothed = self.savgol_smoother(signal)
elif self.smoother_name == "left_gauss_linear":
signal_smoothed = self.left_gauss_linear_smoother(signal)
elif self.smoother_name == "moving_average":
signal_smoothed = self.moving_average_smoother(signal)
elif self.smoother_name == "identity":
signal_smoothed = signal

# Append the nans back, since we want to preserve length
signal_smoothed = np.hstack([np.nan*np.ones(ix), signal_smoothed])
# Convert back to pandas if necessary
if is_pandas_series:
signal_smoothed = pd.Series(signal_smoothed)
signal_smoothed.index = pandas_index
return signal_smoothed

def impute(self, signal):
def impute(self, signal, impute_order=2):
"""Impute the nan values in the signal.

See the class docstring for an explanation of the impute methods.
Expand All @@ -189,6 +216,8 @@ def impute(self, signal):
----------
signal: np.ndarray
1D signal to be imputed.
impute_order: int
The polynomial order of the fit used for imputation.

Returns
-------
Expand All @@ -200,7 +229,7 @@ def impute(self, signal):
# To preserve input-output array lengths, this util will not drop NaNs for you.
if np.isnan(signal[0]):
raise ValueError("The signal should not begin with a nan value.")
imputed_signal = self.savgol_impute(signal)
imputed_signal = self.savgol_impute(signal, impute_order)
elif self.impute_method == "zeros":
imputed_signal = np.nan_to_num(signal)
elif self.impute_method is None:
Expand Down Expand Up @@ -279,33 +308,39 @@ def left_gauss_linear_smoother(self, signal):
signal_smoothed[signal_smoothed <= self.minval] = self.minval
return signal_smoothed

def savgol_predict(self, signal):
def savgol_predict(self, signal, poly_fit_degree, nr):
"""Predict a single value using the savgol method.

Fits a polynomial through the values given by the signal and returns the value
of the polynomial at the right-most signal-value. More precisely, fits a polynomial
f(t) of degree poly_fit_degree through the points signal[-n], signal[-n+1] ..., signal[-1],
and returns the evaluation of the polynomial at the location of signal[0].
of the polynomial at the right-most signal-value. More precisely, for a signal of length
n, fits a poly_fit_degree polynomial through the points signal[-n+1+nr], signal[-n+2+nr],
..., signal[nr], and returns the evaluation of the polynomial at signal[0]. Hence, if
nr=0, then the last value of the signal is smoothed, and if nr=-1, then the value after
the last signal value is anticipated.

Parameters
----------
signal: np.ndarray
A 1D signal to smooth.
poly_fit_degree: int
The degree of the polynomial fit.
nr: int
An integer that determines the position of the predicted value relative to the signal.

Returns
----------
predicted_value: float
The anticipated value that comes after the end of the signal based on a polynomial fit.
"""
# Add one
coeffs = self.savgol_coeffs(-len(signal) + 1, 0)
coeffs = self.savgol_coeffs(-len(signal) + 1 + nr, nr, poly_fit_degree)
predicted_value = signal @ coeffs
return predicted_value

def savgol_coeffs(self, nl, nr):
def savgol_coeffs(self, nl, nr, poly_fit_degree):
"""Solve for the Savitzky-Golay coefficients.

The coefficients c_i give a filter so that
Solves for the Savitzky-Golay coefficients. The coefficients c_i
give a filter so that
y = sum_{i=-{n_l}}^{n_r} c_i x_i
is the value at 0 (thus the constant term) of the polynomial fit
through the points {x_i}. The coefficients are c_i are calculated as
Expand All @@ -322,23 +357,20 @@ def savgol_coeffs(self, nl, nr):
The right window bound for the polynomial fit, inclusive.
poly_fit_degree: int
The degree of the polynomial to be fit.
gaussian_bandwidth: float or None
If float, performs regression with Gaussian weights whose variance is
the gaussian_bandwidth. If None, performs unweighted regression.

Returns
----------
coeffs: np.ndarray
A vector of coefficients of length nl that determines the savgol
A vector of coefficients of length nr - nl + 1 that determines the savgol
convolution filter.
"""
if nl >= nr:
raise ValueError("The left window bound should be less than the right.")
if nr > 0:
raise warnings.warn("The filter is no longer causal.")
warnings.warn("The filter is no longer causal.")

A = np.vstack( # pylint: disable=invalid-name
[np.arange(nl, nr + 1) ** j for j in range(self.poly_fit_degree + 1)]
A = np.vstack(
[np.arange(nl, nr + 1) ** j for j in range(poly_fit_degree + 1)]
).T

if self.gaussian_bandwidth is None:
Expand Down Expand Up @@ -382,15 +414,17 @@ def savgol_smoother(self, signal):
# - identity keeps the original signal (doesn't smooth)
# - nan writes nans
if self.boundary_method == "shortened_window": # pylint: disable=no-else-return
for ix in range(len(self.coeffs)):
for ix in range(min(len(self.coeffs), len(signal))):
if ix == 0:
signal_smoothed[ix] = signal[ix]
else:
# At the very edge, the design matrix is often singular, in which case
# we just fall back to the raw signal
try:
signal_smoothed[ix] = self.savgol_predict(signal[:ix+1])
except np.linalg.LinAlgError:
signal_smoothed[ix] = self.savgol_predict(
signal[: ix + 1], self.poly_fit_degree, 0
)
except np.linalg.LinAlgError: # for small ix, the design matrix is singular
signal_smoothed[ix] = signal[ix]
return signal_smoothed
elif self.boundary_method == "identity":
Expand All @@ -400,48 +434,51 @@ def savgol_smoother(self, signal):
elif self.boundary_method == "nan":
return signal_smoothed

def savgol_impute(self, signal):
def savgol_impute(self, signal, impute_order):
"""Impute the nan values in signal using savgol.

This method fills the nan values in the signal with an M-degree polynomial fit
This method fills the nan values in the signal with polynomial interpolation
on a rolling window of the immediate past up to window_length data points.

In the case of a single data point in the past, the single data point is
continued. In the case of no data points in the past (i.e. the signal starts
with nan), an error is raised.
A number of boundary cases are handled involving nan filling close to the boundary.

Note that in the case of many adjacent nans, the method will use previously
imputed values to do the fitting for later values. E.g. for
>>> x = np.array([1.0, 2.0, np.nan, 1.0, np.nan])
the last np.nan will be fit on np.array([1.0, 2.0, *, 1.0]), where * is the
result of imputing based on np.array([1.0, 2.0]) (depends on the savgol
settings).
imputed values to do the fitting for later values.

Parameters
----------
signal: np.ndarray
A 1D signal to be imputed.
impute_order: int
The polynomial order of the fit used for imputation.

Returns
----------
signal_imputed: np.ndarray
An imputed 1D signal.
"""
if impute_order > self.window_length:
raise ValueError("Impute order must be smaller than window length.")

signal_imputed = np.copy(signal)
for ix in np.where(np.isnan(signal))[0]:
for ix in np.where(np.isnan(signal_imputed))[0]:
# Boundary cases
if ix < self.window_length:
# A nan following a single value should just be extended
# At the boundary, a single value should just be extended
if ix == 1:
signal_imputed[ix] = signal_imputed[0]
# Otherwise, use savgol fitting
signal_imputed[ix] = signal_imputed[ix - 1]
# Otherwise, use savgol fitting on the largest window prior,
# reduce the polynomial degree if needed (can't fit if the
# imputation order is larger than the available data)
else:
coeffs = self.savgol_coeffs(-ix, -1)
signal_imputed[ix] = signal_imputed[:ix] @ coeffs
# Use a polynomial fit on the past window length to impute
signal_imputed[ix] = self.savgol_predict(
signal_imputed[:ix], min(ix-1, impute_order), -1
)
# Away from the boundary, use savgol fitting on a fixed window
else:
coeffs = self.savgol_coeffs(-self.window_length, -1)
signal_imputed[ix] = (
signal_imputed[ix - self.window_length : ix] @ coeffs
signal_imputed[ix] = self.savgol_predict(
signal_imputed[ix - self.window_length : ix],
impute_order,
-1,
)
return signal_imputed
Loading