Skip to content

Commit 5a60716

Browse files
committed
Nans changehc:
* allow nan values, add missing columns, and test
1 parent 8c752d9 commit 5a60716

File tree

2 files changed

+136
-90
lines changed

2 files changed

+136
-90
lines changed

changehc/delphi_changehc/update_sensor.py

Lines changed: 51 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,28 +11,61 @@
1111
# third party
1212
import numpy as np
1313
import pandas as pd
14-
from delphi_utils import GeoMapper, add_prefix, create_export_csv
14+
from delphi_utils import GeoMapper, add_prefix, create_export_csv, Nans
1515

1616
# first party
1717
from .config import Config
18-
from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI, NA
18+
from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI
1919
from .sensor import CHCSensor
2020
from .weekday import Weekday
2121

2222

23-
def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".", start_date=None, end_date=None):
23+
def censor_columns(df, cols, inplace=False):
24+
"""Replace values with nans in the specified columns."""
25+
df = df if inplace else df.copy()
26+
df.loc[:, cols] = np.nan
27+
return df
28+
29+
def add_nancodes(df, write_se, inplace=False):
30+
"""Add nancodes to the dataframe."""
31+
df = df if inplace else df.copy()
32+
33+
# Default missingness codes
34+
df["missing_val"] = Nans.NOT_MISSING
35+
df["missing_se"] = Nans.CENSORED if not write_se else Nans.NOT_MISSING
36+
df["missing_sample_size"] = Nans.CENSORED
37+
38+
# Censor those that weren't included
39+
df.loc[~df['incl'], ["val", "se"]] = np.nan # update to this line after nancodes get merged in
40+
df.loc[~df['incl'], ["missing_val", "missing_se"]] = Nans.CENSORED
41+
42+
# Mark any remaining nans with unknown
43+
remaining_nans_mask = df["val"].isnull() & df["missing_val"].eq(Nans.NOT_MISSING)
44+
df.loc[remaining_nans_mask, "missing_val"] = Nans.OTHER
45+
46+
remaining_nans_mask = df["se"].isnull() & df["missing_se"].eq(Nans.NOT_MISSING)
47+
df.loc[remaining_nans_mask, "missing_se"] = Nans.OTHER
48+
49+
return df
50+
51+
def write_to_csv(
52+
df, geo_level, day_shift, out_name,
53+
output_path=".", start_date=None, end_date=None,
54+
logger=None):
2455
"""Write sensor values to csv.
2556
2657
Args:
2758
df: dataframe containing unique timestamp, unique geo_id, val, se, sample_size
2859
geo_level: the geographic level being written e.g. county, state
29-
write_se: boolean to write out standard errors, if true, use an obfuscated name
3060
day_shift: a timedelta specifying the time shift to apply to the dates
3161
out_name: name of the output file
3262
output_path: outfile path to write the csv (default is current directory)
3363
start_date: the first date of the dates to be written
3464
end_date: the last date of the dates to be written
65+
logger: a logger object to log events while writing
3566
"""
67+
logger = logging if logger is None else logger
68+
3669
df = df.copy()
3770

3871
# shift dates forward for labeling
@@ -45,17 +78,11 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".",
4578
# suspicious value warnings
4679
suspicious_se_mask = df["se"].gt(5)
4780
assert df[suspicious_se_mask].empty, " se contains suspiciously large values"
48-
assert not df["se"].isna().any(), " se contains nan values"
49-
if write_se:
50-
logging.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name))
51-
else:
52-
df.loc[:, "se"] = np.nan
5381

54-
assert not df["val"].isna().any(), " val contains nan values"
5582
suspicious_val_mask = df["val"].gt(90)
5683
if not df[suspicious_val_mask].empty:
5784
for geo in df.loc[suspicious_val_mask, "geo_id"]:
58-
logging.warning("value suspiciously high, {0}: {1}".format(
85+
logger.warning("value suspiciously high, {0}: {1}".format(
5986
geo, out_name
6087
))
6188

@@ -66,13 +93,13 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".",
6693
start_date=start_date,
6794
end_date=end_date,
6895
sensor=out_name,
69-
write_empty_days=True
96+
write_empty_days=True,
97+
logger=logger
7098
)
71-
logging.debug("wrote {0} rows for {1} {2}".format(
99+
logger.debug("wrote {0} rows for {1} {2}".format(
72100
df.size, df["geo_id"].unique().size, geo_level
73101
))
74-
logging.debug("wrote files to {0}".format(output_path))
75-
102+
logger.debug("wrote files to {0}".format(output_path))
76103

77104
class CHCSensorUpdator: # pylint: disable=too-many-instance-attributes
78105
"""Contains methods to update sensor and write results to csv."""
@@ -223,23 +250,23 @@ def update_sensor(self,
223250
res = pd.DataFrame(res).loc[final_sensor_idxs]
224251
dfs.append(res)
225252

226-
# Form the output dataframe
227-
df = pd.concat(dfs)
228-
# sample size is never shared
229-
df["sample_size"] = np.nan
230-
# conform to naming expected by create_export_csv()
231-
df = df.reset_index().rename(columns={"date": "timestamp", "rate": "val"})
232-
# df.loc[~df['incl'], ["val", "se"]] = np.nan # update to this line after nancodes get merged in
233-
df = df[df['incl']]
253+
# Form the output dataframe and conform to expected naming
254+
df = pd.concat(dfs).reset_index().rename(columns={"date": "timestamp", "rate": "val"})
255+
256+
# sample size is never shared; standard error might be shared
257+
df = censor_columns(df, ["sample_size"] if self.se else ["sample_size", "se"])
258+
df = add_nancodes(df, self.se)
234259

235260
# write out results
236261
for signal in self.updated_signal_names:
262+
if self.se:
263+
logging.info("========= WARNING: WRITING SEs TO {0} =========".format(signal))
264+
237265
write_to_csv(
238266
df,
239267
geo_level=self.geo,
240268
start_date=min(self.sensor_dates),
241269
end_date=max(self.sensor_dates),
242-
write_se=self.se,
243270
day_shift=Config.DAY_SHIFT,
244271
out_name=signal,
245272
output_path=output_path

changehc/tests/test_update_sensor.py

Lines changed: 85 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
from boto3 import Session
1111
from moto import mock_s3
1212
import pytest
13+
import mock
1314

1415
# first party
1516
from delphi_changehc.config import Config
16-
from delphi_changehc.update_sensor import write_to_csv, CHCSensorUpdator
17+
from delphi_changehc.update_sensor import add_nancodes, censor_columns, write_to_csv, CHCSensorUpdator
18+
from delphi_utils.nancodes import Nans
1719

1820
CONFIG = Config()
1921
PARAMS = {
@@ -92,7 +94,8 @@ def test_geo_reindex(self):
9294
def test_update_sensor(self):
9395
"""Tests that the sensors are properly updated."""
9496
outputs = {}
95-
for geo in ["county", "state", "hhs", "nation"]:
97+
geos = ["county", "state", "hhs", "nation"]
98+
for geo in geos:
9699
td = TemporaryDirectory()
97100
su_inst = CHCSensorUpdator(
98101
"03-01-2020",
@@ -122,11 +125,10 @@ def test_update_sensor(self):
122125
assert len(os.listdir(td.name)) == len(su_inst.sensor_dates),\
123126
f"failed {geo} update sensor test"
124127
td.cleanup()
125-
assert outputs["20200319_county_smoothed_outpatient_covid.csv"].empty
126-
assert outputs["20200319_state_smoothed_outpatient_covid.csv"].empty
127-
assert outputs["20200319_hhs_smoothed_outpatient_covid.csv"].empty
128-
assert outputs["20200319_nation_smoothed_outpatient_covid.csv"].empty
129-
128+
value_columns = ["val", "se", "sample_size"]
129+
for geo in geos:
130+
assert np.isnan(outputs["20200319_" + geo + "_smoothed_outpatient_covid.csv"][value_columns]).all().all()
131+
assert outputs["20200319_" + geo + "_smoothed_outpatient_covid.csv"]["missing_val"].eq(3).all()
130132

131133
class TestWriteToCsv:
132134
"""Tests for writing output files to CSV."""
@@ -137,16 +139,18 @@ def test_write_to_csv_results(self):
137139
"se": [0.1, 1, 1.1] + [0.5, np.nan, 0.5],
138140
"sample_size": [np.nan] * 6,
139141
"timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2),
140-
"include": [True, True, True] + [True, False, True],
142+
"incl": [True, True, True] + [True, False, True],
141143
"geo_id": ["a"] * 3 + ["b"] * 3,
142144
})
143145

144146
td = TemporaryDirectory()
145147

148+
res0 = censor_columns(res0, ["sample_size", "se"])
149+
res0 = add_nancodes(res0, write_se=False)
150+
146151
write_to_csv(
147-
res0[res0['include']],
152+
res0,
148153
geo_level="geography",
149-
write_se=False,
150154
day_shift=CONFIG.DAY_SHIFT,
151155
out_name="name_of_signal",
152156
output_path=td.name
@@ -156,7 +160,10 @@ def test_write_to_csv_results(self):
156160
expected_name = "20200502_geography_name_of_signal.csv"
157161
assert exists(join(td.name, expected_name))
158162
output_data = pd.read_csv(join(td.name, expected_name))
159-
expected_columns = ["geo_id", "val", "se", "sample_size"]
163+
expected_columns = [
164+
"geo_id", "val", "se", "sample_size",
165+
"missing_val", "missing_se", "missing_sample_size"
166+
]
160167
assert (output_data.columns == expected_columns).all()
161168
assert (output_data.geo_id == ["a", "b"]).all()
162169
assert np.array_equal(output_data.val.values, np.array([0.1, 1]))
@@ -169,8 +176,8 @@ def test_write_to_csv_results(self):
169176
assert exists(join(td.name, expected_name))
170177
output_data = pd.read_csv(join(td.name, expected_name))
171178
assert (output_data.columns == expected_columns).all()
172-
assert (output_data.geo_id == ["a"]).all()
173-
assert np.array_equal(output_data.val.values, np.array([0.5]))
179+
assert (output_data.geo_id == ["a", "b"]).all()
180+
assert np.array_equal(output_data.val.values, np.array([0.5, np.nan]), equal_nan=True)
174181
assert np.isnan(output_data.se.values).all()
175182
assert np.isnan(output_data.sample_size.values).all()
176183

@@ -192,88 +199,100 @@ def test_write_to_csv_with_se_results(self):
192199
"se": [0.1, 1, 1.1] + [0.5, np.nan, 0.5],
193200
"sample_size": [np.nan] * 6,
194201
"timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2),
195-
"include": [True, True, True] + [True, False, True],
202+
"incl": [True, True, True] + [True, False, True],
196203
"geo_id": ["a"] * 3 + ["b"] * 3,
197204
})
198205

206+
res0 = add_nancodes(res0, write_se=True)
207+
199208
td = TemporaryDirectory()
200209
write_to_csv(
201-
res0[res0['include']],
210+
res0,
202211
geo_level="geography",
203-
write_se=True,
204212
day_shift=CONFIG.DAY_SHIFT,
205213
out_name="name_of_signal",
206214
output_path=td.name
207215
)
208216

209217
# check outputs
210218
expected_name = "20200502_geography_name_of_signal.csv"
219+
expected_columns = [
220+
"geo_id", "val", "se", "sample_size",
221+
"missing_val", "missing_se", "missing_sample_size"
222+
]
211223
assert exists(join(td.name, expected_name))
212224
output_data = pd.read_csv(join(td.name, expected_name))
213-
expected_columns = ["geo_id", "val", "se", "sample_size"]
214225
assert (output_data.columns == expected_columns).all()
215226
assert (output_data.geo_id == ["a", "b"]).all()
216227
assert np.array_equal(output_data.val.values, np.array([0.1, 1]))
217228
assert np.array_equal(output_data.se.values, np.array([0.1, 0.5]))
218229
assert np.isnan(output_data.sample_size.values).all()
219230
td.cleanup()
220231

221-
def test_write_to_csv_wrong_results(self):
222-
"""Tests that nonsensical inputs trigger exceptions."""
232+
def test_suspicious_value_logging(self):
223233
res0 = pd.DataFrame({
224-
"val": [0.1, 0.5, 1.5] + [1, 2, 3],
225-
"se": [0.1, 1, 1.1] + [0.5, 0.5, 0.5],
234+
"val": [91, 0.5, 1.5] + [1, 2, 3],
235+
"se": [0.1, 1, 1.1] + [0.5, np.nan, 0.5],
226236
"sample_size": [np.nan] * 6,
227237
"timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2),
228-
"include": [True, True, True] + [True, False, True],
238+
"incl": [True, True, True] + [True, False, True],
229239
"geo_id": ["a"] * 3 + ["b"] * 3,
230-
}).set_index(["timestamp", "geo_id"]).sort_index()
240+
})
241+
242+
res0 = add_nancodes(res0, write_se=True)
231243

244+
mock_logger = mock.Mock()
232245
td = TemporaryDirectory()
246+
write_to_csv(
247+
res0,
248+
geo_level="geography",
249+
day_shift=CONFIG.DAY_SHIFT,
250+
out_name="name_of_signal",
251+
output_path=td.name,
252+
logger=mock_logger
253+
)
233254

234-
# nan value for included loc-date
235-
res1 = res0.copy()
236-
res1 = res1[res1['include']]
237-
res1.loc[("2020-05-01", "a"), "val"] = np.nan
238-
res1.reset_index(inplace=True)
239-
with pytest.raises(AssertionError):
240-
write_to_csv(
241-
res1,
242-
geo_level="geography",
243-
write_se=False,
244-
day_shift=CONFIG.DAY_SHIFT,
245-
out_name="name_of_signal",
246-
output_path=td.name
247-
)
255+
mock_logger.warning.assert_called_once_with(
256+
"value suspiciously high, {0}: {1}".format("a", "name_of_signal")
257+
)
248258

249-
# nan se for included loc-date
250-
res2 = res0.copy()
251-
res2 = res2[res2['include']]
252-
res2.loc[("2020-05-01", "a"), "se"] = np.nan
253-
res2.reset_index(inplace=True)
254-
with pytest.raises(AssertionError):
255-
write_to_csv(
256-
res2,
257-
geo_level="geography",
258-
write_se=True,
259-
day_shift=CONFIG.DAY_SHIFT,
260-
out_name="name_of_signal",
261-
output_path=td.name
262-
)
259+
def test_add_nancodes(self):
260+
"""Tests that nancodes are correctly addded."""
261+
res0 = pd.DataFrame({
262+
"val": [np.nan, 0.5, 1.5] + [1, 2, 3],
263+
"se": [np.nan, 1, 1.1] + [np.nan, np.nan, 0.5],
264+
"sample_size": [np.nan] * 6,
265+
"timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2),
266+
"incl": [True, True, True] + [True, False, True],
267+
"geo_id": ["a"] * 3 + ["b"] * 3,
268+
}).set_index(["timestamp", "geo_id"]).sort_index()
263269

264-
# large se value
265-
res3 = res0.copy()
266-
res3 = res3[res3['include']]
267-
res3.loc[("2020-05-01", "a"), "se"] = 10
268-
res3.reset_index(inplace=True)
269-
with pytest.raises(AssertionError):
270-
write_to_csv(
271-
res3,
272-
geo_level="geography",
273-
write_se=True,
274-
day_shift=CONFIG.DAY_SHIFT,
275-
out_name="name_of_signal",
276-
output_path=td.name
277-
)
270+
expected_df = pd.DataFrame({
271+
"val": [np.nan, 0.5, 1.5] + [1, np.nan, 3],
272+
"se": [np.nan, 1, 1.1] + [np.nan, np.nan, 0.5],
273+
"sample_size": [np.nan] * 6,
274+
"timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2),
275+
"incl": [True, True, True] + [True, False, True],
276+
"geo_id": ["a"] * 3 + ["b"] * 3,
277+
"missing_val": [Nans.OTHER] + [Nans.NOT_MISSING] * 3 + [Nans.CENSORED, Nans.NOT_MISSING],
278+
"missing_se": [Nans.OTHER] + [Nans.NOT_MISSING] * 2 + [Nans.OTHER, Nans.CENSORED, Nans.NOT_MISSING],
279+
"missing_sample_size": [Nans.CENSORED] * 6,
280+
}).set_index(["timestamp", "geo_id"]).sort_index()
278281

279-
td.cleanup()
282+
res = censor_columns(res0, ["sample_size"])
283+
pd.testing.assert_frame_equal(expected_df, add_nancodes(res, write_se=True))
284+
285+
expected_df = pd.DataFrame({
286+
"val": [np.nan, 0.5, 1.5] + [1, np.nan, 3],
287+
"se": [np.nan] * 6,
288+
"sample_size": [np.nan] * 6,
289+
"timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2),
290+
"incl": [True, True, True] + [True, False, True],
291+
"geo_id": ["a"] * 3 + ["b"] * 3,
292+
"missing_val": [Nans.OTHER] + [Nans.NOT_MISSING] * 3 + [Nans.CENSORED, Nans.NOT_MISSING],
293+
"missing_se": [Nans.CENSORED] * 6,
294+
"missing_sample_size": [Nans.CENSORED] * 6,
295+
}).set_index(["timestamp", "geo_id"]).sort_index()
296+
297+
res = censor_columns(res0, ["sample_size", "se"])
298+
pd.testing.assert_frame_equal(expected_df, add_nancodes(res, write_se=False))

0 commit comments

Comments
 (0)