 from scipy.stats import binom
 import boto3
 from delphi_utils.weekday import Weekday
-from .constants import HTML_LINK, STATES, BUCKET
+from .constants import HTML_LINK, STATES
 from .. import (
     get_structured_logger,
 )
@@ -30,7 +30,8 @@ def split_reporting_schedule_dfs(input_df, flash_dir, lag):
     rep_sched = rep_sched.drop('min_cut')
     glob_out_list = []
     non_daily_ar = []
-    for i, df in rep_sched.groupby('0'):
+    rep_sched.columns = ['schedule']
+    for i, df in rep_sched.groupby('schedule'):
         fixed_sum = []
         columns = []
         for col in input_df.columns:
@@ -48,50 +49,53 @@ def split_reporting_schedule_dfs(input_df, flash_dir, lag):
             glob_out_list.append(fixed_sum_df)
         else:
             non_daily_ar.append(fixed_sum_df)
-    return daily_df, pd.concat(non_daily_ar, axis=1), pd.concat(glob_out_list, axis=1)
+    return (daily_df, pd.concat(non_daily_ar, axis=1), pd.concat(glob_out_list, axis=1))
 
 
-def bin_approach(y, yhat, pop, log=False):
+def bin_approach(df, log=False):
     """Create test statistic.
 
     Parameters
     ----------
+    df: DataFrame with columns
     y: observed values for streams
     yhat: predicted values for streams
     pop: population for a region
-    log: difference between reporting and reference date
+
+    log: whether to take logs before computing the test statistic
 
     Returns
     -------
     today's test-statistic values for the stream
     """
     def ts_dist(x, y, n):
         """Initialize test statistic distribution which is then vectorized."""
-        # print(x, y, y/n, n, binom.cdf(x, int(n), y/ n)
-        return binom.cdf(x, int(n), y/ n)
+        return binom.cdf(x, int(n), y / n)
+
     vec_ts_dist = np.vectorize(ts_dist)
     if log:
-        return vec_ts_dist(np.log(y + 2), np.log(yhat + 2), np.log(pd.Series(pop) + 2))
-    return vec_ts_dist(y, yhat, pop)
+        return pd.DataFrame(vec_ts_dist(np.log(df.y + 2),
+                                        np.log(df.yhat + 2), np.log(df['pop'] + 2)),
+                            index=df.index)
+    return pd.DataFrame(vec_ts_dist(df.y, df.yhat, df['pop']), index=df.index)
+
 
 
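A minimal sketch (outside the diff) of how the binomial-CDF test statistic above behaves, assuming a toy frame with the 'y', 'yhat', and 'pop' columns that the new bin_approach(df, log=True) expects; the data and index values are illustrative:

    import numpy as np
    import pandas as pd
    from scipy.stats import binom

    # Toy data: observed counts, AR predictions, and region populations.
    toy = pd.DataFrame({'y': [120, 80], 'yhat': [100.0, 85.0], 'pop': [1_000_000, 500_000]},
                       index=['ca', 'ny'])

    def ts_dist(x, y, n):
        # P(X <= x) with X ~ Binomial(n, y/n): how extreme is x relative to the prediction y.
        return binom.cdf(x, int(n), y / n)

    vec_ts_dist = np.vectorize(ts_dist)
    # log=True variant: transform counts and population before applying the statistic.
    ts = pd.DataFrame(vec_ts_dist(np.log(toy.y + 2), np.log(toy.yhat + 2), np.log(toy['pop'] + 2)),
                      index=toy.index)
    print(ts)  # values near 0 or 1 flag streams far below or above the prediction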
-def global_outlier_detect(df, mean, var):
+
+def outlier_detect(df):
     """Determine global outliers by using abs(t-statistic) > 5.
 
     Parameters
     ----------
-    df: Current df to evaluate for global outliers
-    mean: Mean needed for t-statistic calculation
-    var: Variance for t-statistic calculation
+    df: Current df to evaluate for global outliers, with columns
+        for mean and var.
 
     Returns
     -------
     The columns that are global outliers.
     """
-    all_columns = list(df.columns)
-    mean = mean[all_columns]
-    var = var[all_columns]
-    return df.columns[((abs(df.T.iloc[:, 0].sort_index() - mean.sort_index()) / var.clip(1)).gt(5))]
+    df.columns = ['x', 'mean', 'var']
+    return df.index[((abs(df['x'] - df['mean']) / (df['var'].clip(1))).gt(5))]
 
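A small sketch (outside the diff) of the thresholding rule outlier_detect applies once the merged frame carries a value, mean, and variance column; the data below are made up:

    import pandas as pd

    merged = pd.DataFrame({'x': [500, 52, 7], 'mean': [100, 50, 5], 'var': [20, 40, 0.5]},
                          index=['ca', 'ny', 'vt'])
    merged.columns = ['x', 'mean', 'var']
    flagged = merged.index[(abs(merged['x'] - merged['mean']) / merged['var'].clip(1)).gt(5)]
    print(list(flagged))  # ['ca']: |500 - 100| / 20 = 20 > 5; 'vt' passes because var is clipped to 1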
 def apply_ar(last_7, flash_dir, lag, weekday_correction, non_daily_df, fips_pop_table):
     """Predict y_hat using an AR model.
@@ -109,13 +113,17 @@ def apply_ar(last_7, flash_dir, lag, weekday_correction, non_daily_df, fips_pop_table):
     -------
     ts_streams: return of test statistic for the day's streams.
     """
-    lin_coeff = pd.read_csv(f'{flash_dir}/lin_coeff_{lag}.csv', index_col=0)
     y = pd.concat([weekday_correction, non_daily_df], axis=1)
-    y_hat = pd.Series([np.dot(lin_coeff[x], last_7[x]) for x in y.columns])
-    ts_streams = pd.DataFrame(bin_approach(y, y_hat,
-                                           list(fips_pop_table[y.columns].iloc[0, :]), log=True),
-                              columns=y.columns)
-    return ts_streams
+    y.name = 'y'
+    lin_coeff = pd.read_csv(f'{flash_dir}/lin_coeff_{lag}.csv', index_col=0)
+    y_hat = pd.Series([np.dot(lin_coeff[x], last_7[x]) for x in y.columns], name='yhat')
+    y_hat.index = y.columns
+    df_for_ts = y.T.merge(y_hat, left_index=True, right_index=True).merge(fips_pop_table.T,
+                                                                          left_index=True, right_index=True)
+    df_for_ts.columns = ['y', 'yhat', 'pop']
+
+    return df_for_ts, bin_approach(df_for_ts, log=True)
+
 
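A minimal sketch (outside the diff) of the per-stream AR prediction used above: y_hat for each stream is the dot product of stored coefficients with that stream's last seven values; the coefficient and window values here are invented:

    import numpy as np
    import pandas as pd

    lin_coeff = pd.DataFrame({'ca': [0.1] * 7, 'ny': [0.2] * 7})  # illustrative per-stream AR coefficients
    last_7 = pd.DataFrame({'ca': [100] * 7, 'ny': [50] * 7})      # last seven observations per stream
    y_hat = pd.Series([np.dot(lin_coeff[x], last_7[x]) for x in last_7.columns],
                      index=last_7.columns, name='yhat')
    print(y_hat)  # ca 70.0, ny 70.0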
 def output(evd_ranking, day, lag, signal, logger):
     """Write the top streams that warrant human inspection to the log.
@@ -138,6 +146,8 @@ def output(evd_ranking, day, lag, signal, logger):
         if j < 30:
             start_link = f"{starter_link},{day.strftime('%Y-%m_%d')},{index}"
             p_text += f"\t{start_link}|*{index}*, {'{:.2f}'.format(value)}>\n"
+        else:
+            break
     name = f"Signal: {signal} Lag: {lag}"
     logger.info(name, payload=p_text)
 
@@ -157,10 +167,10 @@ def evd_ranking_fn(ts_streams, flash_dir):
     EVD_max = pd.read_csv(f'{flash_dir}/max.csv', index_col=0)
     EVD_min = pd.read_csv(f'{flash_dir}/min.csv', index_col=0)
     evd_ranking = pd.concat(
-        [ts_streams.apply(lambda x: sum(x.values[0] <= EVD_min['0'])
-                          / EVD_min['0'].shape[0], axis=0).sort_values(),
-         ts_streams.apply(lambda x: sum(x.values[0] >= EVD_max['0'])
-                          / EVD_max['0'].shape[0], axis=0).sort_values()],
+        [ts_streams.apply(lambda x: ts_val(x.values[0],
+                                           EVD_min['0']), axis=1).sort_values(),
+         ts_streams.apply(lambda x: 1 - ts_val(x.values[0],
+                                               EVD_max['0']), axis=1).sort_values()],
         axis=1).max(axis=1)
     evd_ranking.name = 'evd_ranking'
     return evd_ranking
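A small sketch (outside the diff) of the two-sided ranking above: each stream's test statistic is ranked against historical minima (lower tail) and maxima (upper tail, via 1 - ts_val), and the larger of the two is kept; ts_val is the helper added later in this diff, and the reference values are invented:

    import pandas as pd

    def ts_val(val, dist):
        return sum(val <= dist) / dist.shape[0]

    EVD_min = pd.Series([0.01, 0.02, 0.05, 0.10])  # illustrative historical minima
    EVD_max = pd.Series([0.90, 0.95, 0.98, 0.99])  # illustrative historical maxima
    ts_today = 0.97
    low_tail = ts_val(ts_today, EVD_min)       # 0.0: not unusually low
    high_tail = 1 - ts_val(ts_today, EVD_max)  # 0.5: above half of the historical maxima
    print(max(low_tail, high_tail))            # 0.5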
@@ -189,7 +199,7 @@ def streams_groups_fn(stream, ts_streams):
         for col, val in ts_streams.T.iterrows():
             if key == col[:2]:
                 total_dist = pd.concat([group[0], streams_state]).reset_index(drop=True)
-                ranking_streams[col] = sum(total_dist < val[0]) / total_dist.shape[0]
+                ranking_streams[col] = ts_val(val[0], total_dist)
     stream_group = pd.Series(ranking_streams, name='stream_group')
     return stream_group
 
@@ -213,6 +223,22 @@ def setup_fips(flash_dir):
     fips_pop_table.columns = [STATE_to_fips[x] if x in list(STATES)
                               else x for x in fips_pop_table.columns.droplevel()]
     return STATE_to_fips, fips_pop_table
+
+
+def ts_val(val, dist):
+    """Determine p-value from the test statistic distribution.
+
+    Parameters
+    ----------
+    val: The test statistic
+    dist: The distribution to compare to
+
+    Returns
+    -------
+    p-value
+
+    """
+    return sum(val <= dist) / dist.shape[0]
+
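A minimal sketch (outside the diff) of what ts_val returns: the share of a reference distribution at or above the observed test statistic; the reference values are invented:

    import pandas as pd

    def ts_val(val, dist):
        return sum(val <= dist) / dist.shape[0]

    reference = pd.Series([0.1, 0.2, 0.4, 0.6, 0.9])  # e.g. historical test statistics
    print(ts_val(0.05, reference))  # 1.0: below the entire reference distribution
    print(ts_val(0.95, reference))  # 0.0: above every reference value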
 
 def flash_eval(lag, day, input_df, signal, params, logger=None):
     """Evaluate most recent data using FlaSH.
@@ -224,20 +250,20 @@ def flash_eval(lag, day, input_df, signal, params, logger=None):
     Output:
     None
     """
-
     if not logger:
         logger = get_structured_logger(
             name=signal,
             filename=params["common"].get("log_filename", None),
             log_exceptions=params["common"].get("log_exceptions", True))
 
-    #TODOv4: Change these to a local dir
+    #TODOv4: Change these to a local dir or aws
    flash_dir = f'flash_ref/{signal}'
     last_7 = pd.read_csv(f'{flash_dir}/last_7_{lag}.csv', index_col=0).astype(float)
     wk_mean = pd.read_csv(f'{flash_dir}/weekday_mean_df_{lag}.csv', index_col=0)
     wk_var = pd.read_csv(f'{flash_dir}/weekday_var_df_{lag}.csv', index_col=0)
     weekday_params = pd.read_csv(f'{flash_dir}/weekday_params_{lag}.csv', index_col=0)
     summary_stats = pd.read_csv(f'{flash_dir}/summary_stats_{lag}.csv', index_col=0)
+    summary_stats.index = ['0.25', 'median', '0.75', 'mean', 'var']
     stream = pd.read_csv(f'{flash_dir}/ret_df2_{lag}.csv', index_col=0)
 
 
@@ -256,10 +282,9 @@ def flash_eval(lag, day, input_df, signal, params, logger=None):
     daily_update_df, non_daily_df_test, non_ar_df = split_reporting_schedule_dfs(input_df,
                                                                                   flash_dir, lag)
     # Weekday outlier [only for Daily Df]
-    weekday_outlier = daily_update_df.columns[((abs(
-        daily_update_df.T.sort_index()[day] - wk_mean.loc[day.day_of_week,
-        daily_update_df.columns].sort_index()) / wk_var.loc[day.day_of_week,
-        daily_update_df.columns].clip(1)).gt(5))]
+    weekday_outlier = outlier_detect(daily_update_df.T.merge(wk_mean.loc[day.day_of_week, :],
+                                     left_index=True, right_index=True).merge(wk_var.loc[day.day_of_week, :],
+                                     left_index=True, right_index=True))
 
     # Make weekday correction for daily update
     additive_factor = summary_stats[daily_update_df.columns].iloc[4, :]
@@ -271,17 +296,18 @@ def flash_eval(lag, day, input_df, signal, params, logger=None):
 
     global_outlier_list = []
     for df in [weekday_correction, non_daily_df_test, non_ar_df]:
-        global_outlier_list += list(
-            global_outlier_detect(df, summary_stats[df.columns].iloc[2],
-                                  summary_stats[df.columns].iloc[4]))
+        global_outlier_list += list(outlier_detect(df.T.merge(summary_stats[df.columns].loc['median', :],
+                                                              left_index=True, right_index=True
+                                                              ).merge(summary_stats[df.columns].loc['var', :],
+                                                              left_index=True, right_index=True)))
 
     # Apply AR
-    ts_streams = apply_ar(last_7, flash_dir, lag, weekday_correction,
+    ts_streams, df_for_ts = apply_ar(last_7, flash_dir, lag, weekday_correction,
                           non_daily_df_test, fips_pop_table)
     # find stream ranking (individual)
-    stream_individual = ts_streams.apply(
-        lambda x: sum(x.values[0] <= stream[x.name].dropna()) /
-        stream[x.name].dropna().shape[0], axis=0)
+    stream_individual = ts_streams.T.apply(lambda x: ts_val(x.values[0],
+                                                            stream[x.name].dropna()))
+
     stream_individual.name = 'stream_individual'
 
 
@@ -306,15 +332,16 @@ def flash_eval(lag, day, input_df, signal, params, logger=None):
                          how='outer').merge(stream_group,
                          left_index=True, right_index=True, how='outer').merge(evd_ranking,
                          left_index=True, right_index=True, how='outer'
-                         )
+                         ).merge(df_for_ts, left_index=True,
+                                 right_index=True, how='outer')
     #if aws parameters are passed, save this dataframe to AWS
     if params.get('archive', None):
         if params['archive'].get("aws_credentials", None):
             session = boto3.Session(
                 aws_access_key_id=params['archive']['aws_credentials']["aws_access_key_id"],
                 aws_secret_access_key=params['archive']['aws_credentials']["aws_secret_access_key"])
             s3 = session.resource('s3')
-            s3.Object(BUCKET,
+            s3.Object(params['flash']["aws_bucket"],
                       f'flags-dev/flash_results/{signal}_{day.strftime("%m_%d_%Y")}_{lag}.csv').put(
                 Body=type_of_outlier.to_csv(), ACL='public-read')
 
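A sketch (outside the diff) of the params structure this upload path appears to expect; the keys mirror those referenced above ('common', 'archive', 'aws_credentials', 'flash', 'aws_bucket') and all values are placeholders:

    # Assumed shape of `params` for the S3 upload above (values are placeholders).
    params = {
        "common": {"log_filename": None, "log_exceptions": True},
        "archive": {
            "aws_credentials": {
                "aws_access_key_id": "PLACEHOLDER_KEY_ID",
                "aws_secret_access_key": "PLACEHOLDER_SECRET",
            },
        },
        "flash": {"aws_bucket": "example-bucket"},
    }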