 import pandas as pd
 from .errors import ValidationFailure, APIDataFetchError
 from .datafetcher import get_geo_signal_combos, threaded_api_calls
-from .utils import relative_difference_by_min, TimeWindow
+from .utils import relative_difference_by_min, TimeWindow, lag_converter


 class DynamicValidator:
@@ -27,8 +27,10 @@ class Parameters:
         max_check_lookbehind: timedelta
         # names of signals that are smoothed (7-day avg, etc)
         smoothed_signals: Set[str]
-        # how many days behind do we expect each signal to be
-        expected_lag: Dict[str, int]
+        # maximum number of days we expect each signal to be behind
+        max_expected_lag: Dict[str, int]
+        # minimum number of days we expect each signal to be behind
+        min_expected_lag: Dict[str, int]

     def __init__(self, params):
         """
@@ -50,7 +52,10 @@ def __init__(self, params):
             max_check_lookbehind=timedelta(
                 days=dynamic_params.get("ref_window_size", 7)),
             smoothed_signals=set(dynamic_params.get("smoothed_signals", [])),
-            expected_lag=dynamic_params.get("expected_lag", dict())
+            min_expected_lag=lag_converter(common_params.get(
+                "min_expected_lag", dict())),
+            max_expected_lag=lag_converter(common_params.get(
+                "max_expected_lag", dict()))
         )

     def validate(self, all_frames, report):
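Note for reviewers: the lag settings now come from the `common` params block rather than the validator-specific `dynamic` block, and pass through `lag_converter` before use. A minimal sketch of the intended flow, assuming (this is not shown in the diff) that `lag_converter` simply coerces per-signal values to integer day counts; the signal name `confirmed_7dav_incidence_num` is illustrative:

```python
# Hypothetical stand-in: lag_converter lives in .utils and is not shown in
# this diff; assumed here to coerce per-signal lag values from params.json
# (which may arrive as strings) into integer day counts.
def lag_converter(lag_params):
    return {signal: int(lag) for signal, lag in lag_params.items()}

common_params = {
    "min_expected_lag": {"all": "1"},
    "max_expected_lag": {"confirmed_7dav_incidence_num": "12", "all": "10"},
}

min_expected_lag = lag_converter(common_params.get("min_expected_lag", dict()))
max_expected_lag = lag_converter(common_params.get("max_expected_lag", dict()))
assert min_expected_lag == {"all": 1}
assert max_expected_lag == {"confirmed_7dav_incidence_num": 12, "all": 10}
```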
@@ -64,15 +69,6 @@ def validate(self, all_frames, report):
             report: ValidationReport
                 report to which the results of these checks will be added
         """
-        # recent_lookbehind: start from the check date and working backward in time,
-        # how many days at a time do we want to check for anomalies?
-        # Choosing 1 day checks just the daily data.
-        recent_lookbehind = timedelta(days=1)
-
-        # semirecent_lookbehind: starting from the check date and working backward
-        # in time, how many days do we use to form the reference statistics.
-        semirecent_lookbehind = timedelta(days=7)
-
         # Get 14 days prior to the earliest list date
         outlier_lookbehind = timedelta(days=14)

@@ -117,7 +113,7 @@ def validate(self, all_frames, report):

             report.increment_total_checks()
             if isinstance(api_df_or_error, APIDataFetchError):
-                report.raised_errors.append(api_df_or_error)
+                report.add_raised_error(api_df_or_error)
                 continue

             # Outlier dataframe
@@ -142,55 +138,13 @@ def validate(self, all_frames, report):
             # Check data from a group of dates against recent (previous 7 days,
             # by default) data from the API.
             for checking_date in self.params.time_window.date_seq:
-                recent_cutoff_date = checking_date - \
-                    recent_lookbehind + timedelta(days=1)
-                recent_df = geo_sig_df.query(
-                    'time_value <= @checking_date & time_value >= @recent_cutoff_date')
-
-                report.increment_total_checks()
-
-                if recent_df.empty:
-                    report.add_raised_error(
-                        ValidationFailure("check_missing_geo_sig_date_combo",
-                                          checking_date,
-                                          geo_type,
-                                          signal_type,
-                                          "test data for a given checking date-geo type-signal type"
-                                          " combination is missing. Source data may be missing"
-                                          " for one or more dates"))
-                    continue

-                # Reference dataframe runs backwards from the recent_cutoff_date
-                #
-                # These variables are interpolated into the call to `api_df_or_error.query()`
-                # below but pylint doesn't recognize that.
-                # pylint: disable=unused-variable
-                reference_start_date = recent_cutoff_date - \
-                    min(semirecent_lookbehind, self.params.max_check_lookbehind) - \
-                    timedelta(days=1)
-                if signal_type in self.params.smoothed_signals:
-                    # Add an extra 7 days to the reference period.
-                    reference_start_date = reference_start_date - \
-                        timedelta(days=7)
-
-                reference_end_date = recent_cutoff_date - timedelta(days=1)
-                # pylint: enable=unused-variable
-
-                # Subset API data to relevant range of dates.
-                reference_api_df = api_df_or_error.query(
-                    "time_value >= @reference_start_date & time_value <= @reference_end_date")
-
-                report.increment_total_checks()
-
-                if reference_api_df.empty:
-                    report.add_raised_error(
-                        ValidationFailure("empty_reference_data",
-                                          checking_date,
-                                          geo_type,
-                                          signal_type,
-                                          "reference data is empty; comparative checks could not "
-                                          "be performed"))
+                create_dfs_or_error = self.create_dfs(
+                    geo_sig_df, api_df_or_error, checking_date, geo_type, signal_type, report)
+
+                if not create_dfs_or_error:
                     continue
+                recent_df, reference_api_df = create_dfs_or_error

                 self.check_max_date_vs_reference(
                     recent_df, reference_api_df, checking_date, geo_type, signal_type, report)
@@ -207,8 +161,9 @@ def validate(self, all_frames, report):
                     break

     def check_min_allowed_max_date(self, max_date, geo_type, signal_type, report):
-        """
-        Check if time since data was generated is reasonable or too long ago.
+        """Check if time since data was generated is reasonable or too long ago.
+
+        The most recent data should be at most max_expected_lag days behind the generation date.

         Arguments:
             - max_date: date of most recent data to be validated; datetime format.
@@ -219,11 +174,10 @@ def check_min_allowed_max_date(self, max_date, geo_type, signal_type, report):
         Returns:
             - None
         """
-        thres = timedelta(
-            days=self.params.expected_lag[signal_type] if signal_type in self.params.expected_lag
-            else 1)
+        min_thres = timedelta(days=self.params.max_expected_lag.get(
+            signal_type, self.params.max_expected_lag.get('all', 10)))

-        if max_date < self.params.generation_date - thres:
+        if max_date < self.params.generation_date - min_thres:
             report.add_raised_error(
                 ValidationFailure("check_min_max_date",
                                   geo_type=geo_type,
@@ -233,8 +187,9 @@ def check_min_allowed_max_date(self, max_date, geo_type, signal_type, report):
         report.increment_total_checks()

     def check_max_allowed_max_date(self, max_date, geo_type, signal_type, report):
-        """
-        Check if time since data was generated is reasonable or too recent.
+        """Check if time since data was generated is reasonable or too recent.
+
+        The most recent data should be at least min_expected_lag days behind the generation date.

         Arguments:
             - max_date: date of most recent data to be validated; datetime format.
@@ -245,7 +200,10 @@ def check_max_allowed_max_date(self, max_date, geo_type, signal_type, report):
         Returns:
             - None
         """
-        if max_date > self.params.generation_date:
+        max_thres = timedelta(days=self.params.min_expected_lag.get(
+            signal_type, self.params.min_expected_lag.get('all', 1)))
+
+        if max_date > self.params.generation_date - max_thres:
             report.add_raised_error(
                 ValidationFailure("check_max_max_date",
                                   geo_type=geo_type,
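Together, `check_min_allowed_max_date` and `check_max_allowed_max_date` bracket the acceptable freshness of a signal's most recent date: `generation_date - max_expected_lag <= max_date <= generation_date - min_expected_lag`. A worked sketch with made-up dates and this diff's default thresholds (10 and 1 days):

```python
from datetime import date, timedelta

generation_date = date(2021, 3, 10)   # hypothetical run date
min_lag, max_lag = 1, 10              # defaults when no per-signal or "all" entry

earliest_ok = generation_date - timedelta(days=max_lag)  # 2021-02-28
latest_ok = generation_date - timedelta(days=min_lag)    # 2021-03-09

max_date = date(2021, 3, 5)
# Passes both checks: not too stale, not suspiciously recent.
assert earliest_ok <= max_date <= latest_ok
```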
@@ -254,6 +212,113 @@ def check_max_allowed_max_date(self, max_date, geo_type, signal_type, report):

         report.increment_total_checks()

+    def create_dfs(self, geo_sig_df, api_df_or_error, checking_date, geo_type, signal_type, report):
+        """Create recent_df and reference_api_df from params.
+
+        Adds an error to the report and returns False if either dataframe is empty.
+
+        Arguments:
+            - geo_sig_df: pandas dataframe of test data
+            - api_df_or_error: pandas dataframe of reference data, either from the
+              COVIDcast API or semirecent data
+            - checking_date: datetime date; the date being checked
+            - geo_type: str; geo type name (county, msa, hrr, state) as in the CSV name
+            - signal_type: str; signal name as in the CSV name
+            - report: ValidationReport; report where results are added
+
+        Returns:
+            - False if either dataframe is empty, else (recent_df, reference_api_df)
+              (after reference_api_df has been padded if necessary)
+        """
+        # recent_lookbehind: starting from the check date and working backward in time,
+        # how many days at a time do we want to check for anomalies?
+        # Choosing 1 day checks just the daily data.
+        recent_lookbehind = timedelta(days=1)
+
+        # semirecent_lookbehind: starting from the check date and working backward
+        # in time, how many days do we use to form the reference statistics.
+        semirecent_lookbehind = timedelta(days=7)
+
+        recent_cutoff_date = checking_date - \
+            recent_lookbehind + timedelta(days=1)
+        recent_df = geo_sig_df.query(
+            'time_value <= @checking_date & time_value >= @recent_cutoff_date')
+
+        report.increment_total_checks()
+
+        if recent_df.empty:
+            report.add_raised_error(
+                ValidationFailure("check_missing_geo_sig_date_combo",
+                                  checking_date,
+                                  geo_type,
+                                  signal_type,
+                                  "test data for a given checking date-geo type-signal type"
+                                  " combination is missing. Source data may be missing"
+                                  " for one or more dates"))
+            return False
+
+        # Reference dataframe runs backwards from the recent_cutoff_date
+        #
+        # These variables are interpolated into the call to `api_df_or_error.query()`
+        # below but pylint doesn't recognize that.
+        # pylint: disable=unused-variable
+        reference_start_date = recent_cutoff_date - \
+            min(semirecent_lookbehind, self.params.max_check_lookbehind) - \
+            timedelta(days=1)
+        if signal_type in self.params.smoothed_signals:
+            # Add an extra 7 days to the reference period.
+            reference_start_date = reference_start_date - \
+                timedelta(days=7)
+
+        reference_end_date = recent_cutoff_date - timedelta(days=1)
+        # pylint: enable=unused-variable
+
+        # Subset API data to relevant range of dates.
+        reference_api_df = api_df_or_error.query(
+            "time_value >= @reference_start_date & time_value <= @reference_end_date")
+
+        report.increment_total_checks()
+
+        if reference_api_df.empty:
+            report.add_raised_error(
+                ValidationFailure("empty_reference_data",
+                                  checking_date,
+                                  geo_type,
+                                  signal_type,
+                                  "reference data is empty; comparative checks could not "
+                                  "be performed"))
+            return False
+
+        reference_api_df = self.pad_reference_api_df(
+            reference_api_df, geo_sig_df, reference_end_date)
+
+        return (recent_df, reference_api_df)
+
+    def pad_reference_api_df(self, reference_api_df, geo_sig_df, reference_end_date):
+        """Check if API data is missing, and supplement from test data.
+
+        Arguments:
+            - reference_api_df: API data within lookbehind range
+            - geo_sig_df: test data
+            - reference_end_date: supposed end date of reference data
+
+        Returns:
+            - reference_api_df: supplemented version of original
+        """
+        reference_api_df_max_date = reference_api_df.time_value.max()
+        if reference_api_df_max_date < reference_end_date:
+            # Query geo_sig_df, taking only the rows missing from the API data.
+            geo_sig_df_supplement = geo_sig_df.query(
+                'time_value <= @reference_end_date & time_value > \
+                @reference_api_df_max_date')[[
+                "geo_id", "val", "se", "sample_size", "time_value"]]
+            # Match the reference dataframe's time_value format.
+            geo_sig_df_supplement["time_value"] = \
+                pd.to_datetime(geo_sig_df_supplement["time_value"],
+                               format="%Y-%m-%d %H:%M:%S")
+            reference_api_df = pd.concat(
+                [reference_api_df, geo_sig_df_supplement])
+        return reference_api_df
+
     def check_max_date_vs_reference(self, df_to_test, df_to_reference, checking_date,
                                     geo_type, signal_type, report):
         """
@@ -354,9 +419,7 @@ def check_positive_negative_spikes(self, source_df, api_frames, geo, sig, report
         # check on the minimum value reported, sig_cut is a check
         # on the ftstat or ststat reported (t-statistics) and sig_consec
         # is a lower check for determining outliers that are next to each other.
-        size_cut = 5
-        sig_cut = 3
-        sig_consec = 2.25
+        size_cut, sig_cut, sig_consec = 5, 3, 2.25

         # Functions mapped to rows to determine outliers based on ftstat and ststat values
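For intuition about how these three cutoffs interact, here is a hypothetical flagging rule (the actual `outlier_flag`/`outlier_nearby` functions appear below in this file and may differ): values must be large enough to matter (`size_cut`), t-statistics must clear `sig_cut` on their own, and `sig_consec` lowers the bar for points adjacent to an existing outlier:

```python
# Hypothetical flagging logic, for intuition only; thresholds from this diff,
# column names ftstat/ststat follow the comment above.
size_cut, sig_cut, sig_consec = 5, 3, 2.25

def looks_like_outlier(val, tstat, next_to_outlier=False):
    threshold = sig_consec if next_to_outlier else sig_cut
    return abs(val) > size_cut and abs(tstat) > threshold

assert looks_like_outlier(val=40, tstat=3.5)                        # clears sig_cut
assert looks_like_outlier(val=40, tstat=2.4, next_to_outlier=True)  # clears sig_consec
assert not looks_like_outlier(val=2, tstat=9)                       # too small to matter
```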
@@ -428,7 +491,11 @@ def outlier_nearby(frame):
                                 == upper_df["geo_id"]].copy()
         lower_index = list(filter(lambda x: x >= 0, list(outliers.index - 1)))
         lower_df = outlier_df.iloc[lower_index, :].reset_index(drop=True)
-        lower_compare = outliers_reset[-len(lower_index):].reset_index(drop=True)
+        # If lower_df is empty, then make lower_compare empty too
+        if lower_df.empty:
+            lower_compare = outliers_reset[0:0]
+        else:
+            lower_compare = outliers_reset[-len(lower_index):].reset_index(drop=True)
         sel_lower_df = lower_df[lower_compare["geo_id"]
                                 == lower_df["geo_id"]].copy()

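The new branch matters because negative slicing degenerates at zero: when `lower_index` is empty, `-len(lower_index)` evaluates to `0`, so the old `outliers_reset[-len(lower_index):]` silently returned the entire frame rather than an empty one. A quick demonstration:

```python
import pandas as pd

outliers_reset = pd.DataFrame({"geo_id": ["a", "b", "c"]})
lower_index = []  # no rows below an outlier qualify

# Old expression: -len([]) == 0, so the slice keeps all three rows.
assert len(outliers_reset[-len(lower_index):]) == 3
# Fixed branch produces the intended empty frame.
assert outliers_reset[0:0].empty
```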
@@ -449,7 +516,7 @@ def outlier_nearby(frame):
             "time_value >= @source_frame_start & time_value <= @source_frame_end")

         if source_outliers.shape[0] > 0:
-            report.raised_errors.append(
+            report.add_raised_error(
                 ValidationFailure(
                     "check_positive_negative_spikes",
                     source_frame_end,