diff --git a/changehc/delphi_changehc/update_sensor.py b/changehc/delphi_changehc/update_sensor.py index eb68691c9..29354452b 100644 --- a/changehc/delphi_changehc/update_sensor.py +++ b/changehc/delphi_changehc/update_sensor.py @@ -11,27 +11,57 @@ # third party import numpy as np import pandas as pd -from delphi_utils import GeoMapper, add_prefix, create_export_csv, Weekday +from delphi_utils import GeoMapper, add_prefix, create_export_csv, Weekday, Nans # first party from .config import Config -from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI, NA +from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI from .sensor import CHCSensor +def censor_columns(df, cols, inplace=False): + """Replace values with nans in the specified columns.""" + df = df if inplace else df.copy() + df.loc[:, cols] = np.nan + return df + +def add_nancodes(df, write_se, inplace=False): + """Add nancodes to the dataframe.""" + df = df if inplace else df.copy() + + # Default missingness codes + df["missing_val"] = Nans.NOT_MISSING + df["missing_se"] = Nans.CENSORED if not write_se else Nans.NOT_MISSING + df["missing_sample_size"] = Nans.CENSORED + + # Censor those that weren't included + df.loc[~df['incl'], ["val", "se"]] = np.nan # rows with incl == False keep neither val nor se + df.loc[~df['incl'], ["missing_val", "missing_se"]] = Nans.CENSORED + + # Mark any remaining nans with unknown + remaining_nans_mask = df["val"].isnull() & df["missing_val"].eq(Nans.NOT_MISSING) + df.loc[remaining_nans_mask, "missing_val"] = Nans.OTHER + + remaining_nans_mask = df["se"].isnull() & df["missing_se"].eq(Nans.NOT_MISSING) + df.loc[remaining_nans_mask, "missing_se"] = Nans.OTHER + + return df + def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_path=".", start_date=None, end_date=None): """Write sensor values to csv. 
Args: df: dataframe containing unique timestamp, unqiue geo_id, val, se, sample_size geo_level: the geographic level being written e.g. county, state - write_se: boolean to write out standard errors, if true, use an obfuscated name day_shift: a timedelta specifying the time shift to apply to the dates out_name: name of the output file output_path: outfile path to write the csv (default is current directory) start_date: the first date of the dates to be written end_date: the last date of the dates to be written + logger: a logger object to log events while writing """ + logger = logging if logger is None else logger + df = df.copy() # shift dates forward for labeling @@ -40,13 +70,12 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_pa # suspicious value warnings suspicious_se_mask = df["se"].gt(5) assert df[suspicious_se_mask].empty, " se contains suspiciously large values" - assert not df["se"].isna().any(), " se contains nan values" + if write_se: logger.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name)) else: df["se"] = np.nan - assert not df["val"].isna().any(), " val contains nan values" suspicious_val_mask = df["val"].gt(90) if not df[suspicious_val_mask].empty: for geo in df.loc[suspicious_val_mask, "geo_id"]: @@ -61,7 +90,8 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_pa start_date=start_date, end_date=end_date, sensor=out_name, - write_empty_days=True + write_empty_days=True, + logger=logger ) logger.debug("wrote {0} rows for {1} {2}".format( df.size, df["geo_id"].unique().size, geo_level @@ -231,14 +261,12 @@ def update_sensor(self, res = pd.DataFrame(res).loc[final_sensor_idxs] dfs.append(res) - # Form the output dataframe - df = pd.concat(dfs) - # sample size is never shared - df["sample_size"] = np.nan - # conform to naming expected by create_export_csv() - df = df.reset_index().rename(columns={"rate": "val"}) - # df.loc[~df['incl'], ["val", "se"]] = np.nan # 
update to this line after nancodes get merged in - df = df[df["incl"]] + # Form the output dataframe and conform to expected naming + df = pd.concat(dfs).reset_index().rename(columns={"date": "timestamp", "rate": "val"}) + + # sample size is never shared; standard error might be shared + df = censor_columns(df, ["sample_size"] if self.se else ["sample_size", "se"]) + df = add_nancodes(df, self.se) # write out results dates = write_to_csv( diff --git a/changehc/tests/test_update_sensor.py b/changehc/tests/test_update_sensor.py index 29cccc813..cf3ee6d97 100644 --- a/changehc/tests/test_update_sensor.py +++ b/changehc/tests/test_update_sensor.py @@ -11,10 +11,12 @@ from boto3 import Session from moto import mock_s3 import pytest +import mock # first party from delphi_changehc.config import Config -from delphi_changehc.update_sensor import write_to_csv, CHCSensorUpdater +from delphi_changehc.update_sensor import add_nancodes, censor_columns, write_to_csv, CHCSensorUpdater +from delphi_utils.nancodes import Nans CONFIG = Config() PARAMS = { @@ -96,7 +98,8 @@ def test_geo_reindex(self): def test_update_sensor(self): """Tests that the sensors are properly updated.""" outputs = {} - for geo in ["county", "state", "hhs", "nation"]: + geos = ["county", "state", "hhs", "nation"] + for geo in geos: td = TemporaryDirectory() su_inst = CHCSensorUpdater( "03-01-2020", @@ -127,11 +130,10 @@ def test_update_sensor(self): assert len(os.listdir(td.name)) == len(su_inst.sensor_dates),\ f"failed {geo} update sensor test" td.cleanup() - assert outputs["20200319_county_smoothed_outpatient_covid.csv"].empty - assert outputs["20200319_state_smoothed_outpatient_covid.csv"].empty - assert outputs["20200319_hhs_smoothed_outpatient_covid.csv"].empty - assert outputs["20200319_nation_smoothed_outpatient_covid.csv"].empty - + value_columns = ["val", "se", "sample_size"] + for geo in geos: + assert np.isnan(outputs["20200319_" + geo + "_smoothed_outpatient_covid.csv"][value_columns]).all().all() 
+ assert outputs["20200319_" + geo + "_smoothed_outpatient_covid.csv"]["missing_val"].eq(3).all() class TestWriteToCsv: """Tests for writing output files to CSV.""" @@ -142,16 +144,19 @@ def test_write_to_csv_results(self): "se": [0.1, 1, 1.1] + [0.5, np.nan, 0.5], "sample_size": [np.nan] * 6, "timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2), - "include": [True, True, True] + [True, False, True], + "incl": [True, True, True] + [True, False, True], "geo_id": ["a"] * 3 + ["b"] * 3, }) td = TemporaryDirectory() + res0 = censor_columns(res0, ["sample_size", "se"]) + res0 = add_nancodes(res0, write_se=False) + write_to_csv( - res0[res0['include']], + res0, geo_level="geography", - write_se=False, + write_se=True, day_shift=CONFIG.DAY_SHIFT, out_name="name_of_signal", output_path=td.name, @@ -162,7 +167,10 @@ def test_write_to_csv_results(self): expected_name = "20200502_geography_name_of_signal.csv" assert exists(join(td.name, expected_name)) output_data = pd.read_csv(join(td.name, expected_name)) - expected_columns = ["geo_id", "val", "se", "sample_size"] + expected_columns = [ + "geo_id", "val", "se", "sample_size", + "missing_val", "missing_se", "missing_sample_size" + ] assert (output_data.columns == expected_columns).all() assert (output_data.geo_id == ["a", "b"]).all() assert np.array_equal(output_data.val.values, np.array([0.1, 1])) @@ -175,8 +183,8 @@ def test_write_to_csv_results(self): assert exists(join(td.name, expected_name)) output_data = pd.read_csv(join(td.name, expected_name)) assert (output_data.columns == expected_columns).all() - assert (output_data.geo_id == ["a"]).all() - assert np.array_equal(output_data.val.values, np.array([0.5])) + assert (output_data.geo_id == ["a", "b"]).all() + assert np.array_equal(output_data.val.values, np.array([0.5, np.nan]), equal_nan=True) assert np.isnan(output_data.se.values).all() assert np.isnan(output_data.sample_size.values).all() @@ -198,13 +206,15 @@ def 
test_write_to_csv_with_se_results(self): "se": [0.1, 1, 1.1] + [0.5, np.nan, 0.5], "sample_size": [np.nan] * 6, "timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2), - "include": [True, True, True] + [True, False, True], + "incl": [True, True, True] + [True, False, True], "geo_id": ["a"] * 3 + ["b"] * 3, }) + res0 = add_nancodes(res0, write_se=True) + td = TemporaryDirectory() write_to_csv( - res0[res0['include']], + res0, geo_level="geography", write_se=True, day_shift=CONFIG.DAY_SHIFT, @@ -215,9 +225,12 @@ def test_write_to_csv_with_se_results(self): # check outputs expected_name = "20200502_geography_name_of_signal.csv" + expected_columns = [ + "geo_id", "val", "se", "sample_size", + "missing_val", "missing_se", "missing_sample_size" + ] assert exists(join(td.name, expected_name)) output_data = pd.read_csv(join(td.name, expected_name)) - expected_columns = ["geo_id", "val", "se", "sample_size"] assert (output_data.columns == expected_columns).all() assert (output_data.geo_id == ["a", "b"]).all() assert np.array_equal(output_data.val.values, np.array([0.1, 1])) @@ -225,54 +238,33 @@ def test_write_to_csv_with_se_results(self): assert np.isnan(output_data.sample_size.values).all() td.cleanup() - def test_write_to_csv_wrong_results(self): - """Tests that nonsensical inputs trigger exceptions.""" + def test_suspicious_value_logging(self): res0 = pd.DataFrame({ - "val": [0.1, 0.5, 1.5] + [1, 2, 3], - "se": [0.1, 1, 1.1] + [0.5, 0.5, 0.5], + "val": [91, 0.5, 1.5] + [1, 2, 3], + "se": [0.1, 1, 1.1] + [0.5, np.nan, 0.5], "sample_size": [np.nan] * 6, "timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2), - "include": [True, True, True] + [True, False, True], + "incl": [True, True, True] + [True, False, True], "geo_id": ["a"] * 3 + ["b"] * 3, - }).set_index(["timestamp", "geo_id"]).sort_index() - - td = TemporaryDirectory() + }) - # nan value for included loc-date - res1 = res0.copy() - res1 = res1[res1['include']] - 
res1.loc[("2020-05-01", "a"), "val"] = np.nan - res1.reset_index(inplace=True) - with pytest.raises(AssertionError): - write_to_csv( - res1, - geo_level="geography", - write_se=False, - day_shift=CONFIG.DAY_SHIFT, - out_name="name_of_signal", - output_path=td.name, - logger=TEST_LOGGER - ) + res0 = add_nancodes(res0, write_se=True) - # nan se for included loc-date - res2 = res0.copy() - res2 = res2[res2['include']] - res2.loc[("2020-05-01", "a"), "se"] = np.nan - res2.reset_index(inplace=True) - with pytest.raises(AssertionError): - write_to_csv( - res2, - geo_level="geography", - write_se=True, - day_shift=CONFIG.DAY_SHIFT, - out_name="name_of_signal", - output_path=td.name, - logger=TEST_LOGGER - ) + mock_logger = mock.Mock() + td = TemporaryDirectory() + write_to_csv( + res0, + geo_level="geography", + write_se=True, + day_shift=CONFIG.DAY_SHIFT, + out_name="name_of_signal", + output_path=td.name, + logger=mock_logger + ) # large se value - res3 = res0.copy() - res3 = res3[res3['include']] + res3 = res0.copy().set_index(["timestamp", "geo_id"]) + res3 = res3[res3['incl']] res3.loc[("2020-05-01", "a"), "se"] = 10 res3.reset_index(inplace=True) with pytest.raises(AssertionError): @@ -286,4 +278,47 @@ def test_write_to_csv_wrong_results(self): logger=TEST_LOGGER ) - td.cleanup() + mock_logger.warning.assert_called_once_with( + "value suspiciously high, {0}: {1}".format("a", "name_of_signal") + ) + + def test_add_nancodes(self): + """Tests that nancodes are correctly added.""" + res0 = pd.DataFrame({ + "val": [np.nan, 0.5, 1.5] + [1, 2, 3], + "se": [np.nan, 1, 1.1] + [np.nan, np.nan, 0.5], + 
"sample_size": [np.nan] * 6, + "timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2), + "incl": [True, True, True] + [True, False, True], + "geo_id": ["a"] * 3 + ["b"] * 3, + "missing_val": [Nans.OTHER] + [Nans.NOT_MISSING] * 3 + [Nans.CENSORED, Nans.NOT_MISSING], + "missing_se": [Nans.OTHER] + [Nans.NOT_MISSING] * 2 + [Nans.OTHER, Nans.CENSORED, Nans.NOT_MISSING], + "missing_sample_size": [Nans.CENSORED] * 6, + }).set_index(["timestamp", "geo_id"]).sort_index() + + res = censor_columns(res0, ["sample_size"]) + pd.testing.assert_frame_equal(expected_df, add_nancodes(res, write_se=True)) + + expected_df = pd.DataFrame({ + "val": [np.nan, 0.5, 1.5] + [1, np.nan, 3], + "se": [np.nan] * 6, + "sample_size": [np.nan] * 6, + "timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2), + "incl": [True, True, True] + [True, False, True], + "geo_id": ["a"] * 3 + ["b"] * 3, + "missing_val": [Nans.OTHER] + [Nans.NOT_MISSING] * 3 + [Nans.CENSORED, Nans.NOT_MISSING], + "missing_se": [Nans.CENSORED] * 6, + "missing_sample_size": [Nans.CENSORED] * 6, + }).set_index(["timestamp", "geo_id"]).sort_index() + + res = censor_columns(res0, ["sample_size", "se"]) + pd.testing.assert_frame_equal(expected_df, add_nancodes(res, write_se=False))