Switch indicators from logging to delphi_utils structured logger #1261

Merged · 3 commits · Sep 27, 2021
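The change follows one pattern across every indicator touched here: instead of configuring the root logger with logging.basicConfig and calling module-level logging.* functions, each run_module now builds a single logger via delphi_utils.get_structured_logger and threads it down to helpers as an explicit argument. A minimal sketch of that pattern, reusing the params keys from cdc_covidnet's run.py below (the do_work helper is illustrative, not part of the PR):

    # Sketch only: the params keys match run.py below; do_work() stands in
    # for the helpers that now accept a logger argument.
    from delphi_utils import get_structured_logger

    def run_module(params):
        logger = get_structured_logger(
            __name__, filename=params["common"].get("log_filename"),
            log_exceptions=params["common"].get("log_exceptions", True))
        logger.info("start date:\t%s", params["indicator"]["start_date"])
        do_work(logger)

    def do_work(logger):
        # helpers log through the passed-in logger, never the logging root
        logger.debug("doing work")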
6 changes: 3 additions & 3 deletions cdc_covidnet/delphi_cdc_covidnet/covidnet.py
@@ -6,7 +6,7 @@
 """
 
 import json
-import logging
+from logging import Logger
 import os
 from typing import Tuple, List
 from multiprocessing import cpu_count, Pool
@@ -100,7 +100,7 @@ def download_hosp_data(
 
     @staticmethod
     def download_all_hosp_data(
-        mappings_file: str, cache_path: str, parallel: bool = False
+        mappings_file: str, cache_path: str, logger: Logger, parallel: bool = False
     ) -> List[str]:
         """
         Download hospitalization data for all states listed in the mappings JSON file to disk.
@@ -146,7 +146,7 @@ def download_all_hosp_data(
         else:
             for args in state_args:
                 CovidNet.download_hosp_data(*args)
-                logging.debug("Downloading for nid=%s, cid=%s", args[0], args[1])
+                logger.debug("Downloading for nid=%s, cid=%s", args[0], args[1])
 
         return state_files
22 changes: 13 additions & 9 deletions cdc_covidnet/delphi_cdc_covidnet/run.py
@@ -4,12 +4,13 @@
 This module should contain a function called `run_module`, that is executed
 when the module is run with `python -m delphi_cdc_covidnet`.
 """
-import logging
 from datetime import datetime
 from os import remove
 from os.path import join
 from typing import Dict, Any
 
+from delphi_utils import get_structured_logger
+
 from .covidnet import CovidNet
 from .update_sensor import update_sensor
@@ -32,7 +33,9 @@ def run_module(params: Dict[str, Dict[str, Any]]):
         - "wip_signal": list of str or bool, to be passed to delphi_utils.add_prefix.
         - "input_cache_dir": str, directory to download source files.
     """
-    logging.basicConfig(level=logging.DEBUG)
+    logger = get_structured_logger(
+        __name__, filename=params["common"].get("log_filename"),
+        log_exceptions=params["common"].get("log_exceptions", True))
 
     start_date = datetime.strptime(params["indicator"]["start_date"], "%Y-%m-%d")
@@ -42,23 +45,24 @@ def run_module(params: Dict[str, Dict[str, Any]]):
     else:
         end_date = datetime.strptime(params["indicator"]["end_date"], "%Y-%m-%d")
 
-    logging.info("start date:\t%s", start_date.date())
-    logging.info("end date:\t%s", end_date.date())
+    logger.info("start date:\t%s", start_date.date())
+    logger.info("end date:\t%s", end_date.date())
 
-    logging.info("outpath:\t%s", params["common"]["export_dir"])
-    logging.info("parallel:\t%s", params["indicator"]["parallel"])
+    logger.info("outpath:\t%s", params["common"]["export_dir"])
+    logger.info("parallel:\t%s", params["indicator"]["parallel"])
 
     # Only geo is state, and no weekday adjustment for now
    # COVID-NET data is by weeks anyway, not daily
-    logging.info("starting state, no adj")
+    logger.info("starting state, no adj")
 
     # Download latest COVID-NET files into the cache directory first
     mappings_file = join(params["indicator"]["input_cache_dir"], "init.json")
     CovidNet.download_mappings(outfile=mappings_file)
     _, mmwr_info, _ = CovidNet.read_mappings(mappings_file)
     state_files = CovidNet.download_all_hosp_data(
         mappings_file, params["indicator"]["input_cache_dir"],
-        parallel=params["indicator"]["parallel"])
+        parallel=params["indicator"]["parallel"],
+        logger=logger)
 
     update_sensor(
         state_files,
@@ -73,4 +77,4 @@ def run_module(params: Dict[str, Dict[str, Any]]):
     for state_file in state_files:
         remove(state_file)
 
-    logging.info("finished all")
+    logger.info("finished all")
6 changes: 4 additions & 2 deletions cdc_covidnet/tests/test_covidnet.py
@@ -1,4 +1,5 @@
 import json
+import logging
 from os.path import join, exists
 from tempfile import TemporaryDirectory
 
@@ -7,6 +8,7 @@
 from delphi_cdc_covidnet.api_config import APIConfig
 from delphi_cdc_covidnet.covidnet import CovidNet
 
+TEST_LOGGER = logging.getLogger()
 
 class TestCovidNet:
 
@@ -65,14 +67,14 @@ def test_hosp_data(self):
 
         # Non-parallel
         state_files = CovidNet.download_all_hosp_data(
-            init_file, temp_dir, parallel=False)
+            init_file, temp_dir, TEST_LOGGER, parallel=False)
         assert len(state_files) == num_states
         for state_file in state_files:
             assert exists(state_file)
 
         # Parallel
         state_files_par = CovidNet.download_all_hosp_data(
-            init_file, temp_dir, parallel=True)
+            init_file, temp_dir, TEST_LOGGER, parallel=True)
         assert set(state_files) == set(state_files_par)
         assert len(state_files_par) == num_states
         for state_file in state_files_par:
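Worth noting from the test change above: the new parameter is typed as logging.Logger, so the tests can pass a plain stdlib logger rather than building a structured one. A small sketch of that idea (download() is an illustrative stand-in, not the real API):

    # Sketch only: any logging.Logger satisfies the new parameter, so the
    # tests above simply reuse the stdlib root logger.
    import logging

    TEST_LOGGER = logging.getLogger()  # root logger, as in test_covidnet.py

    def download(nid, cid, logger):
        logger.debug("Downloading for nid=%s, cid=%s", nid, cid)

    download("nid-1", "cid-1", TEST_LOGGER)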
3 changes: 2 additions & 1 deletion changehc/delphi_changehc/run.py
@@ -173,7 +173,8 @@ def run_module(params: Dict[str, Dict[str, Any]]):
             weekday,
             numtype,
             params["indicator"]["se"],
-            params["indicator"]["wip_signal"]
+            params["indicator"]["wip_signal"],
+            logger
         )
         if numtype == "covid":
             data = load_combined_data(file_dict["denom"],
4 changes: 2 additions & 2 deletions changehc/delphi_changehc/sensor.py
@@ -87,7 +87,7 @@ def backfill(
         return new_num, new_den
 
     @staticmethod
-    def fit(y_data, first_sensor_date, geo_id, num_col="num", den_col="den"):
+    def fit(y_data, first_sensor_date, geo_id, logger, num_col="num", den_col="den"):
         """Fitting routine.
 
         Args:
@@ -121,7 +121,7 @@ def fit(y_data, first_sensor_date, geo_id, num_col="num", den_col="den"):
         se_valid = valid_rates.eval('sqrt(rate * (1 - rate) / den)')
         rate_data['se'] = se_valid
 
-        logging.debug("{0}: {1:.3f},[{2:.3f}]".format(
+        logger.debug("{0}: {1:.3f},[{2:.3f}]".format(
             geo_id, rate_data['rate'][-1], rate_data['se'][-1]
         ))
         return {"geo_id": geo_id,
26 changes: 15 additions & 11 deletions changehc/delphi_changehc/update_sensor.py
@@ -20,7 +20,7 @@
 from .weekday import Weekday
 
 
-def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".", start_date=None, end_date=None):
+def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_path=".", start_date=None, end_date=None):
     """Write sensor values to csv.
 
     Args:
@@ -47,15 +47,15 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".", start_date=None, end_date=None):
     assert df[suspicious_se_mask].empty, " se contains suspiciously large values"
     assert not df["se"].isna().any(), " se contains nan values"
     if write_se:
-        logging.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name))
+        logger.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name))
     else:
         df.loc[:, "se"] = np.nan
 
     assert not df["val"].isna().any(), " val contains nan values"
     suspicious_val_mask = df["val"].gt(90)
     if not df[suspicious_val_mask].empty:
         for geo in df.loc[suspicious_val_mask, "geo_id"]:
-            logging.warning("value suspiciously high, {0}: {1}".format(
+            logger.warning("value suspiciously high, {0}: {1}".format(
                 geo, out_name
             ))
 
@@ -68,10 +68,10 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".", start_date=None, end_date=None):
         sensor=out_name,
         write_empty_days=True
     )
-    logging.debug("wrote {0} rows for {1} {2}".format(
+    logger.debug("wrote {0} rows for {1} {2}".format(
         df.size, df["geo_id"].unique().size, geo_level
     ))
-    logging.debug("wrote files to {0}".format(output_path))
+    logger.debug("wrote files to {0}".format(output_path))
     return dates
 
 
@@ -87,7 +87,8 @@ def __init__(self,
                 weekday,
                 numtype,
                 se,
-                wip_signal):
+                wip_signal,
+                logger):
         """Init Sensor Updator.
 
         Args:
@@ -100,7 +101,9 @@ def __init__(self,
             numtype: type of count data used, one of ["covid", "cli"]
             se: boolean to write out standard errors, if true, use an obfuscated name
             wip_signal: Prefix for WIP signals
+            logger: the structured logger
         """
+        self.logger = logger
         self.startdate, self.enddate, self.dropdate = [
             pd.to_datetime(t) for t in (startdate, enddate, dropdate)]
         # handle dates
@@ -149,7 +152,7 @@ def geo_reindex(self, data):
         geo = self.geo
         gmpr = GeoMapper()
         if geo not in {"county", "state", "msa", "hrr", "nation", "hhs"}:
-            logging.error("{0} is invalid, pick one of 'county', "
+            self.logger.error("{0} is invalid, pick one of 'county', "
                           "'state', 'msa', 'hrr', 'hss','nation'".format(geo))
             return False
         if geo == "county":
@@ -201,12 +204,12 @@ def update_sensor(self,
                 sub_data.reset_index(level=0,inplace=True)
                 if self.weekday:
                     sub_data = Weekday.calc_adjustment(wd_params, sub_data)
-                res = CHCSensor.fit(sub_data, self.burnindate, geo_id)
+                res = CHCSensor.fit(sub_data, self.burnindate, geo_id, self.logger)
                 res = pd.DataFrame(res).loc[final_sensor_idxs]
                 dfs.append(res)
         else:
             n_cpu = min(10, cpu_count())
-            logging.debug("starting pool with {0} workers".format(n_cpu))
+            self.logger.debug("starting pool with {0} workers".format(n_cpu))
             with Pool(n_cpu) as pool:
                 pool_results = []
                 for geo_id, sub_data in data_frame.groupby(level=0,as_index=False):
@@ -215,7 +218,7 @@ def update_sensor(self,
                         sub_data = Weekday.calc_adjustment(wd_params, sub_data)
                     pool_results.append(
                         pool.apply_async(
-                            CHCSensor.fit, args=(sub_data, self.burnindate, geo_id,),
+                            CHCSensor.fit, args=(sub_data, self.burnindate, geo_id, self.logger),
                         )
                     )
                 pool_results = [proc.get() for proc in pool_results]
@@ -244,7 +247,8 @@ def update_sensor(self,
                 write_se=self.se,
                 day_shift=Config.DAY_SHIFT,
                 out_name=signal,
-                output_path=output_path
+                output_path=output_path,
+                logger=self.logger
             )
             if len(dates) > 0:
                 stats.append((max(dates), len(dates)))
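One subtlety in the update_sensor.py change above: in the parallel branch, the logger is forwarded into worker processes through pool.apply_async, which requires the logger object to be picklable (stdlib logging.Logger instances pickle by name since Python 3.7; the delphi_utils structured logger is assumed to behave likewise here). A standalone sketch of the pattern, with illustrative names:

    # Sketch only: forwarding a logger through multiprocessing, mirroring
    # the parallel branch of update_sensor() above. fit() is illustrative.
    import logging
    from multiprocessing import Pool, cpu_count

    def fit(geo_id, logger):
        logger.debug("fitting %s", geo_id)
        return geo_id

    if __name__ == "__main__":
        logger = logging.getLogger(__name__)
        n_cpu = min(10, cpu_count())
        with Pool(n_cpu) as pool:
            results = [pool.apply_async(fit, args=(g, logger)) for g in ("ca", "ny")]
            print([r.get() for r in results])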
4 changes: 3 additions & 1 deletion changehc/tests/test_sensor.py
@@ -1,4 +1,5 @@
 # standard
+import logging
 
 import numpy as np
 import numpy.random as nr
@@ -19,6 +20,7 @@
 COVID_FILEPATH = PARAMS["indicator"]["input_covid_file"]
 DENOM_FILEPATH = PARAMS["indicator"]["input_denom_file"]
 DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
+TEST_LOGGER = logging.getLogger()
 
 class TestLoadData:
     combined_data = load_combined_data(DENOM_FILEPATH, COVID_FILEPATH, DROP_DATE,
@@ -56,7 +58,7 @@ def test_fit_fips(self):
         for fips in all_fips:
             sub_data = self.combined_data.loc[fips]
             sub_data = sub_data.reindex(date_range, fill_value=0)
-            res0 = CHCSensor.fit(sub_data, date_range[0], fips)
+            res0 = CHCSensor.fit(sub_data, date_range[0], fips, TEST_LOGGER)
 
             if np.isnan(res0["rate"]).all():
                 assert res0["incl"].sum() == 0
26 changes: 18 additions & 8 deletions changehc/tests/test_update_sensor.py
@@ -1,4 +1,5 @@
 # standard
+import logging
 from copy import deepcopy
 import os
 from os.path import join, exists
@@ -27,6 +28,7 @@
 DENOM_FILEPATH = PARAMS["indicator"]["input_denom_file"]
 DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
 OUTPATH="test_data/"
+TEST_LOGGER = logging.getLogger()
 
 class TestCHCSensorUpdator:
     """Tests for updating the sensors."""
@@ -53,7 +55,8 @@ def test_shift_dates(self):
             self.weekday,
             self.numtype,
             self.se,
-            ""
+            "",
+            TEST_LOGGER
         )
         ## Test init
         assert su_inst.startdate.month == 2
@@ -77,7 +80,8 @@ def test_geo_reindex(self):
             self.weekday,
             self.numtype,
             self.se,
-            ""
+            "",
+            TEST_LOGGER
         )
         su_inst.shift_dates()
         test_data = pd.DataFrame({
@@ -103,7 +107,8 @@ def test_update_sensor(self):
             self.weekday,
             self.numtype,
             self.se,
-            ""
+            "",
+            TEST_LOGGER
         )
         # As of 3/3/21 (40c258a), this set of data has county outputting data, state and hhs not
         # outputting data, and nation outputting data, which is undesirable. Ideal behaviour
@@ -149,7 +154,8 @@ def test_write_to_csv_results(self):
             write_se=False,
             day_shift=CONFIG.DAY_SHIFT,
             out_name="name_of_signal",
-            output_path=td.name
+            output_path=td.name,
+            logger=TEST_LOGGER
         )
 
         # check outputs
@@ -203,7 +209,8 @@ def test_write_to_csv_with_se_results(self):
             write_se=True,
             day_shift=CONFIG.DAY_SHIFT,
             out_name="name_of_signal",
-            output_path=td.name
+            output_path=td.name,
+            logger=TEST_LOGGER
         )
 
         # check outputs
@@ -243,7 +250,8 @@ def test_write_to_csv_wrong_results(self):
             write_se=False,
             day_shift=CONFIG.DAY_SHIFT,
             out_name="name_of_signal",
-            output_path=td.name
+            output_path=td.name,
+            logger=TEST_LOGGER
         )
 
         # nan se for included loc-date
@@ -258,7 +266,8 @@ def test_write_to_csv_wrong_results(self):
             write_se=True,
             day_shift=CONFIG.DAY_SHIFT,
             out_name="name_of_signal",
-            output_path=td.name
+            output_path=td.name,
+            logger=TEST_LOGGER
         )
 
         # large se value
@@ -273,7 +282,8 @@ def test_write_to_csv_wrong_results(self):
             write_se=True,
             day_shift=CONFIG.DAY_SHIFT,
             out_name="name_of_signal",
-            output_path=td.name
+            output_path=td.name,
+            logger=TEST_LOGGER
         )
 
         td.cleanup()