
Commit 1a00a25

Nans changehc:
* allow nan values, add missing columns, and test
1 parent 0b7103a commit 1a00a25

File tree: 2 files changed, +134 −71 lines


changehc/delphi_changehc/update_sensor.py

Lines changed: 42 additions & 14 deletions
@@ -11,27 +11,57 @@
 # third party
 import numpy as np
 import pandas as pd
-from delphi_utils import GeoMapper, add_prefix, create_export_csv, Weekday
+from delphi_utils import GeoMapper, add_prefix, create_export_csv, Weekday, Nans
 
 # first party
 from .config import Config
-from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI, NA
+from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI
 from .sensor import CHCSensor
 
 
+def censor_columns(df, cols, inplace=False):
+    """Replace values with nans in the specified columns."""
+    df = df if inplace else df.copy()
+    df.loc[:, cols] = np.nan
+    return df
+
+
+def add_nancodes(df, write_se, inplace=False):
+    """Add nancodes to the dataframe."""
+    df = df if inplace else df.copy()
+
+    # Default missingness codes
+    df["missing_val"] = Nans.NOT_MISSING
+    df["missing_se"] = Nans.CENSORED if not write_se else Nans.NOT_MISSING
+    df["missing_sample_size"] = Nans.CENSORED
+
+    # Censor those that weren't included
+    df.loc[~df['incl'], ["val", "se"]] = np.nan
+    df.loc[~df['incl'], ["missing_val", "missing_se"]] = Nans.CENSORED
+
+    # Mark any remaining nans with unknown
+    remaining_nans_mask = df["val"].isnull() & df["missing_val"].eq(Nans.NOT_MISSING)
+    df.loc[remaining_nans_mask, "missing_val"] = Nans.OTHER
+
+    remaining_nans_mask = df["se"].isnull() & df["missing_se"].eq(Nans.NOT_MISSING)
+    df.loc[remaining_nans_mask, "missing_se"] = Nans.OTHER
+
+    return df
+
 def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_path=".", start_date=None, end_date=None):
     """Write sensor values to csv.
 
     Args:
         df: dataframe containing unique timestamp, unique geo_id, val, se, sample_size
         geo_level: the geographic level being written, e.g. county, state
-        write_se: boolean to write out standard errors, if true, use an obfuscated name
         day_shift: a timedelta specifying the time shift to apply to the dates
         out_name: name of the output file
         output_path: outfile path to write the csv (default is current directory)
         start_date: the first date of the dates to be written
         end_date: the last date of the dates to be written
+        logger: a logger object to log events while writing
     """
+    logger = logging if logger is None else logger
+
     df = df.copy()
 
     # shift dates forward for labeling
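Reviewer note, not part of the diff: a minimal sketch of how the two new helpers compose, on an assumed toy frame; the expected codes in the comments follow directly from the function bodies above.

    # Sketch only: censor_columns + add_nancodes on a toy frame
    # (imports mirror the test file further down).
    import numpy as np
    import pandas as pd
    from delphi_changehc.update_sensor import add_nancodes, censor_columns

    df = pd.DataFrame({
        "val": [1.0, np.nan, 3.0],
        "se": [0.1, 0.2, np.nan],
        "sample_size": [10.0, 20.0, 30.0],
        "incl": [True, True, False],
    })

    df = censor_columns(df, ["sample_size", "se"])  # censor se too, as when write_se is False
    df = add_nancodes(df, write_se=False)

    # Row 0: val present           -> missing_val == Nans.NOT_MISSING
    # Row 1: val nan, not censored -> missing_val == Nans.OTHER
    # Row 2: incl is False         -> val blanked, missing_val == Nans.CENSORED
    # missing_se and missing_sample_size are Nans.CENSORED throughout.
    print(df[["val", "missing_val", "missing_se", "missing_sample_size"]])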
@@ -40,13 +70,12 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_pa
     # suspicious value warnings
     suspicious_se_mask = df["se"].gt(5)
     assert df[suspicious_se_mask].empty, " se contains suspiciously large values"
-    assert not df["se"].isna().any(), " se contains nan values"
+
     if write_se:
         logger.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name))
     else:
         df["se"] = np.nan
 
-    assert not df["val"].isna().any(), " val contains nan values"
     suspicious_val_mask = df["val"].gt(90)
     if not df[suspicious_val_mask].empty:
         for geo in df.loc[suspicious_val_mask, "geo_id"]:
@@ -61,7 +90,8 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_pa
         start_date=start_date,
         end_date=end_date,
         sensor=out_name,
-        write_empty_days=True
+        write_empty_days=True,
+        logger=logger
     )
     logger.debug("wrote {0} rows for {1} {2}".format(
         df.size, df["geo_id"].unique().size, geo_level
@@ -231,14 +261,12 @@ def update_sensor(self,
             res = pd.DataFrame(res).loc[final_sensor_idxs]
             dfs.append(res)
 
-        # Form the output dataframe
-        df = pd.concat(dfs)
-        # sample size is never shared
-        df["sample_size"] = np.nan
-        # conform to naming expected by create_export_csv()
-        df = df.reset_index().rename(columns={"rate": "val"})
-        # df.loc[~df['incl'], ["val", "se"]] = np.nan  # update to this line after nancodes get merged in
-        df = df[df["incl"]]
+        # Form the output dataframe and conform to expected naming
+        df = pd.concat(dfs).reset_index().rename(columns={"date": "timestamp", "rate": "val"})
+
+        # sample size is never shared; standard error might be shared
+        df = censor_columns(df, ["sample_size"] if self.se else ["sample_size", "se"])
+        df = add_nancodes(df, self.se)
 
         # write out results
         dates = write_to_csv(
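Net effect on exported files: each CSV gains missing_val, missing_se, and missing_sample_size alongside the existing columns (compare the expected_columns lists in the tests below). A hypothetical row, assuming the integer nancodes NOT_MISSING = 0 and CENSORED = 3:

    geo_id,val,se,sample_size,missing_val,missing_se,missing_sample_size
    a,0.1,0.1,,0,0,3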

changehc/tests/test_update_sensor.py

Lines changed: 92 additions & 57 deletions
@@ -11,10 +11,12 @@
 from boto3 import Session
 from moto import mock_s3
 import pytest
+import mock
 
 # first party
 from delphi_changehc.config import Config
-from delphi_changehc.update_sensor import write_to_csv, CHCSensorUpdater
+from delphi_changehc.update_sensor import add_nancodes, censor_columns, write_to_csv, CHCSensorUpdater
+from delphi_utils.nancodes import Nans
 
 CONFIG = Config()
 PARAMS = {
@@ -96,7 +98,8 @@ def test_geo_reindex(self):
     def test_update_sensor(self):
         """Tests that the sensors are properly updated."""
         outputs = {}
-        for geo in ["county", "state", "hhs", "nation"]:
+        geos = ["county", "state", "hhs", "nation"]
+        for geo in geos:
             td = TemporaryDirectory()
             su_inst = CHCSensorUpdater(
                 "03-01-2020",
@@ -127,11 +130,10 @@ def test_update_sensor(self):
             assert len(os.listdir(td.name)) == len(su_inst.sensor_dates),\
                 f"failed {geo} update sensor test"
             td.cleanup()
-        assert outputs["20200319_county_smoothed_outpatient_covid.csv"].empty
-        assert outputs["20200319_state_smoothed_outpatient_covid.csv"].empty
-        assert outputs["20200319_hhs_smoothed_outpatient_covid.csv"].empty
-        assert outputs["20200319_nation_smoothed_outpatient_covid.csv"].empty
-
+        value_columns = ["val", "se", "sample_size"]
+        for geo in geos:
+            assert np.isnan(outputs["20200319_" + geo + "_smoothed_outpatient_covid.csv"][value_columns]).all().all()
+            assert outputs["20200319_" + geo + "_smoothed_outpatient_covid.csv"]["missing_val"].eq(3).all()
 
 class TestWriteToCsv:
     """Tests for writing output files to CSV."""
@@ -142,16 +144,19 @@ def test_write_to_csv_results(self):
             "se": [0.1, 1, 1.1] + [0.5, np.nan, 0.5],
             "sample_size": [np.nan] * 6,
             "timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2),
-            "include": [True, True, True] + [True, False, True],
+            "incl": [True, True, True] + [True, False, True],
             "geo_id": ["a"] * 3 + ["b"] * 3,
         })
 
         td = TemporaryDirectory()
 
+        res0 = censor_columns(res0, ["sample_size", "se"])
+        res0 = add_nancodes(res0, write_se=False)
+
         write_to_csv(
-            res0[res0['include']],
+            res0,
             geo_level="geography",
-            write_se=False,
+            write_se=True,
             day_shift=CONFIG.DAY_SHIFT,
             out_name="name_of_signal",
             output_path=td.name,
@@ -162,7 +167,10 @@ def test_write_to_csv_results(self):
         expected_name = "20200502_geography_name_of_signal.csv"
         assert exists(join(td.name, expected_name))
         output_data = pd.read_csv(join(td.name, expected_name))
-        expected_columns = ["geo_id", "val", "se", "sample_size"]
+        expected_columns = [
+            "geo_id", "val", "se", "sample_size",
+            "missing_val", "missing_se", "missing_sample_size"
+        ]
         assert (output_data.columns == expected_columns).all()
         assert (output_data.geo_id == ["a", "b"]).all()
         assert np.array_equal(output_data.val.values, np.array([0.1, 1]))
@@ -175,8 +183,8 @@ def test_write_to_csv_results(self):
         assert exists(join(td.name, expected_name))
         output_data = pd.read_csv(join(td.name, expected_name))
         assert (output_data.columns == expected_columns).all()
-        assert (output_data.geo_id == ["a"]).all()
-        assert np.array_equal(output_data.val.values, np.array([0.5]))
+        assert (output_data.geo_id == ["a", "b"]).all()
+        assert np.array_equal(output_data.val.values, np.array([0.5, np.nan]), equal_nan=True)
         assert np.isnan(output_data.se.values).all()
         assert np.isnan(output_data.sample_size.values).all()
 
@@ -198,13 +206,15 @@ def test_write_to_csv_with_se_results(self):
             "se": [0.1, 1, 1.1] + [0.5, np.nan, 0.5],
             "sample_size": [np.nan] * 6,
             "timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2),
-            "include": [True, True, True] + [True, False, True],
+            "incl": [True, True, True] + [True, False, True],
             "geo_id": ["a"] * 3 + ["b"] * 3,
         })
 
+        res0 = add_nancodes(res0, write_se=True)
+
         td = TemporaryDirectory()
         write_to_csv(
-            res0[res0['include']],
+            res0,
             geo_level="geography",
             write_se=True,
             day_shift=CONFIG.DAY_SHIFT,
@@ -215,64 +225,46 @@ def test_write_to_csv_with_se_results(self):
 
         # check outputs
         expected_name = "20200502_geography_name_of_signal.csv"
+        expected_columns = [
+            "geo_id", "val", "se", "sample_size",
+            "missing_val", "missing_se", "missing_sample_size"
+        ]
         assert exists(join(td.name, expected_name))
         output_data = pd.read_csv(join(td.name, expected_name))
-        expected_columns = ["geo_id", "val", "se", "sample_size"]
         assert (output_data.columns == expected_columns).all()
         assert (output_data.geo_id == ["a", "b"]).all()
         assert np.array_equal(output_data.val.values, np.array([0.1, 1]))
         assert np.array_equal(output_data.se.values, np.array([0.1, 0.5]))
         assert np.isnan(output_data.sample_size.values).all()
         td.cleanup()
 
-    def test_write_to_csv_wrong_results(self):
-        """Tests that nonsensical inputs trigger exceptions."""
+    def test_suspicious_value_logging(self):
         res0 = pd.DataFrame({
-            "val": [0.1, 0.5, 1.5] + [1, 2, 3],
-            "se": [0.1, 1, 1.1] + [0.5, 0.5, 0.5],
+            "val": [91, 0.5, 1.5] + [1, 2, 3],
+            "se": [0.1, 1, 1.1] + [0.5, np.nan, 0.5],
             "sample_size": [np.nan] * 6,
             "timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2),
-            "include": [True, True, True] + [True, False, True],
+            "incl": [True, True, True] + [True, False, True],
             "geo_id": ["a"] * 3 + ["b"] * 3,
-        }).set_index(["timestamp", "geo_id"]).sort_index()
-
-        td = TemporaryDirectory()
+        })
 
-        # nan value for included loc-date
-        res1 = res0.copy()
-        res1 = res1[res1['include']]
-        res1.loc[("2020-05-01", "a"), "val"] = np.nan
-        res1.reset_index(inplace=True)
-        with pytest.raises(AssertionError):
-            write_to_csv(
-                res1,
-                geo_level="geography",
-                write_se=False,
-                day_shift=CONFIG.DAY_SHIFT,
-                out_name="name_of_signal",
-                output_path=td.name,
-                logger=TEST_LOGGER
-            )
+        res0 = add_nancodes(res0, write_se=True)
 
-        # nan se for included loc-date
-        res2 = res0.copy()
-        res2 = res2[res2['include']]
-        res2.loc[("2020-05-01", "a"), "se"] = np.nan
-        res2.reset_index(inplace=True)
-        with pytest.raises(AssertionError):
-            write_to_csv(
-                res2,
-                geo_level="geography",
-                write_se=True,
-                day_shift=CONFIG.DAY_SHIFT,
-                out_name="name_of_signal",
-                output_path=td.name,
-                logger=TEST_LOGGER
-            )
+        mock_logger = mock.Mock()
+        td = TemporaryDirectory()
+        write_to_csv(
+            res0,
+            geo_level="geography",
+            write_se=True,
+            day_shift=CONFIG.DAY_SHIFT,
+            out_name="name_of_signal",
+            output_path=td.name,
+            logger=mock_logger
+        )
 
         # large se value
-        res3 = res0.copy()
-        res3 = res3[res3['include']]
+        res3 = res0.copy().set_index(["timestamp", "geo_id"])
+        res3 = res3[res3['incl']]
         res3.loc[("2020-05-01", "a"), "se"] = 10
         res3.reset_index(inplace=True)
         with pytest.raises(AssertionError):
@@ -286,4 +278,47 @@ def test_write_to_csv_wrong_results(self):
                 logger=TEST_LOGGER
             )
 
-        td.cleanup()
+        mock_logger.warning.assert_called_once_with(
+            "value suspiciously high, {0}: {1}".format("a", "name_of_signal")
+        )
+
+    def test_add_nancodes(self):
+        """Tests that nancodes are correctly added."""
+        res0 = pd.DataFrame({
+            "val": [np.nan, 0.5, 1.5] + [1, 2, 3],
+            "se": [np.nan, 1, 1.1] + [np.nan, np.nan, 0.5],
+            "sample_size": [np.nan] * 6,
+            "timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2),
+            "incl": [True, True, True] + [True, False, True],
+            "geo_id": ["a"] * 3 + ["b"] * 3,
+        }).set_index(["timestamp", "geo_id"]).sort_index()
+
+        expected_df = pd.DataFrame({
+            "val": [np.nan, 0.5, 1.5] + [1, np.nan, 3],
+            "se": [np.nan, 1, 1.1] + [np.nan, np.nan, 0.5],
+            "sample_size": [np.nan] * 6,
+            "timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2),
+            "incl": [True, True, True] + [True, False, True],
+            "geo_id": ["a"] * 3 + ["b"] * 3,
+            "missing_val": [Nans.OTHER] + [Nans.NOT_MISSING] * 3 + [Nans.CENSORED, Nans.NOT_MISSING],
+            "missing_se": [Nans.OTHER] + [Nans.NOT_MISSING] * 2 + [Nans.OTHER, Nans.CENSORED, Nans.NOT_MISSING],
+            "missing_sample_size": [Nans.CENSORED] * 6,
+        }).set_index(["timestamp", "geo_id"]).sort_index()
+
+        res = censor_columns(res0, ["sample_size"])
+        pd.testing.assert_frame_equal(expected_df, add_nancodes(res, write_se=True))
+
+        expected_df = pd.DataFrame({
+            "val": [np.nan, 0.5, 1.5] + [1, np.nan, 3],
+            "se": [np.nan] * 6,
+            "sample_size": [np.nan] * 6,
+            "timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2),
+            "incl": [True, True, True] + [True, False, True],
+            "geo_id": ["a"] * 3 + ["b"] * 3,
+            "missing_val": [Nans.OTHER] + [Nans.NOT_MISSING] * 3 + [Nans.CENSORED, Nans.NOT_MISSING],
+            "missing_se": [Nans.CENSORED] * 6,
+            "missing_sample_size": [Nans.CENSORED] * 6,
+        }).set_index(["timestamp", "geo_id"]).sort_index()
+
+        res = censor_columns(res0, ["sample_size", "se"])
+        pd.testing.assert_frame_equal(expected_df, add_nancodes(res, write_se=False))
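For reviewers unfamiliar with the pattern in test_suspicious_value_logging: a Mock records every call made on it, so the warning can be asserted without capturing real log output. A standalone sketch using the stdlib unittest.mock, which the third-party mock package mirrors:

    # Sketch of the call-recording pattern used in the test above.
    from unittest import mock

    logger = mock.Mock()
    logger.warning("value suspiciously high, {0}: {1}".format("a", "name_of_signal"))
    logger.warning.assert_called_once_with("value suspiciously high, a: name_of_signal")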
