
Commit 410049d

NAN codes for JHU:
* keep nan values, add missing columns, add missing today, add tests

1 parent ea75253

3 files changed: +144 −31 lines

_delphi_utils_python/delphi_utils/archive.py

Lines changed: 5 additions & 0 deletions

@@ -112,6 +112,11 @@ def diff_export_csv(
     deleted_df[["val", "se", "sample_size"]] = np.nan
     deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
 
+    # Code deleted entries as nans with the deleted missing code
+    deleted_df = before_df.loc[deleted_idx, :].copy()
+    deleted_df[["val", "se", "sample_size"]] = np.nan
+    deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
+
     return (
         deleted_df,
         after_df_cmn.loc[~(same_mask.all(axis=1)), :],

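For context, the `diff_export_csv` change above keeps deleted rows in the diff output instead of dropping them: their values are blanked and their missingness columns are coded `Nans.DELETED`. A minimal sketch of that coding step, assuming a toy before/after pair indexed by `geo_id` (the frames and FIPS values are invented for illustration, and the multi-column scalar assignment assumes the pinned pandas version supports it, as the archiver code itself does):

import numpy as np
import pandas as pd
from delphi_utils import Nans

# Toy "before" and "after" exports, indexed like the archiver's CSVs.
before_df = pd.DataFrame(
    {"val": [1.0, 2.0], "se": [0.1, 0.2], "sample_size": [10.0, 20.0]},
    index=pd.Index(["01001", "01003"], name="geo_id"),
)
after_df = before_df.loc[["01001"]]  # "01003" disappeared in the new export

# Rows present before but absent after are kept, their values blanked,
# and their missingness columns set to the DELETED code.
deleted_idx = before_df.index.difference(after_df.index)
deleted_df = before_df.loc[deleted_idx, :].copy()
deleted_df[["val", "se", "sample_size"]] = np.nan
deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
print(deleted_df)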
jhu/delphi_jhu/run.py

Lines changed: 55 additions & 12 deletions

@@ -9,13 +9,15 @@
 import time
 from typing import Dict, Any
 
+import pandas as pd
 import numpy as np
 from delphi_utils import (
     create_export_csv,
     S3ArchiveDiffer,
     Smoother,
     GeoMapper,
     get_structured_logger,
+    Nans,
 )
 
 from .geo import geo_map
@@ -63,6 +65,34 @@
 ]
 
 
+def add_nancodes(df, metric, geo_res, smoother):
+    """Add nancodes to the dataframe."""
+    idx = pd.IndexSlice
+
+    # Default missingness codes
+    df["missing_val"] = Nans.NOT_MISSING
+    df["missing_se"] = Nans.NOT_APPLICABLE
+    df["missing_sample_size"] = Nans.NOT_APPLICABLE
+
+    # Mark early smoothing entries as data insufficient
+    if smoother == "seven_day_average":
+        df.sort_index(inplace=True)
+        min_time_value = df.index.min()[0] + 5 * pd.Timedelta(days=1)
+        df.loc[idx[:min_time_value, :], "missing_val"] = Nans.PRIVACY
+
+    # Mark Puerto Rico county deaths with a region exception code
+    # Search "Puerto Rico" here for details:
+    # https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data
+    if metric == "deaths" and geo_res == "county":
+        puerto_rico_fips = ["72" + str(i).zfill(3) for i in range(1, 155)]
+        df.loc[idx[:, puerto_rico_fips], "missing_val"] = Nans.REGION_EXCEPTION
+
+    # Mark any remaining nans with unknown
+    remaining_nans_mask = df["val"].isnull() & df["missing_val"].eq(Nans.NOT_MISSING)
+    df.loc[remaining_nans_mask, "missing_val"] = Nans.UNKNOWN
+    return df
+
+
 def run_module(params: Dict[str, Any]):
     """Run the JHU indicator module.
 
@@ -86,8 +116,10 @@ def run_module(params: Dict[str, Any]):
     export_dir = params["common"]["export_dir"]
     base_url = params["indicator"]["base_url"]
     logger = get_structured_logger(
-        __name__, filename=params["common"].get("log_filename"),
-        log_exceptions=params["common"].get("log_exceptions", True))
+        __name__,
+        filename=params["common"].get("log_filename"),
+        log_exceptions=params["common"].get("log_exceptions", True),
+    )
 
     if "archive" in params:
         arch_diff = S3ArchiveDiffer(
@@ -112,16 +144,22 @@ def run_module(params: Dict[str, Any]):
             metric=metric,
             geo_res=geo_res,
             sensor=sensor,
-            smoother=smoother)
+            smoother=smoother,
+        )
         df = dfs[metric]
         # Aggregate to appropriate geographic resolution
         df = geo_map(df, geo_res, sensor)
         df.set_index(["timestamp", "geo_id"], inplace=True)
+
+        # Smooth
         df["val"] = df[sensor].groupby(level=1).transform(SMOOTHERS_MAP[smoother][0])
+
+        # JHU is not a survey data source
         df["se"] = np.nan
         df["sample_size"] = np.nan
-        # Drop early entries where data insufficient for smoothing
-        df = df[~df["val"].isnull()]
+
+        df = add_nancodes(df, metric, geo_res, smoother)
+
         df = df.reset_index()
         sensor_name = SENSOR_NAME_MAP[sensor][0]
         # if (SENSOR_NAME_MAP[sensor][1] or SMOOTHERS_MAP[smoother][2]):
@@ -141,7 +179,8 @@ def run_module(params: Dict[str, Any]):
         if not oldest_final_export_date:
             oldest_final_export_date = max(exported_csv_dates)
         oldest_final_export_date = min(
-            oldest_final_export_date, max(exported_csv_dates))
+            oldest_final_export_date, max(exported_csv_dates)
+        )
 
     if arch_diff is not None:
         # Diff exports, and make incremental versions
@@ -167,9 +206,13 @@ def run_module(params: Dict[str, Any]):
     formatted_oldest_final_export_date = None
     if oldest_final_export_date:
         max_lag_in_days = (datetime.now() - oldest_final_export_date).days
-        formatted_oldest_final_export_date = oldest_final_export_date.strftime("%Y-%m-%d")
-    logger.info("Completed indicator run",
-        elapsed_time_in_seconds = elapsed_time_in_seconds,
-        csv_export_count = csv_export_count,
-        max_lag_in_days = max_lag_in_days,
-        oldest_final_export_date = formatted_oldest_final_export_date)
+        formatted_oldest_final_export_date = oldest_final_export_date.strftime(
+            "%Y-%m-%d"
+        )
+    logger.info(
+        "Completed indicator run",
+        elapsed_time_in_seconds=elapsed_time_in_seconds,
+        csv_export_count=csv_export_count,
+        max_lag_in_days=max_lag_in_days,
+        oldest_final_export_date=formatted_oldest_final_export_date,
+    )

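A minimal usage sketch for the new `add_nancodes` helper (the frame below is invented for illustration; note that the Puerto Rico rule relies on `.loc` accepting a partially-matching FIPS list on the `geo_id` index level, the same behavior the new tests below exercise):

import numpy as np
import pandas as pd
from delphi_jhu.run import add_nancodes

# Toy (timestamp, geo_id)-indexed frame mimicking the indicator output.
df = pd.DataFrame({
    "timestamp": pd.to_datetime(["2020-03-21"] * 2),
    "geo_id": ["01001", "72001"],  # "72001" is a Puerto Rico county FIPS
    "val": [1.5, np.nan],
    "se": [np.nan] * 2,
    "sample_size": [np.nan] * 2,
}).set_index(["timestamp", "geo_id"])

out = add_nancodes(df, metric="deaths", geo_res="county", smoother=None)
# "01001" keeps the default NOT_MISSING; "72001" is coded REGION_EXCEPTION
# because it is a Puerto Rico county in a county-level deaths signal, so the
# remaining-NaN rule (which only touches rows still coded NOT_MISSING) does
# not recode it as UNKNOWN.
print(out[["missing_val", "missing_se", "missing_sample_size"]])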
jhu/tests/test_run.py

Lines changed: 84 additions & 19 deletions

@@ -2,12 +2,20 @@
 from os.path import join, basename
 
 import pandas as pd
+import numpy as np
+from delphi_jhu.run import add_nancodes
+from delphi_utils import Nans
 
 
+def _non_ignored_files_set(directory):
+    """List all files in a directory not preceded by a '.' and store them in a set."""
+    out = {fname for fname in listdir(directory) if not basename(fname).startswith(".")}
+    return out
+
 class TestRun:
     def test_output_files_exist(self, run_as_module):
 
-        csv_files = [x for x in listdir("receiving") if not basename(x).startswith(".")]
+        csv_files = _non_ignored_files_set("receiving")
 
         dates = [
             "20200303",
@@ -17,29 +25,86 @@ def test_output_files_exist(self, run_as_module):
             "20200307",
             "20200308",
             "20200309",
-            "20200310",
+            "20200310"
         ]
         geos = ["county", "hrr", "msa", "state", "hhs", "nation"]
-        metrics = []
-        for event in ["confirmed", "deaths"]:
-            for smoothing in ["", "_7dav"]:
-                for window in ["incidence", "cumulative"]:
-                    for stat in ["num", "prop"]:
-                        metrics.append(f"{event}{smoothing}_{window}_{stat}")
-
-        expected_files = []
-        for date in dates:
-            for geo in geos:
-                for metric in metrics:
-                    # Can't compute 7dav for first few days of data because of NAs
-                    if date > "20200305" or "7dav" not in metric:
-                        expected_files += [date + "_" + geo + "_" + metric + ".csv"]
-
-        assert set(csv_files) == set(expected_files)
+        signals = ["confirmed", "deaths"]
+        metrics = [
+            "cumulative_num",
+            "cumulative_prop",
+            "incidence_num",
+            "incidence_prop",
+            "7dav_incidence_num",
+            "7dav_incidence_prop",
+            "7dav_cumulative_num",
+            "7dav_cumulative_prop",
+        ]
+
+        expected_files = {
+            date + "_" + geo + "_" + signal + "_" + metric + ".csv"
+            for date in dates
+            for geo in geos
+            for signal in signals
+            for metric in metrics
+        }
+
+        assert csv_files == expected_files
 
     def test_output_file_format(self, run_as_module):
 
         df = pd.read_csv(
             join("receiving", "20200310_state_confirmed_cumulative_num.csv")
         )
-        assert (df.columns.values == ["geo_id", "val", "se", "sample_size"]).all()
+        assert (
+            df.columns.values
+            == [
+                "geo_id",
+                "val",
+                "se",
+                "sample_size",
+                "missing_val",
+                "missing_se",
+                "missing_sample_size",
+            ]
+        ).all()
+
+    def test_add_nancodes(self):
+        df = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [0.1, 0.2, 0.3, 0.4, 0.5, np.nan, 0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8
+        }).set_index(["timestamp", "geo_id"])
+        expected_df = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [0.1, 0.2, 0.3, 0.4, 0.5, np.nan, 0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8,
+            "missing_val": [Nans.NOT_MISSING] * 5 + [Nans.REGION_EXCEPTION, Nans.NOT_MISSING, Nans.UNKNOWN],
+            "missing_se": [Nans.NOT_APPLICABLE] * 8,
+            "missing_sample_size": [Nans.NOT_APPLICABLE] * 8,
+        }).set_index(["timestamp", "geo_id"])
+
+        pd.testing.assert_frame_equal(add_nancodes(df, "deaths", "county", None), expected_df)
+
+        df2 = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [np.nan] * 6 + [0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8
+        }).set_index(["timestamp", "geo_id"])
+        expected_df2 = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [np.nan] * 6 + [0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8,
+            "missing_val": [Nans.PRIVACY] * 5 + [Nans.REGION_EXCEPTION, Nans.NOT_MISSING, Nans.UNKNOWN],
+            "missing_se": [Nans.NOT_APPLICABLE] * 8,
+            "missing_sample_size": [Nans.NOT_APPLICABLE] * 8,
+        }).set_index(["timestamp", "geo_id"])
+
+        pd.testing.assert_frame_equal(add_nancodes(df2, "deaths", "county", "seven_day_average"), expected_df2)

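One detail worth noting in `test_add_nancodes`: `expected_df2` carries five PRIVACY rows rather than six, even though the seven-day-average branch marks everything up to the minimum date plus five days. A small sketch of the arithmetic (dates mirror the test above; the overwrite order follows the statement order in `add_nancodes`):

import pandas as pd

# The smoothing branch marks rows with timestamp <= min date + 5 days:
cutoff = pd.Timestamp("2020-03-21") + 5 * pd.Timedelta(days=1)
print(cutoff)  # 2020-03-26 -> the first six of the eight test rows

# The Puerto Rico rule runs afterwards, overwriting the sixth row
# (2020-03-26, "72001") with REGION_EXCEPTION, leaving five PRIVACY rows.
# The remaining-NaN rule then only touches rows still coded NOT_MISSING,
# so only 2020-03-28 ("49000", val NaN) becomes UNKNOWN.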