Skip to content

Commit 94a6e4c

Browse files
authored
Merge branch 'main' into clean-up
2 parents df58333 + 0e76a43 commit 94a6e4c

File tree

18 files changed

+122
-172
lines changed

18 files changed

+122
-172
lines changed

_delphi_utils_python/delphi_utils/export.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from os.path import join
55
from typing import Optional
66

7+
from epiweeks import Week
78
import numpy as np
89
import pandas as pd
910

@@ -16,7 +17,8 @@ def create_export_csv(
1617
start_date: Optional[datetime] = None,
1718
end_date: Optional[datetime] = None,
1819
remove_null_samples: Optional[bool] = False,
19-
write_empty_days: Optional[bool] = False
20+
write_empty_days: Optional[bool] = False,
21+
weekly_dates = False,
2022
):
2123
"""Export data in the format expected by the Delphi API.
2224
@@ -65,10 +67,15 @@ def create_export_csv(
6567
dates = pd.date_range(start_date, end_date)
6668

6769
for date in dates:
70+
if weekly_dates:
71+
t = Week.fromdate(pd.to_datetime(str(date)))
72+
date_str = "weekly_" + str(t.year) + str(t.week).zfill(2)
73+
else:
74+
date_str = date.strftime('%Y%m%d')
6875
if metric is None:
69-
export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{sensor}.csv"
76+
export_filename = f"{date_str}_{geo_res}_{sensor}.csv"
7077
else:
71-
export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{metric}_{sensor}.csv"
78+
export_filename = f"{date_str}_{geo_res}_{metric}_{sensor}.csv"
7279
export_file = join(export_dir, export_filename)
7380
export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]]
7481
if remove_null_samples:

_delphi_utils_python/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
required = [
88
"boto3",
99
"covidcast",
10+
"epiweeks",
1011
"freezegun",
1112
"gitpython",
1213
"mock",

cdc_covidnet/delphi_cdc_covidnet/covidnet.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
"""
77

88
import json
9-
import logging
9+
from logging import Logger
1010
import os
1111
from typing import Tuple, List
1212
from multiprocessing import cpu_count, Pool
@@ -100,7 +100,7 @@ def download_hosp_data(
100100

101101
@staticmethod
102102
def download_all_hosp_data(
103-
mappings_file: str, cache_path: str, parallel: bool = False
103+
mappings_file: str, cache_path: str, logger: Logger, parallel: bool = False
104104
) -> List[str]:
105105
"""
106106
Download hospitalization data for all states listed in the mappings JSON file to disk.
@@ -146,7 +146,7 @@ def download_all_hosp_data(
146146
else:
147147
for args in state_args:
148148
CovidNet.download_hosp_data(*args)
149-
logging.debug("Downloading for nid=%s, cid=%s", args[0], args[1])
149+
logger.debug("Downloading for nid=%s, cid=%s", args[0], args[1])
150150

151151
return state_files
152152

cdc_covidnet/delphi_cdc_covidnet/run.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@
44
This module should contain a function called `run_module`, that is executed
55
when the module is run with `python -m delphi_cdc_covidnet`.
66
"""
7-
import logging
87
from datetime import datetime
98
from os import remove
109
from os.path import join
1110
from typing import Dict, Any
1211

12+
from delphi_utils import get_structured_logger
13+
1314
from .covidnet import CovidNet
1415
from .update_sensor import update_sensor
1516

@@ -32,7 +33,9 @@ def run_module(params: Dict[str, Dict[str, Any]]):
3233
- "wip_signal": list of str or bool, to be passed to delphi_utils.add_prefix.
3334
- "input_cache_dir": str, directory to download source files.
3435
"""
35-
logging.basicConfig(level=logging.DEBUG)
36+
logger = get_structured_logger(
37+
__name__, filename=params["common"].get("log_filename"),
38+
log_exceptions=params["common"].get("log_exceptions", True))
3639

3740
start_date = datetime.strptime(params["indicator"]["start_date"], "%Y-%m-%d")
3841

@@ -42,23 +45,24 @@ def run_module(params: Dict[str, Dict[str, Any]]):
4245
else:
4346
end_date = datetime.strptime(params["indicator"]["end_date"], "%Y-%m-%d")
4447

45-
logging.info("start date:\t%s", start_date.date())
46-
logging.info("end date:\t%s", end_date.date())
48+
logger.info("start date:\t%s", start_date.date())
49+
logger.info("end date:\t%s", end_date.date())
4750

48-
logging.info("outpath:\t%s", params["common"]["export_dir"])
49-
logging.info("parallel:\t%s", params["indicator"]["parallel"])
51+
logger.info("outpath:\t%s", params["common"]["export_dir"])
52+
logger.info("parallel:\t%s", params["indicator"]["parallel"])
5053

5154
# Only geo is state, and no weekday adjustment for now
5255
# COVID-NET data is by weeks anyway, not daily
53-
logging.info("starting state, no adj")
56+
logger.info("starting state, no adj")
5457

5558
# Download latest COVID-NET files into the cache directory first
5659
mappings_file = join(params["indicator"]["input_cache_dir"], "init.json")
5760
CovidNet.download_mappings(outfile=mappings_file)
5861
_, mmwr_info, _ = CovidNet.read_mappings(mappings_file)
5962
state_files = CovidNet.download_all_hosp_data(
6063
mappings_file, params["indicator"]["input_cache_dir"],
61-
parallel=params["indicator"]["parallel"])
64+
parallel=params["indicator"]["parallel"],
65+
logger=logger)
6266

6367
update_sensor(
6468
state_files,
@@ -73,4 +77,4 @@ def run_module(params: Dict[str, Dict[str, Any]]):
7377
for state_file in state_files:
7478
remove(state_file)
7579

76-
logging.info("finished all")
80+
logger.info("finished all")

cdc_covidnet/tests/test_covidnet.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
import logging
23
from os.path import join, exists
34
from tempfile import TemporaryDirectory
45

@@ -7,6 +8,7 @@
78
from delphi_cdc_covidnet.api_config import APIConfig
89
from delphi_cdc_covidnet.covidnet import CovidNet
910

11+
TEST_LOGGER = logging.getLogger()
1012

1113
class TestCovidNet:
1214

@@ -65,14 +67,14 @@ def test_hosp_data(self):
6567

6668
# Non-parallel
6769
state_files = CovidNet.download_all_hosp_data(
68-
init_file, temp_dir, parallel=False)
70+
init_file, temp_dir, TEST_LOGGER, parallel=False)
6971
assert len(state_files) == num_states
7072
for state_file in state_files:
7173
assert exists(state_file)
7274

7375
# Parallel
7476
state_files_par = CovidNet.download_all_hosp_data(
75-
init_file, temp_dir, parallel=True)
77+
init_file, temp_dir, TEST_LOGGER, parallel=True)
7678
assert set(state_files) == set(state_files_par)
7779
assert len(state_files_par) == num_states
7880
for state_file in state_files_par:

changehc/delphi_changehc/run.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,8 @@ def run_module(params: Dict[str, Dict[str, Any]]):
173173
weekday,
174174
numtype,
175175
params["indicator"]["se"],
176-
params["indicator"]["wip_signal"]
176+
params["indicator"]["wip_signal"],
177+
logger
177178
)
178179
if numtype == "covid":
179180
data = load_combined_data(file_dict["denom"],

changehc/delphi_changehc/sensor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def backfill(
8787
return new_num, new_den
8888

8989
@staticmethod
90-
def fit(y_data, first_sensor_date, geo_id, num_col="num", den_col="den"):
90+
def fit(y_data, first_sensor_date, geo_id, logger, num_col="num", den_col="den"):
9191
"""Fitting routine.
9292
9393
Args:
@@ -121,7 +121,7 @@ def fit(y_data, first_sensor_date, geo_id, num_col="num", den_col="den"):
121121
se_valid = valid_rates.eval('sqrt(rate * (1 - rate) / den)')
122122
rate_data['se'] = se_valid
123123

124-
logging.debug("{0}: {1:.3f},[{2:.3f}]".format(
124+
logger.debug("{0}: {1:.3f},[{2:.3f}]".format(
125125
geo_id, rate_data['rate'][-1], rate_data['se'][-1]
126126
))
127127
return {"geo_id": geo_id,

changehc/delphi_changehc/update_sensor.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from .weekday import Weekday
2121

2222

23-
def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".", start_date=None, end_date=None):
23+
def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_path=".", start_date=None, end_date=None):
2424
"""Write sensor values to csv.
2525
2626
Args:
@@ -43,15 +43,15 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".",
4343
assert df[suspicious_se_mask].empty, " se contains suspiciously large values"
4444
assert not df["se"].isna().any(), " se contains nan values"
4545
if write_se:
46-
logging.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name))
46+
logger.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name))
4747
else:
4848
df["se"] = np.nan
4949

5050
assert not df["val"].isna().any(), " val contains nan values"
5151
suspicious_val_mask = df["val"].gt(90)
5252
if not df[suspicious_val_mask].empty:
5353
for geo in df.loc[suspicious_val_mask, "geo_id"]:
54-
logging.warning("value suspiciously high, {0}: {1}".format(
54+
logger.warning("value suspiciously high, {0}: {1}".format(
5555
geo, out_name
5656
))
5757

@@ -64,10 +64,10 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".",
6464
sensor=out_name,
6565
write_empty_days=True
6666
)
67-
logging.debug("wrote {0} rows for {1} {2}".format(
67+
logger.debug("wrote {0} rows for {1} {2}".format(
6868
df.size, df["geo_id"].unique().size, geo_level
6969
))
70-
logging.debug("wrote files to {0}".format(output_path))
70+
logger.debug("wrote files to {0}".format(output_path))
7171
return dates
7272

7373

@@ -83,7 +83,8 @@ def __init__(self,
8383
weekday,
8484
numtype,
8585
se,
86-
wip_signal):
86+
wip_signal,
87+
logger):
8788
"""Init Sensor Updater.
8889
8990
Args:
@@ -96,7 +97,9 @@ def __init__(self,
9697
numtype: type of count data used, one of ["covid", "cli"]
9798
se: boolean to write out standard errors, if true, use an obfuscated name
9899
wip_signal: Prefix for WIP signals
100+
logger: the structured logger
99101
"""
102+
self.logger = logger
100103
self.startdate, self.enddate, self.dropdate = [
101104
pd.to_datetime(t) for t in (startdate, enddate, dropdate)]
102105
# handle dates
@@ -145,7 +148,7 @@ def geo_reindex(self, data):
145148
geo = self.geo
146149
gmpr = GeoMapper()
147150
if geo not in {"county", "state", "msa", "hrr", "nation", "hhs"}:
148-
logging.error("{0} is invalid, pick one of 'county', "
151+
self.logger.error("{0} is invalid, pick one of 'county', "
149152
"'state', 'msa', 'hrr', 'hss','nation'".format(geo))
150153
return False
151154
if geo == "county":
@@ -197,12 +200,12 @@ def update_sensor(self,
197200
sub_data.reset_index(level=0,inplace=True)
198201
if self.weekday:
199202
sub_data = Weekday.calc_adjustment(wd_params, sub_data)
200-
res = CHCSensor.fit(sub_data, self.burnindate, geo_id)
203+
res = CHCSensor.fit(sub_data, self.burnindate, geo_id, self.logger)
201204
res = pd.DataFrame(res).loc[final_sensor_idxs]
202205
dfs.append(res)
203206
else:
204207
n_cpu = min(10, cpu_count())
205-
logging.debug("starting pool with {0} workers".format(n_cpu))
208+
self.logger.debug("starting pool with {0} workers".format(n_cpu))
206209
with Pool(n_cpu) as pool:
207210
pool_results = []
208211
for geo_id, sub_data in data_frame.groupby(level=0,as_index=False):
@@ -211,7 +214,7 @@ def update_sensor(self,
211214
sub_data = Weekday.calc_adjustment(wd_params, sub_data)
212215
pool_results.append(
213216
pool.apply_async(
214-
CHCSensor.fit, args=(sub_data, self.burnindate, geo_id,),
217+
CHCSensor.fit, args=(sub_data, self.burnindate, geo_id, self.logger),
215218
)
216219
)
217220
pool_results = [proc.get() for proc in pool_results]

changehc/tests/test_sensor.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# standard
2+
import logging
23

34
import numpy as np
45
import numpy.random as nr
@@ -19,6 +20,7 @@
1920
COVID_FILEPATH = PARAMS["indicator"]["input_covid_file"]
2021
DENOM_FILEPATH = PARAMS["indicator"]["input_denom_file"]
2122
DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
23+
TEST_LOGGER = logging.getLogger()
2224

2325
class TestLoadData:
2426
combined_data = load_combined_data(DENOM_FILEPATH, COVID_FILEPATH, DROP_DATE,
@@ -56,7 +58,7 @@ def test_fit_fips(self):
5658
for fips in all_fips:
5759
sub_data = self.combined_data.loc[fips]
5860
sub_data = sub_data.reindex(date_range, fill_value=0)
59-
res0 = CHCSensor.fit(sub_data, date_range[0], fips)
61+
res0 = CHCSensor.fit(sub_data, date_range[0], fips, TEST_LOGGER)
6062

6163
if np.isnan(res0["rate"]).all():
6264
assert res0["incl"].sum() == 0

0 commit comments

Comments
 (0)