
Make combo cases/deaths robust to source outages #343

Merged
merged 3 commits into from Oct 22, 2020
@@ -18,4 +18,4 @@
"state",
"msa",
"hrr",
]
102 changes: 63 additions & 39 deletions combo_cases_and_deaths/delphi_combo_cases_and_deaths/run.py
@@ -9,46 +9,68 @@
from datetime import date, timedelta, datetime
from itertools import product
import re
import sys

import covidcast
import pandas as pd

from delphi_utils import read_params, create_export_csv
from .constants import *
from .handle_wip_signal import *
from delphi_utils import read_params
from .constants import METRICS, SMOOTH_TYPES, SENSORS, GEO_RESOLUTIONS
from .handle_wip_signal import add_prefix


def check_not_none(data_frame, label, date_range):
"""Exit gracefully if a data frame we attempted to retrieve is empty"""
def check_none_data_frame(data_frame, label, date_range):
"""Log and return True when a data frame is None."""
if data_frame is None:
print(f"{label} not available in range {date_range}")
sys.exit(1)
print(f"{label} completely unavailable in range {date_range}")
return True
return False

def combine_usafacts_and_jhu(signal, geo, date_range):
def maybe_append(df1, df2):
"""
If both data frames are available, append them and return. Otherwise, return
whichever frame is not None.
"""
if df1 is None:
return df2
if df2 is None:
return df1
return df1.append(df2)
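
As a side note for reviewers, maybe_append's contract in miniature (an illustrative sketch, assuming the function is imported from this module; not part of the PR):

import pandas as pd

df = pd.DataFrame({"x": [1]})
assert maybe_append(None, df) is df    # first source missing
assert maybe_append(df, None) is df    # second source missing
assert len(maybe_append(df, df)) == 2  # both present: rows are appended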

COLUMN_MAPPING = {"time_value": "timestamp",
"geo_value": "geo_id",
"value": "val",
"stderr": "se",
"sample_size": "sample_size"}
def combine_usafacts_and_jhu(signal, geo, date_range, fetcher=covidcast.signal):
"""
Add rows for PR from JHU signals to USA-FACTS signals
"""
usafacts_df = covidcast.signal("usa-facts", signal, date_range[0], date_range[1], geo)
jhu_df = covidcast.signal("jhu-csse", signal, date_range[0], date_range[1], geo)
check_not_none(usafacts_df, "USA-FACTS", date_range)
check_not_none(jhu_df, "JHU", date_range)
print("Fetching usa-facts...")
usafacts_df = fetcher("usa-facts", signal, date_range[0], date_range[1], geo)
print("Fetching jhu-csse...")
jhu_df = fetcher("jhu-csse", signal, date_range[0], date_range[1], geo)

if check_none_data_frame(usafacts_df, "USA-FACTS", date_range) and \
(geo not in ('state', 'county') or \
check_none_data_frame(jhu_df, "JHU", date_range)):
return pd.DataFrame({}, columns=COLUMN_MAPPING.values())

# State level
if geo == 'state':
combined_df = usafacts_df.append(jhu_df[jhu_df["geo_value"] == 'pr'])
combined_df = maybe_append(
usafacts_df,
jhu_df if jhu_df is None else jhu_df[jhu_df["geo_value"] == 'pr'])
# County level
elif geo == 'county':
combined_df = usafacts_df.append(jhu_df[jhu_df["geo_value"] == '72000'])
combined_df = maybe_append(
usafacts_df,
jhu_df if jhu_df is None else jhu_df[jhu_df["geo_value"] == '72000'])
# For the MSA and HRR levels, the two sources carry the same data, so USAFacts is used alone
else:
combined_df = usafacts_df

combined_df = combined_df.drop(["direction"], axis=1)
combined_df = combined_df.rename({"time_value": "timestamp",
"geo_value": "geo_id",
"value": "val",
"stderr": "se"},
combined_df = combined_df.rename(COLUMN_MAPPING,
axis=1)
return combined_df
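
The early-return guard above is compact; unrolled, it says the combined frame is empty only when no source can contribute rows. A sketch restating that condition (no_usable_source is a hypothetical name, not code from this PR):

def no_usable_source(usafacts_df, jhu_df, geo):
    """Mirror the early-return condition in combine_usafacts_and_jhu."""
    if usafacts_df is not None:
        return False  # USAFacts covers every geo on its own
    # JHU only contributes Puerto Rico rows at the state/county levels,
    # so a missing USAFacts frame already means no data for msa/hrr.
    if geo not in ('state', 'county'):
        return True
    return jhu_df is None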

@@ -83,15 +105,12 @@ def sensor_signal(metric, sensor, smoother):
sensor_name = "_".join([smoother, sensor])
else:
sensor_name = sensor
signal = "_".join([metric, sensor_name])
return sensor_name, signal

def run_module():
"""Produce a combined cases and deaths signal using data from JHU and USA Facts"""
variants = [tuple((metric, geo_res)+sensor_signal(metric, sensor, smoother))
for (metric, geo_res, sensor, smoother) in
product(METRICS, GEO_RESOLUTIONS, SENSORS, SMOOTH_TYPES)]
return sensor_name, "_".join([metric, sensor_name])

def configure(variants):
"""
Validate params file and set date range.
"""
params = read_params()
params['export_start_date'] = date(*params['export_start_date'])
yesterday = date.today() - timedelta(days=1)
@@ -112,30 +131,36 @@ def run_module():
# create combined files for all of the historical reports
params['date_range'] = [params['export_start_date'], yesterday]
else:
pattern = re.compile(r'^\d{8}-\d{8}$')
match_res = re.findall(pattern, params['date_range'])
match_res = re.findall(re.compile(r'^\d{8}-\d{8}$'), params['date_range'])
if len(match_res) == 0:
raise ValueError(
"Invalid date_range parameter. Please choose from (new, all, yyyymmdd-yyyymmdd).")
try:
date1 = datetime.strptime(params['date_range'][:8], '%Y%m%d').date()
except ValueError:
raise ValueError("Invalid date_range parameter. Please check the first date.")
except ValueError as error:
raise ValueError(
"Invalid date_range parameter. Please check the first date.") from error
try:
date2 = datetime.strptime(params['date_range'][-8:], '%Y%m%d').date()
except ValueError:
raise ValueError("Invalid date_range parameter. Please check the second date.")
except ValueError as error:
raise ValueError(
"Invalid date_range parameter. Please check the second date.") from error

# Clamp to the valid start date
if date1 < params['export_start_date']:
date1 = params['export_start_date']
params['date_range'] = [date1, date2]
return params
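
For orientation, a params dict of the shape configure() consumes (keys inferred from the code above; the values are only illustrative):

params = {
    "export_start_date": [2020, 4, 1],   # unpacked into date(*...)
    "date_range": "20200601-20200630",   # or "new" / "all"
    "export_dir": "./receiving",
    "wip_signal": True,
}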

for metric, geo_res, sensor_name, signal in variants:

df = combine_usafacts_and_jhu(signal, geo_res, extend_raw_date_range(params, sensor_name))

df = df.copy()
def run_module():
"""Produce a combined cases and deaths signal using data from JHU and USA Facts"""
variants = [tuple((metric, geo_res)+sensor_signal(metric, sensor, smoother))
for (metric, geo_res, sensor, smoother) in
product(METRICS, GEO_RESOLUTIONS, SENSORS, SMOOTH_TYPES)]
params = configure(variants)
for metric, geo_res, sensor_name, signal in variants:
df = combine_usafacts_and_jhu(signal, geo_res, extend_raw_date_range(params, sensor_name)) # pylint: disable=invalid-name
df["timestamp"] = pd.to_datetime(df["timestamp"])
start_date = pd.to_datetime(params['export_start_date'])
export_dir = params["export_dir"]
@@ -145,8 +170,7 @@ def run_module():

signal_name = add_prefix([signal], wip_signal=params["wip_signal"], prefix="wip_")
for date_ in dates:
export_fn = f'{date_.strftime("%Y%m%d")}_{geo_res}_' f"{signal_name[0]}.csv"
export_fn = f'{date_.strftime("%Y%m%d")}_{geo_res}_{signal_name[0]}.csv'
df[df["timestamp"] == date_][["geo_id", "val", "se", "sample_size", ]].to_csv(
f"{export_dir}/{export_fn}", index=False, na_rep="NA"
)
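
To make the export naming concrete, here is one filename as run_module() assembles it (the signal name is an example, not necessarily one the pipeline emits):

from datetime import date

date_ = date(2020, 10, 22)
geo_res = "state"
signal_name = ["confirmed_cumulative_num"]  # example result of add_prefix
export_fn = f'{date_.strftime("%Y%m%d")}_{geo_res}_{signal_name[0]}.csv'
assert export_fn == "20201022_state_confirmed_cumulative_num.csv"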

52 changes: 48 additions & 4 deletions combo_cases_and_deaths/tests/test_run.py
@@ -2,14 +2,21 @@
from itertools import product
import pytest
import unittest
import pandas as pd

from delphi_combo_cases_and_deaths.run import extend_raw_date_range, sensor_signal
from delphi_combo_cases_and_deaths.run import extend_raw_date_range, sensor_signal, combine_usafacts_and_jhu, COLUMN_MAPPING
from delphi_combo_cases_and_deaths.handle_wip_signal import add_prefix
from delphi_utils import read_params
from delphi_combo_cases_and_deaths.constants import *
from delphi_combo_cases_and_deaths.constants import METRICS, SMOOTH_TYPES, SENSORS, GEO_RESOLUTIONS


def test_issue_dates():
"""The smoothed value for a particular date is computed from the raw
values for a span of dates. We want users to be able to see in the
API all the raw values that went into the smoothed computation,
for transparency and peer review. This means that each issue
should contain more days of raw data than smoothed data.
"""
reference_dr = [date.today(), date.today()]
params = {'date_range': reference_dr}
n_changed = 0
@@ -22,14 +29,16 @@ def test_issue_dates():
if dr[0] != reference_dr[0]:
n_changed += 1
variants_changed.append(sensor_name)
assert n_changed == len(variants) / 2, f"""Raw variants should post more days than smoothed.
assert n_changed == len(variants) / 2, f"""
Raw variants should post more days than smoothed.
All variants: {variants}
Date-extended variants: {variants_changed}
"""


def test_handle_wip_signal():

"""Verify that "wip_" prefixes are being applied appropriately.
"""
signal_list = [sensor_signal(metric, sensor, smoother)[1]
for (metric, sensor, smoother) in
product(METRICS, SENSORS, SMOOTH_TYPES)]
@@ -49,6 +58,41 @@ def test_handle_wip_signal():
assert signal_names[0].startswith("wip_")
assert all(not s.startswith("wip_") for s in signal_names[1:])

def test_unstable_sources():
"""Verify that combine_usafacts_and_jhu assembles the combined data
frame correctly for all cases where 0, 1, or both signals are
available.
"""
placeholder = lambda geo: pd.DataFrame(
[(date.today(),"pr" if geo == "state" else "72000",1,1,1,0)],
columns="time_value geo_value value stderr sample_size direction".split())
fetcher10 = lambda *x: placeholder(x[-1]) if x[0] == "usa-facts" else None
fetcher01 = lambda *x: placeholder(x[-1]) if x[0] == "jhu-csse" else None
fetcher11 = lambda *x: placeholder(x[-1])
fetcher00 = lambda *x: None

date_range = [date.today(), date.today()]

for geo in "state county msa".split():
for (fetcher, expected_size) in [
(fetcher00, 0),
(fetcher01, 0 if geo == "msa" else 1),
(fetcher10, 1),
(fetcher11, 1 if geo == "msa" else 2)
]:
df = combine_usafacts_and_jhu("", geo, date_range, fetcher)
assert df.size == expected_size * len(COLUMN_MAPPING), f"""
Wrong number of rows in combined data frame for the number of available signals.

input for {geo}:
{fetcher('usa-facts',geo)}
{fetcher('jhu-csse',geo)}

output:
{df}

expected rows: {expected_size}
"""

class MyTestCase(unittest.TestCase):
pass