# third party
import numpy as np
import pandas as pd
- from delphi_utils import GeoMapper, read_params, add_prefix
+ from delphi_utils import GeoMapper, add_prefix, create_export_csv

# first party
from .config import Config
from .weekday import Weekday


- def write_to_csv(output_dict, write_se, out_name, output_path="."):
+ def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".", start_date=None, end_date=None):
    """Write sensor values to csv.

    Args:
-       output_dict: dictionary containing sensor rates, se, unique dates, and unique geo_id
+       df: dataframe containing unique timestamp, unique geo_id, val, se, sample_size
+       geo_level: the geographic level being written, e.g. county or state
        write_se: boolean to write out standard errors; if true, use an obfuscated name
+       day_shift: a timedelta specifying the time shift to apply to the dates
        out_name: name of the output file
        output_path: outfile path to write the csv (default is current directory)
+       start_date: the first date of the dates to be written
+       end_date: the last date of the dates to be written
    """
+     df = df.copy()
+
+     # shift dates forward for labeling
+     df["timestamp"] += day_shift
+     if start_date is None:
+         start_date = min(df["timestamp"])
+     if end_date is None:
+         end_date = max(df["timestamp"])
+
+     # suspicious value warnings
+     suspicious_se_mask = df["se"].gt(5)
+     assert df[suspicious_se_mask].empty, " se contains suspiciously large values"
+     assert not df["se"].isna().any(), " se contains nan values"
    if write_se:
        logging.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name))
-     geo_level = output_dict["geo_level"]
-     dates = output_dict["dates"]
-     geo_ids = output_dict["geo_ids"]
-     all_rates = output_dict["rates"]
-     all_se = output_dict["se"]
-     all_include = output_dict["include"]
-     out_n = 0
-     for i, d in enumerate(dates):
-         filename = "%s/%s_%s_%s.csv" % (
-             output_path,
-             (d + Config.DAY_SHIFT).strftime("%Y%m%d"),
-             geo_level,
-             out_name,
-         )
-         with open(filename, "w") as outfile:
-             outfile.write("geo_id,val,se,direction,sample_size\n")
-             for geo_id in geo_ids:
-                 sensor = all_rates[geo_id][i]
-                 se = all_se[geo_id][i]
-                 if all_include[geo_id][i]:
-                     assert not np.isnan(sensor), "value for included sensor is nan"
-                     assert not np.isnan(se), "se for included sensor is nan"
-                     if sensor > 90:
-                         logging.warning("value suspiciously high, {0}: {1}".format(
-                             geo_id, sensor
-                         ))
-                     assert se < 5, f"se suspiciously high, {geo_id}: {se}"
-                     if write_se:
-                         assert sensor > 0 and se > 0, "p=0, std_err=0 invalid"
-                         outfile.write(
-                             "%s,%f,%s,%s,%s\n" % (geo_id, sensor, se, NA, NA))
-                     else:
-                         # for privacy reasons we will not report the standard error
-                         outfile.write(
-                             "%s,%f,%s,%s,%s\n" % (geo_id, sensor, NA, NA, NA)
-                         )
-                     out_n += 1
+     else:
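+         # for privacy reasons we will not report the standard error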
+         df.loc[:, "se"] = np.nan
+
+     assert not df["val"].isna().any(), " val contains nan values"
+     suspicious_val_mask = df["val"].gt(90)
+     if not df[suspicious_val_mask].empty:
+         for geo in df.loc[suspicious_val_mask, "geo_id"]:
+             logging.warning("value suspiciously high, {0}: {1}".format(
+                 geo, out_name
+             ))
+
+     create_export_csv(
+         df,
+         export_dir=output_path,
+         geo_res=geo_level,
+         start_date=start_date,
+         end_date=end_date,
+         sensor=out_name,
+         write_empty_days=True
+     )
71
71
logging .debug ("wrote {0} rows for {1} {2}" .format (
72
- out_n , len ( geo_ids ) , geo_level
72
+ df . size , df [ "geo_id" ]. unique (). size , geo_level
73
73
))
74
+ logging .debug ("wrote files to {0}" .format (output_path ))


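For reference, here is a minimal sketch of calling the rewritten write_to_csv on its own. It assumes a toy dataframe with the columns the docstring describes; the geo ids, values, signal name, and output directory are illustrative only, not part of this diff:

    import numpy as np
    import pandas as pd
    from datetime import timedelta

    example_df = pd.DataFrame({
        "timestamp": pd.to_datetime(["2020-05-01", "2020-05-01"]),
        "geo_id": ["01001", "01003"],     # hypothetical county FIPS codes
        "val": [1.5, 2.5],                # sensor estimates
        "se": [0.2, 0.3],                 # standard errors, within the gt(5) guard
        "sample_size": [np.nan, np.nan],  # never shared for this indicator
    })
    # write_se=False blanks the se column; create_export_csv then writes
    # one CSV per day under ./receiving
    write_to_csv(example_df, geo_level="county", write_se=False,
                 day_shift=timedelta(days=1), out_name="smoothed_outpatient_cli",
                 output_path="./receiving")
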
class CHCSensorUpdator:  # pylint: disable=too-many-instance-attributes
@@ -176,12 +177,12 @@ def geo_reindex(self, data):
    def update_sensor(self,
                      data,
-                     outpath):
+                     output_path):
        """Generate sensor values, and write to csv format.

        Args:
            data: pd.DataFrame with columns num and den
-           outpath: output path for the csv results
+           output_path: output path for the csv results
        """
        self.shift_dates()
        final_sensor_idxs = (self.burn_in_dates >= self.startdate) & \
@@ -193,19 +194,15 @@ def update_sensor(self,
        # handle if we need to adjust by weekday
        wd_params = Weekday.get_params(data_frame) if self.weekday else None
        # run sensor fitting code (maybe in parallel)
-         sensor_rates = {}
-         sensor_se = {}
-         sensor_include = {}
        if not self.parallel:
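+             # collect one fitted dataframe per geo_id; concatenated after fitting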
+             dfs = []
            for geo_id, sub_data in data_frame.groupby(level=0):
                sub_data.reset_index(level=0, inplace=True)
                if self.weekday:
                    sub_data = Weekday.calc_adjustment(wd_params, sub_data)
                res = CHCSensor.fit(sub_data, self.burnindate, geo_id)
-                 res = pd.DataFrame(res)
-                 sensor_rates[geo_id] = np.array(res.loc[final_sensor_idxs, "rate"])
-                 sensor_se[geo_id] = np.array(res.loc[final_sensor_idxs, "se"])
-                 sensor_include[geo_id] = np.array(res.loc[final_sensor_idxs, "incl"])
+                 res = pd.DataFrame(res).loc[final_sensor_idxs]
+                 dfs.append(res)
        else:
            n_cpu = min(10, cpu_count())
            logging.debug("starting pool with {0} workers".format(n_cpu))
@@ -221,23 +218,29 @@ def update_sensor(self,
                )
            )
            pool_results = [proc.get() for proc in pool_results]
+             dfs = []
            for res in pool_results:
-                 geo_id = res["geo_id"]
-                 res = pd.DataFrame(res)
-                 sensor_rates[geo_id] = np.array(res.loc[final_sensor_idxs, "rate"])
-                 sensor_se[geo_id] = np.array(res.loc[final_sensor_idxs, "se"])
-                 sensor_include[geo_id] = np.array(res.loc[final_sensor_idxs, "incl"])
-         unique_geo_ids = list(sensor_rates.keys())
-         output_dict = {
-             "rates": sensor_rates,
-             "se": sensor_se,
-             "dates": self.sensor_dates,
-             "geo_ids": unique_geo_ids,
-             "geo_level": self.geo,
-             "include": sensor_include,
-         }
+                 res = pd.DataFrame(res).loc[final_sensor_idxs]
+                 dfs.append(res)
+
+         # Form the output dataframe
+         df = pd.concat(dfs)
+         # sample size is never shared
+         df["sample_size"] = np.nan
+         # conform to naming expected by create_export_csv()
+         df = df.reset_index().rename(columns={"date": "timestamp", "rate": "val"})
+         # df.loc[~df['incl'], ["val", "se"]] = np.nan  # update to this line after nancodes get merged in
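+         # until then, keep only the rows the sensor fit marked for inclusion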
+         df = df[df['incl']]

        # write out results
        for signal in self.updated_signal_names:
-             write_to_csv(output_dict, self.se, signal, outpath)
-         logging.debug("wrote files to {0}".format(outpath))
+             write_to_csv(
+                 df,
+                 geo_level=self.geo,
+                 start_date=min(self.sensor_dates),
+                 end_date=max(self.sensor_dates),
+                 write_se=self.se,
+                 day_shift=Config.DAY_SHIFT,
+                 out_name=signal,
+                 output_path=output_path
+             )
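
For comparison, a rough sketch of the per-day export that create_export_csv now takes over. The filename pattern is copied from the loop deleted above; whether the utility matches it exactly is an assumption, not something this diff shows:

    # one file per day: <output_path>/<YYYYMMDD>_<geo_level>_<out_name>.csv
    for day, day_df in df.groupby("timestamp"):
        filename = "%s/%s_%s_%s.csv" % (output_path, day.strftime("%Y%m%d"),
                                        geo_level, out_name)
        day_df[["geo_id", "val", "se", "sample_size"]].to_csv(filename, index=False)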