diff --git a/jhu/delphi_jhu/run.py b/jhu/delphi_jhu/run.py
index a6ff7d00b..278761a48 100644
--- a/jhu/delphi_jhu/run.py
+++ b/jhu/delphi_jhu/run.py
@@ -9,12 +9,14 @@
 import time
 from typing import Dict, Any
 
+import pandas as pd
 import numpy as np
 from delphi_utils import (
     create_export_csv,
     Smoother,
     GeoMapper,
     get_structured_logger,
+    Nans,
 )
 
 from .geo import geo_map
@@ -62,6 +64,34 @@
 ]
 
 
+def add_nancodes(df, metric, geo_res, smoother):
+    """Add missingness (nan) codes to the dataframe."""
+    idx = pd.IndexSlice
+
+    # Default missingness codes
+    df["missing_val"] = Nans.NOT_MISSING
+    df["missing_se"] = Nans.NOT_APPLICABLE
+    df["missing_sample_size"] = Nans.NOT_APPLICABLE
+
+    # Mark early entries, where the smoothing window has insufficient data, as censored
+    if smoother == "seven_day_average":
+        df.sort_index(inplace=True)
+        min_time_value = df.index.min()[0] + 5 * pd.Timedelta(days=1)
+        df.loc[idx[:min_time_value, :], "missing_val"] = Nans.CENSORED
+
+    # Mark Puerto Rico county deaths with a region exception code.
+    # Search "Puerto Rico" here for details:
+    # https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data
+    if metric == "deaths" and geo_res == "county":
+        puerto_rico_fips = ["72" + str(i).zfill(3) for i in range(1, 155)]
+        df.loc[idx[:, puerto_rico_fips], "missing_val"] = Nans.REGION_EXCEPTION
+
+    # Mark any remaining nans with unknown
+    remaining_nans_mask = df["val"].isnull() & df["missing_val"].eq(Nans.NOT_MISSING)
+    df.loc[remaining_nans_mask, "missing_val"] = Nans.OTHER
+    return df
+
+
 def run_module(params: Dict[str, Any]):
     """Run the JHU indicator module.
 
@@ -85,8 +115,10 @@ def run_module(params: Dict[str, Any]):
     export_dir = params["common"]["export_dir"]
     base_url = params["indicator"]["base_url"]
     logger = get_structured_logger(
-        __name__, filename=params["common"].get("log_filename"),
-        log_exceptions=params["common"].get("log_exceptions", True))
+        __name__,
+        filename=params["common"].get("log_filename"),
+        log_exceptions=params["common"].get("log_exceptions", True),
+    )
 
     gmpr = GeoMapper()
     dfs = {metric: pull_jhu_data(base_url, metric, gmpr) for metric in METRICS}
@@ -100,16 +132,22 @@ def run_module(params: Dict[str, Any]):
             metric=metric,
             geo_res=geo_res,
             sensor=sensor,
-            smoother=smoother)
+            smoother=smoother,
+        )
         df = dfs[metric]
 
         # Aggregate to appropriate geographic resolution
         df = geo_map(df, geo_res, sensor)
         df.set_index(["timestamp", "geo_id"], inplace=True)
+
+        # Smooth
         df["val"] = df[sensor].groupby(level=1).transform(SMOOTHERS_MAP[smoother][0])
+
+        # JHU is not a survey data source, so se and sample_size do not apply
         df["se"] = np.nan
         df["sample_size"] = np.nan
-        # Drop early entries where data insufficient for smoothing
-        df = df[~df["val"].isnull()]
+
+        df = add_nancodes(df, metric, geo_res, smoother)
+
         df = df.reset_index()
 
         sensor_name = SENSOR_NAME_MAP[sensor][0]
         # if (SENSOR_NAME_MAP[sensor][1] or SMOOTHERS_MAP[smoother][2]):
@@ -129,16 +167,21 @@ def run_module(params: Dict[str, Any]):
         if not oldest_final_export_date:
             oldest_final_export_date = max(exported_csv_dates)
         oldest_final_export_date = min(
-            oldest_final_export_date, max(exported_csv_dates))
+            oldest_final_export_date, max(exported_csv_dates)
+        )
 
     elapsed_time_in_seconds = round(time.time() - start_time, 2)
     max_lag_in_days = None
     formatted_oldest_final_export_date = None
     if oldest_final_export_date:
         max_lag_in_days = (datetime.now() - oldest_final_export_date).days
-        formatted_oldest_final_export_date = oldest_final_export_date.strftime("%Y-%m-%d")
-    logger.info("Completed indicator run",
-                elapsed_time_in_seconds = elapsed_time_in_seconds,
-                csv_export_count = csv_export_count,
-                max_lag_in_days = max_lag_in_days,
-                oldest_final_export_date = formatted_oldest_final_export_date)
+        formatted_oldest_final_export_date = oldest_final_export_date.strftime(
+            "%Y-%m-%d"
+        )
+    logger.info(
+        "Completed indicator run",
+        elapsed_time_in_seconds=elapsed_time_in_seconds,
+        csv_export_count=csv_export_count,
+        max_lag_in_days=max_lag_in_days,
+        oldest_final_export_date=formatted_oldest_final_export_date,
+    )
diff --git a/jhu/tests/test_run.py b/jhu/tests/test_run.py
index 1ff1cc1dd..f4847bdab 100644
--- a/jhu/tests/test_run.py
+++ b/jhu/tests/test_run.py
@@ -2,46 +2,89 @@
 from os.path import join, basename
 
 import pandas as pd
+import numpy as np
+
+from delphi_jhu.run import add_nancodes
+from delphi_utils import Nans
+
+
+def _non_ignored_files_set(directory):
+    """Return the set of filenames in a directory that do not start with '.'."""
+    out = {fname for fname in listdir(directory) if not basename(fname).startswith(".")}
+    return out
+
 
 class TestRun:
     def test_output_files_exist(self, run_as_module):
-
-        csv_files = [x for x in listdir("receiving") if not basename(x).startswith(".")]
-
-        dates = [
-            "20200303",
-            "20200304",
-            "20200305",
-            "20200306",
-            "20200307",
-            "20200308",
-            "20200309",
-            "20200310",
-        ]
+        csv_files = _non_ignored_files_set("receiving")
+        dates = [d.strftime("%Y%m%d") for d in pd.date_range("20200303", "20200310")]
         geos = ["county", "hrr", "msa", "state", "hhs", "nation"]
-        metrics = []
-        for event in ["confirmed", "deaths"]:
-            for smoothing in ["", "_7dav"]:
-                for window in ["incidence", "cumulative"]:
-                    for stat in ["num", "prop"]:
-                        metrics.append(f"{event}{smoothing}_{window}_{stat}")
-
-        expected_files = []
-        for date in dates:
-            for geo in geos:
-                for metric in metrics:
-                    if "7dav" in metric and "cumulative" in metric:
-                        continue
-                    # Can't compute 7dav for first few days of data because of NAs
-                    if date > "20200305" or "7dav" not in metric:
-                        expected_files += [date + "_" + geo + "_" + metric + ".csv"]
-
-        assert set(csv_files) == set(expected_files)
+        metrics = [
+            f"{event}{smoothing}_{window}_{stat}"
+            for event in ["confirmed", "deaths"]
+            for smoothing in ["", "_7dav"]
+            for window in ["incidence", "cumulative"]
+            for stat in ["num", "prop"]
+        ]
+        expected_files = {
+            f"{date}_{geo}_{metric}.csv"
+            for date in dates
+            for geo in geos
+            for metric in metrics
+            if not ("7dav" in metric and "cumulative" in metric)
+        }
+
+        assert csv_files == expected_files
 
     def test_output_file_format(self, run_as_module):
-        df = pd.read_csv(
-            join("receiving", "20200310_state_confirmed_cumulative_num.csv")
-        )
-        assert (df.columns.values == ["geo_id", "val", "se", "sample_size"]).all()
+        df = pd.read_csv(join("receiving", "20200310_state_confirmed_cumulative_num.csv"))
+        expected_columns = [
+            "geo_id",
+            "val",
+            "se",
+            "sample_size",
+            "missing_val",
+            "missing_se",
+            "missing_sample_size",
+        ]
+        assert (df.columns.values == expected_columns).all()
+
+    def test_add_nancodes(self):
+        df = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [0.1, 0.2, 0.3, 0.4, 0.5, np.nan, 0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8,
+        }).set_index(["timestamp", "geo_id"])
+        expected_df = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [0.1, 0.2, 0.3, 0.4, 0.5, np.nan, 0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8,
+            "missing_val": [Nans.NOT_MISSING] * 5 + [Nans.REGION_EXCEPTION, Nans.NOT_MISSING, Nans.OTHER],
+            "missing_se": [Nans.NOT_APPLICABLE] * 8,
+            "missing_sample_size": [Nans.NOT_APPLICABLE] * 8,
+        }).set_index(["timestamp", "geo_id"])
+
+        pd.testing.assert_frame_equal(add_nancodes(df, "deaths", "county", None), expected_df)
+
+        df2 = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [np.nan] * 6 + [0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8,
+        }).set_index(["timestamp", "geo_id"])
+        expected_df2 = pd.DataFrame({
+            "timestamp": pd.date_range("20200321", "20200328"),
+            "geo_id": ["01017", "01043", "01061", "01103", "02282", "72001", "31000", "49000"],
+            "val": [np.nan] * 6 + [0.7, np.nan],
+            "se": [np.nan] * 8,
+            "sample_size": [np.nan] * 8,
+            "missing_val": [Nans.CENSORED] * 5 + [Nans.REGION_EXCEPTION, Nans.NOT_MISSING, Nans.OTHER],
+            "missing_se": [Nans.NOT_APPLICABLE] * 8,
+            "missing_sample_size": [Nans.NOT_APPLICABLE] * 8,
+        }).set_index(["timestamp", "geo_id"])
+
+        pd.testing.assert_frame_equal(add_nancodes(df2, "deaths", "county", "seven_day_average"), expected_df2)
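
For reviewers who want to see the new columns in use, here is a minimal sketch of downstream code reading one of the exported CSVs and splitting rows by missingness code. The file name is illustrative (any early-date county deaths export from `receiving/` would do), and it assumes `delphi_utils.Nans` is an `IntEnum`, so its members compare equal to the integers written to the CSV; only members already used in this diff are referenced.

```python
# Hypothetical downstream consumer of a CSV exported by this pipeline.
# Column layout follows test_output_file_format above; the path is
# illustrative only and assumes the indicator has already been run.
from os.path import join

import pandas as pd
from delphi_utils import Nans

csv_path = join("receiving", "20200303_county_deaths_7dav_incidence_num.csv")
df = pd.read_csv(csv_path, dtype={"geo_id": str})

# Rows whose value is present and usable.
usable = df[df["missing_val"] == Nans.NOT_MISSING]

# Rows censored because the seven-day window had too few points, and rows
# excluded for region-specific reasons (e.g. Puerto Rico county deaths):
# previously these were silently dropped, now they are flagged.
censored = df[df["missing_val"] == Nans.CENSORED]
region_exceptions = df[df["missing_val"] == Nans.REGION_EXCEPTION]

print(f"usable={len(usable)} censored={len(censored)} "
      f"region_exceptions={len(region_exceptions)}")
```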