1
1
"Functions pertaining to running FlaSH daily."
2
- import time
2
+ # import time
3
3
import math
4
4
import numpy as np
5
5
import covidcast
@@ -41,7 +41,6 @@ def outlier(df, iqr_list=None, replace=pd.DataFrame(), replace_use=False):
41
41
df ['day' ] = [x .weekday () for x in list (df .index )]
42
42
diff_df2 = diff_df_small
43
43
diff_df2 ['day' ] = df ['day' ]
44
-
45
44
diff_df2_stack = diff_df2 .drop (columns = ['day' ]).stack ().reset_index ()
46
45
diff_df2_stack .columns = ['date' , 'state' , 'val' ]
47
46
diff_df2_stack ['weekday' ] = diff_df2_stack .date .dt .weekday
@@ -54,7 +53,7 @@ def outlier(df, iqr_list=None, replace=pd.DataFrame(), replace_use=False):
54
53
iqr_spec_df2 = diff_df2_stack .iloc [1 :, :]
55
54
for _ , (_ , ldf ) in enumerate (iqr_spec_df2 .groupby (['weekday' ])):
56
55
iqr = ldf .groupby ('state' ).apply (lambda x : x .val .quantile ([lower , 0.5 , upper ]).T )
57
- iqr = fix_iqr (iqr )
56
+ iqr = iqr . apply ( lambda x : fix_iqr (x ), axis = 1 )
58
57
iqr ['delta' ] = 1.5 * (np .ceil (iqr [upper ]) - np .floor (iqr [lower ]))
59
58
iqr ['lower_bound' ] = iqr [lower ] - iqr ['delta' ]
60
59
iqr ['upper_bound' ] = iqr [upper ] + iqr ['delta' ]
@@ -113,7 +112,7 @@ def spike(x):
113
112
window_size = 7
114
113
shift_val = - 1 if window_size % 2 == 0 else 0
115
114
group = x .to_frame ()
116
- group .columns = ["value" ]
115
+ group .columns = ["value" ]
117
116
rolling_windows = group ["value" ].rolling (
118
117
window_size , min_periods = window_size )
119
118
center_windows = group ["value" ].rolling (
@@ -127,7 +126,7 @@ def spike(x):
127
126
group ['state' ] = x .name
128
127
group_list .append (group )
129
128
130
- spike (all_frames_orig . T )
129
+ all_frames_orig . apply ( lambda x : spike (x ), axis = 0 ). to_list ( )
131
130
all_frames = pd .concat (group_list )
132
131
outlier_df = all_frames .reset_index ().sort_values (by = ['state' , 'ref' ]) \
133
132
.reset_index (drop = True ).copy ()
@@ -197,7 +196,7 @@ def predict_val(col, params_state, lags_names):
197
196
lags_names = lags_names , axis = 0 ).T .clip (0 )
198
197
y_predict .columns = ['y_predict' ]
199
198
y_values_df = y_values_df .merge (y_predict , left_index = True ,
200
- right_index = True , how = 'outer' ).droplevel (level = 0 )
199
+ right_index = True , how = 'outer' )# .droplevel(level=0)
201
200
weekday_outlier_flags ['flag' ] = 'weekday outlier'
202
201
large_spike_flags ['flag' ] = 'large_spikes'
203
202
flags_returned = pd .concat ([weekday_outlier_flags ,
@@ -215,8 +214,8 @@ def return_vals(val, ref_dist):
215
214
pval .name = 'pval'
216
215
for state in dist .index :
217
216
pval [state ] = (sum (ref_dist .astype (float ) < float (dist [state ])) / len (ref_dist ))
218
- val = val .merge (dist , left_on = 'state' , right_index = True , how = 'outer' )
219
- val = val .merge (pval , left_on = 'state' , right_index = True , how = 'outer' )
217
+ val = val .merge (dist , left_index = True , right_index = True , how = 'outer' )
218
+ val = val .merge (pval , left_index = True , right_index = True , how = 'outer' )
220
219
return val
221
220
222
221
def process_anomalies (y , t_skew = None ):
@@ -322,17 +321,17 @@ def flash_eval_lag(input_df, range_tup, lag, signal, logger):
322
321
if iter_df .shape [0 ] > 0 :
323
322
for _ , row in iter_df .reset_index ().iterrows ():
324
323
total_flags += 1
325
-
326
- start_link = f"{ starter_link } ,{ row .state } "
324
+ start_link = f"{ starter_link } ,{ row ['index' ]} "
327
325
if 'pval' in iter_df .columns :
328
- p_text += f"\t { start_link } |*{ row . state } , { row .pval } *>\n "
326
+ p_text += f"\t { start_link } |*{ row [ 'index' ] } , { row .pval } *>\n "
329
327
elif 'y_raw' in iter_df .columns :
330
- p_text += f"\t { start_link } |*{ row . state } , { row .y_raw } *>\n "
328
+ p_text += f"\t { start_link } |*{ row [ 'index' ] } , { row .y_raw } *>\n "
331
329
logger .info (name ,
332
330
payload = p_text ,
333
331
hits = iter_df .shape [0 ])
334
332
p_text = ""
335
333
334
+
336
335
def flash_eval (params ):
337
336
""" Evaluate most recent data using FlaSH.
338
337
First, get any necessary files from the cache or download from the API.
@@ -343,7 +342,6 @@ def flash_eval(params):
343
342
Output: None
344
343
"""
345
344
346
-
347
345
logger = get_structured_logger (
348
346
__name__ , filename = params ["common" ].get ("log_filename" ),
349
347
log_exceptions = params ["common" ].get ("log_exceptions" , True ))
@@ -358,7 +356,7 @@ def flash_eval(params):
358
356
signals = params ["flash" ]["signals" ]
359
357
for signal in signals :
360
358
curr_df = pd .DataFrame ()
361
- start_time = time .time ()
359
+ # start_time = time.time()
362
360
for date_s in pd .date_range (most_recent_d - pd .Timedelta ('14d' ),
363
361
most_recent_d - pd .Timedelta ('1d' )):
364
362
data = covidcast .signal (source , signal ,
@@ -395,12 +393,12 @@ def flash_eval(params):
395
393
data = data .set_index (['state' , 'lag' , 'ref' , 'as_of' ])
396
394
curr_df = pd .concat ([data , curr_df ])
397
395
curr_df = curr_df [~ curr_df .index .duplicated (keep = 'first' )].reset_index ()
398
- end_time = time .time ()
399
- print (f"Total Download Time: { start_time - end_time } " )
396
+ # end_time = time.time()
397
+ # print(f"Total Download Time: {start_time-end_time}")
400
398
401
399
402
400
for lag in range (1 ,8 ):
403
- start_time = time .time ()
401
+ # start_time = time.time()
404
402
date_range = list (pd .date_range (most_recent_d - pd .Timedelta (f'{ lag + 7 } d' ),
405
403
most_recent_d - pd .Timedelta (f'{ lag } d' )))
406
404
input_df = curr_df .query ('lag==@lag and ref in @date_range' ).sort_values ('ref' )
@@ -411,6 +409,7 @@ def flash_eval(params):
411
409
input_df = input_df .merge (date_df , left_index = True , right_index = True ,
412
410
how = 'right' ).ffill ().bfill ().reset_index ()
413
411
input_df = input_df .set_index (['ref' , 'state' ])[['value' ]].unstack ().ffill ().bfill ()
412
+ input_df .columns = input_df .columns .droplevel ()
414
413
flash_eval_lag (input_df , [0 , math .inf ], lag , signal , logger )
415
- end_time = time .time ()
416
- print (f"Time lag { lag } : { start_time - end_time } " )
414
+ # end_time = time.time()
415
+ # print(f"Time lag {lag}: {start_time - end_time}")
0 commit comments