
Add NAN code support to JHU #839


Closed
wants to merge 1 commit into from
67 changes: 55 additions & 12 deletions jhu/delphi_jhu/run.py
@@ -9,12 +9,14 @@
import time
from typing import Dict, Any

import pandas as pd
import numpy as np
from delphi_utils import (
create_export_csv,
Smoother,
GeoMapper,
get_structured_logger,
Nans,
)

from .geo import geo_map
@@ -62,6 +64,34 @@
]


def add_nancodes(df, metric, geo_res, smoother):
    """Add nancodes to the dataframe."""
    idx = pd.IndexSlice

    # Default missingness codes
    df["missing_val"] = Nans.NOT_MISSING
    df["missing_se"] = Nans.NOT_APPLICABLE
    df["missing_sample_size"] = Nans.NOT_APPLICABLE

    # Mark early smoothing entries as data insufficient
    if smoother == "seven_day_average":
        df.sort_index(inplace=True)
        min_time_value = df.index.min()[0] + 5 * pd.Timedelta(days=1)
        df.loc[idx[:min_time_value, :], "missing_val"] = Nans.CENSORED

    # Mark Puerto Rico county deaths with a region exception code
    # Search "Puerto Rico" here for details:
    # https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data
    if metric == "deaths" and geo_res == "county":
        puerto_rico_fips = ["72" + str(i).zfill(3) for i in range(1, 155)]
        df.loc[idx[:, puerto_rico_fips], "missing_val"] = Nans.REGION_EXCEPTION

    # Mark any remaining nans with unknown
    remaining_nans_mask = df["val"].isnull() & df["missing_val"].eq(Nans.NOT_MISSING)
    df.loc[remaining_nans_mask, "missing_val"] = Nans.OTHER
    return df


def run_module(params: Dict[str, Any]):
"""Run the JHU indicator module.

@@ -85,8 +115,10 @@ def run_module(params: Dict[str, Any]):
export_dir = params["common"]["export_dir"]
base_url = params["indicator"]["base_url"]
logger = get_structured_logger(
__name__, filename=params["common"].get("log_filename"),
log_exceptions=params["common"].get("log_exceptions", True))
__name__,
filename=params["common"].get("log_filename"),
log_exceptions=params["common"].get("log_exceptions", True),
)

gmpr = GeoMapper()
dfs = {metric: pull_jhu_data(base_url, metric, gmpr) for metric in METRICS}
@@ -100,16 +132,22 @@ def run_module(params: Dict[str, Any]):
metric=metric,
geo_res=geo_res,
sensor=sensor,
smoother=smoother)
smoother=smoother,
)
df = dfs[metric]
# Aggregate to appropriate geographic resolution
df = geo_map(df, geo_res, sensor)
df.set_index(["timestamp", "geo_id"], inplace=True)

# Smooth
df["val"] = df[sensor].groupby(level=1).transform(SMOOTHERS_MAP[smoother][0])

# JHU is not a survey data source
df["se"] = np.nan
df["sample_size"] = np.nan
# Drop early entries where data insufficient for smoothing
df = df[~df["val"].isnull()]

df = add_nancodes(df, metric, geo_res, smoother)

df = df.reset_index()
sensor_name = SENSOR_NAME_MAP[sensor][0]
# if (SENSOR_NAME_MAP[sensor][1] or SMOOTHERS_MAP[smoother][2]):
@@ -129,16 +167,21 @@ def run_module(params: Dict[str, Any]):
if not oldest_final_export_date:
oldest_final_export_date = max(exported_csv_dates)
oldest_final_export_date = min(
oldest_final_export_date, max(exported_csv_dates))
oldest_final_export_date, max(exported_csv_dates)
)

elapsed_time_in_seconds = round(time.time() - start_time, 2)
max_lag_in_days = None
formatted_oldest_final_export_date = None
if oldest_final_export_date:
max_lag_in_days = (datetime.now() - oldest_final_export_date).days
formatted_oldest_final_export_date = oldest_final_export_date.strftime("%Y-%m-%d")
logger.info("Completed indicator run",
elapsed_time_in_seconds = elapsed_time_in_seconds,
csv_export_count = csv_export_count,
max_lag_in_days = max_lag_in_days,
oldest_final_export_date = formatted_oldest_final_export_date)
formatted_oldest_final_export_date = oldest_final_export_date.strftime(
"%Y-%m-%d"
)
logger.info(
"Completed indicator run",
elapsed_time_in_seconds=elapsed_time_in_seconds,
csv_export_count=csv_export_count,
max_lag_in_days=max_lag_in_days,
oldest_final_export_date=formatted_oldest_final_export_date,
)
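
A note on the censoring window in add_nancodes: the seven-day average needs the current day plus the six preceding days, so the first six dates at each location cannot be fully smoothed, and the cutoff of the minimum date plus 5 * pd.Timedelta(days=1) spans exactly those six dates. A minimal standalone sketch of that cutoff arithmetic (the index and values below are illustrative, not part of this diff):

import pandas as pd
import numpy as np
from delphi_utils import Nans

# One county observed daily; the first six dates of a seven-day average are NaN.
idx = pd.MultiIndex.from_product(
    [pd.date_range("2020-03-01", "2020-03-10"), ["01001"]],
    names=["timestamp", "geo_id"],
)
df = pd.DataFrame({"val": [np.nan] * 6 + [1.0, 2.0, 3.0, 4.0]}, index=idx)
df["missing_val"] = Nans.NOT_MISSING
df.sort_index(inplace=True)

# Same arithmetic as add_nancodes: the minimum date plus five days covers six dates.
min_time_value = df.index.min()[0] + 5 * pd.Timedelta(days=1)
df.loc[pd.IndexSlice[:min_time_value, :], "missing_val"] = Nans.CENSORED

assert (df["missing_val"] == Nans.CENSORED).sum() == 6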
113 changes: 78 additions & 35 deletions jhu/tests/test_run.py
@@ -2,46 +2,89 @@
from os.path import join, basename

import pandas as pd
import numpy as np
from delphi_jhu.run import add_nancodes
from delphi_utils import Nans


def _non_ignored_files_set(directory):
    """Return the set of files in a directory whose names do not start with '.'."""
    out = {fname for fname in listdir(directory) if not basename(fname).startswith(".")}
    return out

class TestRun:
def test_output_files_exist(self, run_as_module):

csv_files = [x for x in listdir("receiving") if not basename(x).startswith(".")]

dates = [
"20200303",
"20200304",
"20200305",
"20200306",
"20200307",
"20200308",
"20200309",
"20200310",
]
csv_files = _non_ignored_files_set("receiving")
dates = [d.strftime("%Y%m%d") for d in pd.date_range("20200303", "20200310")]
geos = ["county", "hrr", "msa", "state", "hhs", "nation"]
metrics = []
for event in ["confirmed", "deaths"]:
for smoothing in ["", "_7dav"]:
for window in ["incidence", "cumulative"]:
for stat in ["num", "prop"]:
metrics.append(f"{event}{smoothing}_{window}_{stat}")

expected_files = []
for date in dates:
for geo in geos:
for metric in metrics:
if "7dav" in metric and "cumulative" in metric:
continue
# Can't compute 7dav for first few days of data because of NAs
if date > "20200305" or "7dav" not in metric:
expected_files += [date + "_" + geo + "_" + metric + ".csv"]

assert set(csv_files) == set(expected_files)
metrics = [
f"{event}{smoothing}_{window}_{stat}"
for event in ["confirmed", "deaths"]
for smoothing in ["", "_7dav"]
for window in ["incidence", "cumulative"]
for stat in ["num", "prop"]
]
expected_files = {
f"{date}_{geo}_{metric}.csv"
for date in dates
for geo in geos
for metric in metrics
if not ("7dav" in metric and "cumulative" in metric)
}

assert csv_files == expected_files

def test_output_file_format(self, run_as_module):

df = pd.read_csv(
join("receiving", "20200310_state_confirmed_cumulative_num.csv")
)
assert (df.columns.values == ["geo_id", "val", "se", "sample_size"]).all()
df = pd.read_csv(join("receiving", "20200310_state_confirmed_cumulative_num.csv"))
expected_columns = [
"geo_id",
"val",
"se",
"sample_size",
"missing_val",
"missing_se",
"missing_sample_size",
]
assert (df.columns.values == expected_columns).all()

    def test_add_nancodes(self):
        # Without a smoother: 72001 (a Puerto Rico county) gets the region
        # exception code and the remaining nan value is coded OTHER.
        df = pd.DataFrame({
            "timestamp": pd.date_range("20200321", "20200328"),
            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
            "val": [0.1, 0.2, 0.3, 0.4, 0.5, np.nan, 0.7, np.nan],
            "se": [np.nan] * 8,
            "sample_size": [np.nan] * 8
        }).set_index(["timestamp", "geo_id"])
        expected_df = pd.DataFrame({
            "timestamp": pd.date_range("20200321", "20200328"),
            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
            "val": [0.1, 0.2, 0.3, 0.4, 0.5, np.nan, 0.7, np.nan],
            "se": [np.nan] * 8,
            "sample_size": [np.nan] * 8,
            "missing_val": [Nans.NOT_MISSING] * 5 + [Nans.REGION_EXCEPTION, Nans.NOT_MISSING, Nans.OTHER],
            "missing_se": [Nans.NOT_APPLICABLE] * 8,
            "missing_sample_size": [Nans.NOT_APPLICABLE] * 8,
        }).set_index(["timestamp", "geo_id"])

        pd.testing.assert_frame_equal(add_nancodes(df, "deaths", "county", None), expected_df)

        # With the seven-day-average smoother: the first six dates fall in the
        # censor window; the Puerto Rico row (sixth) is overridden by the
        # region exception code.
        df2 = pd.DataFrame({
            "timestamp": pd.date_range("20200321", "20200328"),
            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
            "val": [np.nan] * 6 + [0.7, np.nan],
            "se": [np.nan] * 8,
            "sample_size": [np.nan] * 8
        }).set_index(["timestamp", "geo_id"])
        expected_df2 = pd.DataFrame({
            "timestamp": pd.date_range("20200321", "20200328"),
            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
            "val": [np.nan] * 6 + [0.7, np.nan],
            "se": [np.nan] * 8,
            "sample_size": [np.nan] * 8,
            "missing_val": [Nans.CENSORED] * 5 + [Nans.REGION_EXCEPTION, Nans.NOT_MISSING, Nans.OTHER],
            "missing_se": [Nans.NOT_APPLICABLE] * 8,
            "missing_sample_size": [Nans.NOT_APPLICABLE] * 8,
        }).set_index(["timestamp", "geo_id"])

        pd.testing.assert_frame_equal(add_nancodes(df2, "deaths", "county", "seven_day_average"), expected_df2)
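
Downstream of the export, the three new columns let consumers see why a value is absent instead of guessing from a bare NaN. A hypothetical read-back sketch (the filename matches the test fixture above; it assumes the missingness columns are written out as the enum's integer codes):

import pandas as pd
from delphi_utils import Nans

# Hypothetical consumer: read one exported file and tally the reasons values are missing.
# Assumes the CSV stores the integer code; adjust if the enum name is written instead.
df = pd.read_csv("receiving/20200310_state_confirmed_cumulative_num.csv")
missing = df[df["val"].isna()]
print(missing["missing_val"].map(lambda code: Nans(int(code)).name).value_counts())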