"""
Generate EMR-hosp sensors.

Author: Maria Jahja
Created: 2020-06-01
"""

# standard packages
import logging
from datetime import timedelta
from multiprocessing import Pool, cpu_count

# third party
import covidcast
import numpy as np
import pandas as pd
from delphi_utils import GeoMapper, read_params

# first party
from .config import Config, Constants
from .constants import SIGNALS, SMOOTHED, SMOOTHED_ADJ, HRR, NA, FIPS
from .geo_maps import GeoMaps
from .load_data import load_combined_data
from .sensor import EMRHospSensor
from .weekday import Weekday


def write_to_csv(output_dict, write_se, out_name, output_path="."):
    """Write sensor values to csv.

    Args:
        output_dict: dictionary containing sensor rates, standard errors, dates,
            geo ids, geo level, and inclusion flags
        write_se: boolean to write out standard errors, if true, use an obfuscated name
        out_name: name of the output file
        output_path: outfile path to write the csv (default is current directory)
    """
    if write_se:
        logging.info(f"========= WARNING: WRITING SEs TO {out_name} =========")
    geo_level = output_dict["geo_level"]
    dates = output_dict["dates"]
    geo_ids = output_dict["geo_ids"]
    all_rates = output_dict["rates"]
    all_se = output_dict["se"]
    all_include = output_dict["include"]
    out_n = 0
    for i, d in enumerate(dates):
        filename = "%s/%s_%s_%s.csv" % (
            # the first two entries (output path and a YYYYMMDD date stamp) are
            # reconstructed; the exact date formatting is assumed
            output_path,
            (d + Config.DAY_SHIFT).strftime("%Y%m%d"),
            geo_level,
            out_name,
        )
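        # illustrative result: "<output_path>/20200601_state_smoothed_covid19.csv"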
        with open(filename, "w") as outfile:
            outfile.write("geo_id,val,se,direction,sample_size\n")
            for geo_id in geo_ids:
                sensor = all_rates[geo_id][i]
                se = all_se[geo_id][i]
                if all_include[geo_id][i]:
                    assert not np.isnan(sensor), "value for included sensor is nan"
                    assert not np.isnan(se), "se for included sensor is nan"
                    if sensor > 90:
                        logging.warning(f"value suspiciously high, {geo_id}: {sensor}")
                    assert se < 5, f"se suspiciously high, {geo_id}: {se}"
                    if write_se:
                        assert sensor > 0 and se > 0, "p=0, std_err=0 invalid"
                        outfile.write(
                            "%s,%f,%s,%s,%s\n" % (geo_id, sensor, se, NA, NA))
                    else:
                        # for privacy reasons we will not report the standard error
                        outfile.write(
                            "%s,%f,%s,%s,%s\n" % (geo_id, sensor, NA, NA, NA)
                        )
                    out_n += 1
    logging.debug(f"wrote {out_n} rows for {len(geo_ids)} {geo_level}")


def add_prefix(signal_names, wip_signal, prefix="wip_"):
    """Add the WIP prefix to signals that are still works in progress.

    Parameters
    ----------
    signal_names: List[str]
        Names of signals to be exported
    wip_signal : List[str] or bool
        a list of signals to treat as work-in-progress, OR
        True to prefix all signals in the registry, OR
        False to prefix only signals that have never been published
    prefix : 'wip_'
        prefix for new/non-public signals

    Returns
    -------
    List[str]
        wip/non-wip signal names for further computation
    """
    if wip_signal is True:
        return [prefix + signal for signal in signal_names]
    if isinstance(wip_signal, list):
        make_wip = set(wip_signal)
        return [
            prefix + signal if signal in make_wip else signal
            for signal in signal_names
        ]
    if wip_signal in {False, ""}:
        return [
            signal if public_signal(signal)
            else prefix + signal
            for signal in signal_names
        ]
    raise ValueError("Supply True | False or '' or [] | list()")


def public_signal(signal_):
    """Check whether the signal name has already been published via COVIDcast.

    Parameters
    ----------
    signal_ : str
        Name of the signal

    Returns
    -------
    bool
        True if the signal is present in the COVIDcast metadata,
        False otherwise
    """
    epidata_df = covidcast.metadata()
    for index in range(len(epidata_df)):
        if epidata_df['signal'][index] == signal_:
            return True
    return False


class EMRHospSensorUpdator:

    def __init__(self,
                 startdate,
                 enddate,
                 dropdate,
                 geo,
                 parallel,
                 weekday,
                 se):
        """Init Sensor Updator

        Args:
            startdate: first sensor date (YYYY-mm-dd)
            enddate: last sensor date (YYYY-mm-dd)
            dropdate: data drop date (YYYY-mm-dd)
            geo: geographic resolution, one of ["county", "state", "msa", "hrr"]
            parallel: boolean to run the sensor update in parallel
            weekday: boolean to adjust for weekday effects
            se: boolean to write out standard errors, if true, use an obfuscated name
        """
        self.startdate, self.enddate, self.dropdate = [pd.to_datetime(t) for t in (startdate, enddate, dropdate)]
        # handle dates
        assert (self.startdate > (Config.FIRST_DATA_DATE + Config.BURN_IN_PERIOD)
                ), f"not enough data to produce estimates starting {self.startdate}"
        # (additional checks on the ordering of start, end, and drop dates are not shown)
        self.geo, self.parallel, self.weekday, self.se = geo.lower(), parallel, weekday, se

        # output file naming
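        # keep only the signal that matches the weekday setting (the adjusted
        # signal when weekday effects are removed, the unadjusted one otherwise),
        # then apply the wip_ prefix policy from params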
        signals = SIGNALS.copy()
        signals.remove(SMOOTHED if self.weekday else SMOOTHED_ADJ)
        signal_names = add_prefix(
            signals,
            wip_signal=read_params()["wip_signal"])
        self.updated_signal_names = signal_names

    def shift_dates(self):
        """Shift estimates forward to account for time lag; compute burn-in and sensor dates."""
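        # helper: daily date range covering [s, e) (the end date is excluded)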
        drange = lambda s, e: pd.date_range(start=s, periods=(e - s).days, freq='D')
        self.startdate = self.startdate - Config.DAY_SHIFT
        self.burnindate = self.startdate - Config.BURN_IN_PERIOD
        self.fit_dates = drange(Config.FIRST_DATA_DATE, self.dropdate)
        self.burn_in_dates = drange(self.burnindate, self.dropdate)
        self.sensor_dates = drange(self.startdate, self.enddate)
        return True

    def geo_reindex(self, data):
        """Reindex based on geography, include all date, geo pairs

        Args:
            data: dataframe, the output of load_combined_data

        Returns:
            dataframe
        """
        # get the right geography
        geo = self.geo
        # (the mapping of `data` to a county/state/msa/hrr `data_frame` is not shown)
        if geo not in ("county", "state", "msa", "hrr"):
            logging.error(f"{geo} is invalid, pick one of 'county', 'state', 'msa', 'hrr'")
            return False
        self.unique_geo_ids = pd.unique(data_frame[geo])
        data_frame.set_index([geo, 'date'], inplace=True)

        # for each location, fill in all missing dates with 0 values
        multiindex = pd.MultiIndex.from_product((self.unique_geo_ids, self.fit_dates),
                                                names=[geo, "date"])
        assert (len(multiindex) <= (Constants.MAX_GEO[geo] * len(self.fit_dates))
                ), "more loc-date pairs than maximum number of geographies x number of dates"

        # fill dataframe with missing dates using 0
        data_frame = data_frame.reindex(multiindex, fill_value=0)
        data_frame.fillna(0, inplace=True)
        return data_frame

    def update_sensor(self,
                      emr_filepath,
                      claims_filepath,
                      outpath,
                      staticpath):
        """Generate sensor values, and write to csv format.

        Args:
            emr_filepath: path to the aggregated EMR data
            claims_filepath: path to the aggregated claims data
            outpath: output path for the csv results
            staticpath: path for the static geographic files
        """
        self.shift_dates()
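        # boolean mask over the burn-in dates selecting those in [startdate, enddate]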
        final_sensor_idxs = (self.burn_in_dates >= self.startdate) & (self.burn_in_dates <= self.enddate)

        # load data
        ## JS: If the data is in fips then can we also put it into hrr?
        base_geo = HRR if self.geo == HRR else FIPS
        data = load_combined_data(emr_filepath, claims_filepath, self.dropdate, base_geo)

        data.reset_index(inplace=True)
        data_frame = self.geo_reindex(data)
        # handle if we need to adjust by weekday
        wd_params = Weekday.get_params(data_frame) if self.weekday else None
        # run sensor fitting code (maybe in parallel)
        sensor_rates = {}
        sensor_se = {}
        sensor_include = {}
        if not self.parallel:
            for geo_id, sub_data in data_frame.groupby(level=0):
                sub_data.reset_index(level=0, inplace=True)
                if self.weekday:
                    sub_data = Weekday.calc_adjustment(wd_params, sub_data)
                res = EMRHospSensor.fit(sub_data, self.burnindate, geo_id)
                res = pd.DataFrame(res)
                sensor_rates[geo_id] = np.array(res.loc[final_sensor_idxs, "rate"])
                sensor_se[geo_id] = np.array(res.loc[final_sensor_idxs, "se"])
                sensor_include[geo_id] = np.array(res.loc[final_sensor_idxs, "incl"])
        else:
            n_cpu = min(10, cpu_count())
            logging.debug(f"starting pool with {n_cpu} workers")
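            # each geography is fit independently, so per-location fits are
            # dispatched asynchronously and collected after all workers finish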
            with Pool(n_cpu) as pool:
                pool_results = []
                for geo_id, sub_data in data_frame.groupby(level=0, as_index=False):
                    sub_data.reset_index(level=0, inplace=True)
                    if self.weekday:
                        sub_data = Weekday.calc_adjustment(wd_params, sub_data)
                    pool_results.append(
                        pool.apply_async(
                            EMRHospSensor.fit, args=(sub_data, self.burnindate, geo_id,),
                        )
                    )
                pool_results = [proc.get() for proc in pool_results]
                for res in pool_results:
                    geo_id = res["geo_id"]
                    res = pd.DataFrame(res)
                    sensor_rates[geo_id] = np.array(res.loc[final_sensor_idxs, "rate"])
                    sensor_se[geo_id] = np.array(res.loc[final_sensor_idxs, "se"])
                    sensor_include[geo_id] = np.array(res.loc[final_sensor_idxs, "incl"])
        unique_geo_ids = list(sensor_rates.keys())
        output_dict = {
            "rates": sensor_rates,
            # the remaining entries are not shown in the diff view; they are
            # reconstructed here to match the keys that write_to_csv reads
            "se": sensor_se,
            "dates": self.burn_in_dates[final_sensor_idxs],
            "geo_ids": unique_geo_ids,
            "geo_level": self.geo,
            "include": sensor_include,
        }

        # write out results
        for signal in self.updated_signal_names:
            write_to_csv(output_dict, self.se, signal, outpath)
        logging.debug(f"wrote files to {outpath}")
        return True
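

# Typical driver usage (illustrative only; the argument values are assumptions):
#   updater = EMRHospSensorUpdator("2020-06-01", "2020-06-20", "2020-06-22",
#                                  "state", parallel=False, weekday=True, se=False)
#   updater.update_sensor(emr_filepath, claims_filepath, outpath, staticpath)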