Skip to content

Commit cdd12ff

Browse files
authored
Merge pull request #1261 from cmu-delphi/use-structured-logger
Switch indicators from logging to delphi_utils structured logger
2 parents 086cf40 + 5b56357 commit cdd12ff

File tree

13 files changed

+106
-77
lines changed

13 files changed

+106
-77
lines changed

cdc_covidnet/delphi_cdc_covidnet/covidnet.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
"""
77

88
import json
9-
import logging
9+
from logging import Logger
1010
import os
1111
from typing import Tuple, List
1212
from multiprocessing import cpu_count, Pool
@@ -100,7 +100,7 @@ def download_hosp_data(
100100

101101
@staticmethod
102102
def download_all_hosp_data(
103-
mappings_file: str, cache_path: str, parallel: bool = False
103+
mappings_file: str, cache_path: str, logger: Logger, parallel: bool = False
104104
) -> List[str]:
105105
"""
106106
Download hospitalization data for all states listed in the mappings JSON file to disk.
@@ -146,7 +146,7 @@ def download_all_hosp_data(
146146
else:
147147
for args in state_args:
148148
CovidNet.download_hosp_data(*args)
149-
logging.debug("Downloading for nid=%s, cid=%s", args[0], args[1])
149+
logger.debug("Downloading for nid=%s, cid=%s", args[0], args[1])
150150

151151
return state_files
152152

cdc_covidnet/delphi_cdc_covidnet/run.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@
44
This module should contain a function called `run_module`, that is executed
55
when the module is run with `python -m delphi_cdc_covidnet`.
66
"""
7-
import logging
87
from datetime import datetime
98
from os import remove
109
from os.path import join
1110
from typing import Dict, Any
1211

12+
from delphi_utils import get_structured_logger
13+
1314
from .covidnet import CovidNet
1415
from .update_sensor import update_sensor
1516

@@ -32,7 +33,9 @@ def run_module(params: Dict[str, Dict[str, Any]]):
3233
- "wip_signal": list of str or bool, to be passed to delphi_utils.add_prefix.
3334
- "input_cache_dir": str, directory to download source files.
3435
"""
35-
logging.basicConfig(level=logging.DEBUG)
36+
logger = get_structured_logger(
37+
__name__, filename=params["common"].get("log_filename"),
38+
log_exceptions=params["common"].get("log_exceptions", True))
3639

3740
start_date = datetime.strptime(params["indicator"]["start_date"], "%Y-%m-%d")
3841

@@ -42,23 +45,24 @@ def run_module(params: Dict[str, Dict[str, Any]]):
4245
else:
4346
end_date = datetime.strptime(params["indicator"]["end_date"], "%Y-%m-%d")
4447

45-
logging.info("start date:\t%s", start_date.date())
46-
logging.info("end date:\t%s", end_date.date())
48+
logger.info("start date:\t%s", start_date.date())
49+
logger.info("end date:\t%s", end_date.date())
4750

48-
logging.info("outpath:\t%s", params["common"]["export_dir"])
49-
logging.info("parallel:\t%s", params["indicator"]["parallel"])
51+
logger.info("outpath:\t%s", params["common"]["export_dir"])
52+
logger.info("parallel:\t%s", params["indicator"]["parallel"])
5053

5154
# Only geo is state, and no weekday adjustment for now
5255
# COVID-NET data is by weeks anyway, not daily
53-
logging.info("starting state, no adj")
56+
logger.info("starting state, no adj")
5457

5558
# Download latest COVID-NET files into the cache directory first
5659
mappings_file = join(params["indicator"]["input_cache_dir"], "init.json")
5760
CovidNet.download_mappings(outfile=mappings_file)
5861
_, mmwr_info, _ = CovidNet.read_mappings(mappings_file)
5962
state_files = CovidNet.download_all_hosp_data(
6063
mappings_file, params["indicator"]["input_cache_dir"],
61-
parallel=params["indicator"]["parallel"])
64+
parallel=params["indicator"]["parallel"],
65+
logger=logger)
6266

6367
update_sensor(
6468
state_files,
@@ -73,4 +77,4 @@ def run_module(params: Dict[str, Dict[str, Any]]):
7377
for state_file in state_files:
7478
remove(state_file)
7579

76-
logging.info("finished all")
80+
logger.info("finished all")

cdc_covidnet/tests/test_covidnet.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
import logging
23
from os.path import join, exists
34
from tempfile import TemporaryDirectory
45

@@ -7,6 +8,7 @@
78
from delphi_cdc_covidnet.api_config import APIConfig
89
from delphi_cdc_covidnet.covidnet import CovidNet
910

11+
TEST_LOGGER = logging.getLogger()
1012

1113
class TestCovidNet:
1214

@@ -65,14 +67,14 @@ def test_hosp_data(self):
6567

6668
# Non-parallel
6769
state_files = CovidNet.download_all_hosp_data(
68-
init_file, temp_dir, parallel=False)
70+
init_file, temp_dir, TEST_LOGGER, parallel=False)
6971
assert len(state_files) == num_states
7072
for state_file in state_files:
7173
assert exists(state_file)
7274

7375
# Parallel
7476
state_files_par = CovidNet.download_all_hosp_data(
75-
init_file, temp_dir, parallel=True)
77+
init_file, temp_dir, TEST_LOGGER, parallel=True)
7678
assert set(state_files) == set(state_files_par)
7779
assert len(state_files_par) == num_states
7880
for state_file in state_files_par:

changehc/delphi_changehc/run.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,8 @@ def run_module(params: Dict[str, Dict[str, Any]]):
173173
weekday,
174174
numtype,
175175
params["indicator"]["se"],
176-
params["indicator"]["wip_signal"]
176+
params["indicator"]["wip_signal"],
177+
logger
177178
)
178179
if numtype == "covid":
179180
data = load_combined_data(file_dict["denom"],

changehc/delphi_changehc/sensor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def backfill(
8787
return new_num, new_den
8888

8989
@staticmethod
90-
def fit(y_data, first_sensor_date, geo_id, num_col="num", den_col="den"):
90+
def fit(y_data, first_sensor_date, geo_id, logger, num_col="num", den_col="den"):
9191
"""Fitting routine.
9292
9393
Args:
@@ -121,7 +121,7 @@ def fit(y_data, first_sensor_date, geo_id, num_col="num", den_col="den"):
121121
se_valid = valid_rates.eval('sqrt(rate * (1 - rate) / den)')
122122
rate_data['se'] = se_valid
123123

124-
logging.debug("{0}: {1:.3f},[{2:.3f}]".format(
124+
logger.debug("{0}: {1:.3f},[{2:.3f}]".format(
125125
geo_id, rate_data['rate'][-1], rate_data['se'][-1]
126126
))
127127
return {"geo_id": geo_id,

changehc/delphi_changehc/update_sensor.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from .weekday import Weekday
2121

2222

23-
def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".", start_date=None, end_date=None):
23+
def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_path=".", start_date=None, end_date=None):
2424
"""Write sensor values to csv.
2525
2626
Args:
@@ -47,15 +47,15 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".",
4747
assert df[suspicious_se_mask].empty, " se contains suspiciously large values"
4848
assert not df["se"].isna().any(), " se contains nan values"
4949
if write_se:
50-
logging.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name))
50+
logger.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name))
5151
else:
5252
df.loc[:, "se"] = np.nan
5353

5454
assert not df["val"].isna().any(), " val contains nan values"
5555
suspicious_val_mask = df["val"].gt(90)
5656
if not df[suspicious_val_mask].empty:
5757
for geo in df.loc[suspicious_val_mask, "geo_id"]:
58-
logging.warning("value suspiciously high, {0}: {1}".format(
58+
logger.warning("value suspiciously high, {0}: {1}".format(
5959
geo, out_name
6060
))
6161

@@ -68,10 +68,10 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".",
6868
sensor=out_name,
6969
write_empty_days=True
7070
)
71-
logging.debug("wrote {0} rows for {1} {2}".format(
71+
logger.debug("wrote {0} rows for {1} {2}".format(
7272
df.size, df["geo_id"].unique().size, geo_level
7373
))
74-
logging.debug("wrote files to {0}".format(output_path))
74+
logger.debug("wrote files to {0}".format(output_path))
7575
return dates
7676

7777

@@ -87,7 +87,8 @@ def __init__(self,
8787
weekday,
8888
numtype,
8989
se,
90-
wip_signal):
90+
wip_signal,
91+
logger):
9192
"""Init Sensor Updator.
9293
9394
Args:
@@ -100,7 +101,9 @@ def __init__(self,
100101
numtype: type of count data used, one of ["covid", "cli"]
101102
se: boolean to write out standard errors, if true, use an obfuscated name
102103
wip_signal: Prefix for WIP signals
104+
logger: the structured logger
103105
"""
106+
self.logger = logger
104107
self.startdate, self.enddate, self.dropdate = [
105108
pd.to_datetime(t) for t in (startdate, enddate, dropdate)]
106109
# handle dates
@@ -149,7 +152,7 @@ def geo_reindex(self, data):
149152
geo = self.geo
150153
gmpr = GeoMapper()
151154
if geo not in {"county", "state", "msa", "hrr", "nation", "hhs"}:
152-
logging.error("{0} is invalid, pick one of 'county', "
155+
self.logger.error("{0} is invalid, pick one of 'county', "
153156
"'state', 'msa', 'hrr', 'hss','nation'".format(geo))
154157
return False
155158
if geo == "county":
@@ -201,12 +204,12 @@ def update_sensor(self,
201204
sub_data.reset_index(level=0,inplace=True)
202205
if self.weekday:
203206
sub_data = Weekday.calc_adjustment(wd_params, sub_data)
204-
res = CHCSensor.fit(sub_data, self.burnindate, geo_id)
207+
res = CHCSensor.fit(sub_data, self.burnindate, geo_id, self.logger)
205208
res = pd.DataFrame(res).loc[final_sensor_idxs]
206209
dfs.append(res)
207210
else:
208211
n_cpu = min(10, cpu_count())
209-
logging.debug("starting pool with {0} workers".format(n_cpu))
212+
self.logger.debug("starting pool with {0} workers".format(n_cpu))
210213
with Pool(n_cpu) as pool:
211214
pool_results = []
212215
for geo_id, sub_data in data_frame.groupby(level=0,as_index=False):
@@ -215,7 +218,7 @@ def update_sensor(self,
215218
sub_data = Weekday.calc_adjustment(wd_params, sub_data)
216219
pool_results.append(
217220
pool.apply_async(
218-
CHCSensor.fit, args=(sub_data, self.burnindate, geo_id,),
221+
CHCSensor.fit, args=(sub_data, self.burnindate, geo_id, self.logger),
219222
)
220223
)
221224
pool_results = [proc.get() for proc in pool_results]
@@ -244,7 +247,8 @@ def update_sensor(self,
244247
write_se=self.se,
245248
day_shift=Config.DAY_SHIFT,
246249
out_name=signal,
247-
output_path=output_path
250+
output_path=output_path,
251+
logger=self.logger
248252
)
249253
if len(dates) > 0:
250254
stats.append((max(dates), len(dates)))

changehc/tests/test_sensor.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# standard
2+
import logging
23

34
import numpy as np
45
import numpy.random as nr
@@ -19,6 +20,7 @@
1920
COVID_FILEPATH = PARAMS["indicator"]["input_covid_file"]
2021
DENOM_FILEPATH = PARAMS["indicator"]["input_denom_file"]
2122
DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
23+
TEST_LOGGER = logging.getLogger()
2224

2325
class TestLoadData:
2426
combined_data = load_combined_data(DENOM_FILEPATH, COVID_FILEPATH, DROP_DATE,
@@ -56,7 +58,7 @@ def test_fit_fips(self):
5658
for fips in all_fips:
5759
sub_data = self.combined_data.loc[fips]
5860
sub_data = sub_data.reindex(date_range, fill_value=0)
59-
res0 = CHCSensor.fit(sub_data, date_range[0], fips)
61+
res0 = CHCSensor.fit(sub_data, date_range[0], fips, TEST_LOGGER)
6062

6163
if np.isnan(res0["rate"]).all():
6264
assert res0["incl"].sum() == 0

changehc/tests/test_update_sensor.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# standard
2+
import logging
23
from copy import deepcopy
34
import os
45
from os.path import join, exists
@@ -27,6 +28,7 @@
2728
DENOM_FILEPATH = PARAMS["indicator"]["input_denom_file"]
2829
DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
2930
OUTPATH="test_data/"
31+
TEST_LOGGER = logging.getLogger()
3032

3133
class TestCHCSensorUpdator:
3234
"""Tests for updating the sensors."""
@@ -53,7 +55,8 @@ def test_shift_dates(self):
5355
self.weekday,
5456
self.numtype,
5557
self.se,
56-
""
58+
"",
59+
TEST_LOGGER
5760
)
5861
## Test init
5962
assert su_inst.startdate.month == 2
@@ -77,7 +80,8 @@ def test_geo_reindex(self):
7780
self.weekday,
7881
self.numtype,
7982
self.se,
80-
""
83+
"",
84+
TEST_LOGGER
8185
)
8286
su_inst.shift_dates()
8387
test_data = pd.DataFrame({
@@ -103,7 +107,8 @@ def test_update_sensor(self):
103107
self.weekday,
104108
self.numtype,
105109
self.se,
106-
""
110+
"",
111+
TEST_LOGGER
107112
)
108113
# As of 3/3/21 (40c258a), this set of data has county outputting data, state and hhs not
109114
# outputting data, and nation outputting data, which is undesirable. Ideal behaviour
@@ -149,7 +154,8 @@ def test_write_to_csv_results(self):
149154
write_se=False,
150155
day_shift=CONFIG.DAY_SHIFT,
151156
out_name="name_of_signal",
152-
output_path=td.name
157+
output_path=td.name,
158+
logger=TEST_LOGGER
153159
)
154160

155161
# check outputs
@@ -203,7 +209,8 @@ def test_write_to_csv_with_se_results(self):
203209
write_se=True,
204210
day_shift=CONFIG.DAY_SHIFT,
205211
out_name="name_of_signal",
206-
output_path=td.name
212+
output_path=td.name,
213+
logger=TEST_LOGGER
207214
)
208215

209216
# check outputs
@@ -243,7 +250,8 @@ def test_write_to_csv_wrong_results(self):
243250
write_se=False,
244251
day_shift=CONFIG.DAY_SHIFT,
245252
out_name="name_of_signal",
246-
output_path=td.name
253+
output_path=td.name,
254+
logger=TEST_LOGGER
247255
)
248256

249257
# nan se for included loc-date
@@ -258,7 +266,8 @@ def test_write_to_csv_wrong_results(self):
258266
write_se=True,
259267
day_shift=CONFIG.DAY_SHIFT,
260268
out_name="name_of_signal",
261-
output_path=td.name
269+
output_path=td.name,
270+
logger=TEST_LOGGER
262271
)
263272

264273
# large se value
@@ -273,7 +282,8 @@ def test_write_to_csv_wrong_results(self):
273282
write_se=True,
274283
day_shift=CONFIG.DAY_SHIFT,
275284
out_name="name_of_signal",
276-
output_path=td.name
285+
output_path=td.name,
286+
logger=TEST_LOGGER
277287
)
278288

279289
td.cleanup()

0 commit comments

Comments
 (0)