Commit 2904c1d

Merge pull request #343 from cmu-delphi/fix/combo-robustify
Make combo cases/deaths robust to source outages
2 parents 6008371 + 5994794 commit 2904c1d

3 files changed: +112 -44 lines changed

combo_cases_and_deaths/delphi_combo_cases_and_deaths/constants.py
Lines changed: 1 addition & 1 deletion

@@ -18,4 +18,4 @@
     "state",
     "msa",
     "hrr",
-]
+]

combo_cases_and_deaths/delphi_combo_cases_and_deaths/run.py
Lines changed: 63 additions & 39 deletions

@@ -9,46 +9,68 @@
 from datetime import date, timedelta, datetime
 from itertools import product
 import re
-import sys
 
 import covidcast
 import pandas as pd
 
-from delphi_utils import read_params, create_export_csv
-from .constants import *
-from .handle_wip_signal import *
+from delphi_utils import read_params
+from .constants import METRICS, SMOOTH_TYPES, SENSORS, GEO_RESOLUTIONS
+from .handle_wip_signal import add_prefix
 
 
-def check_not_none(data_frame, label, date_range):
-    """Exit gracefully if a data frame we attempted to retrieve is empty"""
+def check_none_data_frame(data_frame, label, date_range):
+    """Log and return True when a data frame is None."""
     if data_frame is None:
-        print(f"{label} not available in range {date_range}")
-        sys.exit(1)
+        print(f"{label} completely unavailable in range {date_range}")
+        return True
+    return False
 
-def combine_usafacts_and_jhu(signal, geo, date_range):
+def maybe_append(df1, df2):
+    """
+    If both data frames are available, append them and return. Otherwise, return
+    whichever frame is not None.
+    """
+    if df1 is None:
+        return df2
+    if df2 is None:
+        return df1
+    return df1.append(df2)
+
+COLUMN_MAPPING = {"time_value": "timestamp",
+                  "geo_value": "geo_id",
+                  "value": "val",
+                  "stderr": "se",
+                  "sample_size": "sample_size"}
+def combine_usafacts_and_jhu(signal, geo, date_range, fetcher=covidcast.signal):
     """
     Add rows for PR from JHU signals to USA-FACTS signals
     """
-    usafacts_df = covidcast.signal("usa-facts", signal, date_range[0], date_range[1], geo)
-    jhu_df = covidcast.signal("jhu-csse", signal, date_range[0], date_range[1], geo)
-    check_not_none(usafacts_df, "USA-FACTS", date_range)
-    check_not_none(jhu_df, "JHU", date_range)
+    print("Fetching usa-facts...")
+    usafacts_df = fetcher("usa-facts", signal, date_range[0], date_range[1], geo)
+    print("Fetching jhu-csse...")
+    jhu_df = fetcher("jhu-csse", signal, date_range[0], date_range[1], geo)
+
+    if check_none_data_frame(usafacts_df, "USA-FACTS", date_range) and \
+       (geo not in ('state', 'county') or \
+        check_none_data_frame(jhu_df, "JHU", date_range)):
+        return pd.DataFrame({}, columns=COLUMN_MAPPING.values())
 
     # State level
     if geo == 'state':
-        combined_df = usafacts_df.append(jhu_df[jhu_df["geo_value"] == 'pr'])
+        combined_df = maybe_append(
+            usafacts_df,
+            jhu_df if jhu_df is None else jhu_df[jhu_df["geo_value"] == 'pr'])
     # County level
     elif geo == 'county':
-        combined_df = usafacts_df.append(jhu_df[jhu_df["geo_value"] == '72000'])
+        combined_df = maybe_append(
+            usafacts_df,
+            jhu_df if jhu_df is None else jhu_df[jhu_df["geo_value"] == '72000'])
     # For MSA and HRR level, they are the same
     else:
        combined_df = usafacts_df
 
     combined_df = combined_df.drop(["direction"], axis=1)
-    combined_df = combined_df.rename({"time_value": "timestamp",
-                                      "geo_value": "geo_id",
-                                      "value": "val",
-                                      "stderr": "se"},
+    combined_df = combined_df.rename(COLUMN_MAPPING,
                                      axis=1)
     return combined_df
 
@@ -83,15 +105,12 @@ def sensor_signal(metric, sensor, smoother):
         sensor_name = "_".join([smoother, sensor])
     else:
         sensor_name = sensor
-    signal = "_".join([metric, sensor_name])
-    return sensor_name, signal
-
-def run_module():
-    """Produce a combined cases and deaths signal using data from JHU and USA Facts"""
-    variants = [tuple((metric, geo_res)+sensor_signal(metric, sensor, smoother))
-                for (metric, geo_res, sensor, smoother) in
-                product(METRICS, GEO_RESOLUTIONS, SENSORS, SMOOTH_TYPES)]
+    return sensor_name, "_".join([metric, sensor_name])
 
+def configure(variants):
+    """
+    Validate params file and set date range.
+    """
     params = read_params()
     params['export_start_date'] = date(*params['export_start_date'])
     yesterday = date.today() - timedelta(days=1)
@@ -112,30 +131,36 @@ def run_module():
         # create combined files for all of the historical reports
         params['date_range'] = [params['export_start_date'], yesterday]
     else:
-        pattern = re.compile(r'^\d{8}-\d{8}$')
-        match_res = re.findall(pattern, params['date_range'])
+        match_res = re.findall(re.compile(r'^\d{8}-\d{8}$'), params['date_range'])
         if len(match_res) == 0:
             raise ValueError(
                 "Invalid date_range parameter. Please choose from (new, all, yyyymmdd-yyyymmdd).")
         try:
             date1 = datetime.strptime(params['date_range'][:8], '%Y%m%d').date()
-        except ValueError:
-            raise ValueError("Invalid date_range parameter. Please check the first date.")
+        except ValueError as error:
+            raise ValueError(
+                "Invalid date_range parameter. Please check the first date.") from error
         try:
             date2 = datetime.strptime(params['date_range'][-8:], '%Y%m%d').date()
-        except ValueError:
-            raise ValueError("Invalid date_range parameter. Please check the second date.")
+        except ValueError as error:
+            raise ValueError(
+                "Invalid date_range parameter. Please check the second date.") from error
 
     #The the valid start date
     if date1 < params['export_start_date']:
         date1 = params['export_start_date']
     params['date_range'] = [date1, date2]
+    return params
 
-    for metric, geo_res, sensor_name, signal in variants:
-
-        df = combine_usafacts_and_jhu(signal, geo_res, extend_raw_date_range(params, sensor_name))
 
-        df = df.copy()
+def run_module():
+    """Produce a combined cases and deaths signal using data from JHU and USA Facts"""
+    variants = [tuple((metric, geo_res)+sensor_signal(metric, sensor, smoother))
+                for (metric, geo_res, sensor, smoother) in
+                product(METRICS, GEO_RESOLUTIONS, SENSORS, SMOOTH_TYPES)]
+    params = configure(variants)
+    for metric, geo_res, sensor_name, signal in variants:
+        df = combine_usafacts_and_jhu(signal, geo_res, extend_raw_date_range(params, sensor_name)) # pylint: disable=invalid-name
         df["timestamp"] = pd.to_datetime(df["timestamp"])
         start_date = pd.to_datetime(params['export_start_date'])
         export_dir = params["export_dir"]
@@ -145,8 +170,7 @@ def run_module():
 
         signal_name = add_prefix([signal], wip_signal=params["wip_signal"], prefix="wip_")
         for date_ in dates:
-            export_fn = f'{date_.strftime("%Y%m%d")}_{geo_res}_' f"{signal_name[0]}.csv"
+            export_fn = f'{date_.strftime("%Y%m%d")}_{geo_res}_{signal_name[0]}.csv'
             df[df["timestamp"] == date_][["geo_id", "val", "se", "sample_size", ]].to_csv(
                 f"{export_dir}/{export_fn}", index=False, na_rep="NA"
             )
-
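For quick orientation, here is a minimal usage sketch of the reworked combine_usafacts_and_jhu. It borrows the stub-fetcher idea from the updated tests below; the stub data frame, the empty signal name, and the assumption that the delphi_combo_cases_and_deaths package and a contemporary pandas (DataFrame.append still available) are installed are illustrative, not part of the commit.

from datetime import date
import pandas as pd
from delphi_combo_cases_and_deaths.run import combine_usafacts_and_jhu, COLUMN_MAPPING

date_range = [date.today(), date.today()]

# Stub fetcher simulating a USA-FACTS outage: only jhu-csse returns rows.
def jhu_only(source, signal, start_day, end_day, geo):
    if source != "jhu-csse":
        return None  # pretend this source is down
    return pd.DataFrame(
        [(date.today(), "pr", 1, 1, 1, 0)],
        columns="time_value geo_value value stderr sample_size direction".split())

# A state-level run now keeps the PR row instead of calling sys.exit(1).
df = combine_usafacts_and_jhu("", "state", date_range, fetcher=jhu_only)
print(df)

# With both sources down, an empty frame with the expected columns comes back.
empty = combine_usafacts_and_jhu("", "state", date_range, fetcher=lambda *args: None)
print(empty.empty, list(empty.columns) == list(COLUMN_MAPPING.values()))  # True True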
combo_cases_and_deaths/tests/test_run.py
Lines changed: 48 additions & 4 deletions

@@ -2,14 +2,21 @@
 from itertools import product
 import pytest
 import unittest
+import pandas as pd
 
-from delphi_combo_cases_and_deaths.run import extend_raw_date_range, sensor_signal
+from delphi_combo_cases_and_deaths.run import extend_raw_date_range, sensor_signal, combine_usafacts_and_jhu, COLUMN_MAPPING
 from delphi_combo_cases_and_deaths.handle_wip_signal import add_prefix
 from delphi_utils import read_params
-from delphi_combo_cases_and_deaths.constants import *
+from delphi_combo_cases_and_deaths.constants import METRICS, SMOOTH_TYPES, SENSORS, GEO_RESOLUTIONS
 
 
 def test_issue_dates():
+    """The smoothed value for a particular date is computed from the raw
+    values for a span of dates. We want users to be able to see in the
+    API all the raw values that went into the smoothed computation,
+    for transparency and peer review. This means that each issue
+    should contain more days of raw data than smoothed data.
+    """
     reference_dr = [date.today(), date.today()]
     params = {'date_range': reference_dr}
     n_changed = 0
@@ -22,14 +29,16 @@ def test_issue_dates():
         if dr[0] != reference_dr[0]:
             n_changed += 1
             variants_changed.append(sensor_name)
-    assert n_changed == len(variants) / 2, f"""Raw variants should post more days than smoothed.
+    assert n_changed == len(variants) / 2, f"""
+Raw variants should post more days than smoothed.
 All variants: {variants}
 Date-extended variants: {variants_changed}
 """
 
 
 def test_handle_wip_signal():
-
+    """Verify that "wip_" prefixes are being applied appropriately.
+    """
     signal_list = [sensor_signal(metric, sensor, smoother)[1]
                    for (metric, sensor, smoother) in
                    product(METRICS, SENSORS, SMOOTH_TYPES)]
@@ -49,6 +58,41 @@ def test_handle_wip_signal():
     assert signal_names[0].startswith("wip_")
     assert all(not s.startswith("wip_") for s in signal_names[1:])
 
+def test_unstable_sources():
+    """Verify that combine_usafacts_and_jhu assembles the combined data
+    frame correctly for all cases where 0, 1, or both signals are
+    available.
+    """
+    placeholder = lambda geo: pd.DataFrame(
+        [(date.today(),"pr" if geo == "state" else "72000",1,1,1,0)],
+        columns="time_value geo_value value stderr sample_size direction".split())
+    fetcher10 = lambda *x: placeholder(x[-1]) if x[0] == "usa-facts" else None
+    fetcher01 = lambda *x: placeholder(x[-1]) if x[0] == "jhu-csse" else None
+    fetcher11 = lambda *x: placeholder(x[-1])
+    fetcher00 = lambda *x: None
+
+    date_range = [date.today(), date.today()]
+
+    for geo in "state county msa".split():
+        for (fetcher, expected_size) in [
+                (fetcher00, 0),
+                (fetcher01, 0 if geo == "msa" else 1),
+                (fetcher10, 1),
+                (fetcher11, 1 if geo == "msa" else 2)
+        ]:
+            df = combine_usafacts_and_jhu("", geo, date_range, fetcher)
+            assert df.size == expected_size * len(COLUMN_MAPPING), f"""
+Wrong number of rows in combined data frame for the number of available signals.
+
+input for {geo}:
+{fetcher('usa-facts',geo)}
+{fetcher('jhu-csse',geo)}
+
+output:
+{df}
+
+expected rows: {expected_size}
+"""
 
 class MyTestCase(unittest.TestCase):
     pass
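The new test_unstable_sources walks the full matrix of source availability (neither, JHU only, USA-FACTS only, both) across the state, county, and msa geos and checks the row count of the combined frame. A sketch of how one might run just that test from the repository root, assuming pytest and the package's dependencies are installed:

import pytest

# Select only the new source-outage test and show verbose output.
pytest.main(["-v", "combo_cases_and_deaths/tests/test_run.py::test_unstable_sources"])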
