Commit df6e444

Nans JHU:
* keep nan values, add missing columns, add tests

1 parent 48b01ed commit df6e444

File tree

3 files changed: +149 -31 lines changed

_delphi_utils_python/delphi_utils/archive.py

Lines changed: 5 additions & 0 deletions
@@ -112,6 +112,11 @@ def diff_export_csv(
     deleted_df[["val", "se", "sample_size"]] = np.nan
     deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
 
+    # Code deleted entries as nans with the deleted missing code
+    deleted_df = before_df.loc[deleted_idx, :].copy()
+    deleted_df[["val", "se", "sample_size"]] = np.nan
+    deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
+
     return (
         deleted_df,
         after_df_cmn.loc[~(same_mask.all(axis=1)), :],
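
Note: the hunk above re-emits deleted rows with an explicit missingness code instead of dropping them. A minimal sketch of that coding on toy data (the frames and index here are hypothetical; only the column names, the Nans.DELETED code, and the three assignment lines mirror the diff, and it assumes delphi_utils is installed):

import numpy as np
import pandas as pd
from delphi_utils import Nans

# Toy "before" snapshot; geo "01003" is gone from the "after" snapshot.
before_df = pd.DataFrame({
    "val": [1.0, 2.0],
    "se": [0.1, 0.2],
    "sample_size": [10.0, 20.0],
    "missing_val": [Nans.NOT_MISSING] * 2,
    "missing_se": [Nans.NOT_MISSING] * 2,
    "missing_sample_size": [Nans.NOT_MISSING] * 2,
}, index=pd.Index(["01001", "01003"], name="geo_id"))
after_df = before_df.loc[["01001"]]

# Same pattern as the added lines: deleted rows survive with nan values
# and an explicit DELETED missingness code instead of disappearing.
deleted_idx = before_df.index.difference(after_df.index)
deleted_df = before_df.loc[deleted_idx, :].copy()
deleted_df[["val", "se", "sample_size"]] = np.nan
deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED

This lets downstream consumers distinguish "row was deleted" from "row never existed" when diffing successive CSV exports.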

jhu/delphi_jhu/run.py

Lines changed: 55 additions & 12 deletions
@@ -9,13 +9,15 @@
 import time
 from typing import Dict, Any
 
+import pandas as pd
 import numpy as np
 from delphi_utils import (
     create_export_csv,
     S3ArchiveDiffer,
     Smoother,
     GeoMapper,
     get_structured_logger,
+    Nans,
 )
 
 from .geo import geo_map
@@ -63,6 +65,34 @@
 ]
 
 
+def add_nancodes(df, metric, geo_res, smoother):
+    """Add nancodes to the dataframe."""
+    idx = pd.IndexSlice
+
+    # Default missingness codes
+    df["missing_val"] = Nans.NOT_MISSING
+    df["missing_se"] = Nans.NOT_APPLICABLE
+    df["missing_sample_size"] = Nans.NOT_APPLICABLE
+
+    # Mark early smoothing entries as data insufficient
+    if smoother == "seven_day_average":
+        df.sort_index(inplace=True)
+        min_time_value = df.index.min()[0] + 5 * pd.Timedelta(days=1)
+        df.loc[idx[:min_time_value, :], "missing_val"] = Nans.PRIVACY
+
+    # Mark Puerto Rico county deaths with a region exception code
+    # Search "Puerto Rico" here for details:
+    # https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data
+    if metric == "deaths" and geo_res == "county":
+        puerto_rico_fips = ["72" + str(i).zfill(3) for i in range(1, 155)]
+        df.loc[idx[:, puerto_rico_fips], "missing_val"] = Nans.REGION_EXCEPTION
+
+    # Mark any remaining nans with unknown
+    remaining_nans_mask = df["val"].isnull() & (df["missing_val"] == Nans.NOT_MISSING)
+    df.loc[remaining_nans_mask, "missing_val"] = Nans.UNKNOWN
+    return df
+
+
 def run_module(params: Dict[str, Any]):
     """Run the JHU indicator module.
 
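
Note: both overrides in add_nancodes rely on pd.IndexSlice to select on one level of the (timestamp, geo_id) MultiIndex at a time. A self-contained sketch of that pattern (toy index and stand-in codes, not the module's data):

import pandas as pd

idx = pd.IndexSlice
df = pd.DataFrame(
    {"missing_val": 0},
    index=pd.MultiIndex.from_product(
        [pd.date_range("2020-03-01", periods=4), ["01001", "72001"]],
        names=["timestamp", "geo_id"],
    ),
)
df.sort_index(inplace=True)  # range slices require a sorted MultiIndex

# First level (timestamp): every row up to and including a cutoff date,
# mirroring the early-smoothing mask above.
cutoff = df.index.min()[0] + pd.Timedelta(days=1)
df.loc[idx[:cutoff, :], "missing_val"] = 3

# Second level (geo_id): a list of FIPS codes across all dates,
# mirroring the Puerto Rico mask above.
df.loc[idx[:, ["72001"]], "missing_val"] = 2

Order matters in the function: the Puerto Rico override runs after the early-smoothing override, so a Puerto Rico county inside the early window ends up coded REGION_EXCEPTION, which is what the tests below expect.
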
@@ -86,8 +116,10 @@ def run_module(params: Dict[str, Any]):
     export_dir = params["common"]["export_dir"]
     base_url = params["indicator"]["base_url"]
     logger = get_structured_logger(
-        __name__, filename=params["common"].get("log_filename"),
-        log_exceptions=params["common"].get("log_exceptions", True))
+        __name__,
+        filename=params["common"].get("log_filename"),
+        log_exceptions=params["common"].get("log_exceptions", True),
+    )
 
     if "archive" in params:
         arch_diff = S3ArchiveDiffer(
@@ -112,16 +144,22 @@ def run_module(params: Dict[str, Any]):
             metric=metric,
             geo_res=geo_res,
             sensor=sensor,
-            smoother=smoother)
+            smoother=smoother,
+        )
         df = dfs[metric]
         # Aggregate to appropriate geographic resolution
         df = geo_map(df, geo_res, sensor)
         df.set_index(["timestamp", "geo_id"], inplace=True)
+
+        # Smooth
         df["val"] = df[sensor].groupby(level=1).transform(SMOOTHERS_MAP[smoother][0])
+
+        # JHU is not a survey data source
         df["se"] = np.nan
         df["sample_size"] = np.nan
-        # Drop early entries where data insufficient for smoothing
-        df = df[~df["val"].isnull()]
+
+        df = add_nancodes(df, metric, geo_res, smoother)
+
         df = df.reset_index()
         sensor_name = SENSOR_NAME_MAP[sensor][0]
         # if (SENSOR_NAME_MAP[sensor][1] or SMOOTHERS_MAP[smoother][2]):
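
Note: with the early nan rows no longer dropped, the smoothing step is worth spelling out. groupby(level=1).transform(...) smooths each geo_id's series independently while preserving row alignment. A toy illustration (toy_smoother is a hypothetical stand-in for SMOOTHERS_MAP[smoother][0], which in the module is a delphi_utils Smoother function):

import pandas as pd

df = pd.DataFrame(
    {"confirmed": [1.0, 10.0, 2.0, 20.0, 3.0, 30.0]},
    index=pd.MultiIndex.from_product(
        [pd.date_range("2020-03-01", periods=3), ["01001", "01003"]],
        names=["timestamp", "geo_id"],
    ),
)

def toy_smoother(series):
    # Stand-in smoother: trailing two-day mean within a single geo_id.
    return series.rolling(2, min_periods=1).mean()

# Each geo_id is smoothed on its own; the result stays index-aligned,
# so it can be assigned straight back as the "val" column.
df["val"] = df["confirmed"].groupby(level=1).transform(toy_smoother)
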
@@ -141,7 +179,8 @@ def run_module(params: Dict[str, Any]):
         if not oldest_final_export_date:
             oldest_final_export_date = max(exported_csv_dates)
         oldest_final_export_date = min(
-            oldest_final_export_date, max(exported_csv_dates))
+            oldest_final_export_date, max(exported_csv_dates)
+        )
 
     if arch_diff is not None:
         # Diff exports, and make incremental versions
@@ -167,9 +206,13 @@ def run_module(params: Dict[str, Any]):
     formatted_oldest_final_export_date = None
     if oldest_final_export_date:
         max_lag_in_days = (datetime.now() - oldest_final_export_date).days
-        formatted_oldest_final_export_date = oldest_final_export_date.strftime("%Y-%m-%d")
-    logger.info("Completed indicator run",
-        elapsed_time_in_seconds = elapsed_time_in_seconds,
-        csv_export_count = csv_export_count,
-        max_lag_in_days = max_lag_in_days,
-        oldest_final_export_date = formatted_oldest_final_export_date)
+        formatted_oldest_final_export_date = oldest_final_export_date.strftime(
+            "%Y-%m-%d"
+        )
+    logger.info(
+        "Completed indicator run",
+        elapsed_time_in_seconds=elapsed_time_in_seconds,
+        csv_export_count=csv_export_count,
+        max_lag_in_days=max_lag_in_days,
+        oldest_final_export_date=formatted_oldest_final_export_date,
+    )

jhu/tests/test_run.py

Lines changed: 89 additions & 19 deletions
@@ -2,12 +2,25 @@
 from os.path import join, basename
 
 import pandas as pd
+import numpy as np
+from delphi_jhu.run import add_nancodes
+from delphi_utils import Nans
+
+def _non_ignored_files_set(directory):
+    """List all files in a directory not preceded by a '.' and store them in a set."""
+    out = {fname for fname in listdir(directory) if not basename(fname).startswith(".")}
+    return out
+
+def _non_ignored_files_set(directory):
+    """List all files in a directory not preceded by a '.' and store them in a set."""
+    out = {fname for fname in listdir(directory) if not basename(fname).startswith(".")}
+    return out
 
 
 class TestRun:
     def test_output_files_exist(self, run_as_module):
 
-        csv_files = [x for x in listdir("receiving") if not basename(x).startswith(".")]
+        csv_files = _non_ignored_files_set("receiving")
 
         dates = [
             "20200303",
@@ -17,29 +30,86 @@ def test_output_files_exist(self, run_as_module):
             "20200307",
             "20200308",
             "20200309",
-            "20200310",
+            "20200310"
         ]
         geos = ["county", "hrr", "msa", "state", "hhs", "nation"]
-        metrics = []
-        for event in ["confirmed", "deaths"]:
-            for smoothing in ["", "_7dav"]:
-                for window in ["incidence", "cumulative"]:
-                    for stat in ["num", "prop"]:
-                        metrics.append(f"{event}{smoothing}_{window}_{stat}")
-
-        expected_files = []
-        for date in dates:
-            for geo in geos:
-                for metric in metrics:
-                    # Can't compute 7dav for first few days of data because of NAs
-                    if date > "20200305" or "7dav" not in metric:
-                        expected_files += [date + "_" + geo + "_" + metric + ".csv"]
-
-        assert set(csv_files) == set(expected_files)
+        signals = ["confirmed", "deaths"]
+        metrics = [
+            "cumulative_num",
+            "cumulative_prop",
+            "incidence_num",
+            "incidence_prop",
+            "7dav_incidence_num",
+            "7dav_incidence_prop",
+            "7dav_cumulative_num",
+            "7dav_cumulative_prop",
+        ]
+
+        expected_files = {
+            date + "_" + geo + "_" + signal + "_" + metric + ".csv"
+            for date in dates
+            for geo in geos
+            for signal in signals
+            for metric in metrics
+        }
+
+        assert csv_files == expected_files
 
     def test_output_file_format(self, run_as_module):
 
         df = pd.read_csv(
             join("receiving", "20200310_state_confirmed_cumulative_num.csv")
         )
-        assert (df.columns.values == ["geo_id", "val", "se", "sample_size"]).all()
+        assert (
+            df.columns.values
+            == [
+                "geo_id",
+                "val",
+                "se",
+                "sample_size",
+                "missing_val",
+                "missing_se",
+                "missing_sample_size",
+            ]
+        ).all()
+
+    def test_add_nancodes(self):
+        df = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [0.1, 0.2, 0.3, 0.4, 0.5, np.nan, 0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8,
+        }).set_index(["timestamp", "geo_id"])
+        expected_df = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [0.1, 0.2, 0.3, 0.4, 0.5, np.nan, 0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8,
+            "missing_val": [Nans.NOT_MISSING] * 5 + [Nans.REGION_EXCEPTION, Nans.NOT_MISSING, Nans.UNKNOWN],
+            "missing_se": [Nans.NOT_APPLICABLE] * 8,
+            "missing_sample_size": [Nans.NOT_APPLICABLE] * 8,
+        }).set_index(["timestamp", "geo_id"])
+
+        pd.testing.assert_frame_equal(add_nancodes(df, "deaths", "county", None), expected_df)
+
+        df2 = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [np.nan] * 6 + [0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8,
+        }).set_index(["timestamp", "geo_id"])
+        expected_df2 = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [np.nan] * 6 + [0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8,
+            "missing_val": [Nans.PRIVACY] * 5 + [Nans.REGION_EXCEPTION, Nans.NOT_MISSING, Nans.UNKNOWN],
+            "missing_se": [Nans.NOT_APPLICABLE] * 8,
+            "missing_sample_size": [Nans.NOT_APPLICABLE] * 8,
+        }).set_index(["timestamp", "geo_id"])
+
+        pd.testing.assert_frame_equal(add_nancodes(df2, "deaths", "county", "seven_day_average"), expected_df2)
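
Note: test_add_nancodes does not depend on the run_as_module fixture, so it can be run in isolation. Assuming a standard pytest setup for the jhu package, that can be done from the command line (e.g. python -m pytest -k test_add_nancodes jhu/tests/test_run.py from the repo root) or programmatically:

import pytest

# Select only the new unit test by name; the path assumes the repo layout above.
pytest.main(["-k", "test_add_nancodes", "jhu/tests/test_run.py"])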
