Begin smoothing utility integration into JHU. #177

Merged: 1 commit, merged on Nov 12, 2020

21 changes: 15 additions & 6 deletions _delphi_utils_python/delphi_utils/smooth.py
@@ -150,7 +150,9 @@ def __init__(
else:
self.coeffs = None

def smooth(self, signal: Union[np.ndarray, pd.Series], impute_order=2) -> Union[np.ndarray, pd.Series]:
def smooth(
self, signal: Union[np.ndarray, pd.Series], impute_order=2
) -> Union[np.ndarray, pd.Series]:
"""Apply a smoother to a signal.

The major workhorse smoothing function. Imputes the nans and then applies
@@ -200,7 +202,7 @@ def smooth(self, signal: Union[np.ndarray, pd.Series], impute_order=2) -> Union[
signal_smoothed = signal

# Append the nans back, since we want to preserve length
signal_smoothed = np.hstack([np.nan*np.ones(ix), signal_smoothed])
signal_smoothed = np.hstack([np.nan * np.ones(ix), signal_smoothed])
# Convert back to pandas if necessary
if is_pandas_series:
signal_smoothed = pd.Series(signal_smoothed)
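
As context for the reformatted signature, a minimal usage sketch (not part of this diff; the input values are made up) of the impute-then-smooth behavior the docstring describes, using only the constructor form that run.py below adopts:

import numpy as np
import pandas as pd
from delphi_utils import Smoother

# Made-up daily values with an interior gap; the smoother imputes the NaN
# (impute_order bounds the polynomial degree of that fit) and then applies
# the trailing 7-day moving average, preserving length and pandas type.
signal = pd.Series([1.0, 2.0, np.nan, 4.0, 5.0, 6.0, 7.0])
smoothed = Smoother("moving_average", window_length=7).smooth(signal, impute_order=1)
print(len(smoothed) == len(signal))  # True
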
@@ -295,21 +297,28 @@ def left_gauss_linear_smoother(self, signal):
weights = np.exp(
-((np.arange(idx + 1) - idx) ** 2) / self.gaussian_bandwidth
)
AwA = np.dot(A[: (idx + 1), :].T * weights, A[: (idx + 1), :]) # pylint: disable=invalid-name
AwA = np.dot( # pylint: disable=invalid-name
A[: (idx + 1), :].T * weights, A[: (idx + 1), :]
)
Awy = np.dot( # pylint: disable=invalid-name
A[: (idx + 1), :].T * weights, signal[: (idx + 1)].reshape(-1, 1)
)
try:
beta = np.linalg.solve(AwA, Awy)
signal_smoothed[idx] = np.dot(A[: (idx + 1), :], beta)[-1]
except np.linalg.LinAlgError:
signal_smoothed[idx] = signal[idx] if self.impute else np.nan # pylint: disable=using-constant-test
signal_smoothed[idx] = (
signal[idx] # pylint: disable=using-constant-test
if self.impute
else np.nan
)
if self.minval is not None:
signal_smoothed[signal_smoothed <= self.minval] = self.minval
return signal_smoothed
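
For intuition about the block above (not part of this diff): each point is the value at the current index of a weighted least-squares line fit through all earlier points, with Gaussian weights favoring the most recent ones. A standalone sketch of that single step, with an assumed two-column design matrix and an arbitrary bandwidth:

import numpy as np

def left_gauss_linear_point(signal, idx, bandwidth=100.0):
    # Assumed design matrix: intercept plus time column over indices 0..idx.
    t = np.arange(idx + 1)
    A = np.vstack([np.ones(idx + 1), t]).T
    # Newest observations get the largest weights.
    weights = np.exp(-((t - idx) ** 2) / bandwidth)
    AwA = (A.T * weights) @ A                                 # A^T W A
    Awy = (A.T * weights) @ signal[: idx + 1].reshape(-1, 1)  # A^T W y
    beta = np.linalg.solve(AwA, Awy)
    return (A @ beta)[-1, 0]  # fitted line evaluated at the current index
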

def savgol_predict(self, signal, poly_fit_degree, nr):
"""Predict a single value using the savgol method.

Fits a polynomial through the values given by the signal and returns the value
of the polynomial at the right-most signal-value. More precisely, for a signal of length
n, fits a poly_fit_degree polynomial through the points signal[-n+1+nr], signal[-n+2+nr],
@@ -368,7 +377,7 @@ def savgol_coeffs(self, nl, nr, poly_fit_degree):
if nr > 0:
warnings.warn("The filter is no longer causal.")

A = np.vstack(
A = np.vstack( # pylint: disable=invalid-name
[np.arange(nl, nr + 1) ** j for j in range(poly_fit_degree + 1)]
).T
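
A rough sketch of what coefficients like these produce (not part of this diff, and possibly differing from the library's normalization): the matrix above is a Vandermonde matrix over the window offsets nl..nr, and dotting a window of values with the first row of its pseudoinverse evaluates the least-squares polynomial fit at offset zero.

import numpy as np

def savgol_like_coeffs(nl, nr, poly_fit_degree):
    # Vandermonde matrix over the window offsets, as in the diff above.
    offsets = np.arange(nl, nr + 1)
    A = np.vstack([offsets ** j for j in range(poly_fit_degree + 1)]).T
    # The polynomial coefficients of the least-squares fit are pinv(A) @ y,
    # and the fit evaluated at offset 0 is the constant term: pinv(A)[0] @ y.
    return np.linalg.pinv(A)[0]

# Degree-1 fit over the last three points (nl=-2, nr=0): exactly linear data
# is reproduced at the newest point.
window = np.array([1.0, 2.0, 3.0])
print(savgol_like_coeffs(-2, 0, 1) @ window)  # 3.0 (up to floating point)
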

@@ -471,7 +480,7 @@ def savgol_impute(self, signal, impute_order):
# imputation order is larger than the available data)
else:
signal_imputed[ix] = self.savgol_predict(
signal_imputed[:ix], min(ix-1, impute_order), -1
signal_imputed[:ix], min(ix - 1, impute_order), -1
)
# Away from the boundary, use savgol fitting on a fixed window
else:
37 changes: 19 additions & 18 deletions jhu/delphi_jhu/run.py
@@ -6,26 +6,21 @@
"""
from datetime import datetime
from itertools import product
from functools import partial

import numpy as np
from delphi_utils import (
read_params,
create_export_csv,
S3ArchiveDiffer,
Smoother,
GeoMapper,
)

from delphi_utils import GeoMapper
from .geo import geo_map
from .pull import pull_jhu_data
from .smooth import (
identity,
kday_moving_average,
)


# global constants
seven_day_moving_average = partial(kday_moving_average, k=7)
METRICS = [
"deaths",
"confirmed",
@@ -41,10 +36,10 @@
"seven_day_average",
]
SENSOR_NAME_MAP = {
"new_counts": ("incidence_num", False),
"cumulative_counts": ("cumulative_num", False),
"incidence": ("incidence_prop", False),
"cumulative_prop": ("cumulative_prop", False),
"new_counts": ("incidence_num", False),
"cumulative_counts": ("cumulative_num", False),
"incidence": ("incidence_prop", False),
"cumulative_prop": ("cumulative_prop", False),
}
# Temporarily added for wip_ signals
# WIP_SENSOR_NAME_MAP = {
@@ -54,8 +49,8 @@
# "cumulative_prop": ("cumul_prop", False),
# }
SMOOTHERS_MAP = {
"unsmoothed": (identity, ''),
"seven_day_average": (seven_day_moving_average, '7dav_'),
"unsmoothed": (Smoother("identity").smooth, ""),
"seven_day_average": (Smoother("moving_average", window_length=7).smooth, "7dav_"),
}
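
For illustration (not part of this diff), the map entries are now bound Smoother.smooth methods rather than the indicator-local helpers, so downstream code can keep treating them as plain callables; a small sketch with made-up values:

import pandas as pd
from delphi_utils import Smoother

# The tuple mirrors a SMOOTHERS_MAP entry: a callable plus a signal-name prefix.
smooth_fn, name_prefix = Smoother("moving_average", window_length=7).smooth, "7dav_"
daily_counts = pd.Series([0.0, 1.0, 3.0, 6.0, 10.0, 15.0, 21.0])  # made-up values
smoothed = smooth_fn(daily_counts)  # boundary handling follows the Smoother defaults
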
GEO_RESOLUTIONS = [
"county",
@@ -75,17 +70,21 @@ def run_module():

if len(params["bucket_name"]) > 0:
arch_diff = S3ArchiveDiffer(
cache_dir, export_dir,
params["bucket_name"], "jhu",
params["aws_credentials"])
cache_dir,
export_dir,
params["bucket_name"],
"jhu",
params["aws_credentials"],
)
arch_diff.update_cache()
else:
arch_diff = None

gmpr = GeoMapper()
dfs = {metric: pull_jhu_data(base_url, metric, gmpr) for metric in METRICS}
for metric, geo_res, sensor, smoother in product(
METRICS, GEO_RESOLUTIONS, SENSORS, SMOOTHERS):
METRICS, GEO_RESOLUTIONS, SENSORS, SMOOTHERS
):
print(metric, geo_res, sensor, smoother)
df = dfs[metric]
# Aggregate to appropriate geographic resolution
@@ -121,7 +120,9 @@ def run_module():
_, fails = arch_diff.archive_exports(to_archive)

# Filter existing exports to exclude those that failed to archive
succ_common_diffs = {f: diff for f, diff in common_diffs.items() if f not in fails}
succ_common_diffs = {
f: diff for f, diff in common_diffs.items() if f not in fails
}
arch_diff.filter_exports(succ_common_diffs)

# Report failures: someone should probably look at them
39 changes: 0 additions & 39 deletions jhu/delphi_jhu/smooth.py

This file was deleted.
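
The deleted module is assumed to have held the indicator-local identity and kday_moving_average helpers that run.py no longer imports; a hedged reconstruction of their behavior (not the deleted source), which the shared Smoother class now covers:

import numpy as np

def identity(signal):
    # Pass-through, now Smoother("identity").smooth.
    return signal

def kday_moving_average(signal, k):
    # Trailing k-day average, now Smoother("moving_average", window_length=k).smooth.
    # The boundary behavior of the original helper is assumed, not known.
    signal = np.asarray(signal, dtype=float)
    out = np.full_like(signal, np.nan)
    for i in range(len(signal)):
        out[i] = signal[max(0, i - k + 1): i + 1].mean()
    return out
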

1 change: 0 additions & 1 deletion jhu/tests/test_run.py
@@ -4,7 +4,6 @@
from os.path import join

import pandas as pd
from delphi_jhu.run import run_module


class TestRun:
33 changes: 15 additions & 18 deletions jhu/tests/test_smooth.py
@@ -1,34 +1,31 @@
import pytest

from os import listdir
from os.path import join

import numpy as np
import pandas as pd
from delphi_jhu.run import run_module


class TestSmooth:
def test_output_files_smoothed(self, run_as_module):

dates = [str(x) for x in range(20200303, 20200310)]

smoothed = pd.read_csv(
join("./receiving",
f"{dates[-1]}_state_confirmed_7dav_cumulative_num.csv")
join("./receiving", f"{dates[-1]}_state_confirmed_7dav_cumulative_num.csv")
)

# Build a dataframe out of the individual day files
raw = pd.concat([
pd.read_csv(
join("./receiving",
f"{date}_state_confirmed_cumulative_num.csv")
) for date in dates
])
# Compute the mean across the time values; order doesn't matter
# this corresponds to the smoothed value on the last day
raw = pd.concat(
[
pd.read_csv(
join("./receiving", f"{date}_state_confirmed_cumulative_num.csv")
)
for date in dates
]
)
# Compute the mean across the time values; order doesn't matter
# this corresponds to the smoothed value on the last day
# 2020-03-10
raw = raw.groupby('geo_id')['val'].mean()

df = pd.merge(smoothed, raw, on='geo_id', suffixes=('_smoothed', '_raw'))
assert np.allclose(df['val_smoothed'].values, df['val_raw'].values)

raw = raw.groupby("geo_id")["val"].mean()
df = pd.merge(smoothed, raw, on="geo_id", suffixes=("_smoothed", "_raw"))
assert np.allclose(df["val_smoothed"].values, df["val_raw"].values)
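
The assertion works because, assuming the seven-day smoother is a trailing moving average, its value on the last exported day is just the arithmetic mean of that day's and the six preceding days' cumulative values, which is exactly what the groupby-mean over the seven daily files computes. A small self-contained check of that identity (made-up numbers, not part of this diff):

import numpy as np
from delphi_utils import Smoother

cumulative = np.array([10.0, 12.0, 15.0, 20.0, 26.0, 33.0, 41.0])  # seven made-up days
smoothed = Smoother("moving_average", window_length=7).smooth(cumulative)
assert np.isclose(smoothed[-1], cumulative.mean())  # last day equals the mean of the 7 raw values
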