@@ -606,8 +606,6 @@ def check_rapid_change_num_rows(self, df_to_test, df_to_reference, checking_date
 
         self.increment_total_checks()
 
-
-
     def check_positive_negative_spikes(self, source_df, api_frames, geo, sig):
         """
         Adapt Dan's corrections package to Python (only consider spikes):
@@ -630,7 +628,8 @@ def check_positive_negative_spikes(self, source_df, api_frames, geo, sig):
         # Combine all possible frames so that the rolling window calculations make sense.
         source_frame_start = source_df["time_value"].min()
         source_frame_end = source_df["time_value"].max()
-        api_frames_end = min(api_frames["time_value"].max(), source_frame_start - timedelta(days=1))
+        api_frames_end = min(api_frames["time_value"].max(
+        ), source_frame_start - timedelta(days=1))
         all_frames = pd.concat([api_frames, source_df]). \
             drop_duplicates(subset=["geo_id", "time_value"], keep='last'). \
             sort_values(by=['time_value']).reset_index(drop=True)
@@ -640,51 +639,51 @@ def check_positive_negative_spikes(self, source_df, api_frames, geo, sig):
         # check on the minimum value reported, sig_cut is a check
         # on the ftstat or ststat reported (t-statistics) and sig_consec
         # is a lower check for determining outliers that are next to each other.
-        size_cut = 0
+        size_cut = 20
         sig_cut = 3
         sig_consec = 2.25
 
-
         # Functions mapped to rows to determine outliers based on fstat and ststat values
+
         def outlier_flag(frame):
             if (abs(frame["val"]) > size_cut) and not (pd.isna(frame["ststat"])) \
-                and (frame["ststat"] > sig_cut):
+                    and (frame["ststat"] > sig_cut):
                 return 1
             if (abs(frame["val"]) > size_cut) and (pd.isna(frame["ststat"])) and \
-                not (pd.isna(frame["ftstat"])) and (frame["ftstat"] > sig_cut):
+                    not (pd.isna(frame["ftstat"])) and (frame["ftstat"] > sig_cut):
                 return 1
             if (frame["val"] < -size_cut) and not (pd.isna(frame["ststat"])) and \
-                not pd.isna(frame["ftstat"]):
+                    not pd.isna(frame["ftstat"]):
                 return 1
             return 0
 
         def outlier_nearby(frame):
             if (not pd.isna(frame['ststat'])) and (frame['ststat'] > sig_consec):
                 return 1
-            if pd.isna(frame['ststat']) and (frame['ftstat'] > sig_consec):
+            if pd.isna(frame['ststat']) and (frame['ftstat'] > sig_consec):
                 return 1
             return 0
 
-
-
         # Calculate ftstat and ststat values for the rolling windows, group fames by geo region
         region_group = all_frames.groupby("geo_id")
         window_size = 14
         shift_val = 0
 
         # Shift the window to match how R calculates rolling windows with even numbers
-        if window_size % 2 == 0 :
+        if window_size % 2 == 0:
             shift_val = -1
 
         # Calculate the t-statistics for the two rolling windows (windows center and windows right)
         all_full_frames = []
         for _, group in region_group:
-            rolling_windows = group["val"].rolling(window_size, min_periods=window_size)
-            center_windows = group["val"].rolling(window_size, min_periods=window_size, center=True)
+            rolling_windows = group["val"].rolling(
+                window_size, min_periods=window_size)
+            center_windows = group["val"].rolling(
+                window_size, min_periods=window_size, center=True)
             fmedian = rolling_windows.median()
             smedian = center_windows.median().shift(shift_val)
-            fsd = rolling_windows.std() + 0.00001 # if std is 0
-            ssd = center_windows.std().shift(shift_val) + 0.00001 # if std is 0
+            fsd = rolling_windows.std() + 0.00001  # if std is 0
+            ssd = center_windows.std().shift(shift_val) + 0.00001  # if std is 0
             vals_modified_f = group["val"] - fmedian.fillna(0)
             vals_modified_s = group["val"] - smedian.fillna(0)
             ftstat = abs(vals_modified_f)/fsd
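
For reference (not part of the change above): a minimal, self-contained sketch of the trailing rolling-median t-statistic that drives the outlier flag, using a made-up toy series; `toy_vals` and the printed value are illustrative only.

```python
import pandas as pd

# Toy daily counts with one obvious spike on the last day.
toy_vals = pd.Series(
    [10, 11, 9, 10, 12, 11, 10, 9, 11, 10, 12, 11, 10, 90], dtype=float)

window_size = 14
# Trailing window ending at each day, analogous to rolling_windows above.
trailing = toy_vals.rolling(window_size, min_periods=window_size)
fmedian = trailing.median()
fsd = trailing.std() + 0.00001  # guard against a zero standard deviation
ftstat = abs(toy_vals - fmedian.fillna(0)) / fsd

# The spike day yields ftstat of roughly 3.7, which clears sig_cut = 3
# (and its raw value 90 clears size_cut = 20).
print(ftstat.iloc[-1])
```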
@@ -697,44 +696,45 @@ def outlier_nearby(frame):
         # Determine outliers in source frames only, only need the reference
         # data from just before the start of the source data
         # because lead and lag outlier calculations are only one day
-        outlier_df = all_frames.query \
-            ('time_value >= @api_frames_end & time_value <= @source_frame_end')
+        outlier_df = all_frames.query(
+            'time_value >= @api_frames_end & time_value <= @source_frame_end')
         outlier_df = outlier_df.sort_values(by=['geo_id', 'time_value']) \
             .reset_index(drop=True).copy()
         outlier_df["flag"] = 0
-        outlier_df["flag"] = outlier_df.apply(outlier_flag, axis = 1)
+        outlier_df["flag"] = outlier_df.apply(outlier_flag, axis=1)
         outliers = outlier_df[outlier_df["flag"] == 1]
-        outliers_reset = outliers.copy().reset_index(drop = True)
+        outliers_reset = outliers.copy().reset_index(drop=True)
 
         # Find the lead outliers and the lag outliers. Check that the selected row
         # is actually a leading and lagging row for given geo_id
-        upper_index = list(filter(lambda x: x < outlier_df.shape[0], \
-            list(outliers.index + 1)))
+        upper_index = list(filter(lambda x: x < outlier_df.shape[0],
+                                  list(outliers.index + 1)))
         upper_df = outlier_df.iloc[upper_index, :].reset_index(drop=True)
-        upper_compare = outliers_reset[:len(upper_index)]
-        sel_upper_df = upper_df[upper_compare["geo_id"] == upper_df["geo_id"]].copy()
+        upper_compare = outliers_reset[:len(upper_index)]
+        sel_upper_df = upper_df[upper_compare["geo_id"]
+                                == upper_df["geo_id"]].copy()
         lower_index = list(filter(lambda x: x >= 0, list(outliers.index - 1)))
         lower_df = outlier_df.iloc[lower_index, :].reset_index(drop=True)
-        lower_compare = outliers_reset[-len(lower_index):].reset_index(drop=True)
-        sel_lower_df = lower_df[lower_compare["geo_id"] == lower_df["geo_id"]].copy()
+        lower_compare = outliers_reset[-len(lower_index) :].reset_index(drop=True)
+        sel_lower_df = lower_df[lower_compare["geo_id"]
+                                == lower_df["geo_id"]].copy()
 
         sel_upper_df["flag"] = 0
         sel_lower_df["flag"] = 0
 
-        sel_upper_df["flag"] = sel_upper_df.apply(outlier_nearby, axis = 1)
-        sel_lower_df["flag"] = sel_lower_df.apply(outlier_nearby, axis = 1)
+        sel_upper_df["flag"] = sel_upper_df.apply(outlier_nearby, axis=1)
+        sel_lower_df["flag"] = sel_lower_df.apply(outlier_nearby, axis=1)
 
         upper_outliers = sel_upper_df[sel_upper_df["flag"] == 1]
         lower_outliers = sel_lower_df[sel_lower_df["flag"] == 1]
 
         all_outliers = pd.concat([outliers, upper_outliers, lower_outliers]). \
-            sort_values(by=['time_value','geo_id']). \
+            sort_values(by=['time_value', 'geo_id']). \
             drop_duplicates().reset_index(drop=True)
 
-
         # Identify outliers just in the source data
-        source_outliers = all_outliers.query \
-            ("time_value >= @source_frame_start & time_value <= @source_frame_end")
+        source_outliers = all_outliers.query(
+            "time_value >= @source_frame_start & time_value <= @source_frame_end")
 
         if source_outliers.shape[0] > 0:
             self.raised_errors.append(ValidationError(
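
For reference (not part of the change above): `DataFrame.query` resolves names prefixed with `@` from the enclosing Python scope, which is how the date-window filters in this hunk select rows; a small example with made-up data.

```python
import pandas as pd

df = pd.DataFrame({
    "geo_id": ["pa", "pa", "pa"],
    "time_value": pd.to_datetime(["2020-11-01", "2020-11-02", "2020-11-03"]),
    "val": [5, 7, 40],
})

start = pd.Timestamp("2020-11-02")
end = start + pd.Timedelta(days=1)
# '@start' and '@end' refer to the local variables above, not to columns.
subset = df.query("time_value >= @start & time_value <= @end")
print(subset["val"].tolist())  # [7, 40]
```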
@@ -744,8 +744,6 @@ def outlier_nearby(frame):
                 'Source dates with flagged ouliers based on the \
                 previous 14 days of data available'))
 
-
-
     def check_avg_val_vs_reference(self, df_to_test, df_to_reference, checking_date, geo_type,
                                    signal_type):
         """
@@ -872,8 +870,6 @@ def validate(self, export_dir):
 
         export_files = read_filenames(export_dir)
         date_filter = make_date_filter(self.start_date, self.end_date)
-
-
 
         # Make list of tuples of CSV names and regex match objects.
         validate_files = [(f, m) for (f, m) in export_files if date_filter(m)]
@@ -919,11 +915,9 @@ def validate(self, export_dir):
         date_list = [self.start_date + timedelta(days=days)
                      for days in range(self.span_length.days + 1)]
 
-
         # Get 14 days prior to the earliest list date
         outlier_lookbehind = timedelta(days=14)
 
-
         # Get all expected combinations of geo_type and signal.
         geo_signal_combos = get_geo_signal_combos(self.data_source)
 
@@ -935,7 +929,6 @@ def validate(self, export_dir):
         if self.test_mode:
             kroc = 0
 
-
         # Comparison checks
         # Run checks for recent dates in each geo-sig combo vs semirecent (previous
         # week) API data.
@@ -964,20 +957,19 @@ def validate(self, export_dir):
             if geo_sig_api_df is None:
                 continue
 
-
-
             # Outlier dataframe
-            if (signal_type in ["confirmed_7dav_cumulative_num", "confirmed_7dav_incidence_num", \
-                "confirmed_cumulative_num", "confirmed_incidence_num", "deaths_7dav_cumulative_num", \
-                "deaths_cumulative_num"]):
+            if (signal_type in ["confirmed_7dav_cumulative_num", "confirmed_7dav_incidence_num",
+                                "confirmed_cumulative_num", "confirmed_incidence_num", "deaths_7dav_cumulative_num",
+                                "deaths_cumulative_num"]):
                 earliest_available_date = geo_sig_df["time_value"].min()
                 source_df = geo_sig_df.query(
-                'time_value <= @date_list[-1] & time_value >= @date_list[0]')
+                    'time_value <= @date_list[-1] & time_value >= @date_list[0]')
                 outlier_start_date = earliest_available_date - outlier_lookbehind
                 outlier_end_date = earliest_available_date - timedelta(days=1)
-                outlier_api_df = geo_sig_api_df.query \
-                ('time_value <= @outlier_end_date & time_value >= @outlier_start_date')
-                self.check_positive_negative_spikes(source_df, outlier_api_df, geo_type, signal_type)
+                outlier_api_df = geo_sig_api_df.query(
+                    'time_value <= @outlier_end_date & time_value >= @outlier_start_date')
+                self.check_positive_negative_spikes(
+                    source_df, outlier_api_df, geo_type, signal_type)
 
             # Check data from a group of dates against recent (previous 7 days,
             # by default) data from the API.
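
For reference (not part of the change above): the reference frame handed to the spike check covers the 14 days immediately before the earliest source date; a small sketch of that window arithmetic with an assumed earliest date.

```python
from datetime import date, timedelta

earliest_available_date = date(2020, 11, 15)  # assumed earliest source date
outlier_lookbehind = timedelta(days=14)

outlier_start_date = earliest_available_date - outlier_lookbehind
outlier_end_date = earliest_available_date - timedelta(days=1)
print(outlier_start_date, outlier_end_date)  # 2020-11-01 2020-11-14
```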
@@ -1036,9 +1028,6 @@ def validate(self, export_dir):
                 if kroc == 2:
                     break
 
-
-
-
         self.exit()
 
     def get_one_api_df(self, min_date, max_date,