Commit 0f50fd7

NAN codes for JHU:
* keep nan values, add missing columns, add missing today, add tests
1 parent 0b7103a commit 0f50fd7

2 files changed: +133 −47 lines changed

jhu/delphi_jhu/run.py

Lines changed: 55 additions & 12 deletions
@@ -9,12 +9,14 @@
 import time
 from typing import Dict, Any

+import pandas as pd
 import numpy as np
 from delphi_utils import (
     create_export_csv,
     Smoother,
     GeoMapper,
     get_structured_logger,
+    Nans,
 )

 from .geo import geo_map
@@ -62,6 +64,34 @@
 ]


+def add_nancodes(df, metric, geo_res, smoother):
+    """Add nancodes to the dataframe."""
+    idx = pd.IndexSlice
+
+    # Default missingness codes
+    df["missing_val"] = Nans.NOT_MISSING
+    df["missing_se"] = Nans.NOT_APPLICABLE
+    df["missing_sample_size"] = Nans.NOT_APPLICABLE
+
+    # Mark early smoothing entries as data insufficient
+    if smoother == "seven_day_average":
+        df.sort_index(inplace=True)
+        min_time_value = df.index.min()[0] + 5 * pd.Timedelta(days=1)
+        df.loc[idx[:min_time_value, :], "missing_val"] = Nans.CENSORED
+
+    # Mark Puerto Rico county deaths with a region exception code
+    # Search "Puerto Rico" here for details:
+    # https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data
+    if metric == "deaths" and geo_res == "county":
+        puerto_rico_fips = ["72" + str(i).zfill(3) for i in range(1, 155)]
+        df.loc[idx[:, puerto_rico_fips], "missing_val"] = Nans.REGION_EXCEPTION
+
+    # Mark any remaining nans with unknown
+    remaining_nans_mask = df["val"].isnull() & df["missing_val"].eq(Nans.NOT_MISSING)
+    df.loc[remaining_nans_mask, "missing_val"] = Nans.OTHER
+    return df
+
+
 def run_module(params: Dict[str, Any]):
     """Run the JHU indicator module.

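For readers skimming the hunk above: add_nancodes annotates rather than filters. A minimal sketch of calling it on a toy county frame (the FIPS codes and values below are illustrative, not from this commit):

import numpy as np
import pandas as pd
from delphi_jhu.run import add_nancodes

# Toy input: one Puerto Rico county (72001) and one other county (01001),
# indexed the same way run_module indexes before calling add_nancodes.
toy = pd.DataFrame({
    "timestamp": pd.to_datetime(["2020-03-10", "2020-03-10"]),
    "geo_id": ["72001", "01001"],
    "val": [np.nan, 3.0],
    "se": [np.nan, np.nan],
    "sample_size": [np.nan, np.nan],
}).set_index(["timestamp", "geo_id"])

out = add_nancodes(toy, metric="deaths", geo_res="county", smoother=None)
# 72001 gets REGION_EXCEPTION (Puerto Rico deaths at the county level);
# 01001 keeps the default NOT_MISSING; se/sample_size are NOT_APPLICABLE.
print(out[["missing_val", "missing_se", "missing_sample_size"]])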
@@ -85,8 +115,10 @@ def run_module(params: Dict[str, Any]):
     export_dir = params["common"]["export_dir"]
     base_url = params["indicator"]["base_url"]
     logger = get_structured_logger(
-        __name__, filename=params["common"].get("log_filename"),
-        log_exceptions=params["common"].get("log_exceptions", True))
+        __name__,
+        filename=params["common"].get("log_filename"),
+        log_exceptions=params["common"].get("log_exceptions", True),
+    )

     gmpr = GeoMapper()
     dfs = {metric: pull_jhu_data(base_url, metric, gmpr) for metric in METRICS}
@@ -100,16 +132,22 @@ def run_module(params: Dict[str, Any]):
             metric=metric,
             geo_res=geo_res,
             sensor=sensor,
-            smoother=smoother)
+            smoother=smoother,
+        )
         df = dfs[metric]
         # Aggregate to appropriate geographic resolution
         df = geo_map(df, geo_res, sensor)
         df.set_index(["timestamp", "geo_id"], inplace=True)
+
+        # Smooth
         df["val"] = df[sensor].groupby(level=1).transform(SMOOTHERS_MAP[smoother][0])
+
+        # JHU is not a survey data source
         df["se"] = np.nan
         df["sample_size"] = np.nan
-        # Drop early entries where data insufficient for smoothing
-        df = df[~df["val"].isnull()]
+
+        df = add_nancodes(df, metric, geo_res, smoother)
+
         df = df.reset_index()
         sensor_name = SENSOR_NAME_MAP[sensor][0]
         # if (SENSOR_NAME_MAP[sensor][1] or SMOOTHERS_MAP[smoother][2]):
@@ -129,16 +167,21 @@ def run_module(params: Dict[str, Any]):
         if not oldest_final_export_date:
             oldest_final_export_date = max(exported_csv_dates)
         oldest_final_export_date = min(
-            oldest_final_export_date, max(exported_csv_dates))
+            oldest_final_export_date, max(exported_csv_dates)
+        )

     elapsed_time_in_seconds = round(time.time() - start_time, 2)
     max_lag_in_days = None
     formatted_oldest_final_export_date = None
     if oldest_final_export_date:
         max_lag_in_days = (datetime.now() - oldest_final_export_date).days
-        formatted_oldest_final_export_date = oldest_final_export_date.strftime("%Y-%m-%d")
-    logger.info("Completed indicator run",
-                elapsed_time_in_seconds = elapsed_time_in_seconds,
-                csv_export_count = csv_export_count,
-                max_lag_in_days = max_lag_in_days,
-                oldest_final_export_date = formatted_oldest_final_export_date)
+        formatted_oldest_final_export_date = oldest_final_export_date.strftime(
+            "%Y-%m-%d"
+        )
+    logger.info(
+        "Completed indicator run",
+        elapsed_time_in_seconds=elapsed_time_in_seconds,
+        csv_export_count=csv_export_count,
+        max_lag_in_days=max_lag_in_days,
+        oldest_final_export_date=formatted_oldest_final_export_date,
+    )
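The behavioral change in run_module is worth spelling out: previously, rows made NaN by the smoother's warm-up window were silently dropped before export; now every row survives to export, with each NaN explained by a code. Roughly (a sketch restating the diff, not the committed code verbatim):

# Before: silently drop rows the smoother could not compute.
df = df[~df["val"].isnull()]

# After: keep all rows and explain each NaN with a Nans code
# (CENSORED for the smoothing warm-up window, REGION_EXCEPTION for
# Puerto Rico county deaths, OTHER for anything else unexplained).
df = add_nancodes(df, metric, geo_res, smoother)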

jhu/tests/test_run.py

Lines changed: 78 additions & 35 deletions
@@ -2,46 +2,89 @@
 from os.path import join, basename

 import pandas as pd
+import numpy as np
+from delphi_jhu.run import add_nancodes
+from delphi_utils import Nans


+def _non_ignored_files_set(directory):
+    """List all files in a directory not preceded by a '.' and store them in a set."""
+    out = {fname for fname in listdir(directory) if not basename(fname).startswith(".")}
+    return out
+
 class TestRun:
     def test_output_files_exist(self, run_as_module):
-
-        csv_files = [x for x in listdir("receiving") if not basename(x).startswith(".")]
-
-        dates = [
-            "20200303",
-            "20200304",
-            "20200305",
-            "20200306",
-            "20200307",
-            "20200308",
-            "20200309",
-            "20200310",
-        ]
+        csv_files = _non_ignored_files_set("receiving")
+        dates = [d.strftime("%Y%m%d") for d in pd.date_range("20200303", "20200310")]
         geos = ["county", "hrr", "msa", "state", "hhs", "nation"]
-        metrics = []
-        for event in ["confirmed", "deaths"]:
-            for smoothing in ["", "_7dav"]:
-                for window in ["incidence", "cumulative"]:
-                    for stat in ["num", "prop"]:
-                        metrics.append(f"{event}{smoothing}_{window}_{stat}")
-
-        expected_files = []
-        for date in dates:
-            for geo in geos:
-                for metric in metrics:
-                    if "7dav" in metric and "cumulative" in metric:
-                        continue
-                    # Can't compute 7dav for first few days of data because of NAs
-                    if date > "20200305" or "7dav" not in metric:
-                        expected_files += [date + "_" + geo + "_" + metric + ".csv"]
-
-        assert set(csv_files) == set(expected_files)
+        metrics = [
+            f"{event}{smoothing}_{window}_{stat}"
+            for event in ["confirmed", "deaths"]
+            for smoothing in ["", "_7dav"]
+            for window in ["incidence", "cumulative"]
+            for stat in ["num", "prop"]
+        ]
+        expected_files = {
+            f"{date}_{geo}_{metric}.csv"
+            for date in dates
+            for geo in geos
+            for metric in metrics
+            if not ("7dav" in metric and "cumulative" in metric)
+        }
+
+        assert csv_files == expected_files

     def test_output_file_format(self, run_as_module):

-        df = pd.read_csv(
-            join("receiving", "20200310_state_confirmed_cumulative_num.csv")
-        )
-        assert (df.columns.values == ["geo_id", "val", "se", "sample_size"]).all()
+        df = pd.read_csv(join("receiving", "20200310_state_confirmed_cumulative_num.csv"))
+        expected_columns = [
+            "geo_id",
+            "val",
+            "se",
+            "sample_size",
+            "missing_val",
+            "missing_se",
+            "missing_sample_size",
+        ]
+        assert (df.columns.values == expected_columns).all()
+
+    def test_add_nancodes(self):
+        df = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [0.1, 0.2, 0.3, 0.4, 0.5, np.nan, 0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8
+        }).set_index(["timestamp", "geo_id"])
+        expected_df = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [0.1, 0.2, 0.3, 0.4, 0.5, np.nan, 0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8,
+            "missing_val": [Nans.NOT_MISSING] * 5 + [Nans.REGION_EXCEPTION, Nans.NOT_MISSING, Nans.OTHER],
+            "missing_se": [Nans.NOT_APPLICABLE] * 8,
+            "missing_sample_size": [Nans.NOT_APPLICABLE] * 8,
+        }).set_index(["timestamp", "geo_id"])
+
+        pd.testing.assert_frame_equal(add_nancodes(df, "deaths", "county", None), expected_df)
+
+        df2 = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [np.nan] * 6 + [0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8
+        }).set_index(["timestamp", "geo_id"])
+        expected_df2 = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [np.nan] * 6 + [0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8,
+            "missing_val": [Nans.CENSORED] * 5 + [Nans.REGION_EXCEPTION, Nans.NOT_MISSING, Nans.OTHER],
+            "missing_se": [Nans.NOT_APPLICABLE] * 8,
+            "missing_sample_size": [Nans.NOT_APPLICABLE] * 8,
+        }).set_index(["timestamp", "geo_id"])
+
+        pd.testing.assert_frame_equal(add_nancodes(df2, "deaths", "county", "seven_day_average"), expected_df2)
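Given the new columns, a downstream consumer can treat the codes as machine-readable explanations for every NaN. A hypothetical sanity check on an exported file (assuming Nans is an IntEnum, so its members compare against the integer codes in the CSV, and that exported rows retain NaN values alongside their codes):

import pandas as pd
from delphi_utils import Nans

df = pd.read_csv("receiving/20200310_state_confirmed_7dav_incidence_num.csv")
# Every NaN val must be explained by a code other than NOT_MISSING.
nan_rows = df[df["val"].isnull()]
assert (nan_rows["missing_val"] != Nans.NOT_MISSING).all()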
