Skip to content

Commit 008bd9b

Browse files
authored
Merge pull request #250 from cmu-delphi/wip_emr
EMR: standard signal names
2 parents 43df798 + a6e07b8 commit 008bd9b

File tree

10 files changed

+66304
-62
lines changed

10 files changed

+66304
-62
lines changed

emr_hosp/delphi_emr_hosp/constants.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
"""Registry for signal names and geo types"""
2+
SMOOTHED = "smoothed_covid19"
3+
SMOOTHED_ADJ = "smoothed_adj_covid19"
4+
SIGNALS = [SMOOTHED, SMOOTHED_ADJ]
5+
NA = "NA"
6+
HRR = "hrr"
7+
FIPS = "fips"

emr_hosp/delphi_emr_hosp/run.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ def run_module():
6969
logging.info("parallel:\t\t%s", params["parallel"])
7070
logging.info("weekday:\t\t%s", params["weekday"])
7171
logging.info("se:\t\t\t%s", params["se"])
72-
logging.info("prefix:\t\t%s", params["obfuscated_prefix"])
7372

7473
## start generating
7574
for geo in params["geos"]:
@@ -85,8 +84,7 @@ def run_module():
8584
geo,
8685
params["parallel"],
8786
weekday,
88-
params["se"],
89-
params["obfuscated_prefix"]
87+
params["se"]
9088
)
9189
su_inst.update_sensor(
9290
params["input_emr_file"],
Lines changed: 71 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,44 @@
11
"""
22
Generate EMR-hosp sensors.
3-
43
Author: Maria Jahja
54
Created: 2020-06-01
65
"""
7-
86
# standard packages
97
import logging
108
from datetime import timedelta
119
from multiprocessing import Pool, cpu_count
10+
import covidcast
11+
from delphi_utils import read_params
1212

1313
# third party
1414
import numpy as np
1515
import pandas as pd
16-
1716
# first party
1817
from .config import Config, Constants
1918
from .geo_maps import GeoMaps
2019
from .load_data import load_combined_data
2120
from .sensor import EMRHospSensor
2221
from .weekday import Weekday
22+
from .constants import SIGNALS, SMOOTHED, SMOOTHED_ADJ, HRR, NA, FIPS
2323

2424
from delphi_utils import GeoMapper
2525

26-
2726
def write_to_csv(output_dict, write_se, out_name, output_path="."):
2827
"""Write sensor values to csv.
29-
3028
Args:
3129
output_dict: dictionary containing sensor rates, se, unique dates, and unique geo_id
3230
write_se: boolean to write out standard errors, if true, use an obfuscated name
3331
out_name: name of the output file
3432
output_path: outfile path to write the csv (default is current directory)
3533
"""
36-
3734
if write_se:
3835
logging.info(f"========= WARNING: WRITING SEs TO {out_name} =========")
39-
4036
geo_level = output_dict["geo_level"]
4137
dates = output_dict["dates"]
4238
geo_ids = output_dict["geo_ids"]
4339
all_rates = output_dict["rates"]
4440
all_se = output_dict["se"]
4541
all_include = output_dict["include"]
46-
4742
out_n = 0
4843
for i, d in enumerate(dates):
4944
filename = "%s/%s_%s_%s.csv" % (
@@ -52,33 +47,83 @@ def write_to_csv(output_dict, write_se, out_name, output_path="."):
5247
geo_level,
5348
out_name,
5449
)
55-
5650
with open(filename, "w") as outfile:
5751
outfile.write("geo_id,val,se,direction,sample_size\n")
58-
5952
for geo_id in geo_ids:
6053
sensor = all_rates[geo_id][i]
6154
se = all_se[geo_id][i]
62-
6355
if all_include[geo_id][i]:
6456
assert not np.isnan(sensor), "value for included sensor is nan"
6557
assert not np.isnan(se), "se for included sensor is nan"
6658
if sensor > 90:
6759
logging.warning(f"value suspiciously high, {geo_id}: {sensor}")
6860
assert se < 5, f"se suspiciously high, {geo_id}: {se}"
69-
7061
if write_se:
7162
assert sensor > 0 and se > 0, "p=0, std_err=0 invalid"
7263
outfile.write(
73-
"%s,%f,%s,%s,%s\n" % (geo_id, sensor, se, "NA", "NA"))
64+
"%s,%f,%s,%s,%s\n" % (geo_id, sensor, se, NA, NA))
7465
else:
7566
# for privacy reasons we will not report the standard error
7667
outfile.write(
77-
"%s,%f,%s,%s,%s\n" % (geo_id, sensor, "NA", "NA", "NA")
68+
"%s,%f,%s,%s,%s\n" % (geo_id, sensor, NA, NA, NA)
7869
)
7970
out_n += 1
8071
logging.debug(f"wrote {out_n} rows for {len(geo_ids)} {geo_level}")
8172

73+
74+
def add_prefix(signal_names, wip_signal, prefix="wip_"):
    """Adds prefix to signal if there is a WIP signal

    Parameters
    ----------
    signal_names: List[str]
        Names of signals to be exported
    wip_signal : List[str] or bool
        a list of wip signals: [], OR
        all signals in the registry: True OR
        only signals that have never been published: False
    prefix : 'wip_'
        prefix for new/non public signals

    Returns
    -------
    List of signal names
        wip/non wip signals for further computation
    """
    if wip_signal is True:
        # Everything is work-in-progress: prefix the whole registry.
        return [prefix + name for name in signal_names]
    if isinstance(wip_signal, list):
        # Only the explicitly listed signals are work-in-progress.
        wip_lookup = set(wip_signal)
        renamed = []
        for name in signal_names:
            renamed.append(prefix + name if name in wip_lookup else name)
        return renamed
    if wip_signal in {False, ""}:
        # Prefix any signal that has never been published upstream.
        renamed = []
        for name in signal_names:
            renamed.append(name if public_signal(name) else prefix + name)
        return renamed
    raise ValueError("Supply True | False or '' or [] | list()")
106+
107+
108+
def public_signal(signal_):
    """Checks if the signal name is already public using COVIDcast

    Parameters
    ----------
    signal_ : str
        Name of the signal

    Returns
    -------
    bool
        True if the signal is present
        False if the signal is not present
    """
    # Fetch the full signal registry from the COVIDcast API (network call);
    # the result is a pandas DataFrame with a 'signal' column.
    epidata_df = covidcast.metadata()
    # Vectorized membership test instead of a positional Python loop: the
    # original range(len(df)) / df['signal'][index] pattern is O(n) in Python
    # and breaks if the frame does not carry a default RangeIndex.
    return bool((epidata_df["signal"] == signal_).any())
125+
126+
82127
class EMRHospSensorUpdator:
83128

84129
def __init__(self,
@@ -88,10 +133,8 @@ def __init__(self,
88133
geo,
89134
parallel,
90135
weekday,
91-
se,
92-
prefix=None):
136+
se):
93137
"""Init Sensor Updator
94-
95138
Args:
96139
startdate: first sensor date (YYYY-mm-dd)
97140
enddate: last sensor date (YYYY-mm-dd)
@@ -100,11 +143,8 @@ def __init__(self,
100143
parallel: boolean to run the sensor update in parallel
101144
weekday: boolean to adjust for weekday effects
102145
se: boolean to write out standard errors, if true, use an obfuscated name
103-
prefix: string to prefix to output files (used for obfuscation in producing SEs)
104-
105146
"""
106147
self.startdate, self.enddate, self.dropdate = [pd.to_datetime(t) for t in (startdate, enddate, dropdate)]
107-
108148
# handle dates
109149
assert (self.startdate > (Config.FIRST_DATA_DATE + Config.BURN_IN_PERIOD)
110150
), f"not enough data to produce estimates starting {self.startdate}"
@@ -114,32 +154,28 @@ def __init__(self,
114154
self.geo, self.parallel, self.weekday, self.se = geo.lower(), parallel, weekday, se
115155

116156
# output file naming
117-
out_name = "smoothed_adj_covid19" if self.weekday else "smoothed_covid19"
118-
if se:
119-
assert prefix is not None, "supply obfuscated prefix in params"
120-
out_name = prefix + "_" + out_name
121-
self.output_filename = out_name
122-
157+
signals = SIGNALS.copy()
158+
signals.remove(SMOOTHED if self.weekday else SMOOTHED_ADJ)
159+
signal_names = add_prefix(
160+
signals,
161+
wip_signal=read_params()["wip_signal"])
162+
self.updated_signal_names = signal_names
123163

124164
def shift_dates(self):
    """shift estimates forward to account for time lag, compute burnindates, sensordates
    """

    def drange(start, end):
        # Daily range covering [start, end) -- the endpoint is excluded.
        return pd.date_range(start=start, periods=(end - start).days, freq="D")

    # Push the start forward to account for reporting lag.
    self.startdate = self.startdate - Config.DAY_SHIFT
    self.burnindate = self.startdate - Config.BURN_IN_PERIOD
    self.fit_dates = drange(Config.FIRST_DATA_DATE, self.dropdate)
    self.burn_in_dates = drange(self.burnindate, self.dropdate)
    self.sensor_dates = drange(self.startdate, self.enddate)
    return True
135-
136174
def geo_reindex(self,data):
137175
"""Reindex based on geography, include all date, geo pairs
138-
139176
Args:
140177
data: dataframe, the output of loadcombineddata
141178
staticpath: path for the static geographic files
142-
143179
Returns:
144180
dataframe
145181
"""
@@ -157,92 +193,80 @@ def geo_reindex(self,data):
157193
else:
158194
logging.error(f"{geo} is invalid, pick one of 'county', 'state', 'msa', 'hrr'")
159195
return False
160-
161196
self.unique_geo_ids = pd.unique(data_frame[geo])
162197
data_frame.set_index([geo,'date'],inplace=True)
163-
164198
# for each location, fill in all missing dates with 0 values
165199
multiindex = pd.MultiIndex.from_product((self.unique_geo_ids, self.fit_dates),
166200
names=[geo, "date"])
167201
assert (len(multiindex) <= (Constants.MAX_GEO[geo] * len(self.fit_dates))
168202
), "more loc-date pairs than maximum number of geographies x number of dates"
169-
170203
# fill dataframe with missing dates using 0
171204
data_frame = data_frame.reindex(multiindex, fill_value=0)
172205
data_frame.fillna(0, inplace=True)
173206
return data_frame
174207

175208

209+
176210
def update_sensor(self,
177211
emr_filepath,
178212
claims_filepath,
179213
outpath,
180214
staticpath):
181215
"""Generate sensor values, and write to csv format.
182-
183216
Args:
184217
emr_filepath: path to the aggregated EMR data
185218
claims_filepath: path to the aggregated claims data
186219
outpath: output path for the csv results
187220
staticpath: path for the static geographic files
188221
"""
189-
190222
self.shift_dates()
191223
final_sensor_idxs = (self.burn_in_dates >= self.startdate) & (self.burn_in_dates <= self.enddate)
192224

193225
# load data
194226
## JS: If the data is in fips then can we also put it into hrr?
195227
base_geo = "hrr" if self.geo == "hrr" else "fips"
228+
base_geo = HRR if self.geo == HRR else FIPS
196229
data = load_combined_data(emr_filepath, claims_filepath, self.dropdate, base_geo)
197230

198231
data.reset_index(inplace=True)
199232
data_frame = self.geo_reindex(data)
200-
201233
# handle if we need to adjust by weekday
202234
wd_params = Weekday.get_params(data_frame) if self.weekday else None
203-
204235
# run sensor fitting code (maybe in parallel)
205236
sensor_rates = {}
206237
sensor_se = {}
207238
sensor_include = {}
208239
if not self.parallel:
209240
for geo_id, sub_data in data_frame.groupby(level=0):
210241
sub_data.reset_index(level=0,inplace=True)
211-
212242
if self.weekday:
213243
sub_data = Weekday.calc_adjustment(wd_params, sub_data)
214-
215244
res = EMRHospSensor.fit(sub_data, self.burnindate, geo_id)
216245
res = pd.DataFrame(res)
217246
sensor_rates[geo_id] = np.array(res.loc[final_sensor_idxs,"rate"])
218247
sensor_se[geo_id] = np.array(res.loc[final_sensor_idxs,"se"])
219248
sensor_include[geo_id] = np.array(res.loc[final_sensor_idxs,"incl"])
220-
221249
else:
222250
n_cpu = min(10, cpu_count())
223251
logging.debug(f"starting pool with {n_cpu} workers")
224-
225252
with Pool(n_cpu) as pool:
226253
pool_results = []
227254
for geo_id, sub_data in data_frame.groupby(level=0,as_index=False):
228255
sub_data.reset_index(level=0, inplace=True)
229256
if self.weekday:
230257
sub_data = Weekday.calc_adjustment(wd_params, sub_data)
231-
232258
pool_results.append(
233259
pool.apply_async(
234260
EMRHospSensor.fit, args=(sub_data, self.burnindate, geo_id,),
235261
)
236262
)
237263
pool_results = [proc.get() for proc in pool_results]
238-
239264
for res in pool_results:
240265
geo_id = res["geo_id"]
241266
res = pd.DataFrame(res)
242267
sensor_rates[geo_id] = np.array(res.loc[final_sensor_idxs, "rate"])
243268
sensor_se[geo_id] = np.array(res.loc[final_sensor_idxs, "se"])
244269
sensor_include[geo_id] = np.array(res.loc[final_sensor_idxs, "incl"])
245-
246270
unique_geo_ids = list(sensor_rates.keys())
247271
output_dict = {
248272
"rates": sensor_rates,
@@ -254,6 +278,7 @@ def update_sensor(self,
254278
}
255279

256280
# write out results
257-
write_to_csv(output_dict, self.se, self.output_filename, outpath)
281+
for signal in self.updated_signal_names:
282+
write_to_csv(output_dict, self.se, signal, outpath)
258283
logging.debug(f"wrote files to {outpath}")
259-
return True
284+
return True

emr_hosp/params.json.template

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
"n_backfill_days": 60,
1010
"n_waiting_days": 3,
1111
"se": false,
12-
"obfuscated_prefix": null,
1312
"parallel": false,
1413
"geos": ["state", "msa", "hrr", "county"],
15-
"weekday": [true, false]
14+
"weekday": [true, false],
15+
"wip_signal": ""
1616
}

emr_hosp/receiving/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
*.csv

emr_hosp/setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
"pytest",
99
"pytest-cov",
1010
"pylint",
11-
"delphi-utils"
11+
"delphi-utils",
12+
"covidcast"
1213
]
1314

1415
setup(

0 commit comments

Comments
 (0)