from scipy.stats import binom
import boto3
from delphi_utils.weekday import Weekday
- from .constants import HTML_LINK, STATES, BUCKET
+ from .constants import HTML_LINK, STATES
from .. import (
    get_structured_logger,
)
@@ -30,7 +30,8 @@ def split_reporting_schedule_dfs(input_df, flash_dir, lag):
    rep_sched = rep_sched.drop('min_cut')
    glob_out_list = []
    non_daily_ar = []
-     for i, df in rep_sched.groupby('0'):
+     rep_sched.columns = ['schedule']
+     for i, df in rep_sched.groupby('schedule'):
        fixed_sum = []
        columns = []
        for col in input_df.columns:
@@ -48,50 +49,53 @@ def split_reporting_schedule_dfs(input_df, flash_dir, lag):
            glob_out_list.append(fixed_sum_df)
        else:
            non_daily_ar.append(fixed_sum_df)
-     return daily_df, pd.concat(non_daily_ar, axis=1), pd.concat(glob_out_list, axis=1)
+     return (daily_df, pd.concat(non_daily_ar, axis=1), pd.concat(glob_out_list, axis=1))


- def bin_approach(y, yhat, pop, log=False):
+ def bin_approach(df, log=False):
    """Create test statistic.

    Parameters
    ----------
+     df: DataFrame with columns of
    y: observed values for streams
    yhat: predicted values for streams
    pop: population for a region
-     log: difference between reporting and reference date
+ 
+     log: whether to take the log of the inputs for the test statistic

    Returns
    -------
    today's test-statistic values for the stream
    """
    def ts_dist(x, y, n):
        """Initialize test statistic distribution which is then vectorized."""
-         # print(x, y, y/n, n, binom.cdf(x, int(n), y/ n)
-         return binom.cdf(x, int(n), y/ n)
+         return binom.cdf(x, int(n), y / n)
+ 
    vec_ts_dist = np.vectorize(ts_dist)
    if log:
-         return vec_ts_dist(np.log(y + 2), np.log(yhat + 2), np.log(pd.Series(pop) + 2))
-     return vec_ts_dist(y, yhat, pop)
+         return pd.DataFrame(vec_ts_dist(np.log(df.y + 2),
+                                         np.log(df.yhat + 2), np.log(df['pop'] + 2)),
+                             index=df.index)
+     return pd.DataFrame(vec_ts_dist(df.y, df.yhat, df['pop']), index=df.index)
+ 
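For reference, a minimal standalone sketch (toy data and names, not the real inputs, which come from `apply_ar`) of what the reworked `bin_approach` computes: each stream's observed value is scored with a binomial CDF whose trial count is the (log-transformed) population and whose success probability is `yhat / pop`.

```python
import numpy as np
import pandas as pd
from scipy.stats import binom

# Toy day-of data: index = stream names, columns = y (observed), yhat (predicted), pop.
df_for_ts = pd.DataFrame({'y': [120.0, 4.0], 'yhat': [100.0, 9.0],
                          'pop': [1_000_000, 50_000]},
                         index=['stream_a', 'stream_b'])

def ts_dist(x, y, n):
    """Binomial CDF of observing <= x events out of n trials at rate y / n."""
    return binom.cdf(x, int(n), y / n)

vec_ts_dist = np.vectorize(ts_dist)

# log=True branch: the +2 offset keeps the log finite for zero counts.
test_stat = pd.DataFrame(vec_ts_dist(np.log(df_for_ts.y + 2),
                                     np.log(df_for_ts.yhat + 2),
                                     np.log(df_for_ts['pop'] + 2)),
                         index=df_for_ts.index)
print(test_stat)  # values near 1 -> higher than predicted, near 0 -> lower than predicted
```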


- def global_outlier_detect(df, mean, var):
+ 
+ def outlier_detect(df):
    """Determine global outliers by using abs(t-statistic) > 5.

    Parameters
    ----------
-     df: Current df to evaluate for global outliers
-     mean: Mean needed for t-statistic calculation
-     var: Variance for t-statistic calculation
+     df: Current df to evaluate for global outliers, with columns
+         for the value, mean, and var.

    Returns
    -------
    The columns that are global outliers.
    """
-     all_columns = list(df.columns)
-     mean = mean[all_columns]
-     var = var[all_columns]
-     return df.columns[((abs(df.T.iloc[:, 0].sort_index() - mean.sort_index()) / var.clip(1)).gt(5))]
+     df.columns = ['x', 'mean', 'var']
+     return df.index[((abs(df['x'] - df['mean']) / (df['var'].clip(1))).gt(5))]
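As an illustration, a tiny self-contained sketch (made-up numbers) of the rule the reworked `outlier_detect` applies: a stream is flagged when |value − mean| / max(var, 1) exceeds 5.

```python
import pandas as pd

# Toy input: one row per stream, columns = today's value plus historical mean and variance.
df = pd.DataFrame({'x':    [500.0, 12.0],
                   'mean': [100.0, 10.0],
                   'var':  [3.0,   0.2]},
                  index=['stream_a', 'stream_b'])

# Same rule as outlier_detect: |x - mean| / max(var, 1) > 5.
outliers = df.index[(abs(df['x'] - df['mean']) / df['var'].clip(1)).gt(5)]
print(list(outliers))  # ['stream_a']
```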

def apply_ar(last_7, flash_dir, lag, weekday_correction, non_daily_df, fips_pop_table):
    """Predict y_hat using an AR model.
@@ -109,13 +113,17 @@ def apply_ar(last_7, flash_dir, lag, weekday_correction, non_daily_df, fips_pop_table):
    -------
    ts_streams: return of test statistic for the day's streams.
    """
-     lin_coeff = pd.read_csv(f'{flash_dir}/lin_coeff_{lag}.csv', index_col=0)
    y = pd.concat([weekday_correction, non_daily_df], axis=1)
-     y_hat = pd.Series([np.dot(lin_coeff[x], last_7[x]) for x in y.columns])
-     ts_streams = pd.DataFrame(bin_approach(y, y_hat,
-                               list(fips_pop_table[y.columns].iloc[0, :]), log=True),
-                               columns=y.columns)
-     return ts_streams
+     y.name = 'y'
+     lin_coeff = pd.read_csv(f'{flash_dir}/lin_coeff_{lag}.csv', index_col=0)
+     y_hat = pd.Series([np.dot(lin_coeff[x], last_7[x]) for x in y.columns], name='yhat')
+     y_hat.index = y.columns
+     df_for_ts = y.T.merge(y_hat, left_index=True, right_index=True).merge(fips_pop_table.T,
+                           left_index=True, right_index=True)
+     df_for_ts.columns = ['y', 'yhat', 'pop']
+ 
+     return df_for_ts, bin_approach(df_for_ts, log=True)
+ 
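A minimal sketch (single-day toy frames; the stream names and values are illustrative) of the merge that builds `df_for_ts`: the day's observed values, the AR predictions, and the populations are aligned on the stream names so `bin_approach` can consume one table.

```python
import pandas as pd

# One-row frames: columns are stream identifiers (e.g., state FIPS codes).
y = pd.DataFrame({'01': [120.0], '02': [4.0]})                      # observed counts for the day
y_hat = pd.Series([100.0, 9.0], index=y.columns, name='yhat')       # AR predictions per stream
fips_pop_table = pd.DataFrame({'01': [1_000_000], '02': [50_000]})  # populations per stream

# Same alignment as in apply_ar: transpose so streams become the index, then merge on it.
df_for_ts = y.T.merge(y_hat, left_index=True, right_index=True).merge(
    fips_pop_table.T, left_index=True, right_index=True)
df_for_ts.columns = ['y', 'yhat', 'pop']
print(df_for_ts)
```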

def output(evd_ranking, day, lag, signal, logger):
    """Write the top streams that warrant human inspection to the log.
@@ -159,10 +167,10 @@ def evd_ranking_fn(ts_streams, flash_dir):
    EVD_max = pd.read_csv(f'{flash_dir}/max.csv', index_col=0)
    EVD_min = pd.read_csv(f'{flash_dir}/min.csv', index_col=0)
    evd_ranking = pd.concat(
-         [ts_streams.apply(lambda x: sum(x.values[0] <= EVD_min['0'])
-                           / EVD_min['0'].shape[0], axis=0).sort_values(),
-          ts_streams.apply(lambda x: sum(x.values[0] >= EVD_max['0'])
-                           / EVD_max['0'].shape[0], axis=0).sort_values()],
+         [ts_streams.apply(lambda x: ts_val(x.values[0],
+                           EVD_min['0']), axis=1).sort_values(),
+          ts_streams.apply(lambda x: 1 - ts_val(x.values[0],
+                           EVD_max['0']), axis=1).sort_values()],
        axis=1).max(axis=1)
    evd_ranking.name = 'evd_ranking'
    return evd_ranking
@@ -191,7 +199,7 @@ def streams_groups_fn(stream, ts_streams):
        for col, val in ts_streams.T.iterrows():
            if key == col[:2]:
                total_dist = pd.concat([group[0], streams_state]).reset_index(drop=True)
-                 ranking_streams[col] = sum(total_dist < val[0]) / total_dist.shape[0]
+                 ranking_streams[col] = ts_val(val[0], total_dist)
    stream_group = pd.Series(ranking_streams, name='stream_group')
    return stream_group

@@ -215,6 +223,22 @@ def setup_fips(flash_dir):
    fips_pop_table.columns = [STATE_to_fips[x] if x in list(STATES)
                              else x for x in fips_pop_table.columns.droplevel()]
    return STATE_to_fips, fips_pop_table
+ 
+ 
+ def ts_val(val, dist):
+     """Determine the p-value from the test statistic distribution.
+ 
+     Parameters
+     ----------
+     val: The test statistic
+     dist: The distribution to compare to
+ 
+     Returns
+     -------
+     p-value
+     """
+     return sum(val <= dist) / dist.shape[0]
+ 
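For intuition, a tiny self-contained example (made-up numbers) of the empirical p-value `ts_val` returns: the fraction of the reference distribution at or above the observed test statistic.

```python
import pandas as pd

def ts_val(val, dist):
    """Share of the reference distribution at or above the observed test statistic."""
    return sum(val <= dist) / dist.shape[0]

reference = pd.Series([0.10, 0.35, 0.50, 0.80, 0.95])  # historical test statistics
print(ts_val(0.90, reference))  # 0.2 -> one of five reference values is >= 0.90
```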

def flash_eval(lag, day, input_df, signal, params, logger=None):
    """Evaluate most recent data using FlaSH.
@@ -226,20 +250,20 @@ def flash_eval(lag, day, input_df, signal, params, logger=None):
    Output:
    None
    """
-     print("RF")
    if not logger:
        logger = get_structured_logger(
            name=signal,
            filename=params["common"].get("log_filename", None),
            log_exceptions=params["common"].get("log_exceptions", True))

-     #TODOv4: Change these to a local dir
+     #TODOv4: Change these to a local dir or aws
    flash_dir = f'flash_ref/{signal}'
    last_7 = pd.read_csv(f'{flash_dir}/last_7_{lag}.csv', index_col=0).astype(float)
    wk_mean = pd.read_csv(f'{flash_dir}/weekday_mean_df_{lag}.csv', index_col=0)
    wk_var = pd.read_csv(f'{flash_dir}/weekday_var_df_{lag}.csv', index_col=0)
    weekday_params = pd.read_csv(f'{flash_dir}/weekday_params_{lag}.csv', index_col=0)
    summary_stats = pd.read_csv(f'{flash_dir}/summary_stats_{lag}.csv', index_col=0)
+     summary_stats.index = ['0.25', 'median', '0.75', 'mean', 'var']
    stream = pd.read_csv(f'{flash_dir}/ret_df2_{lag}.csv', index_col=0)

@@ -258,10 +282,9 @@ def flash_eval(lag, day, input_df, signal, params, logger=None):
    daily_update_df, non_daily_df_test, non_ar_df = split_reporting_schedule_dfs(input_df,
                                                                                 flash_dir, lag)
    # Weekday outlier [only for Daily Df]
-     weekday_outlier = daily_update_df.columns[((abs(
-         daily_update_df.T.sort_index()[day] - wk_mean.loc[day.day_of_week,
-         daily_update_df.columns].sort_index()) / wk_var.loc[day.day_of_week,
-         daily_update_df.columns].clip(1)).gt(5))]
+     weekday_outlier = outlier_detect(daily_update_df.T.merge(wk_mean.loc[day.day_of_week, :],
+         left_index=True, right_index=True).merge(wk_var.loc[day.day_of_week, :],
+         left_index=True, right_index=True))

    # Make weekday correction for daily update
    additive_factor = summary_stats[daily_update_df.columns].iloc[4, :]
@@ -273,17 +296,18 @@ def flash_eval(lag, day, input_df, signal, params, logger=None):

    global_outlier_list = []
    for df in [weekday_correction, non_daily_df_test, non_ar_df]:
-         global_outlier_list += list(
-             global_outlier_detect(df, summary_stats[df.columns].iloc[2],
-                                   summary_stats[df.columns].iloc[4]))
+         global_outlier_list += list(outlier_detect(
+             df.T.merge(summary_stats[df.columns].loc['median', :],
+                        left_index=True, right_index=True).merge(
+             summary_stats[df.columns].loc['var', :], left_index=True, right_index=True)))

    # Apply AR
-     ts_streams = apply_ar(last_7, flash_dir, lag, weekday_correction,
+     df_for_ts, ts_streams = apply_ar(last_7, flash_dir, lag, weekday_correction,
                            non_daily_df_test, fips_pop_table)
    # find stream ranking (individual)
-     stream_individual = ts_streams.apply(
-         lambda x: sum(x.values[0] <= stream[x.name].dropna()) /
-         stream[x.name].dropna().shape[0], axis=0)
+     stream_individual = ts_streams.T.apply(lambda x: ts_val(x.values[0],
+                                            stream[x.name].dropna()))
+ 
    stream_individual.name = 'stream_individual'


@@ -308,15 +332,16 @@ def flash_eval(lag, day, input_df, signal, params, logger=None):
                        how='outer').merge(stream_group,
                        left_index=True, right_index=True, how='outer').merge(evd_ranking,
                        left_index=True, right_index=True, how='outer'
-                         )
+                         ).merge(df_for_ts, left_index=True,
+                         right_index=True, how='outer')
    #if aws parameters are passed, save this dataframe to AWS
    if params.get('archive', None):
        if params['archive'].get("aws_credentials", None):
            session = boto3.Session(
                aws_access_key_id=params['archive']['aws_credentials']["aws_access_key_id"],
                aws_secret_access_key=params['archive']['aws_credentials']["aws_secret_access_key"])
            s3 = session.resource('s3')
-             s3.Object(BUCKET,
+             s3.Object(params['flash']["aws_bucket"],
                      f'flags-dev/flash_results/{signal}_{day.strftime("%m_%d_%Y")}_{lag}.csv').put(
                      Body=type_of_outlier.to_csv(), ACL='public-read')
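Since the hard-coded `BUCKET` constant is replaced by a value read from `params`, the runtime configuration is assumed to carry the bucket name under a `flash` section. The key names below are taken from the code in this diff; the surrounding values are placeholders.

```python
# Illustrative params structure consumed by flash_eval (only the keys used above).
params = {
    "common": {"log_filename": None, "log_exceptions": True},
    "archive": {
        "aws_credentials": {
            "aws_access_key_id": "<key-id>",
            "aws_secret_access_key": "<secret>",
        },
    },
    "flash": {"aws_bucket": "<bucket-name>"},  # replaces the old BUCKET constant
}
```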