
Commit d64dc06

NAN codes for JHU:
* keep nan values, add missing columns, add missing today, add tests
1 parent: 3f6f456

File tree: 3 files changed (+199, -31 lines)
_delphi_utils_python/delphi_utils/nancodes.py

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 """Provides unified not-a-number codes for the indicators.

 Currently requires a manual sync between the covidcast-indicators
-and the delphi-epidata repo.
+and the delphi-epidata repo.
 * in covidcast-indicators: _delphi_utils_python/delphi_utils
 * in delphi-epidata: src/acquisition/covidcast
 """

jhu/delphi_jhu/run.py

Lines changed: 86 additions & 12 deletions
@@ -9,13 +9,15 @@
 import time
 from typing import Dict, Any

+import pandas as pd
 import numpy as np
 from delphi_utils import (
     create_export_csv,
     S3ArchiveDiffer,
     Smoother,
     GeoMapper,
     get_structured_logger,
+    Nans,
 )

 from .geo import geo_map
@@ -63,6 +65,64 @@
 ]


+def add_nancodes(df, metric, geo_res, smoother):
+    """Add nancodes to the dataframe."""
+    idx = pd.IndexSlice
+
+    # Default missingness codes
+    df["missing_val"] = Nans.NOT_MISSING
+    df["missing_se"] = Nans.NOT_APPLICABLE
+    df["missing_sample_size"] = Nans.NOT_APPLICABLE
+
+    # Mark early smoothing entries as data insufficient
+    if smoother == "seven_day_average":
+        df.sort_index(inplace=True)
+        min_time_value = df.index.min()[0] + 5 * pd.Timedelta(days=1)
+        df.loc[idx[:min_time_value, :], "missing_val"] = Nans.PRIVACY
+
+    # Mark Puerto Rico county deaths with a region exception code
+    # Search "Puerto Rico" here for details:
+    # https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data
+    if metric == "deaths" and geo_res == "county":
+        puerto_rico_fips = ["72" + str(i).zfill(3) for i in range(1, 155)]
+        df.loc[idx[:, puerto_rico_fips], "missing_val"] = Nans.REGION_EXCEPTION
+
+    # Mark any remaining nans with unknown
+    remaining_nans_mask = df["val"].isnull() & df["missing_val"].eq(Nans.NOT_MISSING)
+    df.loc[remaining_nans_mask, "missing_val"] = Nans.UNKNOWN
+    return df
+
+def add_missing_current_day(df):
+    """Add missing entry for today if geo had a value previously."""
+    df = df.copy().reset_index()
+    today = pd.Timestamp.today().date()
+    missing_today_mask = lambda x: not any(pd.to_datetime(x).dt.date == today)
+    has_prior_data_mask = lambda x: any(pd.to_datetime(x).dt.date < today)
+    expected_geos = set(
+        df.groupby(["geo_id"])["timestamp"]
+        .agg(
+            mask1=missing_today_mask,
+            mask2=has_prior_data_mask
+        )
+        .query("mask1 == True and mask2 == True")
+        .index
+        .values
+    )
+    new_rows = pd.DataFrame({
+        "timestamp": [pd.to_datetime(today)] * len(expected_geos),
+        "geo_id": list(expected_geos),
+        "new_counts": [np.nan] * len(expected_geos),
+        "cumulative_counts": [np.nan] * len(expected_geos),
+        "population": [np.nan] * len(expected_geos),
+        "incidence": [np.nan] * len(expected_geos),
+        "cumulative_prop": [np.nan] * len(expected_geos),
+        "val": [np.nan] * len(expected_geos),
+        "se": [np.nan] * len(expected_geos),
+        "sample_size": [np.nan] * len(expected_geos),
+    })
+    df = df.append(new_rows).set_index(["timestamp", "geo_id"])
+    return df
+
 def run_module(params: Dict[str, Any]):
     """Run the JHU indicator module.

@@ -86,8 +146,10 @@ def run_module(params: Dict[str, Any]):
     export_dir = params["common"]["export_dir"]
     base_url = params["indicator"]["base_url"]
     logger = get_structured_logger(
-        __name__, filename=params["common"].get("log_filename"),
-        log_exceptions=params["common"].get("log_exceptions", True))
+        __name__,
+        filename=params["common"].get("log_filename"),
+        log_exceptions=params["common"].get("log_exceptions", True),
+    )

     if "archive" in params:
         arch_diff = S3ArchiveDiffer(
@@ -112,16 +174,23 @@ def run_module(params: Dict[str, Any]):
             metric=metric,
             geo_res=geo_res,
             sensor=sensor,
-            smoother=smoother)
+            smoother=smoother,
+        )
         df = dfs[metric]
         # Aggregate to appropriate geographic resolution
         df = geo_map(df, geo_res, sensor)
         df.set_index(["timestamp", "geo_id"], inplace=True)
+
+        # Smooth
         df["val"] = df[sensor].groupby(level=1).transform(SMOOTHERS_MAP[smoother][0])
+
+        # JHU is not a survey data source
         df["se"] = np.nan
         df["sample_size"] = np.nan
-        # Drop early entries where data insufficient for smoothing
-        df = df[~df["val"].isnull()]
+
+        df = add_missing_current_day(df)
+        df = add_nancodes(df, metric, geo_res, smoother)
+
         df = df.reset_index()
         sensor_name = SENSOR_NAME_MAP[sensor][0]
         # if (SENSOR_NAME_MAP[sensor][1] or SMOOTHERS_MAP[smoother][2]):
@@ -141,7 +210,8 @@ def run_module(params: Dict[str, Any]):
         if not oldest_final_export_date:
             oldest_final_export_date = max(exported_csv_dates)
         oldest_final_export_date = min(
-            oldest_final_export_date, max(exported_csv_dates))
+            oldest_final_export_date, max(exported_csv_dates)
+        )

     if arch_diff is not None:
         # Diff exports, and make incremental versions
@@ -167,9 +237,13 @@ def run_module(params: Dict[str, Any]):
     formatted_oldest_final_export_date = None
     if oldest_final_export_date:
         max_lag_in_days = (datetime.now() - oldest_final_export_date).days
-        formatted_oldest_final_export_date = oldest_final_export_date.strftime("%Y-%m-%d")
-    logger.info("Completed indicator run",
-        elapsed_time_in_seconds = elapsed_time_in_seconds,
-        csv_export_count = csv_export_count,
-        max_lag_in_days = max_lag_in_days,
-        oldest_final_export_date = formatted_oldest_final_export_date)
+        formatted_oldest_final_export_date = oldest_final_export_date.strftime(
+            "%Y-%m-%d"
+        )
+    logger.info(
+        "Completed indicator run",
+        elapsed_time_in_seconds=elapsed_time_in_seconds,
+        csv_export_count=csv_export_count,
+        max_lag_in_days=max_lag_in_days,
+        oldest_final_export_date=formatted_oldest_final_export_date,
+    )
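
Taken together, the two new helpers keep nan rows and explain them instead of dropping them. A minimal usage sketch, assuming delphi_jhu and delphi_utils are importable and a pandas version of this era (DataFrame.append, which add_missing_current_day relies on, was removed in pandas 2.0); the toy frame and its values are illustrative only:

import numpy as np
import pandas as pd
from delphi_jhu.run import add_nancodes, add_missing_current_day

# Three counties, one day each; "72001" is a Puerto Rico FIPS code whose
# death count is nan in the raw data.
df = pd.DataFrame({
    "timestamp": pd.to_datetime(["2020-03-21", "2020-03-22", "2020-03-23"]),
    "geo_id": ["01017", "72001", "31000"],
    "val": [0.1, np.nan, np.nan],
    "se": [np.nan] * 3,
    "sample_size": [np.nan] * 3,
}).set_index(["timestamp", "geo_id"])

df = add_missing_current_day(df)  # appends a nan row dated today for each geo with history
df = add_nancodes(df, "deaths", "county", None)

# "72001" rows -> Nans.REGION_EXCEPTION, other nans -> Nans.UNKNOWN,
# observed values -> Nans.NOT_MISSING.
print(df[["val", "missing_val"]])

The ordering in run_module matters: add_missing_current_day runs first so that the freshly appended nan rows for today are then classified by add_nancodes rather than left unexplained.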

jhu/tests/test_run.py

Lines changed: 112 additions & 18 deletions
@@ -2,12 +2,19 @@
 from os.path import join, basename

 import pandas as pd
+import numpy as np
+from delphi_jhu.run import add_nancodes, add_missing_current_day
+from delphi_utils import Nans

+def _non_ignored_files_set(directory):
+    """List all files in a directory not preceded by a '.' and store them in a set."""
+    out = {fname for fname in listdir(directory) if not basename(fname).startswith(".")}
+    return out


 class TestRun:
     def test_output_files_exist(self, run_as_module):

-        csv_files = [x for x in listdir("receiving") if not basename(x).startswith(".")]
+        csv_files = _non_ignored_files_set("receiving")

         dates = [
             "20200303",
@@ -18,28 +25,115 @@ def test_output_files_exist(self, run_as_module):
             "20200308",
             "20200309",
             "20200310",
+            pd.Timestamp.today().date().strftime("%Y%m%d")
         ]
         geos = ["county", "hrr", "msa", "state", "hhs", "nation"]
-        metrics = []
-        for event in ["confirmed", "deaths"]:
-            for smoothing in ["", "_7dav"]:
-                for window in ["incidence", "cumulative"]:
-                    for stat in ["num", "prop"]:
-                        metrics.append(f"{event}{smoothing}_{window}_{stat}")
-
-        expected_files = []
-        for date in dates:
-            for geo in geos:
-                for metric in metrics:
-                    # Can't compute 7dav for first few days of data because of NAs
-                    if date > "20200305" or "7dav" not in metric:
-                        expected_files += [date + "_" + geo + "_" + metric + ".csv"]
-
-        assert set(csv_files) == set(expected_files)
+        signals = ["confirmed", "deaths"]
+        metrics = [
+            "cumulative_num",
+            "cumulative_prop",
+            "incidence_num",
+            "incidence_prop",
+            "7dav_incidence_num",
+            "7dav_incidence_prop",
+            "7dav_cumulative_num",
+            "7dav_cumulative_prop",
+        ]
+
+        expected_files = {
+            date + "_" + geo + "_" + signal + "_" + metric + ".csv"
+            for date in dates
+            for geo in geos
+            for signal in signals
+            for metric in metrics
+        }
+
+        assert csv_files == expected_files

     def test_output_file_format(self, run_as_module):

         df = pd.read_csv(
             join("receiving", "20200310_state_confirmed_cumulative_num.csv")
         )
-        assert (df.columns.values == ["geo_id", "val", "se", "sample_size"]).all()
+        assert (
+            df.columns.values
+            == [
+                "geo_id",
+                "val",
+                "se",
+                "sample_size",
+                "missing_val",
+                "missing_se",
+                "missing_sample_size",
+            ]
+        ).all()

+    def test_add_nancodes(self):
+        df = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [0.1, 0.2, 0.3, 0.4, 0.5, np.nan, 0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8
+        }).set_index(["timestamp", "geo_id"])
+        expected_df = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [0.1, 0.2, 0.3, 0.4, 0.5, np.nan, 0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8,
+            "missing_val": [Nans.NOT_MISSING] * 5 + [Nans.REGION_EXCEPTION, Nans.NOT_MISSING, Nans.UNKNOWN],
+            "missing_se": [Nans.NOT_APPLICABLE] * 8,
+            "missing_sample_size": [Nans.NOT_APPLICABLE] * 8,
+        }).set_index(["timestamp", "geo_id"])
+
+        pd.testing.assert_frame_equal(add_nancodes(df, "deaths", "county", None), expected_df)
+
+        df2 = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [np.nan] * 6 + [0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8
+        }).set_index(["timestamp", "geo_id"])
+        expected_df2 = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [np.nan] * 6 + [0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8,
+            "missing_val": [Nans.PRIVACY] * 5 + [Nans.REGION_EXCEPTION, Nans.NOT_MISSING, Nans.UNKNOWN],
+            "missing_se": [Nans.NOT_APPLICABLE] * 8,
+            "missing_sample_size": [Nans.NOT_APPLICABLE] * 8,
+        }).set_index(["timestamp", "geo_id"])
+
+        pd.testing.assert_frame_equal(add_nancodes(df2, "deaths", "county", "seven_day_average"), expected_df2)

+    def test_add_missing_current_day(self):
+        today = pd.Timestamp.today().date().strftime("%Y%m%d")
+        df = pd.DataFrame({
+            "timestamp": pd.to_datetime(["20200304", today]),
+            "geo_id": ["01017", "01061"],
+            "new_counts": [np.nan] * 2,
+            "cumulative_counts": [np.nan] * 2,
+            "population": [np.nan] * 2,
+            "incidence": [np.nan] * 2,
+            "cumulative_prop": [np.nan] * 2,
+            "val": [0.1, 0.3],
+            "se": [np.nan] * 2,
+            "sample_size": [np.nan] * 2
+        }).set_index(["timestamp", "geo_id"])
+        expected_df = pd.DataFrame({
+            "timestamp": pd.to_datetime(["20200304", today, today]),
+            "geo_id": ["01017", "01061", "01017"],
+            "new_counts": [np.nan] * 3,
+            "cumulative_counts": [np.nan] * 3,
+            "population": [np.nan] * 3,
+            "incidence": [np.nan] * 3,
+            "cumulative_prop": [np.nan] * 3,
+            "val": [0.1, 0.3, np.nan],
+            "se": [np.nan] * 3,
+            "sample_size": [np.nan] * 3
+        }).set_index(["timestamp", "geo_id"])
+
+        pd.testing.assert_frame_equal(add_missing_current_day(df), expected_df)
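
The two new unit tests exercise the helpers directly, while the output-file tests still go through the run_as_module fixture. Assuming the repo's usual pytest layout (not shown in this diff), the new tests could be run in isolation with something like:

cd jhu
python -m pytest tests/test_run.py -k "add_nancodes or add_missing_current_day"

Note that both test_output_files_exist and test_add_missing_current_day depend on the real current date via pd.Timestamp.today(), so a run that straddles midnight could fail spuriously.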
