def outlier(df, iqr_list2=None, replace=pd.DataFrame(), replace_use=False):
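    # Flag and correct weekday-specific outliers: forward-fill gaps, take
    # day-over-day differences, and test each state's diffs against the
    # per-weekday quantile bounds computed below.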
    df_fix_unstack = df.ffill()
-    #print(df_fix_unstack)
    diff_df_small = df_fix_unstack.copy().diff(1).bfill()
-    upper = 0.80
-    lower = 0.20
+    upper = 0.75
+    lower = 0.25
    df['day'] = [x.weekday() for x in list(df.index)]
    diff_df2 = diff_df_small
    diff_df2['day'] = df['day']
-
    diff_df2_stack = diff_df2.drop(columns=['day']).stack().reset_index()
    diff_df2_stack.columns = ['date', 'state', 'val']
    diff_df2_stack['weekday'] = diff_df2_stack.date.dt.weekday
@@ -37,8 +35,8 @@ def outlier(df, iqr_list2=None, replace=pd.DataFrame(), replace_use=False):
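    # Per-weekday pass: quantiles of each state's day-over-day changes.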
    for i, (_, ldf) in enumerate(iqr_spec_df2.groupby(['weekday'])):
        iqr = ldf.groupby('state').apply(lambda x: x.val.quantile([lower, 0.5, upper]).T)
        def fix_iqr(x):
-            upper = 0.80
-            lower = 0.20
+            upper = 0.75
+            lower = 0.25
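            # Widen a zero-width interval (upper quantile == lower quantile)
            # so flat series are not flagged on every small change.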
            if x[upper] == x[lower]:
                x[upper] += 2
                x[lower] -= 2
@@ -75,10 +73,7 @@ def eval_row(row, replace_use, iqr_list2, replace, df_fix_unstack, diff_df2):
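            # Replace a flagged value using the previous day's value and the
            # state's median daily change (result floored at 1.0).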
            f = float(df_fix_unstack.loc[yesterday_date, row.state] + (1 + iqr_df2.loc[row.state, '0.5']))
            df_fix_unstack.loc[row.date, row.state] = max(f, 1.0)
            p2_outliers.append(row.copy())
-            #print('APPENDED', row, p2_outliers)
-    #print("OUTLIERS EVAL ON ", diff_df2_stack)
    diff_df2_stack.apply(lambda x: eval_row(x, replace_use, iqr_list2, replace, df_fix_unstack, diff_df2,), axis=1)
-    #print("OUTLIERS", pd.DataFrame(p2_outliers))
    return df_fix_unstack, iqr_list2, pd.DataFrame(p2_outliers)

def spike_outliers(df):
@@ -108,7 +103,6 @@ def outlier_flag(frame):
    def spike(x):
        window_size = 7
        shift_val = -1 if window_size % 2 == 0 else 0
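        # offset used to center the rolling window when its length is even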
-        size_cut, sig_cut, sig_consec = 10, 3, 2.25
        group = x.to_frame()
        group.columns = ["value"]
        rolling_windows = group["value"].rolling(
@@ -188,9 +182,6 @@ def return_vals(val, ref_dist):
    dist.name = 'dist'
    pval = dist.copy()
    pval.name = 'pval'
-    # print("PVAL", pval)
-    # print("DIST", dist)
-    # print("VAL", val)
    for state in dist.index:
        pval[state] = (sum(ref_dist.astype(float) < float(dist[state])) / len(ref_dist))
    val = val.merge(dist, left_on='state', right_index=True, how='outer')
@@ -201,28 +192,19 @@ def eval_day(input_df, iqr_dict, ref_date, weekday_params, linear_coeff):
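    # Pipeline: correct weekday outliers, adjust for day-of-week effects,
    # correct large spikes, then predict the reference day.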
    val = pd.DataFrame()
    val['y_raw'] = input_df.iloc[-1, :]
    input_df = input_df.clip(0) + 1
-    #print(time.time(), 't1')
    lags_names = [f'lags_{i}' for i in range(1, 8)]
-    #print("INPUT DF INTO CHAOS", input_df)
    input_df, _, flag_out = outlier(input_df, iqr_list2=iqr_dict['Before'])
-    #print(time.time(), 't2')
    input_df = input_df.clip(0)
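    # Remove day-of-week effects so the second outlier pass sees a
    # weekday-adjusted series.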
    input_df = Weekday.calc_adjustment(weekday_params.to_numpy()
        , input_df.copy().reset_index(), list(input_df.columns),
        'ref').fillna(0).set_index('ref')
-    #print(time.time(), 't3')
    input_df, _, flag_out1 = outlier(input_df, iqr_list2=iqr_dict['After'], replace=spike_outliers(input_df), replace_use=True)
-    #print(time.time(), 't4')
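    # Per-state prediction from lagged values using the fitted linear
    # coefficients.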
    y_predict = input_df.iloc[:, :].apply(predict_val, params_state=linear_coeff, lags_names=lags_names, axis=0).T.clip(0)
    y_predict.columns = ['y_predict']
-    #print(time.time(), 't5')
-    #print(flag_out1)
-    #print(flag_out)
    val = val.merge(y_predict, left_index=True, right_index=True, how='outer')
    flag_out['flag'] = 'weekday outlier'
    flag_out1['flag'] = 'large_spikes'
    ret_val = pd.concat([flag_out, flag_out1], axis=0).query("date==@ref_date")
-    print(ret_val)
    return input_df, val, ret_val

def flash_eval_lag(input_df, range_tup, lag, source, signal, logger):
@@ -266,27 +248,17 @@ def flash_eval_lag(input_df, range_tup, lag, source, signal, logger):
    # #Make corrections & predictions
    input_df, raw_val, preprocess_outlier = eval_day(input_df, iqr_dict, ref_date, weekday_params, linear_coeff)
    raw_val = raw_val.droplevel(level=0)
-    # input_df = pd.read_csv('input_df.csv', index_col=0)
-    # raw_val = pd.read_csv('raw_val.csv', index_col=0)
-    # preprocess_outlier = pd.read_csv('preprocess_outlier.csv', index_col=0)
-
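    # Flag raw values that fall outside the configured plausible range.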
    s_val = raw_val['y_raw'].to_frame()
    out_range_outlier = pd.concat([s_val.query('y_raw< @range_tup[0]'), s_val.query('y_raw> @range_tup[-1]')], axis=0)
-    #
    # #Anomaly Detection
    thresh = 0.01
-    # print(input_df, raw_val, preprocess_outlier)
-    # input_df.to_csv('input_df.csv')
-    # raw_val.to_csv('raw_val.csv')
-    # preprocess_outlier.to_csv('preprocess_outlier.csv')
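    # Empirical p-values against the extreme-value reference distributions
    # for the minimum and maximum.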
-    val_min = return_vals(raw_val, dist_min)[["pval"]]#.to_frame()
-    val_max = return_vals(raw_val, dist_max)[["pval"]]#.to_frame()
+    val_min = return_vals(raw_val, dist_min)[["pval"]]
+    val_max = return_vals(raw_val, dist_max)[["pval"]]
    val_min['flags'] = 'EVD_min'
    val_max['flags'] = 'EVD_max'
    val_min.columns = ['pval', 'flags']
    val_max.columns = ['pval', 'flags']
    def process_anomalies(y, t_skew=None):
-        print('y is ', y)
        def standardize(y, t_skew=None):
            val = y.pval
            if t_skew == None:
@@ -305,7 +277,7 @@ def standardize(y, t_skew=None):
            y['pval'] = tmp_list
            if not y.empty:
                y = y[['pval']]
-            return y #.reset_index(drop=True)
+            return y
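    # Two-sided flagging cutoffs derived from the base threshold.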
    min_thresh = thresh * 2
    max_thresh = 1 - (thresh * 2)
@@ -324,7 +296,6 @@ def standardize(y, t_skew=None):
        iter_df = iter_df.iloc[:20, :]
        if iter_df.shape[0] > 0:
            for j, row in iter_df.reset_index().iterrows():
-                print(row)
                total_flags += 1
                start_link = f"{HTML_LINK}{ref_date.strftime('%Y-%m_%d')},{report_date.strftime('%Y-%m_%d')},{row.state}"
                if 'pval' in iter_df.columns:
@@ -342,7 +313,6 @@ def flash_eval(params):
    # if they aren't there, then regenerate them

    #STEP 1: API Call for file creation for prior days
-    print('in eval', params)
    logger = get_structured_logger(
        __name__, filename=params["common"].get("log_filename"),
        log_exceptions=params["common"].get("log_exceptions", True))
@@ -353,13 +323,11 @@ def flash_eval(params):
    #get most recent d from current files in cache
    #filter those by value & if they are in the json params and then generate the API files
    file_tup = load_all_files(params["common"]["export_dir"], most_recent_d - pd.Timedelta('14d'), most_recent_d)
-    #available_signals = list(set(["_".join(x.split("_")[2:]).split(".")[0] for (x, y, z) in file_tup])) #need the data anyway to ad din
-    #signals = list(set(available_signals) & set(params["flash"]["signals"]))
    signals = params["flash"]["signals"]
    for signal in signals:
        curr_df = pd.DataFrame()
-        #TODO: change back to 14d below
-        for date_s in pd.date_range(most_recent_d - pd.Timedelta('2d'), most_recent_d - pd.Timedelta('1d')):
+        #TODO: time the data querying
+        for date_s in pd.date_range(most_recent_d - pd.Timedelta('14d'), most_recent_d - pd.Timedelta('1d')):
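            # Pull the trailing week of data as it appeared on date_s (as_of).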
            data = covidcast.signal(source, signal, date_s - pd.Timedelta('7d'), date_s,
                                    geo_type="nation", as_of=date_s)
            data2 = covidcast.signal(source, signal, date_s - pd.Timedelta('7d'), date_s,