|
| 1 | +import sys |
| 2 | +import re |
| 3 | +import pandas as pd |
| 4 | +import numpy as np |
| 5 | +from pathlib import Path |
| 6 | +from itertools import product |
| 7 | +from datetime import date, datetime, timedelta |
| 8 | +from .datafetcher import * |
| 9 | +import math |
| 10 | + |
# Maps each geo type to a *negated* regex: any geo_id that matches the
# pattern does NOT conform to that geo type's expected id format.
# Fixes vs. original: raw strings (so `\d` is a regex escape, not a string
# escape), and the 'national' pattern is anchored with `^` — unanchored,
# `(?!usa).*$` matched the trailing "sa" of "usa" and flagged the one
# legitimate national geo_id as non-conforming.
negated_regex_dict = {
    'county': r'^(?!\d{5}).*$',
    'hrr': r'^(?!\d{1,3}).*$',
    'msa': r'^(?!\d{5}).*$',
    'state': r'^(?![A-Z]{2}).*$',
    'national': r'^(?!usa).*$'
}
| 18 | + |
class ValidationError(Exception):
    """Raised when a data file fails one of the validation checks.

    Accepts either ValidationError(expression, message) or the one-argument
    form ValidationError(message) — several call sites in this file use the
    one-argument form, which crashed with TypeError under the original
    mandatory two-argument __init__.
    """

    def __init__(self, expression, message=None):
        # One-argument form: treat the sole argument as the message.
        if message is None:
            expression, message = None, expression
        # Forward the message to Exception so str(e) and tracebacks show it
        # (the original stored it only on self, leaving str(e) empty).
        super().__init__(message)
        self.expression = expression  # offending value(s), if provided
        self.message = message
def make_date_filter(start_date, end_date):
    """Return a predicate (filename, match) -> bool that keeps files whose
    YYYYMMDD 'date' capture group falls within [start_date, end_date].

    The bounds are inclusive — the original used strict inequalities,
    which silently dropped files dated exactly on the start or end date.
    """
    start_code = int(start_date.strftime("%Y%m%d"))
    end_code = int(end_date.strftime("%Y%m%d"))

    def f(filename, match):
        # Filenames that did not match the expected pattern are skipped.
        if not match:
            return False
        code = int(match.groupdict()['date'])
        return start_code <= code <= end_code
    return f
| 32 | + |
def validate_daily(df_to_test, nameformat, generation_date=None,
                   max_check_lookbehind=7, sanity_check_rows_per_day=True,
                   sanity_check_value_diffs=True, check_vs_working=True):
    """Perform automated format and sanity checks of df_to_test.

    Fixes vs. the original:
    - `generation_date=date.today()` was evaluated once at import time;
      it is now computed per call (default None).
    - the max_check_lookbehind check used bitwise `|` (which binds tighter
      than the comparisons) and referenced a misspelled variable.
    - isinstance() was tested against `datetime.date` — a method of the
      imported `datetime` class, not a type — instead of the `date` class.
    - the error f-strings referenced `generation.date` (a NameError) and
      the last message lacked its `f` prefix.

    Raises ValidationError on any failed check.
    """
    if generation_date is None:
        generation_date = date.today()

    if not isinstance(max_check_lookbehind, int):
        raise ValidationError(
            max_check_lookbehind,
            f"max_check_lookbehind ({max_check_lookbehind}) must be length 1, integer type")

    if not isinstance(generation_date, date) or generation_date > date.today():
        raise ValidationError(
            generation_date,
            f"generation_date ({generation_date}) must be a length 1 Date that is not in the future.")

    # example: 20200624_county_smoothed_nohh_cmnty_cli
    # NOTE(review): filename_regex is not defined in this module; it is
    # presumably provided by `from .datafetcher import *` — confirm.
    pattern_found = filename_regex.match(nameformat)
    if not nameformat or not pattern_found:
        raise ValidationError(nameformat, f'nameformat ({nameformat}) not recognized')
| 46 | + |
def check_bad_geo_id(df_to_test, geo_type):
    """Validate that every geo_id in df_to_test conforms to geo_type's
    expected id format, using the negated patterns in negated_regex_dict.

    Raises ValidationError for an unknown geo_type, or listing the
    non-conforming ids when any geo_id fails the format check.
    """
    if geo_type not in negated_regex_dict:
        raise ValidationError(geo_type, "Unrecognized geo type")

    pattern = negated_regex_dict[geo_type]
    # A hit from the *negated* pattern means the id does NOT conform.
    offenders = []
    for hits in df_to_test['geo_id'].str.findall(pattern):
        if len(hits) > 0:
            offenders.append(hits[0])

    if offenders:
        raise ValidationError(offenders, "Non-conforming geo_ids exist!")
| 57 | + |
def check_missing_dates(daily_filenames, sdate, edate):
    """Print any dates in [sdate, edate] with no corresponding export file.

    daily_filenames may be plain filename strings or (filename, match)
    tuples as produced by validate(); filenames are expected to start with
    a YYYYMMDD date prefix.

    Fixes vs. original: parsed dates are converted with .date() so the set
    difference compares date objects to date objects (a datetime never
    equals a date, so every day used to be reported missing), and
    (filename, match) tuples are unpacked instead of being sliced.
    """
    number_of_dates = edate - sdate + timedelta(days=1)
    date_seq = {sdate + timedelta(days=x) for x in range(number_of_dates.days)}

    unique_dates_obj = set()
    for daily_filename in daily_filenames:
        # Accept either "YYYYMMDD_..." strings or (filename, match) tuples.
        name = daily_filename[0] if isinstance(daily_filename, tuple) else daily_filename
        unique_dates_obj.add(datetime.strptime(name[0:8], '%Y%m%d').date())

    check_dateholes = date_seq.difference(unique_dates_obj)

    if check_dateholes:
        print("Missing dates are observed; if these dates are already in the API they would not be updated")
        print(check_dateholes)
| 75 | + |
def check_bad_val(df_to_test):
    """Validate the 'val' column: no nulls and no negative values.

    Fix vs. original: the null check inspected the *entire* frame
    (df_to_test.isnull()) while the error message talks only about the
    'val' column; the check now matches the message (nulls in 'se' and
    sample-size columns are covered by their own check functions).

    Raises ValidationError on any violation.
    """
    if df_to_test['val'].isnull().values.any():
        raise ValidationError(None, "val column can't have any cell set to null")
    if not df_to_test[(df_to_test['val'] < 0)].empty:
        raise ValidationError(None, "val column can't have any cell smaller than 0")
| 84 | + |
def check_bad_se(df):
    """Validate the 'se' column: non-null, and within (0, min(50, upper)].

    NOTE: mutates df in place (rounds 'se' and adds a 'se_upper_limit'
    column), matching the original behavior.

    Fix vs. original: ValidationError was constructed with a single
    argument while its __init__ required two, so raising it crashed with
    a TypeError instead of surfacing the intended validation message.
    """
    if df['se'].isnull().values.any():
        raise ValidationError(None, "se must not be NA")

    # Upper bound on a plausible se, derived from val and the effective
    # sample size.
    df.eval('se_upper_limit = (val * effective_sample_size + 50)/(effective_sample_size + 1)', inplace=True)

    df['se'] = df['se'].round(3)
    df['se_upper_limit'] = df['se_upper_limit'].round(3)

    # Rows violating 0 < se < 50 and se <= se_upper_limit.
    result = df.query('~((se > 0) & (se < 50) & (se <= se_upper_limit))')

    if not result.empty:
        raise ValidationError(None, "se must be in (0,min(50,val*(1+eps))]")
| 98 | + |
def check_bad_sample_size(df):
    """Validate sample sizes: non-null and at least 100.

    Fixes vs. original: `isnull` was referenced without being called
    (df[...].isnull.values raised AttributeError on the method object),
    bitwise `|` on the two conditions is replaced with boolean `or`, and
    ValidationError is given the two arguments its __init__ required.
    """
    if df['sample_size'].isnull().values.any() or df['effective_sample_size'].isnull().values.any():
        raise ValidationError(None, "sample size can't be NA")

    qresult = df.query('(sample_size < 100) | (effective_sample_size < 100)')

    if not qresult.empty:
        raise ValidationError(None, "sample size must be >= 100")
| 107 | + |
def check_min_allowed_max_date(generation_date, max_date, max_weighted_date):
    """Check that the newest dates in the generated data are recent enough:
    weighted data within 4 days of generation_date, unweighted within 1 day.

    Fix vs. original: ValidationError was constructed with a single
    argument while its __init__ required two, so raising it crashed with
    a TypeError instead of the intended validation message.
    """
    if (max_weighted_date < generation_date - timedelta(days=4)
            or max_date < generation_date - timedelta(days=1)):
        raise ValidationError(
            (max_date, max_weighted_date),
            "latest date of generated file seems too long ago")
| 112 | + |
def reldiff_by_min(x, y):
    """Relative difference of x and y, normalized by the smaller of the two."""
    smaller = y if y < x else x
    return (x - y) / smaller
| 115 | + |
def check_rapid_change(checking_date, recent_df, recent_api_df, date_list, sig, geo):
    """Flag a rapid change in the number of rows reported per day: the row
    count for checking_date is compared against the average daily row count
    of the recent API window, and a relative difference above 35% raises
    ValidationError.
    """
    day_rows = recent_df[recent_df['time_value'] == checking_date].shape[0]
    reference_rows = recent_api_df.shape[0] / len(date_list)

    if abs(reldiff_by_min(day_rows, reference_rows)) > 0.35:
        raise ValidationError(
            (checking_date, sig, geo),
            "Number of rows per day (-with-any-rows) seems to have changed rapidly (latest vs recent window of data)")
| 122 | + |
def check_avg_val_diffs(recent_df, recent_api_df, smooth_option):
    """Compare average val/se/sample_size between recently generated data
    and the recent API reference window, raising ValidationError when the
    normalized differences exceed the thresholds for smooth_option
    ('raw' or 'smoothed').

    Fixes vs. original:
    - thres.loc['val.mean.stddiff"'] contained a stray quote, so that
      lookup always raised KeyError.
    - Series.bool() (deprecated/removed in modern pandas) replaced with
      scalar comparisons; thresholds are a Series indexed by check name.
    - an unrecognized smooth_option now raises immediately instead of
      returning a lambda that failed later with an opaque AttributeError.
    - the multi-line error message was concatenated without spaces
      between fragments ("seemlarge", "absolutedifference", ...).
    """
    recent_df = recent_df.drop(columns=['geo_id'])
    mean_recent_df = recent_df[['val', 'se', 'sample_size']].mean()

    # Collapse the API window to per-geo means, then to overall means.
    recent_api_df = recent_api_df.groupby(['geo_value'], as_index=False)[['val', 'se', 'sample_size']].mean()
    recent_api_df = recent_api_df.drop(columns=['geo_value'])
    mean_recent_api_df = recent_api_df.mean()

    # Normalized signed and absolute differences between the two windows.
    denom = mean_recent_df.mean() + mean_recent_api_df.mean()
    mean_stddiff = ((mean_recent_df - mean_recent_api_df).mean() * 2) / denom
    mean_stdabsdiff = ((mean_recent_df - mean_recent_api_df).abs().mean() * 2) / denom

    raw_thresholds = pd.Series({
        'mean.stddiff': 0.50,
        'val.mean.stddiff': 0.30,
        'mean.stdabsdiff': 0.80,
    })
    # Smoothed signals average over ~7 days, so tolerate smaller deviations.
    smoothed_thresholds = raw_thresholds / (math.sqrt(7) * 1.5)

    switcher = {
        'raw': raw_thresholds,
        'smoothed': smoothed_thresholds,
    }
    if smooth_option not in switcher:
        raise ValidationError(smooth_option, "Invalid smoothing option")
    thres = switcher[smooth_option]

    # TODO(review): the original R code applied the tighter val.mean.stddiff
    # threshold only to the 'val' variable; purpose of that condition is
    # still to be confirmed.
    mean_stddiff_high = (abs(mean_stddiff) > thres['mean.stddiff']) or (abs(mean_stddiff) > thres['val.mean.stddiff'])
    mean_stdabsdiff_high = mean_stdabsdiff > thres['mean.stdabsdiff']

    if mean_stddiff_high or mean_stdabsdiff_high:
        raise ValidationError(
            (mean_stddiff_high, mean_stdabsdiff_high),
            'Average differences in variables by geoid between recent & semirecent data seem '
            'large --- either large increase tending toward one direction or large mean absolute '
            'difference, relative to average values of corresponding variables. For the former '
            'check, tolerances for `val` are more restrictive than those for other columns.')
| 167 | + |
def validate(export_dir, start_date, end_date, max_check_lookbehind = timedelta(days=7), sanity_check_rows_per_day = True, sanity_check_value_diffs = True):
    """Run all validation checks over the export files in export_dir whose
    embedded YYYYMMDD dates fall in the (start_date, end_date) window, then
    sanity-check recent data against the covidcast API.

    Parameters
    ----------
    export_dir : directory containing the exported CSV files to validate.
    start_date, end_date : date bounds used by make_date_filter to select files.
    max_check_lookbehind : timedelta window used when fetching reference API data.
    sanity_check_rows_per_day : if True, run check_rapid_change per checking date.
    sanity_check_value_diffs : if True, run check_avg_val_diffs per checking date.

    NOTE(review): read_filenames, load_csv, read_geo_sig_cmbo_files,
    geo_sig_cmbo, data_folder, filenames, date_slist, date_list, covidcast
    and DATA_SOURCE are not defined in this module; presumably they come
    from `from .datafetcher import *` — confirm.
    """

    export_files = read_filenames(export_dir)
    date_filter = make_date_filter(start_date, end_date)
    # Keep only the (filename, match) pairs whose date falls in the window.
    validate_files = [(f, m) for (f, m) in export_files if date_filter(f,m)]

    all_frames = []

    # First, check file formats
    # NOTE(review): validate_files holds (filename, match) tuples, but
    # check_missing_dates slices each element like a string — confirm the
    # intended argument type.
    check_missing_dates(validate_files, start_date, end_date)
    for filename,match in validate_files:
        df = load_csv(filename)
        # Per-file column checks; each raises ValidationError on failure.
        check_bad_geo_id(df, match.groupdict()['geo_type'])
        check_bad_val(df)
        check_bad_se(df)
        check_bad_sample_size(df)
        # Tag each frame with the metadata parsed from its filename.
        df['geo_type'] = match.groupdict()['geo_type']
        df['date'] = match.groupdict()['date']
        df['signal'] = match.groupdict()['signal']
        all_frames.append(df)

    # Multi-indexed dataframe for a given (signal, geo_type)

    ## recent_lookbehind: start from the check date and working backward in time,
    ## how many days do we include in the window of date to check for anomalies?
    ## Choosing 1 day checks just the check data itself.
    # NOTE(review): recent_lookbehind and semirecent_lookbehind are currently unused.
    recent_lookbehind = timedelta(days=1)

    ## semirecent_lookbehind: starting from the check date and working backward
    ## in time, how many days -- before subtracting out the "recent" days ---
    ## do we use to form the reference statistics?
    semirecent_lookbehind = timedelta(days=7)
    # Leading token of the signal name, e.g. "smoothed" in "smoothed_cli".
    smooth_option_regex = re.compile(r'([^_]+)')

    kroc = 0
    for recent_df, geo, sig in read_geo_sig_cmbo_files(geo_sig_cmbo, data_folder, filenames, date_slist):

        m = smooth_option_regex.match(sig)
        smooth_option = m.group(1)

        #recent_df.set_index("time_value", inplace = True)
        print("Printing recent_df scenes:", recent_df.shape)
        print(recent_df)
        for checking_date in date_list:
            #print(recent_df.loc[checking_date,:])
            # -recent- dataframe run backwards from the checking_date
            recent_end_date = checking_date - timedelta(days=1)
            recent_begin_date = checking_date - max_check_lookbehind
            # Reference window pulled from the covidcast API.
            recent_api_df = covidcast.signal(DATA_SOURCE, sig, recent_begin_date, recent_end_date, geo)

            # Align API column names and column set with the generated files.
            recent_api_df.rename(columns={'stderr': 'se', 'value': 'val'}, inplace = True)
            recent_api_df.drop(['direction', 'issue', 'lag'], axis=1, inplace = True)

            column_names = ["geo_value", "val", "se", "sample_size", "time_value"]

            recent_api_df = recent_api_df.reindex(columns=column_names)
            if (recent_df["se"].isnull().mean() > 0.5):
                print('Recent se values are >50% NA')

            if sanity_check_rows_per_day:
                check_rapid_change(checking_date, recent_df, recent_api_df, date_list, sig, geo)

            if sanity_check_value_diffs:
                check_avg_val_diffs(recent_df, recent_api_df, smooth_option)
        # NOTE(review): kroc caps the loop at two (geo, sig) combinations and
        # sys.exit() then ends the process — looks like leftover debugging code.
        kroc += 1
        if kroc == 2:
            break
    sys.exit()
| 236 | + |
| 237 | + |
0 commit comments